Mercurial > ~darius > hgwebdir.cgi > epro
comparison velib_python/test/mock_dbus_monitor.py @ 8:9c0435a617db
Import velib_python
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Sun, 05 Dec 2021 14:35:36 +1030 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
5:982eeffe9d95 | 8:9c0435a617db |
---|---|
1 import dbus | |
2 from collections import defaultdict | |
3 from functools import partial | |
4 | |
5 # Simulates a DbusMonitor object without using the D-Bus (intended for unit tests). Instead of changing values | |
6 # on the D-Bus you can use the add_value and set_value functions. add_value will automatically expand the service | |
7 # list. Note that, unless checkPaths is False, all simulated D-Bus paths passed to add_value must be part of the | |
8 # dbusTree passed to the constructor of the monitor. | |
class MockDbusMonitor(object):
    """Simulate a DbusMonitor object without using the D-Bus (for unit tests).

    Services and their values live in plain dictionaries. Populate the mock
    with add_service/add_value and change values with set_value. Paths are
    validated against the dbusTree handed to the constructor.
    """

    def __init__(self, dbusTree, valueChangedCallback=None, deviceAddedCallback=None,
                 deviceRemovedCallback=None, mountEventCallback=None, vebusDeviceInstance0=False,
                 checkPaths=True):
        """Build the path tree and remember the callbacks.

        dbusTree maps a service class name (the first three dot-separated
        parts of a service name) to an iterable of object paths. Four standard
        paths are added to every class. mountEventCallback and
        vebusDeviceInstance0 are accepted but not used by this mock.
        """
        self._services = {}
        self._tree = {}
        self._seen = defaultdict(set)
        self._watches = defaultdict(dict)
        self._checkPaths = checkPaths
        self._value_changed_callback = valueChangedCallback
        self._device_removed_callback = deviceRemovedCallback
        self._device_added_callback = deviceAddedCallback
        for class_name, paths in dbusTree.items():
            known = self._tree.setdefault(class_name, set())
            known.update(['/Connected', '/ProductName', '/Mgmt/Connection', '/DeviceInstance'])
            known.update(paths)

    def _known_paths(self, serviceName):
        """Return the paths registered for the class of serviceName.

        An unknown class yields an empty collection instead of a KeyError, so
        a service registered without a matching dbusTree entry is simply
        treated as having no paths.
        """
        return self._tree.get(_class_name(serviceName), ())

    def get_value(self, serviceName, objectPath, default_value=None):
        """Return the value stored for serviceName/objectPath.

        default_value is returned when the service and objectPath combination
        does not exist or when the stored value is None.
        """
        item = self._get_item(serviceName, objectPath)
        if item is None:
            return default_value
        value = item.get_value()
        return default_value if value is None else value

    def _get_item(self, serviceName, objectPath):
        """Return the item for serviceName/objectPath, or None if unknown.

        A path that is part of the tree but has no value yet gets an invalid
        placeholder item, which is stored and returned.
        """
        service = self._services.get(serviceName)
        if service is None:
            return None
        if objectPath not in self._known_paths(serviceName):
            return None
        item = service.get(objectPath)
        if item is None:
            item = MockImportItem(None, valid=False)
            service[objectPath] = item
        return item

    def exists(self, serviceName, objectPath):
        """Return True when the service is registered and the path is in its tree."""
        return serviceName in self._services and objectPath in self._known_paths(serviceName)

    def set_seen(self, serviceName, path):
        """Record that path has been seen on serviceName."""
        self._seen[serviceName].add(path)

    def seen(self, serviceName, objectPath):
        """Return True when objectPath was recorded as seen on serviceName."""
        # Read through .get() so that querying an unknown service does not
        # insert an empty entry into the defaultdict.
        return objectPath in self._seen.get(serviceName, ())

    def get_service_list(self, classfilter=None):
        """Return a dictionary mapping service names to their /DeviceInstance value.

        Optionally use classfilter to get only a certain type of services, for
        example com.victronenergy.battery. Services without a /DeviceInstance
        item map to None.
        """
        result = {}
        for servicename, items in self._services.items():
            if not classfilter or _class_name(servicename) == classfilter:
                item = items.get('/DeviceInstance')
                result[servicename] = None if item is None else item.get_value()
        return result

    def add_value(self, service, path, value):
        """Store value at service/path, registering the service if needed.

        Raises Exception when the class of the service is not part of the
        tree, or when path checking is enabled and path is not in the tree.
        """
        known = self._tree.get(_class_name(service), None)
        if known is None:
            raise Exception('service not found')
        if self._checkPaths and path not in known:
            raise Exception('Path not found: {}{} (check dbusTree passed to __init__)'.format(service, path))
        self._services.setdefault(service, {})[path] = MockImportItem(value)
        self.set_seen(service, path)

    def set_value(self, serviceName, objectPath, value):
        """Change a value and notify the callbacks; return 0, or -1 if unknown.

        The value-changed callback is invoked first, then the watch registered
        for objectPath, or else the watch registered under None for this
        service. Exceptions raised by the item's set_value propagate.
        """
        item = self._get_item(serviceName, objectPath)
        if item is None:
            return -1
        item.set_value(value)
        self.set_seen(serviceName, objectPath)
        if self._value_changed_callback is not None:
            self._value_changed_callback(serviceName, objectPath, None, None, None)
        if serviceName in self._watches:
            watches = self._watches[serviceName]
            if objectPath in watches:
                watches[objectPath]({'Value': value, 'Text': str(value)})
            elif None in watches:
                watches[None]({'Value': value, 'Text': str(value)})
        return 0

    def set_value_async(self, serviceName, objectPath, value,
                        reply_handler=None, error_handler=None):
        """Set a value and report the outcome through the given handlers.

        reply_handler(0) is called after a successful write. When the service
        or path is unknown, or the item is not valid, error_handler receives a
        TypeError instead. No callbacks or watches are triggered here.
        """
        item = self._get_item(serviceName, objectPath)

        if item is not None and item.exists:
            item.set_value(value)
            if reply_handler is not None:
                reply_handler(0)
            return

        if error_handler is not None:
            error_handler(TypeError('Service or path not found, '
                                    'service=%s, path=%s' % (serviceName, objectPath)))

    def add_service(self, service, values):
        """Register a new service with the given path -> value dictionary.

        Raises Exception when the service already exists. The device-added
        callback is called with the service name and its /DeviceInstance
        (0 when values has none).
        """
        if service in self._services:
            raise Exception('Service already exists: {}'.format(service))
        self._services[service] = {}
        for path, value in values.items():
            self.add_value(service, path, value)
        if self._device_added_callback is not None:
            self._device_added_callback(service, values.get('/DeviceInstance', 0))

    def remove_service(self, service):
        """Remove a service, invalidate its items and drop its watches.

        Unknown services are ignored. The device-removed callback is called
        with the service name and its /DeviceInstance (0 when it has none).
        """
        items = self._services.get(service)
        if items is None:
            return
        instance_item = items.get('/DeviceInstance')
        instance = 0 if instance_item is None else instance_item.get_value()
        # Items handed out earlier must fail on write once the service is gone.
        for item in items.values():
            item.set_service_exists(False)
        self._services.pop(service)
        if self._device_removed_callback is not None:
            self._device_removed_callback(service, instance)
        if service in self._watches:
            del self._watches[service]

    def track_value(self, serviceName, objectPath, callback, *args, **kwargs):
        """Register callback (with bound args) as the watch for serviceName/objectPath."""
        self._watches[serviceName][objectPath] = partial(callback, *args, **kwargs)

    @property
    def dbusConn(self):
        """The mock has no D-Bus connection; accessing this always raises."""
        raise dbus.DBusException("No Connection")
