mirror of
https://github.com/natekspencer/hacs-oasis_mini.git
synced 2025-12-06 18:44:14 -05:00
Merge pull request #104 from natekspencer/dispatcher
* **Performance Improvements** * Devices are integrated only after full initialization for more reliable discovery and faster setup. * Reduced unnecessary status requests for sleeping/inactive devices to conserve bandwidth and improve efficiency. * Improved real-time tracking so device state changes are reflected more quickly. * **New Features** * Newly initialized devices are added dynamically as they come online, improving responsiveness to device additions.
This commit is contained in:
@@ -10,9 +10,9 @@ from homeassistant.const import CONF_EMAIL, Platform
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
from homeassistant.helpers.device_registry import DeviceEntry
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
import homeassistant.helpers.entity_registry as er
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
from .const import DOMAIN
|
||||
from .coordinator import OasisDeviceCoordinator
|
||||
@@ -56,19 +56,12 @@ def setup_platform_from_coordinator(
|
||||
update_before_add: If true, entities will be updated before being added.
|
||||
"""
|
||||
coordinator = entry.runtime_data
|
||||
|
||||
known_serials: set[str] = set()
|
||||
signal = coordinator._device_initialized_signal
|
||||
|
||||
@callback
|
||||
def _check_devices() -> None:
|
||||
"""
|
||||
Detect newly discovered Oasis devices from the coordinator and register their entities.
|
||||
|
||||
Scans the coordinator's current device list for devices with a serial number that has not
|
||||
been seen before. For any newly discovered devices, creates entity instances via
|
||||
make_entities and adds them to Home Assistant using async_add_entities with the
|
||||
update_before_add flag. Does not return a value.
|
||||
"""
|
||||
"""Add entities for any initialized devices not yet seen."""
|
||||
devices = coordinator.data or []
|
||||
new_devices: list[OasisDevice] = []
|
||||
|
||||
@@ -86,11 +79,33 @@ def setup_platform_from_coordinator(
|
||||
if entities := make_entities(new_devices):
|
||||
async_add_entities(entities, update_before_add)
|
||||
|
||||
# Initial population
|
||||
@callback
|
||||
def _handle_device_initialized(device: OasisDevice) -> None:
|
||||
"""
|
||||
Dispatcher callback for when a single device becomes initialized.
|
||||
|
||||
Adds entities immediately for that device if we haven't seen it yet.
|
||||
"""
|
||||
serial = device.serial_number
|
||||
if not serial or serial in known_serials or not device.is_initialized:
|
||||
return
|
||||
|
||||
known_serials.add(serial)
|
||||
|
||||
if entities := make_entities([device]):
|
||||
async_add_entities(entities, update_before_add)
|
||||
|
||||
# Initial population from current coordinator data
|
||||
_check_devices()
|
||||
# Future updates (new devices discovered)
|
||||
|
||||
# Future changes: new devices / account re-sync via coordinator
|
||||
entry.async_on_unload(coordinator.async_add_listener(_check_devices))
|
||||
|
||||
# Device-level initialization events via dispatcher
|
||||
entry.async_on_unload(
|
||||
async_dispatcher_connect(coordinator.hass, signal, _handle_device_initialized)
|
||||
)
|
||||
|
||||
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: OasisDeviceConfigEntry) -> bool:
|
||||
"""
|
||||
@@ -127,18 +142,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: OasisDeviceConfigEntry)
|
||||
|
||||
entry.runtime_data = coordinator
|
||||
|
||||
def _on_oasis_update() -> None:
|
||||
"""
|
||||
Update the coordinator's last-updated timestamp and notify its listeners.
|
||||
|
||||
Sets the coordinator's last_updated to the current time and triggers its update listeners so dependent entities and tasks refresh.
|
||||
"""
|
||||
coordinator.last_updated = dt_util.now()
|
||||
coordinator.async_update_listeners()
|
||||
|
||||
for device in coordinator.data or []:
|
||||
device.add_update_listener(_on_oasis_update)
|
||||
|
||||
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
||||
|
||||
return True
|
||||
|
||||
@@ -9,6 +9,7 @@ from typing import TYPE_CHECKING
|
||||
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import device_registry as dr
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_send
|
||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
@@ -51,15 +52,56 @@ class OasisDeviceCoordinator(DataUpdateCoordinator[list[OasisDevice]]):
|
||||
self.cloud_client = cloud_client
|
||||
self.mqtt_client = OasisMqttClient()
|
||||
|
||||
# Track which devices are currently considered initialized
|
||||
self._initialized_serials: set[str] = set()
|
||||
|
||||
@property
|
||||
def _device_initialized_signal(self) -> str:
|
||||
"""Dispatcher signal name for device initialization events."""
|
||||
return f"{DOMAIN}_{self.config_entry.entry_id}_device_initialized"
|
||||
|
||||
def _attach_device_listeners(self, device: OasisDevice) -> None:
|
||||
"""Attach a listener so we can fire dispatcher events when a device initializes."""
|
||||
|
||||
def _on_device_update() -> None:
|
||||
serial = device.serial_number
|
||||
if not serial:
|
||||
return
|
||||
|
||||
initialized = device.is_initialized
|
||||
was_initialized = serial in self._initialized_serials
|
||||
|
||||
if initialized and not was_initialized:
|
||||
self._initialized_serials.add(serial)
|
||||
_LOGGER.debug("%s ready for setup; dispatching signal", device.name)
|
||||
async_dispatcher_send(
|
||||
self.hass, self._device_initialized_signal, device
|
||||
)
|
||||
|
||||
elif not initialized and was_initialized:
|
||||
self._initialized_serials.remove(serial)
|
||||
_LOGGER.debug("Oasis device %s no longer initialized", serial)
|
||||
|
||||
self.last_updated = dt_util.now()
|
||||
self.async_update_listeners()
|
||||
|
||||
device.add_update_listener(_on_device_update)
|
||||
|
||||
# Seed the initialized set if the device is already initialized
|
||||
if device.is_initialized and device.serial_number:
|
||||
self._initialized_serials.add(device.serial_number)
|
||||
|
||||
async def _async_update_data(self) -> list[OasisDevice]:
|
||||
"""
|
||||
Fetch and assemble the current list of OasisDevice objects, reconcile removed devices in Home Assistant, register discovered devices with MQTT, and verify per-device readiness.
|
||||
Fetch and assemble the current list of OasisDevice objects, reconcile removed
|
||||
devices in Home Assistant, register discovered devices with MQTT, and
|
||||
best-effort trigger status updates for uninitialized devices.
|
||||
|
||||
Returns:
|
||||
A list of OasisDevice instances representing devices currently available for the account.
|
||||
|
||||
Raises:
|
||||
UpdateFailed: If no devices can be read after repeated attempts or an unexpected error persists past retry limits.
|
||||
UpdateFailed: If an unexpected error persists past retry limits.
|
||||
"""
|
||||
devices: list[OasisDevice] = []
|
||||
self.attempt += 1
|
||||
@@ -86,15 +128,18 @@ class OasisDeviceCoordinator(DataUpdateCoordinator[list[OasisDevice]]):
|
||||
name=raw.get("name"),
|
||||
cloud=self.cloud_client,
|
||||
)
|
||||
self._attach_device_listeners(device)
|
||||
|
||||
devices.append(device)
|
||||
|
||||
# Handle devices removed from the account
|
||||
new_serials = {d.serial_number for d in devices if d.serial_number}
|
||||
removed_serials = set(existing_by_serial) - new_serials
|
||||
|
||||
if removed_serials:
|
||||
device_registry = dr.async_get(self.hass)
|
||||
for serial in removed_serials:
|
||||
self._initialized_serials.discard(serial)
|
||||
_LOGGER.info(
|
||||
"Oasis device %s removed from account; cleaning up in HA",
|
||||
serial,
|
||||
@@ -119,6 +164,7 @@ class OasisDeviceCoordinator(DataUpdateCoordinator[list[OasisDevice]]):
|
||||
self.last_updated = dt_util.now()
|
||||
return []
|
||||
|
||||
# Ensure MQTT is running and devices are registered
|
||||
if not self.mqtt_client.is_running:
|
||||
self.mqtt_client.start()
|
||||
self.mqtt_client.register_devices(devices)
|
||||
@@ -129,32 +175,28 @@ class OasisDeviceCoordinator(DataUpdateCoordinator[list[OasisDevice]]):
|
||||
except Exception:
|
||||
_LOGGER.exception("Error fetching playlists from cloud")
|
||||
|
||||
# Best-effort: request status for devices that are not yet initialized
|
||||
for device in devices:
|
||||
try:
|
||||
ready = await self.mqtt_client.wait_until_ready(
|
||||
device, request_status=True
|
||||
)
|
||||
if not ready:
|
||||
_LOGGER.debug(
|
||||
"Oasis device %s not ready yet; will retry on next update",
|
||||
device.serial_number,
|
||||
)
|
||||
continue
|
||||
|
||||
if not device.is_initialized:
|
||||
await device.async_get_status()
|
||||
device.schedule_track_refresh()
|
||||
|
||||
except Exception:
|
||||
_LOGGER.exception(
|
||||
"Error preparing Oasis device %s", device.serial_number
|
||||
"Error requesting status for Oasis device %s; "
|
||||
"will retry on future updates",
|
||||
device.serial_number,
|
||||
)
|
||||
|
||||
self.attempt = 0
|
||||
|
||||
except Exception as ex:
|
||||
if self.attempt > 2 or not (devices or self.data):
|
||||
raise UpdateFailed(
|
||||
"Unexpected error talking to Oasis devices "
|
||||
f"after {self.attempt} attempts"
|
||||
) from ex
|
||||
|
||||
_LOGGER.warning(
|
||||
"Error updating Oasis devices; reusing previous data", exc_info=ex
|
||||
)
|
||||
|
||||
@@ -215,7 +215,8 @@ class OasisMqttClient(OasisClientProtocol):
|
||||
self._subscribed_serials.clear()
|
||||
for serial, device in self._devices.items():
|
||||
await self._subscribe_serial(serial)
|
||||
await self.async_get_all(device)
|
||||
if not device.is_sleeping:
|
||||
await self.async_get_all(device)
|
||||
|
||||
def start(self) -> None:
|
||||
"""Start MQTT connection loop."""
|
||||
|
||||
@@ -495,6 +495,11 @@ class OasisDevice:
|
||||
except Exception:
|
||||
_LOGGER.exception("Error in update listener")
|
||||
|
||||
async def async_get_status(self) -> None:
|
||||
"""Request that the device update its current status."""
|
||||
client = self._require_client()
|
||||
await client.async_get_status(self)
|
||||
|
||||
async def async_get_mac_address(self) -> str | None:
|
||||
"""
|
||||
Get the device MAC address, requesting it from the attached transport client if not already known.
|
||||
|
||||
Reference in New Issue
Block a user