comparison velib_python/dbusdummyservice.py @ 8:9c0435a617db

Import velib_python
author Daniel O'Connor <darius@dons.net.au>
date Sun, 05 Dec 2021 14:35:36 +1030
parents
children
comparison
equal deleted inserted replaced
5:982eeffe9d95 8:9c0435a617db
1 #!/usr/bin/env python3
2
3 """
4 A class to put a simple service on the dbus, according to victron standards, with constantly updating
5 paths. See example usage below. It is used to generate dummy data for other processes that rely on the
6 dbus. See files in dbus_vebus_to_pvinverter/test and dbus_vrm/test for other usage examples.
7
8 To change a value while testing, without stopping your dummy script and changing its initial value, write
9 to the dummy data via the dbus. See example.
10
11 https://github.com/victronenergy/dbus_vebus_to_pvinverter/tree/master/test
12 """
13 from gi.repository import GLib
14 import platform
15 import argparse
16 import logging
17 import sys
18 import os
19
20 # our own packages
21 sys.path.insert(1, os.path.join(os.path.dirname(__file__), '../ext/velib_python'))
22 from vedbus import VeDbusService
23
24 class DbusDummyService(object):
25 def __init__(self, servicename, deviceinstance, paths, productname='Dummy product', connection='Dummy service'):
26 self._dbusservice = VeDbusService(servicename)
27 self._paths = paths
28
29 logging.debug("%s /DeviceInstance = %d" % (servicename, deviceinstance))
30
31 # Create the management objects, as specified in the ccgx dbus-api document
32 self._dbusservice.add_path('/Mgmt/ProcessName', __file__)
33 self._dbusservice.add_path('/Mgmt/ProcessVersion', 'Unkown version, and running on Python ' + platform.python_version())
34 self._dbusservice.add_path('/Mgmt/Connection', connection)
35
36 # Create the mandatory objects
37 self._dbusservice.add_path('/DeviceInstance', deviceinstance)
38 self._dbusservice.add_path('/ProductId', 0)
39 self._dbusservice.add_path('/ProductName', productname)
40 self._dbusservice.add_path('/FirmwareVersion', 0)
41 self._dbusservice.add_path('/HardwareVersion', 0)
42 self._dbusservice.add_path('/Connected', 1)
43
44 for path, settings in self._paths.items():
45 self._dbusservice.add_path(
46 path, settings['initial'], writeable=True, onchangecallback=self._handlechangedvalue)
47
48 GLib.timeout_add(1000, self._update)
49
50 def _update(self):
51 with self._dbusservice as s:
52 for path, settings in self._paths.items():
53 if 'update' in settings:
54 update = settings['update']
55 if callable(update):
56 s[path] = update(path, s[path])
57 else:
58 s[path] += update
59 logging.debug("%s: %s" % (path, s[path]))
60 return True
61
62 def _handlechangedvalue(self, path, value):
63 logging.debug("someone else updated %s to %s" % (path, value))
64 return True # accept the change
65
66
67 # === All code below is to simply run it from the commandline for debugging purposes ===
68
69 # It will created a dbus service called com.victronenergy.pvinverter.output.
70 # To try this on commandline, start this program in one terminal, and try these commands
71 # from another terminal:
72 # dbus com.victronenergy.pvinverter.output
73 # dbus com.victronenergy.pvinverter.output /Ac/Energy/Forward GetValue
74 # dbus com.victronenergy.pvinverter.output /Ac/Energy/Forward SetValue %20
75 #
76 # Above examples use this dbus client: http://code.google.com/p/dbus-tools/wiki/DBusCli
77 # See their manual to explain the % in %20
78
79 def main():
80 logging.basicConfig(level=logging.DEBUG)
81
82 from dbus.mainloop.glib import DBusGMainLoop
83 # Have a mainloop, so we can send/receive asynchronous calls to and from dbus
84 DBusGMainLoop(set_as_default=True)
85
86 pvac_output = DbusDummyService(
87 servicename='com.victronenergy.dummyservice.ttyO1',
88 deviceinstance=0,
89 paths={
90 '/Ac/Energy/Forward': {'initial': 0, 'update': 1},
91 '/Position': {'initial': 0, 'update': 0},
92 '/Nonupdatingvalue/UseForTestingWritesForExample': {'initial': None},
93 '/DbusInvalid': {'initial': None}
94 })
95
96 logging.info('Connected to dbus, and switching over to GLib.MainLoop() (= event based)')
97 mainloop = GLib.MainLoop()
98 mainloop.run()
99
100
101 if __name__ == "__main__":
102 main()