Mercurial > ~darius > hgwebdir.cgi > epro
diff velib_python/mosquitto_bridge_registrator.py @ 8:9c0435a617db
Import velib_python
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Sun, 05 Dec 2021 14:35:36 +1030 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/velib_python/mosquitto_bridge_registrator.py Sun Dec 05 14:35:36 2021 +1030 @@ -0,0 +1,247 @@ +import fcntl +import threading +import logging +import os +import requests +import subprocess +import traceback +from ve_utils import exit_on_error +VrmNumberOfBrokers = 128 +VrmApiServer = 'https://ccgxlogging.victronenergy.com' +CaBundlePath = "/etc/ssl/certs/ccgx-ca.pem" +RpcBroker = 'mqtt-rpc.victronenergy.com' +SettingsPath = os.environ.get('DBUS_MQTT_PATH') or '/data/conf/mosquitto.d' +BridgeConfigPath = os.path.join(SettingsPath, 'vrm_bridge.conf') +MqttPasswordFile = "/data/conf/mqtt_password.txt" +BridgeSettings = '''# Generated by MosquittoBridgeRegistrator. Any changes will be overwritten on service start. +connection rpc +address {4}:443 +cleansession true +topic P/{0}/in/# in +topic P/{0}/out/# out +remote_clientid rpc-{2} +remote_username {6} +remote_password {1} +bridge_cafile {5} + +connection vrm +address {3}:443 +cleansession true +topic N/{0}/# out +topic R/{0}/# in +topic W/{0}/# in +remote_clientid {2} +remote_username {6} +remote_password {1} +bridge_cafile {5} +''' +LockFilePath = "/run/mosquittobridgeregistrator.lock" + + +class RepeatingTimer(threading.Thread): + def __init__(self, callback, interval): + threading.Thread.__init__(self) + self.event = threading.Event() + self.callback = callback + self.interval = interval + + def run(self): + while not self.event.is_set(): + if not self.callback(): + self.event.set() + + # either call your function here, + # or put the body of the function here + self.event.wait(self.interval) + + def stop(self): + self.event.set() + + +class MosquittoBridgeRegistrator(object): + """ + The MosquittoBridgeRegistrator manages a bridge connection between the local Mosquitto + MQTT server, and the global VRM broker. It can be called + concurrently by different processes; efforts will be synchronized using an + advisory lock file. + + It now also supports registering the API key and getting it and the password without + restarting Mosquitto. This allows using the API key, but not use the local broker and + instead connect directly to the VRM broker url. + """ + + def __init__(self, system_id): + self._init_broker_timer = None + self._aborted = threading.Event() + self._client_id = None + self._system_id = system_id + self._global_broker_username = "ccgxapikey_" + self._system_id + self._global_broker_password = None + self._requests_log_level = logging.getLogger("requests").getEffectiveLevel() + + def _get_vrm_broker_url(self): + """To allow scaling, the VRM broker URL is generated based on the system identifier + The function returns a numbered broker URL between 0 and VrmNumberOfBrokers, which makes sure + that broker connections are distributed equally between all VRM brokers + """ + sum = 0 + for character in self._system_id.lower().strip(): + sum += ord(character) + broker_index = sum % VrmNumberOfBrokers + return "mqtt{}.victronenergy.com".format(broker_index) + + + def load_or_generate_mqtt_password(self): + """In case posting the password to storemqttpassword.php was processed + by the server, but we never saw the response, we need to keep it around + for the next time (don't post a random new one). + + This way of storing the password was incepted later, and makes it + backwards compatible. + """ + + if os.path.exists(MqttPasswordFile): + with open(MqttPasswordFile, "r") as f: + logging.info("Using {}".format(MqttPasswordFile)) + password = f.read().strip() + return password + else: + with open(MqttPasswordFile + ".tmp", "w") as f: + logging.info("Writing new {}".format(MqttPasswordFile)) + password = get_random_string(32) + + # make sure the password is on the disk + f.write(password) + f.flush() + os.fsync(f.fileno()) + + os.rename(MqttPasswordFile + ".tmp", MqttPasswordFile) + + # update the directory meta-info + fd = os.open(os.path.dirname(MqttPasswordFile), 0) + os.fsync(fd) + os.close(fd) + + return password + + def register(self): + if self._init_broker_timer is not None: + return + if self._init_broker(quiet=False, timeout=5): + if not self._aborted.is_set(): + logging.info("[InitBroker] Registration failed. Retrying in thread, silently.") + logging.getLogger("requests").setLevel(logging.WARNING) + # Not using gobject to keep these blocking operations out of the event loop + self._init_broker_timer = RepeatingTimer(self._init_broker, 60) + self._init_broker_timer.start() + + def abort_gracefully(self): + self._aborted.set() + if self._init_broker_timer: + self._init_broker_timer.stop() + self._init_broker_timer.join() + + @property + def client_id(self): + return self._client_id + + def _write_config_atomically(self, path, contents): + + config_dir = os.path.dirname(path) + if not os.path.exists(config_dir): + os.makedirs(config_dir) + + with open(path + ".tmp", 'wt') as out_file: + # make sure the new config is on the disk + out_file.write(contents) + out_file.flush() + os.fsync(out_file.fileno()) + + # make sure there is either the old file or the new one + os.rename(path + ".tmp", path) + + # update the directory meta-info + fd = os.open(os.path.dirname(path), 0) + os.fsync(fd) + os.close(fd) + + def _init_broker(self, quiet=True, timeout=5): + try: + with open(LockFilePath, "a") as lockFile: + fcntl.flock(lockFile, fcntl.LOCK_EX) + + orig_config = None + # Read the current config file (if present) + try: + if not quiet: + logging.info('[InitBroker] Reading config file') + with open(BridgeConfigPath, 'rt') as in_file: + orig_config = in_file.read() + settings = dict(tuple(l.strip().split(' ', 1)) for l in orig_config.split('\n') + if not l.startswith('#') and l.strip() != '') + self._client_id = settings.get('remote_clientid') + self._global_broker_password = settings.get('remote_password') + except IOError: + if not quiet: + logging.info('[InitBroker] Reading config file failed.') + # We need a guarantee an empty file, otherwise Mosquitto crashes on load. + if not os.path.exists(BridgeConfigPath): + self._write_config_atomically(BridgeConfigPath, ""); + # Fix items missing from config + if self._client_id is None: + self._client_id = 'ccgx_' + get_random_string(12) + if self._global_broker_password is None: + self._global_broker_password = self.load_or_generate_mqtt_password() + # Get to the actual registration + if not quiet: + logging.info('[InitBroker] Registering CCGX at VRM portal') + with requests.Session() as session: + headers = {'content-type': 'application/x-www-form-urlencoded', 'User-Agent': 'dbus-mqtt'} + r = session.post( + VrmApiServer + '/log/storemqttpassword.php', + data=dict(identifier=self._global_broker_username, mqttPassword=self._global_broker_password), + headers=headers, + verify=CaBundlePath, + timeout=(timeout,timeout)) + if r.status_code == requests.codes.ok: + config = BridgeSettings.format(self._system_id, + self._global_broker_password, self._client_id, + self._get_vrm_broker_url(), RpcBroker, CaBundlePath, + self._global_broker_username) + # Do we need to adjust the settings file? + if config != orig_config: + logging.info('[InitBroker] Writing new config file') + self._write_config_atomically(BridgeConfigPath, config) + self._restart_broker() + else: + logging.info('[InitBroker] Not updating config file and not restarting Mosquitto, because config is correct.') + self._init_broker_timer = None + logging.getLogger("requests").setLevel(self._requests_log_level) + logging.info('[InitBroker] Registration successful') + return False + if not quiet: + logging.error('VRM registration failed. Http status was: {}'.format(r.status_code)) + logging.error('Message was: {}'.format(r.text)) + except: + if not quiet: + traceback.print_exc() + # Notify the timer we want to be called again + return True + + def _restart_broker(self): + logging.info('Restarting broker') + subprocess.call(['svc', '-t', '/service/mosquitto']) + + def get_password(self): + assert self._global_broker_password is not None + return self._global_broker_password + + def get_apikey(self): + return self._global_broker_username + + +def get_random_string(size=32): + """Creates a random (hex) string which contains 'size' characters.""" + return ''.join("{0:02x}".format(b) for b in open('/dev/urandom', 'rb').read(int(size/2))) + +# vim: noexpandtab:shiftwidth=4:tabstop=4:softtabstop=0