comparison velib_python/dbusmonitor.py @ 8:9c0435a617db

Import velib_python
author Daniel O'Connor <darius@dons.net.au>
date Sun, 05 Dec 2021 14:35:36 +1030
parents
children
comparison
equal deleted inserted replaced
5:982eeffe9d95 8:9c0435a617db
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 ## @package dbus_vrm
5 # This code takes care of the D-Bus interface (not all of below is implemented yet):
6 # - on startup it scans the dbus for services we know. For each known service found, it searches for
7 # objects/paths we know. Everything we find is stored in items{}, and an event is registered: if a
8 # value changes we'll be notified and can pass that on to our owner. For example the vrmLogger.
9 # we know.
10 # - after startup, it continues to monitor the dbus:
11 # 1) when services are added we do the same check on that
12 # 2) when services are removed, we remove any items that we had that referred to that service
13 # 3) if an existing services adds paths we update ourselves as well: on init, we make a
14 # VeDbusItemImport for a non-, or not yet existing objectpaths as well
15 #
16 # Code is used by the vrmLogger, and also the pubsub code. Both are other modules in the dbus_vrm repo.
17
18 from dbus.mainloop.glib import DBusGMainLoop
19 from gi.repository import GLib
20 import dbus
21 import dbus.service
22 import inspect
23 import logging
24 import argparse
25 import pprint
26 import traceback
27 import os
28 from collections import defaultdict
29 from functools import partial
30
31 # our own packages
32 from ve_utils import exit_on_error, wrap_dbus_value, unwrap_dbus_value
# Unique sentinel for lookups where None is a valid result.
notfound = object()

# Module-level logger; its level is fixed to INFO at import time.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class SystemBus(dbus.bus.BusConnection):
    """Bus connection that always connects to the system bus."""

    def __new__(cls):
        bus_type = dbus.bus.BusConnection.TYPE_SYSTEM
        return super().__new__(cls, bus_type)
40
class SessionBus(dbus.bus.BusConnection):
    """Bus connection that always connects to the session bus."""

    def __new__(cls):
        bus_type = dbus.bus.BusConnection.TYPE_SESSION
        return super().__new__(cls, bus_type)
44
class MonitoredValue(object):
    """Holds the cached value, its text form and the options of one path."""

    def __init__(self, value, text, options):
        super().__init__()
        self.value, self.text, self.options = value, text, options

    def __iter__(self):
        # For legacy code, allow treating this as a tuple/list:
        # value, text, options = monitored_value
        as_tuple = (self.value, self.text, self.options)
        return iter(as_tuple)
55
class Service(object):
    """Bookkeeping for one monitored D-Bus service.

    Stores the unique bus id, the service name, the device instance, the
    monitored paths, the set of paths that have been seen, and one list of
    paths per whenToLog category.
    """

    whentologoptions = ['configChange', 'onIntervalAlwaysAndOnEvent',
        'onIntervalOnlyWhenChanged', 'onIntervalAlways', 'never']

    def __init__(self, id, serviceName, deviceInstance):
        super().__init__()
        self.id = id
        self.name = serviceName
        self.paths = {}
        self._seen = set()
        self.deviceInstance = deviceInstance

        # One (initially empty) list of paths per whenToLog category.
        for category in Service.whentologoptions:
            vars(self)[category] = []

    # For legacy code, attributes can still be accessed as if keys from a
    # dictionary.
    def __setitem__(self, key, value):
        vars(self)[key] = value

    def __getitem__(self, key):
        return vars(self)[key]

    def set_seen(self, path):
        """Remember that path has been seen for this service."""
        self._seen.add(path)

    def seen(self, path):
        """Return True if set_seen was called for path before."""
        return path in self._seen

    @property
    def service_class(self):
        """The first three dot-separated components of the service name."""
        components = self.name.split('.')
        return '.'.join(components[:3])
