mirror of
https://github.com/natekspencer/hacs-oasis_mini.git
synced 2025-12-06 18:44:14 -05:00
798 lines
29 KiB
Python
798 lines
29 KiB
Python
"""Oasis device."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import TYPE_CHECKING, Any, Callable, Final, Iterable
|
|
|
|
from .const import (
|
|
ERROR_CODE_MAP,
|
|
LED_EFFECTS,
|
|
STATUS_CODE_MAP,
|
|
STATUS_ERROR,
|
|
STATUS_PLAYING,
|
|
STATUS_SLEEPING,
|
|
TRACKS,
|
|
)
|
|
from .utils import (
|
|
_bit_to_bool,
|
|
_parse_int,
|
|
create_svg,
|
|
decrypt_svg_content,
|
|
get_image_url_from_track,
|
|
)
|
|
|
|
if TYPE_CHECKING: # avoid runtime circular imports
|
|
from .clients import OasisCloudClient
|
|
from .clients.transport import OasisClientProtocol
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
BALL_SPEED_MAX: Final = 400
|
|
BALL_SPEED_MIN: Final = 100
|
|
BRIGHTNESS_DEFAULT: Final = 100
|
|
LED_SPEED_MAX: Final = 90
|
|
LED_SPEED_MIN: Final = -90
|
|
|
|
_STATE_FIELDS = (
|
|
"auto_clean",
|
|
"autoplay",
|
|
"ball_speed",
|
|
"brightness",
|
|
"busy",
|
|
"color",
|
|
"download_progress",
|
|
"error",
|
|
"led_effect",
|
|
"led_speed",
|
|
"mac_address",
|
|
"playlist",
|
|
"playlist_index",
|
|
"progress",
|
|
"repeat_playlist",
|
|
"serial_number",
|
|
"software_version",
|
|
"status_code",
|
|
)
|
|
|
|
|
|
class OasisDevice:
|
|
"""Oasis device model + behavior.
|
|
|
|
Transport-agnostic; all I/O is delegated to an attached
|
|
OasisClientProtocol (MQTT, HTTP, etc.) via `attach_client`.
|
|
"""
|
|
|
|
manufacturer: Final = "Kinetic Oasis"
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
model: str | None = None,
|
|
serial_number: str | None = None,
|
|
name: str | None = None,
|
|
ssid: str | None = None,
|
|
ip_address: str | None = None,
|
|
cloud: OasisCloudClient | None = None,
|
|
client: OasisClientProtocol | None = None,
|
|
) -> None:
|
|
"""
|
|
Initialize an OasisDevice with identification, network, transport references, and default state fields.
|
|
|
|
Parameters:
|
|
model (str | None): Device model identifier.
|
|
serial_number (str | None): Device serial number.
|
|
name (str | None): Human-readable device name; if omitted, defaults to "<model> <serial_number>".
|
|
ssid (str | None): Last-known Wi-Fi SSID for the device.
|
|
ip_address (str | None): Last-known IP address for the device.
|
|
cloud (OasisCloudClient | None): Optional cloud client used to fetch track metadata and remote data.
|
|
client (OasisClientProtocol | None): Optional transport client used to send commands to the device.
|
|
|
|
Notes:
|
|
- Creates an internal listener list for state-change callbacks.
|
|
- Initializes status fields (brightness, playlist, playback state, networking, etc.) with sensible defaults.
|
|
- Initializes a track metadata cache and a placeholder for a background refresh task.
|
|
"""
|
|
self._cloud = cloud
|
|
self._client = client
|
|
self._listeners: list[Callable[[], None]] = []
|
|
|
|
# Details
|
|
self.model = model
|
|
self.serial_number = serial_number
|
|
self.name = name if name else f"{model} {serial_number}"
|
|
self.ssid = ssid
|
|
self.ip_address = ip_address
|
|
|
|
# Status
|
|
self.auto_clean: bool = False
|
|
self.autoplay: int = 0
|
|
self.ball_speed: int = BALL_SPEED_MIN
|
|
self._brightness: int = 0
|
|
self.brightness_max: int = 200
|
|
self.brightness_on: int = BRIGHTNESS_DEFAULT
|
|
self.busy: bool = False
|
|
self.color: str | None = None
|
|
self.download_progress: int = 0
|
|
self.error: int = 0
|
|
self.led_color_id: str = "0"
|
|
self.led_effect: str = "0"
|
|
self.led_speed: int = 0
|
|
self.mac_address: str | None = None
|
|
self.playlist: list[int] = []
|
|
self.playlist_index: int = 0
|
|
self.progress: int = 0
|
|
self.repeat_playlist: bool = False
|
|
self.software_version: str | None = None
|
|
self.status_code: int = 0
|
|
self.wifi_connected: bool = False
|
|
self.wifi_ip: str | None = None
|
|
self.wifi_ssid: str | None = None
|
|
self.wifi_pdns: str | None = None
|
|
self.wifi_sdns: str | None = None
|
|
self.wifi_gate: str | None = None
|
|
self.wifi_sub: str | None = None
|
|
self.environment: str | None = None
|
|
self.schedule: Any | None = None
|
|
|
|
# Track metadata cache
|
|
self._track: dict | None = None
|
|
self._track_task: asyncio.Task | None = None
|
|
|
|
@property
|
|
def brightness(self) -> int:
|
|
"""
|
|
Current display brightness adjusted for the device sleep state.
|
|
|
|
Returns:
|
|
int: 0 when the device is sleeping, otherwise the stored brightness value.
|
|
"""
|
|
return 0 if self.is_sleeping else self._brightness
|
|
|
|
@brightness.setter
|
|
def brightness(self, value: int) -> None:
|
|
"""
|
|
Set the device brightness and update brightness_on when non-zero.
|
|
|
|
Parameters:
|
|
value (int): Brightness level to apply; if non-zero, also stored in `brightness_on`.
|
|
"""
|
|
self._brightness = value
|
|
if value:
|
|
self.brightness_on = value
|
|
|
|
@property
|
|
def is_initialized(self) -> bool:
|
|
"""Return `True` if the device is fully identified."""
|
|
return bool(self.serial_number and self.mac_address and self.software_version)
|
|
|
|
@property
|
|
def is_sleeping(self) -> bool:
|
|
"""
|
|
Indicates whether the device is currently in the sleeping status.
|
|
|
|
Returns:
|
|
`true` if the device is sleeping, `false` otherwise.
|
|
"""
|
|
return self.status_code == STATUS_SLEEPING
|
|
|
|
def attach_client(self, client: OasisClientProtocol) -> None:
|
|
"""Attach a transport client (MQTT, HTTP, etc.) to this device."""
|
|
self._client = client
|
|
|
|
@property
|
|
def client(self) -> OasisClientProtocol | None:
|
|
"""
|
|
Get the attached transport client, or `None` if no client is attached.
|
|
|
|
Returns:
|
|
The attached transport client, or `None` if not attached.
|
|
"""
|
|
return self._client
|
|
|
|
def _require_client(self) -> OasisClientProtocol:
|
|
"""
|
|
Get the attached transport client for this device.
|
|
|
|
Returns:
|
|
OasisClientProtocol: The attached transport client.
|
|
|
|
Raises:
|
|
RuntimeError: If no client/transport is attached to the device.
|
|
"""
|
|
if self._client is None:
|
|
raise RuntimeError(
|
|
f"No client/transport attached for device {self.serial_number!r}"
|
|
)
|
|
return self._client
|
|
|
|
def _update_field(self, name: str, value: Any) -> bool:
|
|
"""
|
|
Update an attribute on the device if the new value differs from the current value.
|
|
|
|
Sets the instance attribute named `name` to `value` and logs a debug message when a change occurs.
|
|
|
|
Parameters:
|
|
name (str): The attribute name to update.
|
|
value (Any): The new value to assign to the attribute.
|
|
|
|
Returns:
|
|
bool: True if the attribute was changed, False otherwise.
|
|
"""
|
|
old = getattr(self, name, None)
|
|
if old != value:
|
|
_LOGGER.debug(
|
|
"%s %s changed: '%s' -> '%s'",
|
|
self.serial_number,
|
|
name.replace("_", " ").capitalize(),
|
|
old,
|
|
value,
|
|
)
|
|
setattr(self, name, value)
|
|
return True
|
|
return False
|
|
|
|
def update_from_status_dict(self, data: dict[str, Any]) -> None:
|
|
"""
|
|
Update the device's attributes from a status dictionary.
|
|
|
|
Expects a mapping of attribute names to values; known attributes are applied to the device,
|
|
unknown keys are logged and ignored. If `playlist` or `playlist_index` change, a track
|
|
refresh is scheduled. If any attribute changed, registered update listeners are notified.
|
|
"""
|
|
changed = False
|
|
playlist_or_index_changed = False
|
|
|
|
for key, value in data.items():
|
|
if hasattr(self, key):
|
|
if self._update_field(key, value):
|
|
changed = True
|
|
if key in ("playlist", "playlist_index"):
|
|
playlist_or_index_changed = True
|
|
else:
|
|
_LOGGER.warning("Unknown field: %s=%s", key, value)
|
|
|
|
if playlist_or_index_changed:
|
|
self.schedule_track_refresh()
|
|
|
|
if changed:
|
|
self._notify_listeners()
|
|
|
|
def parse_status_string(self, raw_status: str) -> dict[str, Any] | None:
|
|
"""
|
|
Parse a semicolon-separated device status string into a structured state dictionary.
|
|
|
|
Expects a semicolon-separated string containing at least 18 fields (device status format returned by the device: e.g., HTTP GETSTATUS or MQTT FULLSTATUS). Returns None for empty input or if the string cannot be parsed into the expected fields.
|
|
|
|
Parameters:
|
|
raw_status (str): Semicolon-separated status string from the device.
|
|
|
|
Returns:
|
|
dict[str, Any] | None: A dictionary with these keys on success:
|
|
- `status_code`, `error`, `ball_speed`, `playlist` (list[int]), `playlist_index`,
|
|
`progress`, `led_effect`, `led_color_id`, `led_speed`, `brightness`, `color`,
|
|
`busy`, `download_progress`, `brightness_max`, `wifi_connected`, `repeat_playlist`,
|
|
`autoplay`, `auto_clean`
|
|
- `software_version` (str) is included if an additional trailing field is present.
|
|
Returns `None` if the input is empty or parsing fails.
|
|
"""
|
|
if not raw_status:
|
|
return None
|
|
|
|
values = raw_status.split(";")
|
|
|
|
# We rely on indices 0..17 existing (18 fields)
|
|
if (n := len(values)) < 18:
|
|
_LOGGER.warning(
|
|
"Unexpected status format for %s: %s", self.serial_number, values
|
|
)
|
|
return None
|
|
|
|
playlist = [
|
|
track_id
|
|
for track_str in values[3].split(",")
|
|
if (track_id := _parse_int(track_str))
|
|
]
|
|
|
|
try:
|
|
status: dict[str, Any] = {
|
|
"status_code": _parse_int(values[0]),
|
|
"error": _parse_int(values[1]),
|
|
"ball_speed": _parse_int(values[2]),
|
|
"playlist": playlist,
|
|
"playlist_index": min(_parse_int(values[4]), len(playlist)),
|
|
"progress": _parse_int(values[5]),
|
|
"led_effect": values[6],
|
|
"led_color_id": values[7],
|
|
"led_speed": _parse_int(values[8]),
|
|
"brightness": _parse_int(values[9]),
|
|
"color": values[10] if "#" in values[10] else None,
|
|
"busy": _bit_to_bool(values[11]),
|
|
"download_progress": _parse_int(values[12]),
|
|
"brightness_max": _parse_int(values[13]),
|
|
"wifi_connected": _bit_to_bool(values[14]),
|
|
"repeat_playlist": _bit_to_bool(values[15]),
|
|
"autoplay": _parse_int(values[16]),
|
|
"auto_clean": _bit_to_bool(values[17]),
|
|
}
|
|
|
|
# Optional trailing field(s)
|
|
if n > 18:
|
|
status["software_version"] = values[18]
|
|
|
|
except Exception:
|
|
_LOGGER.exception(
|
|
"Error parsing status string for %s: %r", self.serial_number, raw_status
|
|
)
|
|
return None
|
|
|
|
return status
|
|
|
|
def update_from_status_string(self, raw_status: str) -> None:
|
|
"""
|
|
Parse a semicolon-separated device status string and apply the resulting fields to the device state.
|
|
|
|
If the string cannot be parsed into a valid status dictionary, no state is changed.
|
|
|
|
Parameters:
|
|
raw_status (str): Raw status payload received from the device (semicolon-separated fields).
|
|
"""
|
|
if status := self.parse_status_string(raw_status):
|
|
self.update_from_status_dict(status)
|
|
|
|
def as_dict(self) -> dict[str, Any]:
|
|
"""
|
|
Return a mapping of the device's core state fields to their current values.
|
|
|
|
Returns:
|
|
dict[str, Any]: A dictionary whose keys are the core state field names (as defined in _STATE_FIELDS)
|
|
and whose values are the current values for those fields.
|
|
"""
|
|
return {field: getattr(self, field) for field in _STATE_FIELDS}
|
|
|
|
@property
|
|
def error_message(self) -> str | None:
|
|
"""
|
|
Get the human-readable error message for the current device error code.
|
|
|
|
Returns:
|
|
str: The mapped error message when the device status indicates an error (status code 9); `None` otherwise.
|
|
"""
|
|
if self.status_code == STATUS_ERROR:
|
|
return ERROR_CODE_MAP.get(self.error, f"Unknown ({self.error})")
|
|
return None
|
|
|
|
@property
|
|
def status(self) -> str:
|
|
"""
|
|
Get a human-readable status description for the current status_code.
|
|
|
|
Returns:
|
|
str: Human-readable status corresponding to the device's status_code, or "Unknown (<code>)" when the code is not recognized.
|
|
"""
|
|
return STATUS_CODE_MAP.get(self.status_code, f"Unknown ({self.status_code})")
|
|
|
|
@property
|
|
def track(self) -> dict | None:
|
|
"""
|
|
Return the cached track metadata when it corresponds to the current track, otherwise retrieve the built-in track metadata.
|
|
|
|
Returns:
|
|
dict | None: The track metadata dictionary for the current `track_id`, or `None` if no matching track is available.
|
|
"""
|
|
if (track := self._track) and track["id"] == self.track_id:
|
|
return track
|
|
return TRACKS.get(self.track_id)
|
|
|
|
@property
|
|
def track_id(self) -> int | None:
|
|
"""
|
|
Determine the current track id from the active playlist.
|
|
|
|
If the playlist index is beyond the end of the playlist, the first track id is returned.
|
|
|
|
Returns:
|
|
int | None: The current track id, or `None` if there is no playlist.
|
|
"""
|
|
if not self.playlist:
|
|
return None
|
|
i = self.playlist_index
|
|
return self.playlist[0] if i >= len(self.playlist) else self.playlist[i]
|
|
|
|
@property
|
|
def track_image_url(self) -> str | None:
|
|
"""
|
|
Get the full HTTPS URL for the current track's image if available.
|
|
|
|
Returns:
|
|
str: Full URL to the track image or `None` if no image is available.
|
|
"""
|
|
return get_image_url_from_track(self.track)
|
|
|
|
@property
|
|
def track_name(self) -> str | None:
|
|
"""
|
|
Get the current track's display name.
|
|
|
|
If the current track has no explicit name, returns "Unknown Title (#{track_id})". If there is no current track, returns None.
|
|
|
|
Returns:
|
|
str | None: The track name, or `None` if no current track is available.
|
|
"""
|
|
if track := self.track:
|
|
return track.get("name", f"Unknown Title (#{self.track_id})")
|
|
return None
|
|
|
|
@property
|
|
def drawing_progress(self) -> float | None:
|
|
"""
|
|
Compute drawing progress percentage for the current track.
|
|
|
|
If the current track or its SVG content is unavailable, returns None.
|
|
|
|
Returns:
|
|
progress_percent (float | None): Percentage of the drawing completed (0-100), clamped to 100; `None` if no track or SVG content is available.
|
|
"""
|
|
if not (self.track and (svg_content := self.track.get("svg_content"))):
|
|
return None
|
|
svg_content = decrypt_svg_content(svg_content)
|
|
paths = svg_content.split("L")
|
|
total = self.track.get("reduced_svg_content_new", 0) or len(paths)
|
|
percent = (100 * self.progress) / total
|
|
return min(percent, 100)
|
|
|
|
@property
|
|
def playlist_details(self) -> dict[int, dict[str, str]]:
|
|
"""
|
|
Build a mapping of track IDs in the current playlist to their detail dictionaries, preferring the device's cached/current track data and falling back to built-in TRACKS.
|
|
|
|
Returns:
|
|
dict[int, dict[str, str]]: A mapping from track ID to a details dictionary (contains at least a `'name'` key). If track metadata is available from the device cache or built-in TRACKS it is used; otherwise a fallback `{"name": "Unknown Title (#<id>)"}` is provided.
|
|
"""
|
|
base = dict(TRACKS)
|
|
if (current_id := self.track_id) is not None and self.track:
|
|
base[current_id] = self.track
|
|
return {
|
|
track_id: base.get(track_id, {"name": f"Unknown Title (#{track_id})"})
|
|
for track_id in self.playlist
|
|
}
|
|
|
|
def create_svg(self) -> str | None:
|
|
"""
|
|
Generate an SVG representing the current track at the device's drawing progress.
|
|
|
|
Returns:
|
|
svg (str | None): SVG content for the current track reflecting current progress, or None if track data is unavailable.
|
|
"""
|
|
return create_svg(self.track, self.progress)
|
|
|
|
def add_update_listener(self, listener: Callable[[], None]) -> Callable[[], None]:
|
|
"""
|
|
Register a callback to be invoked whenever the device state changes.
|
|
|
|
Parameters:
|
|
listener (Callable[[], None]): A zero-argument callback that will be called on state updates.
|
|
|
|
Returns:
|
|
Callable[[], None]: An unsubscribe function that removes the registered listener; calling the unsubscribe function multiple times is safe.
|
|
"""
|
|
self._listeners.append(listener)
|
|
|
|
def _unsub() -> None:
|
|
"""
|
|
Remove the previously registered listener from the device's listener list if it is present.
|
|
|
|
This function silently does nothing if the listener is not found.
|
|
"""
|
|
try:
|
|
self._listeners.remove(listener)
|
|
except ValueError:
|
|
pass
|
|
|
|
return _unsub
|
|
|
|
def _notify_listeners(self) -> None:
|
|
"""
|
|
Invoke all registered update listeners in registration order.
|
|
|
|
Each listener is called synchronously; exceptions raised by a listener are caught and logged so other listeners still run.
|
|
"""
|
|
for listener in list(self._listeners):
|
|
try:
|
|
listener()
|
|
except Exception:
|
|
_LOGGER.exception("Error in update listener")
|
|
|
|
async def async_get_status(self) -> None:
|
|
"""Request that the device update its current status."""
|
|
client = self._require_client()
|
|
await client.async_get_status(self)
|
|
|
|
async def async_get_mac_address(self) -> str | None:
|
|
"""
|
|
Get the device MAC address, requesting it from the attached transport client if not already known.
|
|
|
|
Returns:
|
|
mac (str | None): The device MAC address if available, otherwise `None`.
|
|
"""
|
|
if self.mac_address:
|
|
return self.mac_address
|
|
|
|
client = self._require_client()
|
|
mac = await client.async_get_mac_address(self)
|
|
if mac:
|
|
self._update_field("mac_address", mac)
|
|
return mac
|
|
|
|
async def async_set_auto_clean(self, auto_clean: bool) -> None:
|
|
"""
|
|
Set whether the device performs automatic cleaning.
|
|
|
|
Parameters:
|
|
auto_clean (bool): `True` to enable automatic cleaning, `False` to disable it.
|
|
"""
|
|
client = self._require_client()
|
|
await client.async_send_auto_clean_command(self, auto_clean)
|
|
|
|
async def async_set_ball_speed(self, speed: int) -> None:
|
|
"""
|
|
Set the device's ball speed.
|
|
|
|
Parameters:
|
|
speed (int): Desired ball speed in the allowed range (BALL_SPEED_MIN to BALL_SPEED_MAX, inclusive).
|
|
|
|
Raises:
|
|
ValueError: If `speed` is outside the allowed range.
|
|
"""
|
|
if not BALL_SPEED_MIN <= speed <= BALL_SPEED_MAX:
|
|
raise ValueError("Invalid speed specified")
|
|
client = self._require_client()
|
|
await client.async_send_ball_speed_command(self, speed)
|
|
|
|
async def async_set_led(
|
|
self,
|
|
*,
|
|
led_effect: str | None = None,
|
|
color: str | None = None,
|
|
led_speed: int | None = None,
|
|
brightness: int | None = None,
|
|
) -> None:
|
|
"""
|
|
Set the device LED effect, color, speed, and brightness.
|
|
|
|
Parameters:
|
|
led_effect (str | None): LED effect name; if None, the device's current effect is used. Must be one of the supported LED effects.
|
|
color (str | None): Hex color string (e.g. "#rrggbb"); if None, the device's current color is used or `#ffffff` if unset.
|
|
led_speed (int | None): LED animation speed; if None, the device's current speed is used. Must be within the allowed LED speed range.
|
|
brightness (int | None): Brightness level; if None, the device's current brightness is used. Must be between 0 and the device's `brightness_max`.
|
|
|
|
Raises:
|
|
ValueError: If `led_effect` is not supported, or `led_speed` or `brightness` are outside their valid ranges.
|
|
RuntimeError: If no transport client is attached to the device.
|
|
"""
|
|
if led_effect is None:
|
|
led_effect = self.led_effect
|
|
if color is None:
|
|
color = self.color or "#ffffff"
|
|
if led_speed is None:
|
|
led_speed = self.led_speed
|
|
if brightness is None:
|
|
brightness = self.brightness
|
|
|
|
if led_effect not in LED_EFFECTS:
|
|
raise ValueError("Invalid led effect specified")
|
|
if not LED_SPEED_MIN <= led_speed <= LED_SPEED_MAX:
|
|
raise ValueError("Invalid led speed specified")
|
|
if not 0 <= brightness <= self.brightness_max:
|
|
raise ValueError("Invalid brightness specified")
|
|
|
|
client = self._require_client()
|
|
await client.async_send_led_command(
|
|
self, led_effect, color, led_speed, brightness
|
|
)
|
|
|
|
async def async_sleep(self) -> None:
|
|
"""
|
|
Put the device into sleep mode.
|
|
|
|
Sends a sleep command to the attached transport client.
|
|
|
|
Raises:
|
|
RuntimeError: If no client is attached.
|
|
"""
|
|
client = self._require_client()
|
|
await client.async_send_sleep_command(self)
|
|
|
|
async def async_move_track(self, from_index: int, to_index: int) -> None:
|
|
"""
|
|
Move a track within the device's playlist from one index to another.
|
|
|
|
Parameters:
|
|
from_index (int): Index of the track to move within the current playlist.
|
|
to_index (int): Destination index where the track should be placed.
|
|
"""
|
|
client = self._require_client()
|
|
await client.async_send_move_job_command(self, from_index, to_index)
|
|
|
|
async def async_change_track(self, index: int) -> None:
|
|
"""
|
|
Change the device's current track to the track at the given playlist index.
|
|
|
|
Parameters:
|
|
index (int): Zero-based index of the track in the device's current playlist.
|
|
"""
|
|
client = self._require_client()
|
|
await client.async_send_change_track_command(self, index)
|
|
|
|
async def async_clear_playlist(self) -> None:
|
|
"""Clear the playlist."""
|
|
await self.async_set_playlist([])
|
|
|
|
async def async_add_track_to_playlist(self, track: int | Iterable[int]) -> None:
|
|
"""
|
|
Add one or more tracks to the device's playlist via the attached client.
|
|
|
|
Parameters:
|
|
track (int | Iterable[int]): A single track id or an iterable of track ids to append to the playlist.
|
|
|
|
Raises:
|
|
RuntimeError: If no transport client is attached to the device.
|
|
"""
|
|
if isinstance(track, int):
|
|
tracks = [track]
|
|
else:
|
|
tracks = list(track)
|
|
client = self._require_client()
|
|
await client.async_send_add_joblist_command(self, tracks)
|
|
|
|
async def async_set_playlist(
|
|
self, playlist: int | Iterable[int], *, start_playing: bool | None = None
|
|
) -> None:
|
|
"""
|
|
Set the device's playlist to the provided track or tracks.
|
|
|
|
Accepts a single track ID or an iterable of track IDs, stops the device,
|
|
replaces the playlist, and resumes playback based on the `start_playing`
|
|
parameter or, if unspecified, the device's prior playing state.
|
|
|
|
Parameters:
|
|
playlist (int | Iterable[int]):
|
|
A single track ID or an iterable of track IDs to set as the new playlist.
|
|
start_playing (bool | None, keyword-only):
|
|
Whether to start playback after updating the playlist. If None (default),
|
|
playback will resume only if the device was previously playing and the
|
|
new playlist is non-empty.
|
|
"""
|
|
playlist = [playlist] if isinstance(playlist, int) else list(playlist)
|
|
if start_playing is None:
|
|
start_playing = self.status_code == STATUS_PLAYING
|
|
|
|
client = self._require_client()
|
|
await client.async_send_stop_command(self) # needed before replacing playlist
|
|
await client.async_send_set_playlist_command(self, playlist)
|
|
if start_playing and playlist:
|
|
await client.async_send_play_command(self)
|
|
|
|
async def async_set_repeat_playlist(self, repeat: bool) -> None:
|
|
"""
|
|
Set whether the device's playlist should repeat.
|
|
|
|
Parameters:
|
|
repeat (bool): True to enable repeating the playlist, False to disable it.
|
|
"""
|
|
client = self._require_client()
|
|
await client.async_send_set_repeat_playlist_command(self, repeat)
|
|
|
|
async def async_set_autoplay(self, option: bool | int | str) -> None:
|
|
"""
|
|
Set the device's autoplay / wait-after option.
|
|
|
|
Parameters:
|
|
option (bool | int | str): Desired autoplay/wait-after value. If a `bool` is provided, `True` is converted to `"0"` and `False` to `"1"`. Integer or string values are sent as their string representation.
|
|
"""
|
|
if isinstance(option, bool):
|
|
option = 0 if option else 1
|
|
client = self._require_client()
|
|
await client.async_send_set_autoplay_command(self, str(option))
|
|
|
|
async def async_upgrade(self, beta: bool = False) -> None:
|
|
"""
|
|
Initiates a firmware upgrade on the device.
|
|
|
|
Parameters:
|
|
beta (bool): If True, request a beta (pre-release) firmware; otherwise request the stable firmware.
|
|
"""
|
|
client = self._require_client()
|
|
await client.async_send_upgrade_command(self, beta)
|
|
|
|
async def async_play(self) -> None:
|
|
"""
|
|
Send a play command to the device via the attached transport client.
|
|
|
|
Raises:
|
|
RuntimeError: If no transport client is attached.
|
|
"""
|
|
client = self._require_client()
|
|
await client.async_send_play_command(self)
|
|
|
|
async def async_pause(self) -> None:
|
|
"""
|
|
Pause playback on the device.
|
|
|
|
Raises:
|
|
RuntimeError: If no transport client is attached.
|
|
"""
|
|
client = self._require_client()
|
|
await client.async_send_pause_command(self)
|
|
|
|
async def async_stop(self) -> None:
|
|
"""
|
|
Stop playback on the device by sending a stop command through the attached transport client.
|
|
|
|
Raises:
|
|
RuntimeError: if no transport client is attached to the device.
|
|
"""
|
|
client = self._require_client()
|
|
await client.async_send_stop_command(self)
|
|
|
|
async def async_reboot(self) -> None:
|
|
"""
|
|
Reboots the device using the attached transport client.
|
|
|
|
Requests the attached client to send a reboot command to the device.
|
|
|
|
Raises:
|
|
RuntimeError: If no transport client is attached.
|
|
"""
|
|
client = self._require_client()
|
|
await client.async_send_reboot_command(self)
|
|
|
|
def schedule_track_refresh(self) -> None:
|
|
"""
|
|
Schedule a background refresh of the current track metadata when the device's track may have changed.
|
|
|
|
Does nothing if no cloud client is attached or if there is no running event loop. If a previous refresh task is still pending, it is cancelled before a new background task is scheduled.
|
|
"""
|
|
if not self._cloud:
|
|
return
|
|
|
|
try:
|
|
loop = asyncio.get_running_loop()
|
|
except RuntimeError:
|
|
_LOGGER.debug("No running loop; cannot schedule track refresh")
|
|
return
|
|
|
|
if self._track_task and not self._track_task.done():
|
|
self._track_task.cancel()
|
|
|
|
self._track_task = loop.create_task(self._async_refresh_current_track())
|
|
|
|
async def _async_refresh_current_track(self) -> None:
|
|
"""
|
|
Refresh cached information for the current track by fetching details from the attached cloud client and notify listeners when updated.
|
|
|
|
If no cloud client is attached, no current track exists, or the cached track already matches the current track id, the method returns without change. On successful fetch, updates the device's track cache and invokes registered update listeners.
|
|
"""
|
|
if not self._cloud:
|
|
return
|
|
|
|
if (track_id := self.track_id) is None:
|
|
self._track = None
|
|
return
|
|
|
|
if self._track and self._track.get("id") == track_id:
|
|
return
|
|
|
|
try:
|
|
track = await self._cloud.async_get_track_info(track_id)
|
|
except Exception:
|
|
_LOGGER.exception("Error fetching track info for %s", track_id)
|
|
return
|
|
|
|
if not track:
|
|
return
|
|
|
|
self._track = track
|
|
self._notify_listeners()
|