Mercurial > ~darius > hgwebdir.cgi > epro
view eprodbus.py @ 14:60ead9b5fc1b
Mark temperature as invalid when <-20C as per spec.
Shows up as -25C on the unit I have when the sensor is not connected.
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Mon, 06 Dec 2021 11:26:20 +1030 |
parents | 446cfe74827b |
children | 08b61687b75f |
line wrap: on
line source
#!/usr/bin/env python

# Read enerdrive ePro packets from serial port and update DBus with them
# Also logs to an sqlite3 DB every 60 seconds

import datetime
from dbus.mainloop.glib import DBusGMainLoop
import epro
import gobject
import logging
import os
import serial
import signal
import sqlite3
import sys

sys.path.insert(1, os.path.join(os.path.dirname(__file__), 'velib_python'))
from vedbus import VeDbusService

logging.basicConfig(format = '%(asctime)s %(message)s', level = logging.DEBUG)
logging.info(__file__ + " is starting up")

port = 'ttyepro'
servicename = 'com.victronenergy.battery.' + port
instance = 0

class eProUpdater:
    """Feed ePro packets read from a serial port to DBus and to an sqlite log.

    Every decoded packet is appended to ``log_queue``; a 60 second timer
    (``log_epro``) turns the most recent packet of each required type into
    one row of the ``eprolog`` table.
    """

    def __init__(self, dbusservice, s, dbh):
        """Hook the serial port and the logging timer into the gobject main loop.

        dbusservice -- mapping-like DBus service object, written by path
        s           -- open serial port object (must provide fileno() and read())
        dbh         -- sqlite3 connection used for logging
        """
        self.log_queue = []
        self.dbusservice = dbusservice
        self.p = epro.Processor()
        self.s = s
        self.dbh = dbh
        gobject.io_add_watch(s.fileno(), gobject.IO_IN, self.read_serial)
        gobject.timeout_add(60000, self.log_epro)

    def read_serial(self, fd, userdata):
        """IO watch callback: read serial data, decode packets, update DBus.

        Returns True to keep the watch installed, or False (which removes
        the watch) when reading from the serial port fails.
        """
        try:
            data = self.s.read(1024)
        except Exception as e:
            logging.error('Failed to read from serial port: %s', str(e))
            return False

        logging.debug('Read %d bytes from serial port', len(data))
        self.p.process(data)

        while len(self.p.packets) > 0:
            # Process oldest packets first
            p = self.p.packets.pop(0)
            self.log_queue.append(p)
            logging.debug('%s', str(p))
            # NOTE(review): exact type comparison is kept on purpose; the
            # packet classes live in the epro module and may share base
            # classes, in which case isinstance() would match too broadly.
            if type(p) == epro.StateOfCharge:
                self.dbusservice['/Soc'] = p.soc
            elif type(p) == epro.MainVoltage:
                self.dbusservice['/Dc/0/Voltage'] = p.volts
            elif type(p) == epro.BatteryCurrent:
                self.dbusservice['/Dc/0/Current'] = p.amps
            elif type(p) == epro.AmpHours:
                self.dbusservice['/ConsumedAmphours'] = p.amphrs
            elif type(p) == epro.TimeRemaining:
                # ePro reports in minutes, Venus expects seconds
                self.dbusservice['/TimeToGo'] = p.time * 60

        return True

    def log_epro(self):
        """Timer callback: write one row built from the newest queued packets.

        Nothing is written until at least one packet of every required type
        is in the queue. Always returns True: a glib timeout is removed as
        soon as its callback returns a false value, which would stop the
        periodic logging after the first call.
        """
        logging.debug('Logging epro data')

        # Check we have all the packets we need in the queue
        msgtypes = set([x.msgtype for x in self.log_queue])
        wantedtypes = set([
            epro.MainVoltage.MSGTYPE,
            epro.AmpHours.MSGTYPE,
            epro.BatteryCurrent.MSGTYPE,
            epro.StateOfCharge.MSGTYPE,
            epro.TimeRemaining.MSGTYPE,
            epro.BatteryTemperature.MSGTYPE,
            epro.MonitorStatus.MSGTYPE,
            epro.AuxVoltage.MSGTYPE,
        ])
        # Superset test, not "proper subset": a queue that also holds packet
        # types we do not log must not hide the fact that a wanted type is
        # still missing.
        if not msgtypes >= wantedtypes:
            logging.debug('Didn\'t get all packet types required to log')
            return True

        row = {}
        usedtypes = set()
        while len(self.log_queue) > 0:
            pkt = self.log_queue.pop() # Read latest packets first
            if pkt.msgtype in usedtypes:
                # A newer packet of this type has already been taken; do not
                # let an older reading overwrite it.
                continue
            if pkt.msgtype == epro.MainVoltage.MSGTYPE:
                row['main_voltage'] = pkt.volts
            elif pkt.msgtype == epro.AmpHours.MSGTYPE:
                row['amp_hours'] = pkt.amphrs
            elif pkt.msgtype == epro.BatteryCurrent.MSGTYPE:
                row['battery_curr'] = pkt.amps
            elif pkt.msgtype == epro.StateOfCharge.MSGTYPE:
                row['state_of_charge'] = pkt.soc
            elif pkt.msgtype == epro.TimeRemaining.MSGTYPE:
                row['time_remaining'] = pkt.time
            elif pkt.msgtype == epro.BatteryTemperature.MSGTYPE:
                row['battery_temp'] = pkt.temp
            elif pkt.msgtype == epro.MonitorStatus.MSGTYPE:
                row['auto_sync_volts'] = pkt.autosyncvolt
                row['auto_sync_curr'] = pkt.autosyncamp
                row['e501'] = pkt.e501compat
                row['alarm_test'] = pkt.alarmtst
                row['light'] = pkt.backlight
                row['display_test'] = pkt.disptst
                row['temp_sensor'] = pkt.tempsense
                row['aux_hv'] = pkt.auxhv
                row['aux_lv'] = pkt.auxlv
                row['installer_lock'] = pkt.lock
                row['main_hv'] = pkt.mainhv
                row['main_lv'] = pkt.mainlv
                row['low_battery'] = pkt.lowbatalarm
                row['battery_flat'] = pkt.batflat
                row['battery_full'] = pkt.batfull
                row['battery_charged'] = pkt.charged
                row['no_sync'] = pkt.nosync
                row['monitor_reset'] = pkt.monreset
            elif pkt.msgtype == epro.AuxVoltage.MSGTYPE:
                row['aux_voltage'] = pkt.volts

            usedtypes.add(pkt.msgtype)
            if usedtypes >= wantedtypes:
                # Everything needed has been collected; drop the older packets
                self.log_queue = []
                break

        logging.info('Got all packets, logging')
        # NOTE(review): '%s' (seconds since the epoch) is a platform
        # extension of strftime, not a documented Python directive - confirm
        # it is supported on the target system.
        row['tstamp'] = int(datetime.datetime.now().strftime('%s'))
        try:
            cur = self.dbh.cursor()
            cur.execute('INSERT INTO eprolog VALUES (:tstamp, :main_voltage, :aux_voltage, '
                        ':battery_curr, :amp_hours, :state_of_charge, :time_remaining, '
                        ':battery_temp, :auto_sync_volts, :auto_sync_curr, :e501, '
                        ':alarm_test, :light, :display_test, :temp_sensor, '
                        ':aux_hv, :aux_lv, :installer_lock, :main_hv, :main_lv, '
                        ':low_battery, :battery_flat, :battery_full, :battery_charged, '
                        ':no_sync, :monitor_reset)', row)
            self.dbh.commit()
        except sqlite3.Error as e:
            # One failed insert should not end the periodic logging
            logging.error('Failed to log to database: %s', str(e))

        return True

