Source code for aquaipy.aquaipy

#
#   Copyright 2018 Stephen Mc Gowan <mcclown@gmail.com>
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

"""Module for working with the AquaIllumination range of lights."""

import asyncio
# pylint: disable=no-name-in-module,import-error
from distutils.version import StrictVersion
from enum import Enum
import json

import aiohttp

from aquaipy.error import ConnError, FirmwareError, MustBeParentError

MIN_SUPPORTED_AI_FIRMWARE_VERSION = "2.0.0"
MAX_SUPPORTED_AI_FIRMWARE_VERSION = "2.5.1"


[docs]class Response(Enum): """Response codes, for the AquaIPy methods.""" Success = 0 Error = 1 NoSuchColour = 2 InvalidBrightnessValue = 3 PowerLimitExceeded = 4 AllColorsMustBeSpecified = 5 InvalidData = 6
[docs]class HDDevice: """A class for handling the conversion of data for a device.""" def __init__(self, raw_data, primary_mac_address=None): """Initialise a class from the given input raw device data. :param raw_data: Raw device data, as returned by the AI API. :param primary_mac_address: The primary device MAC address. :type raw_data: json :type primary_mac_address: str """ self._primary_mac_address = primary_mac_address self._mac_address = raw_data['serial_number'] self._mw_norm = {} self._mw_hd = {} self._max_mw = 0 # Get the values for 100% for color, value in raw_data["normal"].items(): self._mw_norm[color] = value # Get the values for HD for color, value in raw_data["hd"].items(): self._mw_hd[color] = value self._max_mw = raw_data["max_power"] @property def is_primary(self): """Check is HDDevice object represents a parent light. :returns: true if parent, false if not :rtype: bool """ return self._primary_mac_address == self._mac_address @property def mac_address(self): """Get devices MAC address/serial number. :returns: MAC address of device :rtype: str """ return self._mac_address @property def max_mw(self): """Get the max mWatts supported power level for the device. :returns: max mWatts :rtype: int """ return self._max_mw
[docs] def convert_to_intensity(self, color, percentage): """Convert a percentage to the native AI API intensity value. The conversion is different for every color and model of light. The intensity to be returned will be bewtween 0-1000 for non-HD values and between 1000-2000 for HD (over 100%) values. :param color: the specified color to convert :type color: str :param percentage: the percentage to convert :type percentage: float :returns: intensity value (0-2000) :rtype: int """ if percentage < 0: raise ValueError("Percentage must be greater than 0") elif 0 <= percentage <= 100: return round(percentage * 10) else: # HD_Percentage # HD_Brightness_Value = -------------- * 1000 # Max_HD_Percent max_percentage = ((self._mw_hd[color]) / self._mw_norm[color]) * 100 if percentage > max_percentage: raise ValueError("Percentage for {} must be between 0 and {}" .format(color, max_percentage)) hd_percentage = percentage - 100 hd_brightness_value = (hd_percentage / (max_percentage - 100)) * 1000 return round(hd_brightness_value + 1000)
[docs] def convert_to_percentage(self, color, intensity): """Convert the native AI API intensity value to a percentage. The conversion of the intensity value (0 - 2000) will be different for every color and light model. :param color: the specified color to convert :type color: str :param intensity: the specified color intensity (0-2000) :type intensity: int :returns: the resulting percentage :rtype: float """ if intensity < 0 or intensity > 2000: raise ValueError("intensity must be between 0 and 2000") elif intensity <= 1000: return intensity/10 else: # Brightness - 1000 HD_Max - Normal_Max # HD_Percentage = ----------------- * ------------------- * 100 # 1000 Normal_Max_mW # Calculate max HD percentage available max_hd_percentage = (self._mw_hd[color] - self._mw_norm[color])/self._mw_norm[color] # Response from /color: First 1000 is for 0 -> 100%, # Second 1000 is for 100% -> Max HD% hd_in_use = (intensity - 1000) / 1000 # Calculate total current percentage return 100 + (max_hd_percentage * hd_in_use * 100)
[docs] def convert_to_mw(self, color, intensity): """Convert a given AI API native intensity value to the mWatt value. An input intesnity (0-2000), is converted to the equivalent mWatt value for that color channel, on the specified device. This will differ for each color channel and device type. :param color: the specified color to convert :type color: str :param intensity: the specified color intensity (0-2000) :type intensity: int :returns: the resulting mWatt value, for the given intensity :rtype: float """ if intensity < 0 or intensity > 2000: raise ValueError("intensity must be between 0 and 2000") elif intensity <= 1000: return self._mw_norm[color] * (intensity/1000) else: # intensity - 1000 # HD mW in use = (HD Max mW - Norm Max mW) * ---------------- # 1000 hd_in_use = (intensity - 1000)/1000 hd_mw_in_use = hd_in_use * (self._mw_hd[color] - self._mw_norm[color]) return self._mw_norm[color] + hd_mw_in_use
[docs]class AquaIPy: """A class that exposes the AquaIllumination Lights API.""" # pylint: disable=too-many-instance-attributes # All attributes are required, in this case. # pylint: disable=too-many-public-methods def __init__(self, name=None, session=None, loop=None): """Initialise class, with an optional instance name. :param name: Instance name, not currently used for anything. :type name: str """ self._host = None self._base_path = None self._mac_addr = None self._name = name self._product_type = None self._firmware_version = None self._primary_device = None self._other_devices = [] self._loop = loop self._loop_is_local = True if self._loop is None: try: self._loop = asyncio.get_event_loop() self._loop_is_local = False except RuntimeError: self._create_new_event_loop() if self._loop.is_closed(): self._create_new_event_loop() if session is None: self._session = aiohttp.ClientSession() self._session_is_local = True else: self._session = session self._session_is_local = False
[docs] def connect(self, host, check_firmware_support=True): """Connect **AquaIPy** instance to a specifed AI light, synchronously. :param host: Hostname/IP of AI light, for paired lights this should be the parent light. :param check_firmware_support: Set to False to skip the firmware check :type check_firmware_support: bool .. note:: It is **NOT** recommended to set *check_firmware_support=False*. Do so at your own risk! :raises FirmwareError: If the firmware version is unsupported. :raises ConnError: If unable to connect to specified AI light. :raises MustBeParentError: the specified host must be the parent light, if there are multiple lights linked. :Example: >>> from aquaipy import AquaIPy >>> ai = AquaIPy() >>> ai.connect("192.168.1.1") """ return self._loop.run_until_complete( self.async_connect(host, check_firmware_support))
[docs] async def async_connect(self, host, check_firmware_support=True): """Connect **AquaIPy** instance to a specified AI light. Also verifies connectivity and firmware version support. :param host: Hostname/IP of AI light, for paired lights this should be the parent light. :param check_firmware_support: Set to False to skip the firmware check :type check_firmware_support: bool .. note:: It is **NOT** recommended to set *check_firmware_support=False*. Do so at your own risk! :raises FirmwareError: If the firmware version is unsupported. :raises ConnError: If unable to connect to specified AI light. :raises MustBeParentError: the specified host must be the parent light, if there are multiple lights linked. :Example: >>> from aquaipy import AquaIPy >>> ai = AquaIPy() >>> await ai.async_connect("192.168.1.1") """ self._host = host self._base_path = 'http://' + host + '/api' await self._async_setup_device_details(check_firmware_support)
[docs] def close(self): """Clean-up and close the underlying async dependancies.. .. note:: There is no async method, as it is assumed if you are using async functions, you will use your own event loop and aiohttp.ClientSession and pass them in. This will close the client session and event loop, if they were created by this object, when it was initialised. """ self._base_path = None if self._session_is_local: self._loop.run_until_complete(self._session.close()) if self._loop_is_local: self._loop.stop() pending_tasks = asyncio.Task.all_tasks() self._loop.run_until_complete(asyncio.gather(*pending_tasks)) self._loop.close()
@property def mac_addr(self): """Get connected devices Mac Address/Serial Number. :returns: device mac address/serial number :rtype: str """ return self._mac_addr @property def name(self): """Get device name. :returns: device name :rtype: str """ return self._name @property def product_type(self): """Get product type. :returns: product type :rtype: str """ return self._product_type @property def supported_firmware(self): """Check if current firmware is supported. :returns: status of firmware support :rtype: bool """ return (StrictVersion(self._firmware_version) <= StrictVersion(MAX_SUPPORTED_AI_FIRMWARE_VERSION)) and \ (StrictVersion(self._firmware_version) >= StrictVersion(MIN_SUPPORTED_AI_FIRMWARE_VERSION)) @property def base_path(self): """Get base path of the AI API. :returns: base path :rtype: str """ return self._base_path @property def firmware_version(self): """Get firmware version. :returns: firmware version :rtype: str """ return self._firmware_version ################## # Internal Methods ################## def _validate_connection(self): """Verify connection, raise Error if not available.""" if self._base_path is None: raise ConnError("Error connecting to host", self._host) def _create_new_event_loop(self): """Create a new asyncio event loop.""" self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) self._loop_is_local = True async def _async_setup_device_details(self, check_firmware_support): """Verify connection to the device and populate device attributes.""" r_data = None try: path = "{0}/{1}".format(self._base_path, "identity") async with self._session.get(path) as resp: r_data = await resp.json() except Exception: self._base_path = None import traceback traceback.print_exc() raise ConnError("Unable to connect to host", self._host) if r_data['response_code'] != 0: self._base_path = None raise ConnError( "Error getting response for device identity", self._host) self._mac_addr = r_data['serial_number'] self._firmware_version = r_data['firmware'] self._product_type = r_data['product'] if check_firmware_support and not self.supported_firmware: self._base_path = None raise FirmwareError( "Support is not available for this version of the " + "AquaIllumination firmware yet.", self._firmware_version) if r_data['parent'] != "": self._base_path = None raise MustBeParentError( "Connected to non-parent device", r_data['parent']) await self._async_get_devices() async def _async_get_devices(self): """Populate the device attributes of the current class instance.""" path = "{0}/{1}".format(self._base_path, "power") async with self._session.get(path) as resp: r_data = await resp.json() if r_data['response_code'] != 0: self._base_path = None raise ConnError( "Unable to retrieve device details", self._host) self._primary_device = None self._other_devices = [] for device in r_data['devices']: temp = HDDevice(device, self.mac_addr) if temp.is_primary: self._primary_device = temp else: self._other_devices.append(temp) async def _async_get_brightness(self): """Get raw intensity values back from API.""" self._validate_connection() path = "{0}/{1}".format(self._base_path, "colors") async with self._session.get(path) as resp: r_data = await resp.json() if r_data["response_code"] != 0: return Response.Error, None del r_data["response_code"] return Response.Success, r_data async def _async_set_brightness(self, body): """Set raw intensity values, via AI API.""" self._validate_connection() path = "{0}/{1}".format(self._base_path, "colors") async with self._session.post(path, json=body) as resp: r_data = await resp.json() if r_data["response_code"] != 0: return Response.Error return Response.Success ####################################################### # Get/Set Manual Control (ie. Not using light schedule) #######################################################
[docs] def get_schedule_state(self): """Check if light schedule is enabled/disabled, synchronously. :returns: Schedule Enabled (*True*) / Schedule Disabled (*False*) or *None* if there's an error :rtype: bool :raises ConnError: if there is no valid connection to a device, usually because a previous call to ``connect()`` has failed """ return self._loop.run_until_complete(self.async_get_schedule_state())
[docs] async def async_get_schedule_state(self): """Check if light schedule is enabled/disabled. :returns: Schedule Enabled (*True*) / Schedule Disabled (*False*) or *None* if there's an error :rtype: bool :raises ConnError: if there is no valid connection to a device, usually because a previous call to ``connect()`` has failed """ self._validate_connection() path = "{0}/{1}".format(self._base_path, "schedule/enable") async with self._session.get(path) as resp: r_data = await resp.json() if r_data is None or r_data["response_code"] != 0: return None return r_data["enable"]
[docs] def set_schedule_state(self, enable): """Enable/Disable the light schedule, synchronously. :param enable: Schedule Enable (*True*) / Schedule Disable (*False*) :type enable: bool :returns: Response.Success if it works, or a value indicating the error, if there is an issue. :rtype: Response :raises ConnError: if there is no valid connection to a device, usually because a previous call to ``connect()`` has failed """ return self._loop.run_until_complete( self.async_set_schedule_state(enable))
[docs] async def async_set_schedule_state(self, enable): """Enable/disable the light schedule. :param enable: Schedule Enable (*True*) / Schedule Disable (*False*) :type enable: bool :returns: Response.Success if it works, or a value indicating the error, if there is an issue. :rtype: Response :raises ConnError: if there is no valid connection to a device, usually because a previous call to ``connect()`` has failed """ self._validate_connection() data = {"enable": enable} path = "{0}/{1}".format(self._base_path, "schedule/enable") async with self._session.put(path, data=json.dumps(data)) as resp: r_data = await resp.json() if r_data is None: return Response.Error if r_data['response_code'] != 0: return Response.Error return Response.Success
########################### # Color Control / Intensity ###########################
[docs] def get_colors(self): """Get the list of valid colors for other methods, synchronously. :returns: list of valid colors or *None* if there's an error :rtype: list( color_1..color_n ) or None :raises ConnError: if there is no valid connection to a device, usually because a previous call to ``connect()`` has failed """ return self._loop.run_until_complete(self.async_get_colors())
[docs] async def async_get_colors(self): """Get the list of valid colors to pass to other colors methods. :returns: list of valid colors or *None* if there's an error :rtype: list( color_1..color_n ) or None :raises ConnError: if there is no valid connection to a device, usually because a previous call to ``async_connect()`` has failed """ colors = [] resp, data = await self._async_get_brightness() if resp != Response.Success: return None for color in data: colors.append(color) return colors
[docs] def get_colors_brightness(self): """Get the current brightness of all color channels, synchronously. :returns: dictionary of color and brightness percentages, or *None* if there's an error :rtype: dict( color_1=percentage_1..color_n=percentage_n ) or None :raises ConnError: if there is no valid connection to a device, usually because a previous call to ``connect()`` has failed """ return self._loop.run_until_complete( self.async_get_colors_brightness())
[docs] async def async_get_colors_brightness(self): """Get the current brightness of all color channels. :returns: dictionary of color and brightness percentages, or *None* if there's an error :rtype: dict( color_1=percentage_1..color_n=percentage_n ) or None :raises ConnError: if there is no valid connection to a device, usually because a previous call to ``connect()`` has failed """ colors = {} # Get current brightness, for each colour channel resp_b, brightness = await self._async_get_brightness() if resp_b != Response.Success: return None for color, value in brightness.items(): colors[color] = self._primary_device.convert_to_percentage( color, value) return colors
[docs] def set_colors_brightness(self, colors): """Set all colors to the specified color percentage, synchronously. :param colors: dictionary of colors and percentage values :type colors: dict( color_1=percentage_1..color_n=percentage_n ) :returns: Response.Success if it works, or a value indicating the error, if there is an issue. :rtype: Response :raises ConnError: if there is no valid connection to a device, usually because a previous call to ``connect()`` has failed """ return self._loop.run_until_complete( self.async_set_colors_brightness(colors))
[docs] async def async_set_colors_brightness(self, colors): """Set all colors to the specified color percentage. .. note:: All colors returned by *get_colors()* must be specified. :param colors: dictionary of colors and percentage values :type colors: dict( color_1=percentage_1..color_n=percentage_n ) :returns: Response.Success if it works, or a value indicating the error, if there is an issue. :rtype: Response :raises ConnError: if there is no valid connection to a device, usually because a previous call to ``connect()`` has failed """ # Need to add better validation here if len(colors) < len(await self.async_get_colors()): return Response.AllColorsMustBeSpecified intensities = {} mw_value = 0 for color, value in colors.items(): intensities[color] = self._primary_device.convert_to_intensity( color, value) mw_value += self._primary_device.convert_to_mw( color, intensities[color]) if mw_value > self._primary_device.max_mw: print("Primary Device: mWatts exceeded - max: {} specified: {}" .format(str(self._primary_device.max_mw), str(mw_value))) return Response.PowerLimitExceeded # Check if planned intensities will exceed any child devices max_mW # (only issue if there are two different device types paired) for device in self._other_devices: mw_value = 0 for color, value in intensities.items(): mw_value += device.convert_to_mw(color, value) if mw_value > device.max_mw: print("mWatts exceeded - device: {} max: {} specified: {}" .format(device.mac_address, str(device.max_mw), str(mw_value))) return Response.PowerLimitExceeded return await self._async_set_brightness(intensities)
[docs] def patch_colors_brightness(self, colors): """Set specified colors to the given percentage values, sychronously. :param colors: Specify just the colors that should be updated :type colors: dict( color_1=percentage_1..color_n=percentage_n ) :returns: Response.Success if it works, or a value indicating the error, if there is an issue. :rtype: Response :raises ConnError: if there is no valid connection to a device, usually because a previous call to ``connect()`` has failed """ return self._loop.run_until_complete( self.async_patch_colors_brightness(colors))
[docs] async def async_patch_colors_brightness(self, colors): """Set specified colors to the given percentage brightness. :param colors: Specify just the colors that should be updated :type colors: dict( color_1=percentage_1..color_n=percentage_n ) :returns: Response.Success if it works, or a value indicating the error, if there is an issue. :rtype: Response :raises ConnError: if there is no valid connection to a device, usually because a previous call to ``connect()`` has failed """ if len(colors) < 1: return Response.InvalidData brightness = await self.async_get_colors_brightness() if brightness is None: return Response.Error for color, value in colors.items(): brightness[color] = value return await self.async_set_colors_brightness(brightness)
[docs] def update_color_brightness(self, color, value): """Update a given color by the specified brightness, synchronously. :param color: color to change :param value: value to change percentage by :type color: str :type value: float :returns: Response.Success if it works, or a value indicating the error, if there is an issue. :rtype: Response :raises ConnError: if there is no valid connection to a device, usually because a previous call to ``connect()`` has failed. """ return self._loop.run_until_complete( self.async_update_color_brightness(color, value))
[docs] async def async_update_color_brightness(self, color, value): """Update a given color by the specified brightness percentage. :param color: color to change :param value: value to change percentage by :type color: str :type value: float :returns: Response.Success if it works, or a value indicating the error, if there is an issue. :rtype: Response :raises ConnError: if there is no valid connection to a device, usually because a previous call to ``connect()`` has failed. """ if not color: return Response.InvalidData # No change required if value == 0: return Response.Success brightness = await self.async_get_colors_brightness() if brightness is None: return Response.Error brightness[color] += value return await self.async_set_colors_brightness(brightness)