Source code for iotawatt_access

# -*- coding: utf-8 -*-
# Copyright (C) 2020-2025 by Brendt Wohlberg
# All rights reserved. GPL v2 License.

"""Classes for accessing IotaWatt status and data via the Query API."""

import os
import warnings
from datetime import datetime
from json import JSONDecodeError

import numpy as np
import requests
from dateutil.parser import parse
from dateutil.tz import tzlocal, tzutc
from requests import ConnectionError
from requests.auth import HTTPDigestAuth
from requests.exceptions import RequestException

__version__ = "0.0.1.dev0"


def device_login():
    """Get device URL, username, and password.

    Each value is read from an environment variable, falling back to a
    default when the variable is not set:

    * ``IOTAWATT_URL`` (default ``"http://iotawatt.local"``)
    * ``IOTAWATT_USERNAME`` (default ``"admin"``)
    * ``IOTAWATT_PASSWORD`` (default ``None``)

    Returns:
        tuple: The ``(url, user, pwd)`` triple.
    """
    env = os.environ
    url = env.get("IOTAWATT_URL", "http://iotawatt.local")
    user = env.get("IOTAWATT_USERNAME", "admin")
    pwd = env.get("IOTAWATT_PASSWORD")
    return url, user, pwd
def data_path():
    """Get path for data store.

    Returns:
        str: Value of the environment variable ``IOTAWATT_DATA_PATH``
        when it is set, otherwise the default ``"~/IotaWatt_Data"``.
        The string is returned as-is (``~`` is not expanded here).
    """
    return os.environ.get("IOTAWATT_DATA_PATH", "~/IotaWatt_Data")
def str_to_datetime(timestr, utc=False):
    """Convert a string specifying a date and time into a datetime object.

    The string is parsed with :func:`dateutil.parser.parse`. If the
    parsed value carries no time zone, one is attached: UTC when `utc`
    is true, the local zone otherwise. A time zone present in the
    string is left untouched.

    Args:
        timestr (str): String specifying a date and time.
        utc (bool, optional): Flag indicating whether default time zone
          should be set to UTC.

    Returns:
        datetime: A time-zone-aware datetime object.
    """
    stamp = parse(timestr)
    if stamp.tzinfo is not None:
        return stamp
    zone = tzutc() if utc else tzlocal()
    return stamp.replace(tzinfo=zone)
def str_to_timestamp(timestr, utc=False):
    """Convert a string specifying a date and time into a Unix timestamp.

    Args:
        timestr (str): String specifying a date and time.
        utc (bool, optional): Flag indicating whether default time zone
          should be set to UTC.

    Returns:
        float: Unix timestamp of the date/time, as produced by
        :meth:`datetime.datetime.timestamp`.
    """
    moment = str_to_datetime(timestr, utc=utc)
    return moment.timestamp()
def timestamp_to_datetime(ts, utc=False):
    """Convert a Unix timestamp into a datetime object.

    Args:
        ts (int): Unix timestamp.
        utc (bool, optional): Flag indicating whether time zone should
          remain as UTC or be converted to local time.

    Returns:
        datetime: A time-zone-aware datetime object, in UTC when `utc`
        is true and in the local zone otherwise.
    """
    zone = tzutc() if utc else tzlocal()
    return datetime.fromtimestamp(ts, tz=zone)
def timestamp_to_str(ts, utc=False, notz=False):
    """Convert a Unix timestamp into an ISO-format date/time string.

    Args:
        ts (int): Unix timestamp
        utc (bool, optional): Flag indicating whether time zone should
          remain as UTC or be converted to local time.
        notz (bool, optional): When true, the time zone is stripped
          before formatting so that no UTC offset appears in the
          returned string.

    Returns:
        string: ISO format string representation of the date/time.
    """
    moment = timestamp_to_datetime(ts, utc=utc)
    shown = moment.replace(tzinfo=None) if notz else moment
    return shown.isoformat()
def _list_check(lst, rowlen):
    """Check that all rows in a nested list have the required length.

    Args:
        lst (list): List of lists.
        rowlen (int): Expected length of each inner list.

    Returns:
        ``None`` or int: ``None`` if all rows have required length,
        otherwise index of first row with incorrect length.
    """
    for n, row in enumerate(lst):
        if len(row) != rowlen:
            # Stop at the first offender: callers treat every row before
            # the returned index as valid.
            return n
    return None


