1
0
mirror of https://github.com/natekspencer/hacs-oasis_mini.git synced 2025-12-06 18:44:14 -05:00

Better mqtt handling when connection is interrupted

This commit is contained in:
Nathan Spencer
2025-11-22 20:51:17 +00:00
parent 886d7598f3
commit ecad472bbd
14 changed files with 260 additions and 100 deletions

View File

@@ -11,9 +11,8 @@ from typing import Any, Final
import aiomqtt
from ..const import AUTOPLAY_MAP
from ..device import OasisDevice
from ..utils import _bit_to_bool
from ..utils import _bit_to_bool, _parse_int
from .transport import OasisClientProtocol
_LOGGER = logging.getLogger(__name__)
@@ -26,6 +25,9 @@ USERNAME: Final = "YXBw"
PASSWORD: Final = "RWdETFlKMDczfi4t"
RECONNECT_INTERVAL: Final = 4
# Command queue behaviour
MAX_PENDING_COMMANDS: Final = 10
class OasisMqttClient(OasisClientProtocol):
"""MQTT-based Oasis transport using WSS.
@@ -58,6 +60,11 @@ class OasisMqttClient(OasisClientProtocol):
self._subscribed_serials: set[str] = set()
self._subscription_lock = asyncio.Lock()
# Pending command queue: (serial, payload)
self._command_queue: asyncio.Queue[tuple[str, str]] = asyncio.Queue(
maxsize=MAX_PENDING_COMMANDS
)
def register_device(self, device: OasisDevice) -> None:
"""Register a device so MQTT messages can be routed to it."""
if not device.serial_number:
@@ -70,6 +77,10 @@ class OasisMqttClient(OasisClientProtocol):
self._first_status_events.setdefault(serial, asyncio.Event())
self._mac_events.setdefault(serial, asyncio.Event())
# Attach ourselves as the client if the device doesn't already have one
if not device.client:
device.attach_client(self)
# If we're already connected, subscribe to this device's topics
if self._client is not None:
try:
@@ -81,9 +92,6 @@ class OasisMqttClient(OasisClientProtocol):
"Could not schedule subscription for %s (no running loop)", serial
)
if not device.client:
device.attach_client(self)
def unregister_device(self, device: OasisDevice) -> None:
serial = device.serial_number
if not serial:
@@ -168,6 +176,14 @@ class OasisMqttClient(OasisClientProtocol):
finally:
self._client = None
# Drop pending commands on stop
while not self._command_queue.empty():
try:
self._command_queue.get_nowait()
self._command_queue.task_done()
except asyncio.QueueEmpty:
break
async def wait_until_ready(
self, device: OasisDevice, timeout: float = 10.0, request_status: bool = True
) -> bool:
@@ -260,7 +276,7 @@ class OasisMqttClient(OasisClientProtocol):
brightness: int,
) -> None:
payload = f"WRILED={led_effect};0;{color};{led_speed};{brightness}"
await self._publish_command(device, payload)
await self._publish_command(device, payload, bool(brightness))
async def async_send_sleep_command(self, device: OasisDevice) -> None:
await self._publish_command(device, "CMDSLEEP")
@@ -328,7 +344,7 @@ class OasisMqttClient(OasisClientProtocol):
await self._publish_command(device, payload)
async def async_send_play_command(self, device: OasisDevice) -> None:
await self._publish_command(device, "CMDPLAY")
await self._publish_command(device, "CMDPLAY", True)
async def async_send_pause_command(self, device: OasisDevice) -> None:
await self._publish_command(device, "CMDPAUSE")
@@ -339,21 +355,96 @@ class OasisMqttClient(OasisClientProtocol):
async def async_send_reboot_command(self, device: OasisDevice) -> None:
await self._publish_command(device, "CMDBOOT")
async def async_get_all(self, device: OasisDevice) -> None:
"""Request FULLSTATUS + SCHEDULE (compact snapshot)."""
await self._publish_command(device, "GETALL")
async def async_get_status(self, device: OasisDevice) -> None:
"""Ask device to publish STATUS topics."""
await self._publish_command(device, "GETSTATUS")
async def _publish_command(self, device: OasisDevice, payload: str) -> None:
if not self._client:
raise RuntimeError("MQTT client not connected yet")
async def _enqueue_command(self, serial: str, payload: str) -> None:
"""Queue a command to be sent when connected.
If the queue is full, drop the oldest command to make room.
"""
if self._command_queue.full():
try:
dropped = self._command_queue.get_nowait()
self._command_queue.task_done()
_LOGGER.debug(
"Command queue full, dropping oldest command: %s", dropped
)
except asyncio.QueueEmpty:
# race: became empty between full() and get_nowait()
pass
await self._command_queue.put((serial, payload))
_LOGGER.debug("Queued command for %s: %s", serial, payload)
async def _flush_pending_commands(self) -> None:
"""Send any queued commands now that we're connected."""
if not self._client:
return
while not self._command_queue.empty():
try:
serial, payload = self._command_queue.get_nowait()
except asyncio.QueueEmpty:
break
try:
# Skip commands for unknown devices
if serial not in self._devices:
_LOGGER.debug(
"Skipping queued command for unknown device %s: %s",
serial,
payload,
)
self._command_queue.task_done()
continue
topic = f"{serial}/COMMAND/CMD"
_LOGGER.debug("Flushing queued MQTT command %s => %s", topic, payload)
await self._client.publish(topic, payload.encode(), qos=1)
except Exception:
_LOGGER.debug(
"Failed to flush queued command for %s, re-queuing", serial
)
# Put it back and break; we'll try again on next reconnect
await self._enqueue_command(serial, payload)
self._command_queue.task_done()
break
self._command_queue.task_done()
async def _publish_command(
self, device: OasisDevice, payload: str, wake: bool = False
) -> None:
serial = device.serial_number
if not serial:
raise RuntimeError("Device has no serial_number set")
raise RuntimeError("Device has no serial number set")
if wake and device.is_sleeping:
await self.async_get_all(device)
# If not connected, just queue the command
if not self._client or not self._connected_event.is_set():
_LOGGER.debug(
"MQTT not connected, queueing command for %s: %s", serial, payload
)
await self._enqueue_command(serial, payload)
return
topic = f"{serial}/COMMAND/CMD"
_LOGGER.debug("MQTT publish %s => %s", topic, payload)
await self._client.publish(topic, payload.encode(), qos=1)
try:
_LOGGER.debug("MQTT publish %s => %s", topic, payload)
await self._client.publish(topic, payload.encode(), qos=1)
except Exception:
_LOGGER.debug(
"MQTT publish failed, queueing command for %s: %s", serial, payload
)
await self._enqueue_command(serial, payload)
async def _mqtt_loop(self) -> None:
loop = asyncio.get_running_loop()
@@ -361,12 +452,7 @@ class OasisMqttClient(OasisClientProtocol):
while not self._stop_event.is_set():
try:
_LOGGER.debug(
"Connecting MQTT WSS to wss://%s:%s/%s",
HOST,
PORT,
PATH,
)
_LOGGER.info("Connecting MQTT WSS to wss://%s:%s/%s", HOST, PORT, PATH)
async with aiomqtt.Client(
hostname=HOST,
@@ -386,6 +472,9 @@ class OasisMqttClient(OasisClientProtocol):
# Subscribe only to STATUS topics for known devices
await self._resubscribe_all()
# Flush any queued commands now that we're connected
await self._flush_pending_commands()
async for msg in client.messages:
if self._stop_event.is_set():
break
@@ -394,13 +483,13 @@ class OasisMqttClient(OasisClientProtocol):
except asyncio.CancelledError:
break
except Exception:
_LOGGER.debug("MQTT connection error")
_LOGGER.info("MQTT connection error")
finally:
if self._connected_event.is_set():
self._connected_event.clear()
if self._connected_at:
_LOGGER.debug(
_LOGGER.info(
"MQTT was connected for %s",
datetime.now(UTC) - self._connected_at,
)
@@ -409,14 +498,13 @@ class OasisMqttClient(OasisClientProtocol):
self._subscribed_serials.clear()
if not self._stop_event.is_set():
_LOGGER.debug(
_LOGGER.info(
"Disconnected from broker, retrying in %.1fs", RECONNECT_INTERVAL
)
await asyncio.sleep(RECONNECT_INTERVAL)
async def _handle_status_message(self, msg: aiomqtt.Message) -> None:
"""Map MQTT STATUS topics → OasisDevice.update_from_status_dict payloads."""
topic_str = str(msg.topic) if msg.topic is not None else ""
payload = msg.payload.decode(errors="replace")
@@ -429,7 +517,6 @@ class OasisMqttClient(OasisClientProtocol):
device = self._devices.get(serial)
if not device:
# Ignore devices we don't know about
_LOGGER.debug("Received MQTT for unknown device %s: %s", serial, topic_str)
return
@@ -451,13 +538,13 @@ class OasisMqttClient(OasisClientProtocol):
elif status_name == "LED_EFFECT":
data["led_effect"] = payload
elif status_name == "LED_EFFECT_COLOR":
data["led_effect_color"] = payload
data["led_color_id"] = payload
elif status_name == "LED_SPEED":
data["led_speed"] = int(payload)
elif status_name == "LED_BRIGHTNESS":
data["brightness"] = int(payload)
elif status_name == "LED_MAX":
data["max_brightness"] = int(payload)
data["brightness_max"] = int(payload)
elif status_name == "LED_EFFECT_PARAM":
data["color"] = payload if payload.startswith("#") else None
elif status_name == "SYSTEM_BUSY":
@@ -467,7 +554,7 @@ class OasisMqttClient(OasisClientProtocol):
elif status_name == "REPEAT_JOB":
data["repeat_playlist"] = payload in ("1", "true", "True")
elif status_name == "WAIT_AFTER_JOB":
data["autoplay"] = AUTOPLAY_MAP.get(payload, payload)
data["autoplay"] = _parse_int(payload)
elif status_name == "AUTO_CLEAN":
data["auto_clean"] = payload in ("1", "true", "True")
elif status_name == "SOFTWARE_VER":
@@ -494,6 +581,9 @@ class OasisMqttClient(OasisClientProtocol):
data["schedule"] = payload
elif status_name == "ENVIRONMENT":
data["environment"] = payload
elif status_name == "FULLSTATUS":
if parsed := device.parse_status_string(payload):
data = parsed
else:
_LOGGER.warning(
"Unknown status received for %s: %s=%s",