Mercurial > ~darius > hgwebdir.cgi > epro
comparison velib_python/mosquitto_bridge_registrator.py @ 8:9c0435a617db
Import velib_python
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Sun, 05 Dec 2021 14:35:36 +1030 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
5:982eeffe9d95 | 8:9c0435a617db |
---|---|
1 import fcntl | |
2 import threading | |
3 import logging | |
4 import os | |
5 import requests | |
6 import subprocess | |
7 import traceback | |
8 from ve_utils import exit_on_error | |
9 VrmNumberOfBrokers = 128 | |
10 VrmApiServer = 'https://ccgxlogging.victronenergy.com' | |
11 CaBundlePath = "/etc/ssl/certs/ccgx-ca.pem" | |
12 RpcBroker = 'mqtt-rpc.victronenergy.com' | |
13 SettingsPath = os.environ.get('DBUS_MQTT_PATH') or '/data/conf/mosquitto.d' | |
14 BridgeConfigPath = os.path.join(SettingsPath, 'vrm_bridge.conf') | |
15 MqttPasswordFile = "/data/conf/mqtt_password.txt" | |
16 BridgeSettings = '''# Generated by MosquittoBridgeRegistrator. Any changes will be overwritten on service start. | |
17 connection rpc | |
18 address {4}:443 | |
19 cleansession true | |
20 topic P/{0}/in/# in | |
21 topic P/{0}/out/# out | |
22 remote_clientid rpc-{2} | |
23 remote_username {6} | |
24 remote_password {1} | |
25 bridge_cafile {5} | |
26 | |
27 connection vrm | |
28 address {3}:443 | |
29 cleansession true | |
30 topic N/{0}/# out | |
31 topic R/{0}/# in | |
32 topic W/{0}/# in | |
33 remote_clientid {2} | |
34 remote_username {6} | |
35 remote_password {1} | |
36 bridge_cafile {5} | |
37 ''' | |
38 LockFilePath = "/run/mosquittobridgeregistrator.lock" | |
39 | |
40 | |
41 class RepeatingTimer(threading.Thread): | |
42 def __init__(self, callback, interval): | |
43 threading.Thread.__init__(self) | |
44 self.event = threading.Event() | |
45 self.callback = callback | |
46 self.interval = interval | |
47 | |
48 def run(self): | |
49 while not self.event.is_set(): | |
50 if not self.callback(): | |
51 self.event.set() | |
52 | |
53 # either call your function here, | |
54 # or put the body of the function here | |
55 self.event.wait(self.interval) | |
56 | |
57 def stop(self): | |
58 self.event.set() | |
59 | |
60 | |
61 class MosquittoBridgeRegistrator(object): | |
62 """ | |
63 The MosquittoBridgeRegistrator manages a bridge connection between the local Mosquitto | |
64 MQTT server, and the global VRM broker. It can be called | |
65 concurrently by different processes; efforts will be synchronized using an | |
66 advisory lock file. | |
67 | |
68 It now also supports registering the API key and getting it and the password without | |
69 restarting Mosquitto. This allows using the API key, but not use the local broker and | |
70 instead connect directly to the VRM broker url. | |
71 """ | |
72 | |
73 def __init__(self, system_id): | |
74 self._init_broker_timer = None | |
75 self._aborted = threading.Event() | |
76 self._client_id = None | |
77 self._system_id = system_id | |
78 self._global_broker_username = "ccgxapikey_" + self._system_id | |
79 self._global_broker_password = None | |
80 self._requests_log_level = logging.getLogger("requests").getEffectiveLevel() | |
81 | |
82 def _get_vrm_broker_url(self): | |
83 """To allow scaling, the VRM broker URL is generated based on the system identifier | |
84 The function returns a numbered broker URL between 0 and VrmNumberOfBrokers, which makes sure | |
85 that broker connections are distributed equally between all VRM brokers | |
86 """ | |
87 sum = 0 | |
88 for character in self._system_id.lower().strip(): | |
89 sum += ord(character) | |
90 broker_index = sum % VrmNumberOfBrokers | |
91 return "mqtt{}.victronenergy.com".format(broker_index) | |
92 | |
93 | |
94 def load_or_generate_mqtt_password(self): | |
95 """In case posting the password to storemqttpassword.php was processed | |
96 by the server, but we never saw the response, we need to keep it around | |
97 for the next time (don't post a random new one). | |
98 | |
99 This way of storing the password was incepted later, and makes it | |
100 backwards compatible. | |
101 """ | |
102 | |
103 if os.path.exists(MqttPasswordFile): | |
104 with open(MqttPasswordFile, "r") as f: | |
105 logging.info("Using {}".format(MqttPasswordFile)) | |
106 password = f.read().strip() | |
107 return password | |
108 else: | |
109 with open(MqttPasswordFile + ".tmp", "w") as f: | |
110 logging.info("Writing new {}".format(MqttPasswordFile)) | |
111 password = get_random_string(32) | |
112 | |
113 # make sure the password is on the disk | |
114 f.write(password) | |
115 f.flush() | |
116 os.fsync(f.fileno()) | |
117 | |
118 os.rename(MqttPasswordFile + ".tmp", MqttPasswordFile) | |
119 | |
120 # update the directory meta-info | |
121 fd = os.open(os.path.dirname(MqttPasswordFile), 0) | |
122 os.fsync(fd) | |
123 os.close(fd) | |
124 | |
125 return password | |
126 | |
127 def register(self): | |
128 if self._init_broker_timer is not None: | |
129 return | |
130 if self._init_broker(quiet=False, timeout=5): | |
131 if not self._aborted.is_set(): | |
132 logging.info("[InitBroker] Registration failed. Retrying in thread, silently.") | |
133 logging.getLogger("requests").setLevel(logging.WARNING) | |
134 # Not using gobject to keep these blocking operations out of the event loop | |
135 self._init_broker_timer = RepeatingTimer(self._init_broker, 60) | |
136 self._init_broker_timer.start() | |
137 | |
138 def abort_gracefully(self): | |
139 self._aborted.set() | |
140 if self._init_broker_timer: | |
141 self._init_broker_timer.stop() | |
142 self._init_broker_timer.join() | |
143 | |
144 @property | |
145 def client_id(self): | |
146 return self._client_id | |
147 | |
148 def _write_config_atomically(self, path, contents): | |
149 | |
150 config_dir = os.path.dirname(path) | |
151 if not os.path.exists(config_dir): | |
152 os.makedirs(config_dir) | |
153 | |
154 with open(path + ".tmp", 'wt') as out_file: | |
155 # make sure the new config is on the disk | |
156 out_file.write(contents) | |
157 out_file.flush() | |
158 os.fsync(out_file.fileno()) | |
159 | |
160 # make sure there is either the old file or the new one | |
161 os.rename(path + ".tmp", path) | |
162 | |
163 # update the directory meta-info | |
164 fd = os.open(os.path.dirname(path), 0) | |
165 os.fsync(fd) | |
166 os.close(fd) | |
167 | |
168 def _init_broker(self, quiet=True, timeout=5): | |
169 try: | |
170 with open(LockFilePath, "a") as lockFile: | |
171 fcntl.flock(lockFile, fcntl.LOCK_EX) | |
172 | |
173 orig_config = None | |
174 # Read the current config file (if present) | |
175 try: | |
176 if not quiet: | |
177 logging.info('[InitBroker] Reading config file') | |
178 with open(BridgeConfigPath, 'rt') as in_file: | |
179 orig_config = in_file.read() | |
180 settings = dict(tuple(l.strip().split(' ', 1)) for l in orig_config.split('\n') | |
181 if not l.startswith('#') and l.strip() != '') | |
182 self._client_id = settings.get('remote_clientid') | |
183 self._global_broker_password = settings.get('remote_password') | |
184 except IOError: | |
185 if not quiet: | |
186 logging.info('[InitBroker] Reading config file failed.') | |
187 # We need a guarantee an empty file, otherwise Mosquitto crashes on load. | |
188 if not os.path.exists(BridgeConfigPath): | |
189 self._write_config_atomically(BridgeConfigPath, ""); | |
190 # Fix items missing from config | |
191 if self._client_id is None: | |
192 self._client_id = 'ccgx_' + get_random_string(12) | |
193 if self._global_broker_password is None: | |
194 self._global_broker_password = self.load_or_generate_mqtt_password() | |
195 # Get to the actual registration | |
196 if not quiet: | |
197 logging.info('[InitBroker] Registering CCGX at VRM portal') | |
198 with requests.Session() as session: | |
199 headers = {'content-type': 'application/x-www-form-urlencoded', 'User-Agent': 'dbus-mqtt'} | |
200 r = session.post( | |
201 VrmApiServer + '/log/storemqttpassword.php', | |
202 data=dict(identifier=self._global_broker_username, mqttPassword=self._global_broker_password), | |
203 headers=headers, | |
204 verify=CaBundlePath, | |
205 timeout=(timeout,timeout)) | |
206 if r.status_code == requests.codes.ok: | |
207 config = BridgeSettings.format(self._system_id, | |
208 self._global_broker_password, self._client_id, | |
209 self._get_vrm_broker_url(), RpcBroker, CaBundlePath, | |
210 self._global_broker_username) | |
211 # Do we need to adjust the settings file? | |
212 if config != orig_config: | |
213 logging.info('[InitBroker] Writing new config file') | |
214 self._write_config_atomically(BridgeConfigPath, config) | |
215 self._restart_broker() | |
216 else: | |
217 logging.info('[InitBroker] Not updating config file and not restarting Mosquitto, because config is correct.') | |
218 self._init_broker_timer = None | |
219 logging.getLogger("requests").setLevel(self._requests_log_level) | |
220 logging.info('[InitBroker] Registration successful') | |
221 return False | |
222 if not quiet: | |
223 logging.error('VRM registration failed. Http status was: {}'.format(r.status_code)) | |
224 logging.error('Message was: {}'.format(r.text)) | |
225 except: | |
226 if not quiet: | |
227 traceback.print_exc() | |
228 # Notify the timer we want to be called again | |
229 return True | |
230 | |
231 def _restart_broker(self): | |
232 logging.info('Restarting broker') | |
233 subprocess.call(['svc', '-t', '/service/mosquitto']) | |
234 | |
235 def get_password(self): | |
236 assert self._global_broker_password is not None | |
237 return self._global_broker_password | |
238 | |
239 def get_apikey(self): | |
240 return self._global_broker_username | |
241 | |
242 | |
243 def get_random_string(size=32): | |
244 """Creates a random (hex) string which contains 'size' characters.""" | |
245 return ''.join("{0:02x}".format(b) for b in open('/dev/urandom', 'rb').read(int(size/2))) | |
246 | |
247 # vim: noexpandtab:shiftwidth=4:tabstop=4:softtabstop=0 |