8
|
1 import dbus
|
|
2 from collections import defaultdict
|
|
3 from functools import partial
|
|
4
|
|
5 # Simulation a DbusMonitor object, without using the D-Bus (intended for unit tests). Instead of changes values
|
|
6 # on the D-Bus you can use the set_value function. set_value will automatically expand the service list. Note
|
|
7 # that all simulated D-Bus paths passed to set_value must be part of the dbusTree passed to the constructor of
|
|
8 # the monitor.
|
|
9 class MockDbusMonitor(object):
|
|
10 def __init__(self, dbusTree, valueChangedCallback=None, deviceAddedCallback=None,
|
|
11 deviceRemovedCallback=None, mountEventCallback=None, vebusDeviceInstance0=False, checkPaths=True):
|
|
12 self._services = {}
|
|
13 self._tree = {}
|
|
14 self._seen = defaultdict(set)
|
|
15 self._watches = defaultdict(dict)
|
|
16 self._checkPaths = checkPaths
|
|
17 self._value_changed_callback = valueChangedCallback
|
|
18 self._device_removed_callback = deviceRemovedCallback
|
|
19 self._device_added_callback = deviceAddedCallback
|
|
20 for s, sv in dbusTree.items():
|
|
21 service = self._tree.setdefault(s, set())
|
|
22 service.update(['/Connected', '/ProductName', '/Mgmt/Connection', '/DeviceInstance'])
|
|
23 for p in sv:
|
|
24 service.add(p)
|
|
25
|
|
26 # Gets the value for a certain servicename and path, returns the default_value when
|
|
27 # request service and objectPath combination does not not exists or when it is invalid
|
|
28 def get_value(self, serviceName, objectPath, default_value=None):
|
|
29 item = self._get_item(serviceName, objectPath)
|
|
30 if item is None:
|
|
31 return default_value
|
|
32 r = item.get_value()
|
|
33 return default_value if r is None else r
|
|
34
|
|
35 def _get_item(self, serviceName, objectPath):
|
|
36 service = self._services.get(serviceName)
|
|
37 if service is None:
|
|
38 return None
|
|
39 if objectPath not in self._tree[_class_name(serviceName)]:
|
|
40 return None
|
|
41 item = service.get(objectPath)
|
|
42 if item is None:
|
|
43 item = MockImportItem(None, valid=False)
|
|
44 service[objectPath] = item
|
|
45 return item
|
|
46
|
|
47 def exists(self, serviceName, objectPath):
|
|
48 if serviceName not in self._services:
|
|
49 return False
|
|
50 if objectPath not in self._tree[_class_name(serviceName)]:
|
|
51 return False
|
|
52 return True
|
|
53
|
|
54 def set_seen(self, serviceName, path):
|
|
55 self._seen[serviceName].add(path)
|
|
56
|
|
57 def seen(self, serviceName, objectPath):
|
|
58 return objectPath in self._seen[serviceName]
|
|
59
|
|
60 # returns a dictionary, keys are the servicenames, value the instances
|
|
61 # optionally use the classfilter to get only a certain type of services, for
|
|
62 # example com.victronenergy.battery.
|
|
63 def get_service_list(self, classfilter=None):
|
|
64 r = {}
|
|
65 for servicename,items in self._services.items():
|
|
66 if not classfilter or _class_name(servicename) == classfilter:
|
|
67 item = items.get('/DeviceInstance')
|
|
68 r[servicename] = None if item is None else item.get_value()
|
|
69 return r
|
|
70
|
|
71 def add_value(self, service, path, value):
|
|
72 class_name = _class_name(service)
|
|
73 s = self._tree.get(class_name, None)
|
|
74 if s is None:
|
|
75 raise Exception('service not found')
|
|
76 if self._checkPaths and path not in s:
|
|
77 raise Exception('Path not found: {}{} (check dbusTree passed to __init__)'.format(service, path))
|
|
78 s = self._services.setdefault(service, {})
|
|
79 s[path] = MockImportItem(value)
|
|
80 self.set_seen(service, path)
|
|
81
|
|
82 def set_value(self, serviceName, objectPath, value):
|
|
83 item = self._get_item(serviceName, objectPath)
|
|
84 if item is None:
|
|
85 return -1
|
|
86 item.set_value(value)
|
|
87 self.set_seen(serviceName, objectPath)
|
|
88 if self._value_changed_callback != None:
|
|
89 self._value_changed_callback(serviceName, objectPath, None, None, None)
|
|
90 if serviceName in self._watches:
|
|
91 if objectPath in self._watches[serviceName]:
|
|
92 self._watches[serviceName][objectPath]({'Value': value, 'Text': str(value)})
|
|
93 elif None in self._watches[serviceName]:
|
|
94 self._watches[serviceName][None]({'Value': value, 'Text': str(value)})
|
|
95 return 0
|
|
96
|
|
97 def set_value_async(self, serviceName, objectPath, value,
|
|
98 reply_handler=None, error_handler=None):
|
|
99 item = self._get_item(serviceName, objectPath)
|
|
100
|
|
101 if item is not None and item.exists:
|
|
102 item.set_value(value)
|
|
103 if reply_handler is not None:
|
|
104 reply_handler(0)
|
|
105 return
|
|
106
|
|
107 if error_handler is not None:
|
|
108 error_handler(TypeError('Service or path not found, '
|
|
109 'service=%s, path=%s' % (serviceName, objectPath)))
|
|
110
|
|
111 def add_service(self, service, values):
|
|
112 if service in self._services:
|
|
113 raise Exception('Service already exists: {}'.format(service))
|
|
114 self._services[service] = {}
|
|
115 for k,v in values.items():
|
|
116 self.add_value(service, k, v)
|
|
117 if self._device_added_callback != None:
|
|
118 self._device_added_callback(service, values.get('/DeviceInstance', 0))
|
|
119
|
|
120 def remove_service(self, service):
|
|
121 s = self._services.get(service)
|
|
122 if s is None:
|
|
123 return
|
|
124 item = s.get('/DeviceInstance')
|
|
125 instance = 0 if item is None else item.get_value()
|
|
126 for item in s.values():
|
|
127 item.set_service_exists(False)
|
|
128 self._services.pop(service)
|
|
129 if self._device_removed_callback != None:
|
|
130 self._device_removed_callback(service, instance)
|
|
131 if service in self._watches:
|
|
132 del self._watches[service]
|
|
133
|
|
134 def track_value(self, serviceName, objectPath, callback, *args, **kwargs):
|
|
135 self._watches[serviceName][objectPath] = partial(callback, *args, **kwargs)
|
|
136
|
|
137 @property
|
|
138 def dbusConn(self):
|
|
139 raise dbus.DBusException("No Connection")
|
|
140
|
|
141
|
|
142 class MockImportItem(object):
|
|
143 def __init__(self, value, valid=True, service_exists=True):
|
|
144 self._value = value
|
|
145 self._valid = valid
|
|
146 self._service_exists = service_exists
|
|
147
|
|
148 def set_service_exists(self, service_exists):
|
|
149 self._service_exists = service_exists
|
|
150
|
|
151 def get_value(self):
|
|
152 return self._value
|
|
153
|
|
154 @property
|
|
155 def exists(self):
|
|
156 return self._valid
|
|
157
|
|
158 def set_value(self, value):
|
|
159 if not self._service_exists:
|
|
160 raise dbus.exceptions.DBusException('org.freedesktop.DBus.Error.ServiceUnknown')
|
|
161 if not self._valid:
|
|
162 raise dbus.exceptions.DBusException('org.freedesktop.DBus.Error.UnknownObject')
|
|
163 self._value = value
|
|
164
|
|
165
|
|
166 def _class_name(service):
|
|
167 return '.'.join(service.split('.')[:3])
|