Mercurial > ~darius > hgwebdir.cgi > giant
view giant.py @ 0:1f3c12ba927d default tip
Rework code for USB interface
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Sun, 19 Nov 2017 18:10:23 +1030 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python
"""USB interface code for the device wrapped by :class:`GiantIPS`.

Commands are short ASCII strings (e.g. ``QPI``, ``QPIGS``, ``QPIWS``) framed
with a CRC and a carriage return; replies start with ``(`` and end with a CRC
and a carriage return.

This module targets Python 2: it imports the Python 2-only ``exceptions``
module and builds messages by concatenating ``str`` and ``chr()`` values.
"""

import errno
import exceptions
import os

# pip install --user pyusb pycrc
import crc16
import usb.control
import usb.core
import usb.util
# https://github.com/pyusb/pyusb

# USB details
# Borrowed from http://allican.be/blog/2017/01/28/reverse-engineering-cypress-serial-usb.html
vendorId = 0x0665
productId = 0x5161
interface = 0

# Have to add one to these CRCs values before sending
badvals = [0x28, 0x0d, 0x0a]


class Timeout(exceptions.Exception):
    """No complete reply arrived before the read retries ran out."""


class FramingError(exceptions.Exception):
    """A reply did not have the expected ``(`` ... CRC ``\\r`` framing."""


class CRCError(exceptions.Exception):
    """A reply failed its CRC check (currently defined but not raised)."""


def _crc_bytes(data):
    """Return the ``(high, low)`` CRC bytes for ``data`` as sent on the wire.

    The CRC is CRC-16/XMODEM; any CRC byte whose value is listed in
    ``badvals`` is incremented by one.
    """
    crc = crc16.crc16xmodem(data)
    return tuple(b + 1 if b in badvals else b
                 for b in (crc >> 8, crc & 0xff))