def doexit(signum = None, frame = None):
    """Signal handler that terminates the process with exit status 1.

    Python calls signal handlers with (signum, frame); both are accepted
    and ignored. They default to None so a direct doexit() call still works.
    """
    sys.exit(1)

def main():
    """Publish the battery service on DBus, open the serial port and log DB, then run the main loop."""
    # Add signal handler to exit, otherwise we have to press ctrl-c twice to quit
    signal.signal(signal.SIGINT, doexit)

    DBusGMainLoop(set_as_default = True)

    dbusservice = VeDbusService(servicename)
    dbusservice.add_path('/Connected', value = True)
    dbusservice.add_path('/ProductName', value = 'Enerdrive ePro')
    dbusservice.add_path('/Mgmt/Connection', value = '/dev/' + port)
    dbusservice.add_path('/DeviceInstance', value = instance)
    dbusservice.add_path('/ProductId', value = 'unknown')
    # Measurement paths start out empty and are filled in by eProUpdater.read_serial
    dbusservice.add_path('/Dc/0/Voltage', value = None)
    dbusservice.add_path('/Dc/0/Current', value = None)
    dbusservice.add_path('/Soc', value = None)
    dbusservice.add_path('/TimeToGo', value = None)
    dbusservice.add_path('/ConsumedAmphours', value = None)

    s = serial.Serial('/dev/' + port, 2400, parity = 'E')
    s.timeout = 0.1

    dbh = sqlite3.connect('/home/root/vanlogger/log.db')

    # Constructing the updater registers its callbacks with the main loop
    updater = eProUpdater(dbusservice, s, dbh)

    logging.info('Starting main loop')
    mainloop = gobject.MainLoop()
    mainloop.run()

if __name__ == '__main__':
    main()