1
0
mirror of https://github.com/natekspencer/hacs-oasis_mini.git synced 2025-11-08 05:03:52 -05:00
Files
hacs-oasis_mini/custom_components/oasis_mini/pyoasismini/__init__.py
2025-08-02 14:21:34 +00:00

536 lines
19 KiB
Python

"""Oasis Mini API client."""
from __future__ import annotations
import asyncio
from datetime import datetime, timedelta
import logging
from typing import Any, Awaitable, Final
from urllib.parse import urljoin
from aiohttp import ClientResponseError, ClientSession
from .const import TRACKS
from .utils import _bit_to_bool, _parse_int, decrypt_svg_content, now
_LOGGER = logging.getLogger(__name__)
STATUS_CODE_MAP = {
0: "booting",
2: "stopped",
3: "centering",
4: "playing",
5: "paused",
6: "sleeping",
9: "error",
11: "updating",
13: "downloading",
14: "busy",
15: "live",
}
ERROR_CODE_MAP = {
0: "None",
1: "Error has occurred while reading the flash memory",
2: "Error while starting the Wifi",
3: "Error when starting DNS settings for your machine",
4: "Failed to open the file to write",
5: "Not enough memory to perform the upgrade",
6: "Error while trying to upgrade your system",
7: "Error while trying to download the new version of the software",
8: "Error while reading the upgrading file",
9: "Failed to start downloading the upgrade file",
10: "Error while starting downloading the job file",
11: "Error while opening the file folder",
12: "Failed to delete a file",
13: "Error while opening the job file",
14: "You have wrong power adapter",
15: "Failed to update the device IP on Oasis Server",
16: "Your device failed centering itself",
17: "There appears to be an issue with your Oasis Device",
18: "Error while downloading the job file",
}
AUTOPLAY_MAP = {
"0": "on",
"1": "off",
"2": "5 minutes",
"3": "10 minutes",
"4": "30 minutes",
"5": "24 hours",
}
LED_EFFECTS: Final[dict[str, str]] = {
"0": "Solid",
"1": "Rainbow",
"2": "Glitter",
"3": "Confetti",
"4": "Sinelon",
"5": "BPM",
"6": "Juggle",
"7": "Theater",
"8": "Color Wipe",
"9": "Sparkle",
"10": "Comet",
"11": "Follow Ball",
"12": "Follow Rainbow",
"13": "Chasing Comet",
"14": "Gradient Follow",
"15": "Cumulative Fill",
"16": "Multi Comets A",
"17": "Rainbow Chaser",
"18": "Twinkle Lights",
"19": "Tennis Game",
"20": "Breathing Exercise 4-7-8",
"21": "Cylon Scanner",
"22": "Palette Mode",
"23": "Aurora Flow",
"24": "Colorful Drops",
"25": "Color Snake",
"26": "Flickering Candles",
"27": "Digital Rain",
"28": "Center Explosion",
"29": "Rainbow Plasma",
"30": "Comet Race",
"31": "Color Waves",
"32": "Meteor Storm",
"33": "Firefly Flicker",
"34": "Ripple",
"35": "Jelly Bean",
"36": "Forest Rain",
"37": "Multi Comets",
"38": "Multi Comets with Background",
"39": "Rainbow Fill",
"40": "White Red Comet",
"41": "Color Comets",
}
CLOUD_BASE_URL = "https://app.grounded.so"
BALL_SPEED_MAX: Final = 400
BALL_SPEED_MIN: Final = 100
LED_SPEED_MAX: Final = 90
LED_SPEED_MIN: Final = -90
PLAYLISTS_REFRESH_LIMITER = timedelta(minutes=5)
class OasisMini:
"""Oasis Mini API client class."""
_access_token: str | None = None
_mac_address: str | None = None
_ip_address: str | None = None
_playlist: dict[int, dict[str, str]] = {}
_serial_number: str | None = None
_software_version: str | None = None
_track: dict | None = None
autoplay: str
brightness: int
busy: bool
color: str | None = None
download_progress: int
error: int
led_effect: str
led_speed: int
max_brightness: int
playlist: list[int]
playlist_index: int
progress: int
repeat_playlist: bool
status_code: int
playlists: list[dict[str, Any]] = []
_playlists_next_refresh: datetime = now()
def __init__(
self,
host: str,
access_token: str | None = None,
session: ClientSession | None = None,
) -> None:
"""Initialize the client."""
self._host = host
self._access_token = access_token
self._session = session if session else ClientSession()
@property
def access_token(self) -> str | None:
"""Return the access token, if any."""
return self._access_token
@property
def mac_address(self) -> str | None:
"""Return the mac address."""
return self._mac_address
@property
def drawing_progress(self) -> float | None:
"""Return the drawing progress percent."""
if not (self.track and (svg_content := self.track.get("svg_content"))):
return None
svg_content = decrypt_svg_content(svg_content)
paths = svg_content.split("L")
total = self.track.get("reduced_svg_content_new", 0) or len(paths)
percent = (100 * self.progress) / total
return percent
@property
def error_message(self) -> str | None:
"""Return the error message, if any."""
if self.status_code == 9:
return ERROR_CODE_MAP.get(self.error, f"Unknown ({self.error})")
return None
@property
def serial_number(self) -> str | None:
"""Return the serial number."""
return self._serial_number
@property
def session(self) -> ClientSession:
"""Return the session."""
return self._session
@property
def software_version(self) -> str | None:
"""Return the software version."""
return self._software_version
@property
def status(self) -> str:
"""Return the status."""
return STATUS_CODE_MAP.get(self.status_code, f"Unknown ({self.status_code})")
@property
def track(self) -> dict | None:
"""Return the current track info."""
if self._track and self._track.get("id") == self.track_id:
return self._track
return None
@property
def track_id(self) -> int | None:
"""Return the current track id."""
if not self.playlist:
return None
i = self.playlist_index
return self.playlist[0] if i >= len(self.playlist) else self.playlist[i]
@property
def url(self) -> str:
"""Return the url."""
return f"http://{self._host}/"
async def async_add_track_to_playlist(self, track: int | list[int]) -> None:
"""Add track to playlist."""
if not track:
return
if isinstance(track, int):
track = [track]
if 0 in self.playlist:
playlist = [t for t in self.playlist if t] + track
return await self.async_set_playlist(playlist)
await self._async_command(params={"ADDJOBLIST": track})
self.playlist.extend(track)
async def async_change_track(self, index: int) -> None:
"""Change the track."""
if index >= len(self.playlist):
raise ValueError("Invalid index specified")
await self._async_command(params={"CMDCHANGETRACK": index})
async def async_clear_playlist(self) -> None:
"""Clear the playlist."""
await self.async_set_playlist([])
async def async_get_ip_address(self) -> str | None:
"""Get the ip address."""
self._ip_address = await self._async_get(params={"GETIP": ""})
_LOGGER.debug("IP address: %s", self._ip_address)
return self._ip_address
async def async_get_mac_address(self) -> str | None:
"""Get the mac address."""
self._mac_address = await self._async_get(params={"GETMAC": ""})
_LOGGER.debug("MAC address: %s", self._mac_address)
return self._mac_address
async def async_get_serial_number(self) -> str | None:
"""Get the serial number."""
self._serial_number = await self._async_get(params={"GETOASISID": ""})
_LOGGER.debug("Serial number: %s", self._serial_number)
return self._serial_number
async def async_get_software_version(self) -> str | None:
"""Get the software version."""
self._software_version = await self._async_get(params={"GETSOFTWAREVER": ""})
_LOGGER.debug("Software version: %s", self._software_version)
return self._software_version
async def async_get_status(self) -> str:
"""Get the status from the device."""
raw_status = await self._async_get(params={"GETSTATUS": ""})
_LOGGER.debug("Status: %s", raw_status)
values = raw_status.split(";")
playlist = [_parse_int(track) for track in values[3].split(",") if track]
shift = len(values) - 18 if len(values) > 17 else 0
status = {
"status_code": _parse_int(values[0]), # see status code map
"error": _parse_int(values[1]),
"ball_speed": _parse_int(values[2]), # 200 - 1000
"playlist": playlist,
"playlist_index": min(_parse_int(values[4]), len(playlist)), # noqa: E501; index of above
"progress": _parse_int(values[5]), # 0 - max svg path
"led_effect": values[6], # led effect (code lookup)
"led_color_id": values[7], # led color id?
"led_speed": _parse_int(values[8]), # -90 - 90
"brightness": _parse_int(values[9]), # noqa: E501; 0 - 200 in app, but seems to be 0 (off) to 304 (max), then repeats
"color": values[10] if "#" in values[10] else None, # hex color code
"busy": _bit_to_bool(values[11 + shift]), # noqa: E501; device is busy (downloading track, centering, software update)?
"download_progress": _parse_int(values[12 + shift]),
"max_brightness": _parse_int(values[13 + shift]),
"wifi_connected": _bit_to_bool(values[14 + shift]),
"repeat_playlist": _bit_to_bool(values[15 + shift]),
"autoplay": AUTOPLAY_MAP.get(value := values[16 + shift], value),
"autoclean": _bit_to_bool(values[17 + shift])
if len(values) > 17
else False,
}
for key, value in status.items():
if (old_value := getattr(self, key, None)) != value:
_LOGGER.debug(
"%s changed: '%s' -> '%s'",
key.replace("_", " ").capitalize(),
old_value,
value,
)
setattr(self, key, value)
return raw_status
async def async_move_track(self, _from: int, _to: int) -> None:
"""Move a track in the playlist."""
await self._async_command(params={"MOVEJOB": f"{_from};{_to}"})
async def async_pause(self) -> None:
"""Send pause command."""
await self._async_command(params={"CMDPAUSE": ""})
async def async_play(self) -> None:
"""Send play command."""
if self.status_code == 15:
await self.async_stop()
if self.track_id:
await self._async_command(params={"CMDPLAY": ""})
async def async_reboot(self) -> None:
"""Send reboot command."""
async def _no_response_needed(coro: Awaitable) -> None:
try:
await coro
except Exception as ex:
_LOGGER.error(ex)
reboot = self._async_command(params={"CMDBOOT": ""})
asyncio.create_task(_no_response_needed(reboot))
async def async_set_ball_speed(self, speed: int) -> None:
"""Set the Oasis Mini ball speed."""
if not BALL_SPEED_MIN <= speed <= BALL_SPEED_MAX:
raise ValueError("Invalid speed specified")
await self._async_command(params={"WRIOASISSPEED": speed})
async def async_set_led(
self,
*,
led_effect: str | None = None,
color: str | None = None,
led_speed: int | None = None,
brightness: int | None = None,
) -> None:
"""Set the Oasis Mini led."""
if led_effect is None:
led_effect = self.led_effect
if color is None:
color = self.color or "#ffffff"
if led_speed is None:
led_speed = self.led_speed
if brightness is None:
brightness = self.brightness
if led_effect not in LED_EFFECTS:
raise ValueError("Invalid led effect specified")
if not LED_SPEED_MIN <= led_speed <= LED_SPEED_MAX:
raise ValueError("Invalid led speed specified")
if not 0 <= brightness <= self.max_brightness:
raise ValueError("Invalid brightness specified")
await self._async_command(
params={"WRILED": f"{led_effect};0;{color};{led_speed};{brightness}"}
)
async def async_set_autoplay(self, option: bool | int | str) -> None:
"""Set autoplay."""
if isinstance(option, bool):
option = 0 if option else 1
if str(option) not in AUTOPLAY_MAP:
raise ValueError("Invalid pause option specified")
await self._async_command(params={"WRIWAITAFTER": option})
async def async_set_playlist(self, playlist: list[int] | int) -> None:
"""Set the playlist."""
if isinstance(playlist, int):
playlist = [playlist]
if is_playing := (self.status_code == 4):
await self.async_stop()
await self._async_command(params={"WRIJOBLIST": ",".join(map(str, playlist))})
self.playlist = playlist
if is_playing:
await self.async_play()
async def async_set_repeat_playlist(self, repeat: bool) -> None:
"""Set repeat playlist."""
await self._async_command(params={"WRIREPEATJOB": 1 if repeat else 0})
async def async_sleep(self) -> None:
"""Send sleep command."""
await self._async_command(params={"CMDSLEEP": ""})
async def async_stop(self) -> None:
"""Send stop command."""
await self._async_command(params={"CMDSTOP": ""})
async def async_upgrade(self, beta: bool = False) -> None:
"""Trigger a software upgrade."""
await self._async_command(params={"CMDUPGRADE": 1 if beta else 0})
async def async_cloud_login(self, email: str, password: str) -> None:
"""Login via the cloud."""
response = await self._async_request(
"POST",
urljoin(CLOUD_BASE_URL, "api/auth/login"),
json={"email": email, "password": password},
)
self._access_token = response.get("access_token")
async def async_cloud_logout(self) -> None:
"""Login via the cloud."""
await self._async_cloud_request("GET", "api/auth/logout")
async def async_cloud_get_playlists(
self, personal_only: bool = False
) -> list[dict[str, Any]]:
"""Get playlists from the cloud."""
if self._playlists_next_refresh <= now():
if playlists := await self._async_cloud_request(
"GET", "api/playlist", params={"my_playlists": str(personal_only)}
):
self.playlists = playlists
self._playlists_next_refresh = now() + PLAYLISTS_REFRESH_LIMITER
return self.playlists
async def async_cloud_get_track_info(self, track_id: int) -> dict[str, Any] | None:
"""Get cloud track info."""
try:
return await self._async_cloud_request("GET", f"api/track/{track_id}")
except ClientResponseError as err:
if err.status == 404:
return {"id": track_id, "name": f"Unknown Title (#{track_id})"}
except Exception as ex:
_LOGGER.exception(ex)
return None
async def async_cloud_get_tracks(
self, tracks: list[int] | None = None
) -> list[dict[str, Any]]:
"""Get tracks info from the cloud"""
response = await self._async_cloud_request(
"GET", "api/track", params={"ids[]": tracks or []}
)
if not response:
return []
track_details = response.get("data", [])
while next_page_url := response.get("next_page_url"):
response = await self._async_cloud_request("GET", next_page_url)
track_details += response.get("data", [])
return track_details
async def async_cloud_get_latest_software_details(self) -> dict[str, int | str]:
"""Get the latest software details from the cloud."""
return await self._async_cloud_request("GET", "api/software/last-version")
async def async_get_current_track_details(self) -> dict | None:
"""Get current track info, refreshing if needed."""
track_id = self.track_id
if (track := self._track) and track.get("id") == track_id:
return track
if track_id:
self._track = await self.async_cloud_get_track_info(track_id)
if not self._track:
self._track = TRACKS.get(
track_id, {"id": track_id, "name": f"Unknown Title (#{track_id})"}
)
return self._track
async def async_get_playlist_details(self) -> dict[int, dict[str, str]]:
"""Get playlist info."""
if set(self.playlist).difference(self._playlist.keys()):
tracks = await self.async_cloud_get_tracks(self.playlist)
all_tracks = TRACKS | {
track["id"]: {
"name": track["name"],
"author": ((track.get("author") or {}).get("person") or {}).get(
"name", "Oasis Mini"
),
"image": track["image"],
}
for track in tracks
}
for track in self.playlist:
self._playlist[track] = all_tracks.get(
track, {"name": f"Unknown Title (#{track})"}
)
return self._playlist
async def _async_cloud_request(self, method: str, url: str, **kwargs: Any) -> Any:
"""Perform a cloud request."""
if not self.access_token:
return
return await self._async_request(
method,
urljoin(CLOUD_BASE_URL, url),
headers={"Authorization": f"Bearer {self.access_token}"},
**kwargs,
)
async def _async_command(self, **kwargs: Any) -> str | None:
"""Send a command to the device."""
result = await self._async_get(**kwargs)
_LOGGER.debug("Result: %s", result)
async def _async_get(self, **kwargs: Any) -> str | None:
"""Perform a GET request."""
return await self._async_request("GET", self.url, **kwargs)
async def _async_request(self, method: str, url: str, **kwargs) -> Any:
"""Perform a request."""
_LOGGER.debug(
"%s %s",
method,
self._session._build_url(url).update_query( # pylint: disable=protected-access
kwargs.get("params")
),
)
response = await self._session.request(method, url, **kwargs)
if response.status == 200:
if response.content_type == "application/json":
return await response.json()
if response.content_type == "text/plain":
return await response.text()
return None
response.raise_for_status()