comparison velib_python/test/mock_dbus_monitor.py @ 8:9c0435a617db

Import velib_python
author Daniel O'Connor <darius@dons.net.au>
date Sun, 05 Dec 2021 14:35:36 +1030
parents
children
comparison
equal deleted inserted replaced
5:982eeffe9d95 8:9c0435a617db
1 import dbus
2 from collections import defaultdict
3 from functools import partial
4
5 # Simulates a DbusMonitor object without using the D-Bus (intended for unit tests). Instead of changing values
6 # on the D-Bus you can use the set_value function. set_value will automatically expand the service list. Note
7 # that all simulated D-Bus paths passed to set_value must be part of the dbusTree passed to the constructor of
8 # the monitor.
class MockDbusMonitor(object):
    """In-memory stand-in for a DbusMonitor, intended for unit tests.

    Nothing here talks to the D-Bus. Services and values are injected with
    add_service / add_value and changed with set_value. Every path that is
    added must belong to the dbusTree handed to the constructor, unless
    checkPaths is False.
    """

    def __init__(self, dbusTree, valueChangedCallback=None, deviceAddedCallback=None,
            deviceRemovedCallback=None, mountEventCallback=None, vebusDeviceInstance0=False, checkPaths=True):
        """Build the mock from a tree of service classes and their paths.

        dbusTree maps a service class name (e.g. com.victronenergy.battery)
        to an iterable of object paths. Four standard paths are added to
        every class: /Connected, /ProductName, /Mgmt/Connection and
        /DeviceInstance.

        mountEventCallback and vebusDeviceInstance0 are accepted but never
        used by this mock (presumably kept so the signature matches the
        real monitor -- confirm against DbusMonitor).
        """
        # service name -> {object path -> MockImportItem}
        self._services = {}
        # service class name -> set of allowed object paths
        self._tree = {}
        # service name -> set of paths that have been given a value
        self._seen = defaultdict(set)
        # service name -> {object path (or None) -> callback}
        self._watches = defaultdict(dict)
        self._checkPaths = checkPaths
        self._value_changed_callback = valueChangedCallback
        self._device_removed_callback = deviceRemovedCallback
        self._device_added_callback = deviceAddedCallback
        for s, sv in dbusTree.items():
            service = self._tree.setdefault(s, set())
            service.update(['/Connected', '/ProductName', '/Mgmt/Connection', '/DeviceInstance'])
            service.update(sv)

    def get_value(self, serviceName, objectPath, default_value=None):
        """Return the value stored for serviceName/objectPath.

        default_value is returned when the service/path combination does
        not exist or when the stored value is None.
        """
        item = self._get_item(serviceName, objectPath)
        if item is None:
            return default_value
        r = item.get_value()
        return default_value if r is None else r

    def _get_item(self, serviceName, objectPath):
        """Return the item for serviceName/objectPath, or None.

        None is returned when the service is not registered or the path is
        not part of the tree for the service's class. A path that is in the
        tree but has no value yet gets an invalid placeholder item, which
        is stored and returned.
        """
        service = self._services.get(serviceName)
        if service is None:
            return None
        # A service may be registered for a class that is not in the tree
        # (add_service with no values); treat that as "no known paths"
        # instead of raising KeyError.
        if objectPath not in self._tree.get(_class_name(serviceName), ()):
            return None
        item = service.get(objectPath)
        if item is None:
            item = MockImportItem(None, valid=False)
            service[objectPath] = item
        return item

    def exists(self, serviceName, objectPath):
        """Return True when the service is registered and the path is in its tree."""
        if serviceName not in self._services:
            return False
        return objectPath in self._tree.get(_class_name(serviceName), ())

    def set_seen(self, serviceName, path):
        """Record that path of serviceName has been given a value."""
        self._seen[serviceName].add(path)

    def seen(self, serviceName, objectPath):
        """Return True when objectPath of serviceName was marked with set_seen."""
        # Use .get so that a query does not insert an empty entry into the
        # defaultdict.
        return objectPath in self._seen.get(serviceName, ())

    def get_service_list(self, classfilter=None):
        """Return a dictionary mapping service names to device instances.

        Optionally use classfilter to get only a certain type of services,
        for example com.victronenergy.battery. The instance is None for a
        service that has no /DeviceInstance item.
        """
        r = {}
        for servicename, items in self._services.items():
            if not classfilter or _class_name(servicename) == classfilter:
                item = items.get('/DeviceInstance')
                r[servicename] = None if item is None else item.get_value()
        return r

    def add_value(self, service, path, value):
        """Store value at service/path, registering the service if needed.

        Any existing item at that path is replaced and the path is marked
        as seen. No callbacks are invoked.

        Raises Exception when the service's class is not in the tree, or
        when checkPaths is enabled and path is not in the tree for that
        class.
        """
        class_name = _class_name(service)
        s = self._tree.get(class_name, None)
        if s is None:
            raise Exception('service not found')
        if self._checkPaths and path not in s:
            raise Exception('Path not found: {}{} (check dbusTree passed to __init__)'.format(service, path))
        s = self._services.setdefault(service, {})
        s[path] = MockImportItem(value)
        self.set_seen(service, path)

    def set_value(self, serviceName, objectPath, value):
        """Change the value at serviceName/objectPath and notify listeners.

        Returns -1 when the service is not registered or the path is not in
        the tree, 0 otherwise. On success the path is marked as seen, the
        value-changed callback (if any) is called with
        (serviceName, objectPath, None, None, None), and a watch registered
        with track_value is called with a dict holding 'Value' and 'Text'.
        A watch on the exact path takes precedence over a watch registered
        with objectPath None.

        Any exception raised by the item's set_value propagates; this
        happens for a path that is in the tree but was never given a value.
        """
        item = self._get_item(serviceName, objectPath)
        if item is None:
            return -1
        item.set_value(value)
        self.set_seen(serviceName, objectPath)
        if self._value_changed_callback is not None:
            self._value_changed_callback(serviceName, objectPath, None, None, None)
        if serviceName in self._watches:
            watches = self._watches[serviceName]
            if objectPath in watches:
                watches[objectPath]({'Value': value, 'Text': str(value)})
            elif None in watches:
                watches[None]({'Value': value, 'Text': str(value)})
        return 0

    def set_value_async(self, serviceName, objectPath, value,
            reply_handler=None, error_handler=None):
        """Set a value and report the outcome through handlers.

        When the item exists, the value is stored and reply_handler (if
        given) is called with 0. Otherwise error_handler (if given) is
        called with a TypeError. Unlike set_value, this does not mark the
        path as seen and does not invoke the value-changed callback or any
        watch.
        """
        item = self._get_item(serviceName, objectPath)

        if item is not None and item.exists:
            item.set_value(value)
            if reply_handler is not None:
                reply_handler(0)
            return

        if error_handler is not None:
            error_handler(TypeError('Service or path not found, '
                'service=%s, path=%s' % (serviceName, objectPath)))

    def add_service(self, service, values):
        """Register a new service and populate it from the values dict.

        values maps object paths to initial values. After all values are
        added, the device-added callback (if any) is called with the
        service name and the /DeviceInstance value (0 when absent).

        Raises Exception when the service is already registered, and
        re-raises whatever add_value raises. When add_value fails, the
        registration is rolled back so no partially filled service is left.
        """
        if service in self._services:
            raise Exception('Service already exists: {}'.format(service))
        # Remember the seen-state so a failed registration can be undone.
        previously_seen = set(self._seen.get(service, ()))
        self._services[service] = {}
        try:
            for k, v in values.items():
                self.add_value(service, k, v)
        except Exception:
            self._services.pop(service, None)
            if previously_seen:
                self._seen[service] = previously_seen
            else:
                self._seen.pop(service, None)
            raise
        if self._device_added_callback is not None:
            self._device_added_callback(service, values.get('/DeviceInstance', 0))

    def remove_service(self, service):
        """Unregister a service; does nothing when it is not registered.

        All items of the service are flagged as belonging to a vanished
        service, the device-removed callback (if any) is called with the
        service name and its /DeviceInstance value (0 when absent), and the
        watches of the service are dropped.
        """
        s = self._services.get(service)
        if s is None:
            return
        item = s.get('/DeviceInstance')
        instance = 0 if item is None else item.get_value()
        for item in s.values():
            item.set_service_exists(False)
        self._services.pop(service)
        if self._device_removed_callback is not None:
            self._device_removed_callback(service, instance)
        self._watches.pop(service, None)

    def track_value(self, serviceName, objectPath, callback, *args, **kwargs):
        """Register callback to be invoked by set_value for a path.

        args and kwargs are bound to the callback; the change dict is
        passed as the final positional argument. Use objectPath None to
        watch every path of the service that has no watch of its own.
        """
        self._watches[serviceName][objectPath] = partial(callback, *args, **kwargs)

    @property
    def dbusConn(self):
        """Always raises dbus.DBusException: the mock has no connection."""
        raise dbus.DBusException("No Connection")
140
141
class MockImportItem(object):
    """A single simulated D-Bus value.

    Besides the value itself, an item tracks whether it is valid (an
    invalid item stands for a path without a value) and whether the
    service it belongs to still exists.
    """

    def __init__(self, value, valid=True, service_exists=True):
        self._service_exists = service_exists
        self._valid = valid
        self._value = value

    @property
    def exists(self):
        """True when the item is valid."""
        return self._valid

    def get_value(self):
        """Return the stored value."""
        return self._value

    def set_service_exists(self, service_exists):
        """Record whether the owning service is still present."""
        self._service_exists = service_exists

    def set_value(self, value):
        """Store a new value.

        Raises dbus.exceptions.DBusException when the owning service is
        gone (ServiceUnknown) or, failing that, when the item is invalid
        (UnknownObject). The service check takes precedence.
        """
        if self._service_exists and self._valid:
            self._value = value
            return
        if self._service_exists:
            error_name = 'org.freedesktop.DBus.Error.UnknownObject'
        else:
            error_name = 'org.freedesktop.DBus.Error.ServiceUnknown'
        raise dbus.exceptions.DBusException(error_name)
164
165
166 def _class_name(service):
167 return '.'.join(service.split('.')[:3])