view epro.py @ 13:4450cf739263

Add link to protocol document.
author Daniel O'Connor <darius@dons.net.au>
date Mon, 06 Dec 2021 11:25:51 +1030
parents 0a571da65068
children 60ead9b5fc1b
line wrap: on
line source

#!/usr/bin/env python

import logging
import serial
import sys

# Module-level logger; Processor reports skipped bytes and framing problems through it
logger = logging.getLogger('epro')

# View facing ePro from the back
#   +---+
# +-|   |-|
# |       |
# |       |
# +-------+
#  1 ... 6
#
# RJ12 plug (Jaycar cable colours)
# 1 white   temp out
# 2 black   temp in
# 3 red     GND
# 4 green   TX TTL (5V)
# 5 yellow  RX TTL (5V)
# 6 blue    VCC (12V)
#
# 2400bps 8E1
# s = serial.Serial('/dev/cu.usbserial-AM01Z7TZ', 2400, parity='E')
#
# Protocol document: https://www.enerdrive.com.au/wp-content/uploads/ePRO-communication-interface-spec-rev.02.pdf

class Packet(object):
    """Generic ePro packet: addressing header, message type and raw data bytes.

    Subclasses override MSGTYPE and define MSGNAME for the message types they
    decode; the base class is used directly for any other message type.
    """
    # Sentinel meaning "no specific message type"; subclasses override it
    MSGTYPE = -1

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        """Store the header fields and payload.

        dstadr  -- destination address
        srcadr  -- source address
        devid   -- device ID
        msgtype -- message type
        data    -- sequence of integer data bytes
        """
        self.dstadr = dstadr
        self.srcadr = srcadr
        self.devid = devid
        self.msgtype = msgtype
        self.data = data

    def __repr__(self):
        """Describe the header, followed by a hex dump of the data (base
        class) or the message name (subclasses)."""
        # The values must be listed in the same order as the labels: source first
        hdr = "Src: 0x%02x Dst: 0x%02x DevID: 0x%02x MsgType: 0x%02x" % (
            self.srcadr, self.dstadr, self.devid, self.msgtype)
        # MSGTYPE is overridden for subclasses which also have their own repr so don't dump data here
        if self.MSGTYPE == -1:
            hdr += " Data:" + "".join(" 0x%02x" % (d) for d in self.data)
        else:
            hdr += " Name: " + self.MSGNAME

        return hdr

class MainVoltage(Packet):
    """Main battery voltage packet; the decoded reading is stored in ``volts``."""
    MSGTYPE = 0x60
    MSGNAME = "Main Voltage"
    LEN = 3

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        super(MainVoltage, self).__init__(dstadr, srcadr, devid, msgtype, data)
        # Reassemble the reading from its three data bytes, first byte most significant
        top = (data[0] & 0x03) << 14
        middle = (data[1] & 0x7f) << 7
        bottom = data[2] & 0x7f
        # The raw count is divided by 100 to give volts
        self.volts = (top | middle | bottom) / 100.0

    def __repr__(self):
        # Base description with the voltage appended
        return super(MainVoltage, self).__repr__() + ": %.2f V" % (self.volts)

class BatteryCurrent(Packet):
    """Main battery current packet; the signed reading is stored in ``amps``."""
    MSGTYPE = 0x61
    MSGNAME = "Battery Current"
    LEN = 3

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        super(BatteryCurrent, self).__init__(dstadr, srcadr, devid, msgtype, data)
        # Magnitude comes from the three data bytes, first byte most significant
        magnitude = ((data[0] & 0x3f) << 14) | ((data[1] & 0x7f) << 7) | (data[2] & 0x7f)
        # Bit 6 of the first byte flags a negative value
        sign = -1 if data[0] & 0x40 else 1
        # The raw count is divided by 100 to give amps
        self.amps = sign * (magnitude / 100.0)

    def __repr__(self):
        # Base description with the current appended
        return super(BatteryCurrent, self).__repr__() + ": %.2f A" % (self.amps)

class AmpHours(Packet):
    """Amp hours removed from the battery; the signed value is stored in ``amphrs``."""
    MSGTYPE = 0x62
    MSGNAME = "Amp Hours"
    LEN = 3

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        super(AmpHours, self).__init__(dstadr, srcadr, devid, msgtype, data)
        # Magnitude comes from the three data bytes, first byte most significant
        magnitude = ((data[0] & 0x3f) << 14) | ((data[1] & 0x7f) << 7) | (data[2] & 0x7f)
        # Bit 6 of the first byte flags a negative value
        sign = -1 if data[0] & 0x40 else 1
        # The raw count is divided by 100 to give amp hours
        self.amphrs = sign * (magnitude / 100.0)

    def __repr__(self):
        # Base description with the amp hour figure appended
        return super(AmpHours, self).__repr__() + ": %.2f Ah" % (self.amphrs)

