1
0
mirror of https://github.com/natekspencer/hacs-oasis_mini.git synced 2025-11-13 15:43:52 -05:00

4 Commits
0.3.1 ... 0.5.0

Author SHA1 Message Date
Nathan Spencer
71180f68f9 Merge pull request #9 from natekspencer/dev
Updates to handle firmware version 0.71 and other improvements
2024-07-18 13:05:28 -06:00
Nathan Spencer
0d539888e5 Updates to handle firmware version 0.71 and other improvements 2024-07-18 13:03:19 -06:00
Nathan Spencer
4186755a92 Merge pull request #8 from natekspencer/dev
Add update entity
2024-07-17 09:47:02 -06:00
Nathan Spencer
7c8ca361ba Add update entity 2024-07-17 09:44:32 -06:00
13 changed files with 384 additions and 179 deletions

View File

@@ -23,7 +23,8 @@ PLATFORMS = [
Platform.NUMBER,
Platform.SELECT,
Platform.SENSOR,
Platform.SWITCH,
# Platform.SWITCH,
Platform.UPDATE,
]

View File

@@ -12,6 +12,7 @@ from homeassistant.components.button import (
ButtonEntityDescription,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
@@ -42,9 +43,12 @@ async def play_random_track(device: OasisMini) -> None:
await device.async_add_track_to_playlist(track)
# Move track to next item in the playlist and then select it
if (idx := device.playlist.index(track)) != (next_idx := device.playlist_index + 1):
await device.async_move_track(idx, next_idx)
await device.async_change_track(next_idx)
if (index := device.playlist.index(track)) != device.playlist_index:
if index != (next_index := device.playlist_index + 1):
await device.async_move_track(index, next_index)
await device.async_change_track(next_index)
if device.status_code != 4:
await device.async_play()
@@ -59,6 +63,7 @@ DESCRIPTORS = (
OasisMiniButtonEntityDescription(
key="reboot",
device_class=ButtonDeviceClass.RESTART,
entity_category=EntityCategory.CONFIG,
press_fn=lambda device: device.async_reboot(),
),
OasisMiniButtonEntityDescription(

View File

@@ -32,6 +32,8 @@ class OasisMiniCoordinator(DataUpdateCoordinator[str]):
async def _async_update_data(self):
try:
async with async_timeout.timeout(10):
if not self.device.mac_address:
await self.device.async_get_mac_address()
if not self.device.serial_number:
await self.device.async_get_serial_number()
if not self.device.software_version:

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
import logging
from homeassistant.config_entries import ConfigEntry
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC, format_mac
from homeassistant.helpers.entity import DeviceInfo, EntityDescription
from homeassistant.helpers.update_coordinator import CoordinatorEntity
@@ -29,16 +30,18 @@ class OasisMiniEntity(CoordinatorEntity[OasisMiniCoordinator]):
"""Construct an Oasis Mini entity."""
super().__init__(coordinator)
self.entity_description = description
serial_number = coordinator.device.serial_number
device = coordinator.device
serial_number = device.serial_number
self._attr_unique_id = f"{serial_number}-{description.key}"
self._attr_device_info = DeviceInfo(
connections={(CONNECTION_NETWORK_MAC, format_mac(device.mac_address))},
identifiers={(DOMAIN, serial_number)},
name=entry.title,
manufacturer="Kinetic Oasis",
model="Oasis Mini",
serial_number=serial_number,
sw_version=coordinator.device.software_version,
sw_version=device.software_version,
)
@property

View File

@@ -39,11 +39,7 @@ class OasisMiniImageEntity(OasisMiniEntity, ImageEntity):
def image(self) -> bytes | None:
"""Return bytes of image."""
return draw_svg(
self.device._current_track_details,
self.device.progress,
"1",
)
return draw_svg(self.device.track, self.device.progress, "1")
async def async_setup_entry(

View File

@@ -22,17 +22,18 @@ from .coordinator import OasisMiniCoordinator
from .entity import OasisMiniEntity
from .pyoasismini.const import TRACKS
BRIGHTNESS_SCALE = (1, 200)
class OasisMiniMediaPlayerEntity(OasisMiniEntity, MediaPlayerEntity):
"""Oasis Mini media player entity."""
_attr_media_image_remotely_accessible = True
_attr_supported_features = (
MediaPlayerEntityFeature.NEXT_TRACK
| MediaPlayerEntityFeature.PAUSE
MediaPlayerEntityFeature.PAUSE
| MediaPlayerEntityFeature.PLAY
| MediaPlayerEntityFeature.STOP
| MediaPlayerEntityFeature.PREVIOUS_TRACK
| MediaPlayerEntityFeature.NEXT_TRACK
| MediaPlayerEntityFeature.CLEAR_PLAYLIST
| MediaPlayerEntityFeature.REPEAT_SET
)
@@ -44,17 +45,15 @@ class OasisMiniMediaPlayerEntity(OasisMiniEntity, MediaPlayerEntity):
@property
def media_duration(self) -> int:
"""Duration of current playing media in seconds."""
if (
track := self.device._current_track_details
) and "reduced_svg_content" in track:
if (track := self.device.track) and "reduced_svg_content" in track:
return track["reduced_svg_content"].get("1")
return math.ceil(self.media_position / 0.99)
@property
def media_image_url(self) -> str | None:
"""Image url of current playing media."""
if not (track := self.device._current_track_details):
track = TRACKS.get(str(self.device.current_track_id))
if not (track := self.device.track):
track = TRACKS.get(str(self.device.track_id))
if track and "image" in track:
return f"https://app.grounded.so/uploads/{track['image']}"
return None
@@ -72,28 +71,32 @@ class OasisMiniMediaPlayerEntity(OasisMiniEntity, MediaPlayerEntity):
@property
def media_title(self) -> str:
"""Title of current playing media."""
if not (track := self.device._current_track_details):
track = TRACKS.get(str(self.device.current_track_id), {})
return track.get("name", f"Unknown Title (#{self.device.current_track_id})")
if not (track := self.device.track):
track = TRACKS.get(str(self.device.track_id), {})
return track.get("name", f"Unknown Title (#{self.device.track_id})")
@property
def repeat(self) -> RepeatMode:
"""Return current repeat mode."""
if self.device.repeat_playlist:
return RepeatMode.ALL
return RepeatMode.OFF
return RepeatMode.ALL if self.device.repeat_playlist else RepeatMode.OFF
@property
def state(self) -> MediaPlayerState:
"""State of the player."""
status_code = self.device.status_code
if status_code in (3, 13):
if self.device.error or status_code == 9:
return MediaPlayerState.OFF
if status_code == 2:
return MediaPlayerState.IDLE
if status_code in (3, 11, 13):
return MediaPlayerState.BUFFERING
if status_code in (2, 5):
return MediaPlayerState.PAUSED
if status_code == 4:
return MediaPlayerState.PLAYING
return MediaPlayerState.STANDBY
if status_code == 5:
return MediaPlayerState.PAUSED
if status_code == 15:
return MediaPlayerState.ON
return MediaPlayerState.IDLE
async def async_media_pause(self) -> None:
"""Send pause command."""
@@ -105,6 +108,11 @@ class OasisMiniMediaPlayerEntity(OasisMiniEntity, MediaPlayerEntity):
await self.device.async_play()
await self.coordinator.async_request_refresh()
async def async_media_stop(self) -> None:
"""Send stop command."""
await self.device.async_stop()
await self.coordinator.async_request_refresh()
async def async_set_repeat(self, repeat: RepeatMode) -> None:
"""Set repeat mode."""
await self.device.async_set_repeat_playlist(
@@ -113,6 +121,13 @@ class OasisMiniMediaPlayerEntity(OasisMiniEntity, MediaPlayerEntity):
)
await self.coordinator.async_request_refresh()
async def async_media_previous_track(self) -> None:
"""Send previous track command."""
if (index := self.device.playlist_index - 1) < 0:
index = len(self.device.playlist) - 1
await self.device.async_change_track(index)
await self.coordinator.async_request_refresh()
async def async_media_next_track(self) -> None:
"""Send next track command."""
if (index := self.device.playlist_index + 1) >= len(self.device.playlist):
@@ -120,6 +135,11 @@ class OasisMiniMediaPlayerEntity(OasisMiniEntity, MediaPlayerEntity):
await self.device.async_change_track(index)
await self.coordinator.async_request_refresh()
async def async_clear_playlist(self) -> None:
"""Clear players playlist."""
await self.device.async_set_playlist([0])
await self.coordinator.async_request_refresh()
DESCRIPTOR = MediaPlayerEntityDescription(key="oasis_mini", name=None)

View File

@@ -10,6 +10,7 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .const import DOMAIN
from .coordinator import OasisMiniCoordinator
from .entity import OasisMiniEntity
from .pyoasismini import BALL_SPEED_MAX, BALL_SPEED_MIN, LED_SPEED_MAX, LED_SPEED_MIN
class OasisMiniNumberEntity(OasisMiniEntity, NumberEntity):
@@ -33,14 +34,14 @@ DESCRIPTORS = {
NumberEntityDescription(
key="ball_speed",
name="Ball speed",
native_max_value=800,
native_min_value=200,
native_max_value=BALL_SPEED_MAX,
native_min_value=BALL_SPEED_MIN,
),
NumberEntityDescription(
key="led_speed",
name="LED speed",
native_max_value=90,
native_min_value=-90,
native_max_value=LED_SPEED_MAX,
native_min_value=LED_SPEED_MIN,
),
}

View File

@@ -12,18 +12,29 @@ from .utils import _bit_to_bool
_LOGGER = logging.getLogger(__name__)
STATUS_CODE_MAP = {
0: "booting", # maybe?
2: "stopped",
3: "centering",
4: "running",
5: "paused",
9: "error",
11: "updating",
13: "downloading",
15: "live drawing",
}
AUTOPLAY_MAP = {
"0": "on",
"1": "off",
"2": "5 minutes",
"3": "10 minutes",
"4": "30 minutes",
}
ATTRIBUTES: Final[list[tuple[str, Callable[[str], Any]]]] = [
("status_code", int), # see status code map
("error", str), # error, 0 = none, and 10 = ?, 18 = can't download?
("ball_speed", int), # 200 - 800
("error", int), # error, 0 = none, and 10 = ?, 18 = can't download?
("ball_speed", int), # 200 - 1000
("playlist", lambda value: [int(track) for track in value.split(",")]), # noqa: E501 # comma separated track ids
("playlist_index", int), # index of above
("progress", int), # 0 - max svg path
@@ -37,7 +48,7 @@ ATTRIBUTES: Final[list[tuple[str, Callable[[str], Any]]]] = [
("max_brightness", int),
("wifi_connected", _bit_to_bool),
("repeat_playlist", _bit_to_bool),
("pause_between_tracks", _bit_to_bool),
("autoplay", AUTOPLAY_MAP.get),
]
LED_EFFECTS: Final[dict[str, str]] = {
@@ -59,25 +70,35 @@ LED_EFFECTS: Final[dict[str, str]] = {
}
CLOUD_BASE_URL = "https://app.grounded.so"
CLOUD_API_URL = f"{CLOUD_BASE_URL}/api"
BALL_SPEED_MAX: Final = 1000
BALL_SPEED_MIN: Final = 200
LED_SPEED_MAX: Final = 90
LED_SPEED_MIN: Final = -90
class OasisMini:
"""Oasis Mini API client class."""
_access_token: str | None = None
_current_track_details: dict | None = None
_mac_address: str | None = None
_ip_address: str | None = None
_serial_number: str | None = None
_software_version: str | None = None
_track: dict | None = None
autoplay: str
brightness: int
color: str
download_progress: int
error: int
led_effect: str
led_speed: int
max_brightness: int
playlist: list[int]
playlist_index: int
progress: int
repeat_playlist: bool
status_code: int
def __init__(
@@ -97,10 +118,9 @@ class OasisMini:
return self._access_token
@property
def current_track_id(self) -> int:
"""Return the current track."""
i = self.playlist_index
return self.playlist[0] if i >= len(self.playlist) else self.playlist[i]
def mac_address(self) -> str | None:
"""Return the mac address."""
return self._mac_address
@property
def serial_number(self) -> str | None:
@@ -122,6 +142,19 @@ class OasisMini:
"""Return the status."""
return STATUS_CODE_MAP.get(self.status_code, f"Unknown ({self.status_code})")
@property
def track(self) -> dict | None:
"""Return the current track info."""
if self._track and self._track.get("id") == self.track_id:
return self._track
return None
@property
def track_id(self) -> int:
"""Return the current track id."""
i = self.playlist_index
return self.playlist[0] if i >= len(self.playlist) else self.playlist[i]
@property
def url(self) -> str:
"""Return the url."""
@@ -129,15 +162,31 @@ class OasisMini:
async def async_add_track_to_playlist(self, track: int) -> None:
"""Add track to playlist."""
if 0 in self.playlist:
playlist = [t for t in self.playlist if t] + [track]
await self.async_set_playlist(playlist)
else:
await self._async_command(params={"ADDJOBLIST": track})
self.playlist.append(track)
async def async_change_track(self, index: int) -> None:
"""Change the track."""
if index >= len(self.playlist):
raise ValueError("Invalid selection")
raise ValueError("Invalid index specified")
await self._async_command(params={"CMDCHANGETRACK": index})
async def async_get_ip_address(self) -> str | None:
"""Get the ip address."""
self._ip_address = await self._async_get(params={"GETIP": ""})
_LOGGER.debug("IP address: %s", self._ip_address)
return self._ip_address
async def async_get_mac_address(self) -> str | None:
"""Get the mac address."""
self._mac_address = await self._async_get(params={"GETMAC": ""})
_LOGGER.debug("MAC address: %s", self._mac_address)
return self._mac_address
async def async_get_serial_number(self) -> str | None:
"""Get the serial number."""
self._serial_number = await self._async_get(params={"GETOASISID": ""})
@@ -171,6 +220,8 @@ class OasisMini:
async def async_play(self) -> None:
"""Send play command."""
if self.status_code == 15:
await self.async_stop()
await self._async_command(params={"CMDPLAY": ""})
async def async_reboot(self) -> None:
@@ -187,8 +238,8 @@ class OasisMini:
async def async_set_ball_speed(self, speed: int) -> None:
"""Set the Oasis Mini ball speed."""
if not 200 <= speed <= 800:
raise Exception("Invalid speed specified")
if not BALL_SPEED_MIN <= speed <= BALL_SPEED_MAX:
raise ValueError("Invalid speed specified")
await self._async_command(params={"WRIOASISSPEED": speed})
@@ -211,36 +262,40 @@ class OasisMini:
brightness = self.brightness
if led_effect not in LED_EFFECTS:
raise Exception("Invalid led effect specified")
if not -90 <= led_speed <= 90:
raise Exception("Invalid led speed specified")
if not 0 <= brightness <= 200:
raise Exception("Invalid brightness specified")
raise ValueError("Invalid led effect specified")
if not LED_SPEED_MIN <= led_speed <= LED_SPEED_MAX:
raise ValueError("Invalid led speed specified")
if not 0 <= brightness <= self.max_brightness:
raise ValueError("Invalid brightness specified")
await self._async_command(
params={"WRILED": f"{led_effect};0;{color};{led_speed};{brightness}"}
)
async def async_set_pause_between_tracks(self, pause: bool) -> None:
"""Set the Oasis Mini pause between tracks."""
await self._async_command(params={"WRIWAITAFTER": 1 if pause else 0})
async def async_set_autoplay(self, option: bool | int | str) -> None:
"""Set autoplay."""
if isinstance(option, bool):
option = 0 if option else 1
if str(option) not in AUTOPLAY_MAP:
raise ValueError("Invalid pause option specified")
await self._async_command(params={"WRIWAITAFTER": option})
async def async_set_playlist(self, playlist: list[int]) -> None:
"""Set playlist."""
await self._async_command(params={"WRIJOBLIST": ",".join(map(str, playlist))})
self.playlist = playlist
async def async_set_repeat_playlist(self, repeat: bool) -> None:
"""Set the Oasis Mini repeat playlist."""
"""Set repeat playlist."""
await self._async_command(params={"WRIREPEATJOB": 1 if repeat else 0})
async def _async_command(self, **kwargs: Any) -> str | None:
"""Send a command request."""
result = await self._async_get(**kwargs)
_LOGGER.debug("Result: %s", result)
async def async_stop(self) -> None:
"""Send stop command."""
await self._async_command(params={"CMDSTOP": ""})
async def _async_get(self, **kwargs: Any) -> str | None:
"""Perform a GET request."""
response = await self._session.get(self.url, **kwargs)
if response.status == 200 and response.content_type == "text/plain":
text = await response.text()
return text
return None
async def async_upgrade(self, beta: bool = False) -> None:
"""Trigger a software upgrade."""
await self._async_command(params={"CMDUPGRADE": 1 if beta else 0})
async def async_cloud_login(self, email: str, password: str) -> None:
"""Login via the cloud."""
@@ -253,59 +308,62 @@ class OasisMini:
async def async_cloud_logout(self) -> None:
"""Login via the cloud."""
if not self.access_token:
return
await self._async_request(
"GET",
urljoin(CLOUD_BASE_URL, "api/auth/logout"),
headers={"Authorization": f"Bearer {self.access_token}"},
)
await self._async_cloud_request("GET", "api/auth/logout")
async def async_cloud_get_track_info(self, track_id: int) -> None:
async def async_cloud_get_track_info(self, track_id: int) -> dict[str, Any]:
"""Get cloud track info."""
if not self.access_token:
return
return await self._async_cloud_request("GET", f"api/track/{track_id}")
response = await self._async_request(
"GET",
urljoin(CLOUD_BASE_URL, f"api/track/{track_id}"),
headers={"Authorization": f"Bearer {self.access_token}"},
async def async_cloud_get_tracks(self, tracks: list[int]) -> dict:
"""Get tracks info from the cloud"""
return await self._async_cloud_request(
"GET", "api/track", params={"ids[]": tracks}
)
return response
async def async_cloud_get_tracks(self, tracks: list[int]) -> None:
"""Get cloud tracks."""
if not self.access_token:
return
response = await self._async_request(
"GET",
urljoin(CLOUD_BASE_URL, "api/track"),
headers={"Authorization": f"Bearer {self.access_token}"},
params={"ids[]": tracks},
)
return response
async def _async_request(self, method: str, url: str, **kwargs) -> Any:
"""Login via the cloud."""
response = await self._session.request(method, url, **kwargs)
if response.status == 200:
if response.headers.get("Content-Type") == "application/json":
return await response.json()
return await response.text()
response.raise_for_status()
async def async_cloud_get_latest_software_details(self) -> dict[str, int | str]:
"""Get the latest software details from the cloud."""
return await self._async_cloud_request("GET", "api/software/last-version")
async def async_get_current_track_details(self) -> dict:
"""Get current track info, refreshing if needed."""
if (track_details := self._current_track_details) and track_details.get(
"id"
) == self.current_track_id:
return track_details
self._current_track_details = await self.async_cloud_get_track_info(
self.current_track_id
)
if (track := self._track) and track.get("id") == self.track_id:
return track
if self.track_id:
self._track = await self.async_cloud_get_track_info(self.track_id)
return self._track
async def async_get_playlist_details(self) -> dict:
"""Get playlist info."""
return await self.async_cloud_get_tracks(self.playlist)
async def _async_cloud_request(self, method: str, url: str, **kwargs: Any) -> Any:
"""Perform a cloud request."""
if not self.access_token:
return
return await self._async_request(
method,
urljoin(CLOUD_BASE_URL, url),
headers={"Authorization": f"Bearer {self.access_token}"},
**kwargs,
)
async def _async_command(self, **kwargs: Any) -> str | None:
"""Send a command to the device."""
result = await self._async_get(**kwargs)
_LOGGER.debug("Result: %s", result)
async def _async_get(self, **kwargs: Any) -> str | None:
"""Perform a GET request."""
return await self._async_request("GET", self.url, **kwargs)
async def _async_request(self, method: str, url: str, **kwargs) -> Any:
"""Perform a request."""
response = await self._session.request(method, url, **kwargs)
if response.status == 200:
if response.content_type == "application/json":
return await response.json()
if response.content_type == "text/plain":
return await response.text()
return None
response.raise_for_status()

View File

@@ -823,5 +823,15 @@
"name": "Yorkshire",
"author": "Otávio Bittencourt",
"image": "2024/06/be59f584c87cfff3aa13e5887a69e183.svg"
},
"953": {
"name": "Grizzly bear",
"author": "Otávio Bittencourt",
"image": "2024/07/a3c63d580c4e4a95cdcc457fedf7dcce.svg"
},
"670": {
"name": "Horse",
"author": "Otávio Bittencourt",
"image": "2024/07/9fec8716ce98fdbf0c02db14b47b0d66.svg"
}
}

View File

@@ -2,7 +2,8 @@
from __future__ import annotations
from typing import Any
from dataclasses import dataclass
from typing import Any, Awaitable, Callable
from homeassistant.components.select import SelectEntity, SelectEntityDescription
from homeassistant.config_entries import ConfigEntry
@@ -13,12 +14,23 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .const import DOMAIN
from .coordinator import OasisMiniCoordinator
from .entity import OasisMiniEntity
from .pyoasismini import AUTOPLAY_MAP, OasisMini
from .pyoasismini.const import TRACKS
@dataclass(frozen=True, kw_only=True)
class OasisMiniSelectEntityDescription(SelectEntityDescription):
"""Oasis Mini select entity description."""
select_fn: Callable[[OasisMini, int], Awaitable[None]]
update_handler: Callable[[OasisMiniSelectEntity], None] | None = None
class OasisMiniSelectEntity(OasisMiniEntity, SelectEntity):
"""Oasis Mini select entity."""
entity_description: OasisMiniSelectEntityDescription
def __init__(
self,
coordinator: OasisMiniCoordinator,
@@ -31,22 +43,47 @@ class OasisMiniSelectEntity(OasisMiniEntity, SelectEntity):
async def async_select_option(self, option: str) -> None:
"""Change the selected option."""
await self.device.async_change_track(self.options.index(option))
await self.entity_description.select_fn(self.device, self.options.index(option))
await self.coordinator.async_request_refresh()
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
options = [
TRACKS.get(str(track), {}).get("name", str(track))
for track in self.device.playlist
]
self._attr_options = options
self._attr_current_option = options[self.device.playlist_index]
if update_handler := self.entity_description.update_handler:
update_handler(self)
else:
self._attr_current_option = getattr(
self.device, self.entity_description.key
)
if self.hass:
return super()._handle_coordinator_update()
DESCRIPTOR = SelectEntityDescription(key="playlist", name="Playlist")
def playlist_update_handler(entity: OasisMiniSelectEntity) -> None:
"""Handle playlist updates."""
# pylint: disable=protected-access
options = [
TRACKS.get(str(track), {}).get("name", str(track))
for track in entity.device.playlist
]
entity._attr_options = options
index = min(entity.device.playlist_index, len(options) - 1)
entity._attr_current_option = options[index]
DESCRIPTORS = (
OasisMiniSelectEntityDescription(
key="playlist",
name="Playlist",
select_fn=lambda device, option: device.async_change_track(option),
update_handler=playlist_update_handler,
),
OasisMiniSelectEntityDescription(
key="autoplay",
name="Autoplay",
options=list(AUTOPLAY_MAP.values()),
select_fn=lambda device, option: device.async_set_autoplay(option),
),
)
async def async_setup_entry(
@@ -54,4 +91,9 @@ async def async_setup_entry(
) -> None:
"""Set up Oasis Mini select using config entry."""
coordinator: OasisMiniCoordinator = hass.data[DOMAIN][entry.entry_id]
async_add_entities([OasisMiniSelectEntity(coordinator, entry, DESCRIPTOR)])
async_add_entities(
[
OasisMiniSelectEntity(coordinator, entry, descriptor)
for descriptor in DESCRIPTORS
]
)

View File

@@ -8,7 +8,7 @@ from homeassistant.components.sensor import (
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EntityCategory
from homeassistant.const import PERCENTAGE, EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
@@ -36,7 +36,8 @@ DESCRIPTORS = {
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
name="Download progress",
state_class=SensorStateClass.TOTAL_INCREASING,
native_unit_of_measurement=PERCENTAGE,
state_class=SensorStateClass.MEASUREMENT,
),
} | {
SensorEntityDescription(

View File

@@ -1,64 +1,54 @@
"""Oasis Mini switch entity."""
# """Oasis Mini switch entity."""
from __future__ import annotations
# from __future__ import annotations
from typing import Any
# from typing import Any
from homeassistant.components.switch import SwitchEntity, SwitchEntityDescription
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
# from homeassistant.components.switch import SwitchEntity, SwitchEntityDescription
# from homeassistant.config_entries import ConfigEntry
# from homeassistant.core import HomeAssistant
# from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .const import DOMAIN
from .coordinator import OasisMiniCoordinator
from .entity import OasisMiniEntity
# from .const import DOMAIN
# from .coordinator import OasisMiniCoordinator
# from .entity import OasisMiniEntity
class OasisMiniSwitchEntity(OasisMiniEntity, SwitchEntity):
"""Oasis Mini switch entity."""
@property
def is_on(self) -> bool:
"""Return True if entity is on."""
return int(getattr(self.device, self.entity_description.key))
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the entity off."""
if self.entity_description.key == "pause_between_tracks":
await self.device.async_set_pause_between_tracks(False)
elif self.entity_description.key == "repeat_playlist":
await self.device.async_set_repeat_playlist(False)
await self.coordinator.async_request_refresh()
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the entity on."""
if self.entity_description.key == "pause_between_tracks":
await self.device.async_set_pause_between_tracks(True)
elif self.entity_description.key == "repeat_playlist":
await self.device.async_set_repeat_playlist(True)
await self.coordinator.async_request_refresh()
# async def async_setup_entry(
# hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
# ) -> None:
# """Set up Oasis Mini switchs using config entry."""
# coordinator: OasisMiniCoordinator = hass.data[DOMAIN][entry.entry_id]
# async_add_entities(
# [
# OasisMiniSwitchEntity(coordinator, entry, descriptor)
# for descriptor in DESCRIPTORS
# ]
# )
DESCRIPTORS = {
SwitchEntityDescription(
key="pause_between_tracks",
name="Pause between tracks",
),
# SwitchEntityDescription(
# key="repeat_playlist",
# name="Repeat playlist",
# ),
}
# class OasisMiniSwitchEntity(OasisMiniEntity, SwitchEntity):
# """Oasis Mini switch entity."""
# @property
# def is_on(self) -> bool:
# """Return True if entity is on."""
# return int(getattr(self.device, self.entity_description.key))
# async def async_turn_off(self, **kwargs: Any) -> None:
# """Turn the entity off."""
# await self.device.async_set_repeat_playlist(False)
# await self.coordinator.async_request_refresh()
# async def async_turn_on(self, **kwargs: Any) -> None:
# """Turn the entity on."""
# await self.device.async_set_repeat_playlist(True)
# await self.coordinator.async_request_refresh()
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up Oasis Mini switchs using config entry."""
coordinator: OasisMiniCoordinator = hass.data[DOMAIN][entry.entry_id]
async_add_entities(
[
OasisMiniSwitchEntity(coordinator, entry, descriptor)
for descriptor in DESCRIPTORS
]
)
# DESCRIPTORS = {
# SwitchEntityDescription(
# key="repeat_playlist",
# name="Repeat playlist",
# ),
# }

View File

@@ -0,0 +1,76 @@
"""Oasis Mini update entity."""
from __future__ import annotations
from datetime import timedelta
from typing import Any
from homeassistant.components.update import (
UpdateDeviceClass,
UpdateEntity,
UpdateEntityDescription,
UpdateEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .const import DOMAIN
from .coordinator import OasisMiniCoordinator
from .entity import OasisMiniEntity
SCAN_INTERVAL = timedelta(hours=6)
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up Oasis Mini updates using config entry."""
coordinator: OasisMiniCoordinator = hass.data[DOMAIN][entry.entry_id]
if coordinator.device.access_token:
async_add_entities(
[OasisMiniUpdateEntity(coordinator, entry, DESCRIPTOR)], True
)
DESCRIPTOR = UpdateEntityDescription(
key="software", device_class=UpdateDeviceClass.FIRMWARE
)
class OasisMiniUpdateEntity(OasisMiniEntity, UpdateEntity):
"""Oasis Mini update entity."""
_attr_supported_features = (
UpdateEntityFeature.INSTALL | UpdateEntityFeature.PROGRESS
)
@property
def in_progress(self) -> bool | int:
"""Update installation progress."""
if self.device.status_code == 11:
return self.device.download_progress
return False
@property
def installed_version(self) -> str:
"""Version installed and in use."""
return self.device.software_version
@property
def should_poll(self) -> bool:
"""Set polling to True."""
return True
async def async_install(
self, version: str | None, backup: bool, **kwargs: Any
) -> None:
"""Install an update."""
await self.device.async_upgrade()
async def async_update(self) -> None:
"""Update the entity."""
software = await self.device.async_cloud_get_latest_software_details()
self._attr_latest_version = software["version"]
self._attr_release_summary = software["description"]
self._attr_release_url = f"https://app.grounded.so/software/{software['id']}"