Mercurial > ~darius > hgwebdir.cgi > epro
view epro.py @ 13:4450cf739263
Add link to protocol document.
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Mon, 06 Dec 2021 11:25:51 +1030 |
parents | 0a571da65068 |
children | 60ead9b5fc1b |
line wrap: on
line source
#!/usr/bin/env python
"""Decoder for the Enerdrive ePRO battery monitor serial protocol.

Bytes read from the monitor are fed to ``Processor.process()``, which
reassembles frames and queues one ``Packet`` (or subclass) per frame.
"""

import logging
import sys

try:
    import serial
except ImportError:
    # pyserial is only needed by main(); the decoder classes below remain
    # importable (and testable) without it.
    serial = None

logger = logging.getLogger('epro')

# View facing ePro from the back
#    +---+
#  +-|   |-|
#  |       |
#  |       |
#  +-------+
#   1 ... 6
#
# RJ12 plug (Jaycar cable colours)
# 1 white  temp out
# 2 black  temp in
# 3 red    GND
# 4 green  TX TTL (5V)
# 5 yellow RX TTL (5V)
# 6 blue   VCC (12V)
#
# 2400bps 8E1
# s = serial.Serial('/dev/cu.usbserial-AM01Z7TZ', 2400, parity='E')
#
# Protocol document: https://www.enerdrive.com.au/wp-content/uploads/ePRO-communication-interface-spec-rev.02.pdf


def _decode_unsigned(data):
    """Assemble an unsigned 16-bit value from three payload bytes.

    data[0] contributes its low 2 bits, data[1] and data[2] 7 bits each
    (most significant first).
    """
    return (data[2] & 0x7f) | ((data[1] & 0x7f) << 7) | ((data[0] & 0x03) << 14)


def _decode_signed(data):
    """Assemble a sign-magnitude value from three payload bytes.

    The magnitude is 20 bits (6 from data[0], 7 each from data[1] and
    data[2]); bit 6 of data[0] is the sign flag.
    """
    magnitude = (data[2] & 0x7f) | ((data[1] & 0x7f) << 7) | ((data[0] & 0x3f) << 14)
    if data[0] & 0x40:
        return -magnitude
    return magnitude


class Packet(object):
    """A decoded frame; also used directly for unrecognised message types.

    Attributes mirror the constructor arguments: dstadr, srcadr, devid,
    msgtype and data (a sequence of payload byte values).
    """
    # Subclasses override MSGTYPE (and define MSGNAME and LEN); -1 marks the
    # generic base class.
    MSGTYPE = -1

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        self.dstadr = dstadr
        self.srcadr = srcadr
        self.devid = devid
        self.msgtype = msgtype
        self.data = data

    def __repr__(self):
        hdr = "Src: 0x%02x Dst: 0x%02x DevID: 0x%02x MsgType: 0x%02x" % (
            self.srcadr, self.dstadr, self.devid, self.msgtype)
        # MSGTYPE is overridden for subclasses which also have their own repr
        # so don't dump data here
        if self.MSGTYPE == -1:
            hdr += " Data:"
            for d in self.data:
                hdr += " 0x%02x" % (d)
        else:
            hdr += " Name: " + self.MSGNAME
        return hdr


class MainVoltage(Packet):
    """Main Battery Voltage"""
    MSGTYPE = 0x60
    MSGNAME = "Main Voltage"
    LEN = 3

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        super(MainVoltage, self).__init__(dstadr, srcadr, devid, msgtype, data)
        # Raw value is in hundredths of a volt
        self.volts = _decode_unsigned(data) / 100.0

    def __repr__(self):
        return super(MainVoltage, self).__repr__() + ": %.2f V" % (self.volts)


class BatteryCurrent(Packet):
    """Main Battery Current"""
    MSGTYPE = 0x61
    MSGNAME = "Battery Current"
    LEN = 3

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        super(BatteryCurrent, self).__init__(dstadr, srcadr, devid, msgtype, data)
        # Raw value is in hundredths of an amp
        self.amps = _decode_signed(data) / 100.0

    def __repr__(self):
        return super(BatteryCurrent, self).__repr__() + ": %.2f A" % (self.amps)


