diff velib_python/test/mock_dbus_monitor.py @ 8:9c0435a617db

Import velib_python
author Daniel O'Connor <darius@dons.net.au>
date Sun, 05 Dec 2021 14:35:36 +1030
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/velib_python/test/mock_dbus_monitor.py	Sun Dec 05 14:35:36 2021 +1030
@@ -0,0 +1,167 @@
+import dbus
+from collections import defaultdict
+from functools import partial
+
+# Simulation a DbusMonitor object, without using the D-Bus (intended for unit tests). Instead of changes values
+# on the D-Bus you can use the set_value function. set_value will automatically expand the service list. Note
+# that all simulated D-Bus paths passed to set_value must be part of the dbusTree passed to the constructor of
+# the monitor.
+class MockDbusMonitor(object):
+    def __init__(self, dbusTree, valueChangedCallback=None, deviceAddedCallback=None,
+            deviceRemovedCallback=None, mountEventCallback=None, vebusDeviceInstance0=False, checkPaths=True):
+        self._services = {}
+        self._tree = {}
+        self._seen = defaultdict(set)
+        self._watches = defaultdict(dict)
+        self._checkPaths = checkPaths
+        self._value_changed_callback = valueChangedCallback
+        self._device_removed_callback = deviceRemovedCallback
+        self._device_added_callback = deviceAddedCallback
+        for s, sv in dbusTree.items():
+            service = self._tree.setdefault(s, set())
+            service.update(['/Connected', '/ProductName', '/Mgmt/Connection', '/DeviceInstance'])
+            for p in sv:
+                service.add(p)
+
+    # Gets the value for a certain servicename and path, returns the default_value when
+    # request service and objectPath combination does not not exists or when it is invalid
+    def get_value(self, serviceName, objectPath, default_value=None):
+        item = self._get_item(serviceName, objectPath)
+        if item is None:
+            return default_value
+        r = item.get_value()
+        return default_value if r is None else r
+
+    def _get_item(self, serviceName, objectPath):
+        service = self._services.get(serviceName)
+        if service is None:
+            return None
+        if objectPath not in self._tree[_class_name(serviceName)]:
+            return None
+        item = service.get(objectPath)
+        if item is None:
+            item = MockImportItem(None, valid=False)
+            service[objectPath] = item
+        return item
+
+    def exists(self, serviceName, objectPath):
+        if serviceName not in self._services:
+            return False
+        if objectPath not in self._tree[_class_name(serviceName)]:
+            return False
+        return True
+
+    def set_seen(self, serviceName, path):
+        self._seen[serviceName].add(path)
+
+    def seen(self, serviceName, objectPath):
+        return objectPath in self._seen[serviceName]
+
+    # returns a dictionary, keys are the servicenames, value the instances
+    # optionally use the classfilter to get only a certain type of services, for
+    # example com.victronenergy.battery.
+    def get_service_list(self, classfilter=None):
+        r = {}
+        for servicename,items in self._services.items():
+            if not classfilter or _class_name(servicename) == classfilter:
+                item = items.get('/DeviceInstance')
+                r[servicename] = None if item is None else item.get_value()
+        return r
+
+    def add_value(self, service, path, value):
+        class_name = _class_name(service)
+        s = self._tree.get(class_name, None)
+        if s is None:
+            raise Exception('service not found')
+        if self._checkPaths and path not in s:
+            raise Exception('Path not found: {}{} (check dbusTree passed to __init__)'.format(service, path))
+        s = self._services.setdefault(service, {})
+        s[path] = MockImportItem(value)
+        self.set_seen(service, path)
+
+    def set_value(self, serviceName, objectPath, value):
+        item = self._get_item(serviceName, objectPath)
+        if item is None:
+            return -1
+        item.set_value(value)
+        self.set_seen(serviceName, objectPath)
+        if self._value_changed_callback != None:
+            self._value_changed_callback(serviceName, objectPath, None, None, None)
+        if serviceName in self._watches:
+            if objectPath in self._watches[serviceName]:
+                self._watches[serviceName][objectPath]({'Value': value, 'Text': str(value)})
+            elif None in self._watches[serviceName]:
+                self._watches[serviceName][None]({'Value': value, 'Text': str(value)})
+        return 0
+
+    def set_value_async(self, serviceName, objectPath, value,
+            reply_handler=None, error_handler=None):
+        item = self._get_item(serviceName, objectPath)
+
+        if item is not None and item.exists:
+            item.set_value(value)
+            if reply_handler is not None:
+                reply_handler(0)
+            return
+
+        if error_handler is not None:
+            error_handler(TypeError('Service or path not found, '
+                        'service=%s, path=%s' % (serviceName, objectPath)))
+
+    def add_service(self, service, values):
+        if service in self._services:
+            raise Exception('Service already exists: {}'.format(service))
+        self._services[service] = {}
+        for k,v in values.items():
+            self.add_value(service, k, v)
+        if self._device_added_callback != None:
+            self._device_added_callback(service, values.get('/DeviceInstance', 0))
+
+    def remove_service(self, service):
+        s = self._services.get(service)
+        if s is None:
+            return
+        item = s.get('/DeviceInstance')
+        instance = 0 if item is None else item.get_value()
+        for item in s.values():
+            item.set_service_exists(False)
+        self._services.pop(service)
+        if self._device_removed_callback != None:
+            self._device_removed_callback(service, instance)
+        if service in self._watches:
+            del self._watches[service]
+
+    def track_value(self, serviceName, objectPath, callback, *args, **kwargs):
+        self._watches[serviceName][objectPath] = partial(callback, *args, **kwargs)
+
+    @property
+    def dbusConn(self):
+        raise dbus.DBusException("No Connection")
+
+
+class MockImportItem(object):
+    def __init__(self, value, valid=True, service_exists=True):
+        self._value = value
+        self._valid = valid
+        self._service_exists = service_exists
+
+    def set_service_exists(self, service_exists):
+        self._service_exists = service_exists
+
+    def get_value(self):
+        return self._value
+
+    @property
+    def exists(self):
+        return self._valid
+
+    def set_value(self, value):
+        if not self._service_exists:
+            raise dbus.exceptions.DBusException('org.freedesktop.DBus.Error.ServiceUnknown')
+        if not self._valid:
+            raise dbus.exceptions.DBusException('org.freedesktop.DBus.Error.UnknownObject')
+        self._value = value
+
+
+def _class_name(service):
+    return '.'.join(service.split('.')[:3])