8
|
1 import fcntl
|
|
2 import threading
|
|
3 import logging
|
|
4 import os
|
|
5 import requests
|
|
6 import subprocess
|
|
7 import traceback
|
|
8 from ve_utils import exit_on_error
|
|
9 VrmNumberOfBrokers = 128
|
|
10 VrmApiServer = 'https://ccgxlogging.victronenergy.com'
|
|
11 CaBundlePath = "/etc/ssl/certs/ccgx-ca.pem"
|
|
12 RpcBroker = 'mqtt-rpc.victronenergy.com'
|
|
13 SettingsPath = os.environ.get('DBUS_MQTT_PATH') or '/data/conf/mosquitto.d'
|
|
14 BridgeConfigPath = os.path.join(SettingsPath, 'vrm_bridge.conf')
|
|
15 MqttPasswordFile = "/data/conf/mqtt_password.txt"
|
|
16 BridgeSettings = '''# Generated by MosquittoBridgeRegistrator. Any changes will be overwritten on service start.
|
|
17 connection rpc
|
|
18 address {4}:443
|
|
19 cleansession true
|
|
20 topic P/{0}/in/# in
|
|
21 topic P/{0}/out/# out
|
|
22 remote_clientid rpc-{2}
|
|
23 remote_username {6}
|
|
24 remote_password {1}
|
|
25 bridge_cafile {5}
|
|
26
|
|
27 connection vrm
|
|
28 address {3}:443
|
|
29 cleansession true
|
|
30 topic N/{0}/# out
|
|
31 topic R/{0}/# in
|
|
32 topic W/{0}/# in
|
|
33 remote_clientid {2}
|
|
34 remote_username {6}
|
|
35 remote_password {1}
|
|
36 bridge_cafile {5}
|
|
37 '''
|
|
38 LockFilePath = "/run/mosquittobridgeregistrator.lock"
|
|
39
|
|
40
|
|
41 class RepeatingTimer(threading.Thread):
|
|
42 def __init__(self, callback, interval):
|
|
43 threading.Thread.__init__(self)
|
|
44 self.event = threading.Event()
|
|
45 self.callback = callback
|
|
46 self.interval = interval
|
|
47
|
|
48 def run(self):
|
|
49 while not self.event.is_set():
|
|
50 if not self.callback():
|
|
51 self.event.set()
|
|
52
|
|
53 # either call your function here,
|
|
54 # or put the body of the function here
|
|
55 self.event.wait(self.interval)
|
|
56
|
|
57 def stop(self):
|
|
58 self.event.set()
|
|
59
|
|
60
|
|
61 class MosquittoBridgeRegistrator(object):
|
|
62 """
|
|
63 The MosquittoBridgeRegistrator manages a bridge connection between the local Mosquitto
|
|
64 MQTT server, and the global VRM broker. It can be called
|
|
65 concurrently by different processes; efforts will be synchronized using an
|
|
66 advisory lock file.
|
|
67
|
|
68 It now also supports registering the API key and getting it and the password without
|
|
69 restarting Mosquitto. This allows using the API key, but not use the local broker and
|
|
70 instead connect directly to the VRM broker url.
|
|
71 """
|
|
72
|
|
73 def __init__(self, system_id):
|
|
74 self._init_broker_timer = None
|
|
75 self._aborted = threading.Event()
|
|
76 self._client_id = None
|
|
77 self._system_id = system_id
|
|
78 self._global_broker_username = "ccgxapikey_" + self._system_id
|
|
79 self._global_broker_password = None
|
|
80 self._requests_log_level = logging.getLogger("requests").getEffectiveLevel()
|
|
81
|
|
82 def _get_vrm_broker_url(self):
|
|
83 """To allow scaling, the VRM broker URL is generated based on the system identifier
|
|
84 The function returns a numbered broker URL between 0 and VrmNumberOfBrokers, which makes sure
|
|
85 that broker connections are distributed equally between all VRM brokers
|
|
86 """
|
|
87 sum = 0
|
|
88 for character in self._system_id.lower().strip():
|
|
89 sum += ord(character)
|
|
90 broker_index = sum % VrmNumberOfBrokers
|
|
91 return "mqtt{}.victronenergy.com".format(broker_index)
|
|
92
|
|
93
|
|
94 def load_or_generate_mqtt_password(self):
|
|
95 """In case posting the password to storemqttpassword.php was processed
|
|
96 by the server, but we never saw the response, we need to keep it around
|
|
97 for the next time (don't post a random new one).
|
|
98
|
|
99 This way of storing the password was incepted later, and makes it
|
|
100 backwards compatible.
|
|
101 """
|
|
102
|
|
103 if os.path.exists(MqttPasswordFile):
|
|
104 with open(MqttPasswordFile, "r") as f:
|
|
105 logging.info("Using {}".format(MqttPasswordFile))
|
|
106 password = f.read().strip()
|
|
107 return password
|
|
108 else:
|
|
109 with open(MqttPasswordFile + ".tmp", "w") as f:
|
|
110 logging.info("Writing new {}".format(MqttPasswordFile))
|
|
111 password = get_random_string(32)
|
|
112
|
|
113 # make sure the password is on the disk
|
|
114 f.write(password)
|
|
115 f.flush()
|
|
116 os.fsync(f.fileno())
|
|
117
|
|
118 os.rename(MqttPasswordFile + ".tmp", MqttPasswordFile)
|
|
119
|
|
120 # update the directory meta-info
|
|
121 fd = os.open(os.path.dirname(MqttPasswordFile), 0)
|
|
122 os.fsync(fd)
|
|
123 os.close(fd)
|
|
124
|
|
125 return password
|
|
126
|
|
127 def register(self):
|
|
128 if self._init_broker_timer is not None:
|
|
129 return
|
|
130 if self._init_broker(quiet=False, timeout=5):
|
|
131 if not self._aborted.is_set():
|
|
132 logging.info("[InitBroker] Registration failed. Retrying in thread, silently.")
|
|
133 logging.getLogger("requests").setLevel(logging.WARNING)
|
|
134 # Not using gobject to keep these blocking operations out of the event loop
|
|
135 self._init_broker_timer = RepeatingTimer(self._init_broker, 60)
|
|
136 self._init_broker_timer.start()
|
|
137
|
|
138 def abort_gracefully(self):
|
|
139 self._aborted.set()
|
|
140 if self._init_broker_timer:
|
|
141 self._init_broker_timer.stop()
|
|
142 self._init_broker_timer.join()
|
|
143
|
|
144 @property
|
|
145 def client_id(self):
|
|
146 return self._client_id
|
|
147
|
|
148 def _write_config_atomically(self, path, contents):
|
|
149
|
|
150 config_dir = os.path.dirname(path)
|
|
151 if not os.path.exists(config_dir):
|
|
152 os.makedirs(config_dir)
|
|
153
|
|
154 with open(path + ".tmp", 'wt') as out_file:
|
|
155 # make sure the new config is on the disk
|
|
156 out_file.write(contents)
|
|
157 out_file.flush()
|
|
158 os.fsync(out_file.fileno())
|
|
159
|
|
160 # make sure there is either the old file or the new one
|
|
161 os.rename(path + ".tmp", path)
|
|
162
|
|
163 # update the directory meta-info
|
|
164 fd = os.open(os.path.dirname(path), 0)
|
|
165 os.fsync(fd)
|
|
166 os.close(fd)
|
|
167
|
|
168 def _init_broker(self, quiet=True, timeout=5):
|
|
169 try:
|
|
170 with open(LockFilePath, "a") as lockFile:
|
|
171 fcntl.flock(lockFile, fcntl.LOCK_EX)
|
|
172
|
|
173 orig_config = None
|
|
174 # Read the current config file (if present)
|
|
175 try:
|
|
176 if not quiet:
|
|
177 logging.info('[InitBroker] Reading config file')
|
|
178 with open(BridgeConfigPath, 'rt') as in_file:
|
|
179 orig_config = in_file.read()
|
|
180 settings = dict(tuple(l.strip().split(' ', 1)) for l in orig_config.split('\n')
|
|
181 if not l.startswith('#') and l.strip() != '')
|
|
182 self._client_id = settings.get('remote_clientid')
|
|
183 self._global_broker_password = settings.get('remote_password')
|
|
184 except IOError:
|
|
185 if not quiet:
|
|
186 logging.info('[InitBroker] Reading config file failed.')
|
|
187 # We need a guarantee an empty file, otherwise Mosquitto crashes on load.
|
|
188 if not os.path.exists(BridgeConfigPath):
|
|
189 self._write_config_atomically(BridgeConfigPath, "");
|
|
190 # Fix items missing from config
|
|
191 if self._client_id is None:
|
|
192 self._client_id = 'ccgx_' + get_random_string(12)
|
|
193 if self._global_broker_password is None:
|
|
194 self._global_broker_password = self.load_or_generate_mqtt_password()
|
|
195 # Get to the actual registration
|
|
196 if not quiet:
|
|
197 logging.info('[InitBroker] Registering CCGX at VRM portal')
|
|
198 with requests.Session() as session:
|
|
199 headers = {'content-type': 'application/x-www-form-urlencoded', 'User-Agent': 'dbus-mqtt'}
|
|
200 r = session.post(
|
|
201 VrmApiServer + '/log/storemqttpassword.php',
|
|
202 data=dict(identifier=self._global_broker_username, mqttPassword=self._global_broker_password),
|
|
203 headers=headers,
|
|
204 verify=CaBundlePath,
|
|
205 timeout=(timeout,timeout))
|
|
206 if r.status_code == requests.codes.ok:
|
|
207 config = BridgeSettings.format(self._system_id,
|
|
208 self._global_broker_password, self._client_id,
|
|
209 self._get_vrm_broker_url(), RpcBroker, CaBundlePath,
|
|
210 self._global_broker_username)
|
|
211 # Do we need to adjust the settings file?
|
|
212 if config != orig_config:
|
|
213 logging.info('[InitBroker] Writing new config file')
|
|
214 self._write_config_atomically(BridgeConfigPath, config)
|
|
215 self._restart_broker()
|
|
216 else:
|
|
217 logging.info('[InitBroker] Not updating config file and not restarting Mosquitto, because config is correct.')
|
|
218 self._init_broker_timer = None
|
|
219 logging.getLogger("requests").setLevel(self._requests_log_level)
|
|
220 logging.info('[InitBroker] Registration successful')
|
|
221 return False
|
|
222 if not quiet:
|
|
223 logging.error('VRM registration failed. Http status was: {}'.format(r.status_code))
|
|
224 logging.error('Message was: {}'.format(r.text))
|
|
225 except:
|
|
226 if not quiet:
|
|
227 traceback.print_exc()
|
|
228 # Notify the timer we want to be called again
|
|
229 return True
|
|
230
|
|
231 def _restart_broker(self):
|
|
232 logging.info('Restarting broker')
|
|
233 subprocess.call(['svc', '-t', '/service/mosquitto'])
|
|
234
|
|
235 def get_password(self):
|
|
236 assert self._global_broker_password is not None
|
|
237 return self._global_broker_password
|
|
238
|
|
239 def get_apikey(self):
|
|
240 return self._global_broker_username
|
|
241
|
|
242
|
|
243 def get_random_string(size=32):
|
|
244 """Creates a random (hex) string which contains 'size' characters."""
|
|
245 return ''.join("{0:02x}".format(b) for b in open('/dev/urandom', 'rb').read(int(size/2)))
|
|
246
|
|
247 # vim: noexpandtab:shiftwidth=4:tabstop=4:softtabstop=0
|