comparison velib_python/ve_utils.py @ 8:9c0435a617db

Import velib_python
author Daniel O'Connor <darius@dons.net.au>
date Sun, 05 Dec 2021 14:35:36 +1030
parents
children
comparison
equal deleted inserted replaced
5:982eeffe9d95 8:9c0435a617db
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 import sys
4 from traceback import print_exc
5 from os import _exit as os_exit
6 from os import statvfs
7 from subprocess import check_output, CalledProcessError
8 import logging
9 import dbus
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# D-Bus encoding of an invalid value: an empty array of signed integers,
# wrapped as a variant.
VEDBUS_INVALID = dbus.Array(
    [],
    signature=dbus.Signature('i'),
    variant_level=1,
)
13
class NoVrmPortalIdError(Exception):
    """Raised when the VRM Portal ID cannot be determined."""
16
def exit_on_error(func, *args, **kwargs):
    """Call func(*args, **kwargs) and terminate the process if it raises.

    Use this wrapper to make sure the code quits on an unexpected exception,
    in particular with GLib.idle_add and GLib.timeout_add: GLib does not stop
    the mainloop on an exception, so without it the code would just keep
    running.

    Example: GLib.idle_add(exit_on_error, myfunc, arg1, arg2)

    Returns whatever func returns. On any exception a notice and the
    stacktrace are printed (best effort) and the process exits with status 1.
    """
    try:
        outcome = func(*args, **kwargs)
    except:
        # Deliberately catches everything: any failure must end the process.
        try:
            print('exit_on_error: there was an exception. Printing stacktrace will be tried and then exit')
            print_exc()
        except:
            # Reporting is best effort only; exiting below is what matters.
            pass

        # sys.exit() is not used, since that throws an exception, which does not lead to a program
        # halt when used in a dbus callback, see connection.py in the Python/Dbus libraries, line 230.
        os_exit(1)
    else:
        return outcome
35
36
# Cache for get_vrm_portal_id(); filled on the first successful lookup.
__vrm_portal_id = None


def get_vrm_portal_id():
    """Return the VRM Portal ID, caching it after the first successful lookup.

    The original definition of the VRM Portal ID is that it is the mac
    address of the onboard ethernet port (eth0), stripped from its colons
    (:) and lower case. This may however differ between platforms. On Venus
    the task is therefore deferred to /sbin/get-unique-id so that a
    platform specific method can be easily defined.

    If /sbin/get-unique-id does not exist, then use the ethernet address
    of eth0. This also handles the case where velib_python is used as a
    package install on a Raspberry Pi.

    On a Linux host where the network interface may not be eth0, you can set
    the VRM_IFACE environment variable to the correct name.

    Raises:
        NoVrmPortalIdError: if get-unique-id returns a blank id or a non-zero
            exit status, or if the hardware address ioctl fails.
    """
    global __vrm_portal_id

    if __vrm_portal_id:
        return __vrm_portal_id

    # First try the method that works if we don't have a data partition. This
    # will fail when the current user is not root.
    try:
        portal_id = check_output("/sbin/get-unique-id").decode("utf-8", "ignore").strip()
        if not portal_id:
            raise NoVrmPortalIdError("get-unique-id returned blank")
        __vrm_portal_id = portal_id
        return portal_id
    except CalledProcessError:
        # get-unique-id returned non-zero
        raise NoVrmPortalIdError("get-unique-id returned non-zero")
    except OSError:
        # File doesn't exist, use fallback
        pass

    # Fall back to getting our id using a syscall. Assume we are on linux.
    # Allow the user to override what interface is used using an environment
    # variable.
    import fcntl
    import os
    import socket
    import struct

    iface = os.environ.get('VRM_IFACE', 'eth0').encode('ascii')
    # The socket is only a handle for the ioctl; the with-statement closes it
    # on both the success and the failure path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        try:
            # 0x8927 is SIOCGIFHWADDR on Linux; the interface name is cut to
            # 15 bytes before being packed into the request buffer.
            info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', iface[:15]))
        except IOError:
            # Report the interface that was actually queried.
            raise NoVrmPortalIdError("ioctl failed for %s" % iface.decode('ascii'))

    # Bytes 18..23 of the reply are used as the id, rendered as lower-case hex.
    __vrm_portal_id = info[18:24].hex()
    return __vrm_portal_id
