Mercurial > ~darius > hgwebdir.cgi > epro
changeset 8:9c0435a617db
Import velib_python
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Sun, 05 Dec 2021 14:35:36 +1030 |
parents | 982eeffe9d95 |
children | 446cfe74827b |
files | .hgignore __init__.py velib_python/.travis.yml velib_python/LICENSE velib_python/README.md velib_python/dbusdummyservice.py velib_python/dbusmonitor.py velib_python/examples/vedbusitem_import_examples.py velib_python/examples/vedbusservice_example.py velib_python/logger.py velib_python/mosquitto_bridge_registrator.py velib_python/settingsdevice.py velib_python/streamcommand.py velib_python/test/fixture_vedbus.py velib_python/test/mock_dbus_monitor.py velib_python/test/mock_dbus_service.py velib_python/test/mock_gobject.py velib_python/test/mock_settings_device.py velib_python/test/test_settingsdevice.py velib_python/test/test_vedbus.py velib_python/tools/dbus_signal_cntr.py velib_python/tracing.py velib_python/ve_utils.py velib_python/vedbus.py |
diffstat | 23 files changed, 3264 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/.hgignore Wed Sep 25 21:37:28 2019 +0930 +++ b/.hgignore Sun Dec 05 14:35:36 2021 +1030 @@ -1,2 +1,3 @@ .*\.pyc +velib_python/.git
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/.travis.yml Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,16 @@ +language: python +dist: focal + +python: + - "3.7" + +before_install: + - sudo apt-get update + - sudo apt-get install dbus-x11 libcairo2-dev libdbus-1-dev libgirepository1.0-dev pkg-config + - python -m pip install --upgrade pip + +install: + - pip3 install dbus-python PyGObject + +script: + - eval `dbus-launch --sh-syntax` && cd test && python3 test_vedbus.py -v
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/LICENSE Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2014 Victron Energy BV + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/README.md Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,102 @@ +velib_python +============ + +[![Build Status](https://travis-ci.com/victronenergy/velib_python.svg?branch=master)](https://travis-ci.org/victronenergy/velib_python) + +This is the general python library within Victron. It contains code that is related to D-Bus and the Color +Control GX. See http://www.victronenergy.com/panel-systems-remote-monitoring/colorcontrol/ for more +infomation about that panel. + +Files busitem.py, dbusitem.py and tracing.py are deprecated. + +The main files are vedbus.py, dbusmonitor.py and settingsdevice.py. + +- Use VeDbusService to put your process on dbus and let other services interact with you. +- Use VeDbusItemImport to read a single value from other processes the dbus, and monitor its signals. +- Use DbusMonitor to monitor multiple values from other processes +- Use SettingsDevice to store your settings in flash, via the com.victronenergy.settings dbus service. See +https://github.com/victronenergy/localsettings for more info. + +Code style +========== + +Comply with PEP8, except: +- use tabs instead of spaces, since we use tabs for all projects within Victron. +- max line length = 110 + +Run this command to set git diff to tabsize is 4 spaces. Replace --local with --global to do it globally for the current +user account. + + git config --local core.pager 'less -x4' + +Run this command to check your code agains PEP8 + + pep8 --max-line-length=110 --ignore=W191 *.py + +D-Bus +===== + +D-Bus is an/the inter process communication bus used on Linux for many things. Victron uses it on the CCGX to have all the different processes exchange data. Protocol drivers publish data read from products (for example battery voltage) on the D-Bus, and other processes (the GUI for example) takes it from the D-Bus to show it on the display. + +Libraries that implement D-Bus connectivity are available in many programming languages (C, Python, etc). There are also many commandline tools available to talk to a running process via D-bus. See for example the dbuscli (executeable name dbus): http://code.google.com/p/dbus-tools/wiki/DBusCli, and also dbus-monitor and dbus-send. + +There are two sides in using the D-Bus, putting information on it (exporting as service with objects) and reading/writing to a process exporting a service. Note that instead of reading with GetValue, you can also subscribe to receive a signal when datachanges. Doing this saves unncessary context-switches in most cases. + +To get an idea of how to publish data on the dbus, run the example: + + matthijs@matthijs-VirtualBox:~/dev/velib_python/examples$ python vedbusservice_example.py + vedbusservice_example.py starting up + /Position value is 5 + /Position value is now 10 + try changing our RPM by executing the following command from a terminal + + dbus-send --print-reply --dest=com.victronenergy.example /RPM com.victronenergy.BusItem.SetValue int32:1200 + Reply will be <> 0 for values > 1000: not accepted. And reply will be 0 for values < 1000: accepted. + +Leave that terminal open, start a second terminal, and interrogate above service from the commandline: + + matthijs@matthijs-VirtualBox:~/dev/velib_python/examples$ dbus + org.freedesktop.DBus + org.freedesktop.PowerManagement + com.victronenergy.example + org.xfce.Terminal5 + org.xfce.Xfconf + [and many more services in which we are not interested] + +To get more details, add the servicename: + + matthijs@matthijs-VirtualBox:~/dev/velib_python/examples$ dbus com.victronenergy.example + / + /Float + /Int + /NegativeInt + /Position + /RPM + /String + +And get the value for the position: + + matthijs@matthijs-VirtualBox:~/dev/velib_python/examples$ dbus com.victronenergy.example /RPM GetValue + 100 + +And setting the value is also possible, the % makes dbus evaluate what comes behind it, resulting in an int instead of the default (a string).: + + matthijs@matthijs-VirtualBox:~/dev/velib_python/examples$ dbus com.victronenergy.example /RPM SetValue %1 + 0 + +In this example, the 0 indicates succes. When trying an unsupported value, 2000, this is what happens: + + matthijs@matthijs-VirtualBox:~/dev/velib_python/examples$ dbus com.victronenergy.example /RPM SetValue %2000 + 2 + +Exporting services, and the object paths (/Float, /Position, /Group1/Value1, etcetera) is standard D-Bus functionality. At Victron we designed and implemented a D-Bus interface, called com.victronenergy.BusItem. Example showing all interfaces supported by an object: + + matthijs@matthijs-VirtualBox:~/dev/velib_python/examples$ dbus com.victronenergy.example /RPM + Interface org.freedesktop.DBus.Introspectable: + String Introspect() + + Interface com.victronenergy.BusItem: + Int32 SetValue(Variant newvalue) + String GetDescription(String language, Int32 length) + String GetText() + Variant GetValue()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/dbusdummyservice.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 + +""" +A class to put a simple service on the dbus, according to victron standards, with constantly updating +paths. See example usage below. It is used to generate dummy data for other processes that rely on the +dbus. See files in dbus_vebus_to_pvinverter/test and dbus_vrm/test for other usage examples. + +To change a value while testing, without stopping your dummy script and changing its initial value, write +to the dummy data via the dbus. See example. + +https://github.com/victronenergy/dbus_vebus_to_pvinverter/tree/master/test +""" +from gi.repository import GLib +import platform +import argparse +import logging +import sys +import os + +# our own packages +sys.path.insert(1, os.path.join(os.path.dirname(__file__), '../ext/velib_python')) +from vedbus import VeDbusService + +class DbusDummyService(object): + def __init__(self, servicename, deviceinstance, paths, productname='Dummy product', connection='Dummy service'): + self._dbusservice = VeDbusService(servicename) + self._paths = paths + + logging.debug("%s /DeviceInstance = %d" % (servicename, deviceinstance)) + + # Create the management objects, as specified in the ccgx dbus-api document + self._dbusservice.add_path('/Mgmt/ProcessName', __file__) + self._dbusservice.add_path('/Mgmt/ProcessVersion', 'Unkown version, and running on Python ' + platform.python_version()) + self._dbusservice.add_path('/Mgmt/Connection', connection) + + # Create the mandatory objects + self._dbusservice.add_path('/DeviceInstance', deviceinstance) + self._dbusservice.add_path('/ProductId', 0) + self._dbusservice.add_path('/ProductName', productname) + self._dbusservice.add_path('/FirmwareVersion', 0) + self._dbusservice.add_path('/HardwareVersion', 0) + self._dbusservice.add_path('/Connected', 1) + + for path, settings in self._paths.items(): + self._dbusservice.add_path( + path, settings['initial'], writeable=True, onchangecallback=self._handlechangedvalue) + + GLib.timeout_add(1000, self._update) + + def _update(self): + with self._dbusservice as s: + for path, settings in self._paths.items(): + if 'update' in settings: + update = settings['update'] + if callable(update): + s[path] = update(path, s[path]) + else: + s[path] += update + logging.debug("%s: %s" % (path, s[path])) + return True + + def _handlechangedvalue(self, path, value): + logging.debug("someone else updated %s to %s" % (path, value)) + return True # accept the change + + +# === All code below is to simply run it from the commandline for debugging purposes === + +# It will created a dbus service called com.victronenergy.pvinverter.output. +# To try this on commandline, start this program in one terminal, and try these commands +# from another terminal: +# dbus com.victronenergy.pvinverter.output +# dbus com.victronenergy.pvinverter.output /Ac/Energy/Forward GetValue +# dbus com.victronenergy.pvinverter.output /Ac/Energy/Forward SetValue %20 +# +# Above examples use this dbus client: http://code.google.com/p/dbus-tools/wiki/DBusCli +# See their manual to explain the % in %20 + +def main(): + logging.basicConfig(level=logging.DEBUG) + + from dbus.mainloop.glib import DBusGMainLoop + # Have a mainloop, so we can send/receive asynchronous calls to and from dbus + DBusGMainLoop(set_as_default=True) + + pvac_output = DbusDummyService( + servicename='com.victronenergy.dummyservice.ttyO1', + deviceinstance=0, + paths={ + '/Ac/Energy/Forward': {'initial': 0, 'update': 1}, + '/Position': {'initial': 0, 'update': 0}, + '/Nonupdatingvalue/UseForTestingWritesForExample': {'initial': None}, + '/DbusInvalid': {'initial': None} + }) + + logging.info('Connected to dbus, and switching over to GLib.MainLoop() (= event based)') + mainloop = GLib.MainLoop() + mainloop.run() + + +if __name__ == "__main__": + main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/dbusmonitor.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,592 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +## @package dbus_vrm +# This code takes care of the D-Bus interface (not all of below is implemented yet): +# - on startup it scans the dbus for services we know. For each known service found, it searches for +# objects/paths we know. Everything we find is stored in items{}, and an event is registered: if a +# value changes weĺl be notified and can pass that on to our owner. For example the vrmLogger. +# we know. +# - after startup, it continues to monitor the dbus: +# 1) when services are added we do the same check on that +# 2) when services are removed, we remove any items that we had that referred to that service +# 3) if an existing services adds paths we update ourselves as well: on init, we make a +# VeDbusItemImport for a non-, or not yet existing objectpaths as well1 +# +# Code is used by the vrmLogger, and also the pubsub code. Both are other modules in the dbus_vrm repo. + +from dbus.mainloop.glib import DBusGMainLoop +from gi.repository import GLib +import dbus +import dbus.service +import inspect +import logging +import argparse +import pprint +import traceback +import os +from collections import defaultdict +from functools import partial + +# our own packages +from ve_utils import exit_on_error, wrap_dbus_value, unwrap_dbus_value +notfound = object() # For lookups where None is a valid result + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +class SystemBus(dbus.bus.BusConnection): + def __new__(cls): + return dbus.bus.BusConnection.__new__(cls, dbus.bus.BusConnection.TYPE_SYSTEM) + +class SessionBus(dbus.bus.BusConnection): + def __new__(cls): + return dbus.bus.BusConnection.__new__(cls, dbus.bus.BusConnection.TYPE_SESSION) + +class MonitoredValue(object): + def __init__(self, value, text, options): + super(MonitoredValue, self).__init__() + self.value = value + self.text = text + self.options = options + + # For legacy code, allow treating this as a tuple/list + def __iter__(self): + return iter((self.value, self.text, self.options)) + +class Service(object): + whentologoptions = ['configChange', 'onIntervalAlwaysAndOnEvent', + 'onIntervalOnlyWhenChanged', 'onIntervalAlways', 'never'] + def __init__(self, id, serviceName, deviceInstance): + super(Service, self).__init__() + self.id = id + self.name = serviceName + self.paths = {} + self._seen = set() + self.deviceInstance = deviceInstance + + self.configChange = [] + self.onIntervalAlwaysAndOnEvent = [] + self.onIntervalOnlyWhenChanged = [] + self.onIntervalAlways = [] + self.never = [] + + # For legacy code, attributes can still be accessed as if keys from a + # dictionary. + def __setitem__(self, key, value): + self.__dict__[key] = value + def __getitem__(self, key): + return self.__dict__[key] + + def set_seen(self, path): + self._seen.add(path) + + def seen(self, path): + return path in self._seen + + @property + def service_class(self): + return '.'.join(self.name.split('.')[:3]) + +class DbusMonitor(object): + ## Constructor + def __init__(self, dbusTree, valueChangedCallback=None, deviceAddedCallback=None, + deviceRemovedCallback=None, vebusDeviceInstance0=False): + # valueChangedCallback is the callback that we call when something has changed. + # def value_changed_on_dbus(dbusServiceName, dbusPath, options, changes, deviceInstance): + # in which changes is a tuple with GetText() and GetValue() + self.valueChangedCallback = valueChangedCallback + self.deviceAddedCallback = deviceAddedCallback + self.deviceRemovedCallback = deviceRemovedCallback + self.dbusTree = dbusTree + self.vebusDeviceInstance0 = vebusDeviceInstance0 + + # Lists all tracked services. Stores name, id, device instance, value per path, and whenToLog info + # indexed by service name (eg. com.victronenergy.settings). + self.servicesByName = {} + + # Same values as self.servicesByName, but indexed by service id (eg. :1.30) + self.servicesById = {} + + # Keep track of services by class to speed up calls to get_service_list + self.servicesByClass = defaultdict(list) + + # Keep track of any additional watches placed on items + self.serviceWatches = defaultdict(list) + + # For a PC, connect to the SessionBus + # For a CCGX, connect to the SystemBus + self.dbusConn = SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in os.environ else SystemBus() + + # subscribe to NameOwnerChange for bus connect / disconnect events. + (dbus.SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in os.environ \ + else dbus.SystemBus()).add_signal_receiver( + self.dbus_name_owner_changed, + signal_name='NameOwnerChanged') + + # Subscribe to PropertiesChanged for all services + self.dbusConn.add_signal_receiver(self.handler_value_changes, + dbus_interface='com.victronenergy.BusItem', + signal_name='PropertiesChanged', path_keyword='path', + sender_keyword='senderId') + + # Subscribe to ItemsChanged for all services + self.dbusConn.add_signal_receiver(self.handler_item_changes, + dbus_interface='com.victronenergy.BusItem', + signal_name='ItemsChanged', path='/', + sender_keyword='senderId') + + logger.info('===== Search on dbus for services that we will monitor starting... =====') + serviceNames = self.dbusConn.list_names() + for serviceName in serviceNames: + self.scan_dbus_service(serviceName) + + logger.info('===== Search on dbus for services that we will monitor finished =====') + + def dbus_name_owner_changed(self, name, oldowner, newowner): + if not name.startswith("com.victronenergy."): + return + + #decouple, and process in main loop + GLib.idle_add(exit_on_error, self._process_name_owner_changed, name, oldowner, newowner) + + def _process_name_owner_changed(self, name, oldowner, newowner): + if newowner != '': + # so we found some new service. Check if we can do something with it. + newdeviceadded = self.scan_dbus_service(name) + if newdeviceadded and self.deviceAddedCallback is not None: + self.deviceAddedCallback(name, self.get_device_instance(name)) + + elif name in self.servicesByName: + # it disappeared, we need to remove it. + logger.info("%s disappeared from the dbus. Removing it from our lists" % name) + service = self.servicesByName[name] + deviceInstance = service['deviceInstance'] + del self.servicesById[service.id] + del self.servicesByName[name] + for watch in self.serviceWatches[name]: + watch.remove() + del self.serviceWatches[name] + self.servicesByClass[service.service_class].remove(service) + if self.deviceRemovedCallback is not None: + self.deviceRemovedCallback(name, deviceInstance) + + def scan_dbus_service(self, serviceName): + try: + return self.scan_dbus_service_inner(serviceName) + except: + logger.error("Ignoring %s because of error while scanning:" % (serviceName)) + traceback.print_exc() + return False + + # Errors 'org.freedesktop.DBus.Error.ServiceUnknown' and + # 'org.freedesktop.DBus.Error.Disconnected' seem to happen when the service + # disappears while its being scanned. Which might happen, but is not really + # normal either, so letting them go into the logs. + + # Scans the given dbus service to see if it contains anything interesting for us. If it does, add + # it to our list of monitored D-Bus services. + def scan_dbus_service_inner(self, serviceName): + + # make it a normal string instead of dbus string + serviceName = str(serviceName) + + paths = self.dbusTree.get('.'.join(serviceName.split('.')[0:3]), None) + if paths is None: + logger.debug("Ignoring service %s, not in the tree" % serviceName) + return False + + logger.info("Found: %s, scanning and storing items" % serviceName) + serviceId = self.dbusConn.get_name_owner(serviceName) + + # we should never be notified to add a D-Bus service that we already have. If this assertion + # raises, check process_name_owner_changed, and D-Bus workings. + assert serviceName not in self.servicesByName + assert serviceId not in self.servicesById + + # for vebus.ttyO1, this is workaround, since VRM Portal expects the main vebus + # devices at instance 0. Not sure how to fix this yet. + if serviceName == 'com.victronenergy.vebus.ttyO1' and self.vebusDeviceInstance0: + di = 0 + elif serviceName == 'com.victronenergy.settings': + di = 0 + elif serviceName.startswith('com.victronenergy.vecan.'): + di = 0 + else: + try: + di = self.dbusConn.call_blocking(serviceName, + '/DeviceInstance', None, 'GetValue', '', []) + except dbus.exceptions.DBusException: + logger.info(" %s was skipped because it has no device instance" % serviceName) + return False # Skip it + else: + di = int(di) + + logger.info(" %s has device instance %s" % (serviceName, di)) + service = Service(serviceId, serviceName, di) + + # Let's try to fetch everything in one go + values = {} + texts = {} + try: + values.update(self.dbusConn.call_blocking(serviceName, '/', None, 'GetValue', '', [])) + texts.update(self.dbusConn.call_blocking(serviceName, '/', None, 'GetText', '', [])) + except: + pass + + for path, options in paths.items(): + # path will be the D-Bus path: '/Ac/ActiveIn/L1/V' + # options will be a dictionary: {'code': 'V', 'whenToLog': 'onIntervalAlways'} + # check that the whenToLog setting is set to something we expect + assert options['whenToLog'] is None or options['whenToLog'] in Service.whentologoptions + + # Try to obtain the value we want from our bulk fetch. If we + # cannot find it there, do an individual query. + value = values.get(path[1:], notfound) + if value != notfound: + service.set_seen(path) + text = texts.get(path[1:], notfound) + if value is notfound or text is notfound: + try: + value = self.dbusConn.call_blocking(serviceName, path, None, 'GetValue', '', []) + service.set_seen(path) + text = self.dbusConn.call_blocking(serviceName, path, None, 'GetText', '', []) + except dbus.exceptions.DBusException as e: + if e.get_dbus_name() in ( + 'org.freedesktop.DBus.Error.ServiceUnknown', + 'org.freedesktop.DBus.Error.Disconnected'): + raise # This exception will be handled below + + # TODO org.freedesktop.DBus.Error.UnknownMethod really + # shouldn't happen but sometimes does. + logger.debug("%s %s does not exist (yet)" % (serviceName, path)) + value = None + text = None + + service.paths[path] = MonitoredValue(unwrap_dbus_value(value), unwrap_dbus_value(text), options) + + if options['whenToLog']: + service[options['whenToLog']].append(path) + + + logger.debug("Finished scanning and storing items for %s" % serviceName) + + # Adjust self at the end of the scan, so we don't have an incomplete set of + # data if an exception occurs during the scan. + self.servicesByName[serviceName] = service + self.servicesById[serviceId] = service + self.servicesByClass[service.service_class].append(service) + + return True + + def handler_item_changes(self, items, senderId): + if not isinstance(items, dict): + return + + try: + service = self.servicesById[senderId] + except KeyError: + # senderId isn't there, which means it hasn't been scanned yet. + return + + for path, changes in items.items(): + try: + v = unwrap_dbus_value(changes['Value']) + except (KeyError, TypeError): + continue + + try: + t = changes['Text'] + except KeyError: + t = str(v) + self._handler_value_changes(service, path, v, t) + + def handler_value_changes(self, changes, path, senderId): + # If this properyChange does not involve a value, our work is done. + if 'Value' not in changes: + return + + try: + service = self.servicesById[senderId] + except KeyError: + # senderId isn't there, which means it hasn't been scanned yet. + return + + v = unwrap_dbus_value(changes['Value']) + # Some services don't send Text with their PropertiesChanged events. + try: + t = changes['Text'] + except KeyError: + t = str(v) + self._handler_value_changes(service, path, v, t) + + def _handler_value_changes(self, service, path, value, text): + try: + a = service.paths[path] + except KeyError: + # path isn't there, which means it hasn't been scanned yet. + return + + service.set_seen(path) + + # First update our store to the new value + if a.value == value: + return + + a.value = value + a.text = text + + # And do the rest of the processing in on the mainloop + if self.valueChangedCallback is not None: + GLib.idle_add(exit_on_error, self._execute_value_changes, service.name, path, { + 'Value': value, 'Text': text}, a.options) + + def _execute_value_changes(self, serviceName, objectPath, changes, options): + # double check that the service still exists, as it might have + # disappeared between scheduling-for and executing this function. + if serviceName not in self.servicesByName: + return + + self.valueChangedCallback(serviceName, objectPath, + options, changes, self.get_device_instance(serviceName)) + + # Gets the value for a certain servicename and path + # The default_value is returned when: + # 1. When the service doesn't exist. + # 2. When the path asked for isn't being monitored. + # 3. When the path exists, but has dbus-invalid, ie an empty byte array. + # 4. When the path asked for is being monitored, but doesn't exist for that service. + def get_value(self, serviceName, objectPath, default_value=None): + service = self.servicesByName.get(serviceName, None) + if service is None: + return default_value + + value = service.paths.get(objectPath, None) + if value is None or value.value is None: + return default_value + + return value.value + + # returns if a dbus exists now, by doing a blocking dbus call. + # Typically seen will be sufficient and doesn't need access to the dbus. + def exists(self, serviceName, objectPath): + try: + self.dbusConn.call_blocking(serviceName, objectPath, None, 'GetValue', '', []) + return True + except dbus.exceptions.DBusException as e: + return False + + # Returns if there ever was a successful GetValue or valueChanged event. + # Unlike get_value this return True also if the actual value is invalid. + # + # Note: the path might no longer exists anymore, but that doesn't happen in + # practice. If a service really wants to reconfigure itself typically it should + # reconnect to the dbus which causes it to be rescanned and seen will be updated. + # If it is really needed to know if a path still exists, use exists. + def seen(self, serviceName, objectPath): + try: + return self.servicesByName[serviceName].seen(objectPath) + except KeyError: + return False + + # Sets the value for a certain servicename and path, returns the return value of the D-Bus SetValue + # method. If the underlying item does not exist (the service does not exist, or the objectPath was not + # registered) the function will return -1 + def set_value(self, serviceName, objectPath, value): + # Check if the D-Bus object referenced by serviceName and objectPath is registered. There is no + # necessity to do this, but it is in line with previous implementations which kept VeDbusItemImport + # objects for registers items only. + service = self.servicesByName.get(serviceName, None) + if service is None: + return -1 + if objectPath not in service.paths: + return -1 + # We do not catch D-Bus exceptions here, because the previous implementation did not do that either. + return self.dbusConn.call_blocking(serviceName, objectPath, + dbus_interface='com.victronenergy.BusItem', + method='SetValue', signature=None, + args=[wrap_dbus_value(value)]) + + # Similar to set_value, but operates asynchronously + def set_value_async(self, serviceName, objectPath, value, + reply_handler=None, error_handler=None): + service = self.servicesByName.get(serviceName, None) + if service is not None: + if objectPath in service.paths: + self.dbusConn.call_async(serviceName, objectPath, + dbus_interface='com.victronenergy.BusItem', + method='SetValue', signature=None, + args=[wrap_dbus_value(value)], + reply_handler=reply_handler, error_handler=error_handler) + return + + if error_handler is not None: + error_handler(TypeError('Service or path not found, ' + 'service=%s, path=%s' % (serviceName, objectPath))) + + # returns a dictionary, keys are the servicenames, value the instances + # optionally use the classfilter to get only a certain type of services, for + # example com.victronenergy.battery. + def get_service_list(self, classfilter=None): + if classfilter is None: + return { servicename: service.deviceInstance \ + for servicename, service in self.servicesByName.items() } + + if classfilter not in self.servicesByClass: + return {} + + return { service.name: service.deviceInstance \ + for service in self.servicesByClass[classfilter] } + + def get_device_instance(self, serviceName): + return self.servicesByName[serviceName].deviceInstance + + # Parameter categoryfilter is to be a list, containing the categories you want (configChange, + # onIntervalAlways, etc). + # Returns a dictionary, keys are codes + instance, in VRM querystring format. For example vvt[0]. And + # values are the value. + def get_values(self, categoryfilter, converter=None): + + result = {} + + for serviceName in self.servicesByName: + result.update(self.get_values_for_service(categoryfilter, serviceName, converter)) + + return result + + # same as get_values above, but then for one service only + def get_values_for_service(self, categoryfilter, servicename, converter=None): + deviceInstance = self.get_device_instance(servicename) + result = {} + + service = self.servicesByName[servicename] + + for category in categoryfilter: + + for path in service[category]: + + value, text, options = service.paths[path] + + if value is not None: + + value = value if converter is None else converter.convert(path, options['code'], value, text) + + precision = options.get('precision') + if precision: + value = round(value, precision) + + result[options['code'] + "[" + str(deviceInstance) + "]"] = value + + return result + + def track_value(self, serviceName, objectPath, callback, *args, **kwargs): + """ A DbusMonitor can watch specific service/path combos for changes + so that it is not fully reliant on the global handler_value_changes + in this class. Additional watches are deleted automatically when + the service disappears from dbus. """ + cb = partial(callback, *args, **kwargs) + + def root_tracker(items): + # Check if objectPath in dict + try: + v = items[objectPath] + _v = unwrap_dbus_value(v['Value']) + except (KeyError, TypeError): + return # not in this dict + + try: + t = v['Text'] + except KeyError: + cb({'Value': _v }) + else: + cb({'Value': _v, 'Text': t}) + + # Track changes on the path, and also on root + self.serviceWatches[serviceName].extend(( + self.dbusConn.add_signal_receiver(cb, + dbus_interface='com.victronenergy.BusItem', + signal_name='PropertiesChanged', + path=objectPath, bus_name=serviceName), + self.dbusConn.add_signal_receiver(root_tracker, + dbus_interface='com.victronenergy.BusItem', + signal_name='ItemsChanged', + path="/", bus_name=serviceName), + )) + + +# ====== ALL CODE BELOW THIS LINE IS PURELY FOR DEVELOPING THIS CLASS ====== + +# Example function that can be used as a starting point to use this code +def value_changed_on_dbus(dbusServiceName, dbusPath, dict, changes, deviceInstance): + logger.debug("0 ----------------") + logger.debug("1 %s%s changed" % (dbusServiceName, dbusPath)) + logger.debug("2 vrm dict : %s" % dict) + logger.debug("3 changes-text: %s" % changes['Text']) + logger.debug("4 changes-value: %s" % changes['Value']) + logger.debug("5 deviceInstance: %s" % deviceInstance) + logger.debug("6 - end") + + +def nameownerchange(a, b): + # used to find memory leaks in dbusmonitor and VeDbusItemImport + import gc + gc.collect() + objects = gc.get_objects() + print (len([o for o in objects if type(o).__name__ == 'VeDbusItemImport'])) + print (len([o for o in objects if type(o).__name__ == 'SignalMatch'])) + print (len(objects)) + + +def print_values(dbusmonitor): + a = dbusmonitor.get_value('wrongservice', '/DbusInvalid', default_value=1000) + b = dbusmonitor.get_value('com.victronenergy.dummyservice.ttyO1', '/NotInTheMonitorList', default_value=1000) + c = dbusmonitor.get_value('com.victronenergy.dummyservice.ttyO1', '/DbusInvalid', default_value=1000) + d = dbusmonitor.get_value('com.victronenergy.dummyservice.ttyO1', '/NonExistingButMonitored', default_value=1000) + + print ("All should be 1000: Wrong Service: %s, NotInTheMonitorList: %s, DbusInvalid: %s, NonExistingButMonitored: %s" % (a, b, c, d)) + return True + +# We have a mainloop, but that is just for developing this code. Normally above class & code is used from +# some other class, such as vrmLogger or the pubsub Implementation. +def main(): + # Init logging + logging.basicConfig(level=logging.DEBUG) + logger.info(__file__ + " is starting up") + + # Have a mainloop, so we can send/receive asynchronous calls to and from dbus + DBusGMainLoop(set_as_default=True) + + import os + import sys + sys.path.insert(1, os.path.join(os.path.dirname(__file__), '../../')) + + dummy = {'code': None, 'whenToLog': 'configChange', 'accessLevel': None} + monitorlist = {'com.victronenergy.dummyservice': { + '/Connected': dummy, + '/ProductName': dummy, + '/Mgmt/Connection': dummy, + '/Dc/0/Voltage': dummy, + '/Dc/0/Current': dummy, + '/Dc/0/Temperature': dummy, + '/Load/I': dummy, + '/FirmwareVersion': dummy, + '/DbusInvalid': dummy, + '/NonExistingButMonitored': dummy}} + + d = DbusMonitor(monitorlist, value_changed_on_dbus, + deviceAddedCallback=nameownerchange, deviceRemovedCallback=nameownerchange) + + # logger.info("==configchange values==") + # logger.info(pprint.pformat(d.get_values(['configChange']))) + + # logger.info("==onIntervalAlways and onIntervalOnlyWhenChanged==") + # logger.info(pprint.pformat(d.get_values(['onIntervalAlways', 'onIntervalAlwaysAndOnEvent']))) + + GLib.timeout_add(1000, print_values, d) + + # Start and run the mainloop + logger.info("Starting mainloop, responding on only events") + mainloop = GLib.MainLoop() + mainloop.run() + +if __name__ == "__main__": + main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/examples/vedbusitem_import_examples.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# This file has some tests, to do type checking of vedbus.py +# This file makes it easy to compare the values put on the dbus through +# Python (vedbus.VeDbusItemExport) with items exported in C (the mk2dbus process) + +# Note that this file requires vedbusitemexport_examples to be running. + +import dbus +import pprint +import os +import sys +from dbus.mainloop.glib import DBusGMainLoop + +# our own packages +sys.path.insert(1, os.path.join(os.path.dirname(__file__), '../')) +from vedbus import VeDbusItemExport, VeDbusItemImport + +DBusGMainLoop(set_as_default=True) + +# Connect to the sessionbus. Note that on ccgx we use systembus instead. +dbusConn = dbus.SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in os.environ else dbus.SystemBus() + + +# dictionary containing the different items +dbusObjects = {} + +# check if the vbus.ttyO1 exists (it normally does on a ccgx, and for linux a pc, there is +# some emulator. +hasVEBus = 'com.victronenergy.vebus.ttyO1' in dbusConn.list_names() + +dbusObjects['PyString'] = VeDbusItemImport(dbusConn, 'com.victronenergy.example', '/String') +if hasVEBus: dbusObjects['C_string'] = VeDbusItemImport(dbusConn, 'com.victronenergy.vebus.ttyO1', '/Mgmt/ProcessName') + +dbusObjects['PyFloat'] = VeDbusItemImport(dbusConn, 'com.victronenergy.example', '/Float') +if hasVEBus: dbusObjects['C_float'] = VeDbusItemImport(dbusConn, 'com.victronenergy.vebus.ttyO1', '/Dc/V') + +dbusObjects['PyInt'] = VeDbusItemImport(dbusConn, 'com.victronenergy.example', '/Int') +if hasVEBus: dbusObjects['C_int'] = VeDbusItemImport(dbusConn, 'com.victronenergy.vebus.ttyO1', '/State') + +dbusObjects['PyNegativeInt'] = VeDbusItemImport(dbusConn, 'com.victronenergy.example', '/NegativeInt') +if hasVEBus: dbusObjects['C_negativeInt'] = VeDbusItemImport(dbusConn, 'com.victronenergy.vebus.ttyO1', '/Dc/I') + +# print the results +print('----') +for key, o in dbusObjects.items(): + print(key + ' at ' + o.serviceName + o.path) + pprint.pprint(dbusObjects[key]) + print('pprint veBusItem.get_value(): ') + pprint.pprint(dbusObjects[key].get_value()) + print('pprint veBusItem.get_text(): ') + pprint.pprint(dbusObjects[key].get_text()) + print('----')
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/examples/vedbusservice_example.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from dbus.mainloop.glib import DBusGMainLoop +from gi.repository import GLib +import dbus +import dbus.service +import inspect +import pprint +import os +import sys + +# our own packages +sys.path.insert(1, os.path.join(os.path.dirname(__file__), '../')) +from vedbus import VeDbusService + +softwareVersion = '1.0' + +def validate_new_value(path, newvalue): + # Max RPM setpoint = 1000 + return newvalue <= 1000 + +def get_text_for_rpm(path, value): + return('%d rotations per minute' % value) + +def main(argv): + global dbusObjects + + print(__file__ + " starting up") + + # Have a mainloop, so we can send/receive asynchronous calls to and from dbus + DBusGMainLoop(set_as_default=True) + + # Put ourselves on to the dbus + dbusservice = VeDbusService('com.victronenergy.example') + + # Most simple and short way to add an object with an initial value of 5. + dbusservice.add_path('/Position', value=5) + + # Most advanced wayt to add a path + dbusservice.add_path('/RPM', value=100, description='RPM setpoint', writeable=True, + onchangecallback=validate_new_value, gettextcallback=get_text_for_rpm) + + # You can access the paths as if the dbusservice is a dictionary + print('/Position value is %s' % dbusservice['/Position']) + + # Same for changing it + dbusservice['/Position'] = 10 + + print('/Position value is now %s' % dbusservice['/Position']) + + # To invalidate a value (see com.victronenergy.BusItem specs for definition of invalid), set to None + dbusservice['/Position'] = None + + dbusservice.add_path('/String', 'this is a string') + dbusservice.add_path('/Int', 0) + dbusservice.add_path('/NegativeInt', -10) + dbusservice.add_path('/Float', 1.5) + + print('try changing our RPM by executing the following command from a terminal\n') + print('dbus-send --print-reply --dest=com.victronenergy.example /RPM com.victronenergy.BusItem.SetValue int32:1200') + print('Reply will be <> 0 for values > 1000: not accepted. And reply will be 0 for values < 1000: accepted.') + mainloop = GLib.MainLoop() + mainloop.run() + +main("")
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/logger.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,41 @@ +#!/usr/bin/python3 -u +# -*- coding: utf-8 -*- + +import logging +import sys + +class LevelFilter(logging.Filter): + def __init__(self, passlevels, reject): + self.passlevels = passlevels + self.reject = reject + + def filter(self, record): + if self.reject: + return (record.levelno not in self.passlevels) + else: + return (record.levelno in self.passlevels) + +# Leave the name set to None to get the root logger. For some reason specifying 'root' has a +# different effect: there will be two root loggers, both with their own handlers... +def setup_logging(debug=False, name=None): + formatter = logging.Formatter(fmt='%(levelname)s:%(module)s:%(message)s') + + # Make info and debug stream to stdout and the rest to stderr + h1 = logging.StreamHandler(sys.stdout) + h1.addFilter(LevelFilter([logging.INFO, logging.DEBUG], False)) + h1.setFormatter(formatter) + + h2 = logging.StreamHandler(sys.stderr) + h2.addFilter(LevelFilter([logging.INFO, logging.DEBUG], True)) + h2.setFormatter(formatter) + + logger = logging.getLogger(name) + logger.addHandler(h1) + logger.addHandler(h2) + + # Set the loglevel and show it + logger.setLevel(level=(logging.DEBUG if debug else logging.INFO)) + logLevel = {0: 'NOTSET', 10: 'DEBUG', 20: 'INFO', 30: 'WARNING', 40: 'ERROR'} + logger.info('Loglevel set to ' + logLevel[logger.getEffectiveLevel()]) + + return logger
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/mosquitto_bridge_registrator.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,247 @@ +import fcntl +import threading +import logging +import os +import requests +import subprocess +import traceback +from ve_utils import exit_on_error +VrmNumberOfBrokers = 128 +VrmApiServer = 'https://ccgxlogging.victronenergy.com' +CaBundlePath = "/etc/ssl/certs/ccgx-ca.pem" +RpcBroker = 'mqtt-rpc.victronenergy.com' +SettingsPath = os.environ.get('DBUS_MQTT_PATH') or '/data/conf/mosquitto.d' +BridgeConfigPath = os.path.join(SettingsPath, 'vrm_bridge.conf') +MqttPasswordFile = "/data/conf/mqtt_password.txt" +BridgeSettings = '''# Generated by MosquittoBridgeRegistrator. Any changes will be overwritten on service start. +connection rpc +address {4}:443 +cleansession true +topic P/{0}/in/# in +topic P/{0}/out/# out +remote_clientid rpc-{2} +remote_username {6} +remote_password {1} +bridge_cafile {5} + +connection vrm +address {3}:443 +cleansession true +topic N/{0}/# out +topic R/{0}/# in +topic W/{0}/# in +remote_clientid {2} +remote_username {6} +remote_password {1} +bridge_cafile {5} +''' +LockFilePath = "/run/mosquittobridgeregistrator.lock" + + +class RepeatingTimer(threading.Thread): + def __init__(self, callback, interval): + threading.Thread.__init__(self) + self.event = threading.Event() + self.callback = callback + self.interval = interval + + def run(self): + while not self.event.is_set(): + if not self.callback(): + self.event.set() + + # either call your function here, + # or put the body of the function here + self.event.wait(self.interval) + + def stop(self): + self.event.set() + + +class MosquittoBridgeRegistrator(object): + """ + The MosquittoBridgeRegistrator manages a bridge connection between the local Mosquitto + MQTT server, and the global VRM broker. It can be called + concurrently by different processes; efforts will be synchronized using an + advisory lock file. + + It now also supports registering the API key and getting it and the password without + restarting Mosquitto. This allows using the API key, but not use the local broker and + instead connect directly to the VRM broker url. + """ + + def __init__(self, system_id): + self._init_broker_timer = None + self._aborted = threading.Event() + self._client_id = None + self._system_id = system_id + self._global_broker_username = "ccgxapikey_" + self._system_id + self._global_broker_password = None + self._requests_log_level = logging.getLogger("requests").getEffectiveLevel() + + def _get_vrm_broker_url(self): + """To allow scaling, the VRM broker URL is generated based on the system identifier + The function returns a numbered broker URL between 0 and VrmNumberOfBrokers, which makes sure + that broker connections are distributed equally between all VRM brokers + """ + sum = 0 + for character in self._system_id.lower().strip(): + sum += ord(character) + broker_index = sum % VrmNumberOfBrokers + return "mqtt{}.victronenergy.com".format(broker_index) + + + def load_or_generate_mqtt_password(self): + """In case posting the password to storemqttpassword.php was processed + by the server, but we never saw the response, we need to keep it around + for the next time (don't post a random new one). + + This way of storing the password was incepted later, and makes it + backwards compatible. + """ + + if os.path.exists(MqttPasswordFile): + with open(MqttPasswordFile, "r") as f: + logging.info("Using {}".format(MqttPasswordFile)) + password = f.read().strip() + return password + else: + with open(MqttPasswordFile + ".tmp", "w") as f: + logging.info("Writing new {}".format(MqttPasswordFile)) + password = get_random_string(32) + + # make sure the password is on the disk + f.write(password) + f.flush() + os.fsync(f.fileno()) + + os.rename(MqttPasswordFile + ".tmp", MqttPasswordFile) + + # update the directory meta-info + fd = os.open(os.path.dirname(MqttPasswordFile), 0) + os.fsync(fd) + os.close(fd) + + return password + + def register(self): + if self._init_broker_timer is not None: + return + if self._init_broker(quiet=False, timeout=5): + if not self._aborted.is_set(): + logging.info("[InitBroker] Registration failed. Retrying in thread, silently.") + logging.getLogger("requests").setLevel(logging.WARNING) + # Not using gobject to keep these blocking operations out of the event loop + self._init_broker_timer = RepeatingTimer(self._init_broker, 60) + self._init_broker_timer.start() + + def abort_gracefully(self): + self._aborted.set() + if self._init_broker_timer: + self._init_broker_timer.stop() + self._init_broker_timer.join() + + @property + def client_id(self): + return self._client_id + + def _write_config_atomically(self, path, contents): + + config_dir = os.path.dirname(path) + if not os.path.exists(config_dir): + os.makedirs(config_dir) + + with open(path + ".tmp", 'wt') as out_file: + # make sure the new config is on the disk + out_file.write(contents) + out_file.flush() + os.fsync(out_file.fileno()) + + # make sure there is either the old file or the new one + os.rename(path + ".tmp", path) + + # update the directory meta-info + fd = os.open(os.path.dirname(path), 0) + os.fsync(fd) + os.close(fd) + + def _init_broker(self, quiet=True, timeout=5): + try: + with open(LockFilePath, "a") as lockFile: + fcntl.flock(lockFile, fcntl.LOCK_EX) + + orig_config = None + # Read the current config file (if present) + try: + if not quiet: + logging.info('[InitBroker] Reading config file') + with open(BridgeConfigPath, 'rt') as in_file: + orig_config = in_file.read() + settings = dict(tuple(l.strip().split(' ', 1)) for l in orig_config.split('\n') + if not l.startswith('#') and l.strip() != '') + self._client_id = settings.get('remote_clientid') + self._global_broker_password = settings.get('remote_password') + except IOError: + if not quiet: + logging.info('[InitBroker] Reading config file failed.') + # We need a guarantee an empty file, otherwise Mosquitto crashes on load. + if not os.path.exists(BridgeConfigPath): + self._write_config_atomically(BridgeConfigPath, ""); + # Fix items missing from config + if self._client_id is None: + self._client_id = 'ccgx_' + get_random_string(12) + if self._global_broker_password is None: + self._global_broker_password = self.load_or_generate_mqtt_password() + # Get to the actual registration + if not quiet: + logging.info('[InitBroker] Registering CCGX at VRM portal') + with requests.Session() as session: + headers = {'content-type': 'application/x-www-form-urlencoded', 'User-Agent': 'dbus-mqtt'} + r = session.post( + VrmApiServer + '/log/storemqttpassword.php', + data=dict(identifier=self._global_broker_username, mqttPassword=self._global_broker_password), + headers=headers, + verify=CaBundlePath, + timeout=(timeout,timeout)) + if r.status_code == requests.codes.ok: + config = BridgeSettings.format(self._system_id, + self._global_broker_password, self._client_id, + self._get_vrm_broker_url(), RpcBroker, CaBundlePath, + self._global_broker_username) + # Do we need to adjust the settings file? + if config != orig_config: + logging.info('[InitBroker] Writing new config file') + self._write_config_atomically(BridgeConfigPath, config) + self._restart_broker() + else: + logging.info('[InitBroker] Not updating config file and not restarting Mosquitto, because config is correct.') + self._init_broker_timer = None + logging.getLogger("requests").setLevel(self._requests_log_level) + logging.info('[InitBroker] Registration successful') + return False + if not quiet: + logging.error('VRM registration failed. Http status was: {}'.format(r.status_code)) + logging.error('Message was: {}'.format(r.text)) + except: + if not quiet: + traceback.print_exc() + # Notify the timer we want to be called again + return True + + def _restart_broker(self): + logging.info('Restarting broker') + subprocess.call(['svc', '-t', '/service/mosquitto']) + + def get_password(self): + assert self._global_broker_password is not None + return self._global_broker_password + + def get_apikey(self): + return self._global_broker_username + + +def get_random_string(size=32): + """Creates a random (hex) string which contains 'size' characters.""" + return ''.join("{0:02x}".format(b) for b in open('/dev/urandom', 'rb').read(int(size/2))) + +# vim: noexpandtab:shiftwidth=4:tabstop=4:softtabstop=0
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/settingsdevice.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,118 @@ +import dbus +import logging +import time +from functools import partial + +# Local imports +from vedbus import VeDbusItemImport + +## Indexes for the setting dictonary. +PATH = 0 +VALUE = 1 +MINIMUM = 2 +MAXIMUM = 3 +SILENT = 4 + +## The Settings Device class. +# Used by python programs, such as the vrm-logger, to read and write settings they +# need to store on disk. And since these settings might be changed from a different +# source, such as the GUI, the program can pass an eventCallback that will be called +# as soon as some setting is changed. +# +# The settings are stored in flash via the com.victronenergy.settings service on dbus. +# See https://github.com/victronenergy/localsettings for more info. +# +# If there are settings in de supportSettings list which are not yet on the dbus, +# and therefore not yet in the xml file, they will be added through the dbus-addSetting +# interface of com.victronenergy.settings. +class SettingsDevice(object): + ## The constructor processes the tree of dbus-items. + # @param bus the system-dbus object + # @param name the dbus-service-name of the settings dbus service, 'com.victronenergy.settings' + # @param supportedSettings dictionary with all setting-names, and their defaultvalue, min, max and whether + # the setting is silent. The 'silent' entry is optional. If set to true, no changes in the setting will + # be logged by localsettings. + # @param eventCallback function that will be called on changes on any of these settings + # @param timeout Maximum interval to wait for localsettings. An exception is thrown at the end of the + # interval if the localsettings D-Bus service has not appeared yet. + def __init__(self, bus, supportedSettings, eventCallback, name='com.victronenergy.settings', timeout=0): + logging.debug("===== Settings device init starting... =====") + self._bus = bus + self._dbus_name = name + self._eventCallback = eventCallback + self._values = {} # stored the values, used to pass the old value along on a setting change + self._settings = {} + + count = 0 + while True: + if 'com.victronenergy.settings' in self._bus.list_names(): + break + if count == timeout: + raise Exception("The settings service com.victronenergy.settings does not exist!") + count += 1 + logging.info('waiting for settings') + time.sleep(1) + + # Add the items. + self.addSettings(supportedSettings) + + logging.debug("===== Settings device init finished =====") + + def addSettings(self, settings): + for setting, options in settings.items(): + silent = len(options) > SILENT and options[SILENT] + busitem = self.addSetting(options[PATH], options[VALUE], + options[MINIMUM], options[MAXIMUM], silent, callback=partial(self.handleChangedSetting, setting)) + self._settings[setting] = busitem + self._values[setting] = busitem.get_value() + + def addSetting(self, path, value, _min, _max, silent=False, callback=None): + busitem = VeDbusItemImport(self._bus, self._dbus_name, path, callback) + if busitem.exists and (value, _min, _max, silent) == busitem._proxy.GetAttributes(): + logging.debug("Setting %s found" % path) + else: + logging.info("Setting %s does not exist yet or must be adjusted" % path) + + # Prepare to add the setting. Most dbus types extend the python + # type so it is only necessary to additionally test for Int64. + if isinstance(value, (int, dbus.Int64)): + itemType = 'i' + elif isinstance(value, float): + itemType = 'f' + else: + itemType = 's' + + # Add the setting + # TODO, make an object that inherits VeDbusItemImport, and complete the D-Bus settingsitem interface + settings_item = VeDbusItemImport(self._bus, self._dbus_name, '/Settings', createsignal=False) + setting_path = path.replace('/Settings/', '', 1) + if silent: + settings_item._proxy.AddSilentSetting('', setting_path, value, itemType, _min, _max) + else: + settings_item._proxy.AddSetting('', setting_path, value, itemType, _min, _max) + + busitem = VeDbusItemImport(self._bus, self._dbus_name, path, callback) + + return busitem + + def handleChangedSetting(self, setting, servicename, path, changes): + oldvalue = self._values[setting] if setting in self._values else None + self._values[setting] = changes['Value'] + + if self._eventCallback is None: + return + + self._eventCallback(setting, oldvalue, changes['Value']) + + def setDefault(self, path): + item = VeDbusItemImport(self._bus, self._dbus_name, path, createsignal=False) + item.set_default() + + def __getitem__(self, setting): + return self._settings[setting].get_value() + + def __setitem__(self, setting, newvalue): + result = self._settings[setting].set_value(newvalue) + if result != 0: + # Trying to make some false change to our own settings? How dumb! + assert False
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/streamcommand.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +## @package dbus_vrm + +import logging +logger = logging.getLogger(__name__) + +import codecs +import threading +import subprocess +from time import sleep + +# Runs a command, and calls sendfeedback with the statusupdates. +class StreamCommand(object): + SIGNALS = { + 1: "SIGHUP", 2: "SIGINT", 3: "SIGQUIT", 4: "SIGILL", 6: "SIGABRT", 7: "SIGBUS", 8: "SIGFPE", + 9: "SIGKILL", 10: "SIGBUS", 11: "SIGSEGV", 12: "SIGSYS", 13: "SIGPIPE", 14: "SIGALRM", + 15: "SIGTERM"} + + def run(self, command, timeout, feedbacksender): + self.feedbacksender = feedbacksender + self.returncode = None + self.utf8_decoder = codecs.getdecoder("utf_8") + self.latin1_decoder = codecs.getdecoder("latin1") + + def target(): + logger.info('Thread started for running %s' % command) + self.feedbacksender.send({"status": "starting"}) + + try: + self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) + except OSError as e: + logger.info("Command %s could not be started, errno: %s, msg: %s" + % (command, e.errno, e.strerror)) + self.feedbacksender.send({"status": "error", + "errormessage": "Could not start (errno %s, msg %s)" % (e.errno, e.strerror), + "errorcode": 731}, finished=True) + + self.process = None + return + + self.readandsend() + + + thread = threading.Thread(target=target) + thread.start() + thread.join(timeout) + + if self.process is None: + # Error message has already beent sent + return None + + # Make sure to send all the output + self.readandsend() + + if thread.is_alive(): + logger.warning("Command %s will now be terminated because of timeout" % command) + self.process.terminate() # TODO or should it be killed? + thread.join() + logger.warning("Command %s has been terminated" % command) + r = {"status": "error", "errormessage": "Stopped by timeout", "errorcode": 732} + + elif self.process.returncode < 0: + signal = -1 * self.process.returncode + error = "Stopped with signal %d - %s" % (signal, self.SIGNALS.get(signal, "unknown")) + logger.warning("Command %s abnormal stop. %s" % (command, error)) + r = {"status": "error", "errorcode": 733, "errormessage": error} + + else: + logger.info("Command %s execution completed. Exitcode %d" % (command, self.process.returncode)) + r = {"status": "finished", "exitcode": self.process.returncode} + + self.feedbacksender.send(r, finished=True) + return self.process.returncode + + def readandsend(self): + # TODO: check that below code works OK with vup stdout encoding (UTF-8), including non-standard ASCII chars + + while True: + self.process.stdout.flush() + line = self.process.stdout.readline() + try: + unicode_line, _ = self.utf8_decoder(line) + except UnicodeDecodeError: + unicode_line, _ = self.latin1_decoder(line) + + # Max length on pubnub is 1800 chars, and output is much better readable with the bare eye + # when sent per line. So no need to send it alltogether. + self.feedbacksender.send({"status": "running", "xmloutput": unicode_line}) + if line == b'' and self.process.poll() != None: + break + sleep(0.04)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/test/fixture_vedbus.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from dbus.mainloop.glib import DBusGMainLoop +import dbus +import dbus.service +import inspect +import platform +import pprint +import sys +import os + +# our own packages +sys.path.insert(1, os.path.join(os.path.dirname(__file__), '../')) +from gi.repository import GLib +from vedbus import VeDbusItemExport + +# Dictionary containing all objects exported to dbus +dbusObjects = {} + +def changerequest(path, newvalue): + if newvalue < 100: + return True + else: + return False + +def gettext(path, value): + return 'gettexted %s %s' % (path, value) + +def main(argv): + global dbusObjects + + # Have a mainloop, so we can send/receive asynchronous calls to and from dbus + DBusGMainLoop(set_as_default=True) + + # Connect to session bus whenever present, else use the system bus + dbusConn = dbus.SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in os.environ else dbus.SystemBus() + + # Register ourserves on the dbus as a service + name = dbus.service.BusName("com.victronenergy.dbusexample", dbusConn) + + # Create the management objects, as specified in the ccgx dbus-api document + + # Keep a reference in the global dictionary. Without this they would be removed by + # garbage collector again. + dbusObjects['string'] = VeDbusItemExport(dbusConn, '/String', 'this is a string') + dbusObjects['int'] = VeDbusItemExport(dbusConn, '/Int', 40000) + dbusObjects['negativeInt'] = VeDbusItemExport(dbusConn, '/NegativeInt', -10) + dbusObjects['float'] = VeDbusItemExport(dbusConn, '/Float', 1.5) + dbusObjects['invalid'] = VeDbusItemExport(dbusConn, '/Invalid', None) + dbusObjects['byte'] = VeDbusItemExport(dbusConn, '/Byte', dbus.Byte(84)) + dbusObjects['writeable'] = VeDbusItemExport(dbusConn, '/Writeable', 'original', writeable=True) + dbusObjects['not-writeable'] = VeDbusItemExport(dbusConn, '/NotWriteable', 'original', writeable=False) + + dbusObjects['not-writeable with cb'] = VeDbusItemExport(dbusConn, '/WriteableUpTo100', + 'original', writeable=True, onchangecallback=changerequest) + + dbusObjects['gettextcallback'] = VeDbusItemExport(dbusConn, '/Gettextcallback', + '10', gettextcallback=gettext, writeable=True) + + mainloop = GLib.MainLoop() + print("up and running") + sys.stdout.flush() + + mainloop.run() + +main("")
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/test/mock_dbus_monitor.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,167 @@ +import dbus +from collections import defaultdict +from functools import partial + +# Simulation a DbusMonitor object, without using the D-Bus (intended for unit tests). Instead of changes values +# on the D-Bus you can use the set_value function. set_value will automatically expand the service list. Note +# that all simulated D-Bus paths passed to set_value must be part of the dbusTree passed to the constructor of +# the monitor. +class MockDbusMonitor(object): + def __init__(self, dbusTree, valueChangedCallback=None, deviceAddedCallback=None, + deviceRemovedCallback=None, mountEventCallback=None, vebusDeviceInstance0=False, checkPaths=True): + self._services = {} + self._tree = {} + self._seen = defaultdict(set) + self._watches = defaultdict(dict) + self._checkPaths = checkPaths + self._value_changed_callback = valueChangedCallback + self._device_removed_callback = deviceRemovedCallback + self._device_added_callback = deviceAddedCallback + for s, sv in dbusTree.items(): + service = self._tree.setdefault(s, set()) + service.update(['/Connected', '/ProductName', '/Mgmt/Connection', '/DeviceInstance']) + for p in sv: + service.add(p) + + # Gets the value for a certain servicename and path, returns the default_value when + # request service and objectPath combination does not not exists or when it is invalid + def get_value(self, serviceName, objectPath, default_value=None): + item = self._get_item(serviceName, objectPath) + if item is None: + return default_value + r = item.get_value() + return default_value if r is None else r + + def _get_item(self, serviceName, objectPath): + service = self._services.get(serviceName) + if service is None: + return None + if objectPath not in self._tree[_class_name(serviceName)]: + return None + item = service.get(objectPath) + if item is None: + item = MockImportItem(None, valid=False) + service[objectPath] = item + return item + + def exists(self, serviceName, objectPath): + if serviceName not in self._services: + return False + if objectPath not in self._tree[_class_name(serviceName)]: + return False + return True + + def set_seen(self, serviceName, path): + self._seen[serviceName].add(path) + + def seen(self, serviceName, objectPath): + return objectPath in self._seen[serviceName] + + # returns a dictionary, keys are the servicenames, value the instances + # optionally use the classfilter to get only a certain type of services, for + # example com.victronenergy.battery. + def get_service_list(self, classfilter=None): + r = {} + for servicename,items in self._services.items(): + if not classfilter or _class_name(servicename) == classfilter: + item = items.get('/DeviceInstance') + r[servicename] = None if item is None else item.get_value() + return r + + def add_value(self, service, path, value): + class_name = _class_name(service) + s = self._tree.get(class_name, None) + if s is None: + raise Exception('service not found') + if self._checkPaths and path not in s: + raise Exception('Path not found: {}{} (check dbusTree passed to __init__)'.format(service, path)) + s = self._services.setdefault(service, {}) + s[path] = MockImportItem(value) + self.set_seen(service, path) + + def set_value(self, serviceName, objectPath, value): + item = self._get_item(serviceName, objectPath) + if item is None: + return -1 + item.set_value(value) + self.set_seen(serviceName, objectPath) + if self._value_changed_callback != None: + self._value_changed_callback(serviceName, objectPath, None, None, None) + if serviceName in self._watches: + if objectPath in self._watches[serviceName]: + self._watches[serviceName][objectPath]({'Value': value, 'Text': str(value)}) + elif None in self._watches[serviceName]: + self._watches[serviceName][None]({'Value': value, 'Text': str(value)}) + return 0 + + def set_value_async(self, serviceName, objectPath, value, + reply_handler=None, error_handler=None): + item = self._get_item(serviceName, objectPath) + + if item is not None and item.exists: + item.set_value(value) + if reply_handler is not None: + reply_handler(0) + return + + if error_handler is not None: + error_handler(TypeError('Service or path not found, ' + 'service=%s, path=%s' % (serviceName, objectPath))) + + def add_service(self, service, values): + if service in self._services: + raise Exception('Service already exists: {}'.format(service)) + self._services[service] = {} + for k,v in values.items(): + self.add_value(service, k, v) + if self._device_added_callback != None: + self._device_added_callback(service, values.get('/DeviceInstance', 0)) + + def remove_service(self, service): + s = self._services.get(service) + if s is None: + return + item = s.get('/DeviceInstance') + instance = 0 if item is None else item.get_value() + for item in s.values(): + item.set_service_exists(False) + self._services.pop(service) + if self._device_removed_callback != None: + self._device_removed_callback(service, instance) + if service in self._watches: + del self._watches[service] + + def track_value(self, serviceName, objectPath, callback, *args, **kwargs): + self._watches[serviceName][objectPath] = partial(callback, *args, **kwargs) + + @property + def dbusConn(self): + raise dbus.DBusException("No Connection") + + +class MockImportItem(object): + def __init__(self, value, valid=True, service_exists=True): + self._value = value + self._valid = valid + self._service_exists = service_exists + + def set_service_exists(self, service_exists): + self._service_exists = service_exists + + def get_value(self): + return self._value + + @property + def exists(self): + return self._valid + + def set_value(self, value): + if not self._service_exists: + raise dbus.exceptions.DBusException('org.freedesktop.DBus.Error.ServiceUnknown') + if not self._valid: + raise dbus.exceptions.DBusException('org.freedesktop.DBus.Error.UnknownObject') + self._value = value + + +def _class_name(service): + return '.'.join(service.split('.')[:3])
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/test/mock_dbus_service.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,58 @@ +# Simulates the busService object without using the D-Bus (intended for unit tests). Data usually stored in +# D-Bus items is now stored in memory. +class MockDbusService(object): + def __init__(self, servicename): + self._dbusobjects = {} + self._callbacks = {} + self._service_name = servicename + + def add_path(self, path, value, description="", writeable=False, onchangecallback=None, + gettextcallback=None): + self._dbusobjects[path] = value + if onchangecallback is not None: + self._callbacks[path] = onchangecallback + + # Add the mandatory paths, as per victron dbus api doc + def add_mandatory_paths(self, processname, processversion, connection, + deviceinstance, productid, productname, firmwareversion, hardwareversion, connected): + self.add_path('/Management/ProcessName', processname) + self.add_path('/Management/ProcessVersion', processversion) + self.add_path('/Management/Connection', connection) + + # Create rest of the mandatory objects + self.add_path('/DeviceInstance', deviceinstance) + self.add_path('/ProductId', productid) + self.add_path('/ProductName', productname) + self.add_path('/FirmwareVersion', firmwareversion) + self.add_path('/HardwareVersion', hardwareversion) + self.add_path('/Connected', connected) + + # Simulates a SetValue from the D-Bus, if avaible the onchangecallback associated with the path will + # be called before the data is changed. + def set_value(self, path, newvalue): + callback = self._callbacks.get(path) + if callback is None or callback(path, newvalue): + self._dbusobjects[path] = newvalue + + def __getitem__(self, path): + return self._dbusobjects[path] + + def __setitem__(self, path, newvalue): + if path not in self._dbusobjects: + raise Exception('Path not registered in service: {}{} (use add_path to register)'.\ + format(self._service_name, path)) + self._dbusobjects[path] = newvalue + + def __delitem__(self, path): + del self._dbusobjects[path] + + def __contains__(self, path): + return path in self._dbusobjects + + def __enter__(self): + # No batching done in mock object, and we already + # support the required dict interface. + return self + + def __exit__(self, *exc): + pass
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/test/mock_gobject.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,153 @@ +# This module contains mock functions for some of the functionality in gobject. +# You can use this to create unit tests on code using gobject timers without having to wait for those timer. +# Use the patch functions to replace the original gobject functions. The timer_manager object defined here +# allows you to set a virtual time stamp, which will invoke all timers that would normally run in the +# specified interval. + +from datetime import datetime as dt +import time + +class MockTimer(object): + def __init__(self, start, timeout, callback, *args, **kwargs): + self._timeout = timeout + self._next = start + timeout + self._callback = callback + self._args = args + self._kwargs = kwargs + + def run(self): + self._next += self._timeout + return self._callback(*self._args, **self._kwargs) + + @property + def next(self): + return self._next + + +class MockTimerManager(object): + def __init__(self, start_time=None): + self._resources = [] + self._time = 0 + self._id = 0 + self._timestamp = start_time or time.time() + + def add_timer(self, timeout, callback, *args, **kwargs): + return self._add_resource(MockTimer(self._time, timeout, callback, *args, **kwargs)) + + def add_idle(self, callback, *args, **kwargs): + return self.add_timer(self._time, callback, *args, **kwargs) + + def remove_resouce(self, id): + for rid, rr in self._resources: + if rid == id: + self._resources.remove((rid, rr)) + return + raise Exception('Resource not found: {}'.format(id)) + + def _add_resource(self, resource): + self._id += 1 + self._resources.append((self._id, resource)) + return self._id + + def _terminate(self): + raise StopIteration() + + @property + def time(self): + return self._time + + @property + def datetime(self): + return dt.fromtimestamp(self._timestamp + self._time / 1000.0) + + def run(self, interval=None): + ''' + Simulate the given interval. Starting from the current (mock) time until time + interval, all timers + will be triggered. The timers will be triggered in chronological order. Timer removal (calling + source_remove or a False/None return value) and addition within the callback function is supported. + If interval is None or not supplied, the function will run until there are no timers left. + ''' + if interval != None: + self.add_timer(interval, self._terminate) + try: + while True: + next_timer = None + next_id = None + for id,t in self._resources: + if next_timer == None or t.next < next_timer.next: + next_timer = t + next_id = id + if next_timer == None: + return + self._time = next_timer.next + if not next_timer.run(): + self._resources.remove((next_id, next_timer)) + except StopIteration: + self._resources.remove((next_id, next_timer)) + pass + + def reset(self): + self._resources = [] + self._time = 0 + + +timer_manager = MockTimerManager() + + +def idle_add(callback, *args, **kwargs): + return timer_manager.add_idle(callback, *args, **kwargs) + + +def timeout_add(timeout, callback, *args, **kwargs): + return timer_manager.add_timer(timeout, callback, *args, **kwargs) + + +def timeout_add_seconds(timeout, callback, *args, **kwargs): + return timeout_add(timeout * 1000, callback, *args, **kwargs) + + +class datetime(object): + @staticmethod + def now(): + return timer_manager.datetime + + @staticmethod + def strptime(*args, **kwargs): + return dt.strptime(*args, **kwargs) + + +def source_remove(id): + timer_manager.remove_resouce(id) + + +def test_function(m, name): + print(m.time, m.datetime, name) + return True + + +def patch_gobject(dest): + ''' + Use this function to replace the original gobject/GLib functions with the + mocked versions in this file. Suppose your source files being tested uses + 'from gi.repository import GLib' and the unit test uses 'import tested' you + should call path(tested.GLib). + ''' + dest.timeout_add = timeout_add + dest.timeout_add_seconds = timeout_add_seconds + dest.idle_add = idle_add + dest.source_remove = source_remove + + +def patch_datetime(dest): + dest.datetime = datetime + + +if __name__ == '__main__': + m = MockTimerManager() + id1 = m.add_timer(100, test_function, m, 'F1') + id2 = m.add_timer(30, test_function, m, 'F2') + m.run(5000) + m.remove_resouce(id1) + m.run(2000) + m.remove_resouce(id2) + m.run(2000)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/test/mock_settings_device.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,57 @@ +PATH = 0 +VALUE = 1 +MINIMUM = 2 +MAXIMUM = 3 + + +class MockSettingsItem(object): + def __init__(self, parent, path): + self._parent = parent + self.path = path + + def get_value(self): + setting = 'addSetting'+self.path + if setting in self._parent._settings: + return self._parent[setting] + return None + + def set_value(self, value): + self._parent['addSetting'+self.path] = value + + @property + def exists(self): + return 'addSetting'+self.path in self._parent._settings + +# Simulates the SettingsSevice object without using the D-Bus (intended for unit tests). Values passed to +# __setitem__ (or the [] operator) will be stored in memory for later retrieval by __getitem__. +class MockSettingsDevice(object): + def __init__(self, supported_settings, event_callback, name='com.victronenergy.settings', timeout=0): + self._dbus_name = name + self._settings = supported_settings + self._event_callback = event_callback + + def addSetting(self, path, value, _min, _max, silent=False, callback=None): + # Persist in our settings stash so the settings is available through + # the mock item + self._settings['addSetting'+path] = [path, value, _min, _max, silent] + return MockSettingsItem(self, path) + + def get_short_name(self, path): + for k,v in self._settings.items(): + if v[PATH] == path: + return k + return None + + def __getitem__(self, setting): + return self._settings[setting][VALUE] + + def __setitem__(self, setting, new_value): + s = self._settings.get(setting, None) + if s is None: + raise Exception('setting not found') + old_value = s[VALUE] + if old_value == new_value: + return + s[VALUE] = new_value + if self._event_callback is not None: + self._event_callback(setting, old_value, new_value)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/test/test_settingsdevice.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,64 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# Python +import logging +import os +import sqlite3 +import sys +import unittest +import subprocess +import time +import dbus +import threading +import fcntl +from dbus.mainloop.glib import DBusGMainLoop + +# Local +sys.path.insert(1, os.path.join(os.path.dirname(__file__), '../')) +from settingsdevice import SettingsDevice + +logger = logging.getLogger(__file__) + +class CreateSettingsTest(unittest.TestCase): + # The actual code calling VeDbusItemExport is in fixture_vedbus.py, which is ran as a subprocess. That + # code exports several values to the dbus. And then below test cases check if the exported values are + # what the should be, by using the bare dbus import objects and functions. + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_adding_new_settings(self): + # to make sure that we make new settings, put something random in its name: + rnd = os.urandom(16).encode('hex') + + # ofcourse below could be simplified, for now just use all settings from the example: + settings = SettingsDevice( + bus=dbus.SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in os.environ else dbus.SystemBus(), + supportedSettings={ + 'loggingenabled': ['/Settings/' + rnd + '/Logscript/Enabled', 1, 0, 1], + 'proxyaddress': ['/Settings/' + rnd + '/Logscript/Http/Proxy', '', 0, 0], + 'proxyport': ['/Settings/' + rnd + '/Logscript/Http/ProxyPort', '', 0, 0], + 'backlogenabled': ['/Settings/' + rnd + '/Logscript/LogFlash/Enabled', 1, 0, 1], + 'backlogpath': ['/Settings/' + rnd + '/Logscript/LogFlash/Path', '', 0, 0], # When empty, default path will be used. + 'interval': ['/Settings/' + rnd + '/Logscript/LogInterval', 900, 0, 0], + 'url': ['/Settings/' + rnd + '/Logscript/Url', '', 0, 0] # When empty, the default url will be used. + }, + eventCallback=self.handle_changed_setting) + + """ + self.assertIs(type(v), dbus.Double) + self.assertEqual(self.dbusConn.get_object('com.victronenergy.dbusexample', '/Float').GetText(), '1.5') + """ + + def handle_changed_setting(setting, oldvalue, newvalue): + pass + +if __name__ == "__main__": + logging.basicConfig(stream=sys.stderr) + logging.getLogger('').setLevel(logging.WARNING) + DBusGMainLoop(set_as_default=True) + unittest.main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/test/test_vedbus.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,227 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# Python +import logging +import os +import sys +import unittest +import subprocess +import time +import dbus +import threading +import fcntl +from dbus.mainloop.glib import DBusGMainLoop + +# Local +sys.path.insert(1, os.path.join(os.path.dirname(__file__), '../')) +from vedbus import VeDbusService, VeDbusItemImport + +logger = logging.getLogger(__file__) +""" +class VeDbusServiceTests(unittest.TestCase): + def incrementcallback(self, path, value): + self.calledback += 1 + return True if value < 50 else False + + def setUp(self): + self.calledback = 0 + + + self.service = VeDbusService('com.victronenergy.testservice') + self.service.add_path(path='/Int', value=10, description="int", writeable=True, + onchangecallback=self.incrementcallback, gettextcallback=None) + + self.thread = threading.Thread(target=self.mainloop.run) + self.thread.start() + + def test_callback(self): + a = subprocess.check_output('dbus', '-y com.victronenergy.testservice') + print(a) + + def tearDown(self): + self.thread.kill() + self.thread = None +""" + + +class VeDbusItemExportTests(unittest.TestCase): + # The actual code calling VeDbusItemExport is in fixture_vedbus.py, which is ran as a subprocess. That + # code exports several values to the dbus. And then below test cases check if the exported values are + # what the should be, by using the bare dbus import objects and functions. + + def setUp(self): + self.sp = subprocess.Popen([sys.executable, "fixture_vedbus.py"], stdout=subprocess.PIPE) + self.dbusConn = dbus.SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in os.environ else dbus.SystemBus() + + # Wait for fixture to be up and running. 'b' prefix is for python3, + # it works in both python versions. + while (self.sp.stdout.readline().rstrip() != b'up and running'): + pass + + def tearDown(self): + self.sp.kill() + self.sp.wait() + self.sp.stdout.close() + + def test_get_value_invalid(self): + v = self.dbusConn.get_object('com.victronenergy.dbusexample', '/Invalid').GetValue() + self.assertEqual(v, dbus.Array([], signature=dbus.Signature('i'), variant_level=1)) + self.assertIs(type(v), dbus.Array) + self.assertEqual(self.dbusConn.get_object('com.victronenergy.dbusexample', '/Invalid').GetText(), '---') + + def test_get_value_string(self): + v = self.dbusConn.get_object('com.victronenergy.dbusexample', '/String').GetValue() + self.assertEqual(v, 'this is a string') + self.assertIs(type(v), dbus.String) + self.assertEqual(self.dbusConn.get_object('com.victronenergy.dbusexample', '/String').GetText(), 'this is a string') + + def test_get_value_int(self): + v = self.dbusConn.get_object('com.victronenergy.dbusexample', '/Int').GetValue() + self.assertEqual(v, 40000) + self.assertIs(type(v), dbus.Int32) + self.assertEqual(self.dbusConn.get_object('com.victronenergy.dbusexample', '/Int').GetText(), '40000') + + def test_get_value_negativeint(self): + v = self.dbusConn.get_object('com.victronenergy.dbusexample', '/NegativeInt').GetValue() + self.assertEqual(v, -10) + self.assertIs(type(v), dbus.Int32) + self.assertEqual(self.dbusConn.get_object('com.victronenergy.dbusexample', '/NegativeInt').GetText(), '-10') + + def test_get_value_float(self): + v = self.dbusConn.get_object('com.victronenergy.dbusexample', '/Float').GetValue() + self.assertEqual(v, 1.5) + self.assertIs(type(v), dbus.Double) + self.assertEqual(self.dbusConn.get_object('com.victronenergy.dbusexample', '/Float').GetText(), '1.5') + + def test_get_text_byte(self): + v = self.dbusConn.get_object('com.victronenergy.dbusexample', '/Byte').GetText() + self.assertEqual('84', v) + + def test_get_value_byte(self): + v = self.dbusConn.get_object('com.victronenergy.dbusexample', '/Byte').GetValue() + self.assertEqual(84, v) + + def test_set_value(self): + self.assertNotEqual(0, self.dbusConn.get_object('com.victronenergy.dbusexample', '/NotWriteable').SetValue(12)) + self.assertEqual('original', self.dbusConn.get_object('com.victronenergy.dbusexample', '/NotWriteable').GetValue()) + + self.assertEqual(0, self.dbusConn.get_object('com.victronenergy.dbusexample', '/Writeable').SetValue(12)) + self.assertEqual(12, self.dbusConn.get_object('com.victronenergy.dbusexample', '/Writeable').GetValue()) + + self.assertNotEqual(0, self.dbusConn.get_object('com.victronenergy.dbusexample', '/WriteableUpTo100').SetValue(102)) + self.assertEqual('original', self.dbusConn.get_object('com.victronenergy.dbusexample', '/WriteableUpTo100').GetValue()) + + self.assertEqual(0, self.dbusConn.get_object('com.victronenergy.dbusexample', '/WriteableUpTo100').SetValue(50)) + self.assertEqual(50, self.dbusConn.get_object('com.victronenergy.dbusexample', '/WriteableUpTo100').GetValue()) + + def test_gettextcallback(self): + self.assertEqual('gettexted /Gettextcallback 10', self.dbusConn.get_object('com.victronenergy.dbusexample', '/Gettextcallback').GetText()) + + def waitandkill(self, seconds=5): + time.sleep(seconds) + self.process.kill() + self.process.wait() + + def test_changedsignal(self): + self.process = subprocess.Popen(['dbus-monitor', "type='signal',sender='com.victronenergy.dbusexample',interface='com.victronenergy.BusItem'"], stdout=subprocess.PIPE) + + #wait for dbus-monitor to start up + time.sleep(0.5) + + #set timeout + thread = threading.Thread(target=self.waitandkill) + thread.start() + + self.dbusConn.get_object('com.victronenergy.dbusexample', '/Gettextcallback').SetValue(60) + + fcntl.fcntl(self.process.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) + + time.sleep(0.5) + + t = bytes() + while self.process.returncode is None: + try: + t += self.process.stdout.readline() + except IOError: + break + self.process.stdout.close() + + text = b" dict entry(\n" + text += b" string \"Text\"\n" + text += b" variant string \"gettexted /Gettextcallback 60\"\n" + text += b" )\n" + + value = b" dict entry(\n" + value += b" string \"Value\"\n" + value += b" variant int32 60\n" + value += b" )\n" + + self.assertNotEqual(-1, t.find(text)) + self.assertNotEqual(-1, t.find(value)) + + thread.join() + +""" +MVA 2014-08-30: this test of VEDbusItemImport doesn't work, since there is no gobject-mainloop. +Probably making some automated functional test, using bash and some scripts, will work much +simpler and better +class VeDbusItemImportTests(unittest.TestCase): + # VeDbusItemImport class is tested against dbus objects exported by fixture_vedbus.py, which is ran as a + # subprocess. + + def setUp(self): + self.sp = subprocess.Popen([sys.executable, "fixture_vedbus.py"], stdout=subprocess.PIPE) + self.dbusConn = dbus.SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in os.environ else dbus.SystemBus() + + #wait for fixture to be up and running + while (self.sp.stdout.readline().rstrip() != 'up and running'): + pass + + def tearDown(self): + self.sp.kill() + self.sp.wait() + + def test_get_invalid(self): + self.assertIs(None, VeDbusItemImport(self.dbusConn, 'com.victronenergy.dbusexample', '/Invalid').get_value()) + self.assertEqual('---', VeDbusItemImport(self.dbusConn, 'com.victronenergy.dbusexample', '/Invalid').get_text()) + + def test_get_string(self): + v = VeDbusItemImport(self.dbusConn, 'com.victronenergy.dbusexample', '/String') + self.assertEqual('this is a string', v.get_value()) + self.assertIs(dbus.String, type(v.get_value())) + self.assertEqual('this is a string', v.get_text()) + + def test_get_int(self): + v = VeDbusItemImport(self.dbusConn, 'com.victronenergy.dbusexample', '/Int') + self.assertEqual(40000, v.get_value()) + self.assertIs(dbus.Int32, type(v.get_value())) + self.assertEqual('40000', v.get_text()) + + def test_get_byte(self): + v = VeDbusItemImport(self.dbusConn, 'com.victronenergy.dbusexample', '/Byte') + self.assertEqual(84, v.get_value()) + self.assertEqual('84', v.get_text()) + + def test_set_value(self): + nw = VeDbusItemImport(self.dbusConn, 'com.victronenergy.dbusexample', '/NotWriteable') + wr = VeDbusItemImport(self.dbusConn, 'com.victronenergy.dbusexample', '/Writeable') + wc = VeDbusItemImport(self.dbusConn, 'com.victronenergy.dbusexample', '/WriteableUpTo100') + + self.assertNotEqual(0, nw.set_value(12)) + self.assertEqual('original', nw.get_value()) + + self.assertEqual(0, wr.set_value(12)) + self.assertEqual(12, wr.get_value()) + + self.assertNotEqual(0, wc.set_value(102)) + self.assertEqual('original', wc.get_value()) + + self.assertEqual(0, wc.set_value(50)) + self.assertEqual(50, wc.get_value()) +""" + +if __name__ == "__main__": + logging.basicConfig(stream=sys.stderr) + logging.getLogger('').setLevel(logging.WARNING) + unittest.main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/tools/dbus_signal_cntr.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from dbus.mainloop.glib import DBusGMainLoop +import gobject +import dbus +import dbus.service +from pprint import pprint +import os +import signal +from time import time + +items = {} +total = 0 +t_started = time() + +class DbusTracker(object): + def __init__(self): + + self.items = {} + + # For a PC, connect to the SessionBus, otherwise (Venus device) connect to the systembus + self.dbusConn = dbus.SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in os.environ else dbus.SystemBus() + + # subscribe to all signals + self.dbusConn.add_signal_receiver(self._signal_receive_handler, + sender_keyword='sender', + path_keyword='path') + + names = self.dbusConn.list_names() + for name in names: + if name.startswith(":"): + continue + + items[str(self.dbusConn.get_name_owner(name))] = {"_total": 0, "_name": str(name)} + + + def _signal_receive_handler(*args, **kwargs): + global total + total = total + 1 + + sender = str(kwargs['sender']) + path = str(kwargs['path']) + + d = items.get(sender) + if d is None: + items[sender] = {"_total": 1, path: 1} + return + + d["_total"] = d["_total"] + 1 + + p = d.get(path) + if p is None: + d[path] = 1 + return + + d[path] = p + 1 + + +def printall(): + t_elapsed = time() - t_started + + print(chr(27) + "[2J" + chr(27) + "[;H") + + row_format = "{:<60} {:>4} {:>4}% {:>4.2f} / s" + + print(row_format.format("Total", total, 100, total / t_elapsed)) + + for service, values in items.iteritems(): + # skip the services that didn't emit any signals + if len(values) == 2 and "_name" in values: + continue + + print(row_format.format(values.get("_name", service), values["_total"], values["_total"] * 100 / total, values["_total"] / t_elapsed)) + + # uncomment this to see all the paths as well. + # print("--------------") + # pprint(items) + return True + + +def main(): + DBusGMainLoop(set_as_default=True) + + d = DbusTracker() + + gobject.timeout_add(2000, printall) + + mainloop = gobject.MainLoop() + mainloop.run() + + +if __name__ == "__main__": + main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/tracing.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,60 @@ +## IMPORTANT NOTE - MVA 2015-2-5 +# This file is deprecated. Use the standard logging package of Python instead + +## @package tracing +# The tracing module for debug-purpose. + +log = None + +## Setup the debug traces. +# The traces can be logged to console and/or file. +# When logged to file a logrotate is used. +# @param enabled When True traces are enabled. +# @param path The path for the trace-file. +# @param fileName The trace-file-name. +# @param toConsole When True show traces to console. +# @param debugOn When True show debug-traces. +def setupTraces(enabled, path, fileName, toConsole, toFile, debugOn): + global log + + if enabled: + import logging + import logging.handlers + + log = logging.getLogger(fileName) + if debugOn == True: + level = logging.DEBUG + else: + level = logging.INFO + log.setLevel(level) + log.disabled = not enabled + if toConsole == True: + sth = logging.StreamHandler() + fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s') + sth.setFormatter(fmt) + sth.setLevel(level) + log.addHandler(sth) + if toFile == True: + fd = logging.handlers.RotatingFileHandler(path + fileName, maxBytes=1048576, backupCount=5) + fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s') + fd.setFormatter(fmt) + fd.setLevel(level) + log.addHandler(fd) + else: + log = LogDummy() + +class LogDummy(object): + def __init__(self): + self._str = '' + + def info(self, str, *args): + self._str = str + + def debug(self, str, *args): + self._str = str + + def warning(self, str, *args): + print("Warning: " + (str % args)) + + def error(self, str, *args): + print("Error: " + (str % args))
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/ve_utils.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,265 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import sys +from traceback import print_exc +from os import _exit as os_exit +from os import statvfs +from subprocess import check_output, CalledProcessError +import logging +import dbus +logger = logging.getLogger(__name__) + +VEDBUS_INVALID = dbus.Array([], signature=dbus.Signature('i'), variant_level=1) + +class NoVrmPortalIdError(Exception): + pass + +# Use this function to make sure the code quits on an unexpected exception. Make sure to use it +# when using GLib.idle_add and also GLib.timeout_add. +# Without this, the code will just keep running, since GLib does not stop the mainloop on an +# exception. +# Example: GLib.idle_add(exit_on_error, myfunc, arg1, arg2) +def exit_on_error(func, *args, **kwargs): + try: + return func(*args, **kwargs) + except: + try: + print ('exit_on_error: there was an exception. Printing stacktrace will be tried and then exit') + print_exc() + except: + pass + + # sys.exit() is not used, since that throws an exception, which does not lead to a program + # halt when used in a dbus callback, see connection.py in the Python/Dbus libraries, line 230. + os_exit(1) + + +__vrm_portal_id = None +def get_vrm_portal_id(): + # The original definition of the VRM Portal ID is that it is the mac + # address of the onboard- ethernet port (eth0), stripped from its colons + # (:) and lower case. This may however differ between platforms. On Venus + # the task is therefore deferred to /sbin/get-unique-id so that a + # platform specific method can be easily defined. + # + # If /sbin/get-unique-id does not exist, then use the ethernet address + # of eth0. This also handles the case where velib_python is used as a + # package install on a Raspberry Pi. + # + # On a Linux host where the network interface may not be eth0, you can set + # the VRM_IFACE environment variable to the correct name. + + global __vrm_portal_id + + if __vrm_portal_id: + return __vrm_portal_id + + portal_id = None + + # First try the method that works if we don't have a data partition. This + # will fail when the current user is not root. + try: + portal_id = check_output("/sbin/get-unique-id").decode("utf-8", "ignore").strip() + if not portal_id: + raise NoVrmPortalIdError("get-unique-id returned blank") + __vrm_portal_id = portal_id + return portal_id + except CalledProcessError: + # get-unique-id returned non-zero + raise NoVrmPortalIdError("get-unique-id returned non-zero") + except OSError: + # File doesn't exist, use fallback + pass + + # Fall back to getting our id using a syscall. Assume we are on linux. + # Allow the user to override what interface is used using an environment + # variable. + import fcntl, socket, struct, os + + iface = os.environ.get('VRM_IFACE', 'eth0').encode('ascii') + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', iface[:15])) + except IOError: + raise NoVrmPortalIdError("ioctl failed for eth0") + + __vrm_portal_id = info[18:24].hex() + return __vrm_portal_id + + +# See VE.Can registers - public.docx for definition of this conversion +def convert_vreg_version_to_readable(version): + def str_to_arr(x, length): + a = [] + for i in range(0, len(x), length): + a.append(x[i:i+length]) + return a + + x = "%x" % version + x = x.upper() + + if len(x) == 5 or len(x) == 3 or len(x) == 1: + x = '0' + x + + a = str_to_arr(x, 2); + + # remove the first 00 if there are three bytes and it is 00 + if len(a) == 3 and a[0] == '00': + a.remove(0); + + # if we have two or three bytes now, and the first character is a 0, remove it + if len(a) >= 2 and a[0][0:1] == '0': + a[0] = a[0][1]; + + result = '' + for item in a: + result += ('.' if result != '' else '') + item + + + result = 'v' + result + + return result + + +def get_free_space(path): + result = -1 + + try: + s = statvfs(path) + result = s.f_frsize * s.f_bavail # Number of free bytes that ordinary users + except Exception as ex: + logger.info("Error while retrieving free space for path %s: %s" % (path, ex)) + + return result + + +def get_load_averages(): + c = read_file('/proc/loadavg') + return c.split(' ')[:3] + + +def _get_sysfs_machine_name(): + try: + with open('/sys/firmware/devicetree/base/model', 'r') as f: + return f.read().rstrip('\x00') + except IOError: + pass + + return None + +# Returns None if it cannot find a machine name. Otherwise returns the string +# containing the name +def get_machine_name(): + # First try calling the venus utility script + try: + return check_output("/usr/bin/product-name").strip().decode('UTF-8') + except (CalledProcessError, OSError): + pass + + # Fall back to sysfs + name = _get_sysfs_machine_name() + if name is not None: + return name + + # Fall back to venus build machine name + try: + with open('/etc/venus/machine', 'r', encoding='UTF-8') as f: + return f.read().strip() + except IOError: + pass + + return None + + +def get_product_id(): + """ Find the machine ID and return it. """ + + # First try calling the venus utility script + try: + return check_output("/usr/bin/product-id").strip() + except (CalledProcessError, OSError): + pass + + # Fall back machine name mechanism + name = _get_sysfs_machine_name() + return { + 'Color Control GX': 'C001', + 'Venus GX': 'C002', + 'Octo GX': 'C006', + 'EasySolar-II': 'C007', + 'MultiPlus-II': 'C008' + }.get(name, 'C003') # C003 is Generic + + +# Returns False if it cannot open the file. Otherwise returns its rstripped contents +def read_file(path): + content = False + + try: + with open(path, 'r') as f: + content = f.read().rstrip() + except Exception as ex: + logger.debug("Error while reading %s: %s" % (path, ex)) + + return content + + +def wrap_dbus_value(value): + if value is None: + return VEDBUS_INVALID + if isinstance(value, float): + return dbus.Double(value, variant_level=1) + if isinstance(value, bool): + return dbus.Boolean(value, variant_level=1) + if isinstance(value, int): + try: + return dbus.Int32(value, variant_level=1) + except OverflowError: + return dbus.Int64(value, variant_level=1) + if isinstance(value, str): + return dbus.String(value, variant_level=1) + if isinstance(value, list): + if len(value) == 0: + # If the list is empty we cannot infer the type of the contents. So assume unsigned integer. + # A (signed) integer is dangerous, because an empty list of signed integers is used to encode + # an invalid value. + return dbus.Array([], signature=dbus.Signature('u'), variant_level=1) + return dbus.Array([wrap_dbus_value(x) for x in value], variant_level=1) + if isinstance(value, dict): + # Wrapping the keys of the dictionary causes D-Bus errors like: + # 'arguments to dbus_message_iter_open_container() were incorrect, + # assertion "(type == DBUS_TYPE_ARRAY && contained_signature && + # *contained_signature == DBUS_DICT_ENTRY_BEGIN_CHAR) || (contained_signature == NULL || + # _dbus_check_is_valid_signature (contained_signature))" failed in file ...' + return dbus.Dictionary({(k, wrap_dbus_value(v)) for k, v in value.items()}, variant_level=1) + return value + + +dbus_int_types = (dbus.Int32, dbus.UInt32, dbus.Byte, dbus.Int16, dbus.UInt16, dbus.UInt32, dbus.Int64, dbus.UInt64) + + +def unwrap_dbus_value(val): + """Converts D-Bus values back to the original type. For example if val is of type DBus.Double, + a float will be returned.""" + if isinstance(val, dbus_int_types): + return int(val) + if isinstance(val, dbus.Double): + return float(val) + if isinstance(val, dbus.Array): + v = [unwrap_dbus_value(x) for x in val] + return None if len(v) == 0 else v + if isinstance(val, (dbus.Signature, dbus.String)): + return str(val) + # Python has no byte type, so we convert to an integer. + if isinstance(val, dbus.Byte): + return int(val) + if isinstance(val, dbus.ByteArray): + return "".join([bytes(x) for x in val]) + if isinstance(val, (list, tuple)): + return [unwrap_dbus_value(x) for x in val] + if isinstance(val, (dbus.Dictionary, dict)): + # Do not unwrap the keys, see comment in wrap_dbus_value + return dict([(x, unwrap_dbus_value(y)) for x, y in val.items()]) + if isinstance(val, dbus.Boolean): + return bool(val) + return val
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/vedbus.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,600 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import dbus.service +import logging +import traceback +import os +import weakref +from collections import defaultdict +from ve_utils import wrap_dbus_value, unwrap_dbus_value + +# vedbus contains three classes: +# VeDbusItemImport -> use this to read data from the dbus, ie import +# VeDbusItemExport -> use this to export data to the dbus (one value) +# VeDbusService -> use that to create a service and export several values to the dbus + +# Code for VeDbusItemImport is copied from busitem.py and thereafter modified. +# All projects that used busitem.py need to migrate to this package. And some +# projects used to define there own equivalent of VeDbusItemExport. Better to +# use VeDbusItemExport, or even better the VeDbusService class that does it all for you. + +# TODOS +# 1 check for datatypes, it works now, but not sure if all is compliant with +# com.victronenergy.BusItem interface definition. See also the files in +# tests_and_examples. And see 'if type(v) == dbus.Byte:' on line 102. Perhaps +# something similar should also be done in VeDbusBusItemExport? +# 2 Shouldn't VeDbusBusItemExport inherit dbus.service.Object? +# 7 Make hard rules for services exporting data to the D-Bus, in order to make tracking +# changes possible. Does everybody first invalidate its data before leaving the bus? +# And what about before taking one object away from the bus, instead of taking the +# whole service offline? +# They should! And after taking one value away, do we need to know that someone left +# the bus? Or we just keep that value in invalidated for ever? Result is that we can't +# see the difference anymore between an invalidated value and a value that was first on +# the bus and later not anymore. See comments above VeDbusItemImport as well. +# 9 there are probably more todos in the code below. + +# Some thoughts with regards to the data types: +# +# Text from: http://dbus.freedesktop.org/doc/dbus-python/doc/tutorial.html#data-types +# --- +# Variants are represented by setting the variant_level keyword argument in the +# constructor of any D-Bus data type to a value greater than 0 (variant_level 1 +# means a variant containing some other data type, variant_level 2 means a variant +# containing a variant containing some other data type, and so on). If a non-variant +# is passed as an argument but introspection indicates that a variant is expected, +# it'll automatically be wrapped in a variant. +# --- +# +# Also the different dbus datatypes, such as dbus.Int32, and dbus.UInt32 are a subclass +# of Python int. dbus.String is a subclass of Python standard class unicode, etcetera +# +# So all together that explains why we don't need to explicitly convert back and forth +# between the dbus datatypes and the standard python datatypes. Note that all datatypes +# in python are objects. Even an int is an object. + +# The signature of a variant is 'v'. + +# Export ourselves as a D-Bus service. +class VeDbusService(object): + def __init__(self, servicename, bus=None): + # dict containing the VeDbusItemExport objects, with their path as the key. + self._dbusobjects = {} + self._dbusnodes = {} + self._ratelimiters = [] + self._dbusname = None + + # dict containing the onchange callbacks, for each object. Object path is the key + self._onchangecallbacks = {} + + # Connect to session bus whenever present, else use the system bus + self._dbusconn = bus or (dbus.SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in os.environ else dbus.SystemBus()) + + # make the dbus connection available to outside, could make this a true property instead, but ach.. + self.dbusconn = self._dbusconn + + # Register ourselves on the dbus, trigger an error if already in use (do_not_queue) + self._dbusname = dbus.service.BusName(servicename, self._dbusconn, do_not_queue=True) + + # Add the root item that will return all items as a tree + self._dbusnodes['/'] = VeDbusRootExport(self._dbusconn, '/', self) + + logging.info("registered ourselves on D-Bus as %s" % servicename) + + # To force immediate deregistering of this dbus service and all its object paths, explicitly + # call __del__(). + def __del__(self): + for node in list(self._dbusnodes.values()): + node.__del__() + self._dbusnodes.clear() + for item in list(self._dbusobjects.values()): + item.__del__() + self._dbusobjects.clear() + if self._dbusname: + self._dbusname.__del__() # Forces call to self._bus.release_name(self._name), see source code + self._dbusname = None + + # @param callbackonchange function that will be called when this value is changed. First parameter will + # be the path of the object, second the new value. This callback should return + # True to accept the change, False to reject it. + def add_path(self, path, value, description="", writeable=False, + onchangecallback=None, gettextcallback=None): + + if onchangecallback is not None: + self._onchangecallbacks[path] = onchangecallback + + item = VeDbusItemExport( + self._dbusconn, path, value, description, writeable, + self._value_changed, gettextcallback, deletecallback=self._item_deleted) + + spl = path.split('/') + for i in range(2, len(spl)): + subPath = '/'.join(spl[:i]) + if subPath not in self._dbusnodes and subPath not in self._dbusobjects: + self._dbusnodes[subPath] = VeDbusTreeExport(self._dbusconn, subPath, self) + self._dbusobjects[path] = item + logging.debug('added %s with start value %s. Writeable is %s' % (path, value, writeable)) + + # Add the mandatory paths, as per victron dbus api doc + def add_mandatory_paths(self, processname, processversion, connection, + deviceinstance, productid, productname, firmwareversion, hardwareversion, connected): + self.add_path('/Mgmt/ProcessName', processname) + self.add_path('/Mgmt/ProcessVersion', processversion) + self.add_path('/Mgmt/Connection', connection) + + # Create rest of the mandatory objects + self.add_path('/DeviceInstance', deviceinstance) + self.add_path('/ProductId', productid) + self.add_path('/ProductName', productname) + self.add_path('/FirmwareVersion', firmwareversion) + self.add_path('/HardwareVersion', hardwareversion) + self.add_path('/Connected', connected) + + # Callback function that is called from the VeDbusItemExport objects when a value changes. This function + # maps the change-request to the onchangecallback given to us for this specific path. + def _value_changed(self, path, newvalue): + if path not in self._onchangecallbacks: + return True + + return self._onchangecallbacks[path](path, newvalue) + + def _item_deleted(self, path): + self._dbusobjects.pop(path) + for np in list(self._dbusnodes.keys()): + if np != '/': + for ip in self._dbusobjects: + if ip.startswith(np + '/'): + break + else: + self._dbusnodes[np].__del__() + self._dbusnodes.pop(np) + + def __getitem__(self, path): + return self._dbusobjects[path].local_get_value() + + def __setitem__(self, path, newvalue): + self._dbusobjects[path].local_set_value(newvalue) + + def __delitem__(self, path): + self._dbusobjects[path].__del__() # Invalidates and then removes the object path + assert path not in self._dbusobjects + + def __contains__(self, path): + return path in self._dbusobjects + + def __enter__(self): + l = ServiceContext(self) + self._ratelimiters.append(l) + return l + + def __exit__(self, *exc): + # pop off the top one and flush it. If with statements are nested + # then each exit flushes its own part. + if self._ratelimiters: + self._ratelimiters.pop().flush() + +class ServiceContext(object): + def __init__(self, parent): + self.parent = parent + self.changes = {} + + def __getitem__(self, path): + return self.parent[path] + + def __setitem__(self, path, newvalue): + c = self.parent._dbusobjects[path]._local_set_value(newvalue) + if c is not None: + self.changes[path] = c + + def flush(self): + if self.changes: + self.parent._dbusnodes['/'].ItemsChanged(self.changes) + +class TrackerDict(defaultdict): + """ Same as defaultdict, but passes the key to default_factory. """ + def __missing__(self, key): + self[key] = x = self.default_factory(key) + return x + +class VeDbusRootTracker(object): + """ This tracks the root of a dbus path and listens for PropertiesChanged + signals. When a signal arrives, parse it and unpack the key/value changes + into traditional events, then pass it to the original eventCallback + method. """ + def __init__(self, bus, serviceName): + self.importers = defaultdict(weakref.WeakSet) + self.serviceName = serviceName + self._match = bus.get_object(serviceName, '/', introspect=False).connect_to_signal( + "ItemsChanged", weak_functor(self._items_changed_handler)) + + def __del__(self): + self._match.remove() + self._match = None + + def add(self, i): + self.importers[i.path].add(i) + + def _items_changed_handler(self, items): + if not isinstance(items, dict): + return + + for path, changes in items.items(): + try: + v = changes['Value'] + except KeyError: + continue + + try: + t = changes['Text'] + except KeyError: + t = str(unwrap_dbus_value(v)) + + for i in self.importers.get(path, ()): + i._properties_changed_handler({'Value': v, 'Text': t}) + +""" +Importing basics: + - If when we power up, the D-Bus service does not exist, or it does exist and the path does not + yet exist, still subscribe to a signal: as soon as it comes online it will send a signal with its + initial value, which VeDbusItemImport will receive and use to update local cache. And, when set, + call the eventCallback. + - If when we power up, save it + - When using get_value, know that there is no difference between services (or object paths) that don't + exist and paths that are invalid (= empty array, see above). Both will return None. In case you do + really want to know ifa path exists or not, use the exists property. + - When a D-Bus service leaves the D-Bus, it will first invalidate all its values, and send signals + with that update, and only then leave the D-Bus. (or do we need to subscribe to the NameOwnerChanged- + signal!?!) To be discussed and make sure. Not really urgent, since all existing code that uses this + class already subscribes to the NameOwnerChanged signal, and subsequently removes instances of this + class. + +Read when using this class: +Note that when a service leaves that D-Bus without invalidating all its exported objects first, for +example because it is killed, VeDbusItemImport doesn't have a clue. So when using VeDbusItemImport, +make sure to also subscribe to the NamerOwnerChanged signal on bus-level. Or just use dbusmonitor, +because that takes care of all of that for you. +""" +class VeDbusItemImport(object): + def __new__(cls, bus, serviceName, path, eventCallback=None, createsignal=True): + instance = object.__new__(cls) + + # If signal tracking should be done, also add to root tracker + if createsignal: + if "_roots" not in cls.__dict__: + cls._roots = TrackerDict(lambda k: VeDbusRootTracker(bus, k)) + + return instance + + ## Constructor + # @param bus the bus-object (SESSION or SYSTEM). + # @param serviceName the dbus-service-name (string), for example 'com.victronenergy.battery.ttyO1' + # @param path the object-path, for example '/Dc/V' + # @param eventCallback function that you want to be called on a value change + # @param createSignal only set this to False if you use this function to one time read a value. When + # leaving it to True, make sure to also subscribe to the NameOwnerChanged signal + # elsewhere. See also note some 15 lines up. + def __init__(self, bus, serviceName, path, eventCallback=None, createsignal=True): + # TODO: is it necessary to store _serviceName and _path? Isn't it + # stored in the bus_getobjectsomewhere? + self._serviceName = serviceName + self._path = path + self._match = None + # TODO: _proxy is being used in settingsdevice.py, make a getter for that + self._proxy = bus.get_object(serviceName, path, introspect=False) + self.eventCallback = eventCallback + + assert eventCallback is None or createsignal == True + if createsignal: + self._match = self._proxy.connect_to_signal( + "PropertiesChanged", weak_functor(self._properties_changed_handler)) + self._roots[serviceName].add(self) + + # store the current value in _cachedvalue. When it doesn't exists set _cachedvalue to + # None, same as when a value is invalid + self._cachedvalue = None + try: + v = self._proxy.GetValue() + except dbus.exceptions.DBusException: + pass + else: + self._cachedvalue = unwrap_dbus_value(v) + + def __del__(self): + if self._match is not None: + self._match.remove() + self._match = None + self._proxy = None + + def _refreshcachedvalue(self): + self._cachedvalue = unwrap_dbus_value(self._proxy.GetValue()) + + ## Returns the path as a string, for example '/AC/L1/V' + @property + def path(self): + return self._path + + ## Returns the dbus service name as a string, for example com.victronenergy.vebus.ttyO1 + @property + def serviceName(self): + return self._serviceName + + ## Returns the value of the dbus-item. + # the type will be a dbus variant, for example dbus.Int32(0, variant_level=1) + # this is not a property to keep the name consistant with the com.victronenergy.busitem interface + # returns None when the property is invalid + def get_value(self): + return self._cachedvalue + + ## Writes a new value to the dbus-item + def set_value(self, newvalue): + r = self._proxy.SetValue(wrap_dbus_value(newvalue)) + + # instead of just saving the value, go to the dbus and get it. So we have the right type etc. + if r == 0: + self._refreshcachedvalue() + + return r + + ## Resets the item to its default value + def set_default(self): + self._proxy.SetDefault() + self._refreshcachedvalue() + + ## Returns the text representation of the value. + # For example when the value is an enum/int GetText might return the string + # belonging to that enum value. Another example, for a voltage, GetValue + # would return a float, 12.0Volt, and GetText could return 12 VDC. + # + # Note that this depends on how the dbus-producer has implemented this. + def get_text(self): + return self._proxy.GetText() + + ## Returns true of object path exists, and false if it doesn't + @property + def exists(self): + # TODO: do some real check instead of this crazy thing. + r = False + try: + r = self._proxy.GetValue() + r = True + except dbus.exceptions.DBusException: + pass + + return r + + ## callback for the trigger-event. + # @param eventCallback the event-callback-function. + @property + def eventCallback(self): + return self._eventCallback + + @eventCallback.setter + def eventCallback(self, eventCallback): + self._eventCallback = eventCallback + + ## Is called when the value of the imported bus-item changes. + # Stores the new value in our local cache, and calls the eventCallback, if set. + def _properties_changed_handler(self, changes): + if "Value" in changes: + changes['Value'] = unwrap_dbus_value(changes['Value']) + self._cachedvalue = changes['Value'] + if self._eventCallback: + # The reason behind this try/except is to prevent errors silently ending up the an error + # handler in the dbus code. + try: + self._eventCallback(self._serviceName, self._path, changes) + except: + traceback.print_exc() + os._exit(1) # sys.exit() is not used, since that also throws an exception + + +class VeDbusTreeExport(dbus.service.Object): + def __init__(self, bus, objectPath, service): + dbus.service.Object.__init__(self, bus, objectPath) + self._service = service + logging.debug("VeDbusTreeExport %s has been created" % objectPath) + + def __del__(self): + # self._get_path() will raise an exception when retrieved after the call to .remove_from_connection, + # so we need a copy. + path = self._get_path() + if path is None: + return + self.remove_from_connection() + logging.debug("VeDbusTreeExport %s has been removed" % path) + + def _get_path(self): + if len(self._locations) == 0: + return None + return self._locations[0][1] + + def _get_value_handler(self, path, get_text=False): + logging.debug("_get_value_handler called for %s" % path) + r = {} + px = path + if not px.endswith('/'): + px += '/' + for p, item in self._service._dbusobjects.items(): + if p.startswith(px): + v = item.GetText() if get_text else wrap_dbus_value(item.local_get_value()) + r[p[len(px):]] = v + logging.debug(r) + return r + + @dbus.service.method('com.victronenergy.BusItem', out_signature='v') + def GetValue(self): + value = self._get_value_handler(self._get_path()) + return dbus.Dictionary(value, signature=dbus.Signature('sv'), variant_level=1) + + @dbus.service.method('com.victronenergy.BusItem', out_signature='v') + def GetText(self): + return self._get_value_handler(self._get_path(), True) + + def local_get_value(self): + return self._get_value_handler(self.path) + +class VeDbusRootExport(VeDbusTreeExport): + @dbus.service.signal('com.victronenergy.BusItem', signature='a{sa{sv}}') + def ItemsChanged(self, changes): + pass + + @dbus.service.method('com.victronenergy.BusItem', out_signature='a{sa{sv}}') + def GetItems(self): + return { + path: { + 'Value': wrap_dbus_value(item.local_get_value()), + 'Text': item.GetText() } + for path, item in self._service._dbusobjects.items() + } + + +class VeDbusItemExport(dbus.service.Object): + ## Constructor of VeDbusItemExport + # + # Use this object to export (publish), values on the dbus + # Creates the dbus-object under the given dbus-service-name. + # @param bus The dbus object. + # @param objectPath The dbus-object-path. + # @param value Value to initialize ourselves with, defaults to None which means Invalid + # @param description String containing a description. Can be called over the dbus with GetDescription() + # @param writeable what would this do!? :). + # @param callback Function that will be called when someone else changes the value of this VeBusItem + # over the dbus. First parameter passed to callback will be our path, second the new + # value. This callback should return True to accept the change, False to reject it. + def __init__(self, bus, objectPath, value=None, description=None, writeable=False, + onchangecallback=None, gettextcallback=None, deletecallback=None): + dbus.service.Object.__init__(self, bus, objectPath) + self._onchangecallback = onchangecallback + self._gettextcallback = gettextcallback + self._value = value + self._description = description + self._writeable = writeable + self._deletecallback = deletecallback + + # To force immediate deregistering of this dbus object, explicitly call __del__(). + def __del__(self): + # self._get_path() will raise an exception when retrieved after the + # call to .remove_from_connection, so we need a copy. + path = self._get_path() + if path == None: + return + if self._deletecallback is not None: + self._deletecallback(path) + self.local_set_value(None) + self.remove_from_connection() + logging.debug("VeDbusItemExport %s has been removed" % path) + + def _get_path(self): + if len(self._locations) == 0: + return None + return self._locations[0][1] + + ## Sets the value. And in case the value is different from what it was, a signal + # will be emitted to the dbus. This function is to be used in the python code that + # is using this class to export values to the dbus. + # set value to None to indicate that it is Invalid + def local_set_value(self, newvalue): + changes = self._local_set_value(newvalue) + if changes is not None: + self.PropertiesChanged(changes) + + def _local_set_value(self, newvalue): + if self._value == newvalue: + return None + + self._value = newvalue + return { + 'Value': wrap_dbus_value(newvalue), + 'Text': self.GetText() + } + + def local_get_value(self): + return self._value + + # ==== ALL FUNCTIONS BELOW THIS LINE WILL BE CALLED BY OTHER PROCESSES OVER THE DBUS ==== + + ## Dbus exported method SetValue + # Function is called over the D-Bus by other process. It will first check (via callback) if new + # value is accepted. And it is, stores it and emits a changed-signal. + # @param value The new value. + # @return completion-code When successful a 0 is return, and when not a -1 is returned. + @dbus.service.method('com.victronenergy.BusItem', in_signature='v', out_signature='i') + def SetValue(self, newvalue): + if not self._writeable: + return 1 # NOT OK + + newvalue = unwrap_dbus_value(newvalue) + + if newvalue == self._value: + return 0 # OK + + # call the callback given to us, and check if new value is OK. + if (self._onchangecallback is None or + (self._onchangecallback is not None and self._onchangecallback(self.__dbus_object_path__, newvalue))): + + self.local_set_value(newvalue) + return 0 # OK + + return 2 # NOT OK + + ## Dbus exported method GetDescription + # + # Returns the a description. + # @param language A language code (e.g. ISO 639-1 en-US). + # @param length Lenght of the language string. + # @return description + @dbus.service.method('com.victronenergy.BusItem', in_signature='si', out_signature='s') + def GetDescription(self, language, length): + return self._description if self._description is not None else 'No description given' + + ## Dbus exported method GetValue + # Returns the value. + # @return the value when valid, and otherwise an empty array + @dbus.service.method('com.victronenergy.BusItem', out_signature='v') + def GetValue(self): + return wrap_dbus_value(self._value) + + ## Dbus exported method GetText + # Returns the value as string of the dbus-object-path. + # @return text A text-value. '---' when local value is invalid + @dbus.service.method('com.victronenergy.BusItem', out_signature='s') + def GetText(self): + if self._value is None: + return '---' + + # Default conversion from dbus.Byte will get you a character (so 'T' instead of '84'), so we + # have to convert to int first. Note that if a dbus.Byte turns up here, it must have come from + # the application itself, as all data from the D-Bus should have been unwrapped by now. + if self._gettextcallback is None and type(self._value) == dbus.Byte: + return str(int(self._value)) + + if self._gettextcallback is None and self.__dbus_object_path__ == '/ProductId': + return "0x%X" % self._value + + if self._gettextcallback is None: + return str(self._value) + + return self._gettextcallback(self.__dbus_object_path__, self._value) + + ## The signal that indicates that the value has changed. + # Other processes connected to this BusItem object will have subscribed to the + # event when they want to track our state. + @dbus.service.signal('com.victronenergy.BusItem', signature='a{sv}') + def PropertiesChanged(self, changes): + pass + +## This class behaves like a regular reference to a class method (eg. self.foo), but keeps a weak reference +## to the object which method is to be called. +## Use this object to break circular references. +class weak_functor: + def __init__(self, f): + self._r = weakref.ref(f.__self__) + self._f = weakref.ref(f.__func__) + + def __call__(self, *args, **kargs): + r = self._r() + f = self._f() + if r == None or f == None: + return + f(r, *args, **kargs)