# HG changeset patch # User darius@inchoate.localdomain # Date 1193823114 -37800 # Node ID ed7abe6f59c2a31f992049737ec6b8bac1467e48 # Parent 7bf4d4265339a786821ab6fe95519c9f99212c8b Many fixes (again) - Worked out how to auto-discover packet type classes. - Refactor receive class to make it usable for 16 & 64 bit modes. - Use property() to map to data values. - Add another test case, document what the others look like. diff -r 7bf4d4265339 -r ed7abe6f59c2 zb.py --- a/zb.py Tue Oct 30 21:29:00 2007 +1030 +++ b/zb.py Wed Oct 31 20:01:54 2007 +1030 @@ -1,21 +1,50 @@ -import serial +import serial, inspect class PktBase(object): - def __init__(self, data): + def __init__(self, data = []): self.data = data print "Constructing " + self.__class__.__name__ def __repr__(self): return str(self.__class__) + "(" + str([self.PKT_TYPE] + self.data) + ")" - #def __str__(self): - # return "%s: %s" % (self.PKT_DESC, str(self.data)) + def Encapsulate(self): + return Packets.Encapsulate([self.PKT_TYPE] + self.data) + def resize(self, dlen): + """Ensure the data list can hold at least len elements (0 fill)""" + if (len(self.data) < dlen): + self.data = (self.data + [0] * dlen)[0:dlen] + class AT_Cmd(PktBase): PKT_TYPE = 0x08 PKT_DESC = "AT Command" -class AT_Cmd_Queue(PktBase): + def __init__(self, *args): + if (len(args) == 1): + super(AT_Cmd, self).__init__(args[0]) + elif (len(args) == 2): + super(AT_Cmd, self).__init__([]) + self.cmd = args[0] + if (args[1] != None): + self.cmdarg = args[1] + else: + raise TypeError("__init__ takes 1 list of ordinals or 2 strings") + + def setcmd(self, value): + self.resize(2) + self.data[0] = ord(value[0]) + self.data[1] = ord(value[1]) + cmd = property(lambda s: chr(s.data[0]) + chr(s.data[1]), setcmd) + + def setcmdarg(self, value): + self.resize(2 + len(value)) + self.data = self.data[0:2] + map(ord, value) + def delcmdarg(self): + self.data = self.data[0:2] + cmdarg = property(lambda s: map(chr, s.data[2:]), setcmdarg, delcmdarg) + +class AT_Cmd_Queue(AT_Cmd): PKT_TYPE = 0x09 PKT_DESC = "AT Command (queued)" @@ -27,69 +56,111 @@ PKT_TYPE = 0x8a PKT_DESC = "Modem Status" -class RX_64_Bit(PktBase): - PKT_TYPE = 0x80 - PKT_DESC = "RX Packet: 64 bit address" - class RX_16_Bit(PktBase): PKT_TYPE = 0x81 PKT_DESC = "RX Packet: 16 bit address" + ADDR_SIZE = 2 + + def setsender(self, value): + self.resize(self.ADDR_SIZE) + for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): + self.data[i] = (value & (0xff << j)) >> j -class RXIO_64_Bit(PktBase): - PKT_TYPE = 0x82 - PKT_DESC = "RXIO Packet: 64 bit address" + def getsender(self): + value = 0 + for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): + value |= self.data[i] << j + return value + sender = property(getsender, setsender) -class RXIO_16_Bit(PktBase): + def setrssi(self, value): + self.resize(self.ADDR_SIZE + 1) + self.data[self.ADDR_SIZE] = -1 * value + rssi = property(lambda s: -1 * s.data[s.ADDR_SIZE], setrssi) + + def setflags(self, value): + self.resize(self.ADDR_SIZE + 2) + self.data[self.ADDR_SIZE + 1] = value + flags = property(lambda s: s.data[s.ADDR_SIZE + 1], setflags) + +class RX_64_Bit(RX_16_Bit): + PKT_TYPE = 0x80 + PKT_DESC = "RX Packet: 64 bit address" + ADDR_SIZE = 8 + +class RXIO_16_Bit(RX_16_Bit): PKT_TYPE = 0x83 PKT_DESC = "RXIO Packet: 16 bit address" + def setnsamples(self, value): + self.resize(self.ADDR_SIZE + 3) + self.data[self.ADDR_SIZE + 2] = value + nsamples = property(lambda s: s.data[s.ADDR_SIZE + 2], setnsamples) + + def setmask(self, value): + self.resize(self.ADDR_SIZE + 5) + self.data[self.ADDR_SIZE + 3] = (value & 0xff00) >> 8 + self.data[self.ADDR_SIZE + 4] = value & 0xff + mask = property(lambda s: s.data[s.ADDR_SIZE + 3] << 8 | s.data[s.ADDR_SIZE + 4], setmask) + def __str__(self): - sender = self.data[0] << 8 | self.data[1] - rssi = -1 * self.data[2] - flags = self.data[3] - nsamples = self.data[4] - mask = self.data[5] << 8 | self.data[6] - rtn = "0x%04x (%ddBm) -> %d samples, mask 0x%04x" % (sender, rssi, nsamples, mask) + rtn = "0x%0*x (%ddBm) -> %d samples, mask 0x%04x" % (self.ADDR_SIZE * 2, self.sender, + self.rssi, self.nsamples, self.mask) # Any DIO lines enabled? - if (mask | 0x01ff): - rtn = rtn + ", DIO - 0x%03x" % (self.data[7] << 8 | self.data[8]) - offs = 9 + if (self.mask | 0x01ff): + rtn = rtn + ", DIO - 0x%03x" % (self.data[self.ADDR_SIZE + 5] << 8 | + self.data[self.ADDR_SIZE + 6]) + offs = self.ADDR_SIZE + 7 else: - offs = 7 + offs = self.ADDR_SIZE + 5 # Any ADC lines enabled? - if (mask | 0x7e00): + if (self.mask | 0x7e00): for i in range(6): - if (mask & 1 << (i + 9)): + if (self.mask & 1 << (i + 9)): rtn = rtn + ", ADC%d - 0x%02x" % (i, self.data[offs] << 8 | self.data[offs + 1]) offs = offs + 2 return rtn +class RXIO_64_Bit(RX_16_Bit): + PKT_TYPE = 0x82 + PKT_DESC = "RXIO Packet: 64 bit address" + ADDR_SIZE = 8 + +class TX_16_Bit(PktBase): + PKT_TYPE = 0x00 + PKT_DESC = "TX Packet: 16 bit address" + class TX_64_Bit(PktBase): PKT_TYPE = 0x00 PKT_DESC = "TX Packet: 64 bit address" -class TX_16_Bit(PktBase): - PKT_TYPE = 0x00 - PKT_DESC = "TX Packet: 16 bit address" - class TX_Status(PktBase): PKT_TYPE = 0x89 PKT_DESC = "TX Status" class Packets(object): - PKT_CLASSES = [AT_Cmd, AT_Cmd_Queue, AT_Response, Modem_Status, RX_64_Bit, - RX_16_Bit, RXIO_64_Bit, RXIO_16_Bit, TX_64_Bit, TX_16_Bit, - TX_Status] + PKT_CLASSES = None - def Build(data): - for p in Packets.PKT_CLASSES: + def Build(self, data): + if (self.PKT_CLASSES == None): + m = inspect.getmodule(self) + # Generate list of objects from their names + mobjs = map(lambda n: m.__dict__[n], m.__dict__) + # Find all the classes + pktclasses = filter(inspect.isclass, mobjs) + # Find all subclasses of PktBase (but not PktBase itself) + pktclasses = filter(lambda s: issubclass(s, m.PktBase) and s != m.PktBase, pktclasses) + self.PKT_CLASSES = pktclasses + + for p in self.PKT_CLASSES: if (p.PKT_TYPE == data[0]): return(p(data[1:])) - - Build = staticmethod(Build) + + raise ValueError("Unknown packet type 0x%02x" % (data[0])) + Build = classmethod(Build) def Encapsulate(data): pktsum = reduce(lambda x, y: x + y, data) & 0xff @@ -99,6 +170,7 @@ Encapsulate = staticmethod(Encapsulate) def __init__(self): + print str(inspect.getmodule(self)) self.buffer = [] self.state = 'init' self.packets = [] @@ -125,33 +197,32 @@ def process(self, data): pktcount = 0 for d in data: - dord = ord(d) if (self.state == 'init'): if (d != '\x7e'): - print "Framing error, got 0x%02x, expected 0x7f" % (dord) - self.fr_err = self.fr_err + 1 + print "Framing error, got 0x%02x, expected 0x7f" % (ord(d)) + self.fr_err += 1 continue self.state = 'sizemsb' elif (self.state == 'sizemsb'): - self.bufmsb = dord + self.bufmsb = ord(d) self.state = 'sizelsb' elif (self.state == 'sizelsb'): self.dataleft = self.bufmsb << 8 | \ - dord + ord(d) self.state = 'data' elif (self.state == 'data'): - self.buffer.append(dord) + self.buffer.append(ord(d)) self.dataleft = self.dataleft - 1 if (self.dataleft == 0): self.state = 'cksum' elif (self.state == 'cksum'): pktsum = reduce(lambda x, y: x + y, self.buffer) & 0xff - rxcksum = dord + rxcksum = ord(d) self.state = 'init' if (pktsum + rxcksum != 0xff): self.buffer = [] - self.ck_err = self.ck_err + 1 + self.ck_err += 1 print "Checksum error, got 0x%02x, expected 0x%02x" % \ (rxcksum, 0xff - pktsum) else: @@ -159,8 +230,8 @@ p = Packets.Build(self.buffer) self.pktq.append(p) self.buffer = [] - pktcount = pktcount + 1 - self.rx_cnt = self.rx_cnt + 1 + pktcount += 1 + self.rx_cnt += 1 else: print "Invalid state %s! Resetting" % (self.state) self.state = 'init' @@ -177,9 +248,19 @@ #s.write('+++') #s.readline(eol='\r') + +# 0x0001 (-36dBm) -> 1 samples, mask 0x000f, DIO - 0x00f goodtest = ['~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x0f', '8'] + +# Checksum error badtest = ['~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x0e', '8'] + +#0x0005 (-36dBm) -> 1 samples, mask 0x020e, DIO - 0x00e, ADC0 - 0x3ff adctest = ['~', '\x00', '\x0c', '\x83', '\x00', '\x05', '$', '\x00', '\x01', '\x02', '\x0e', '\x00', '\x0e', '\x03', '\xff', '2' ] + +# Exception +badpkttypetest = ['~', '\x00', '\x03', '\x0a', 'V', 'L', 'S'] + up = Packets() up.process(goodtest) up.process(badtest)