From a3ea4dc05abf52dcf60bd1fb68e11bd3186aab06 Mon Sep 17 00:00:00 2001 From: Nathan Spencer Date: Mon, 24 Nov 2025 11:54:04 -0700 Subject: [PATCH] Add convenience properties and more logging to mqtt client, better mqtt management via coordinator (#100) * Add convenience properties and more logging to mqtt client, better mqtt management via coordinator * Address PR comments * Address PR comments * Fix --- custom_components/oasis_mini/__init__.py | 28 ++++---- custom_components/oasis_mini/coordinator.py | 19 ++++-- custom_components/oasis_mini/light.py | 8 ++- .../pyoasiscontrol/clients/mqtt_client.py | 68 +++++++++++++++---- .../oasis_mini/pyoasiscontrol/device.py | 1 - 5 files changed, 86 insertions(+), 38 deletions(-) diff --git a/custom_components/oasis_mini/__init__.py b/custom_components/oasis_mini/__init__.py index adbe647..312ef3d 100644 --- a/custom_components/oasis_mini/__init__.py +++ b/custom_components/oasis_mini/__init__.py @@ -18,7 +18,7 @@ from .const import DOMAIN from .coordinator import OasisDeviceCoordinator from .entity import OasisDeviceEntity from .helpers import create_client -from .pyoasiscontrol import OasisDevice, OasisMqttClient, UnauthenticatedError +from .pyoasiscontrol import OasisDevice, UnauthenticatedError type OasisDeviceConfigEntry = ConfigEntry[OasisDeviceCoordinator] @@ -94,7 +94,9 @@ def setup_platform_from_coordinator( async def async_setup_entry(hass: HomeAssistant, entry: OasisDeviceConfigEntry) -> bool: """ - Initialize Oasis cloud and MQTT integration for a config entry, create and refresh the device coordinator, register update listeners for discovered devices, forward platform setup, and update the entry's metadata as needed. + Initialize Oasis cloud for a config entry, create and refresh the device + coordinator, register update listeners for discovered devices, forward platform + setup, and update the entry's metadata as needed. Returns: True if the config entry was set up successfully. @@ -109,15 +111,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: OasisDeviceConfigEntry) await cloud_client.async_close() raise - mqtt_client = OasisMqttClient() - coordinator = OasisDeviceCoordinator(hass, entry, cloud_client, mqtt_client) + coordinator = OasisDeviceCoordinator(hass, entry, cloud_client) try: - mqtt_client.start() await coordinator.async_config_entry_first_refresh() except Exception: - await mqtt_client.async_close() - await cloud_client.async_close() + await coordinator.async_close() raise if entry.unique_id != (user_id := str(user["id"])): @@ -151,18 +150,17 @@ async def async_unload_entry( """ Cleanly unload an Oasis device config entry. - Closes the MQTT and cloud clients stored on the entry and unloads all supported platforms. + Unloads all supported platforms and closes the coordinator connections. Returns: `True` if all platforms were unloaded successfully, `False` otherwise. """ - mqtt_client = entry.runtime_data.mqtt_client - await mqtt_client.async_close() - - cloud_client = entry.runtime_data.cloud_client - await cloud_client.async_close() - - return await hass.config_entries.async_unload_platforms(entry, PLATFORMS) + unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) + try: + await entry.runtime_data.async_close() + except Exception: + _LOGGER.exception("Error closing Oasis coordinator during unload") + return unload_ok async def async_remove_entry( diff --git a/custom_components/oasis_mini/coordinator.py b/custom_components/oasis_mini/coordinator.py index f0c2663..f90cce6 100644 --- a/custom_components/oasis_mini/coordinator.py +++ b/custom_components/oasis_mini/coordinator.py @@ -32,7 +32,6 @@ class OasisDeviceCoordinator(DataUpdateCoordinator[list[OasisDevice]]): hass: HomeAssistant, config_entry: OasisDeviceConfigEntry, cloud_client: OasisCloudClient, - mqtt_client: OasisMqttClient, ) -> None: """ Create an OasisDeviceCoordinator that manages OasisDevice discovery and updates using cloud and MQTT clients. @@ -40,7 +39,6 @@ class OasisDeviceCoordinator(DataUpdateCoordinator[list[OasisDevice]]): Parameters: config_entry (OasisDeviceConfigEntry): The config entry whose runtime data contains device serial numbers. cloud_client (OasisCloudClient): Client for communicating with the Oasis cloud API and fetching device data. - mqtt_client (OasisMqttClient): Client for registering devices and coordinating MQTT-based readiness/status. """ super().__init__( hass, @@ -51,7 +49,7 @@ class OasisDeviceCoordinator(DataUpdateCoordinator[list[OasisDevice]]): always_update=False, ) self.cloud_client = cloud_client - self.mqtt_client = mqtt_client + self.mqtt_client = OasisMqttClient() async def _async_update_data(self) -> list[OasisDevice]: """ @@ -110,14 +108,19 @@ class OasisDeviceCoordinator(DataUpdateCoordinator[list[OasisDevice]]): remove_config_entry_id=self.config_entry.entry_id, ) - # ✅ Valid state: logged in but no devices on account + # If logged in, but no devices on account, return without starting mqtt if not devices: _LOGGER.debug("No Oasis devices found for account") + if self.mqtt_client.is_running: + # Close the mqtt client if it was previously started + await self.mqtt_client.async_close() self.attempt = 0 if devices != self.data: self.last_updated = dt_util.now() return [] + if not self.mqtt_client.is_running: + self.mqtt_client.start() self.mqtt_client.register_devices(devices) # Best-effort playlists @@ -182,3 +185,11 @@ class OasisDeviceCoordinator(DataUpdateCoordinator[list[OasisDevice]]): self.last_updated = dt_util.now() return devices + + async def async_close(self) -> None: + """Close client connections.""" + await asyncio.gather( + self.mqtt_client.async_close(), + self.cloud_client.async_close(), + return_exceptions=True, + ) diff --git a/custom_components/oasis_mini/light.py b/custom_components/oasis_mini/light.py index f70d332..8cfed41 100644 --- a/custom_components/oasis_mini/light.py +++ b/custom_components/oasis_mini/light.py @@ -121,13 +121,15 @@ class OasisDeviceLightEntity(OasisDeviceEntity, LightEntity): """ Turn the light on and set its LED state. - Processes optional keyword arguments to compute the device-specific LED parameters, then updates the device's LEDs with the resulting brightness, color, and effect. + Processes optional keyword arguments to compute the device-specific LED + parameters, then updates the device's LEDs with the resulting brightness, color, + and effect. Parameters: kwargs: Optional control parameters recognized by the method: ATTR_BRIGHTNESS (int): Brightness in the 0-255 Home Assistant scale. When provided, it is converted and rounded up to the device's brightness scale (1..device.brightness_max). - When omitted, uses self.device.brightness or self.device.brightness_on. + When omitted, uses self.device.brightness_on (last non-zero brightness). ATTR_RGB_COLOR (tuple[int, int, int]): RGB tuple (R, G, B). When provided, it is converted to a hex color string prefixed with '#'. ATTR_EFFECT (str): Human-readable effect name. When provided, it is mapped to the @@ -140,7 +142,7 @@ class OasisDeviceLightEntity(OasisDeviceEntity, LightEntity): scale = (1, self.device.brightness_max) brightness = math.ceil(brightness_to_value(scale, brightness)) else: - brightness = self.device.brightness or self.device.brightness_on + brightness = self.device.brightness_on if color := kwargs.get(ATTR_RGB_COLOR): color = f"#{color_rgb_to_hex(*color)}" diff --git a/custom_components/oasis_mini/pyoasiscontrol/clients/mqtt_client.py b/custom_components/oasis_mini/pyoasiscontrol/clients/mqtt_client.py index f050f63..0e10113 100644 --- a/custom_components/oasis_mini/pyoasiscontrol/clients/mqtt_client.py +++ b/custom_components/oasis_mini/pyoasiscontrol/clients/mqtt_client.py @@ -83,6 +83,20 @@ class OasisMqttClient(OasisClientProtocol): maxsize=MAX_PENDING_COMMANDS ) + @property + def is_running(self) -> bool: + """Return `True` if the MQTT loop has been started and is not stopped.""" + return ( + self._loop_task is not None + and not self._loop_task.done() + and not self._stop_event.is_set() + ) + + @property + def is_connected(self) -> bool: + """Return `True` if the MQTT client is currently connected.""" + return self._connected_event.is_set() + def register_device(self, device: OasisDevice) -> None: """ Register an OasisDevice so MQTT messages for its serial are routed to that device. @@ -218,32 +232,49 @@ class OasisMqttClient(OasisClientProtocol): """ Stop the MQTT client and clean up resources. - Signals the background MQTT loop to stop, cancels the loop task, disconnects the MQTT client if connected, and clears any pending commands from the internal command queue. + Signals the background MQTT loop to stop, cancels the loop task, + disconnects the MQTT client if connected, and drops any pending commands. """ + _LOGGER.debug("MQTT stop() called - beginning shutdown sequence") self._stop_event.set() if self._loop_task: + _LOGGER.debug( + "Cancelling MQTT background task (task=%s, done=%s)", + self._loop_task, + self._loop_task.done(), + ) self._loop_task.cancel() try: await self._loop_task except asyncio.CancelledError: pass + _LOGGER.debug("MQTT background task cancelled") if self._client: + _LOGGER.debug("Disconnecting MQTT client from broker") try: await self._client.disconnect() + _LOGGER.debug("MQTT client disconnected") except Exception: _LOGGER.exception("Error disconnecting MQTT client") finally: self._client = None - # Drop pending commands on stop - while not self._command_queue.empty(): - try: - self._command_queue.get_nowait() - self._command_queue.task_done() - except asyncio.QueueEmpty: - break + # Drop queued commands + if not self._command_queue.empty(): + _LOGGER.debug("Dropping queued commands") + dropped = 0 + while not self._command_queue.empty(): + try: + self._command_queue.get_nowait() + self._command_queue.task_done() + dropped += 1 + except asyncio.QueueEmpty: + break + _LOGGER.debug("MQTT dropped %s queued command(s)", dropped) + + _LOGGER.debug("MQTT shutdown sequence complete") async def wait_until_ready( self, device: OasisDevice, timeout: float = 10.0, request_status: bool = True @@ -586,6 +617,9 @@ class OasisMqttClient(OasisClientProtocol): return while not self._command_queue.empty(): + if not self._client: + break + try: serial, payload = self._command_queue.get_nowait() except asyncio.QueueEmpty: @@ -599,7 +633,6 @@ class OasisMqttClient(OasisClientProtocol): serial, payload, ) - self._command_queue.task_done() continue topic = f"{serial}/COMMAND/CMD" @@ -609,12 +642,11 @@ class OasisMqttClient(OasisClientProtocol): _LOGGER.debug( "Failed to flush queued command for %s, re-queuing", serial ) - # Put it back and break; we'll try again on next reconnect + # Put it back; we'll try again on next reconnect await self._enqueue_command(serial, payload) + finally: + # Ensure we always balance the get(), even on cancellation self._command_queue.task_done() - break - - self._command_queue.task_done() async def _publish_command( self, device: OasisDevice, payload: str, wake: bool = False @@ -659,9 +691,15 @@ class OasisMqttClient(OasisClientProtocol): async def _mqtt_loop(self) -> None: """ - Run the MQTT WebSocket connection loop that maintains connection, subscriptions, and message handling. + Run the MQTT WebSocket connection loop that maintains connection, subscriptions, + and message handling. - This background coroutine establishes a persistent WSS MQTT connection to the configured broker, sets connection state on successful connect, resubscribes to known device STATUS topics, flushes any queued outbound commands, and dispatches incoming MQTT messages to the status handler. On disconnect or error it clears connection state and subscription tracking, and retries connecting after the configured backoff interval until the client is stopped. + This background coroutine establishes a persistent WSS MQTT connection to the + configured broker, sets connection state on successful connect, resubscribes to + known device STATUS topics, flushes any queued outbound commands, and dispatches + incoming MQTT messages to the status handler. On disconnect or error it clears + connection state and subscription tracking, and retries connecting after the + configured backoff interval until the client is stopped. """ loop = asyncio.get_running_loop() tls_context = await loop.run_in_executor(None, ssl.create_default_context) diff --git a/custom_components/oasis_mini/pyoasiscontrol/device.py b/custom_components/oasis_mini/pyoasiscontrol/device.py index 0ad9e91..bf9a543 100644 --- a/custom_components/oasis_mini/pyoasiscontrol/device.py +++ b/custom_components/oasis_mini/pyoasiscontrol/device.py @@ -70,7 +70,6 @@ class OasisDevice: cloud: OasisCloudClient | None = None, client: OasisClientProtocol | None = None, ) -> None: - # Transport """ Initialize an OasisDevice with identification, network, transport references, and default state fields.