Mercurial > ~darius > hgwebdir.cgi > epro
comparison velib_python/ve_utils.py @ 8:9c0435a617db
Import velib_python
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Sun, 05 Dec 2021 14:35:36 +1030 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
5:982eeffe9d95 | 8:9c0435a617db |
---|---|
1 #!/usr/bin/env python3 | |
2 # -*- coding: utf-8 -*- | |
3 import sys | |
4 from traceback import print_exc | |
5 from os import _exit as os_exit | |
6 from os import statvfs | |
7 from subprocess import check_output, CalledProcessError | |
8 import logging | |
9 import dbus | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Marker for "no valid value": an empty array with signature 'i' (signed
# 32-bit integers), wrapped as a variant. wrap_dbus_value() returns it for
# None, and unwrap_dbus_value() maps any empty array back to None.
VEDBUS_INVALID = dbus.Array([], signature=dbus.Signature('i'), variant_level=1)
13 | |
class NoVrmPortalIdError(Exception):
    """Raised by get_vrm_portal_id() when no portal id can be determined."""
16 | |
17 # Use this function to make sure the code quits on an unexpected exception. Make sure to use it | |
18 # when using GLib.idle_add and also GLib.timeout_add. | |
19 # Without this, the code will just keep running, since GLib does not stop the mainloop on an | |
20 # exception. | |
21 # Example: GLib.idle_add(exit_on_error, myfunc, arg1, arg2) | |
def exit_on_error(func, *args, **kwargs):
    """Run func(*args, **kwargs) and kill the process if it raises.

    Returns the result of func when it completes normally. If func raises
    anything at all, a notice and the stack trace are printed (best effort)
    and the process is terminated with exit status 1.
    """
    try:
        outcome = func(*args, **kwargs)
    except BaseException:
        try:
            print ('exit_on_error: there was an exception. Printing stacktrace will be tried and then exit')
            print_exc()
        except BaseException:
            # Failing to report must not prevent the exit below.
            pass

        # os._exit() rather than sys.exit(): sys.exit() works by raising an
        # exception, which does not halt the program when it happens inside
        # a dbus callback.
        os_exit(1)
    else:
        return outcome
35 | |
36 | |
# Cache for get_vrm_portal_id(); filled on the first successful lookup.
__vrm_portal_id = None


def get_vrm_portal_id():
    """Return the VRM portal id of this device as a string.

    The original definition of the VRM Portal ID is the MAC address of the
    onboard ethernet port (eth0), stripped of its colons and in lower case.
    This may differ between platforms, so on Venus the task is deferred to
    /sbin/get-unique-id, where a platform specific method can be defined.

    If /sbin/get-unique-id does not exist, the hardware address of eth0 is
    used. This also covers velib_python installed as a package on, for
    example, a Raspberry Pi. On a Linux host where the network interface is
    not eth0, set the VRM_IFACE environment variable to the correct name.

    The result is cached in a module global, so the lookup runs only once.

    Raises:
        NoVrmPortalIdError: get-unique-id exited non-zero or printed nothing,
            or the hardware address of the interface could not be read.
    """
    global __vrm_portal_id

    if __vrm_portal_id:
        return __vrm_portal_id

    # First try the method that works if we don't have a data partition. This
    # will fail when the current user is not root.
    try:
        portal_id = check_output("/sbin/get-unique-id").decode("utf-8", "ignore").strip()
    except CalledProcessError:
        raise NoVrmPortalIdError("get-unique-id returned non-zero")
    except OSError:
        # The helper does not exist (or cannot be run): use the fallback.
        pass
    else:
        if not portal_id:
            raise NoVrmPortalIdError("get-unique-id returned blank")
        __vrm_portal_id = portal_id
        return portal_id

    # Fall back to getting our id using a syscall. Assume we are on linux.
    # The imports are local because fcntl is not available on every platform.
    import fcntl
    import os
    import socket
    import struct

    iface = os.environ.get('VRM_IFACE', 'eth0')
    request = struct.pack('256s', iface.encode('ascii')[:15])

    # The with-statement closes the socket on every path; previously the
    # descriptor was leaked.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        try:
            # 0x8927 is SIOCGIFHWADDR: fetch the hardware address of iface.
            info = fcntl.ioctl(sock.fileno(), 0x8927, request)
        except IOError:
            # Name the interface that was actually queried, not always eth0.
            raise NoVrmPortalIdError("ioctl failed for %s" % iface)

    # Bytes 18..23 of the returned structure hold the six address bytes.
    __vrm_portal_id = info[18:24].hex()
    return __vrm_portal_id
88 | |
89 | |
90 # See VE.Can registers - public.docx for definition of this conversion | |
def convert_vreg_version_to_readable(version):
    """Format a numeric version as a dotted string such as 'v1.23'.

    The number is written in upper-case hexadecimal and split into groups of
    two digits, which are joined with dots and prefixed with 'v'. Values of
    one, three or five hex digits are first padded with a leading zero. When
    there are two or more groups, a leading '0' in the first group is
    dropped ('01.23' becomes '1.23').

    Args:
        version: integer version number.

    Returns:
        The readable version string, e.g. 0x0123 -> 'v1.23'.
    """
    digits = "%X" % version

    # Pad to a whole number of two-digit groups (only for up to three groups).
    if len(digits) in (1, 3, 5):
        digits = '0' + digits

    parts = [digits[i:i + 2] for i in range(0, len(digits), 2)]

    # Remove the first group if there are three groups and it is 00. This
    # deletes by position; list.remove(0) would look for the value 0 and
    # raise ValueError on a list of strings.
    if len(parts) == 3 and parts[0] == '00':
        del parts[0]

    # With two or more groups, drop a leading zero from the first one.
    if len(parts) >= 2 and parts[0][0:1] == '0':
        parts[0] = parts[0][1]

    return 'v' + '.'.join(parts)
122 | |
123 | |
def get_free_space(path):
    """Return the free space, in bytes, on the filesystem containing path.

    The figure is f_frsize * f_bavail from statvfs, i.e. the number of free
    bytes that ordinary (unprivileged) users may use. If anything goes wrong
    the error is logged at info level and -1 is returned.
    """
    try:
        stats = statvfs(path)
        return stats.f_frsize * stats.f_bavail
    except Exception as ex:
        logger.info("Error while retrieving free space for path %s: %s" % (path, ex))

    return -1
134 | |
135 | |
def get_load_averages():
    """Return the first three fields of /proc/loadavg as a list of strings.

    On Linux these are the 1, 5 and 15 minute load averages. Returns None
    when /proc/loadavg cannot be read; read_file() signals that with False,
    which previously caused an AttributeError on the split below.
    """
    content = read_file('/proc/loadavg')
    if content is False:
        return None
    return content.split(' ')[:3]
139 | |
140 | |
141 def _get_sysfs_machine_name(): | |
142 try: | |
143 with open('/sys/firmware/devicetree/base/model', 'r') as f: | |
144 return f.read().rstrip('\x00') | |
145 except IOError: | |
146 pass | |
147 | |
148 return None | |
149 | |
150 # Returns None if it cannot find a machine name. Otherwise returns the string | |
151 # containing the name | |
def get_machine_name():
    """Return the machine name as a string, or None if none can be found.

    Sources are tried in this order:
      1. the output of the venus utility /usr/bin/product-name,
      2. the device tree model exposed through sysfs,
      3. the contents of /etc/venus/machine (the venus build machine name).
    """
    # 1. The venus utility script.
    try:
        raw = check_output("/usr/bin/product-name")
        return raw.strip().decode('UTF-8')
    except (CalledProcessError, OSError):
        pass

    # 2. sysfs.
    sysfs_name = _get_sysfs_machine_name()
    if sysfs_name is not None:
        return sysfs_name

    # 3. The venus build machine name.
    try:
        with open('/etc/venus/machine', 'r', encoding='UTF-8') as machine_file:
            return machine_file.read().strip()
    except IOError:
        return None
172 | |
173 | |
def get_product_id():
    """Find the product ID of this machine and return it as a string.

    The output of the venus utility /usr/bin/product-id is used when the
    utility is available. Otherwise the device tree model name read from
    sysfs is mapped to a known product ID, with 'C003' (generic) as the
    default for unknown or missing names.
    """
    # First try calling the venus utility script. check_output() returns
    # bytes; decode them so that this path returns str like the fallback.
    try:
        return check_output("/usr/bin/product-id").decode("utf-8", "ignore").strip()
    except (CalledProcessError, OSError):
        pass

    # Fall back machine name mechanism
    name = _get_sysfs_machine_name()
    return {
        'Color Control GX': 'C001',
        'Venus GX': 'C002',
        'Octo GX': 'C006',
        'EasySolar-II': 'C007',
        'MultiPlus-II': 'C008',
    }.get(name, 'C003')  # C003 is Generic
192 | |
193 | |
194 # Returns False if it cannot open the file. Otherwise returns its rstripped contents | |
def read_file(path):
    """Return the contents of the text file at path, right-stripped.

    If the file cannot be opened or read, the error is logged at debug level
    and False is returned instead.
    """
    try:
        with open(path, 'r') as handle:
            return handle.read().rstrip()
    except Exception as ex:
        logger.debug("Error while reading %s: %s" % (path, ex))

    return False
205 | |
206 | |
def wrap_dbus_value(value):
    """Wrap a plain Python value in the matching D-Bus type (as a variant).

    None becomes VEDBUS_INVALID. float, bool, int and str map to
    dbus.Double, dbus.Boolean, dbus.Int32 (dbus.Int64 when the value does
    not fit) and dbus.String. Lists and dict values are wrapped recursively.
    Values of any other type are returned unchanged.
    """
    if value is None:
        return VEDBUS_INVALID
    if isinstance(value, float):
        return dbus.Double(value, variant_level=1)
    # bool has to be tested before int, because bool is a subclass of int.
    if isinstance(value, bool):
        return dbus.Boolean(value, variant_level=1)
    if isinstance(value, int):
        try:
            return dbus.Int32(value, variant_level=1)
        except OverflowError:
            return dbus.Int64(value, variant_level=1)
    if isinstance(value, str):
        return dbus.String(value, variant_level=1)
    if isinstance(value, list):
        if len(value) == 0:
            # If the list is empty we cannot infer the type of the contents. So assume unsigned integer.
            # A (signed) integer is dangerous, because an empty list of signed integers is used to encode
            # an invalid value.
            return dbus.Array([], signature=dbus.Signature('u'), variant_level=1)
        return dbus.Array([wrap_dbus_value(x) for x in value], variant_level=1)
    if isinstance(value, dict):
        # Only the values are wrapped. Wrapping the keys of the dictionary
        # causes D-Bus errors like:
        # 'arguments to dbus_message_iter_open_container() were incorrect,
        # assertion "(type == DBUS_TYPE_ARRAY && contained_signature &&
        # *contained_signature == DBUS_DICT_ENTRY_BEGIN_CHAR) || (contained_signature == NULL ||
        # _dbus_check_is_valid_signature (contained_signature))" failed in file ...'
        #
        # A real dict is built here. The previous set of (key, value) tuples
        # raised TypeError whenever a wrapped value was unhashable, e.g. a
        # nested list or dict.
        return dbus.Dictionary({k: wrap_dbus_value(v) for k, v in value.items()}, variant_level=1)
    return value
236 | |
237 | |
# D-Bus integer types that unwrap_dbus_value() converts to a plain int.
# dbus.Byte is included: Python has no byte type, so bytes become integers.
dbus_int_types = (dbus.Int32, dbus.UInt32, dbus.Byte, dbus.Int16, dbus.UInt16, dbus.Int64, dbus.UInt64)


def unwrap_dbus_value(val):
    """Converts D-Bus values back to the original type. For example if val is of type DBus.Double,
    a float will be returned.

    An empty dbus.Array is returned as None (this is how an invalid value is
    encoded). Containers are unwrapped recursively; dictionary keys are left
    as they are. A dbus.ByteArray is returned as bytes. Values of any other
    type are returned unchanged.
    """
    if isinstance(val, dbus_int_types):
        return int(val)
    if isinstance(val, dbus.Double):
        return float(val)
    if isinstance(val, dbus.Array):
        v = [unwrap_dbus_value(x) for x in val]
        return None if len(v) == 0 else v
    if isinstance(val, (dbus.Signature, dbus.String)):
        return str(val)
    if isinstance(val, dbus.ByteArray):
        # The previous "".join(bytes(x) ...) cannot work on Python 3: it
        # joins bytes objects with a str separator, and bytes(int) produces
        # zero-filled bytes rather than the byte value.
        return bytes(val)
    if isinstance(val, (list, tuple)):
        return [unwrap_dbus_value(x) for x in val]
    if isinstance(val, (dbus.Dictionary, dict)):
        # Do not unwrap the keys, see comment in wrap_dbus_value
        return {x: unwrap_dbus_value(y) for x, y in val.items()}
    if isinstance(val, dbus.Boolean):
        return bool(val)
    return val