1
0
mirror of https://github.com/natekspencer/hacs-oasis_mini.git synced 2025-12-06 18:44:14 -05:00
Files
hacs-oasis_mini/custom_components/oasis_mini/pyoasiscontrol/device.py

773 lines
28 KiB
Python

"""Oasis device."""
from __future__ import annotations
import asyncio
import logging
from typing import TYPE_CHECKING, Any, Callable, Final, Iterable
from .const import (
ERROR_CODE_MAP,
LED_EFFECTS,
STATUS_CODE_MAP,
STATUS_ERROR,
STATUS_SLEEPING,
TRACKS,
)
from .utils import _bit_to_bool, _parse_int, create_svg, decrypt_svg_content
if TYPE_CHECKING: # avoid runtime circular imports
from .clients import OasisCloudClient
from .clients.transport import OasisClientProtocol
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Inclusive bounds enforced by OasisDevice.async_set_ball_speed.
BALL_SPEED_MIN: Final = 100
BALL_SPEED_MAX: Final = 400

# Initial value of OasisDevice.brightness_on.
BRIGHTNESS_DEFAULT: Final = 100

# Inclusive bounds enforced by OasisDevice.async_set_led for the LED speed.
LED_SPEED_MIN: Final = -90
LED_SPEED_MAX: Final = 90