140 | |
141 | |
class MockImportItem(object):
    """Value holder handed out by the mock monitor for a single D-Bus path."""

    def __init__(self, value, valid=True, service_exists=True):
        """Store the initial value together with the two state flags."""
        self._service_exists = service_exists
        self._valid = valid
        self._value = value

    def set_service_exists(self, service_exists):
        """Update the flag telling whether the owning service still exists."""
        self._service_exists = service_exists

    def get_value(self):
        """Return the currently stored value."""
        return self._value

    @property
    def exists(self):
        """Whether this item is valid, as given by the valid flag."""
        return self._valid

    def set_value(self, value):
        """Store a new value.

        Raises a DBusException when the service no longer exists or, failing
        that check, when the item is not valid.
        """
        # The service check takes precedence over the validity check.
        error_name = None
        if not self._service_exists:
            error_name = 'org.freedesktop.DBus.Error.ServiceUnknown'
        elif not self._valid:
            error_name = 'org.freedesktop.DBus.Error.UnknownObject'
        if error_name is not None:
            raise dbus.exceptions.DBusException(error_name)
        self._value = value
164 | |
165 | |
def _class_name(service):
    """Return the first three dot-separated parts of a service name."""
    # Splitting at most three times is enough: anything after the third dot
    # ends up in a fourth element that the slice discards.
    leading_parts = service.split('.', 3)[:3]
    return '.'.join(leading_parts)