Source code for pyarlo.base_station

# coding: utf-8
"""Generic Python Class file for Netgear Arlo Base Station module."""
import json
import threading
import logging
import time
import base64
import zlib
import sseclient
from pyarlo.const import (
    ACTION_BODY, SUBSCRIBE_ENDPOINT, UNSUBSCRIBE_ENDPOINT,
    FIXED_MODES, NOTIFY_ENDPOINT, RESOURCES)
from pyarlo.utils import assert_is_dict

_LOGGER = logging.getLogger(__name__)

REFRESH_RATE = 15


[docs]class ArloBaseStation(object): """Arlo Base Station module implementation.""" def __init__(self, name, attrs, session_token, arlo_session, refresh_rate=REFRESH_RATE): """Initialize Arlo Base Station object. :param name: Base Station name :param attrs: Attributes :param session_token: Session token passed by camera class :param arlo_session: PyArlo shared session :param refresh_rate: Attributes refresh rate. Defaults to 15 """ self.name = name self._attrs = attrs self._session = arlo_session self._session_token = session_token self._available_modes = None self._available_mode_ids = None self._camera_properties = None self._camera_extended_properties = None self._ambient_sensor_data = None self._last_refresh = None self._refresh_rate = refresh_rate self.__sseclient = None self.__subscribed = False self.__events = [] self.__event_handle = None self._attrs = assert_is_dict(self._attrs) def __repr__(self): """Representation string of object.""" return "<{0}: {1}>".format(self.__class__.__name__, self.name)
[docs] def thread_function(self): """Thread function.""" self.__subscribed = True url = SUBSCRIBE_ENDPOINT + "?token=" + self._session_token data = self._session.query(url, method='GET', raw=True, stream=True) if not data or not data.ok: _LOGGER.debug("Did not receive a valid response. Aborting..") return None self.__sseclient = sseclient.SSEClient(data) try: for event in (self.__sseclient).events(): if not self.__subscribed: break data = json.loads(event.data) if data.get('status') == "connected": _LOGGER.debug("Successfully subscribed this base station") elif data.get('action'): action = data.get('action') resource = data.get('resource') if action == "logout": _LOGGER.debug("Logged out by some other entity") self.__subscribed = False break elif action == "is" and "subscriptions/" not in resource: self.__events.append(data) self.__event_handle.set() except TypeError as error: _LOGGER.debug("Got unexpected error: %s", error) return None return True
def _get_event_stream(self): """Spawn a thread and monitor the Arlo Event Stream.""" self.__event_handle = threading.Event() event_thread = threading.Thread(target=self.thread_function) event_thread.start() def _subscribe_myself(self): """Subscribe this base station for all events.""" return self.publish( action='set', resource='subscribe', mode=None, publish_response=False) def _unsubscribe_myself(self): """Unsubscribe this base station for all events.""" url = UNSUBSCRIBE_ENDPOINT return self._session.query(url, method='GET', raw=True, stream=False) def _close_event_stream(self): """Stop the Event stream thread.""" self.__subscribed = False del self.__events[:] self.__event_handle.clear()
[docs] def publish_and_get_event(self, resource): """Publish and get the event from base station.""" l_subscribed = False this_event = None if not self.__subscribed: self._get_event_stream() self._subscribe_myself() l_subscribed = True status = self.publish( action='get', resource=resource, mode=None, publish_response=False) if status == 'success': i = 0 while not this_event and i < 2: self.__event_handle.wait(5.0) self.__event_handle.clear() _LOGGER.debug("Instance %s resource: %s", str(i), resource) for event in self.__events: if event['resource'] == resource: this_event = event self.__events.remove(event) break i = i + 1 if l_subscribed: self._unsubscribe_myself() self._close_event_stream() l_subscribed = False return this_event
[docs] def publish( self, action='get', resource=None, camera_id=None, mode=None, publish_response=None, properties=None): """Run action. :param method: Specify the method GET, POST or PUT. Default is GET. :param resource: Specify one of the resources to fetch from arlo. :param camera_id: Specify the camera ID involved with this action :param mode: Specify the mode to set, else None for GET operations :param publish_response: Set to True for SETs. Default False """ url = NOTIFY_ENDPOINT.format(self.device_id) body = ACTION_BODY.copy() if properties is None: properties = {} if resource: body['resource'] = resource if action == 'get': body['properties'] = None else: # consider moving this logic up a layer if resource == 'schedule': properties.update({'active': True}) elif resource == 'subscribe': body['resource'] = "subscriptions/" + \ "{0}_web".format(self.user_id) dev = [] dev.append(self.device_id) properties.update({'devices': dev}) elif resource == 'modes': available_modes = self.available_modes_with_ids properties.update({'active': available_modes.get(mode)}) elif resource == 'privacy': properties.update({'privacyActive': not mode}) body['resource'] = "cameras/{0}".format(camera_id) body['action'] = action body['properties'] = properties body['publishResponse'] = publish_response body['from'] = "{0}_web".format(self.user_id) body['to'] = self.device_id body['transId'] = "web!{0}".format(self.xcloud_id) _LOGGER.debug("Action body: %s", body) ret = \ self._session.query(url, method='POST', extra_params=body, extra_headers={"xCloudId": self.xcloud_id}) if ret and ret.get('success'): return 'success' return None
# pylint: disable=invalid-name @property def device_id(self): """Return device_id.""" if self._attrs is not None: return self._attrs.get('deviceId') return None @property def device_type(self): """Return device_type.""" if self._attrs is not None: return self._attrs.get('deviceType') return None @property def model_id(self): """Return model_id.""" if self._attrs is not None: return self._attrs.get('modelId') return None @property def hw_version(self): """Return hardware version.""" if self._attrs is not None: return self._attrs.get('properties').get('hwVersion') return None @property def timezone(self): """Return timezone.""" if self._attrs is not None: return self._attrs.get('properties').get('olsonTimeZone') return None @property def unique_id(self): """Return unique_id.""" if self._attrs is not None: return self._attrs.get('uniqueId') return None @property def serial_number(self): """Return serial number.""" if self._attrs is not None: return self._attrs.get('properties').get('serialNumber') return None @property def user_id(self): """Return userID.""" if self._attrs is not None: return self._attrs.get('userId') return None @property def user_role(self): """Return userRole.""" if self._attrs is not None: return self._attrs.get('userRole') return None @property def xcloud_id(self): """Return X-Cloud-ID attribute.""" if self._attrs is not None: return self._attrs.get('xCloudId') return None @property def last_refresh(self): """Return last_refresh attribute.""" return self._last_refresh @property def refresh_rate(self): """Return refresh_rate attribute.""" return self._refresh_rate @refresh_rate.setter def refresh_rate(self, value): """Override the refresh_rate attribute.""" if isinstance(value, (int, float)): self._refresh_rate = value @property def available_modes(self): """Return list of available mode names.""" if not self._available_modes: modes = self.available_modes_with_ids if not modes: return None self._available_modes = list(modes.keys()) return self._available_modes @property def available_modes_with_ids(self): """Return list of objects containing available mode name and id.""" if not self._available_mode_ids: all_modes = FIXED_MODES.copy() self._available_mode_ids = all_modes modes = self.get_available_modes() try: if modes: # pylint: disable=consider-using-dict-comprehension simple_modes = dict( [(m.get("type", m.get("name")), m.get("id")) for m in modes] ) all_modes.update(simple_modes) self._available_mode_ids = all_modes except TypeError: _LOGGER.debug("Did not receive a valid response. Passing..") return self._available_mode_ids @property def available_resources(self): """Return list of available resources.""" return list(RESOURCES.keys()) @property def mode(self): """Return current mode key.""" if self.is_in_schedule_mode: return "schedule" resource = "modes" mode_event = self.publish_and_get_event(resource) if mode_event: properties = mode_event.get('properties') active_mode = properties.get('active') modes = properties.get('modes') if not modes: return None for mode in modes: if mode.get('id') == active_mode: return mode.get('type') \ if mode.get('type') is not None else mode.get('name') return None @property def is_in_schedule_mode(self): """Returns True if base_station is currently on a scheduled mode.""" resource = "schedule" mode_event = self.publish_and_get_event(resource) if mode_event and mode_event.get("resource", None) == "schedule": properties = mode_event.get('properties') return properties.get("active", False) return False
[docs] def get_available_modes(self): """Return a list of available mode objects for an Arlo user.""" resource = "modes" resource_event = self.publish_and_get_event(resource) if resource_event: properties = resource_event.get("properties") return properties.get("modes") return None
@property def camera_properties(self): """Return _camera_properties""" if self._camera_properties is None: self.get_cameras_properties() return self._camera_properties
[docs] def get_cameras_properties(self): """Return camera properties.""" resource = "cameras" resource_event = self.publish_and_get_event(resource) if resource_event: self._last_refresh = int(time.time()) self._camera_properties = resource_event.get('properties')
[docs] def get_cameras_battery_level(self): """Return a list of battery levels of all cameras.""" battery_levels = {} if not self.camera_properties: return None for camera in self.camera_properties: serialnum = camera.get('serialNumber') cam_battery = camera.get('batteryLevel') battery_levels[serialnum] = cam_battery return battery_levels
[docs] def get_cameras_signal_strength(self): """Return a list of signal strength of all cameras.""" signal_strength = {} if not self.camera_properties: return None for camera in self.camera_properties: serialnum = camera.get('serialNumber') cam_strength = camera.get('signalStrength') signal_strength[serialnum] = cam_strength return signal_strength
@property def camera_extended_properties(self): """Return _camera_extended_properties.""" if self._camera_extended_properties is None: self.get_camera_extended_properties() return self._camera_extended_properties
[docs] def get_camera_extended_properties(self): """Return camera extended properties.""" resource = 'cameras/{}'.format(self.device_id) resource_event = self.publish_and_get_event(resource) if resource_event is None: return None self._camera_extended_properties = resource_event.get('properties') return self._camera_extended_properties
[docs] def get_speaker_muted(self): """Return whether or not the speaker is muted.""" if not self.camera_extended_properties: return None speaker = self.camera_extended_properties.get('speaker') if not speaker: return None return speaker.get('mute')
[docs] def get_speaker_volume(self): """Return the volume setting of the speaker.""" if not self.camera_extended_properties: return None speaker = self.camera_extended_properties.get('speaker') if not speaker: return None return speaker.get('volume')
[docs] def get_night_light_state(self): """Return the state of the night light (on/off).""" if not self.camera_extended_properties: return None night_light = self.camera_extended_properties.get('nightLight') if not night_light: return None if night_light.get('enabled'): return 'on' return 'off'
[docs] def get_night_light_brightness(self): """Return the brightness (0-255) of the night light.""" if not self.camera_extended_properties: return None night_light = self.camera_extended_properties.get('nightLight') if not night_light: return None return night_light.get('brightness')
@property def properties(self): """Return the base station info.""" resource = "basestation" basestn_event = self.publish_and_get_event(resource) if basestn_event: return basestn_event.get('properties') return None
[docs] def get_cameras_rules(self): """Return the camera rules.""" resource = "rules" rules_event = self.publish_and_get_event(resource) if rules_event: return rules_event.get('properties') return None
[docs] def get_cameras_schedule(self): """Return the schedule set for cameras.""" resource = "schedule" schedule_event = self.publish_and_get_event(resource) if schedule_event: return schedule_event.get('properties') return None
@property def is_motion_detection_enabled(self): """Return Boolean if motion is enabled.""" return self.mode == "armed" @property def ambient_sensor_data(self): """Return _ambient_sensor_data""" if self._ambient_sensor_data is None: self.get_ambient_sensor_data() return self._ambient_sensor_data @property def ambient_temperature(self): """Return the temperature property of the most recent history entry (in degrees celsius)""" return self.get_latest_ambient_sensor_statistic('temperature') @property def ambient_humidity(self): """Return the humidity property of the most recent history entry (in percent)""" return self.get_latest_ambient_sensor_statistic('humidity') @property def ambient_air_quality(self): """Return the air quality property of the most recent history entry (in VOC PPM)""" return self.get_latest_ambient_sensor_statistic('airQuality')
[docs] def get_ambient_sensor_data(self): """Refresh ambient sensor history""" resource = 'cameras/{}/ambientSensors/history'.format(self.device_id) history_event = self.publish_and_get_event(resource) if history_event is None: return None properties = history_event.get('properties') self._ambient_sensor_data = \ ArloBaseStation._decode_sensor_data(properties) return self._ambient_sensor_data
@staticmethod def _decode_sensor_data(properties): """Decode, decompress, and parse the data from the history API""" b64_input = "" for s in properties.get('payload'): # pylint: disable=consider-using-join b64_input += s decoded = base64.b64decode(b64_input) data = zlib.decompress(decoded) points = [] i = 0 while i < len(data): points.append({ 'timestamp': int(1e3 * ArloBaseStation._parse_statistic( data[i:(i + 4)], 0)), 'temperature': ArloBaseStation._parse_statistic( data[(i + 8):(i + 10)], 1), 'humidity': ArloBaseStation._parse_statistic( data[(i + 14):(i + 16)], 1), 'airQuality': ArloBaseStation._parse_statistic( data[(i + 20):(i + 22)], 1) }) i += 22 return points @staticmethod def _parse_statistic(data, scale): """Parse binary statistics returned from the history API""" i = 0 for byte in bytearray(data): i = (i << 8) + byte if i == 32768: return None if scale == 0: return i return float(i) / (scale * 10)
[docs] def get_latest_ambient_sensor_statistic(self, statistic): """Gets the most recent ambient sensor history entry""" if self._ambient_sensor_data is None: self.get_ambient_sensor_data() if self._ambient_sensor_data is None: return None return self._ambient_sensor_data[-1].get(statistic)
[docs] def get_audio_playback_status(self): """Gets the current playback status and available track list""" resource = 'audioPlayback' return self.publish_and_get_event(resource)
DEFAULT_TRACK_ID = '229dca67-7e3c-4a5f-8f43-90e1a9bffc38'
[docs] def play_track(self, track_id=DEFAULT_TRACK_ID, position=0): """Plays a track at the given position.""" self.publish( action='playTrack', resource='audioPlayback/player', publish_response=False, properties={'trackId': track_id, 'position': position} )
[docs] def pause_track(self): """Pauses the currently playing track.""" self.publish( action='pause', resource='audioPlayback/player', publish_response=False )
[docs] def skip_track(self): """Skips to the next track in the playlist.""" self.publish( action='nextTrack', resource='audioPlayback/player', publish_response=False )
[docs] def set_music_loop_mode_continuous(self): """Sets the music loop mode to repeat the entire playlist.""" self.publish( action='set', resource='audioPlayback/config', publish_response=False, properties={'config': {'loopbackMode': 'continuous'}} )
[docs] def set_music_loop_mode_single(self): """Sets the music loop mode to repeat the current track.""" self.publish( action='set', resource='audioPlayback/config', publish_response=False, properties={'config': {'loopbackMode': 'singleTrack'}} )
[docs] def set_shuffle_on(self): """Sets playback to shuffle.""" self.publish( action='set', resource='audioPlayback/config', publish_response=False, properties={'config': {'shuffleActive': True}} )
[docs] def set_shuffle_off(self): """Sets playback to sequential.""" self.publish( action='set', resource='audioPlayback/config', publish_response=False, properties={'config': {'shuffleActive': False}} )
[docs] def set_volume(self, mute=False, volume=50): """Sets the music volume (0-100)""" self.publish( action='set', resource='cameras/{}'.format(self.device_id), publish_response=False, properties={'speaker': {'mute': mute, 'volume': volume}} )
[docs] def set_night_light_on(self): """Turns on the night light.""" self.publish( action='set', resource='cameras/{}'.format(self.device_id), publish_response=False, properties={'nightLight': {'enabled': True}} )
[docs] def set_night_light_off(self): """Turns off the night light.""" self.publish( action='set', resource='cameras/{}'.format(self.device_id), publish_response=False, properties={'nightLight': {'enabled': False}} )
[docs] def set_night_light_brightness(self, brightness=200): """Sets the brightness of the night light (0-255).""" self.publish( action='set', resource='cameras/{}'.format(self.device_id), publish_response=False, properties={'nightLight': {'brightness': brightness}} )
@property def subscribe(self): """Subscribe this session with Arlo system.""" self._get_event_stream() self._subscribe_myself() @property def unsubscribe(self): """Unsubscribe this session.""" self._unsubscribe_myself() self._close_event_stream() @mode.setter def mode(self, mode): """Set Arlo camera mode. :param mode: arm, disarm """ modes = self.available_modes if (not modes) or (mode not in modes): return self.publish( action='set', resource='modes' if mode != 'schedule' else 'schedule', mode=mode, publish_response=True) self.update()
[docs] def set_camera_enabled(self, camera_id, is_enabled): """Turn Arlo camera On/Off. :param mode: True, False """ self.publish( action='set', resource='privacy', camera_id=camera_id, mode=is_enabled, publish_response=True) self.update()
[docs] def update(self): """Update object properties.""" current_time = int(time.time()) last_refresh = 0 if self._last_refresh is None else self._last_refresh if current_time >= (last_refresh + self._refresh_rate): self.get_cameras_properties() self.get_ambient_sensor_data() self.get_camera_extended_properties() self._attrs = self._session.refresh_attributes(self.name) self._attrs = assert_is_dict(self._attrs) _LOGGER.debug("Called base station update of camera properties: " "Scan Interval: %s, New Properties: %s", self._refresh_rate, self.camera_properties)
# vim:sw=4:ts=4:et: