Mercurial > ~darius > hgwebdir.cgi > epro
diff velib_python/ve_utils.py @ 8:9c0435a617db
Import velib_python
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Sun, 05 Dec 2021 14:35:36 +1030 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/ve_utils.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,265 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import sys +from traceback import print_exc +from os import _exit as os_exit +from os import statvfs +from subprocess import check_output, CalledProcessError +import logging +import dbus +logger = logging.getLogger(__name__) + +VEDBUS_INVALID = dbus.Array([], signature=dbus.Signature('i'), variant_level=1) + +class NoVrmPortalIdError(Exception): + pass + +# Use this function to make sure the code quits on an unexpected exception. Make sure to use it +# when using GLib.idle_add and also GLib.timeout_add. +# Without this, the code will just keep running, since GLib does not stop the mainloop on an +# exception. +# Example: GLib.idle_add(exit_on_error, myfunc, arg1, arg2) +def exit_on_error(func, *args, **kwargs): + try: + return func(*args, **kwargs) + except: + try: + print ('exit_on_error: there was an exception. Printing stacktrace will be tried and then exit') + print_exc() + except: + pass + + # sys.exit() is not used, since that throws an exception, which does not lead to a program + # halt when used in a dbus callback, see connection.py in the Python/Dbus libraries, line 230. + os_exit(1) + + +__vrm_portal_id = None +def get_vrm_portal_id(): + # The original definition of the VRM Portal ID is that it is the mac + # address of the onboard- ethernet port (eth0), stripped from its colons + # (:) and lower case. This may however differ between platforms. On Venus + # the task is therefore deferred to /sbin/get-unique-id so that a + # platform specific method can be easily defined. + # + # If /sbin/get-unique-id does not exist, then use the ethernet address + # of eth0. This also handles the case where velib_python is used as a + # package install on a Raspberry Pi. + # + # On a Linux host where the network interface may not be eth0, you can set + # the VRM_IFACE environment variable to the correct name. + + global __vrm_portal_id + + if __vrm_portal_id: + return __vrm_portal_id + + portal_id = None + + # First try the method that works if we don't have a data partition. This + # will fail when the current user is not root. + try: + portal_id = check_output("/sbin/get-unique-id").decode("utf-8", "ignore").strip() + if not portal_id: + raise NoVrmPortalIdError("get-unique-id returned blank") + __vrm_portal_id = portal_id + return portal_id + except CalledProcessError: + # get-unique-id returned non-zero + raise NoVrmPortalIdError("get-unique-id returned non-zero") + except OSError: + # File doesn't exist, use fallback + pass + + # Fall back to getting our id using a syscall. Assume we are on linux. + # Allow the user to override what interface is used using an environment + # variable. + import fcntl, socket, struct, os + + iface = os.environ.get('VRM_IFACE', 'eth0').encode('ascii') + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', iface[:15])) + except IOError: + raise NoVrmPortalIdError("ioctl failed for eth0") + + __vrm_portal_id = info[18:24].hex() + return __vrm_portal_id + + +# See VE.Can registers - public.docx for definition of this conversion +def convert_vreg_version_to_readable(version): + def str_to_arr(x, length): + a = [] + for i in range(0, len(x), length): + a.append(x[i:i+length]) + return a + + x = "%x" % version + x = x.upper() + + if len(x) == 5 or len(x) == 3 or len(x) == 1: + x = '0' + x + + a = str_to_arr(x, 2); + + # remove the first 00 if there are three bytes and it is 00 + if len(a) == 3 and a[0] == '00': + a.remove(0); + + # if we have two or three bytes now, and the first character is a 0, remove it + if len(a) >= 2 and a[0][0:1] == '0': + a[0] = a[0][1]; + + result = '' + for item in a: + result += ('.' if result != '' else '') + item + + + result = 'v' + result + + return result + + +def get_free_space(path): + result = -1 + + try: + s = statvfs(path) + result = s.f_frsize * s.f_bavail # Number of free bytes that ordinary users + except Exception as ex: + logger.info("Error while retrieving free space for path %s: %s" % (path, ex)) + + return result + + +def get_load_averages(): + c = read_file('/proc/loadavg') + return c.split(' ')[:3] + + +def _get_sysfs_machine_name(): + try: + with open('/sys/firmware/devicetree/base/model', 'r') as f: + return f.read().rstrip('\x00') + except IOError: + pass + + return None + +# Returns None if it cannot find a machine name. Otherwise returns the string +# containing the name +def get_machine_name(): + # First try calling the venus utility script + try: + return check_output("/usr/bin/product-name").strip().decode('UTF-8') + except (CalledProcessError, OSError): + pass + + # Fall back to sysfs + name = _get_sysfs_machine_name() + if name is not None: + return name + + # Fall back to venus build machine name + try: + with open('/etc/venus/machine', 'r', encoding='UTF-8') as f: + return f.read().strip() + except IOError: + pass + + return None + + +def get_product_id(): + """ Find the machine ID and return it. """ + + # First try calling the venus utility script + try: + return check_output("/usr/bin/product-id").strip() + except (CalledProcessError, OSError): + pass + + # Fall back machine name mechanism + name = _get_sysfs_machine_name() + return { + 'Color Control GX': 'C001', + 'Venus GX': 'C002', + 'Octo GX': 'C006', + 'EasySolar-II': 'C007', + 'MultiPlus-II': 'C008' + }.get(name, 'C003') # C003 is Generic + + +# Returns False if it cannot open the file. Otherwise returns its rstripped contents +def read_file(path): + content = False + + try: + with open(path, 'r') as f: + content = f.read().rstrip() + except Exception as ex: + logger.debug("Error while reading %s: %s" % (path, ex)) + + return content + + +def wrap_dbus_value(value): + if value is None: + return VEDBUS_INVALID + if isinstance(value, float): + return dbus.Double(value, variant_level=1) + if isinstance(value, bool): + return dbus.Boolean(value, variant_level=1) + if isinstance(value, int): + try: + return dbus.Int32(value, variant_level=1) + except OverflowError: + return dbus.Int64(value, variant_level=1) + if isinstance(value, str): + return dbus.String(value, variant_level=1) + if isinstance(value, list): + if len(value) == 0: + # If the list is empty we cannot infer the type of the contents. So assume unsigned integer. + # A (signed) integer is dangerous, because an empty list of signed integers is used to encode + # an invalid value. + return dbus.Array([], signature=dbus.Signature('u'), variant_level=1) + return dbus.Array([wrap_dbus_value(x) for x in value], variant_level=1) + if isinstance(value, dict): + # Wrapping the keys of the dictionary causes D-Bus errors like: + # 'arguments to dbus_message_iter_open_container() were incorrect, + # assertion "(type == DBUS_TYPE_ARRAY && contained_signature && + # *contained_signature == DBUS_DICT_ENTRY_BEGIN_CHAR) || (contained_signature == NULL || + # _dbus_check_is_valid_signature (contained_signature))" failed in file ...' + return dbus.Dictionary({(k, wrap_dbus_value(v)) for k, v in value.items()}, variant_level=1) + return value + + +dbus_int_types = (dbus.Int32, dbus.UInt32, dbus.Byte, dbus.Int16, dbus.UInt16, dbus.UInt32, dbus.Int64, dbus.UInt64) + + +def unwrap_dbus_value(val): + """Converts D-Bus values back to the original type. For example if val is of type DBus.Double, + a float will be returned.""" + if isinstance(val, dbus_int_types): + return int(val) + if isinstance(val, dbus.Double): + return float(val) + if isinstance(val, dbus.Array): + v = [unwrap_dbus_value(x) for x in val] + return None if len(v) == 0 else v + if isinstance(val, (dbus.Signature, dbus.String)): + return str(val) + # Python has no byte type, so we convert to an integer. + if isinstance(val, dbus.Byte): + return int(val) + if isinstance(val, dbus.ByteArray): + return "".join([bytes(x) for x in val]) + if isinstance(val, (list, tuple)): + return [unwrap_dbus_value(x) for x in val] + if isinstance(val, (dbus.Dictionary, dict)): + # Do not unwrap the keys, see comment in wrap_dbus_value + return dict([(x, unwrap_dbus_value(y)) for x, y in val.items()]) + if isinstance(val, dbus.Boolean): + return bool(val) + return val