class AmpHours(Packet):
    """Number of amp hours removed from the battery"""
    MSGTYPE = 0x62
    MSGNAME = "Amp Hours"
    LEN = 3

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        super(AmpHours, self).__init__(dstadr, srcadr, devid, msgtype, data)
        # Raw value is in hundredths of an amp hour
        self.amphrs = _decode_signed(data) / 100.0

    def __repr__(self):
        return super(AmpHours, self).__repr__() + ": %.2f Ah" % (self.amphrs)


class StateOfCharge(Packet):
    """State of battery charge"""
    MSGTYPE = 0x64
    MSGNAME = "State Of Charge"
    LEN = 3

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        super(StateOfCharge, self).__init__(dstadr, srcadr, devid, msgtype, data)
        # Raw value is in tenths of a percent
        self.soc = _decode_unsigned(data) / 10.0

    def __repr__(self):
        return super(StateOfCharge, self).__repr__() + ": %.2f%%" % (self.soc)


class TimeRemaining(Packet):
    """Time remaining until battery needs charging"""
    MSGTYPE = 0x65
    MSGNAME = "Time remaining"
    LEN = 3

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        super(TimeRemaining, self).__init__(dstadr, srcadr, devid, msgtype, data)
        # Unscaled integer; printed as minutes
        self.time = _decode_signed(data)

    def __repr__(self):
        return super(TimeRemaining, self).__repr__() + ": %.2f min" % (self.time)


class BatteryTemperature(Packet):
    """Battery temperature (degrees Celsius)"""
    MSGTYPE = 0x66
    MSGNAME = "Battery Temperature"
    LEN = 3

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        super(BatteryTemperature, self).__init__(dstadr, srcadr, devid, msgtype, data)
        # Raw value is in tenths of a degree.  The sign flag is read from
        # data[0] like the other signed fields; bit 6 of data[2] is part of
        # the magnitude.
        # NOTE(review): confirm the sign-bit position against the protocol
        # document.
        self.temp = _decode_signed(data) / 10.0
        # A reading of exactly -20 is treated as "no reading available"
        if self.temp == -20:
            self.temp = None

    def __repr__(self):
        s = super(BatteryTemperature, self).__repr__()
        if self.temp is None:
            s += ": n/a"
        else:
            s += ": %.2f degC" % (self.temp)
        return s


class MonitorStatus(Packet):
    """Monitor status

    Each attribute is a bool decoded from one bit of the 3 payload bytes.
    """
    MSGTYPE = 0x67
    MSGNAME = "Monitor Status"
    LEN = 3

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        super(MonitorStatus, self).__init__(dstadr, srcadr, devid, msgtype, data)
        self.autosyncvolt = bool(data[0] & 0x10)
        self.autosyncamp = bool(data[0] & 0x08)
        self.autosyncchrg = bool(data[0] & 0x04)
        self.e501compat = bool(data[0] & 0x02)
        self.alarmtst = bool(data[0] & 0x01)
        self.backlight = bool(data[1] & 0x40)
        self.disptst = bool(data[1] & 0x20)
        self.tempsense = bool(data[1] & 0x10)  # Seems to be inverted from data sheet
        self.auxhv = bool(data[1] & 0x08)
        self.auxlv = bool(data[1] & 0x04)
        self.lock = bool(data[1] & 0x02)
        self.mainhv = bool(data[1] & 0x01)
        self.mainlv = bool(data[2] & 0x40)
        self.lowbatalarm = bool(data[2] & 0x20)
        self.batflat = bool(data[2] & 0x10)
        self.batfull = bool(data[2] & 0x08)
        self.charged = bool(data[2] & 0x04)
        self.nosync = bool(data[2] & 0x02)
        self.monreset = bool(data[2] & 0x01)

    def __repr__(self):
        s = super(MonitorStatus, self).__repr__()
        stats = (
            ("ASV", self.autosyncvolt),
            ("ASA", self.autosyncamp),
            ("ASC", self.autosyncchrg),
            ("E501", self.e501compat),
            ("Alarm test", self.alarmtst),
            ("Light", self.backlight),
            ("Temperature Sensor", self.tempsense),
            ("Aux HV", self.auxhv),
            ("Aux LV", self.auxlv),
            ("Lock", self.lock),
            ("Main HV", self.mainhv),
            ("Main LV", self.mainlv),
            ("Low Battery", self.lowbatalarm),
            ("Battery Flat", self.batflat),
            ("Battery Full", self.batfull),
            ("Battery Charged", self.charged),
            ("No Sync", self.nosync),
            ("Monitor Reset", self.monreset),
        )
        # Every flag is a bool, so %s renders it as "True" or "False"
        s += "".join(" %s : %s" % (label, flag) for (label, flag) in stats)
        return s