class StateOfCharge(Packet):
    """Battery state of charge packet; the percentage is stored in ``soc``."""
    MSGTYPE = 0x64
    MSGNAME = "State Of Charge"
    LEN = 3

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        super(StateOfCharge, self).__init__(dstadr, srcadr, devid, msgtype, data)
        # Reassemble the reading from its three data bytes, first byte most significant
        top = (data[0] & 0x03) << 14
        middle = (data[1] & 0x7f) << 7
        bottom = data[2] & 0x7f
        # The raw count is divided by 10 to give percent
        self.soc = (top | middle | bottom) / 10.0

    def __repr__(self):
        # Base description with the percentage appended
        return super(StateOfCharge, self).__repr__() + ": %.2f%%" % (self.soc)

class TimeRemaining(Packet):
    """Time left until the battery needs charging; stored in ``time`` (shown as minutes)."""
    MSGTYPE = 0x65
    MSGNAME = "Time remaining"
    LEN = 3

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        super(TimeRemaining, self).__init__(dstadr, srcadr, devid, msgtype, data)
        # Magnitude comes from the three data bytes, first byte most significant
        magnitude = ((data[0] & 0x3f) << 14) | ((data[1] & 0x7f) << 7) | (data[2] & 0x7f)
        # Bit 6 of the first byte flags a negative value; no scaling is applied
        self.time = -magnitude if data[0] & 0x40 else magnitude

    def __repr__(self):
        # Base description with the remaining time appended
        return super(TimeRemaining, self).__repr__() + ": %.2f min" % (self.time)

class BatteryTemperature(Packet):
    """Battery temperature (degrees Celsius)

    The decoded value is stored in ``temp``. A decoded value of -20 is
    replaced by None and shown as "n/a".
    """
    MSGTYPE = 0x66
    MSGNAME = "Battery Temperature"
    LEN = 3

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        super(BatteryTemperature, self).__init__(dstadr, srcadr, devid, msgtype, data)
        # Magnitude from the three data bytes; the raw count is divided by 10
        self.temp = (data[2] & 0x7f | (data[1] & 0x7f) << 7 | (data[0] & 0x3f) << 14) / 10.0
        # The sign flag is bit 6 of the first data byte, as for the other signed
        # values. Bit 6 of data[2] is part of the magnitude, so testing it would
        # report e.g. 22.4 (low byte 0x60) as negative.
        if data[0] & 0x40:
            self.temp *= -1
        # -20 is treated as "no reading available"
        if self.temp == -20:
            self.temp = None

    def __repr__(self):
        s = super(BatteryTemperature, self).__repr__()
        if self.temp is None:
            s += ": n/a"
        else:
            s += ": %.2f degC" % (self.temp)
        return s

class MonitorStatus(Packet):
    """Monitor status

    Each status flag is one bit of the three data bytes and is exposed as a
    boolean attribute.
    """
    MSGTYPE = 0x67
    MSGNAME = "Monitor Status"
    LEN = 3

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        super(MonitorStatus, self).__init__(dstadr, srcadr, devid, msgtype, data)
        # First data byte
        self.autosyncvolt = bool(data[0] & 0x10)
        self.autosyncamp = bool(data[0] & 0x08)
        self.autosyncchrg = bool(data[0] & 0x04)
        self.e501compat = bool(data[0] & 0x02)
        self.alarmtst = bool(data[0] & 0x01)
        # Second data byte
        self.backlight = bool(data[1] & 0x40)
        self.disptst = bool(data[1] & 0x20)
        self.tempsense = bool(data[1] & 0x10) # Seems to be inverted from data sheet
        self.auxhv = bool(data[1] & 0x08)
        self.auxlv = bool(data[1] & 0x04)
        self.lock = bool(data[1] & 0x02)
        self.mainhv = bool(data[1] & 0x01)
        # Third data byte
        self.mainlv = bool(data[2] & 0x40)
        self.lowbatalarm = bool(data[2] & 0x20)
        self.batflat = bool(data[2] & 0x10)
        self.batfull = bool(data[2] & 0x08)
        self.charged = bool(data[2] & 0x04)
        self.nosync = bool(data[2] & 0x02)
        self.monreset = bool(data[2] & 0x01)

    def __repr__(self):
        """Return the base description followed by every listed flag as
        " <label> : True" or " <label> : False"."""
        s = super(MonitorStatus, self).__repr__()
        # NOTE: disptst is decoded above but is not part of this listing
        stats = ( ( "ASV", self.autosyncvolt ), ( "ASA", self.autosyncamp ), ( "ASC" , self.autosyncchrg ),
          ( "E501", self.e501compat ), ( "Alarm test", self.alarmtst ), ( "Light", self.backlight ),
          ( "Temperature Sensor", self.tempsense ), ( "Aux HV", self.auxhv ), ( "Aux LV", self.auxlv ),
          ( "Lock", self.lock ), ( "Main HV", self.mainhv ), ( "Main LV", self.mainlv ),
          ( "Low Battery", self.lowbatalarm ), ( "Battery Flat", self.batflat ),
          ( "Battery Full", self.batfull ), ( "Battery Charged", self.charged ),
          ( "No Sync", self.nosync ), ( "Monitor Reset", self.monreset ) )
        for (short, var) in stats:
            if var:
                s += " %s : True" % (short)
            else:
                s += " %s : False" % (short)

        return s