# Attribute names exported by OasisDevice.as_dict(), in output order.
_STATE_FIELDS = (
    "auto_clean",
    "autoplay",
    "ball_speed",
    "brightness",
    "busy",
    "color",
    "download_progress",
    "error",
    "led_effect",
    "led_speed",
    "mac_address",
    "playlist",
    "playlist_index",
    "progress",
    "repeat_playlist",
    "serial_number",
    "software_version",
    "status_code",
)
class OasisDevice:
    """Oasis device model + behavior.

    Transport-agnostic; all I/O is delegated to an attached
    OasisClientProtocol (MQTT, HTTP, etc.) via `attach_client`.
    """

    manufacturer: Final = "Kinetic Oasis"

    def __init__(
        self,
        *,
        model: str | None = None,
        serial_number: str | None = None,
        name: str | None = None,
        ssid: str | None = None,
        ip_address: str | None = None,
        cloud: OasisCloudClient | None = None,
        client: OasisClientProtocol | None = None,
    ) -> None:
        """
        Initialize the device with identification, network details, transport references and default state.

        Parameters:
            model (str | None): Device model identifier.
            serial_number (str | None): Device serial number.
            name (str | None): Human-readable device name; if empty or omitted, defaults to "<model> <serial_number>".
            ssid (str | None): Wi-Fi SSID stored on the device object.
            ip_address (str | None): IP address stored on the device object.
            cloud (OasisCloudClient | None): Optional cloud client used to fetch track metadata.
            client (OasisClientProtocol | None): Optional transport client used to send commands to the device.

        Notes:
            - Creates an empty listener list for state-change callbacks.
            - Initializes all status fields with default values.
            - Initializes the track metadata cache and the handle of the background refresh task.
        """
        self._cloud = cloud
        self._client = client
        self._listeners: list[Callable[[], None]] = []

        # Details
        self.model = model
        self.serial_number = serial_number
        self.name = name or f"{model} {serial_number}"
        self.ssid = ssid
        self.ip_address = ip_address

        # Status
        self.auto_clean: bool = False
        self.autoplay: int = 0
        self.ball_speed: int = BALL_SPEED_MIN
        self._brightness: int = 0
        self.brightness_max: int = 200
        self.brightness_on: int = BRIGHTNESS_DEFAULT
        self.busy: bool = False
        self.color: str | None = None
        self.download_progress: int = 0
        self.error: int = 0
        self.led_color_id: str = "0"
        self.led_effect: str = "0"
        self.led_speed: int = 0
        self.mac_address: str | None = None
        self.playlist: list[int] = []
        self.playlist_index: int = 0
        self.progress: int = 0
        self.repeat_playlist: bool = False
        self.software_version: str | None = None
        self.status_code: int = 0
        self.wifi_connected: bool = False
        self.wifi_ip: str | None = None
        self.wifi_ssid: str | None = None
        self.wifi_pdns: str | None = None
        self.wifi_sdns: str | None = None
        self.wifi_gate: str | None = None
        self.wifi_sub: str | None = None
        self.environment: str | None = None
        self.schedule: Any | None = None

        # Track metadata cache
        self._track: dict | None = None
        self._track_task: asyncio.Task | None = None

    @property
    def brightness(self) -> int:
        """
        Current brightness adjusted for the device sleep state.

        Returns:
            int: 0 when the device is sleeping, otherwise the stored brightness value.
        """
        return 0 if self.is_sleeping else self._brightness

    @brightness.setter
    def brightness(self, value: int) -> None:
        """
        Store the brightness and remember the last non-zero level in `brightness_on`.

        Parameters:
            value (int): Brightness level to store; if non-zero, it is also stored in `brightness_on`.
        """
        self._brightness = value
        if value:
            self.brightness_on = value

    @property
    def is_initialized(self) -> bool:
        """Return `True` if serial number, MAC address and software version are all known."""
        return bool(self.serial_number and self.mac_address and self.software_version)

    @property
    def is_sleeping(self) -> bool:
        """
        Indicate whether the device is currently in the sleeping status.

        Returns:
            bool: `True` if `status_code` equals `STATUS_SLEEPING`, `False` otherwise.
        """
        return self.status_code == STATUS_SLEEPING

    def attach_client(self, client: OasisClientProtocol) -> None:
        """Attach a transport client (MQTT, HTTP, etc.) to this device."""
        self._client = client

    @property
    def client(self) -> OasisClientProtocol | None:
        """
        Get the attached transport client.

        Returns:
            OasisClientProtocol | None: The attached transport client, or `None` if not attached.
        """
        return self._client

    def _require_client(self) -> OasisClientProtocol:
        """
        Get the attached transport client, failing if there is none.

        Returns:
            OasisClientProtocol: The attached transport client.

        Raises:
            RuntimeError: If no client/transport is attached to the device.
        """
        if self._client is None:
            raise RuntimeError(
                f"No client/transport attached for device {self.serial_number!r}"
            )
        return self._client

    def _update_field(self, name: str, value: Any) -> bool:
        """
        Update an attribute on the device if the new value differs from the current value.

        Sets the instance attribute named `name` to `value` and logs a debug message when a change occurs.

        Parameters:
            name (str): The attribute name to update.
            value (Any): The new value to assign to the attribute.

        Returns:
            bool: True if the attribute was changed, False otherwise.
        """
        # The `brightness` property reads as 0 while the device sleeps. Compare
        # against the stored value instead, otherwise an unchanged non-zero
        # brightness would be reported as a change on every update during sleep.
        stored_name = "_brightness" if name == "brightness" else name
        old = getattr(self, stored_name, None)
        if old == value:
            return False
        _LOGGER.debug(
            "%s %s changed: '%s' -> '%s'",
            self.serial_number,
            name.replace("_", " ").capitalize(),
            old,
            value,
        )
        setattr(self, name, value)
        return True

    def update_from_status_dict(self, data: dict[str, Any]) -> None:
        """
        Update the device's attributes from a status dictionary.

        Keys that name an existing attribute are applied to the device; other keys are
        logged as warnings and ignored. If `playlist` or `playlist_index` change, a track
        refresh is scheduled. If any attribute changed, registered update listeners are notified.

        Parameters:
            data (dict[str, Any]): Mapping of attribute names to their new values.
        """
        changed = False
        playlist_or_index_changed = False
        for key, value in data.items():
            if not hasattr(self, key):
                _LOGGER.warning("Unknown field: %s=%s", key, value)
                continue
            if self._update_field(key, value):
                changed = True
                if key in ("playlist", "playlist_index"):
                    playlist_or_index_changed = True

        if playlist_or_index_changed:
            self.schedule_track_refresh()
        if changed:
            self._notify_listeners()

    def parse_status_string(self, raw_status: str) -> dict[str, Any] | None:
        """
        Parse a semicolon-separated device status string into a structured state dictionary.

        Expects at least 18 semicolon-separated fields (e.g. the payload of HTTP GETSTATUS
        or MQTT FULLSTATUS). Parsing errors are logged and reported as `None`.

        Parameters:
            raw_status (str): Semicolon-separated status string from the device.

        Returns:
            dict[str, Any] | None: A dictionary with these keys on success:
                - `status_code`, `error`, `ball_speed`, `playlist` (list[int]), `playlist_index`,
                  `progress`, `led_effect`, `led_color_id`, `led_speed`, `brightness`, `color`,
                  `busy`, `download_progress`, `brightness_max`, `wifi_connected`, `repeat_playlist`,
                  `autoplay`, `auto_clean`
                - `software_version` (str) is included if a 19th field is present.
            Returns `None` if the input is empty, has too few fields, or parsing fails.
        """
        if not raw_status:
            return None

        values = raw_status.split(";")
        field_count = len(values)

        # We rely on indices 0..17 existing (18 fields)
        if field_count < 18:
            _LOGGER.warning(
                "Unexpected status format for %s: %s", self.serial_number, values
            )
            return None

        try:
            # Parsed inside the try so a bad playlist entry is handled like
            # any other parse failure instead of escaping to the caller.
            playlist = [_parse_int(track) for track in values[3].split(",") if track]
            status: dict[str, Any] = {
                "status_code": _parse_int(values[0]),
                "error": _parse_int(values[1]),
                "ball_speed": _parse_int(values[2]),
                "playlist": playlist,
                # Capped at len(playlist); `track_id` maps an index equal to
                # the playlist length back to the first track.
                "playlist_index": min(_parse_int(values[4]), len(playlist)),
                "progress": _parse_int(values[5]),
                "led_effect": values[6],
                "led_color_id": values[7],
                "led_speed": _parse_int(values[8]),
                "brightness": _parse_int(values[9]),
                "color": values[10] if "#" in values[10] else None,
                "busy": _bit_to_bool(values[11]),
                "download_progress": _parse_int(values[12]),
                "brightness_max": _parse_int(values[13]),
                "wifi_connected": _bit_to_bool(values[14]),
                "repeat_playlist": _bit_to_bool(values[15]),
                "autoplay": _parse_int(values[16]),
                "auto_clean": _bit_to_bool(values[17]),
            }
            # Optional trailing field(s)
            if field_count > 18:
                status["software_version"] = values[18]
        except Exception:
            _LOGGER.exception(
                "Error parsing status string for %s: %r", self.serial_number, raw_status
            )
            return None

        return status

    def update_from_status_string(self, raw_status: str) -> None:
        """
        Parse a semicolon-separated device status string and apply the resulting fields to the device state.

        If the string cannot be parsed into a status dictionary, no state is changed.

        Parameters:
            raw_status (str): Raw status payload received from the device (semicolon-separated fields).
        """
        if status := self.parse_status_string(raw_status):
            self.update_from_status_dict(status)

    def as_dict(self) -> dict[str, Any]:
        """
        Return a mapping of the device's core state fields to their current values.

        Returns:
            dict[str, Any]: A dictionary whose keys are the field names listed in `_STATE_FIELDS`
            and whose values are the current values of those attributes.
        """
        return {field: getattr(self, field) for field in _STATE_FIELDS}

    @property
    def error_message(self) -> str | None:
        """
        Get the human-readable error message for the current device error code.

        Returns:
            str | None: The message mapped in `ERROR_CODE_MAP` (or "Unknown (<error>)") when
            `status_code` equals `STATUS_ERROR`; `None` otherwise.
        """
        if self.status_code == STATUS_ERROR:
            return ERROR_CODE_MAP.get(self.error, f"Unknown ({self.error})")
        return None

    @property
    def status(self) -> str:
        """
        Get a human-readable status description for the current status_code.

        Returns:
            str: The description mapped in `STATUS_CODE_MAP`, or "Unknown (<code>)" when the code is not mapped.
        """
        return STATUS_CODE_MAP.get(self.status_code, f"Unknown ({self.status_code})")

    @property
    def track(self) -> dict | None:
        """
        Return the cached track metadata when it corresponds to the current track, otherwise the built-in track metadata.

        Returns:
            dict | None: The metadata dictionary for the current `track_id`, or `None` if no matching track is available.
        """
        track_id = self.track_id
        cached = self._track
        # A cached entry without an "id" key can never match; look the key up
        # defensively instead of raising KeyError.
        if cached and "id" in cached and cached["id"] == track_id:
            return cached
        return TRACKS.get(track_id)

    @property
    def track_id(self) -> int | None:
        """
        Determine the current track id from the active playlist.

        If the playlist index is beyond the end of the playlist, the first track id is returned.

        Returns:
            int | None: The current track id, or `None` if the playlist is empty.
        """
        if not self.playlist:
            return None
        index = self.playlist_index
        if index >= len(self.playlist):
            return self.playlist[0]
        return self.playlist[index]

    @property
    def track_image_url(self) -> str | None:
        """
        Get the full HTTPS URL for the current track's image if available.

        Returns:
            str | None: `https://app.grounded.so/uploads/<image>`, or `None` if there is no current track or it has no image.
        """
        if (track := self.track) and (image := track.get("image")):
            return f"https://app.grounded.so/uploads/{image}"
        return None

    @property
    def track_name(self) -> str | None:
        """
        Get the current track's display name.

        If the current track has no `name` entry, returns "Unknown Title (#<track_id>)".

        Returns:
            str | None: The track name, or `None` if no current track is available.
        """
        if track := self.track:
            return track.get("name", f"Unknown Title (#{self.track_id})")
        return None

    @property
    def drawing_progress(self) -> float | None:
        """
        Compute drawing progress percentage for the current track.

        The total is taken from the track's `reduced_svg_content_new` entry when it is
        set and non-zero; otherwise it is the number of "L"-separated segments in the
        decrypted SVG content.

        Returns:
            float | None: Percentage of the drawing completed, capped at 100; `None` if no track or SVG content is available.
        """
        track = self.track
        if not track:
            return None
        svg_content = track.get("svg_content")
        if not svg_content:
            return None

        total = track.get("reduced_svg_content_new", 0)
        if not total:
            # Only decrypt when the track does not already supply the total.
            total = len(decrypt_svg_content(svg_content).split("L"))

        percent = (100 * self.progress) / total
        return min(percent, 100)

    @property
    def playlist_details(self) -> dict[int, dict[str, str]]:
        """
        Build a mapping of track ids in the current playlist to their detail dictionaries.

        The current track uses the value of `track` (cached metadata if it matches, else
        built-in); every other track is looked up in the built-in TRACKS.

        Returns:
            dict[int, dict[str, str]]: A mapping from track id to a details dictionary. Tracks
            without available metadata get the fallback `{"name": "Unknown Title (#<id>)"}`.
        """
        current_id = self.track_id
        current = self.track if current_id is not None else None

        details: dict[int, dict[str, str]] = {}
        for track_id in self.playlist:
            if current and track_id == current_id:
                # Overlay the current track without copying all of TRACKS.
                details[track_id] = current
            else:
                details[track_id] = TRACKS.get(
                    track_id, {"name": f"Unknown Title (#{track_id})"}
                )
        return details

    def create_svg(self) -> str | None:
        """
        Generate an SVG for the current track at the device's current progress.

        Delegates to the module-level `create_svg` helper with `track` and `progress`.

        Returns:
            str | None: The value returned by the helper.
        """
        return create_svg(self.track, self.progress)

    def add_update_listener(self, listener: Callable[[], None]) -> Callable[[], None]:
        """
        Register a callback to be invoked whenever the device state changes.

        Parameters:
            listener (Callable[[], None]): A zero-argument callback that will be called on state updates.

        Returns:
            Callable[[], None]: An unsubscribe function that removes the registered listener; calling it more than once is safe.
        """
        self._listeners.append(listener)

        def _unsub() -> None:
            """Remove the listener if it is still registered; otherwise do nothing."""
            try:
                self._listeners.remove(listener)
            except ValueError:
                pass

        return _unsub

    def _notify_listeners(self) -> None:
        """
        Invoke all registered update listeners in registration order.

        Each listener is called synchronously; exceptions raised by a listener are caught and logged so other listeners still run.
        """
        # Iterate over a snapshot so a listener may unsubscribe while being notified.
        for listener in list(self._listeners):
            try:
                listener()
            except Exception:
                _LOGGER.exception("Error in update listener")

    @staticmethod
    def _as_track_list(tracks: int | Iterable[int]) -> list[int]:
        """Normalize a single track id or an iterable of track ids to a list."""
        if isinstance(tracks, int):
            return [tracks]
        return list(tracks)

    async def async_get_status(self) -> None:
        """
        Request that the device update its current status.

        Raises:
            RuntimeError: If no transport client is attached to the device.
        """
        client = self._require_client()
        await client.async_get_status(self)

    async def async_get_mac_address(self) -> str | None:
        """
        Get the device MAC address, requesting it from the attached transport client if not already known.

        Returns:
            str | None: The known MAC address, otherwise the value returned by the client.

        Raises:
            RuntimeError: If the MAC address is unknown and no transport client is attached.
        """
        if self.mac_address:
            return self.mac_address

        client = self._require_client()
        mac = await client.async_get_mac_address(self)
        if mac:
            self._update_field("mac_address", mac)
        return mac

    async def async_set_auto_clean(self, auto_clean: bool) -> None:
        """
        Set whether the device performs automatic cleaning.

        Parameters:
            auto_clean (bool): `True` to enable automatic cleaning, `False` to disable it.

        Raises:
            RuntimeError: If no transport client is attached to the device.
        """
        client = self._require_client()
        await client.async_send_auto_clean_command(self, auto_clean)

    async def async_set_ball_speed(self, speed: int) -> None:
        """
        Set the device's ball speed.

        Parameters:
            speed (int): Desired ball speed in the allowed range (BALL_SPEED_MIN to BALL_SPEED_MAX, inclusive).

        Raises:
            ValueError: If `speed` is outside the allowed range.
            RuntimeError: If no transport client is attached to the device.
        """
        if not BALL_SPEED_MIN <= speed <= BALL_SPEED_MAX:
            raise ValueError("Invalid speed specified")
        client = self._require_client()
        await client.async_send_ball_speed_command(self, speed)

    async def async_set_led(
        self,
        *,
        led_effect: str | None = None,
        color: str | None = None,
        led_speed: int | None = None,
        brightness: int | None = None,
    ) -> None:
        """
        Set the device LED effect, color, speed, and brightness.

        Parameters:
            led_effect (str | None): LED effect; if None, the device's current effect is used. Must be in `LED_EFFECTS`.
            color (str | None): Color string; if None, the device's current color is used, or `#ffffff` if unset.
            led_speed (int | None): LED animation speed; if None, the device's current speed is used. Must be within LED_SPEED_MIN..LED_SPEED_MAX.
            brightness (int | None): Brightness level; if None, the device's current `brightness` is used. Must be between 0 and the device's `brightness_max`.

        Raises:
            ValueError: If `led_effect` is not supported, or `led_speed` or `brightness` are outside their valid ranges.
            RuntimeError: If no transport client is attached to the device.
        """
        if led_effect is None:
            led_effect = self.led_effect
        if color is None:
            color = self.color or "#ffffff"
        if led_speed is None:
            led_speed = self.led_speed
        if brightness is None:
            brightness = self.brightness

        if led_effect not in LED_EFFECTS:
            raise ValueError("Invalid led effect specified")
        if not LED_SPEED_MIN <= led_speed <= LED_SPEED_MAX:
            raise ValueError("Invalid led speed specified")
        if not 0 <= brightness <= self.brightness_max:
            raise ValueError("Invalid brightness specified")

        client = self._require_client()
        await client.async_send_led_command(
            self, led_effect, color, led_speed, brightness
        )

    async def async_sleep(self) -> None:
        """
        Put the device into sleep mode by sending a sleep command through the attached client.

        Raises:
            RuntimeError: If no client is attached.
        """
        client = self._require_client()
        await client.async_send_sleep_command(self)

    async def async_move_track(self, from_index: int, to_index: int) -> None:
        """
        Move a track within the device's playlist from one index to another.

        Parameters:
            from_index (int): Index of the track to move within the current playlist.
            to_index (int): Destination index where the track should be placed.

        Raises:
            RuntimeError: If no transport client is attached to the device.
        """
        client = self._require_client()
        await client.async_send_move_job_command(self, from_index, to_index)

    async def async_change_track(self, index: int) -> None:
        """
        Change the device's current track to the track at the given playlist index.

        Parameters:
            index (int): Index of the track in the device's current playlist.

        Raises:
            RuntimeError: If no transport client is attached to the device.
        """
        client = self._require_client()
        await client.async_send_change_track_command(self, index)

    async def async_add_track_to_playlist(self, track: int | Iterable[int]) -> None:
        """
        Add one or more tracks to the device's playlist via the attached client.

        Parameters:
            track (int | Iterable[int]): A single track id or an iterable of track ids to add.

        Raises:
            RuntimeError: If no transport client is attached to the device.
        """
        tracks = self._as_track_list(track)
        client = self._require_client()
        await client.async_send_add_joblist_command(self, tracks)

    async def async_set_playlist(self, playlist: int | Iterable[int]) -> None:
        """
        Set the device's playlist to the provided track or tracks.

        Parameters:
            playlist (int | Iterable[int]): A single track id or an iterable of track ids to set as the new playlist.

        Raises:
            RuntimeError: If no transport client is attached to the device.
        """
        playlist_list = self._as_track_list(playlist)
        client = self._require_client()
        await client.async_send_set_playlist_command(self, playlist_list)

    async def async_set_repeat_playlist(self, repeat: bool) -> None:
        """
        Set whether the device's playlist should repeat.

        Parameters:
            repeat (bool): True to enable repeating the playlist, False to disable it.

        Raises:
            RuntimeError: If no transport client is attached to the device.
        """
        client = self._require_client()
        await client.async_send_set_repeat_playlist_command(self, repeat)

    async def async_set_autoplay(self, option: bool | int | str) -> None:
        """
        Set the device's autoplay / wait-after option.

        Parameters:
            option (bool | int | str): Desired autoplay/wait-after value. A `bool` is converted first:
                `True` becomes `"0"` and `False` becomes `"1"`. Integer or string values are sent
                as their string representation.

        Raises:
            RuntimeError: If no transport client is attached to the device.
        """
        if isinstance(option, bool):
            option = 0 if option else 1
        client = self._require_client()
        await client.async_send_set_autoplay_command(self, str(option))

    async def async_upgrade(self, beta: bool = False) -> None:
        """
        Initiate a firmware upgrade on the device.

        Parameters:
            beta (bool): Passed through to the client's upgrade command; True requests beta firmware.

        Raises:
            RuntimeError: If no transport client is attached to the device.
        """
        client = self._require_client()
        await client.async_send_upgrade_command(self, beta)

    async def async_play(self) -> None:
        """
        Send a play command to the device via the attached transport client.

        Raises:
            RuntimeError: If no transport client is attached.
        """
        client = self._require_client()
        await client.async_send_play_command(self)

    async def async_pause(self) -> None:
        """
        Send a pause command to the device via the attached transport client.

        Raises:
            RuntimeError: If no transport client is attached.
        """
        client = self._require_client()
        await client.async_send_pause_command(self)

    async def async_stop(self) -> None:
        """
        Send a stop command to the device via the attached transport client.

        Raises:
            RuntimeError: If no transport client is attached to the device.
        """
        client = self._require_client()
        await client.async_send_stop_command(self)

    async def async_reboot(self) -> None:
        """
        Send a reboot command to the device via the attached transport client.

        Raises:
            RuntimeError: If no transport client is attached.
        """
        client = self._require_client()
        await client.async_send_reboot_command(self)

    def schedule_track_refresh(self) -> None:
        """
        Schedule a background refresh of the current track metadata.

        Does nothing if no cloud client is attached or if there is no running event loop.
        If a previous refresh task is still pending, it is cancelled before the new task is created.
        """
        if not self._cloud:
            return

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            _LOGGER.debug("No running loop; cannot schedule track refresh")
            return

        if self._track_task and not self._track_task.done():
            self._track_task.cancel()
        self._track_task = loop.create_task(self._async_refresh_current_track())

    async def _async_refresh_current_track(self) -> None:
        """
        Refresh the cached metadata for the current track from the cloud client and notify listeners when it was updated.

        Returns without fetching when no cloud client is attached or when the cache already
        matches the current track id. When there is no current track, the cache is cleared.
        Fetch errors are logged and leave the cache unchanged.
        """
        if not self._cloud:
            return

        if (track_id := self.track_id) is None:
            self._track = None
            return

        if self._track and self._track.get("id") == track_id:
            return

        try:
            track = await self._cloud.async_get_track_info(track_id)
        except Exception:
            _LOGGER.exception("Error fetching track info for %s", track_id)
            return

        if not track:
            return

        self._track = track
        self._notify_listeners()