class AuxVoltage(Packet):
    """Aux Battery Voltage"""
    MSGTYPE = 0x68
    MSGNAME = "Aux Voltage"
    LEN = 3

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        super(AuxVoltage, self).__init__(dstadr, srcadr, devid, msgtype, data)
        # Raw value is in hundredths of a volt
        self.volts = _decode_unsigned(data) / 100.0

    def __repr__(self):
        return super(AuxVoltage, self).__repr__() + ": %.2f V" % (self.volts)


class Processor(object):
    """Incremental frame parser.

    A frame is: destination address (MSB set), source address, device ID,
    message type, payload bytes, then 0xff as end-of-message.  Completed
    packets are appended to ``self.packets`` for the caller to consume.
    """
    PKT_TYPES = {
        MainVoltage.MSGTYPE: MainVoltage,
        BatteryCurrent.MSGTYPE: BatteryCurrent,
        AmpHours.MSGTYPE: AmpHours,
        StateOfCharge.MSGTYPE: StateOfCharge,
        TimeRemaining.MSGTYPE: TimeRemaining,
        BatteryTemperature.MSGTYPE: BatteryTemperature,
        MonitorStatus.MSGTYPE: MonitorStatus,
        AuxVoltage.MSGTYPE: AuxVoltage,
    }

    # Values taken by self.state: which field the next byte belongs to
    _ST_DST, _ST_SRC, _ST_DEVID, _ST_MSGTYPE, _ST_DATA = range(5)

    def __init__(self):
        self.state = self._ST_DST
        self.packets = []

    def _build_packet(self):
        """Build a packet from the frame just completed.

        Returns None (after logging a warning) when a known message type
        arrives with the wrong payload length.
        """
        cls = self.PKT_TYPES.get(self.msgtype)
        if cls is None:
            return Packet(self.dstadr, self.srcadr, self.devid, self.msgtype, self.data)
        if len(self.data) != cls.LEN:
            logger.warning("Packet length incorrect, expected %d got %d",
                           cls.LEN, len(self.data))
            return None
        return cls(self.dstadr, self.srcadr, self.devid, self.msgtype, self.data)

    def process(self, dat):
        """Consume a chunk of raw bytes, queueing any completed packets.

        dat is a bytes-like object (``str`` on Python 2); it may contain
        partial frames, parser state is kept between calls.

        Returns True if at least one packet was appended to self.packets.
        """
        added = False
        # bytearray() yields ints on both Python 2 (str) and Python 3 (bytes)
        for d in bytearray(dat):
            if d == 0xff and self.state != self._ST_DATA:
                # End-of-message before the header was complete: drop the
                # partial frame and resynchronise on the next address byte.
                logger.warning("Packet corruption")
                self.state = self._ST_DST
                continue

            if self.state == self._ST_DST:
                # Waiting for destination address (MSB set but not 0xff as
                # that is EOM and was handled above)
                if d & 0x80 == 0:
                    logger.info("Skipping byte")
                    continue
                self.dstadr = d & 0x7f
                self.data = []
                self.state = self._ST_SRC
            elif self.state == self._ST_SRC:
                self.srcadr = d
                self.state = self._ST_DEVID
            elif self.state == self._ST_DEVID:
                self.devid = d
                self.state = self._ST_MSGTYPE
            elif self.state == self._ST_MSGTYPE:
                self.msgtype = d
                self.state = self._ST_DATA
            elif self.state == self._ST_DATA:
                if d != 0xff:
                    self.data.append(d)
                    continue
                # End of message: emit the packet and wait for the next frame
                self.state = self._ST_DST
                p = self._build_packet()
                if p is None:
                    continue
                self.packets.append(p)
                added = True
        return added


def main():
    """Read frames from the serial port named in argv[1] and print each packet.

    Runs until interrupted; the port is closed on the way out.
    """
    if len(sys.argv) != 2:
        print('Bad usage')
        sys.exit(1)
    if serial is None:
        raise ImportError("pyserial is required to read from a serial port")

    s = serial.Serial(sys.argv[1], 2400, parity='E')
    s.timeout = 0.2
    p = Processor()
    try:
        while True:
            p.process(s.read(1024))
            while p.packets:
                print(p.packets.pop(0))
    finally:
        s.close()


if __name__ == '__main__':
    main()