def _dict_get_str(d, key, fmt, cnv):
    """Get a string representation of a dictionary value.

    The value is first passed through ``cnv[key]`` if such a converter
    exists, then formatted with the format spec ``fmt[key]`` if one
    exists, and otherwise converted with :class:`str`.

    Args:
        d (dict): Dictionary to be accessed.
        key: Key of dict value to be retrieved.
        fmt (dict): Dictionary of string formatting specifications.
        cnv (dict): Dictionary of value mapping functions.

    Returns:
        str: String representation of retrieved dict value, or the
        empty string if `key` is not in `d`.
    """
    if key not in d:
        return ""
    v = d[key]
    if key in cnv:
        v = cnv[key](v)
    if key in fmt:
        return f"{v:{fmt[key]}}"
    return str(v)
def dict_list_to_str(dl, keys=None, fmt=None, cnv=None):
    """Construct a tabular representation of a list of dicts.

    Args:
        dl (list): List of dicts to be tabulated.
        keys (list): List of dict keys to be tabulated. If ``None``,
          the keys of the first dict in `dl` are used.
        fmt (dict): Dictionary of string formatting specifications.
        cnv (dict): Dictionary of value mapping functions.

    Returns:
        str: Tabular representation of list of dictionaries as string.
        The empty string is returned when `dl` is empty and no `keys`
        are given, since there are then no columns to show.
    """
    if keys is None:
        if not dl:
            return ""
        keys = dl[0].keys()
    keys = list(keys)
    if fmt is None:
        fmt = {}
    if cnv is None:
        cnv = {}

    # Render every cell exactly once; the widths and the table body are
    # both derived from these strings.
    heads = [str(k) for k in keys]
    cells = [[_dict_get_str(d, k, fmt, cnv) for k in keys] for d in dl]

    # Column width is the maximum of key width and all value widths
    colw = [
        max([len(h)] + [len(row[j]) for row in cells])
        for j, h in enumerate(heads)
    ]

    # Columns are separated by a two-character gutter, which is what the
    # rule width below accounts for.
    gutter = "  "
    rule = "-" * (sum(colw) + len(gutter) * (len(keys) - 1))

    lines = [rule]
    lines.append(gutter.join(h.capitalize().ljust(w) for h, w in zip(heads, colw)))
    lines.append(rule)
    lines.extend(
        gutter.join(c.ljust(w) for c, w in zip(row, colw)) for row in cells
    )
    lines.append(rule)
    return "".join(line + "\n" for line in lines)
def dict_to_str(d, fmt=None, cnv=None):
    """Construct a tabular representation of a dict.

    Args:
        d (dict): Dictionary to be tabulated.
        fmt (dict): Dictionary of string formatting specifications.
        cnv (dict): Dictionary of value mapping functions.

    Returns:
        str: Tabular representation of dictionary as string. An empty
        dict yields just the two separator bars.
    """
    if fmt is None:
        fmt = {}
    if cnv is None:
        cnv = {}

    # Keys are shown via str() so that non-string keys are accepted, and
    # every value is rendered once and reused for width and output.
    labels = [str(k) for k in d]
    values = [_dict_get_str(d, k, fmt, cnv) for k in d]

    # default=0 keeps max() from raising on an empty dict
    klen = max((len(s) for s in labels), default=0)  # max key length
    vlen = max((len(s) for s in values), default=0)  # max value length

    rule = "-" * (klen + 2 + vlen)  # separator bar
    lines = [rule]
    lines.extend(
        lab.ljust(klen) + ": " + val.ljust(vlen)
        for lab, val in zip(labels, values)
    )
    lines.append(rule)
    return "".join(line + "\n" for line in lines)
