Mercurial > ~darius > hgwebdir.cgi > epro
comparison velib_python/dbusdummyservice.py @ 8:9c0435a617db
Import velib_python
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Sun, 05 Dec 2021 14:35:36 +1030 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
5:982eeffe9d95 | 8:9c0435a617db |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 """ | |
4 A class to put a simple service on the dbus, according to victron standards, with constantly updating | |
5 paths. See example usage below. It is used to generate dummy data for other processes that rely on the | |
6 dbus. See files in dbus_vebus_to_pvinverter/test and dbus_vrm/test for other usage examples. | |
7 | |
8 To change a value while testing, without stopping your dummy script and changing its initial value, write | |
9 to the dummy data via the dbus. See example. | |
10 | |
11 https://github.com/victronenergy/dbus_vebus_to_pvinverter/tree/master/test | |
12 """ | |
13 from gi.repository import GLib | |
14 import platform | |
15 import argparse | |
16 import logging | |
17 import sys | |
18 import os | |
19 | |
20 # our own packages | |
21 sys.path.insert(1, os.path.join(os.path.dirname(__file__), '../ext/velib_python')) | |
22 from vedbus import VeDbusService | |
23 | |
24 class DbusDummyService(object): | |
25 def __init__(self, servicename, deviceinstance, paths, productname='Dummy product', connection='Dummy service'): | |
26 self._dbusservice = VeDbusService(servicename) | |
27 self._paths = paths | |
28 | |
29 logging.debug("%s /DeviceInstance = %d" % (servicename, deviceinstance)) | |
30 | |
31 # Create the management objects, as specified in the ccgx dbus-api document | |
32 self._dbusservice.add_path('/Mgmt/ProcessName', __file__) | |
33 self._dbusservice.add_path('/Mgmt/ProcessVersion', 'Unkown version, and running on Python ' + platform.python_version()) | |
34 self._dbusservice.add_path('/Mgmt/Connection', connection) | |
35 | |
36 # Create the mandatory objects | |
37 self._dbusservice.add_path('/DeviceInstance', deviceinstance) | |
38 self._dbusservice.add_path('/ProductId', 0) | |
39 self._dbusservice.add_path('/ProductName', productname) | |
40 self._dbusservice.add_path('/FirmwareVersion', 0) | |
41 self._dbusservice.add_path('/HardwareVersion', 0) | |
42 self._dbusservice.add_path('/Connected', 1) | |
43 | |
44 for path, settings in self._paths.items(): | |
45 self._dbusservice.add_path( | |
46 path, settings['initial'], writeable=True, onchangecallback=self._handlechangedvalue) | |
47 | |
48 GLib.timeout_add(1000, self._update) | |
49 | |
50 def _update(self): | |
51 with self._dbusservice as s: | |
52 for path, settings in self._paths.items(): | |
53 if 'update' in settings: | |
54 update = settings['update'] | |
55 if callable(update): | |
56 s[path] = update(path, s[path]) | |
57 else: | |
58 s[path] += update | |
59 logging.debug("%s: %s" % (path, s[path])) | |
60 return True | |
61 | |
62 def _handlechangedvalue(self, path, value): | |
63 logging.debug("someone else updated %s to %s" % (path, value)) | |
64 return True # accept the change | |
65 | |
66 | |
67 # === All code below is to simply run it from the commandline for debugging purposes === | |
68 | |
69 # It will created a dbus service called com.victronenergy.pvinverter.output. | |
70 # To try this on commandline, start this program in one terminal, and try these commands | |
71 # from another terminal: | |
72 # dbus com.victronenergy.pvinverter.output | |
73 # dbus com.victronenergy.pvinverter.output /Ac/Energy/Forward GetValue | |
74 # dbus com.victronenergy.pvinverter.output /Ac/Energy/Forward SetValue %20 | |
75 # | |
76 # Above examples use this dbus client: http://code.google.com/p/dbus-tools/wiki/DBusCli | |
77 # See their manual to explain the % in %20 | |
78 | |
79 def main(): | |
80 logging.basicConfig(level=logging.DEBUG) | |
81 | |
82 from dbus.mainloop.glib import DBusGMainLoop | |
83 # Have a mainloop, so we can send/receive asynchronous calls to and from dbus | |
84 DBusGMainLoop(set_as_default=True) | |
85 | |
86 pvac_output = DbusDummyService( | |
87 servicename='com.victronenergy.dummyservice.ttyO1', | |
88 deviceinstance=0, | |
89 paths={ | |
90 '/Ac/Energy/Forward': {'initial': 0, 'update': 1}, | |
91 '/Position': {'initial': 0, 'update': 0}, | |
92 '/Nonupdatingvalue/UseForTestingWritesForExample': {'initial': None}, | |
93 '/DbusInvalid': {'initial': None} | |
94 }) | |
95 | |
96 logging.info('Connected to dbus, and switching over to GLib.MainLoop() (= event based)') | |
97 mainloop = GLib.MainLoop() | |
98 mainloop.run() | |
99 | |
100 | |
101 if __name__ == "__main__": | |
102 main() |