Mercurial > ~darius > hgwebdir.cgi > epro
view eprodbus.py @ 28:ea07d14075e6 default tip
Remove superfluous statement.
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Tue, 21 Dec 2021 15:06:13 +1030 |
parents | 64d0d5aaf329 |
children |
line wrap: on
line source
#!/usr/bin/env python

# Read enerdrive ePro packets from serial port and update DBus with them
# Also logs to an sqlite3 DB every 60 seconds

import datetime
from dbus.mainloop.glib import DBusGMainLoop
import epro
import gobject
import logging
import os
import serial
import signal
import sqlite3
import sys
import time

sys.path.insert(1, os.path.join(os.path.dirname(__file__), 'velib_python'))
from vedbus import VeDbusService
from ve_utils import exit_on_error

logger = logging.getLogger('eprodbus')
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(message)s')
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.info(__file__ + " is starting up")

dbpath = '/home/root/vanlogger/log.db'
instance = 0

# Interval between database log attempts, in milliseconds
LOG_INTERVAL_MS = 60000


class eProUpdater:
    """Forward ePro packets read from a serial port to DBus and log them.

    Packets are decoded by an ``epro.Processor``.  Every decoded packet
    updates the matching DBus path immediately and is also queued so that
    ``log_epro`` can periodically write one combined row to the database.
    """

    # (database column, MonitorStatus attribute) pairs copied into each row
    _STATUS_COLUMNS = (
        ('auto_sync_volts', 'autosyncvolt'),
        ('auto_sync_curr', 'autosyncamp'),
        ('e501', 'e501compat'),
        ('alarm_test', 'alarmtst'),
        ('light', 'backlight'),
        ('display_test', 'disptst'),
        ('temp_sensor', 'tempsense'),
        ('aux_hv', 'auxhv'),
        ('aux_lv', 'auxlv'),
        ('installer_lock', 'lock'),
        ('main_hv', 'mainhv'),
        ('main_lv', 'mainlv'),
        ('low_battery', 'lowbatalarm'),
        ('battery_flat', 'batflat'),
        ('battery_full', 'batfull'),
        ('battery_charged', 'charged'),
        ('no_sync', 'nosync'),
        ('monitor_reset', 'monreset'),
    )

    _INSERT_SQL = (
        'INSERT INTO eprolog VALUES (:tstamp, :main_voltage, :aux_voltage, '
        ':battery_curr, :amp_hours, :state_of_charge, :time_remaining, '
        ':battery_temp, :auto_sync_volts, :auto_sync_curr, :e501, '
        ':alarm_test, :light, :display_test, :temp_sensor, :aux_hv, :aux_lv, '
        ':installer_lock, :main_hv, :main_lv, :low_battery, :battery_flat, '
        ':battery_full, :battery_charged, :no_sync, :monitor_reset)'
    )

    def __init__(self, dbusservice, s, dbh):
        """Register the serial watch and the periodic logging timer.

        dbusservice -- service object whose paths are assigned in read_serial
        s           -- open serial port; must provide fileno() and read()
        dbh         -- database connection; must provide cursor() and commit()
        """
        self.log_queue = []
        self.dbusservice = dbusservice
        self.p = epro.Processor()
        self.s = s
        self.dbh = dbh
        gobject.io_add_watch(
            s.fileno(), gobject.IO_IN,
            lambda fd, userdata: exit_on_error(self.read_serial, fd, userdata))
        gobject.timeout_add(LOG_INTERVAL_MS, exit_on_error, self.log_epro)

    def read_serial(self, fd, userdata):
        """Read pending serial data and publish each decoded packet on DBus.

        Both arguments come from the main loop watch and are unused.
        Always returns True so the watch stays installed.
        """
        data = self.s.read(1024)
        logger.debug('Read %d bytes from serial port', len(data))
        self.p.process(data)

        while len(self.p.packets) > 0:
            # Process oldest packets first
            p = self.p.packets.pop(0)
            self.log_queue.append(p)
            logger.debug('%s', str(p))
            # Exact type comparison is kept on purpose: any subclass
            # relationships inside epro are not visible from this file.
            if type(p) == epro.StateOfCharge:
                self.dbusservice['/Soc'] = p.soc
            elif type(p) == epro.MainVoltage:
                self.dbusservice['/Dc/0/Voltage'] = p.volts
            elif type(p) == epro.BatteryCurrent:
                self.dbusservice['/Dc/0/Current'] = p.amps
            elif type(p) == epro.BatteryTemperature:
                self.dbusservice['/Dc/0/Temperature'] = p.temp
            elif type(p) == epro.AmpHours:
                self.dbusservice['/ConsumedAmphours'] = p.amphrs
            elif type(p) == epro.TimeRemaining:
                # ePro reports in minutes, Venus expects seconds
                if p.time >= 0:
                    self.dbusservice['/TimeToGo'] = p.time * 60
                else:
                    # Battery is charging
                    self.dbusservice['/TimeToGo'] = None
            elif type(p) == epro.AuxVoltage:
                self.dbusservice['/Dc/1/Voltage'] = p.volts
            elif type(p) == epro.MonitorStatus:
                self.dbusservice['/Alarms/LowVoltage'] = 2 if p.mainlv else 0
                self.dbusservice['/Alarms/HighVoltage'] = 2 if p.mainhv else 0
                self.dbusservice['/Alarms/LowStarterVoltage'] = 2 if p.auxlv else 0
                self.dbusservice['/Alarms/HighStarterVoltage'] = 2 if p.auxhv else 0
                self.dbusservice['/Alarms/LowSoc'] = 2 if p.batflat else 0
                self.dbusservice['/Settings/HasTemperature'] = 1 if p.tempsense else 0
        return True

    def log_epro(self):
        """Write one database row built from the newest packet of each type.

        Nothing is written unless every required packet type has been seen
        since the last successful log.  Always returns True: a timeout
        callback that returns a false value is removed by the main loop,
        which would silently stop all further logging.
        """
        logger.debug('Logging epro data')

        # The queue is in arrival order, so later entries overwrite earlier
        # ones and each message type ends up mapped to its newest packet.
        latest = {}
        for pkt in self.log_queue:
            latest[pkt.msgtype] = pkt

        # (message type, database column, packet attribute)
        value_columns = (
            (epro.MainVoltage.MSGTYPE, 'main_voltage', 'volts'),
            (epro.AmpHours.MSGTYPE, 'amp_hours', 'amphrs'),
            (epro.BatteryCurrent.MSGTYPE, 'battery_curr', 'amps'),
            (epro.StateOfCharge.MSGTYPE, 'state_of_charge', 'soc'),
            (epro.TimeRemaining.MSGTYPE, 'time_remaining', 'time'),
            (epro.BatteryTemperature.MSGTYPE, 'battery_temp', 'temp'),
            (epro.AuxVoltage.MSGTYPE, 'aux_voltage', 'volts'),
        )
        wantedtypes = set([msgtype for msgtype, _, _ in value_columns])
        wantedtypes.add(epro.MonitorStatus.MSGTYPE)

        # Check we have all the packets we need.  This must be a "contains
        # all" test: a proper-subset test passes wrongly as soon as the
        # queue holds any packet type outside the wanted set.
        if not wantedtypes.issubset(latest):
            logger.debug('Didn\'t get all packet types required to log')
            # Keep only the newest packet per type so the queue stays
            # bounded while a required type is still missing.
            self.log_queue = list(latest.values())
            return True

        self.log_queue = []
        logger.info('Got all packets, logging')

        row = {'tstamp': int(time.time())}
        for msgtype, column, attr in value_columns:
            row[column] = getattr(latest[msgtype], attr)
        status = latest[epro.MonitorStatus.MSGTYPE]
        for column, attr in self._STATUS_COLUMNS:
            row[column] = getattr(status, attr)

        cur = self.dbh.cursor()
        cur.execute(self._INSERT_SQL, row)
        self.dbh.commit()
        return True