class IotaWattAPI:
    """Access IotaWatt status and recorded data via the Query API.

    Attributes:
        auth (HTTPDigestAuth or None): Authentication for access to
          Query API (``None`` when no password was given).
        channels (list): List of IotaWatt channel names.
        v_inputs (list): List holding the voltage input channel name
          (assumed to be the first channel).
        i_inputs (list): List of IotaWatt current/power input channel
          names.
        outputs (list): List of IotaWatt output channel names.
        url (str): URL for access to Query API (with trailing slash).
        debug (bool): Whether a transaction log is written.
        dbgfile (str): Path of the transaction log file.
    """

    def __init__(
        self,
        url="http://iotawatt.local",
        user="admin",
        pwd=None,
        debug=False,
        dbgfile="iotawatt_api_debug.log",
    ):
        """Initialize IotaWattAPI object.

        The device is queried immediately for its channel list and its
        output status in order to populate the channel attributes.

        Args:
            url (str, optional): IotaWatt device URL.
            user (str, optional): IotaWatt API username.
            pwd (None, optional): IotaWatt API password. If ``None``,
              requests are sent without authentication.
            debug (bool, optional): Flag enabling the transaction log.
            dbgfile (str, optional): Path of the transaction log file.
        """
        self.url = url if url.endswith("/") else url + "/"
        self.auth = None if pwd is None else HTTPDigestAuth(user, pwd)
        self.debug = debug
        self.dbgfile = dbgfile

        inout = self.get_channel_info()
        numout = len(self.get_status(stype="outputs"))
        self.channels = [c["name"] for c in inout]
        # Outputs are taken to be the last numout channels. The split
        # index is computed explicitly because slicing with -numout
        # misbehaves when numout is zero (-0 == 0).
        numin = max(len(self.channels) - numout, 0)
        # Make the assumption that the voltage input is always the first
        # one in the list
        self.v_inputs = self.channels[0:1]
        self.i_inputs = self.channels[1:numin]
        self.outputs = self.channels[numin:]

    def get_channel_info(self, retry=3):
        """Get details of IotaWatt device input and output channels.

        Args:
            retry (int, optional): Number of retries allowed.

        Returns:
            list of dict: Channel details.
        """
        return self._query("query?show=series", retry=retry)["series"]

    def get_status(self, stype="inputs", retry=3):
        """Get IotaWatt device status.

        Valid values for stype: inputs, outputs, wifi, stats, datalogs

        Args:
            stype (str, optional): Status type.
            retry (int, optional): Number of retries allowed.

        Returns:
            dict or list of dict: Status details, or an empty list if
            the device response is empty.
        """
        reply = self._query("status?" + stype, retry=retry)
        if reply:
            return reply[stype]
        return []

    def get_channel_data(
        self, begin, end, channels=None, interval=5, frcdig=3, retry=3, callback=None
    ):
        """Get recorded data from IotaWatt device.

        The requested time span is downloaded in a sequence of queries,
        each sized so that the estimated response stays below a fixed
        byte limit.

        Args:
            begin (str): Start date/time of channel data.
            end (str): End date/time of channel data.
            channels (list or ``None``, optional): List of current/power
              channel names. If ``None``, download all of them.
            interval (int, optional): Sampling interval in seconds (must
              be a multiple of 5).
            frcdig (int, optional): Number of fractional digits in
              numeric values.
            retry (int, optional): Number of retries allowed.
            callback (func or ``None``, optional): Callback function
              supporting data download progress monitoring. It is called
              before each query as ``callback(tbegin, tend, t0, t1)``.

        Returns:
            IotaWattData: Channel data and metadata.

        Raises:
            RuntimeError: Too many malformed responses for the same
              query start time.
        """
        if channels is None:
            channels = self.i_inputs
        datcol = (
            ["time.utc.unix"]
            + ["%s.volts.d%d" % (c, frcdig) for c in self.v_inputs]
            + ["%s.hz.d%d" % (c, frcdig) for c in self.v_inputs]
            + ["%s.watts.d%d" % (c, frcdig) for c in channels]
            + ["%s.amps.d%d" % (c, frcdig) for c in channels]
        )
        select = "[" + ",".join(datcol) + "]"
        # Estimate of row length of response to query
        rowsize = (
            14
            + len(self.v_inputs) * 2 * (7 + frcdig)
            + len(channels) * 2 * (7 + frcdig)
        )
        # Maximum response size in bytes
        maxsize = 100000
        # Maximum number of rows allowed per query
        queryrows = maxsize // rowsize
        # Begin and end dates as Unix timestamps
        tbegin = str_to_timestamp(begin)
        tend = str_to_timestamp(end)
        # Set t0 and t1 to point to start and end of time interval
        # for initial query
        t0 = tbegin
        t1 = min(tend, t0 + interval * queryrows)
        # Set number of retries for this value of t0
        t0retry = 0
        cdat = None
        # Repeat queries until start of current query interval reaches
        # end of requested interval
        while t0 < tend:
            # Call status reporting callback function if defined
            if callback is not None:
                callback(tbegin, tend, t0, t1)
            # Construct query and get response
            query = (
                "select=%s&begin=%10d&end=%10d&group=%ds&format=json&"
                "missing=skip"
            ) % (select, t0, t1, interval)
            rows = self._query("query?" + query, retry=retry)
            if len(rows) == 0:
                # If the request does not result in any errors but has
                # zero length, assume that the corresponding data is
                # missing from the device history
                t0 = t1
                t1 = min(tend, t0 + interval * queryrows)
                continue
            # Check JSON response list for rows with incorrect length
            # (not clear why these are encountered occasionally)
            rowidx = _list_check(rows, len(datcol))
            if rowidx is None:
                # No row length errors
                cblk = np.array(rows)
                t0 = t1
                t0retry = 0
            else:
                # Row length error encountered
                warnings.warn(
                    "Bad record received for time %d" % rows[rowidx][0],
                    RuntimeWarning,
                )
                if rowidx > 0:
                    # Row length error is not in first row: assume that
                    # the preceding rows are valid and prepare to retry
                    # starting at the problematic row. The slice keeps
                    # rows 0..rowidx-1, i.e. every row before the bad one.
                    cblk = np.array(rows[:rowidx])
                    t0 = rows[rowidx][0]
                    t0retry = 0
                else:
                    # Row length error is in the first row: discard the
                    # entire list of rows and prepare to retry with the
                    # same starting time as the previous query
                    cblk = None
                    t0retry += 1
            if cblk is None:
                # Allow only a limited number of retries with the same
                # value of t0
                if t0retry > 2:
                    raise RuntimeError(
                        "Too many failures in accessing"
                        " data block starting at time %d" % t0
                    )
            else:
                # Set or extend the array accumulating the full set of
                # rows obtained over multiple requests
                if cdat is None:
                    cdat = cblk
                else:
                    cdat = np.vstack((cdat, cblk))
            # Update end of query interval
            t1 = min(tend, t0 + interval * queryrows)

        return IotaWattData(
            begin, end, self.v_inputs, channels, frcdig, cols=datcol[1:], data=cdat
        )

    def _query(self, query, retry=3):
        """Send query via device API and get response.

        The request is attempted up to `retry` times. A failed attempt
        (request exception, non-200 status, or unparseable JSON) issues
        a :class:`RuntimeWarning` and moves on to the next attempt.

        Args:
            query (str): Query string.
            retry (int, optional): Number of attempts allowed.

        Returns:
            dict: JSON formatted response.

        Raises:
            ConnectionError: All attempts failed and the last response
              received had a status code other than 200.
            RuntimeError: No response was received on any attempt, or
              the last response received could not be parsed as JSON.
        """
        req = self.url + query
        if self.debug:
            self._debug_write("REQUEST\n%s\n" % req)

        response = None  # most recent response actually received
        jsonfail = False  # whether that response failed JSON parsing
        last_error = None  # most recent request exception, if any
        result = None
        for _ in range(retry):
            try:
                response = requests.get(req, auth=self.auth)
            except RequestException as ex:
                last_error = ex
                warnings.warn(str(ex), RuntimeWarning)
                continue
            # Reset only once a new response has arrived, so that the
            # flag always describes the response held in `response`.
            jsonfail = False
            if self.debug:
                self._debug_write("STATUS CODE: %d\n" % response.status_code)
            if response.status_code != 200:
                warnings.warn(
                    "Connection error code %d" % response.status_code,
                    RuntimeWarning,
                )
                continue
            if self.debug:
                self._debug_write("RESPONSE\n%s\n" % response.text)
            try:
                result = response.json()
            except ValueError:
                # json.JSONDecodeError is a ValueError subclass; catching
                # the base class also covers decoders from other JSON
                # backends that requests may use.
                jsonfail = True
                warnings.warn("JSON parse error", RuntimeWarning)
                if self.debug:
                    self._debug_write("JSON PARSE ERROR\n")
                continue
            if self.debug:
                self._debug_write("JSON PARSE SUCCESS\n")
            break

        if response is None:
            raise RuntimeError("Unrecognized failure") from last_error
        if response.status_code != 200:
            raise ConnectionError("Connection error code %d" % response.status_code)
        if jsonfail:
            raise RuntimeError(
                "Error parsing JSON response (length %d) to "
                "query %s" % (len(response.text), query)
            )
        if self.debug:
            self._debug_write("JSON\n%s\n\n" % result)
        return result

    def _debug_write(self, text):
        """Write a transaction log for debugging purposes.

        Args:
            text (str): Text to add to transaction log.
        """
        with open(self.dbgfile, "a+") as f:
            f.write(text)
