diff velib_python/dbusmonitor.py @ 8:9c0435a617db

Import velib_python
author Daniel O'Connor <darius@dons.net.au>
date Sun, 05 Dec 2021 14:35:36 +1030
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/velib_python/dbusmonitor.py	Sun Dec 05 14:35:36 2021 +1030
@@ -0,0 +1,592 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+## @package dbus_vrm
+# This code takes care of the D-Bus interface (not all of below is implemented yet):
+# - on startup it scans the dbus for services we know. For each known service found, it searches for
+#   objects/paths we know. Everything we find is stored in items{}, and an event is registered: if a
+#   value changes weĺl be notified and can pass that on to our owner. For example the vrmLogger.
+#   we know.
+# - after startup, it continues to monitor the dbus:
+#		1) when services are added we do the same check on that
+#		2) when services are removed, we remove any items that we had that referred to that service
+#		3) if an existing services adds paths we update ourselves as well: on init, we make a
+#		   VeDbusItemImport for a non-, or not yet existing objectpaths as well1
+# Code is used by the vrmLogger, and also the pubsub code. Both are other modules in the dbus_vrm repo.
+from dbus.mainloop.glib import DBusGMainLoop
+from gi.repository import GLib
+import dbus
+import dbus.service
+import inspect
+import logging
+import argparse
+import pprint
+import traceback
+import os
+from collections import defaultdict
+from functools import partial
+# our own packages
+from ve_utils import exit_on_error, wrap_dbus_value, unwrap_dbus_value
+notfound = object() # For lookups where None is a valid result
+logger = logging.getLogger(__name__)
+class SystemBus(dbus.bus.BusConnection):
+	def __new__(cls):
+		return dbus.bus.BusConnection.__new__(cls, dbus.bus.BusConnection.TYPE_SYSTEM)
+class SessionBus(dbus.bus.BusConnection):
+	def __new__(cls):
+		return dbus.bus.BusConnection.__new__(cls, dbus.bus.BusConnection.TYPE_SESSION)
+class MonitoredValue(object):
+	def __init__(self, value, text, options):
+		super(MonitoredValue, self).__init__()
+		self.value = value
+		self.text = text
+		self.options = options
+	# For legacy code, allow treating this as a tuple/list
+	def __iter__(self):
+		return iter((self.value, self.text, self.options))
+class Service(object):
+	whentologoptions = ['configChange', 'onIntervalAlwaysAndOnEvent',
+		'onIntervalOnlyWhenChanged', 'onIntervalAlways', 'never']
+	def __init__(self, id, serviceName, deviceInstance):
+		super(Service, self).__init__()
+		self.id = id
+		self.name = serviceName
+		self.paths = {}
+		self._seen = set()
+		self.deviceInstance = deviceInstance
+		self.configChange = []
+		self.onIntervalAlwaysAndOnEvent = []
+		self.onIntervalOnlyWhenChanged = []
+		self.onIntervalAlways = []
+		self.never = []
+	# For legacy code, attributes can still be accessed as if keys from a
+	# dictionary.
+	def __setitem__(self, key, value):
+		self.__dict__[key] = value
+	def __getitem__(self, key):
+		return self.__dict__[key]
+	def set_seen(self, path):
+		self._seen.add(path)
+	def seen(self, path):
+		return path in self._seen
+	@property
+	def service_class(self):
+		return '.'.join(self.name.split('.')[:3])
+class DbusMonitor(object):
+	## Constructor
+	def __init__(self, dbusTree, valueChangedCallback=None, deviceAddedCallback=None,
+					deviceRemovedCallback=None, vebusDeviceInstance0=False):
+		# valueChangedCallback is the callback that we call when something has changed.
+		# def value_changed_on_dbus(dbusServiceName, dbusPath, options, changes, deviceInstance):
+		# in which changes is a tuple with GetText() and GetValue()
+		self.valueChangedCallback = valueChangedCallback
+		self.deviceAddedCallback = deviceAddedCallback
+		self.deviceRemovedCallback = deviceRemovedCallback
+		self.dbusTree = dbusTree
+		self.vebusDeviceInstance0 = vebusDeviceInstance0
+		# Lists all tracked services. Stores name, id, device instance, value per path, and whenToLog info
+		# indexed by service name (eg. com.victronenergy.settings).
+		self.servicesByName = {}
+		# Same values as self.servicesByName, but indexed by service id (eg. :1.30)
+		self.servicesById = {}
+		# Keep track of services by class to speed up calls to get_service_list
+		self.servicesByClass = defaultdict(list)
+		# Keep track of any additional watches placed on items
+		self.serviceWatches = defaultdict(list)
+		# For a PC, connect to the SessionBus
+		# For a CCGX, connect to the SystemBus
+		self.dbusConn = SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in os.environ else SystemBus()
+		# subscribe to NameOwnerChange for bus connect / disconnect events.
+		(dbus.SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in os.environ \
+			else dbus.SystemBus()).add_signal_receiver(
+			self.dbus_name_owner_changed,
+			signal_name='NameOwnerChanged')
+		# Subscribe to PropertiesChanged for all services
+		self.dbusConn.add_signal_receiver(self.handler_value_changes,
+			dbus_interface='com.victronenergy.BusItem',
+			signal_name='PropertiesChanged', path_keyword='path',
+			sender_keyword='senderId')
+		# Subscribe to ItemsChanged for all services
+		self.dbusConn.add_signal_receiver(self.handler_item_changes,
+			dbus_interface='com.victronenergy.BusItem',
+			signal_name='ItemsChanged', path='/',
+			sender_keyword='senderId')
+		logger.info('===== Search on dbus for services that we will monitor starting... =====')
+		serviceNames = self.dbusConn.list_names()
+		for serviceName in serviceNames:
+			self.scan_dbus_service(serviceName)
+		logger.info('===== Search on dbus for services that we will monitor finished =====')
+	def dbus_name_owner_changed(self, name, oldowner, newowner):
+		if not name.startswith("com.victronenergy."):
+			return
+		#decouple, and process in main loop
+		GLib.idle_add(exit_on_error, self._process_name_owner_changed, name, oldowner, newowner)
+	def _process_name_owner_changed(self, name, oldowner, newowner):
+		if newowner != '':
+			# so we found some new service. Check if we can do something with it.
+			newdeviceadded = self.scan_dbus_service(name)
+			if newdeviceadded and self.deviceAddedCallback is not None:
+				self.deviceAddedCallback(name, self.get_device_instance(name))
+		elif name in self.servicesByName:
+			# it disappeared, we need to remove it.
+			logger.info("%s disappeared from the dbus. Removing it from our lists" % name)
+			service = self.servicesByName[name]
+			deviceInstance = service['deviceInstance']
+			del self.servicesById[service.id]
+			del self.servicesByName[name]
+			for watch in self.serviceWatches[name]:
+				watch.remove()
+			del self.serviceWatches[name]
+			self.servicesByClass[service.service_class].remove(service)
+			if self.deviceRemovedCallback is not None:
+				self.deviceRemovedCallback(name, deviceInstance)
+	def scan_dbus_service(self, serviceName):
+		try:
+			return self.scan_dbus_service_inner(serviceName)
+		except:
+			logger.error("Ignoring %s because of error while scanning:" % (serviceName))
+			traceback.print_exc()
+			return False
+			# Errors 'org.freedesktop.DBus.Error.ServiceUnknown' and
+			# 'org.freedesktop.DBus.Error.Disconnected' seem to happen when the service
+			# disappears while its being scanned. Which might happen, but is not really
+			# normal either, so letting them go into the logs.
+	# Scans the given dbus service to see if it contains anything interesting for us. If it does, add
+	# it to our list of monitored D-Bus services.
+	def scan_dbus_service_inner(self, serviceName):
+		# make it a normal string instead of dbus string
+		serviceName = str(serviceName)
+		paths = self.dbusTree.get('.'.join(serviceName.split('.')[0:3]), None)
+		if paths is None:
+			logger.debug("Ignoring service %s, not in the tree" % serviceName)
+			return False
+		logger.info("Found: %s, scanning and storing items" % serviceName)
+		serviceId = self.dbusConn.get_name_owner(serviceName)
+		# we should never be notified to add a D-Bus service that we already have. If this assertion
+		# raises, check process_name_owner_changed, and D-Bus workings.
+		assert serviceName not in self.servicesByName
+		assert serviceId not in self.servicesById
+		# for vebus.ttyO1, this is workaround, since VRM Portal expects the main vebus
+		# devices at instance 0. Not sure how to fix this yet.
+		if serviceName == 'com.victronenergy.vebus.ttyO1' and self.vebusDeviceInstance0:
+			di = 0
+		elif serviceName == 'com.victronenergy.settings':
+			di = 0
+		elif serviceName.startswith('com.victronenergy.vecan.'):
+			di = 0
+		else:
+			try:
+				di = self.dbusConn.call_blocking(serviceName,
+					'/DeviceInstance', None, 'GetValue', '', [])
+			except dbus.exceptions.DBusException:
+				logger.info("       %s was skipped because it has no device instance" % serviceName)
+				return False # Skip it
+			else:
+				di = int(di)
+		logger.info("       %s has device instance %s" % (serviceName, di))
+		service = Service(serviceId, serviceName, di)
+		# Let's try to fetch everything in one go
+		values = {}
+		texts = {}
+		try:
+			values.update(self.dbusConn.call_blocking(serviceName, '/', None, 'GetValue', '', []))
+			texts.update(self.dbusConn.call_blocking(serviceName, '/', None, 'GetText', '', []))
+		except:
+			pass
+		for path, options in paths.items():
+			# path will be the D-Bus path: '/Ac/ActiveIn/L1/V'
+			# options will be a dictionary: {'code': 'V', 'whenToLog': 'onIntervalAlways'}
+			# check that the whenToLog setting is set to something we expect
+			assert options['whenToLog'] is None or options['whenToLog'] in Service.whentologoptions
+			# Try to obtain the value we want from our bulk fetch. If we
+			# cannot find it there, do an individual query.
+			value = values.get(path[1:], notfound)
+			if value != notfound:
+				service.set_seen(path)
+			text = texts.get(path[1:], notfound)
+			if value is notfound or text is notfound:
+				try:
+					value = self.dbusConn.call_blocking(serviceName, path, None, 'GetValue', '', [])
+					service.set_seen(path)
+					text = self.dbusConn.call_blocking(serviceName, path, None, 'GetText', '', [])
+				except dbus.exceptions.DBusException as e:
+					if e.get_dbus_name() in (
+							'org.freedesktop.DBus.Error.ServiceUnknown',
+							'org.freedesktop.DBus.Error.Disconnected'):
+						raise # This exception will be handled below
+					# TODO org.freedesktop.DBus.Error.UnknownMethod really
+					# shouldn't happen but sometimes does.
+					logger.debug("%s %s does not exist (yet)" % (serviceName, path))
+					value = None
+					text = None
+			service.paths[path] = MonitoredValue(unwrap_dbus_value(value), unwrap_dbus_value(text), options)
+			if options['whenToLog']:
+				service[options['whenToLog']].append(path)
+		logger.debug("Finished scanning and storing items for %s" % serviceName)
+		# Adjust self at the end of the scan, so we don't have an incomplete set of
+		# data if an exception occurs during the scan.
+		self.servicesByName[serviceName] = service
+		self.servicesById[serviceId] = service
+		self.servicesByClass[service.service_class].append(service)
+		return True
+	def handler_item_changes(self, items, senderId):
+		if not isinstance(items, dict):
+			return
+		try:
+			service = self.servicesById[senderId]
+		except KeyError:
+			# senderId isn't there, which means it hasn't been scanned yet.
+			return
+		for path, changes in items.items():
+			try:
+				v = unwrap_dbus_value(changes['Value'])
+			except (KeyError, TypeError):
+				continue
+			try:
+				t = changes['Text']
+			except KeyError:
+				t = str(v)
+			self._handler_value_changes(service, path, v, t)
+	def handler_value_changes(self, changes, path, senderId):
+		# If this properyChange does not involve a value, our work is done.
+		if 'Value' not in changes:
+			return
+		try:
+			service = self.servicesById[senderId]
+		except KeyError:
+			# senderId isn't there, which means it hasn't been scanned yet.
+			return
+		v = unwrap_dbus_value(changes['Value'])
+		# Some services don't send Text with their PropertiesChanged events.
+		try:
+			t = changes['Text']
+		except KeyError:
+			t = str(v)
+		self._handler_value_changes(service, path, v, t)
+	def _handler_value_changes(self, service, path, value, text):
+		try:
+			a = service.paths[path]
+		except KeyError:
+			# path isn't there, which means it hasn't been scanned yet.
+			return
+		service.set_seen(path)
+		# First update our store to the new value
+		if a.value == value:
+			return
+		a.value = value
+		a.text = text
+		# And do the rest of the processing in on the mainloop
+		if self.valueChangedCallback is not None:
+			GLib.idle_add(exit_on_error, self._execute_value_changes, service.name, path, {
+				'Value': value, 'Text': text}, a.options)
+	def _execute_value_changes(self, serviceName, objectPath, changes, options):
+		# double check that the service still exists, as it might have
+		# disappeared between scheduling-for and executing this function.
+		if serviceName not in self.servicesByName:
+			return
+		self.valueChangedCallback(serviceName, objectPath,
+			options, changes, self.get_device_instance(serviceName))
+	# Gets the value for a certain servicename and path
+	# The default_value is returned when:
+	# 1. When the service doesn't exist.
+	# 2. When the path asked for isn't being monitored.
+	# 3. When the path exists, but has dbus-invalid, ie an empty byte array.
+	# 4. When the path asked for is being monitored, but doesn't exist for that service.
+	def get_value(self, serviceName, objectPath, default_value=None):
+		service = self.servicesByName.get(serviceName, None)
+		if service is None:
+			return default_value
+		value = service.paths.get(objectPath, None)
+		if value is None or value.value is None:
+			return default_value
+		return value.value
+	# returns if a dbus exists now, by doing a blocking dbus call.
+	# Typically seen will be sufficient and doesn't need access to the dbus.
+	def exists(self, serviceName, objectPath):
+		try:
+			self.dbusConn.call_blocking(serviceName, objectPath, None, 'GetValue', '', [])
+			return True
+		except dbus.exceptions.DBusException as e:
+			return False
+	# Returns if there ever was a successful GetValue or valueChanged event.
+	# Unlike get_value this return True also if the actual value is invalid.
+	#
+	# Note: the path might no longer exists anymore, but that doesn't happen in
+	# practice. If a service really wants to reconfigure itself typically it should
+	# reconnect to the dbus which causes it to be rescanned and seen will be updated.
+	# If it is really needed to know if a path still exists, use exists.
+	def seen(self, serviceName, objectPath):
+		try:
+			return self.servicesByName[serviceName].seen(objectPath)
+		except KeyError:
+			return False
+	# Sets the value for a certain servicename and path, returns the return value of the D-Bus SetValue
+	# method. If the underlying item does not exist (the service does not exist, or the objectPath was not
+	# registered) the function will return -1
+	def set_value(self, serviceName, objectPath, value):
+		# Check if the D-Bus object referenced by serviceName and objectPath is registered. There is no
+		# necessity to do this, but it is in line with previous implementations which kept VeDbusItemImport
+		# objects for registers items only.
+		service = self.servicesByName.get(serviceName, None)
+		if service is None:
+			return -1
+		if objectPath not in service.paths:
+			return -1
+		# We do not catch D-Bus exceptions here, because the previous implementation did not do that either.
+		return self.dbusConn.call_blocking(serviceName, objectPath,
+				   dbus_interface='com.victronenergy.BusItem',
+				   method='SetValue', signature=None,
+				   args=[wrap_dbus_value(value)])
+	# Similar to set_value, but operates asynchronously
+	def set_value_async(self, serviceName, objectPath, value,
+			reply_handler=None, error_handler=None):
+		service = self.servicesByName.get(serviceName, None)
+		if service is not None:
+			if objectPath in service.paths:
+				self.dbusConn.call_async(serviceName, objectPath,
+					dbus_interface='com.victronenergy.BusItem',
+					method='SetValue', signature=None,
+					args=[wrap_dbus_value(value)],
+					reply_handler=reply_handler, error_handler=error_handler)
+				return
+		if error_handler is not None:
+			error_handler(TypeError('Service or path not found, '
+						'service=%s, path=%s' % (serviceName, objectPath)))
+	# returns a dictionary, keys are the servicenames, value the instances
+	# optionally use the classfilter to get only a certain type of services, for
+	# example com.victronenergy.battery.
+	def get_service_list(self, classfilter=None):
+		if classfilter is None:
+			return { servicename: service.deviceInstance \
+				for servicename, service in self.servicesByName.items() }
+		if classfilter not in self.servicesByClass:
+			return {}
+		return { service.name: service.deviceInstance \
+			for service in self.servicesByClass[classfilter] }
+	def get_device_instance(self, serviceName):
+		return self.servicesByName[serviceName].deviceInstance
+	# Parameter categoryfilter is to be a list, containing the categories you want (configChange,
+	# onIntervalAlways, etc).
+	# Returns a dictionary, keys are codes + instance, in VRM querystring format. For example vvt[0]. And
+	# values are the value.
+	def get_values(self, categoryfilter, converter=None):
+		result = {}
+		for serviceName in self.servicesByName:
+			result.update(self.get_values_for_service(categoryfilter, serviceName, converter))
+		return result
+	# same as get_values above, but then for one service only
+	def get_values_for_service(self, categoryfilter, servicename, converter=None):
+		deviceInstance = self.get_device_instance(servicename)
+		result = {}
+		service = self.servicesByName[servicename]
+		for category in categoryfilter:
+			for path in service[category]:
+				value, text, options = service.paths[path]
+				if value is not None:
+					value = value if converter is None else converter.convert(path, options['code'], value, text)
+					precision = options.get('precision')
+					if precision:
+						value = round(value, precision)
+					result[options['code'] + "[" + str(deviceInstance) + "]"] = value
+		return result
+	def track_value(self, serviceName, objectPath, callback, *args, **kwargs):
+		""" A DbusMonitor can watch specific service/path combos for changes
+		    so that it is not fully reliant on the global handler_value_changes
+		    in this class. Additional watches are deleted automatically when
+		    the service disappears from dbus. """
+		cb = partial(callback, *args, **kwargs)
+		def root_tracker(items):
+			# Check if objectPath in dict
+			try:
+				v = items[objectPath]
+				_v = unwrap_dbus_value(v['Value'])
+			except (KeyError, TypeError):
+				return # not in this dict
+			try:
+				t = v['Text']
+			except KeyError:
+				cb({'Value': _v })
+			else:
+				cb({'Value': _v, 'Text': t})
+		# Track changes on the path, and also on root
+		self.serviceWatches[serviceName].extend((
+			self.dbusConn.add_signal_receiver(cb,
+				dbus_interface='com.victronenergy.BusItem',
+				signal_name='PropertiesChanged',
+				path=objectPath, bus_name=serviceName),
+			self.dbusConn.add_signal_receiver(root_tracker,
+				dbus_interface='com.victronenergy.BusItem',
+				signal_name='ItemsChanged',
+				path="/", bus_name=serviceName),
+		))
+# Example function that can be used as a starting point to use this code
+def value_changed_on_dbus(dbusServiceName, dbusPath, dict, changes, deviceInstance):
+	logger.debug("0 ----------------")
+	logger.debug("1 %s%s changed" % (dbusServiceName, dbusPath))
+	logger.debug("2 vrm dict     : %s" % dict)
+	logger.debug("3 changes-text: %s" % changes['Text'])
+	logger.debug("4 changes-value: %s" % changes['Value'])
+	logger.debug("5 deviceInstance: %s" % deviceInstance)
+	logger.debug("6 - end")
+def nameownerchange(a, b):
+	# used to find memory leaks in dbusmonitor and VeDbusItemImport
+	import gc
+	gc.collect()
+	objects = gc.get_objects()
+	print (len([o for o in objects if type(o).__name__ == 'VeDbusItemImport']))
+	print (len([o for o in objects if type(o).__name__ == 'SignalMatch']))
+	print (len(objects))
+def print_values(dbusmonitor):
+	a = dbusmonitor.get_value('wrongservice', '/DbusInvalid', default_value=1000)
+	b = dbusmonitor.get_value('com.victronenergy.dummyservice.ttyO1', '/NotInTheMonitorList', default_value=1000)
+	c = dbusmonitor.get_value('com.victronenergy.dummyservice.ttyO1', '/DbusInvalid', default_value=1000)
+	d = dbusmonitor.get_value('com.victronenergy.dummyservice.ttyO1', '/NonExistingButMonitored', default_value=1000)
+	print ("All should be 1000: Wrong Service: %s, NotInTheMonitorList: %s, DbusInvalid: %s, NonExistingButMonitored: %s" % (a, b, c, d))
+	return True
+# We have a mainloop, but that is just for developing this code. Normally above class & code is used from
+# some other class, such as vrmLogger or the pubsub Implementation.
+def main():
+	# Init logging
+	logging.basicConfig(level=logging.DEBUG)
+	logger.info(__file__ + " is starting up")
+	# Have a mainloop, so we can send/receive asynchronous calls to and from dbus
+	DBusGMainLoop(set_as_default=True)
+	import os
+	import sys
+	sys.path.insert(1, os.path.join(os.path.dirname(__file__), '../../'))
+	dummy = {'code': None, 'whenToLog': 'configChange', 'accessLevel': None}
+	monitorlist = {'com.victronenergy.dummyservice': {
+				'/Connected': dummy,
+				'/ProductName': dummy,
+				'/Mgmt/Connection': dummy,
+				'/Dc/0/Voltage': dummy,
+				'/Dc/0/Current': dummy,
+				'/Dc/0/Temperature': dummy,
+				'/Load/I': dummy,
+				'/FirmwareVersion': dummy,
+				'/DbusInvalid': dummy,
+				'/NonExistingButMonitored': dummy}}
+	d = DbusMonitor(monitorlist, value_changed_on_dbus,
+		deviceAddedCallback=nameownerchange, deviceRemovedCallback=nameownerchange)
+	# logger.info("==configchange values==")
+	# logger.info(pprint.pformat(d.get_values(['configChange'])))
+	# logger.info("==onIntervalAlways and onIntervalOnlyWhenChanged==")
+	# logger.info(pprint.pformat(d.get_values(['onIntervalAlways', 'onIntervalAlwaysAndOnEvent'])))
+	GLib.timeout_add(1000, print_values, d)
+	# Start and run the mainloop
+	logger.info("Starting mainloop, responding on only events")
+	mainloop = GLib.MainLoop()
+	mainloop.run()
+if __name__ == "__main__":
+	main()