comparison velib_python/mosquitto_bridge_registrator.py @ 8:9c0435a617db

Import velib_python
author Daniel O'Connor <darius@dons.net.au>
date Sun, 05 Dec 2021 14:35:36 +1030
parents
children
comparison
equal deleted inserted replaced
5:982eeffe9d95 8:9c0435a617db
1 import fcntl
2 import threading
3 import logging
4 import os
5 import requests
6 import subprocess
7 import traceback
8 from ve_utils import exit_on_error
# Number of VRM brokers; the broker index derived from the system id is taken
# modulo this value (see MosquittoBridgeRegistrator._get_vrm_broker_url).
VrmNumberOfBrokers = 128
# Base URL of the server that receives the MQTT password registration POST.
VrmApiServer = 'https://ccgxlogging.victronenergy.com'
# CA bundle used to verify the registration request; also written to the
# bridge config as bridge_cafile.
CaBundlePath = "/etc/ssl/certs/ccgx-ca.pem"
# Host name used for the "rpc" bridge connection.
RpcBroker = 'mqtt-rpc.victronenergy.com'
# Directory holding the bridge config. The DBUS_MQTT_PATH environment variable
# overrides the default location when it is set and non-empty.
SettingsPath = os.environ.get('DBUS_MQTT_PATH') or '/data/conf/mosquitto.d'
BridgeConfigPath = os.path.join(SettingsPath, 'vrm_bridge.conf')
# File in which the generated MQTT password is persisted between runs.
MqttPasswordFile = "/data/conf/mqtt_password.txt"
# Template of the bridge configuration file. Positional placeholders, in the
# order passed to str.format() by MosquittoBridgeRegistrator._init_broker:
#   {0} system id            {1} broker password     {2} client id
#   {3} VRM broker host      {4} RPC broker host     {5} CA bundle path
#   {6} broker username
BridgeSettings = '''# Generated by MosquittoBridgeRegistrator. Any changes will be overwritten on service start.
connection rpc
address {4}:443
cleansession true
topic P/{0}/in/# in
topic P/{0}/out/# out
remote_clientid rpc-{2}
remote_username {6}
remote_password {1}
bridge_cafile {5}

connection vrm
address {3}:443
cleansession true
topic N/{0}/# out
topic R/{0}/# in
topic W/{0}/# in
remote_clientid {2}
remote_username {6}
remote_password {1}
bridge_cafile {5}
'''
# Advisory lock file that serializes _init_broker across processes.
LockFilePath = "/run/mosquittobridgeregistrator.lock"
39
40
class RepeatingTimer(threading.Thread):
    """Thread that keeps invoking a callback with a pause between calls.

    The callback is invoked as soon as the thread runs and then again after
    every ``interval`` seconds. The loop ends when the callback returns a
    falsy value or when ``stop()`` has been called.
    """

    def __init__(self, callback, interval):
        super().__init__()
        self.interval = interval
        self.callback = callback
        self.event = threading.Event()

    def run(self):
        while True:
            if self.event.is_set():
                return

            wants_another_call = self.callback()
            if not wants_another_call:
                # A falsy result means the work is done: flag the loop to end.
                self.event.set()

            # Sleeps for the interval, but wakes immediately once the event
            # is set (by stop() or by the branch above).
            self.event.wait(self.interval)

    def stop(self):
        """Ask the loop to finish; wakes the thread if it is waiting."""
        self.event.set()