class IotaWattData:
    """Dataset downloaded from IotaWatt device.

    Attributes:
        begin (str): Start date/time of channel data.
        end (str): End date/time of channel data.
        vchannels (list): List of voltage channel names.
        ichannels (list): List of current/power channel names.
        frcdig (int): Number of fractional digits in numeric values.
        cols (list or ``None``): List of data column descriptions.
        time (numpy array): Array of sample times as Unix timestamps.
        data (numpy array): Array of channel data, one row per sample.
          :meth:`get_channel_data` reads it as: column 0 volts, column 1
          frequency, then one watts column per current/power channel
          followed by one amps column per current/power channel.
    """

    def __init__(
        self, begin, end, vchannels, ichannels, frcdig, cols=None, data=None, time=None
    ):
        """Initialize IotaWattData object.

        Args:
            begin (str): Start date/time of channel data.
            end (str): End date/time of channel data.
            vchannels (list): List of voltage channel names.
            ichannels (list): List of current/power channel names.
            frcdig (int): Number of fractional digits in numeric values.
            cols (list or ``None``, optional): List of data column
              descriptions.
            data (numpy array or ``None``, optional): Array of channel
              data. When `time` is not given, column 0 of `data` is
              taken as the sample times and the remaining columns as
              the channel data. When ``None``, an empty dataset is
              created.
            time (numpy array or ``None``, optional): Array of sample
              times. Only used when `data` is also given, in which case
              both arrays are stored as-is.
        """
        self.begin = begin
        self.end = end
        self.vchannels = vchannels
        self.ichannels = ichannels
        self.frcdig = frcdig
        self.cols = cols
        if data is not None and time is not None:
            self.time = time
            self.data = data
        elif data is None:
            # Empty dataset; cols may legitimately be None here
            ncol = 0 if cols is None else len(cols)
            self.time = np.zeros((0,), dtype=int)
            self.data = np.zeros((0, ncol), dtype=np.float32)
        else:
            self.time = data[:, 0].astype(int)
            self.data = data[:, 1:].astype(np.float32)

    def get_channel_data(self, name, units, interval=5):
        """Get data from channel `name` in units `units`.

        Args:
            name (str): Channel name.
            units (str): Data units. Valid values are 'volts' and
              'hertz' for voltage channels and 'watts', 'amps', 'wh',
              'va', 'var', 'varh', 'pf' for current/power channels.
            interval (int, optional): Sampling interval in seconds of
              the stored data, used to convert power to energy for the
              'wh' and 'varh' units.

        Returns:
            numpy array: Channel data.

        Raises:
            ValueError: Unknown channel name, or units not valid for
              the selected channel.
        """
        if name not in self.vchannels and name not in self.ichannels:
            raise ValueError("No data for channel %s" % name)
        if name in self.vchannels:
            # Voltage channel selected
            if units not in ["volts", "hertz"]:
                raise ValueError("Unrecognized units %s" % units)
            if units == "volts":
                c = self.data[:, 0]
            else:
                c = self.data[:, 1]
        else:
            # Current/power channel selected
            if units not in ["watts", "amps", "wh", "va", "var", "varh", "pf"]:
                raise ValueError("Unrecognized units %s" % units)
            idx = self.ichannels.index(name)
            # Fraction of an hour covered by each sample
            hours = interval / 3600.0
            v = self.data[:, 0]
            w = self.data[:, idx + 2]
            a = self.data[:, idx + len(self.ichannels) + 2]
            if units == "watts":
                c = w
            elif units == "amps":
                c = a
            elif units == "wh":
                c = w * hours
            elif units == "va":
                c = v * a
            elif units == "var":
                c = np.sqrt((v * a) ** 2 - w**2)
            elif units == "varh":
                vah = v * a * hours
                wh = w * hours
                c = np.sqrt(vah**2 - wh**2)
            else:  # units == 'pf'
                c = w / (v * a)
        return c

    def save(self, filename):
        """Save data in numpy NPZ format.

        Args:
            filename (str): Filename.
        """
        np.savez(
            filename,
            time=self.time,
            data=self.data,
            cols=self.cols,
            begin=self.begin,
            end=self.end,
            vchannels=self.vchannels,
            ichannels=self.ichannels,
            frcdig=self.frcdig,
        )

    @classmethod
    def load(cls, filename):
        """Load data from numpy NPZ format file.

        Args:
            filename (str): Filename.

        Returns:
            IotaWattData : Object data retrieved from file.
        """
        # The archive is closed on exit; every array is read inside the
        # with-block. Metadata is converted back to plain Python values
        # (str, int, list) rather than numpy scalars/arrays.
        with np.load(filename) as npz:
            return cls(
                npz["begin"].item(),
                npz["end"].item(),
                npz["vchannels"].tolist(),
                npz["ichannels"].tolist(),
                int(npz["frcdig"]),
                npz["cols"].tolist(),
                npz["data"],
                npz["time"],
            )