def doexit(signum, frame):
    """Signal handler: terminate the process with exit status 1."""
    sys.exit(1)


def main():
    """Set up the DBus service, serial port and database, then run the loop.

    The first command line argument is the serial device name relative to
    /dev; it is also appended to the DBus service name.
    """
    if len(sys.argv) < 2:
        sys.stderr.write('Usage: %s <serial device name under /dev>\n' % sys.argv[0])
        sys.exit(1)
    port = sys.argv[1]
    servicename = 'com.victronenergy.battery.' + port

    # Add signal handler to exit, otherwise we have to press ctrl-c twice to quit
    signal.signal(signal.SIGINT, doexit)

    DBusGMainLoop(set_as_default = True)

    dbusservice = VeDbusService(servicename)
    dbusservice.add_mandatory_paths(__file__, '1.0', 'Serial ' + port, instance, 0, 'Enerdrive ePro', '1.0', '1.0', 1)

    # Measurements start out unknown, alarms and settings start out cleared
    for path in ('/Dc/0/Voltage', '/Dc/0/Temperature', '/Dc/0/Current',
                 '/Dc/1/Voltage', '/Soc', '/TimeToGo', '/ConsumedAmphours'):
        dbusservice.add_path(path, value = None)
    for path in ('/Alarms/LowVoltage', '/Alarms/HighVoltage',
                 '/Alarms/LowStarterVoltage', '/Alarms/HighStarterVoltage',
                 '/Alarms/LowSoc', '/Settings/HasTemperature'):
        dbusservice.add_path(path, value = 0)

    s = serial.Serial('/dev/' + port, 2400, parity = 'E')
    s.timeout = 0.1

    dbh = sqlite3.connect(dbpath)

    # The constructor registers the main loop callbacks
    updater = eProUpdater(dbusservice, s, dbh)

    logger.info('Starting main loop')
    mainloop = gobject.MainLoop()
    mainloop.run()


if __name__ == '__main__':
    main()