8
|
1 #!/usr/bin/env python3
|
|
2 # -*- coding: utf-8 -*-
|
|
3 import sys
|
|
4 from traceback import print_exc
|
|
5 from os import _exit as os_exit
|
|
6 from os import statvfs
|
|
7 from subprocess import check_output, CalledProcessError
|
|
8 import logging
|
|
9 import dbus
|
|
10 logger = logging.getLogger(__name__)
|
|
11
|
|
12 VEDBUS_INVALID = dbus.Array([], signature=dbus.Signature('i'), variant_level=1)
|
|
13
|
|
14 class NoVrmPortalIdError(Exception):
|
|
15 pass
|
|
16
|
|
17 # Use this function to make sure the code quits on an unexpected exception. Make sure to use it
|
|
18 # when using GLib.idle_add and also GLib.timeout_add.
|
|
19 # Without this, the code will just keep running, since GLib does not stop the mainloop on an
|
|
20 # exception.
|
|
21 # Example: GLib.idle_add(exit_on_error, myfunc, arg1, arg2)
|
|
22 def exit_on_error(func, *args, **kwargs):
|
|
23 try:
|
|
24 return func(*args, **kwargs)
|
|
25 except:
|
|
26 try:
|
|
27 print ('exit_on_error: there was an exception. Printing stacktrace will be tried and then exit')
|
|
28 print_exc()
|
|
29 except:
|
|
30 pass
|
|
31
|
|
32 # sys.exit() is not used, since that throws an exception, which does not lead to a program
|
|
33 # halt when used in a dbus callback, see connection.py in the Python/Dbus libraries, line 230.
|
|
34 os_exit(1)
|
|
35
|
|
36
|
|
37 __vrm_portal_id = None
|
|
38 def get_vrm_portal_id():
|
|
39 # The original definition of the VRM Portal ID is that it is the mac
|
|
40 # address of the onboard- ethernet port (eth0), stripped from its colons
|
|
41 # (:) and lower case. This may however differ between platforms. On Venus
|
|
42 # the task is therefore deferred to /sbin/get-unique-id so that a
|
|
43 # platform specific method can be easily defined.
|
|
44 #
|
|
45 # If /sbin/get-unique-id does not exist, then use the ethernet address
|
|
46 # of eth0. This also handles the case where velib_python is used as a
|
|
47 # package install on a Raspberry Pi.
|
|
48 #
|
|
49 # On a Linux host where the network interface may not be eth0, you can set
|
|
50 # the VRM_IFACE environment variable to the correct name.
|
|
51
|
|
52 global __vrm_portal_id
|
|
53
|
|
54 if __vrm_portal_id:
|
|
55 return __vrm_portal_id
|
|
56
|
|
57 portal_id = None
|
|
58
|
|
59 # First try the method that works if we don't have a data partition. This
|
|
60 # will fail when the current user is not root.
|
|
61 try:
|
|
62 portal_id = check_output("/sbin/get-unique-id").decode("utf-8", "ignore").strip()
|
|
63 if not portal_id:
|
|
64 raise NoVrmPortalIdError("get-unique-id returned blank")
|
|
65 __vrm_portal_id = portal_id
|
|
66 return portal_id
|
|
67 except CalledProcessError:
|
|
68 # get-unique-id returned non-zero
|
|
69 raise NoVrmPortalIdError("get-unique-id returned non-zero")
|
|
70 except OSError:
|
|
71 # File doesn't exist, use fallback
|
|
72 pass
|
|
73
|
|
74 # Fall back to getting our id using a syscall. Assume we are on linux.
|
|
75 # Allow the user to override what interface is used using an environment
|
|
76 # variable.
|
|
77 import fcntl, socket, struct, os
|
|
78
|
|
79 iface = os.environ.get('VRM_IFACE', 'eth0').encode('ascii')
|
|
80 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
81 try:
|
|
82 info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', iface[:15]))
|
|
83 except IOError:
|
|
84 raise NoVrmPortalIdError("ioctl failed for eth0")
|
|
85
|
|
86 __vrm_portal_id = info[18:24].hex()
|
|
87 return __vrm_portal_id
|
|
88
|
|
89
|
|
90 # See VE.Can registers - public.docx for definition of this conversion
|
|
91 def convert_vreg_version_to_readable(version):
|
|
92 def str_to_arr(x, length):
|
|
93 a = []
|
|
94 for i in range(0, len(x), length):
|
|
95 a.append(x[i:i+length])
|
|
96 return a
|
|
97
|
|
98 x = "%x" % version
|
|
99 x = x.upper()
|
|
100
|
|
101 if len(x) == 5 or len(x) == 3 or len(x) == 1:
|
|
102 x = '0' + x
|
|
103
|
|
104 a = str_to_arr(x, 2);
|
|
105
|
|
106 # remove the first 00 if there are three bytes and it is 00
|
|
107 if len(a) == 3 and a[0] == '00':
|
|
108 a.remove(0);
|
|
109
|
|
110 # if we have two or three bytes now, and the first character is a 0, remove it
|
|
111 if len(a) >= 2 and a[0][0:1] == '0':
|
|
112 a[0] = a[0][1];
|
|
113
|
|
114 result = ''
|
|
115 for item in a:
|
|
116 result += ('.' if result != '' else '') + item
|
|
117
|
|
118
|
|
119 result = 'v' + result
|
|
120
|
|
121 return result
|
|
122
|
|
123
|
|
124 def get_free_space(path):
|
|
125 result = -1
|
|
126
|
|
127 try:
|
|
128 s = statvfs(path)
|
|
129 result = s.f_frsize * s.f_bavail # Number of free bytes that ordinary users
|
|
130 except Exception as ex:
|
|
131 logger.info("Error while retrieving free space for path %s: %s" % (path, ex))
|
|
132
|
|
133 return result
|
|
134
|
|
135
|
|
136 def get_load_averages():
|
|
137 c = read_file('/proc/loadavg')
|
|
138 return c.split(' ')[:3]
|
|
139
|
|
140
|
|
141 def _get_sysfs_machine_name():
|
|
142 try:
|
|
143 with open('/sys/firmware/devicetree/base/model', 'r') as f:
|
|
144 return f.read().rstrip('\x00')
|
|
145 except IOError:
|
|
146 pass
|
|
147
|
|
148 return None
|
|
149
|
|
150 # Returns None if it cannot find a machine name. Otherwise returns the string
|
|
151 # containing the name
|
|
152 def get_machine_name():
|
|
153 # First try calling the venus utility script
|
|
154 try:
|
|
155 return check_output("/usr/bin/product-name").strip().decode('UTF-8')
|
|
156 except (CalledProcessError, OSError):
|
|
157 pass
|
|
158
|
|
159 # Fall back to sysfs
|
|
160 name = _get_sysfs_machine_name()
|
|
161 if name is not None:
|
|
162 return name
|
|
163
|
|
164 # Fall back to venus build machine name
|
|
165 try:
|
|
166 with open('/etc/venus/machine', 'r', encoding='UTF-8') as f:
|
|
167 return f.read().strip()
|
|
168 except IOError:
|
|
169 pass
|
|
170
|
|
171 return None
|
|
172
|
|
173
|
|
174 def get_product_id():
|
|
175 """ Find the machine ID and return it. """
|
|
176
|
|
177 # First try calling the venus utility script
|
|
178 try:
|
|
179 return check_output("/usr/bin/product-id").strip()
|
|
180 except (CalledProcessError, OSError):
|
|
181 pass
|
|
182
|
|
183 # Fall back machine name mechanism
|
|
184 name = _get_sysfs_machine_name()
|
|
185 return {
|
|
186 'Color Control GX': 'C001',
|
|
187 'Venus GX': 'C002',
|
|
188 'Octo GX': 'C006',
|
|
189 'EasySolar-II': 'C007',
|
|
190 'MultiPlus-II': 'C008'
|
|
191 }.get(name, 'C003') # C003 is Generic
|
|
192
|
|
193
|
|
194 # Returns False if it cannot open the file. Otherwise returns its rstripped contents
|
|
195 def read_file(path):
|
|
196 content = False
|
|
197
|
|
198 try:
|
|
199 with open(path, 'r') as f:
|
|
200 content = f.read().rstrip()
|
|
201 except Exception as ex:
|
|
202 logger.debug("Error while reading %s: %s" % (path, ex))
|
|
203
|
|
204 return content
|
|
205
|
|
206
|
|
207 def wrap_dbus_value(value):
|
|
208 if value is None:
|
|
209 return VEDBUS_INVALID
|
|
210 if isinstance(value, float):
|
|
211 return dbus.Double(value, variant_level=1)
|
|
212 if isinstance(value, bool):
|
|
213 return dbus.Boolean(value, variant_level=1)
|
|
214 if isinstance(value, int):
|
|
215 try:
|
|
216 return dbus.Int32(value, variant_level=1)
|
|
217 except OverflowError:
|
|
218 return dbus.Int64(value, variant_level=1)
|
|
219 if isinstance(value, str):
|
|
220 return dbus.String(value, variant_level=1)
|
|
221 if isinstance(value, list):
|
|
222 if len(value) == 0:
|
|
223 # If the list is empty we cannot infer the type of the contents. So assume unsigned integer.
|
|
224 # A (signed) integer is dangerous, because an empty list of signed integers is used to encode
|
|
225 # an invalid value.
|
|
226 return dbus.Array([], signature=dbus.Signature('u'), variant_level=1)
|
|
227 return dbus.Array([wrap_dbus_value(x) for x in value], variant_level=1)
|
|
228 if isinstance(value, dict):
|
|
229 # Wrapping the keys of the dictionary causes D-Bus errors like:
|
|
230 # 'arguments to dbus_message_iter_open_container() were incorrect,
|
|
231 # assertion "(type == DBUS_TYPE_ARRAY && contained_signature &&
|
|
232 # *contained_signature == DBUS_DICT_ENTRY_BEGIN_CHAR) || (contained_signature == NULL ||
|
|
233 # _dbus_check_is_valid_signature (contained_signature))" failed in file ...'
|
|
234 return dbus.Dictionary({(k, wrap_dbus_value(v)) for k, v in value.items()}, variant_level=1)
|
|
235 return value
|
|
236
|
|
237
|
|
238 dbus_int_types = (dbus.Int32, dbus.UInt32, dbus.Byte, dbus.Int16, dbus.UInt16, dbus.UInt32, dbus.Int64, dbus.UInt64)
|
|
239
|
|
240
|
|
241 def unwrap_dbus_value(val):
|
|
242 """Converts D-Bus values back to the original type. For example if val is of type DBus.Double,
|
|
243 a float will be returned."""
|
|
244 if isinstance(val, dbus_int_types):
|
|
245 return int(val)
|
|
246 if isinstance(val, dbus.Double):
|
|
247 return float(val)
|
|
248 if isinstance(val, dbus.Array):
|
|
249 v = [unwrap_dbus_value(x) for x in val]
|
|
250 return None if len(v) == 0 else v
|
|
251 if isinstance(val, (dbus.Signature, dbus.String)):
|
|
252 return str(val)
|
|
253 # Python has no byte type, so we convert to an integer.
|
|
254 if isinstance(val, dbus.Byte):
|
|
255 return int(val)
|
|
256 if isinstance(val, dbus.ByteArray):
|
|
257 return "".join([bytes(x) for x in val])
|
|
258 if isinstance(val, (list, tuple)):
|
|
259 return [unwrap_dbus_value(x) for x in val]
|
|
260 if isinstance(val, (dbus.Dictionary, dict)):
|
|
261 # Do not unwrap the keys, see comment in wrap_dbus_value
|
|
262 return dict([(x, unwrap_dbus_value(y)) for x, y in val.items()])
|
|
263 if isinstance(val, dbus.Boolean):
|
|
264 return bool(val)
|
|
265 return val
|