class AuxVoltage(Packet):
    """Auxiliary battery voltage packet; the decoded reading is stored in ``volts``."""
    MSGTYPE = 0x68
    MSGNAME = "Aux Voltage"
    LEN = 3

    def __init__(self, dstadr, srcadr, devid, msgtype, data):
        super(AuxVoltage, self).__init__(dstadr, srcadr, devid, msgtype, data)
        # Reassemble the reading from its three data bytes, first byte most significant
        top = (data[0] & 0x03) << 14
        middle = (data[1] & 0x7f) << 7
        bottom = data[2] & 0x7f
        # The raw count is divided by 100 to give volts
        self.volts = (top | middle | bottom) / 100.0

    def __repr__(self):
        # Base description with the voltage appended
        return super(AuxVoltage, self).__repr__() + ": %.2f V" % (self.volts)

class Processor(object):
    """Incremental decoder that turns a raw ePro byte stream into packets.

    Framing as implemented here: destination address (MSB set), source
    address, device ID, message type, zero or more data bytes, then 0xff as
    the end-of-message marker. Decoder state is kept between calls to
    process(), so a packet may be split across several reads. Decoded packets
    are appended to ``self.packets`` for the caller to consume.
    """
    # Message type -> class used to decode it; other types become plain Packets
    PKT_TYPES = { MainVoltage.MSGTYPE : MainVoltage, BatteryCurrent.MSGTYPE : BatteryCurrent,
                  AmpHours.MSGTYPE : AmpHours, StateOfCharge.MSGTYPE : StateOfCharge,
                  TimeRemaining.MSGTYPE : TimeRemaining, BatteryTemperature.MSGTYPE : BatteryTemperature,
                  MonitorStatus.MSGTYPE : MonitorStatus, AuxVoltage.MSGTYPE : AuxVoltage }

    def __init__(self):
        # 0 = waiting for destination address, 1 = source address,
        # 2 = device ID, 3 = message type, 4 = collecting data bytes
        self.state = 0
        self.packets = []

    def process(self, dat):
        """Feed received bytes into the decoder.

        dat -- raw data as read from the port (bytes, bytearray or Python 2 str)

        Returns True if at least one packet was appended to self.packets.
        """
        added = False
        # bytearray yields integers on both Python 2 and Python 3, whereas
        # ord() on the items of a Python 3 bytes object raises TypeError
        for d in bytearray(dat):
            if d == 0xff and self.state != 4:
                # End-of-message marker outside the data section: whatever header
                # was collected is unusable, so go back to hunting for a
                # destination address instead of mis-framing the next packet
                logger.warning("Packet corruption")
                self.state = 0
                continue

            if self.state == 0:
                # Waiting for destination address (MSB set; 0xff was handled above)
                if (d & 0x80) == 0:
                    logger.info("Skipping byte")
                    continue
                self.dstadr = d & 0x7f
                self.data = []
                self.state += 1
            elif self.state == 1:
                # Source address
                self.srcadr = d
                self.state += 1
            elif self.state == 2:
                # Device ID
                self.devid = d
                self.state += 1
            elif self.state == 3:
                # Message type
                self.msgtype = d
                self.state += 1
            elif self.state == 4:
                # Data, terminated by 0xff
                if d != 0xff:
                    self.data.append(d)
                    continue
                self.state = 0
                t = self.PKT_TYPES.get(self.msgtype)
                if t is None:
                    # Unknown message type: keep it as a generic packet
                    p = Packet(self.dstadr, self.srcadr, self.devid, self.msgtype, self.data)
                elif len(self.data) != t.LEN:
                    # Known type with the wrong payload size: drop it
                    logger.warning("Packet length incorrect, expected %d got %d", t.LEN, len(self.data))
                    continue
                else:
                    p = t(self.dstadr, self.srcadr, self.devid, self.msgtype, self.data)

                self.packets.append(p)
                added = True
        return added

def main():
    """Read from the serial port named on the command line and print every
    decoded packet. Runs until interrupted."""
    if len(sys.argv) != 2:
        print('Bad usage')
        # sys.exit rather than the site-provided exit(), which is meant for
        # interactive use and is not guaranteed to exist
        sys.exit(1)

    s = serial.Serial(sys.argv[1], 2400, parity='E')
    # Short read timeout so partial data is processed promptly
    s.timeout = 0.2

    p = Processor()
    try:
        while True:
            p.process(s.read(1024))
            while p.packets:
                print(p.packets.pop(0))
    finally:
        # Release the port however the loop ends (e.g. Ctrl-C or an I/O error)
        s.close()

# Run the monitor loop only when executed as a script, not when imported
if __name__ == '__main__':
    main()