88
89
# See VE.Can registers - public.docx for definition of this conversion
def convert_vreg_version_to_readable(version):
    """Convert a numeric vreg version into a readable string such as 'v1.23'.

    The number is rendered as upper-case hexadecimal, left-padded with one
    '0' when it has 1, 3 or 5 digits, split into two-character groups and
    joined with dots behind a leading 'v'.

    Args:
        version: integer version value.

    Returns:
        The readable version string, e.g. 0x0123 -> 'v1.23',
        0x12345 -> 'v1.23.45'.
    """
    digits = "%X" % version

    # Pad short odd-length values so they split cleanly into pairs.
    if len(digits) in (1, 3, 5):
        digits = '0' + digits

    groups = [digits[i:i + 2] for i in range(0, len(digits), 2)]

    # remove the first 00 if there are three bytes and it is 00
    # (delete by index: list.remove(0) would search for the value 0 and raise
    # ValueError on a list of strings).
    if len(groups) == 3 and groups[0] == '00':
        del groups[0]

    # if we have two or three bytes now, and the first character is a 0, remove it
    if len(groups) >= 2 and groups[0][0:1] == '0':
        groups[0] = groups[0][1]

    return 'v' + '.'.join(groups)
122
123
def get_free_space(path):
    """Return the free space at path in bytes, or -1 if it cannot be read.

    The value is f_frsize * f_bavail from statvfs. Any exception is logged at
    info level and turned into the -1 result.
    """
    try:
        stats = statvfs(path)
        return stats.f_frsize * stats.f_bavail
    except Exception as ex:
        logger.info("Error while retrieving free space for path %s: %s" % (path, ex))

    return -1
134
135
def get_load_averages():
    """Return the first three space-separated fields of /proc/loadavg.

    The fields are returned as a list of strings.
    """
    fields = read_file('/proc/loadavg').split(' ')
    return fields[:3]
139
140
def _get_sysfs_machine_name():
    """Return the device-tree model string from sysfs, or None if unreadable.

    Trailing NUL characters are stripped from the value.
    """
    model_path = '/sys/firmware/devicetree/base/model'
    try:
        with open(model_path, 'r') as model_file:
            raw = model_file.read()
    except IOError:
        return None

    return raw.rstrip('\x00')
149
def get_machine_name():
    """Return the machine name as a string, or None if it cannot be found.

    Sources are tried in order: the /usr/bin/product-name script, the sysfs
    device-tree model, and finally the /etc/venus/machine file.
    """
    # First try calling the venus utility script
    try:
        script_output = check_output("/usr/bin/product-name")
    except (CalledProcessError, OSError):
        pass
    else:
        return script_output.strip().decode('UTF-8')

    # Fall back to sysfs
    sysfs_name = _get_sysfs_machine_name()
    if sysfs_name is not None:
        return sysfs_name

    # Fall back to venus build machine name
    try:
        with open('/etc/venus/machine', 'r', encoding='UTF-8') as machine_file:
            return machine_file.read().strip()
    except IOError:
        return None
172
173
def get_product_id():
    """Find the machine ID and return it as a string.

    The /usr/bin/product-id script is tried first; its output is stripped and
    decoded as UTF-8 so that this path returns str, like the fallback does.
    If the script is missing or fails, the sysfs machine name is mapped to a
    product id, with 'C003' (generic) for unknown names.
    """
    # First try calling the venus utility script
    try:
        return check_output("/usr/bin/product-id").strip().decode('UTF-8')
    except (CalledProcessError, OSError):
        pass

    # Fall back machine name mechanism
    name = _get_sysfs_machine_name()
    return {
        'Color Control GX': 'C001',
        'Venus GX': 'C002',
        'Octo GX': 'C006',
        'EasySolar-II': 'C007',
        'MultiPlus-II': 'C008',
    }.get(name, 'C003')  # C003 is Generic