89
class DbusMonitor(object):
    """Monitors the D-Bus services listed in dbusTree and caches their values.

    On construction every name currently on the bus is scanned; afterwards
    NameOwnerChanged, PropertiesChanged and ItemsChanged signals keep the
    cached data up to date. Cached values can be read with get_value and
    get_values, and written back to the bus with set_value.
    """

    ## Constructor
    def __init__(self, dbusTree, valueChangedCallback=None, deviceAddedCallback=None,
            deviceRemovedCallback=None, vebusDeviceInstance0=False):
        # valueChangedCallback is the callback that we call when something has changed.
        # def value_changed_on_dbus(dbusServiceName, dbusPath, options, changes, deviceInstance):
        # in which changes is a dict with the keys 'Value' and 'Text'.
        self.valueChangedCallback = valueChangedCallback
        self.deviceAddedCallback = deviceAddedCallback
        self.deviceRemovedCallback = deviceRemovedCallback
        self.dbusTree = dbusTree
        self.vebusDeviceInstance0 = vebusDeviceInstance0

        # Lists all tracked services. Stores name, id, device instance, value per path, and whenToLog info
        # indexed by service name (eg. com.victronenergy.settings).
        self.servicesByName = {}

        # Same values as self.servicesByName, but indexed by service id (eg. :1.30)
        self.servicesById = {}

        # Keep track of services by class to speed up calls to get_service_list
        self.servicesByClass = defaultdict(list)

        # Keep track of any additional watches placed on items
        self.serviceWatches = defaultdict(list)

        # For a PC, connect to the SessionBus
        # For a CCGX, connect to the SystemBus
        self.dbusConn = SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in os.environ else SystemBus()

        # subscribe to NameOwnerChange for bus connect / disconnect events.
        # Note that this uses dbus.SessionBus()/dbus.SystemBus() and not self.dbusConn.
        (dbus.SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in os.environ \
            else dbus.SystemBus()).add_signal_receiver(
                self.dbus_name_owner_changed,
                signal_name='NameOwnerChanged')

        # Subscribe to PropertiesChanged for all services
        self.dbusConn.add_signal_receiver(self.handler_value_changes,
            dbus_interface='com.victronenergy.BusItem',
            signal_name='PropertiesChanged', path_keyword='path',
            sender_keyword='senderId')

        # Subscribe to ItemsChanged for all services
        self.dbusConn.add_signal_receiver(self.handler_item_changes,
            dbus_interface='com.victronenergy.BusItem',
            signal_name='ItemsChanged', path='/',
            sender_keyword='senderId')

        logger.info('===== Search on dbus for services that we will monitor starting... =====')
        serviceNames = self.dbusConn.list_names()
        for serviceName in serviceNames:
            self.scan_dbus_service(serviceName)

        logger.info('===== Search on dbus for services that we will monitor finished =====')

    def dbus_name_owner_changed(self, name, oldowner, newowner):
        """Signal handler for NameOwnerChanged; ignores non-victronenergy names."""
        if not name.startswith("com.victronenergy."):
            return

        # decouple, and process in main loop
        GLib.idle_add(exit_on_error, self._process_name_owner_changed, name, oldowner, newowner)

    def _process_name_owner_changed(self, name, oldowner, newowner):
        """Scan a service that got an owner, or forget one that lost its owner."""
        if newowner != '':
            # so we found some new service. Check if we can do something with it.
            newdeviceadded = self.scan_dbus_service(name)
            if newdeviceadded and self.deviceAddedCallback is not None:
                self.deviceAddedCallback(name, self.get_device_instance(name))

        elif name in self.servicesByName:
            # it disappeared, we need to remove it.
            logger.info("%s disappeared from the dbus. Removing it from our lists", name)
            service = self.servicesByName[name]
            deviceInstance = service['deviceInstance']
            del self.servicesById[service.id]
            del self.servicesByName[name]
            # Additional watches placed by track_value die with the service.
            for watch in self.serviceWatches[name]:
                watch.remove()
            del self.serviceWatches[name]
            self.servicesByClass[service.service_class].remove(service)
            if self.deviceRemovedCallback is not None:
                self.deviceRemovedCallback(name, deviceInstance)

    def scan_dbus_service(self, serviceName):
        """Scan serviceName, returning True if it was added to the monitored services.

        Any error raised while scanning is logged and results in False.
        """
        try:
            return self.scan_dbus_service_inner(serviceName)
        except Exception:
            # Deliberately not a bare except: KeyboardInterrupt and SystemExit
            # must not be swallowed here.
            logger.error("Ignoring %s because of error while scanning:", serviceName)
            traceback.print_exc()
            return False

        # Errors 'org.freedesktop.DBus.Error.ServiceUnknown' and
        # 'org.freedesktop.DBus.Error.Disconnected' seem to happen when the service
        # disappears while its being scanned. Which might happen, but is not really
        # normal either, so letting them go into the logs.

    # Scans the given dbus service to see if it contains anything interesting for us. If it does, add
    # it to our list of monitored D-Bus services.
    def scan_dbus_service_inner(self, serviceName):
        """Scan one service and register it; returns True when it was added.

        Returns False when the service class is not in dbusTree or when the
        service has no device instance. D-Bus errors ServiceUnknown and
        Disconnected are re-raised to the caller.
        """
        # make it a normal string instead of dbus string
        serviceName = str(serviceName)

        paths = self.dbusTree.get('.'.join(serviceName.split('.')[0:3]), None)
        if paths is None:
            logger.debug("Ignoring service %s, not in the tree", serviceName)
            return False

        logger.info("Found: %s, scanning and storing items", serviceName)
        serviceId = self.dbusConn.get_name_owner(serviceName)

        # we should never be notified to add a D-Bus service that we already have. If this assertion
        # raises, check process_name_owner_changed, and D-Bus workings.
        assert serviceName not in self.servicesByName
        assert serviceId not in self.servicesById

        # for vebus.ttyO1, this is workaround, since VRM Portal expects the main vebus
        # devices at instance 0. Not sure how to fix this yet.
        if serviceName == 'com.victronenergy.vebus.ttyO1' and self.vebusDeviceInstance0:
            di = 0
        elif serviceName == 'com.victronenergy.settings':
            di = 0
        elif serviceName.startswith('com.victronenergy.vecan.'):
            di = 0
        else:
            try:
                di = self.dbusConn.call_blocking(serviceName,
                    '/DeviceInstance', None, 'GetValue', '', [])
            except dbus.exceptions.DBusException:
                logger.info(" %s was skipped because it has no device instance", serviceName)
                return False # Skip it
            else:
                di = int(di)

        logger.info(" %s has device instance %s", serviceName, di)
        service = Service(serviceId, serviceName, di)

        # Let's try to fetch everything in one go. This is best effort: on any
        # failure the per-path queries below are used instead.
        values = {}
        texts = {}
        try:
            values.update(self.dbusConn.call_blocking(serviceName, '/', None, 'GetValue', '', []))
            texts.update(self.dbusConn.call_blocking(serviceName, '/', None, 'GetText', '', []))
        except Exception:
            logger.debug("Bulk fetch from %s failed, falling back to per-path queries", serviceName)

        for path, options in paths.items():
            # path will be the D-Bus path: '/Ac/ActiveIn/L1/V'
            # options will be a dictionary: {'code': 'V', 'whenToLog': 'onIntervalAlways'}
            # check that the whenToLog setting is set to something we expect
            assert options['whenToLog'] is None or options['whenToLog'] in Service.whentologoptions

            # Try to obtain the value we want from our bulk fetch. If we
            # cannot find it there, do an individual query.
            # The bulk results are looked up without the leading '/'.
            value = values.get(path[1:], notfound)
            if value is not notfound:
                service.set_seen(path)
            text = texts.get(path[1:], notfound)
            if value is notfound or text is notfound:
                try:
                    value = self.dbusConn.call_blocking(serviceName, path, None, 'GetValue', '', [])
                    service.set_seen(path)
                    text = self.dbusConn.call_blocking(serviceName, path, None, 'GetText', '', [])
                except dbus.exceptions.DBusException as e:
                    if e.get_dbus_name() in (
                            'org.freedesktop.DBus.Error.ServiceUnknown',
                            'org.freedesktop.DBus.Error.Disconnected'):
                        raise # This exception will be handled by the caller

                    # TODO org.freedesktop.DBus.Error.UnknownMethod really
                    # shouldn't happen but sometimes does.
                    logger.debug("%s %s does not exist (yet)", serviceName, path)
                    value = None
                    text = None

            service.paths[path] = MonitoredValue(unwrap_dbus_value(value), unwrap_dbus_value(text), options)

            if options['whenToLog']:
                service[options['whenToLog']].append(path)

        logger.debug("Finished scanning and storing items for %s", serviceName)

        # Adjust self at the end of the scan, so we don't have an incomplete set of
        # data if an exception occurs during the scan.
        self.servicesByName[serviceName] = service
        self.servicesById[serviceId] = service
        self.servicesByClass[service.service_class].append(service)

        return True

    def handler_item_changes(self, items, senderId):
        """Signal handler for ItemsChanged: items maps paths to change dicts."""
        if not isinstance(items, dict):
            return

        try:
            service = self.servicesById[senderId]
        except KeyError:
            # senderId isn't there, which means it hasn't been scanned yet.
            return

        for path, changes in items.items():
            try:
                v = unwrap_dbus_value(changes['Value'])
            except (KeyError, TypeError):
                continue

            # Fall back to the string form of the value when Text is missing.
            try:
                t = changes['Text']
            except KeyError:
                t = str(v)
            self._handler_value_changes(service, path, v, t)

    def handler_value_changes(self, changes, path, senderId):
        """Signal handler for PropertiesChanged on a single path."""
        # If this propertyChange does not involve a value, our work is done.
        if 'Value' not in changes:
            return

        try:
            service = self.servicesById[senderId]
        except KeyError:
            # senderId isn't there, which means it hasn't been scanned yet.
            return

        v = unwrap_dbus_value(changes['Value'])
        # Some services don't send Text with their PropertiesChanged events.
        try:
            t = changes['Text']
        except KeyError:
            t = str(v)
        self._handler_value_changes(service, path, v, t)

    def _handler_value_changes(self, service, path, value, text):
        """Store a new value for path and schedule the valueChangedCallback.

        Paths that are not monitored are ignored. Nothing is stored or
        scheduled when the value equals the cached one.
        """
        try:
            a = service.paths[path]
        except KeyError:
            # path isn't there, which means it hasn't been scanned yet.
            return

        service.set_seen(path)

        # First update our store to the new value
        if a.value == value:
            return

        a.value = value
        a.text = text

        # And do the rest of the processing in on the mainloop
        if self.valueChangedCallback is not None:
            GLib.idle_add(exit_on_error, self._execute_value_changes, service.name, path, {
                'Value': value, 'Text': text}, a.options)

    def _execute_value_changes(self, serviceName, objectPath, changes, options):
        """Invoke valueChangedCallback, unless the service is gone by now."""
        # double check that the service still exists, as it might have
        # disappeared between scheduling-for and executing this function.
        if serviceName not in self.servicesByName:
            return

        self.valueChangedCallback(serviceName, objectPath,
            options, changes, self.get_device_instance(serviceName))

    # Gets the value for a certain servicename and path
    # The default_value is returned when:
    # 1. When the service doesn't exist.
    # 2. When the path asked for isn't being monitored.
    # 3. When the path exists, but has dbus-invalid, ie an empty byte array.
    # 4. When the path asked for is being monitored, but doesn't exist for that service.
    def get_value(self, serviceName, objectPath, default_value=None):
        service = self.servicesByName.get(serviceName, None)
        if service is None:
            return default_value

        value = service.paths.get(objectPath, None)
        if value is None or value.value is None:
            return default_value

        return value.value

    # returns if a dbus exists now, by doing a blocking dbus call.
    # Typically seen will be sufficient and doesn't need access to the dbus.
    def exists(self, serviceName, objectPath):
        try:
            self.dbusConn.call_blocking(serviceName, objectPath, None, 'GetValue', '', [])
            return True
        except dbus.exceptions.DBusException:
            return False

    # Returns if there ever was a successful GetValue or valueChanged event.
    # Unlike get_value this return True also if the actual value is invalid.
    #
    # Note: the path might no longer exists anymore, but that doesn't happen in
    # practice. If a service really wants to reconfigure itself typically it should
    # reconnect to the dbus which causes it to be rescanned and seen will be updated.
    # If it is really needed to know if a path still exists, use exists.
    def seen(self, serviceName, objectPath):
        try:
            return self.servicesByName[serviceName].seen(objectPath)
        except KeyError:
            return False

    # Sets the value for a certain servicename and path, returns the return value of the D-Bus SetValue
    # method. If the underlying item does not exist (the service does not exist, or the objectPath was not
    # registered) the function will return -1
    def set_value(self, serviceName, objectPath, value):
        # Check if the D-Bus object referenced by serviceName and objectPath is registered. There is no
        # necessity to do this, but it is in line with previous implementations which kept VeDbusItemImport
        # objects for registers items only.
        service = self.servicesByName.get(serviceName, None)
        if service is None:
            return -1
        if objectPath not in service.paths:
            return -1
        # We do not catch D-Bus exceptions here, because the previous implementation did not do that either.
        return self.dbusConn.call_blocking(serviceName, objectPath,
            dbus_interface='com.victronenergy.BusItem',
            method='SetValue', signature=None,
            args=[wrap_dbus_value(value)])

    # Similar to set_value, but operates asynchronously
    def set_value_async(self, serviceName, objectPath, value,
            reply_handler=None, error_handler=None):
        service = self.servicesByName.get(serviceName, None)
        if service is not None:
            if objectPath in service.paths:
                self.dbusConn.call_async(serviceName, objectPath,
                    dbus_interface='com.victronenergy.BusItem',
                    method='SetValue', signature=None,
                    args=[wrap_dbus_value(value)],
                    reply_handler=reply_handler, error_handler=error_handler)
                return

        # Unknown service or path: report through error_handler, if one was given.
        if error_handler is not None:
            error_handler(TypeError('Service or path not found, '
                'service=%s, path=%s' % (serviceName, objectPath)))

    # returns a dictionary, keys are the servicenames, value the instances
    # optionally use the classfilter to get only a certain type of services, for
    # example com.victronenergy.battery.
    def get_service_list(self, classfilter=None):
        if classfilter is None:
            return { servicename: service.deviceInstance \
                for servicename, service in self.servicesByName.items() }

        # Membership test first, so the defaultdict does not grow on a miss.
        if classfilter not in self.servicesByClass:
            return {}

        return { service.name: service.deviceInstance \
            for service in self.servicesByClass[classfilter] }

    def get_device_instance(self, serviceName):
        """Return the device instance of a monitored service (KeyError if unknown)."""
        return self.servicesByName[serviceName].deviceInstance

    # Parameter categoryfilter is to be a list, containing the categories you want (configChange,
    # onIntervalAlways, etc).
    # Returns a dictionary, keys are codes + instance, in VRM querystring format. For example vvt[0]. And
    # values are the value.
    def get_values(self, categoryfilter, converter=None):

        result = {}

        for serviceName in self.servicesByName:
            result.update(self.get_values_for_service(categoryfilter, serviceName, converter))

        return result

    # same as get_values above, but then for one service only
    def get_values_for_service(self, categoryfilter, servicename, converter=None):
        deviceInstance = self.get_device_instance(servicename)
        result = {}

        service = self.servicesByName[servicename]

        for category in categoryfilter:

            for path in service[category]:

                value, text, options = service.paths[path]

                # Paths without a value are left out of the result.
                if value is not None:

                    value = value if converter is None else converter.convert(path, options['code'], value, text)

                    precision = options.get('precision')
                    if precision:
                        value = round(value, precision)

                    result[options['code'] + "[" + str(deviceInstance) + "]"] = value

        return result

    def track_value(self, serviceName, objectPath, callback, *args, **kwargs):
        """ A DbusMonitor can watch specific service/path combos for changes
            so that it is not fully reliant on the global handler_value_changes
            in this class. Additional watches are deleted automatically when
            the service disappears from dbus. """
        cb = partial(callback, *args, **kwargs)

        def root_tracker(items):
            # Check if objectPath in dict
            try:
                v = items[objectPath]
                _v = unwrap_dbus_value(v['Value'])
            except (KeyError, TypeError):
                return # not in this dict

            try:
                t = v['Text']
            except KeyError:
                cb({'Value': _v })
            else:
                cb({'Value': _v, 'Text': t})

        # Track changes on the path, and also on root
        self.serviceWatches[serviceName].extend((
            self.dbusConn.add_signal_receiver(cb,
                dbus_interface='com.victronenergy.BusItem',
                signal_name='PropertiesChanged',
                path=objectPath, bus_name=serviceName),
            self.dbusConn.add_signal_receiver(root_tracker,
                dbus_interface='com.victronenergy.BusItem',
                signal_name='ItemsChanged',
                path="/", bus_name=serviceName),
        ))
