1
0
mirror of https://github.com/natekspencer/hacs-oasis_mini.git synced 2025-12-06 18:44:14 -05:00

Add convenience properties and more logging to mqtt client, better mqtt management via coordinator (#100)

* Add convenience properties and more logging to mqtt client, better mqtt management via coordinator

* Address PR comments

* Address PR comments

* Fix
This commit is contained in:
Nathan Spencer
2025-11-24 11:54:04 -07:00
committed by GitHub
parent 379b6f67f2
commit a3ea4dc05a
5 changed files with 86 additions and 38 deletions

View File

@@ -83,6 +83,20 @@ class OasisMqttClient(OasisClientProtocol):
maxsize=MAX_PENDING_COMMANDS
)
@property
def is_running(self) -> bool:
"""Return `True` if the MQTT loop has been started and is not stopped."""
return (
self._loop_task is not None
and not self._loop_task.done()
and not self._stop_event.is_set()
)
@property
def is_connected(self) -> bool:
"""Return `True` if the MQTT client is currently connected."""
return self._connected_event.is_set()
def register_device(self, device: OasisDevice) -> None:
"""
Register an OasisDevice so MQTT messages for its serial are routed to that device.
@@ -218,32 +232,49 @@ class OasisMqttClient(OasisClientProtocol):
"""
Stop the MQTT client and clean up resources.
Signals the background MQTT loop to stop, cancels the loop task, disconnects the MQTT client if connected, and clears any pending commands from the internal command queue.
Signals the background MQTT loop to stop, cancels the loop task,
disconnects the MQTT client if connected, and drops any pending commands.
"""
_LOGGER.debug("MQTT stop() called - beginning shutdown sequence")
self._stop_event.set()
if self._loop_task:
_LOGGER.debug(
"Cancelling MQTT background task (task=%s, done=%s)",
self._loop_task,
self._loop_task.done(),
)
self._loop_task.cancel()
try:
await self._loop_task
except asyncio.CancelledError:
pass
_LOGGER.debug("MQTT background task cancelled")
if self._client:
_LOGGER.debug("Disconnecting MQTT client from broker")
try:
await self._client.disconnect()
_LOGGER.debug("MQTT client disconnected")
except Exception:
_LOGGER.exception("Error disconnecting MQTT client")
finally:
self._client = None
# Drop pending commands on stop
while not self._command_queue.empty():
try:
self._command_queue.get_nowait()
self._command_queue.task_done()
except asyncio.QueueEmpty:
break
# Drop queued commands
if not self._command_queue.empty():
_LOGGER.debug("Dropping queued commands")
dropped = 0
while not self._command_queue.empty():
try:
self._command_queue.get_nowait()
self._command_queue.task_done()
dropped += 1
except asyncio.QueueEmpty:
break
_LOGGER.debug("MQTT dropped %s queued command(s)", dropped)
_LOGGER.debug("MQTT shutdown sequence complete")
async def wait_until_ready(
self, device: OasisDevice, timeout: float = 10.0, request_status: bool = True
@@ -586,6 +617,9 @@ class OasisMqttClient(OasisClientProtocol):
return
while not self._command_queue.empty():
if not self._client:
break
try:
serial, payload = self._command_queue.get_nowait()
except asyncio.QueueEmpty:
@@ -599,7 +633,6 @@ class OasisMqttClient(OasisClientProtocol):
serial,
payload,
)
self._command_queue.task_done()
continue
topic = f"{serial}/COMMAND/CMD"
@@ -609,12 +642,11 @@ class OasisMqttClient(OasisClientProtocol):
_LOGGER.debug(
"Failed to flush queued command for %s, re-queuing", serial
)
# Put it back and break; we'll try again on next reconnect
# Put it back; we'll try again on next reconnect
await self._enqueue_command(serial, payload)
finally:
# Ensure we always balance the get(), even on cancellation
self._command_queue.task_done()
break
self._command_queue.task_done()
async def _publish_command(
self, device: OasisDevice, payload: str, wake: bool = False
@@ -659,9 +691,15 @@ class OasisMqttClient(OasisClientProtocol):
async def _mqtt_loop(self) -> None:
"""
Run the MQTT WebSocket connection loop that maintains connection, subscriptions, and message handling.
Run the MQTT WebSocket connection loop that maintains connection, subscriptions,
and message handling.
This background coroutine establishes a persistent WSS MQTT connection to the configured broker, sets connection state on successful connect, resubscribes to known device STATUS topics, flushes any queued outbound commands, and dispatches incoming MQTT messages to the status handler. On disconnect or error it clears connection state and subscription tracking, and retries connecting after the configured backoff interval until the client is stopped.
This background coroutine establishes a persistent WSS MQTT connection to the
configured broker, sets connection state on successful connect, resubscribes to
known device STATUS topics, flushes any queued outbound commands, and dispatches
incoming MQTT messages to the status handler. On disconnect or error it clears
connection state and subscription tracking, and retries connecting after the
configured backoff interval until the client is stopped.
"""
loop = asyncio.get_running_loop()
tls_context = await loop.run_in_executor(None, ssl.create_default_context)

View File

@@ -70,7 +70,6 @@ class OasisDevice:
cloud: OasisCloudClient | None = None,
client: OasisClientProtocol | None = None,
) -> None:
# Transport
"""
Initialize an OasisDevice with identification, network, transport references, and default state fields.