diff velib_python/dbusdummyservice.py @ 8:9c0435a617db

Import velib_python
author Daniel O'Connor <darius@dons.net.au>
date Sun, 05 Dec 2021 14:35:36 +1030
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/velib_python/dbusdummyservice.py	Sun Dec 05 14:35:36 2021 +1030
@@ -0,0 +1,102 @@
+#!/usr/bin/env python3
+
+"""
+A class to put a simple service on the dbus, according to victron standards, with constantly updating
+paths. See example usage below. It is used to generate dummy data for other processes that rely on the
+dbus. See files in dbus_vebus_to_pvinverter/test and dbus_vrm/test for other usage examples.
+
+To change a value while testing, without stopping your dummy script and changing its initial value, write
+to the dummy data via the dbus. See example.
+
+https://github.com/victronenergy/dbus_vebus_to_pvinverter/tree/master/test
+"""
+from gi.repository import GLib
+import platform
+import argparse
+import logging
+import sys
+import os
+
+# our own packages
+sys.path.insert(1, os.path.join(os.path.dirname(__file__), '../ext/velib_python'))
+from vedbus import VeDbusService
+
+class DbusDummyService(object):
+    def __init__(self, servicename, deviceinstance, paths, productname='Dummy product', connection='Dummy service'):
+        self._dbusservice = VeDbusService(servicename)
+        self._paths = paths
+
+        logging.debug("%s /DeviceInstance = %d" % (servicename, deviceinstance))
+
+        # Create the management objects, as specified in the ccgx dbus-api document
+        self._dbusservice.add_path('/Mgmt/ProcessName', __file__)
+        self._dbusservice.add_path('/Mgmt/ProcessVersion', 'Unkown version, and running on Python ' + platform.python_version())
+        self._dbusservice.add_path('/Mgmt/Connection', connection)
+
+        # Create the mandatory objects
+        self._dbusservice.add_path('/DeviceInstance', deviceinstance)
+        self._dbusservice.add_path('/ProductId', 0)
+        self._dbusservice.add_path('/ProductName', productname)
+        self._dbusservice.add_path('/FirmwareVersion', 0)
+        self._dbusservice.add_path('/HardwareVersion', 0)
+        self._dbusservice.add_path('/Connected', 1)
+
+        for path, settings in self._paths.items():
+            self._dbusservice.add_path(
+                path, settings['initial'], writeable=True, onchangecallback=self._handlechangedvalue)
+
+        GLib.timeout_add(1000, self._update)
+
+    def _update(self):
+        with self._dbusservice as s:
+            for path, settings in self._paths.items():
+                if 'update' in settings:
+                    update = settings['update']
+                    if callable(update):
+                        s[path] = update(path, s[path])
+                    else:
+                        s[path] += update
+                    logging.debug("%s: %s" % (path, s[path]))
+        return True
+
+    def _handlechangedvalue(self, path, value):
+        logging.debug("someone else updated %s to %s" % (path, value))
+        return True # accept the change
+
+
+# === All code below is to simply run it from the commandline for debugging purposes ===
+
+# It will created a dbus service called com.victronenergy.pvinverter.output.
+# To try this on commandline, start this program in one terminal, and try these commands
+# from another terminal:
+# dbus com.victronenergy.pvinverter.output
+# dbus com.victronenergy.pvinverter.output /Ac/Energy/Forward GetValue
+# dbus com.victronenergy.pvinverter.output /Ac/Energy/Forward SetValue %20
+#
+# Above examples use this dbus client: http://code.google.com/p/dbus-tools/wiki/DBusCli
+# See their manual to explain the % in %20
+
+def main():
+    logging.basicConfig(level=logging.DEBUG)
+
+    from dbus.mainloop.glib import DBusGMainLoop
+    # Have a mainloop, so we can send/receive asynchronous calls to and from dbus
+    DBusGMainLoop(set_as_default=True)
+
+    pvac_output = DbusDummyService(
+        servicename='com.victronenergy.dummyservice.ttyO1',
+        deviceinstance=0,
+        paths={
+            '/Ac/Energy/Forward': {'initial': 0, 'update': 1},
+            '/Position': {'initial': 0, 'update': 0},
+            '/Nonupdatingvalue/UseForTestingWritesForExample': {'initial': None},
+            '/DbusInvalid': {'initial': None}
+        })
+
+    logging.info('Connected to dbus, and switching over to GLib.MainLoop() (= event based)')
+    mainloop = GLib.MainLoop()
+    mainloop.run()
+
+
+if __name__ == "__main__":
+    main()