Mercurial > ~darius > hgwebdir.cgi > epro
view velib_python/mosquitto_bridge_registrator.py @ 11:b4d6c6049024
Add license
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Sun, 05 Dec 2021 18:02:32 +1030 |
parents | 9c0435a617db |
children |
line wrap: on
line source
import fcntl import threading import logging import os import requests import subprocess import traceback from ve_utils import exit_on_error VrmNumberOfBrokers = 128 VrmApiServer = 'https://ccgxlogging.victronenergy.com' CaBundlePath = "/etc/ssl/certs/ccgx-ca.pem" RpcBroker = 'mqtt-rpc.victronenergy.com' SettingsPath = os.environ.get('DBUS_MQTT_PATH') or '/data/conf/mosquitto.d' BridgeConfigPath = os.path.join(SettingsPath, 'vrm_bridge.conf') MqttPasswordFile = "/data/conf/mqtt_password.txt" BridgeSettings = '''# Generated by MosquittoBridgeRegistrator. Any changes will be overwritten on service start. connection rpc address {4}:443 cleansession true topic P/{0}/in/# in topic P/{0}/out/# out remote_clientid rpc-{2} remote_username {6} remote_password {1} bridge_cafile {5} connection vrm address {3}:443 cleansession true topic N/{0}/# out topic R/{0}/# in topic W/{0}/# in remote_clientid {2} remote_username {6} remote_password {1} bridge_cafile {5} ''' LockFilePath = "/run/mosquittobridgeregistrator.lock" class RepeatingTimer(threading.Thread): def __init__(self, callback, interval): threading.Thread.__init__(self) self.event = threading.Event() self.callback = callback self.interval = interval def run(self): while not self.event.is_set(): if not self.callback(): self.event.set() # either call your function here, # or put the body of the function here self.event.wait(self.interval) def stop(self): self.event.set() class MosquittoBridgeRegistrator(object): """ The MosquittoBridgeRegistrator manages a bridge connection between the local Mosquitto MQTT server, and the global VRM broker. It can be called concurrently by different processes; efforts will be synchronized using an advisory lock file. It now also supports registering the API key and getting it and the password without restarting Mosquitto. This allows using the API key, but not use the local broker and instead connect directly to the VRM broker url. """ def __init__(self, system_id): self._init_broker_timer = None self._aborted = threading.Event() self._client_id = None self._system_id = system_id self._global_broker_username = "ccgxapikey_" + self._system_id self._global_broker_password = None self._requests_log_level = logging.getLogger("requests").getEffectiveLevel() def _get_vrm_broker_url(self): """To allow scaling, the VRM broker URL is generated based on the system identifier The function returns a numbered broker URL between 0 and VrmNumberOfBrokers, which makes sure that broker connections are distributed equally between all VRM brokers """ sum = 0 for character in self._system_id.lower().strip(): sum += ord(character) broker_index = sum % VrmNumberOfBrokers return "mqtt{}.victronenergy.com".format(broker_index) def load_or_generate_mqtt_password(self): """In case posting the password to storemqttpassword.php was processed by the server, but we never saw the response, we need to keep it around for the next time (don't post a random new one). This way of storing the password was incepted later, and makes it backwards compatible. """ if os.path.exists(MqttPasswordFile): with open(MqttPasswordFile, "r") as f: logging.info("Using {}".format(MqttPasswordFile)) password = f.read().strip() return password else: with open(MqttPasswordFile + ".tmp", "w") as f: logging.info("Writing new {}".format(MqttPasswordFile)) password = get_random_string(32) # make sure the password is on the disk f.write(password) f.flush() os.fsync(f.fileno()) os.rename(MqttPasswordFile + ".tmp", MqttPasswordFile) # update the directory meta-info fd = os.open(os.path.dirname(MqttPasswordFile), 0) os.fsync(fd) os.close(fd) return password def register(self): if self._init_broker_timer is not None: return if self._init_broker(quiet=False, timeout=5): if not self._aborted.is_set(): logging.info("[InitBroker] Registration failed. Retrying in thread, silently.") logging.getLogger("requests").setLevel(logging.WARNING) # Not using gobject to keep these blocking operations out of the event loop self._init_broker_timer = RepeatingTimer(self._init_broker, 60) self._init_broker_timer.start() def abort_gracefully(self): self._aborted.set() if self._init_broker_timer: self._init_broker_timer.stop() self._init_broker_timer.join() @property def client_id(self): return self._client_id def _write_config_atomically(self, path, contents): config_dir = os.path.dirname(path) if not os.path.exists(config_dir): os.makedirs(config_dir) with open(path + ".tmp", 'wt') as out_file: # make sure the new config is on the disk out_file.write(contents) out_file.flush() os.fsync(out_file.fileno()) # make sure there is either the old file or the new one os.rename(path + ".tmp", path) # update the directory meta-info fd = os.open(os.path.dirname(path), 0) os.fsync(fd) os.close(fd) def _init_broker(self, quiet=True, timeout=5): try: with open(LockFilePath, "a") as lockFile: fcntl.flock(lockFile, fcntl.LOCK_EX) orig_config = None # Read the current config file (if present) try: if not quiet: logging.info('[InitBroker] Reading config file') with open(BridgeConfigPath, 'rt') as in_file: orig_config = in_file.read() settings = dict(tuple(l.strip().split(' ', 1)) for l in orig_config.split('\n') if not l.startswith('#') and l.strip() != '') self._client_id = settings.get('remote_clientid') self._global_broker_password = settings.get('remote_password') except IOError: if not quiet: logging.info('[InitBroker] Reading config file failed.') # We need a guarantee an empty file, otherwise Mosquitto crashes on load. if not os.path.exists(BridgeConfigPath): self._write_config_atomically(BridgeConfigPath, ""); # Fix items missing from config if self._client_id is None: self._client_id = 'ccgx_' + get_random_string(12) if self._global_broker_password is None: self._global_broker_password = self.load_or_generate_mqtt_password() # Get to the actual registration if not quiet: logging.info('[InitBroker] Registering CCGX at VRM portal') with requests.Session() as session: headers = {'content-type': 'application/x-www-form-urlencoded', 'User-Agent': 'dbus-mqtt'} r = session.post( VrmApiServer + '/log/storemqttpassword.php', data=dict(identifier=self._global_broker_username, mqttPassword=self._global_broker_password), headers=headers, verify=CaBundlePath, timeout=(timeout,timeout)) if r.status_code == requests.codes.ok: config = BridgeSettings.format(self._system_id, self._global_broker_password, self._client_id, self._get_vrm_broker_url(), RpcBroker, CaBundlePath, self._global_broker_username) # Do we need to adjust the settings file? if config != orig_config: logging.info('[InitBroker] Writing new config file') self._write_config_atomically(BridgeConfigPath, config) self._restart_broker() else: logging.info('[InitBroker] Not updating config file and not restarting Mosquitto, because config is correct.') self._init_broker_timer = None logging.getLogger("requests").setLevel(self._requests_log_level) logging.info('[InitBroker] Registration successful') return False if not quiet: logging.error('VRM registration failed. Http status was: {}'.format(r.status_code)) logging.error('Message was: {}'.format(r.text)) except: if not quiet: traceback.print_exc() # Notify the timer we want to be called again return True def _restart_broker(self): logging.info('Restarting broker') subprocess.call(['svc', '-t', '/service/mosquitto']) def get_password(self): assert self._global_broker_password is not None return self._global_broker_password def get_apikey(self): return self._global_broker_username def get_random_string(size=32): """Creates a random (hex) string which contains 'size' characters.""" return ''.join("{0:02x}".format(b) for b in open('/dev/urandom', 'rb').read(int(size/2))) # vim: noexpandtab:shiftwidth=4:tabstop=4:softtabstop=0