Mercurial > ~darius > hgwebdir.cgi > giant
diff giant.py @ 0:1f3c12ba927d default tip
Rework code for USB interface
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Sun, 19 Nov 2017 18:10:23 +1030 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/giant.py Sun Nov 19 18:10:23 2017 +1030 @@ -0,0 +1,163 @@ +#!/usr/bin/env python + +# pip install --user pyusb pycrc +import crc16 +import exceptions +import os +import usb.core, usb.util, usb.control # https://github.com/pyusb/pyusb + +# USB details +# Borrowed from http://allican.be/blog/2017/01/28/reverse-engineering-cypress-serial-usb.html +vendorId = 0x0665 +productId = 0x5161 +interface = 0 + +# Have to add one to these CRCs values before sending +badvals = [0x28, 0x0d, 0x0a] + +class Timeout(exceptions.BaseException): + pass + +class FramingError(exceptions.BaseException): + pass + +class CRCError(exceptions.BaseException): + pass + +class GiantIPS(object): + PIWS = ['Reserved', 'InverterFault', 'BusOver', 'BusUnder', 'BusSoftFail', 'LineFail', 'OPVShort', 'InverterVoltsLow', + 'InverterVoltsHigh', 'OverTemp', 'FanLocked', 'BattVoltsHigh', 'BattLowAlarm', 'Reserved(Overcharge)', + 'BatterySHutdown', 'Reserved(BattDerate)', 'Overload', 'EEPROM', 'InverterOverCurrent', 'SelfTest', + 'OPDCVoltsOver', 'BattOpen', 'CurrentSenseFail', 'BatteryShort', 'PowerLimit', 'PVVoltsHigh1', + 'MPPTOverload', 'MPPTOverloadWarn', 'BattTooLowChrg', 'PVVoltsHigh2', 'MPPTOverload2', 'MPPTOverloadWarn2', + 'BattTooLowChrg2', 'PVVoltsHigh3', 'MPPTOverload3', 'MPPTOverloadWarn3', 'BattTooLowChrg3'] + def __init__(self): + dev = usb.core.find(idVendor = vendorId, idProduct = productId) + if dev.is_kernel_driver_active(interface): + dev.detach_kernel_driver(interface) + dev.set_interface_altsetting(0, 0) + self.dev = dev + + def compose_msg(self, data): + crc = crc16.crc16xmodem(data) + crclow = crc & 0xff + crchigh = crc >> 8 + if crclow in badvals: + crclow += 1 + if crchigh in badvals: + crchigh += 1 + data = data + chr(crchigh) + chr(crclow) + chr(0x0d) + while len(data) < 8: + data = data + b'\0' + return data + + def tx_msg(self, data): + self.dev.ctrl_transfer(0x21, 0x9, 0x200, 0, self.compose_msg(data)) + + def rx_msg(self): + res = '' + tries = 200 + while tries > 0 and '\r' not in res: + try: + d = self.dev.read(0x81, 8, 10) + res += ''.join([chr(i) for i in d if i != 0x00]) + except usb.core.USBError as e: + if e.errno == 110: # timeout + tries -= 1 + pass + else: + raise + + + if tries == 0: + raise Timeout() + if res[0] != '(' or res[-1] != '\r': + raise FramingError() + crc = crc16.crc16xmodem(res[0:-3]) + crclow = crc & 0xff + crchigh = crc >> 8 + if ord(res[-3]) != crchigh or ord(res[-2]) != crclow: + #raise CRCError() + print('CRC error') + + return res[1:-3] + + def cmd(self, cmd): + self.tx_msg(cmd) + return self.rx_msg() + + def getStatus(self): + d = self.cmd('QPIGS').split() + status = {} + if d[16][0] == '1': + status['SBUPrio'] = True + else: + status['SBUPrio'] = False + if d[16][1] == '1': + status['ConfigChg'] = True + else: + status['ConfigChg'] = False + if d[16][2] == '1': + status['BattVoltSteady'] = True + else: + status['BattVoltSteady'] = False + if d[16][3] == '1': + status['Charging'] = True + else: + status['Charging'] = False + if d[16][4] == '1': + status['SCC1Charging'] = True + else: + status['SCC1Charging'] = False + if d[16][5:] == '110': + status['ChargeType'] = 'SCC1' + elif d[16][5:] == '101': + status['ChargeType'] = 'AC' + else: + status['ChargeType'] = 'Both' + if d[20][0] == '1': + status['FloatCharge'] = True + else: + status['FloatCharge'] = False + if d[20][1] == '1': + status['Switch'] = True + else: + status['Switch'] = False + return { + 'GridVolts' : float(d[0]), + 'GridFreq' : float(d[1]), + 'ACVolts' : float(d[2]), + 'ACFreq' : float(d[3]), + 'ACAppPower' : float(d[4]), + 'ACActPower' : float(d[5]), + 'LoadPct' : float(d[6]), + 'BusVolts' : float(d[7]), + 'BattVolts' : float(d[8]), + 'BattChrCurr' : float(d[9]), + 'BattCap' : float(d[10]), + 'HSTemp' : float(d[11]), + 'PVCurr1' : float(d[12]), + 'PVVolt1' : float(d[13]), + 'SCC1Volt' : float(d[14]), + 'BattDisCurr' : float(d[15]), + 'Status' : status, + 'BattVoltOfs' : float(d[17]) / 0.01, # 10mV + 'PVChrgPow1' : float(d[19]), + } + + def getAlarms(self): + d = self.cmd('QPIWS') + res = {} + for i in range(min(len(d), len(self.PIWS))): + if d[i] == '1': + res[self.PIWS[i]] = True + else: + res[self.PIWS[i]] = False + return res + +def main(): + ips = GiantIPS() + print(ips.cmd('QPI')) + +if __name__ == '__main__': + main()