192
193
def read_file(path):
    """Return the rstripped contents of the file at path.

    Returns False if the file cannot be opened or read; the error is logged
    at debug level.
    """
    text = False

    try:
        with open(path, 'r') as handle:
            text = handle.read().rstrip()
    except Exception as err:
        logger.debug("Error while reading %s: %s" % (path, err))

    return text
205
206
def wrap_dbus_value(value):
    """Convert a plain Python value into a D-Bus value with variant_level=1.

    None maps to VEDBUS_INVALID. Floats, bools, ints and strings map to the
    matching dbus type, and lists and dicts are wrapped recursively. Any other
    value is returned unchanged.
    """
    if value is None:
        return VEDBUS_INVALID
    if isinstance(value, float):
        return dbus.Double(value, variant_level=1)
    # bool is tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return dbus.Boolean(value, variant_level=1)
    if isinstance(value, int):
        try:
            return dbus.Int32(value, variant_level=1)
        except OverflowError:
            # Does not fit in 32 bits, use the wider type.
            return dbus.Int64(value, variant_level=1)
    if isinstance(value, str):
        return dbus.String(value, variant_level=1)
    if isinstance(value, list):
        if len(value) == 0:
            # If the list is empty we cannot infer the type of the contents. So assume unsigned integer.
            # A (signed) integer is dangerous, because an empty list of signed integers is used to encode
            # an invalid value.
            return dbus.Array([], signature=dbus.Signature('u'), variant_level=1)
        return dbus.Array([wrap_dbus_value(x) for x in value], variant_level=1)
    if isinstance(value, dict):
        # Wrapping the keys of the dictionary causes D-Bus errors like:
        # 'arguments to dbus_message_iter_open_container() were incorrect,
        # assertion "(type == DBUS_TYPE_ARRAY && contained_signature &&
        # *contained_signature == DBUS_DICT_ENTRY_BEGIN_CHAR) || (contained_signature == NULL ||
        # _dbus_check_is_valid_signature (contained_signature))" failed in file ...'
        #
        # Build a real dict rather than a set of (key, value) tuples: a set
        # needs hashable members, and wrapped lists and dicts are not hashable.
        return dbus.Dictionary({k: wrap_dbus_value(v) for k, v in value.items()}, variant_level=1)
    return value
236
237
# D-Bus integer types that unwrap_dbus_value converts to a plain int.
# dbus.Byte is included: Python has no byte type, so we convert to an integer.
dbus_int_types = (dbus.Int32, dbus.UInt32, dbus.Byte, dbus.Int16, dbus.UInt16, dbus.Int64, dbus.UInt64)


def unwrap_dbus_value(val):
    """Converts D-Bus values back to the original type. For example if val is of type DBus.Double,
    a float will be returned.

    An empty dbus.Array becomes None. A dbus.ByteArray is returned as plain
    bytes. Lists, tuples and dictionaries are unwrapped recursively, with
    dictionary keys left untouched. Values of any other type are returned
    unchanged.
    """
    if isinstance(val, dbus_int_types):
        return int(val)
    if isinstance(val, dbus.Double):
        return float(val)
    if isinstance(val, dbus.Array):
        v = [unwrap_dbus_value(x) for x in val]
        # An empty array is treated as "no value".
        return None if len(v) == 0 else v
    if isinstance(val, (dbus.Signature, dbus.String)):
        return str(val)
    if isinstance(val, dbus.ByteArray):
        # Joining per-item bytes() results into a str fails on Python 3,
        # so hand back the raw bytes instead.
        return bytes(val)
    if isinstance(val, (list, tuple)):
        return [unwrap_dbus_value(x) for x in val]
    if isinstance(val, (dbus.Dictionary, dict)):
        # Do not unwrap the keys, see comment in wrap_dbus_value
        return {k: unwrap_dbus_value(v) for k, v in val.items()}
    if isinstance(val, dbus.Boolean):
        return bool(val)
    return val