class GiantIPS(object):
    """Command/response access to the device over USB."""

    # Names of the flags in the QPIWS reply, in the order they are reported.
    PIWS = ['Reserved', 'InverterFault', 'BusOver', 'BusUnder',
            'BusSoftFail', 'LineFail', 'OPVShort', 'InverterVoltsLow',
            'InverterVoltsHigh', 'OverTemp', 'FanLocked', 'BattVoltsHigh',
            'BattLowAlarm', 'Reserved(Overcharge)', 'BatterySHutdown',
            'Reserved(BattDerate)', 'Overload', 'EEPROM',
            'InverterOverCurrent', 'SelfTest', 'OPDCVoltsOver', 'BattOpen',
            'CurrentSenseFail', 'BatteryShort', 'PowerLimit', 'PVVoltsHigh1',
            'MPPTOverload', 'MPPTOverloadWarn', 'BattTooLowChrg',
            'PVVoltsHigh2', 'MPPTOverload2', 'MPPTOverloadWarn2',
            'BattTooLowChrg2', 'PVVoltsHigh3', 'MPPTOverload3',
            'MPPTOverloadWarn3', 'BattTooLowChrg3']

    def __init__(self):
        """Find the device, detach any kernel driver and select the interface.

        Raises:
            ValueError: if no device with the expected vendor/product ID is
                attached.
        """
        dev = usb.core.find(idVendor=vendorId, idProduct=productId)
        if dev is None:
            # usb.core.find() returns None when nothing matches; fail with a
            # clear message instead of an AttributeError on the next line.
            raise ValueError('USB device %04x:%04x not found'
                             % (vendorId, productId))
        if dev.is_kernel_driver_active(interface):
            dev.detach_kernel_driver(interface)
        dev.set_interface_altsetting(interface, 0)
        self.dev = dev

    def compose_msg(self, data):
        """Return ``data`` framed for transmission.

        The frame is ``data`` + CRC high byte + CRC low byte + ``\\r``,
        padded with NUL bytes to at least 8 bytes.
        """
        crchigh, crclow = _crc_bytes(data)
        msg = data + chr(crchigh) + chr(crclow) + chr(0x0d)
        return msg.ljust(8, b'\0')

    def tx_msg(self, data):
        """Frame ``data`` and send it to the device in one control transfer."""
        # NOTE(review): 0x21 / 0x09 / 0x200 look like a HID SET_REPORT
        # (output report) request - confirm against the device.
        self.dev.ctrl_transfer(0x21, 0x9, 0x200, 0, self.compose_msg(data))

    def rx_msg(self):
        """Read one reply and return its payload without framing.

        Reads 8 bytes at a time from endpoint 0x81, dropping NUL bytes, until
        a carriage return has been seen.  Up to 200 read timeouts are
        tolerated.  A CRC mismatch is reported on stdout but is not fatal.

        Returns:
            The reply text between the leading ``(`` and the two CRC bytes.

        Raises:
            Timeout: if the retries run out before a ``\\r`` arrives.
            FramingError: if the reply is too short, does not start with
                ``(`` or does not end with ``\\r``.
            usb.core.USBError: for any USB error other than a timeout.
        """
        res = ''
        tries = 200
        while tries > 0 and '\r' not in res:
            try:
                d = self.dev.read(0x81, 8, 10)
            except usb.core.USBError as e:
                # Only a timeout is retried; use the symbolic errno rather
                # than the Linux-specific value 110.
                if e.errno != errno.ETIMEDOUT:
                    raise
                tries -= 1
                continue
            res += ''.join([chr(i) for i in d if i != 0x00])

        if '\r' not in res:
            raise Timeout()

        # Need at least '(' + 2 CRC bytes + '\r' before indexing the CRC.
        if len(res) < 4 or res[0] != '(' or res[-1] != '\r':
            raise FramingError()

        # NOTE(review): assumes the device bumps forbidden CRC byte values
        # the same way compose_msg() does for commands - confirm.
        crchigh, crclow = _crc_bytes(res[0:-3])
        if ord(res[-3]) != crchigh or ord(res[-2]) != crclow:
            # Reported only; raising CRCError was disabled in the original.
            print('CRC error')

        return res[1:-3]

    def cmd(self, cmd):
        """Send the command string ``cmd`` and return the reply payload."""
        self.tx_msg(cmd)
        return self.rx_msg()

    def getStatus(self):
        """Send ``QPIGS`` and return the parsed reply as a dict.

        Numeric fields are converted to float.  The ``'Status'`` entry is a
        nested dict decoded from the flag strings in fields 16 and 20.

        Raises:
            IndexError: if the reply has fewer fields/flags than expected.
            ValueError: if a numeric field cannot be parsed.
        """
        d = self.cmd('QPIGS').split()
        flags = d[16]
        flags2 = d[20]

        status = {
            'SBUPrio': flags[0] == '1',
            'ConfigChg': flags[1] == '1',
            'BattVoltSteady': flags[2] == '1',
            'Charging': flags[3] == '1',
            'SCC1Charging': flags[4] == '1',
            # Anything other than the two known patterns is reported as Both.
            'ChargeType': {'110': 'SCC1', '101': 'AC'}.get(flags[5:], 'Both'),
            'FloatCharge': flags2[0] == '1',
            'Switch': flags2[1] == '1',
        }

        return {
            'GridVolts': float(d[0]),
            'GridFreq': float(d[1]),
            'ACVolts': float(d[2]),
            'ACFreq': float(d[3]),
            'ACAppPower': float(d[4]),
            'ACActPower': float(d[5]),
            'LoadPct': float(d[6]),
            'BusVolts': float(d[7]),
            'BattVolts': float(d[8]),
            'BattChrCurr': float(d[9]),
            'BattCap': float(d[10]),
            'HSTemp': float(d[11]),
            'PVCurr1': float(d[12]),
            'PVVolt1': float(d[13]),
            'SCC1Volt': float(d[14]),
            'BattDisCurr': float(d[15]),
            'Status': status,
            # Raw value is in units of 10mV; scale to volts.  (The original
            # divided by 0.01, which multiplied the value by 100.)
            'BattVoltOfs': float(d[17]) * 0.01,
            'PVChrgPow1': float(d[19]),
        }

    def getAlarms(self):
        """Send ``QPIWS`` and return a dict mapping each PIWS name to a bool.

        Each character of the reply is paired with the name at the same
        position in ``PIWS``; a flag is True when its character is ``'1'``.
        Extra characters or names beyond the shorter of the two are ignored.
        """
        d = self.cmd('QPIWS')
        return {name: flag == '1' for name, flag in zip(self.PIWS, d)}


def main():
    """Open the device and print the reply to the ``QPI`` command."""
    ips = GiantIPS()
    print(ips.cmd('QPI'))


if __name__ == '__main__':
    main()