514
515
# ====== ALL CODE BELOW THIS LINE IS PURELY FOR DEVELOPING THIS CLASS ======

# Example function that can be used as a starting point to use this code
def value_changed_on_dbus(dbusServiceName, dbusPath, dict, changes, deviceInstance):
    """Example valueChangedCallback that logs its arguments at debug level.

    changes is indexed with the keys 'Text' and 'Value'.
    """
    logger.debug("0 ----------------")
    logger.debug("1 %s%s changed", dbusServiceName, dbusPath)
    logger.debug("2 vrm dict : %s", dict)
    logger.debug("3 changes-text: %s", changes['Text'])
    logger.debug("4 changes-value: %s", changes['Value'])
    logger.debug("5 deviceInstance: %s", deviceInstance)
    logger.debug("6 - end")
527
528
def nameownerchange(a, b):
    """Print object counts; used to find memory leaks in dbusmonitor and VeDbusItemImport.

    Both arguments are ignored. Prints the number of live objects whose type
    is named VeDbusItemImport, then SignalMatch, then the total object count.
    """
    import gc
    gc.collect()
    objects = gc.get_objects()
    for type_name in ('VeDbusItemImport', 'SignalMatch'):
        print(sum(1 for o in objects if type(o).__name__ == type_name))
    print(len(objects))