59
60
class MosquittoBridgeRegistrator(object):
    """
    The MosquittoBridgeRegistrator manages a bridge connection between the local Mosquitto
    MQTT server, and the global VRM broker. It can be called
    concurrently by different processes; efforts will be synchronized using an
    advisory lock file.

    It also supports registering the API key and getting it and the password without
    restarting Mosquitto. This allows using the API key without the local broker,
    connecting directly to the VRM broker URL instead.
    """

    def __init__(self, system_id):
        """Create a registrator for the installation identified by `system_id`."""
        self._init_broker_timer = None
        self._aborted = threading.Event()
        self._client_id = None
        self._system_id = system_id
        self._global_broker_username = "ccgxapikey_" + self._system_id
        self._global_broker_password = None
        # Remembered so the "requests" logger can be restored after the
        # silent background retries have finished.
        self._requests_log_level = logging.getLogger("requests").getEffectiveLevel()

    def _get_vrm_broker_url(self):
        """To allow scaling, the VRM broker URL is generated based on the system identifier.
        The function returns a numbered broker host name with an index in the range
        [0, VrmNumberOfBrokers), so that broker connections are distributed over all
        VRM brokers.
        """
        checksum = sum(ord(character) for character in self._system_id.lower().strip())
        broker_index = checksum % VrmNumberOfBrokers
        return "mqtt{}.victronenergy.com".format(broker_index)

    def load_or_generate_mqtt_password(self):
        """Return the persisted MQTT password, creating and storing one if needed.

        In case posting the password to storemqttpassword.php was processed
        by the server, but we never saw the response, we need to keep it around
        for the next time (don't post a random new one).

        This way of storing the password was introduced later; reading it from
        the bridge config first keeps things backwards compatible.

        An existing but empty password file is treated as missing, so an empty
        password is never handed out.
        """
        if os.path.exists(MqttPasswordFile):
            with open(MqttPasswordFile, "r") as f:
                logging.info("Using {}".format(MqttPasswordFile))
                password = f.read().strip()
            if password:
                return password
            logging.warning("{} is empty, generating a new password".format(MqttPasswordFile))

        logging.info("Writing new {}".format(MqttPasswordFile))
        password = get_random_string(32)
        # Same write-to-temp / fsync / rename sequence as for the bridge config.
        self._write_config_atomically(MqttPasswordFile, password)
        return password

    def register(self):
        """Register at VRM; on failure keep retrying every 60 s in a background thread.

        Does nothing when a retry thread is already active.
        """
        if self._init_broker_timer is not None:
            return
        # _init_broker returns True when it wants to be called again (failure).
        if self._init_broker(quiet=False, timeout=5):
            if not self._aborted.is_set():
                logging.info("[InitBroker] Registration failed. Retrying in thread, silently.")
                logging.getLogger("requests").setLevel(logging.WARNING)
                # Not using gobject to keep these blocking operations out of the event loop
                self._init_broker_timer = RepeatingTimer(self._init_broker, 60)
                self._init_broker_timer.start()

    def abort_gracefully(self):
        """Stop the background retry thread (if any) and wait for it to finish."""
        self._aborted.set()
        # Take a local reference: the retry thread resets the attribute to
        # None on success, which could otherwise happen between the check and
        # the calls below.
        timer = self._init_broker_timer
        if timer is not None:
            timer.stop()
            timer.join()

    @property
    def client_id(self):
        """Client id read from the bridge config or generated; None before _init_broker ran."""
        return self._client_id

    @staticmethod
    def _parse_bridge_settings(text):
        """Parse bridge config text into a dict of ``key -> remainder of line``.

        Blank lines, comment lines and lines consisting of a key without a
        value are skipped. When a key occurs more than once, the last
        occurrence wins.
        """
        settings = {}
        for line in text.split('\n'):
            stripped = line.strip()
            if not stripped or stripped.startswith('#'):
                continue
            key, separator, value = stripped.partition(' ')
            if not separator:
                # A key-only line carries no value. It must not abort parsing,
                # otherwise a single odd line blocks registration forever.
                continue
            settings[key] = value
        return settings

    def _write_config_atomically(self, path, contents):
        """Write `contents` to `path` so that either the old or the new file exists.

        The data is written to ``path + ".tmp"``, flushed to disk, renamed over
        `path`, and finally the containing directory is synced. The directory
        is created when it does not exist yet.
        """
        config_dir = os.path.dirname(path) or '.'
        os.makedirs(config_dir, exist_ok=True)

        tmp_path = path + ".tmp"
        with open(tmp_path, 'wt') as out_file:
            # make sure the new config is on the disk
            out_file.write(contents)
            out_file.flush()
            os.fsync(out_file.fileno())

        # make sure there is either the old file or the new one
        os.rename(tmp_path, path)

        # update the directory meta-info
        fd = os.open(config_dir, os.O_RDONLY)
        try:
            os.fsync(fd)
        finally:
            os.close(fd)

    def _init_broker(self, quiet=True, timeout=5):
        """Perform one registration attempt while holding the advisory lock.

        Reads the current bridge config (if any), fills in a missing client id
        or password, posts the credentials to the VRM API and, when the server
        answers OK, rewrites the bridge config and restarts Mosquitto if the
        config changed.

        quiet   -- suppress progress and error logging
        timeout -- connect and read timeout in seconds for the HTTP request

        Returns False when registration succeeded and True when the attempt
        failed and should be repeated (the return value drives RepeatingTimer).
        """
        try:
            with open(LockFilePath, "a") as lockFile:
                # Released automatically when the file is closed.
                fcntl.flock(lockFile, fcntl.LOCK_EX)

                orig_config = None
                # Read the current config file (if present)
                try:
                    if not quiet:
                        logging.info('[InitBroker] Reading config file')
                    with open(BridgeConfigPath, 'rt') as in_file:
                        orig_config = in_file.read()
                    settings = self._parse_bridge_settings(orig_config)
                    self._client_id = settings.get('remote_clientid')
                    self._global_broker_password = settings.get('remote_password')
                except IOError:
                    if not quiet:
                        logging.info('[InitBroker] Reading config file failed.')
                    # We need to guarantee an empty file, otherwise Mosquitto crashes on load.
                    if not os.path.exists(BridgeConfigPath):
                        self._write_config_atomically(BridgeConfigPath, "")
                # Fix items missing from config
                if self._client_id is None:
                    self._client_id = 'ccgx_' + get_random_string(12)
                if self._global_broker_password is None:
                    self._global_broker_password = self.load_or_generate_mqtt_password()
                # Get to the actual registration
                if not quiet:
                    logging.info('[InitBroker] Registering CCGX at VRM portal')
                with requests.Session() as session:
                    headers = {'content-type': 'application/x-www-form-urlencoded', 'User-Agent': 'dbus-mqtt'}
                    r = session.post(
                        VrmApiServer + '/log/storemqttpassword.php',
                        data=dict(identifier=self._global_broker_username, mqttPassword=self._global_broker_password),
                        headers=headers,
                        verify=CaBundlePath,
                        timeout=(timeout, timeout))
                    if r.status_code == requests.codes.ok:
                        config = BridgeSettings.format(self._system_id,
                            self._global_broker_password, self._client_id,
                            self._get_vrm_broker_url(), RpcBroker, CaBundlePath,
                            self._global_broker_username)
                        # Do we need to adjust the settings file?
                        if config != orig_config:
                            logging.info('[InitBroker] Writing new config file')
                            self._write_config_atomically(BridgeConfigPath, config)
                            self._restart_broker()
                        else:
                            logging.info('[InitBroker] Not updating config file and not restarting Mosquitto, because config is correct.')
                        self._init_broker_timer = None
                        logging.getLogger("requests").setLevel(self._requests_log_level)
                        logging.info('[InitBroker] Registration successful')
                        return False
                    if not quiet:
                        logging.error('VRM registration failed. Http status was: {}'.format(r.status_code))
                        logging.error('Message was: {}'.format(r.text))
        except Exception:
            # Deliberately broad: any failure simply means "try again later".
            # KeyboardInterrupt and SystemExit are no longer swallowed.
            if not quiet:
                traceback.print_exc()
        # Notify the timer we want to be called again
        return True

    def _restart_broker(self):
        """Restart the Mosquitto service so it picks up the new bridge config."""
        logging.info('Restarting broker')
        subprocess.call(['svc', '-t', '/service/mosquitto'])

    def get_password(self):
        """Return the broker password; only valid once it has been loaded or generated."""
        assert self._global_broker_password is not None
        return self._global_broker_password

    def get_apikey(self):
        """Return the broker username ("ccgxapikey_" followed by the system id)."""
        return self._global_broker_username
241
242
def get_random_string(size=32):
    """Creates a random (hex) string which contains exactly 'size' characters.

    The characters are lowercase hexadecimal digits taken from the operating
    system's random source. A size of zero or less yields an empty string.
    """
    size = int(size)
    if size <= 0:
        return ''
    # Every random byte gives two hex digits: round the byte count up and trim
    # the result, so odd sizes also yield exactly 'size' characters.
    return os.urandom((size + 1) // 2).hex()[:size]
246
247 # vim: noexpandtab:shiftwidth=4:tabstop=4:softtabstop=0