Mercurial > ~darius > hgwebdir.cgi > epro
diff velib_python/dbusdummyservice.py @ 8:9c0435a617db
Import velib_python
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Sun, 05 Dec 2021 14:35:36 +1030 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/dbusdummyservice.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 + +""" +A class to put a simple service on the dbus, according to victron standards, with constantly updating +paths. See example usage below. It is used to generate dummy data for other processes that rely on the +dbus. See files in dbus_vebus_to_pvinverter/test and dbus_vrm/test for other usage examples. + +To change a value while testing, without stopping your dummy script and changing its initial value, write +to the dummy data via the dbus. See example. + +https://github.com/victronenergy/dbus_vebus_to_pvinverter/tree/master/test +""" +from gi.repository import GLib +import platform +import argparse +import logging +import sys +import os + +# our own packages +sys.path.insert(1, os.path.join(os.path.dirname(__file__), '../ext/velib_python')) +from vedbus import VeDbusService + +class DbusDummyService(object): + def __init__(self, servicename, deviceinstance, paths, productname='Dummy product', connection='Dummy service'): + self._dbusservice = VeDbusService(servicename) + self._paths = paths + + logging.debug("%s /DeviceInstance = %d" % (servicename, deviceinstance)) + + # Create the management objects, as specified in the ccgx dbus-api document + self._dbusservice.add_path('/Mgmt/ProcessName', __file__) + self._dbusservice.add_path('/Mgmt/ProcessVersion', 'Unkown version, and running on Python ' + platform.python_version()) + self._dbusservice.add_path('/Mgmt/Connection', connection) + + # Create the mandatory objects + self._dbusservice.add_path('/DeviceInstance', deviceinstance) + self._dbusservice.add_path('/ProductId', 0) + self._dbusservice.add_path('/ProductName', productname) + self._dbusservice.add_path('/FirmwareVersion', 0) + self._dbusservice.add_path('/HardwareVersion', 0) + self._dbusservice.add_path('/Connected', 1) + + for path, settings in self._paths.items(): + self._dbusservice.add_path( + path, settings['initial'], writeable=True, onchangecallback=self._handlechangedvalue) + + GLib.timeout_add(1000, self._update) + + def _update(self): + with self._dbusservice as s: + for path, settings in self._paths.items(): + if 'update' in settings: + update = settings['update'] + if callable(update): + s[path] = update(path, s[path]) + else: + s[path] += update + logging.debug("%s: %s" % (path, s[path])) + return True + + def _handlechangedvalue(self, path, value): + logging.debug("someone else updated %s to %s" % (path, value)) + return True # accept the change + + +# === All code below is to simply run it from the commandline for debugging purposes === + +# It will created a dbus service called com.victronenergy.pvinverter.output. +# To try this on commandline, start this program in one terminal, and try these commands +# from another terminal: +# dbus com.victronenergy.pvinverter.output +# dbus com.victronenergy.pvinverter.output /Ac/Energy/Forward GetValue +# dbus com.victronenergy.pvinverter.output /Ac/Energy/Forward SetValue %20 +# +# Above examples use this dbus client: http://code.google.com/p/dbus-tools/wiki/DBusCli +# See their manual to explain the % in %20 + +def main(): + logging.basicConfig(level=logging.DEBUG) + + from dbus.mainloop.glib import DBusGMainLoop + # Have a mainloop, so we can send/receive asynchronous calls to and from dbus + DBusGMainLoop(set_as_default=True) + + pvac_output = DbusDummyService( + servicename='com.victronenergy.dummyservice.ttyO1', + deviceinstance=0, + paths={ + '/Ac/Energy/Forward': {'initial': 0, 'update': 1}, + '/Position': {'initial': 0, 'update': 0}, + '/Nonupdatingvalue/UseForTestingWritesForExample': {'initial': None}, + '/DbusInvalid': {'initial': None} + }) + + logging.info('Connected to dbus, and switching over to GLib.MainLoop() (= event based)') + mainloop = GLib.MainLoop() + mainloop.run() + + +if __name__ == "__main__": + main()