1
0
mirror of https://github.com/natekspencer/hacs-oasis_mini.git synced 2025-12-06 18:44:14 -05:00

3 Commits

Author SHA1 Message Date
Nathan Spencer
009cd8cde3 Fix missing dependency for update tracks action (#101) 2025-11-24 12:17:53 -07:00
Nathan Spencer
a3ea4dc05a Add convenience properties and more logging to mqtt client, better mqtt management via coordinator (#100)
* Add convenience properties and more logging to mqtt client, better mqtt management via coordinator

* Address PR comments

* Address PR comments

* Fix
2025-11-24 11:54:04 -07:00
Nathan Spencer
379b6f67f2 Swap out direct HTTP connection with server MQTT connection to handle firmware 2.60+ (#98)
* Switch to using mqtt

* Better mqtt handling when connection is interrupted

* Get track info from the cloud when playlist or index changes

* Add additional helpers

* Dynamically handle devices and other enhancements

* 📝 Add docstrings to `mqtt`

Docstrings generation was requested by @natekspencer.

* https://github.com/natekspencer/hacs-oasis_mini/pull/98#issuecomment-3568450288

The following files were modified:

* `custom_components/oasis_mini/__init__.py`
* `custom_components/oasis_mini/binary_sensor.py`
* `custom_components/oasis_mini/button.py`
* `custom_components/oasis_mini/config_flow.py`
* `custom_components/oasis_mini/coordinator.py`
* `custom_components/oasis_mini/entity.py`
* `custom_components/oasis_mini/helpers.py`
* `custom_components/oasis_mini/image.py`
* `custom_components/oasis_mini/light.py`
* `custom_components/oasis_mini/media_player.py`
* `custom_components/oasis_mini/number.py`
* `custom_components/oasis_mini/pyoasiscontrol/clients/cloud_client.py`
* `custom_components/oasis_mini/pyoasiscontrol/clients/http_client.py`
* `custom_components/oasis_mini/pyoasiscontrol/clients/mqtt_client.py`
* `custom_components/oasis_mini/pyoasiscontrol/clients/transport.py`
* `custom_components/oasis_mini/pyoasiscontrol/device.py`
* `custom_components/oasis_mini/pyoasiscontrol/utils.py`
* `custom_components/oasis_mini/select.py`
* `custom_components/oasis_mini/sensor.py`
* `custom_components/oasis_mini/switch.py`
* `custom_components/oasis_mini/update.py`
* `update_tracks.py`

* Fix formatting in transport.py

* Replace tabs with spaces

* Use tuples instead of sets for descriptors

* Encode svg in image entity

* Fix iot_class

* Fix tracks list url

* Ensure update_tracks closes the connection

* Fix number typing and docstring

* Fix docstring in update_tracks

* Cache playlist based on type

* Fix formatting in device.py

* Add missing async_send_auto_clean_command to http client

* Propagate UnauthenticatedError from async_get_track_info

* Adjust exceptions

* Move create_client outside of try block in config_flow

* Formatting

* Address PR comments

* Formatting

* Add noqa: ARG001 on unused hass

* Close cloud/MQTT clients if initial coordinator refresh fails.

* Address PR again

* PR fixes

* Pass config entry to coordinator

* Remove async_timeout (thanks ChatGPT... not)

* Address PR

* Replace magic numbers for status code

* Update autoplay wording/ordering

---------

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
2025-11-24 01:09:23 -07:00
12 changed files with 169 additions and 99 deletions

View File

@@ -23,7 +23,7 @@ jobs:
python-version: "3.13"
- name: Install dependencies
run: pip install homeassistant
run: pip install homeassistant aiomqtt
- name: Update tracks
env:

View File

@@ -18,7 +18,7 @@ from .const import DOMAIN
from .coordinator import OasisDeviceCoordinator
from .entity import OasisDeviceEntity
from .helpers import create_client
from .pyoasiscontrol import OasisDevice, OasisMqttClient, UnauthenticatedError
from .pyoasiscontrol import OasisDevice, UnauthenticatedError
type OasisDeviceConfigEntry = ConfigEntry[OasisDeviceCoordinator]
@@ -94,7 +94,9 @@ def setup_platform_from_coordinator(
async def async_setup_entry(hass: HomeAssistant, entry: OasisDeviceConfigEntry) -> bool:
"""
Initialize Oasis cloud and MQTT integration for a config entry, create and refresh the device coordinator, register update listeners for discovered devices, forward platform setup, and update the entry's metadata as needed.
Initialize Oasis cloud for a config entry, create and refresh the device
coordinator, register update listeners for discovered devices, forward platform
setup, and update the entry's metadata as needed.
Returns:
True if the config entry was set up successfully.
@@ -109,15 +111,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: OasisDeviceConfigEntry)
await cloud_client.async_close()
raise
mqtt_client = OasisMqttClient()
coordinator = OasisDeviceCoordinator(hass, entry, cloud_client, mqtt_client)
coordinator = OasisDeviceCoordinator(hass, entry, cloud_client)
try:
mqtt_client.start()
await coordinator.async_config_entry_first_refresh()
except Exception:
await mqtt_client.async_close()
await cloud_client.async_close()
await coordinator.async_close()
raise
if entry.unique_id != (user_id := str(user["id"])):
@@ -151,18 +150,17 @@ async def async_unload_entry(
"""
Cleanly unload an Oasis device config entry.
Closes the MQTT and cloud clients stored on the entry and unloads all supported platforms.
Unloads all supported platforms and closes the coordinator connections.
Returns:
`True` if all platforms were unloaded successfully, `False` otherwise.
"""
mqtt_client = entry.runtime_data.mqtt_client
await mqtt_client.async_close()
cloud_client = entry.runtime_data.cloud_client
await cloud_client.async_close()
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
try:
await entry.runtime_data.async_close()
except Exception:
_LOGGER.exception("Error closing Oasis coordinator during unload")
return unload_ok
async def async_remove_entry(

View File

@@ -2,12 +2,11 @@
from __future__ import annotations
import asyncio
from datetime import datetime, timedelta
import logging
from typing import TYPE_CHECKING
import async_timeout
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
@@ -33,7 +32,6 @@ class OasisDeviceCoordinator(DataUpdateCoordinator[list[OasisDevice]]):
hass: HomeAssistant,
config_entry: OasisDeviceConfigEntry,
cloud_client: OasisCloudClient,
mqtt_client: OasisMqttClient,
) -> None:
"""
Create an OasisDeviceCoordinator that manages OasisDevice discovery and updates using cloud and MQTT clients.
@@ -41,7 +39,6 @@ class OasisDeviceCoordinator(DataUpdateCoordinator[list[OasisDevice]]):
Parameters:
config_entry (OasisDeviceConfigEntry): The config entry whose runtime data contains device serial numbers.
cloud_client (OasisCloudClient): Client for communicating with the Oasis cloud API and fetching device data.
mqtt_client (OasisMqttClient): Client for registering devices and coordinating MQTT-based readiness/status.
"""
super().__init__(
hass,
@@ -52,7 +49,7 @@ class OasisDeviceCoordinator(DataUpdateCoordinator[list[OasisDevice]]):
always_update=False,
)
self.cloud_client = cloud_client
self.mqtt_client = mqtt_client
self.mqtt_client = OasisMqttClient()
async def _async_update_data(self) -> list[OasisDevice]:
"""
@@ -68,7 +65,7 @@ class OasisDeviceCoordinator(DataUpdateCoordinator[list[OasisDevice]]):
self.attempt += 1
try:
async with async_timeout.timeout(30):
async with asyncio.timeout(30):
raw_devices = await self.cloud_client.async_get_devices()
existing_by_serial = {
@@ -111,14 +108,19 @@ class OasisDeviceCoordinator(DataUpdateCoordinator[list[OasisDevice]]):
remove_config_entry_id=self.config_entry.entry_id,
)
# ✅ Valid state: logged in but no devices on account
# If logged in, but no devices on account, return without starting mqtt
if not devices:
_LOGGER.debug("No Oasis devices found for account")
if self.mqtt_client.is_running:
# Close the mqtt client if it was previously started
await self.mqtt_client.async_close()
self.attempt = 0
if devices != self.data:
self.last_updated = dt_util.now()
return []
if not self.mqtt_client.is_running:
self.mqtt_client.start()
self.mqtt_client.register_devices(devices)
# Best-effort playlists
@@ -183,3 +185,11 @@ class OasisDeviceCoordinator(DataUpdateCoordinator[list[OasisDevice]]):
self.last_updated = dt_util.now()
return devices
async def async_close(self) -> None:
"""Close client connections."""
await asyncio.gather(
self.mqtt_client.async_close(),
self.cloud_client.async_close(),
return_exceptions=True,
)

View File

@@ -6,14 +6,12 @@ import asyncio
import logging
from typing import Any
import async_timeout
from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .pyoasiscontrol import OasisCloudClient, OasisDevice
from .pyoasiscontrol.const import TRACKS
from .pyoasiscontrol.const import STATUS_PLAYING, TRACKS
_LOGGER = logging.getLogger(__name__)
@@ -44,24 +42,26 @@ async def add_and_play_track(device: OasisDevice, track: int) -> None:
track (int): The track id to add and play.
Raises:
async_timeout.TimeoutError: If the operation does not complete within 10 seconds.
TimeoutError: If the operation does not complete within 10 seconds.
"""
async with async_timeout.timeout(10):
async with asyncio.timeout(10):
if track not in device.playlist:
await device.async_add_track_to_playlist(track)
# Wait for device state to reflect the newly added track
while track not in device.playlist:
await asyncio.sleep(0.1)
# Move track to next item in the playlist and then select it
# Ensure the track is positioned immediately after the current track and select it
if (index := device.playlist.index(track)) != device.playlist_index:
# Calculate the position after the current track
if index != (
_next := min(device.playlist_index + 1, len(device.playlist) - 1)
):
await device.async_move_track(index, _next)
await device.async_change_track(_next)
if device.status_code != 4:
if device.status_code != STATUS_PLAYING:
await device.async_play()

View File

@@ -121,13 +121,15 @@ class OasisDeviceLightEntity(OasisDeviceEntity, LightEntity):
"""
Turn the light on and set its LED state.
Processes optional keyword arguments to compute the device-specific LED parameters, then updates the device's LEDs with the resulting brightness, color, and effect.
Processes optional keyword arguments to compute the device-specific LED
parameters, then updates the device's LEDs with the resulting brightness, color,
and effect.
Parameters:
kwargs: Optional control parameters recognized by the method:
ATTR_BRIGHTNESS (int): Brightness in the 0-255 Home Assistant scale. When provided,
it is converted and rounded up to the device's brightness scale (1..device.brightness_max).
When omitted, uses self.device.brightness or self.device.brightness_on.
When omitted, uses self.device.brightness_on (last non-zero brightness).
ATTR_RGB_COLOR (tuple[int, int, int]): RGB tuple (R, G, B). When provided, it is
converted to a hex color string prefixed with '#'.
ATTR_EFFECT (str): Human-readable effect name. When provided, it is mapped to the
@@ -140,7 +142,7 @@ class OasisDeviceLightEntity(OasisDeviceEntity, LightEntity):
scale = (1, self.device.brightness_max)
brightness = math.ceil(brightness_to_value(scale, brightness))
else:
brightness = self.device.brightness or self.device.brightness_on
brightness = self.device.brightness_on
if color := kwargs.get(ATTR_RGB_COLOR):
color = f"#{color_rgb_to_hex(*color)}"

View File

@@ -23,6 +23,16 @@ from .const import DOMAIN
from .entity import OasisDeviceEntity
from .helpers import get_track_id
from .pyoasiscontrol import OasisDevice
from .pyoasiscontrol.const import (
STATUS_CENTERING,
STATUS_DOWNLOADING,
STATUS_ERROR,
STATUS_LIVE,
STATUS_PAUSED,
STATUS_PLAYING,
STATUS_STOPPED,
STATUS_UPDATING,
)
async def async_setup_entry(
@@ -130,17 +140,17 @@ class OasisDeviceMediaPlayerEntity(OasisDeviceEntity, MediaPlayerEntity):
def state(self) -> MediaPlayerState:
"""State of the player."""
status_code = self.device.status_code
if self.device.error or status_code in (9, 11):
if self.device.error or status_code in (STATUS_ERROR, STATUS_UPDATING):
return MediaPlayerState.OFF
if status_code == 2:
if status_code == STATUS_STOPPED:
return MediaPlayerState.IDLE
if status_code in (3, 13):
if status_code in (STATUS_CENTERING, STATUS_DOWNLOADING):
return MediaPlayerState.BUFFERING
if status_code == 4:
if status_code == STATUS_PLAYING:
return MediaPlayerState.PLAYING
if status_code == 5:
if status_code == STATUS_PAUSED:
return MediaPlayerState.PAUSED
if status_code == 15:
if status_code == STATUS_LIVE:
return MediaPlayerState.ON
return MediaPlayerState.IDLE

View File

@@ -83,6 +83,20 @@ class OasisMqttClient(OasisClientProtocol):
maxsize=MAX_PENDING_COMMANDS
)
@property
def is_running(self) -> bool:
"""Return `True` if the MQTT loop has been started and is not stopped."""
return (
self._loop_task is not None
and not self._loop_task.done()
and not self._stop_event.is_set()
)
@property
def is_connected(self) -> bool:
"""Return `True` if the MQTT client is currently connected."""
return self._connected_event.is_set()
def register_device(self, device: OasisDevice) -> None:
"""
Register an OasisDevice so MQTT messages for its serial are routed to that device.
@@ -218,32 +232,49 @@ class OasisMqttClient(OasisClientProtocol):
"""
Stop the MQTT client and clean up resources.
Signals the background MQTT loop to stop, cancels the loop task, disconnects the MQTT client if connected, and clears any pending commands from the internal command queue.
Signals the background MQTT loop to stop, cancels the loop task,
disconnects the MQTT client if connected, and drops any pending commands.
"""
_LOGGER.debug("MQTT stop() called - beginning shutdown sequence")
self._stop_event.set()
if self._loop_task:
_LOGGER.debug(
"Cancelling MQTT background task (task=%s, done=%s)",
self._loop_task,
self._loop_task.done(),
)
self._loop_task.cancel()
try:
await self._loop_task
except asyncio.CancelledError:
pass
_LOGGER.debug("MQTT background task cancelled")
if self._client:
_LOGGER.debug("Disconnecting MQTT client from broker")
try:
await self._client.disconnect()
_LOGGER.debug("MQTT client disconnected")
except Exception:
_LOGGER.exception("Error disconnecting MQTT client")
finally:
self._client = None
# Drop pending commands on stop
while not self._command_queue.empty():
try:
self._command_queue.get_nowait()
self._command_queue.task_done()
except asyncio.QueueEmpty:
break
# Drop queued commands
if not self._command_queue.empty():
_LOGGER.debug("Dropping queued commands")
dropped = 0
while not self._command_queue.empty():
try:
self._command_queue.get_nowait()
self._command_queue.task_done()
dropped += 1
except asyncio.QueueEmpty:
break
_LOGGER.debug("MQTT dropped %s queued command(s)", dropped)
_LOGGER.debug("MQTT shutdown sequence complete")
async def wait_until_ready(
self, device: OasisDevice, timeout: float = 10.0, request_status: bool = True
@@ -586,6 +617,9 @@ class OasisMqttClient(OasisClientProtocol):
return
while not self._command_queue.empty():
if not self._client:
break
try:
serial, payload = self._command_queue.get_nowait()
except asyncio.QueueEmpty:
@@ -599,7 +633,6 @@ class OasisMqttClient(OasisClientProtocol):
serial,
payload,
)
self._command_queue.task_done()
continue
topic = f"{serial}/COMMAND/CMD"
@@ -609,12 +642,11 @@ class OasisMqttClient(OasisClientProtocol):
_LOGGER.debug(
"Failed to flush queued command for %s, re-queuing", serial
)
# Put it back and break; we'll try again on next reconnect
# Put it back; we'll try again on next reconnect
await self._enqueue_command(serial, payload)
finally:
# Ensure we always balance the get(), even on cancellation
self._command_queue.task_done()
break
self._command_queue.task_done()
async def _publish_command(
self, device: OasisDevice, payload: str, wake: bool = False
@@ -659,9 +691,15 @@ class OasisMqttClient(OasisClientProtocol):
async def _mqtt_loop(self) -> None:
"""
Run the MQTT WebSocket connection loop that maintains connection, subscriptions, and message handling.
Run the MQTT WebSocket connection loop that maintains connection, subscriptions,
and message handling.
This background coroutine establishes a persistent WSS MQTT connection to the configured broker, sets connection state on successful connect, resubscribes to known device STATUS topics, flushes any queued outbound commands, and dispatches incoming MQTT messages to the status handler. On disconnect or error it clears connection state and subscription tracking, and retries connecting after the configured backoff interval until the client is stopped.
This background coroutine establishes a persistent WSS MQTT connection to the
configured broker, sets connection state on successful connect, resubscribes to
known device STATUS topics, flushes any queued outbound commands, and dispatches
incoming MQTT messages to the status handler. On disconnect or error it clears
connection state and subscription tracking, and retries connecting after the
configured backoff interval until the client is stopped.
"""
loop = asyncio.get_running_loop()
tls_context = await loop.run_in_executor(None, ssl.create_default_context)

View File

@@ -12,19 +12,19 @@ try:
TRACKS: Final[dict[int, dict[str, Any]]] = {
int(k): v for k, v in json.load(file).items()
}
except Exception: # ignore: broad-except
except (FileNotFoundError, json.JSONDecodeError, OSError):
TRACKS = {}
AUTOPLAY_MAP: Final[dict[str, str]] = {
"0": "on",
"1": "off",
"2": "5 minutes",
"3": "10 minutes",
"4": "30 minutes",
"6": "1 hour",
"7": "6 hours",
"8": "12 hours",
"5": "24 hours",
"1": "Off", # display off (disabled) first
"0": "Immediately",
"2": "After 5 minutes",
"3": "After 10 minutes",
"4": "After 30 minutes",
"6": "After 1 hour",
"7": "After 6 hours",
"8": "After 12 hours",
"5": "After 24 hours", # purposefully placed so time is incrementally displayed
}
ERROR_CODE_MAP: Final[dict[int, str]] = {
@@ -94,17 +94,28 @@ LED_EFFECTS: Final[dict[str, str]] = {
"41": "Color Comets",
}
STATUS_CODE_SLEEPING: Final = 6
STATUS_BOOTING: Final[int] = 0
STATUS_STOPPED: Final[int] = 2
STATUS_CENTERING: Final[int] = 3
STATUS_PLAYING: Final[int] = 4
STATUS_PAUSED: Final[int] = 5
STATUS_SLEEPING: Final[int] = 6
STATUS_ERROR: Final[int] = 9
STATUS_UPDATING: Final[int] = 11
STATUS_DOWNLOADING: Final[int] = 13
STATUS_BUSY: Final[int] = 14
STATUS_LIVE: Final[int] = 15
STATUS_CODE_MAP: Final[dict[int, str]] = {
0: "booting",
2: "stopped",
3: "centering",
4: "playing",
5: "paused",
STATUS_CODE_SLEEPING: "sleeping",
9: "error",
11: "updating",
13: "downloading",
14: "busy",
15: "live",
STATUS_BOOTING: "booting",
STATUS_STOPPED: "stopped",
STATUS_CENTERING: "centering",
STATUS_PLAYING: "playing",
STATUS_PAUSED: "paused",
STATUS_SLEEPING: "sleeping",
STATUS_ERROR: "error",
STATUS_UPDATING: "updating",
STATUS_DOWNLOADING: "downloading",
STATUS_BUSY: "busy",
STATUS_LIVE: "live",
}

View File

@@ -10,7 +10,8 @@ from .const import (
ERROR_CODE_MAP,
LED_EFFECTS,
STATUS_CODE_MAP,
STATUS_CODE_SLEEPING,
STATUS_ERROR,
STATUS_SLEEPING,
TRACKS,
)
from .utils import _bit_to_bool, _parse_int, create_svg, decrypt_svg_content
@@ -69,7 +70,6 @@ class OasisDevice:
cloud: OasisCloudClient | None = None,
client: OasisClientProtocol | None = None,
) -> None:
# Transport
"""
Initialize an OasisDevice with identification, network, transport references, and default state fields.
@@ -163,7 +163,7 @@ class OasisDevice:
Returns:
`true` if the device is sleeping, `false` otherwise.
"""
return self.status_code == STATUS_CODE_SLEEPING
return self.status_code == STATUS_SLEEPING
def attach_client(self, client: OasisClientProtocol) -> None:
"""Attach a transport client (MQTT, HTTP, etc.) to this device."""
@@ -343,7 +343,7 @@ class OasisDevice:
Returns:
str: The mapped error message when the device status indicates an error (status code 9); `None` otherwise.
"""
if self.status_code == 9:
if self.status_code == STATUS_ERROR:
return ERROR_CODE_MAP.get(self.error, f"Unknown ({self.error})")
return None

View File

@@ -76,15 +76,15 @@
"autoplay": {
"name": "Autoplay",
"state": {
"0": "on",
"1": "off",
"2": "5 minutes",
"3": "10 minutes",
"4": "30 minutes",
"6": "1 hour",
"7": "6 hours",
"8": "12 hours",
"5": "24 hours"
"1": "Off",
"0": "Immediately",
"2": "After 5 minutes",
"3": "After 10 minutes",
"4": "After 30 minutes",
"6": "After 1 hour",
"7": "After 6 hours",
"8": "After 12 hours",
"5": "After 24 hours"
}
},
"playlist": {

View File

@@ -76,15 +76,15 @@
"autoplay": {
"name": "Autoplay",
"state": {
"0": "on",
"1": "off",
"2": "5 minutes",
"3": "10 minutes",
"4": "30 minutes",
"6": "1 hour",
"7": "6 hours",
"8": "12 hours",
"5": "24 hours"
"1": "Off",
"0": "Immediately",
"2": "After 5 minutes",
"3": "After 10 minutes",
"4": "After 30 minutes",
"6": "After 1 hour",
"7": "After 6 hours",
"8": "After 12 hours",
"5": "After 24 hours"
}
},
"playlist": {

View File

@@ -18,6 +18,7 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import OasisDeviceConfigEntry, setup_platform_from_coordinator
from .entity import OasisDeviceEntity
from .pyoasiscontrol import OasisDevice
from .pyoasiscontrol.const import STATUS_UPDATING
_LOGGER = logging.getLogger(__name__)
@@ -71,7 +72,7 @@ class OasisDeviceUpdateEntity(OasisDeviceEntity, UpdateEntity):
@property
def in_progress(self) -> bool | int:
"""Update installation progress."""
if self.device.status_code == 11:
if self.device.status_code == STATUS_UPDATING:
return self.device.download_progress
return False