537
538
def print_values(dbusmonitor):
    """Query four values that should all yield the default and print them.

    Always returns True.
    """
    dummy_service = 'com.victronenergy.dummyservice.ttyO1'
    lookups = (
        ('wrongservice', '/DbusInvalid'),
        (dummy_service, '/NotInTheMonitorList'),
        (dummy_service, '/DbusInvalid'),
        (dummy_service, '/NonExistingButMonitored'),
    )
    results = tuple(
        dbusmonitor.get_value(service, path, default_value=1000)
        for service, path in lookups)

    print("All should be 1000: Wrong Service: %s, NotInTheMonitorList: %s, DbusInvalid: %s, NonExistingButMonitored: %s" % results)
    return True
547
# We have a mainloop, but that is just for developing this code. Normally above class & code is used from
# some other class, such as vrmLogger or the pubsub Implementation.
def main():
    """Development entry point: monitor a dummy service and print test values every second."""
    # Init logging
    logging.basicConfig(level=logging.DEBUG)
    logger.info(__file__ + " is starting up")

    # Have a mainloop, so we can send/receive asynchronous calls to and from dbus
    DBusGMainLoop(set_as_default=True)

    import os
    import sys
    sys.path.insert(1, os.path.join(os.path.dirname(__file__), '../../'))

    # Every monitored path shares the same options dictionary.
    dummy = {'code': None, 'whenToLog': 'configChange', 'accessLevel': None}
    monitored_paths = (
        '/Connected',
        '/ProductName',
        '/Mgmt/Connection',
        '/Dc/0/Voltage',
        '/Dc/0/Current',
        '/Dc/0/Temperature',
        '/Load/I',
        '/FirmwareVersion',
        '/DbusInvalid',
        '/NonExistingButMonitored',
    )
    monitorlist = {
        'com.victronenergy.dummyservice': {path: dummy for path in monitored_paths},
    }

    d = DbusMonitor(monitorlist, value_changed_on_dbus,
        deviceAddedCallback=nameownerchange, deviceRemovedCallback=nameownerchange)

    # logger.info("==configchange values==")
    # logger.info(pprint.pformat(d.get_values(['configChange'])))

    # logger.info("==onIntervalAlways and onIntervalOnlyWhenChanged==")
    # logger.info(pprint.pformat(d.get_values(['onIntervalAlways', 'onIntervalAlwaysAndOnEvent'])))

    # print_values returns True, so the timeout keeps firing every second.
    GLib.timeout_add(1000, print_values, d)

    # Start and run the mainloop
    logger.info("Starting mainloop, responding on only events")
    mainloop = GLib.MainLoop()
    mainloop.run()


if __name__ == "__main__":
    main()