Mercurial > ~darius > hgwebdir.cgi > ZigBee
view zb.py @ 7:579dedf5a1f1
Rejoin line break, left over from removal of struct.
author | darius@inchoate.localdomain |
---|---|
date | Thu, 01 Nov 2007 16:32:17 +1030 |
parents | 9c499d923544 |
children | 9f0808b13454 |
line wrap: on
line source
# Packet classes and a framing parser for a serial modem API.
#
# A frame on the wire is: 0x7e, length MSB, length LSB, <length> data bytes
# (the first of which is the packet type), and a checksum byte chosen so that
# (sum(data bytes) + checksum) & 0xff == 0xff.
#
# The code is written in the common subset of Python 2 and Python 3 syntax
# (single-argument print(...), no reduce/map-as-list assumptions).

import inspect
import numbers


def _be_to_int(octets):
    """Combine byte values, most significant first, into one integer."""
    value = 0
    for octet in octets:
        value = (value << 8) | octet
    return value


def _int_to_be(value, size):
    """Split value into a list of size byte values, most significant first."""
    return [(value >> shift) & 0xff for shift in range((size - 1) * 8, -1, -8)]


def _to_ordinals(value):
    """Return value as a list, replacing 1-character strings by ord() of them.

    Items that are not str (e.g. ints) are passed through unchanged, so both
    a string and a list of ordinals are accepted.
    """
    return [ord(item) if isinstance(item, str) else item for item in value]


class PktBase(object):
    """Base class for all packet types.

    self.data holds the packet body as a list of byte values, excluding the
    leading packet type byte. Subclasses provide PKT_TYPE and PKT_DESC.
    """

    def __init__(self, data=None):
        # Copy so that neither a shared default nor the caller's list is
        # mutated by the setters below.
        self.data = [] if data is None else list(data)
        print("Constructing " + self.__class__.__name__)

    def __repr__(self):
        return str(self.__class__) + "(" + str([self.PKT_TYPE] + self.data) + ")"

    def Encapsulate(self):
        """Return this packet framed for the wire (see Packets.Encapsulate)."""
        return Packets.Encapsulate([self.PKT_TYPE] + self.data)

    def resize(self, dlen):
        """Ensure the data list can hold at least dlen elements (0 fill)."""
        shortfall = dlen - len(self.data)
        if shortfall > 0:
            self.data = self.data + [0] * shortfall


class AT_Cmd(PktBase):
    """AT command packet: data = [frame id, cmd char 1, cmd char 2, args...].

    Construct with a list of ordinals, a 2-character command string, or a
    command string plus an argument string.
    """

    PKT_TYPE = 0x08
    PKT_DESC = "AT Command"

    def __init__(self, *args):
        if len(args) == 1:
            if isinstance(args[0], str):
                super(AT_Cmd, self).__init__([])
                self.cmd = args[0]
            else:
                super(AT_Cmd, self).__init__(args[0])
        elif len(args) == 2:
            super(AT_Cmd, self).__init__([])
            self.cmd = args[0]
            self.cmdarg = args[1]
        else:
            raise TypeError("__init__ takes 1 list of ordinals, 1 string, or 2 strings")

        # Make room for the frame id even when given an empty list.
        self.resize(1)
        self.data[0] = 1  # XXX: could be smarter about Frame ID

    def setcmd(self, value):
        """Store the first two characters of value as the command name."""
        self.resize(3)
        self.data[1] = ord(value[0])
        self.data[2] = ord(value[1])
    cmd = property(lambda s: chr(s.data[1]) + chr(s.data[2]), setcmd)

    def setcmdarg(self, value):
        """Replace everything after the command name with value's ordinals."""
        self.resize(3)
        self.data = self.data[0:3] + _to_ordinals(value)

    def delcmdarg(self):
        """Drop the command argument, keeping frame id and command name."""
        self.data = self.data[0:3]
    # The getter returns a list of 1-character strings.
    cmdarg = property(lambda s: [chr(c) for c in s.data[3:]], setcmdarg, delcmdarg)


class AT_Cmd_Queue(AT_Cmd):
    PKT_TYPE = 0x09
    PKT_DESC = "AT Command (queued)"


class AT_Response(PktBase):
    """AT command response: data = [frame id, cmd 1, cmd 2, status, payload...]."""

    PKT_TYPE = 0x88
    PKT_DESC = "AT Command response"
    frameid = property(lambda s: s.data[0], None)
    cmd = property(lambda s: chr(s.data[1]) + chr(s.data[2]), None)
    statusOK = property(lambda s: s.data[3] == 0, None)
    payload = property(lambda s: s.data[4:], None)


class Modem_Status(PktBase):
    PKT_TYPE = 0x8a
    PKT_DESC = "Modem Status"


class RX_16_Bit(PktBase):
    """Received packet: data = [address (ADDR_SIZE bytes), rssi, flags, payload...]."""

    PKT_TYPE = 0x81
    PKT_DESC = "RX Packet: 16 bit address"
    ADDR_SIZE = 2

    def getsender(self):
        """Return the big-endian sender address as an integer."""
        return _be_to_int(self.data[0:self.ADDR_SIZE])
    sender = property(getsender, None)
    # The RSSI byte is reported negated.
    rssi = property(lambda s: -1 * s.data[s.ADDR_SIZE], None)
    flags = property(lambda s: s.data[s.ADDR_SIZE + 1], None)
    payload = property(lambda s: s.data[s.ADDR_SIZE + 2:], None)


class RX_64_Bit(RX_16_Bit):
    PKT_TYPE = 0x80
    PKT_DESC = "RX Packet: 64 bit address"
    ADDR_SIZE = 8


class RXIO_16_Bit(RX_16_Bit):
    """Received I/O sample packet.

    After the RX header the payload is: sample count (1 byte), channel mask
    (2 bytes, big-endian), then the sample data. Mask bits 0x01ff select
    digital lines and bits 0x7e00 select ADC0..ADC5 (ADCn is bit n + 9).
    """

    PKT_TYPE = 0x83
    PKT_DESC = "RXIO Packet: 16 bit address"

    DIO_MASK = 0x01ff
    ADC_MASK = 0x7e00

    def setnsamples(self, value):
        self.resize(self.ADDR_SIZE + 3)
        self.data[self.ADDR_SIZE + 2] = value

    def getnsamples(self):
        return self.data[self.ADDR_SIZE + 2]
    nsamples = property(getnsamples, setnsamples)

    def setmask(self, value):
        self.resize(self.ADDR_SIZE + 5)
        self.data[self.ADDR_SIZE + 3] = (value & 0xff00) >> 8
        self.data[self.ADDR_SIZE + 4] = value & 0xff
    mask = property(lambda s: s.data[s.ADDR_SIZE + 3] << 8 | s.data[s.ADDR_SIZE + 4], setmask)

    def __str__(self):
        """Describe the header and the first sample set of the packet."""
        mask = self.mask
        rtn = "0x%0*x (%ddBm) -> %d samples, mask 0x%04x" % (
            self.ADDR_SIZE * 2, self.sender, self.rssi, self.nsamples, mask)

        offs = self.ADDR_SIZE + 5
        # The 2-byte DIO word is only decoded when a DIO line is enabled;
        # testing with "|" (as before) was always true and mis-parsed
        # ADC-only samples.
        if mask & self.DIO_MASK:
            rtn = rtn + ", DIO - 0x%03x" % (self.data[offs] << 8 | self.data[offs + 1])
            offs = offs + 2

        # One 2-byte value follows for each enabled ADC line, lowest first.
        if mask & self.ADC_MASK:
            for i in range(6):
                if mask & 1 << (i + 9):
                    rtn = rtn + ", ADC%d - 0x%02x" % (i, self.data[offs] << 8 | self.data[offs + 1])
                    offs = offs + 2
        return rtn


class RXIO_64_Bit(RXIO_16_Bit):
    PKT_TYPE = 0x82
    PKT_DESC = "RXIO Packet: 64 bit address"
    ADDR_SIZE = 8


# NOTE(review): the base class is RX_64_Bit as in the original; the inherited
# sender/rssi properties index the RX layout and do not match the TX layout
# used here (frame id at data[0]). Confirm whether PktBase was intended.
class TX_16_Bit(RX_64_Bit):
    """Transmit request: data = [frame id, address (ADDR_SIZE bytes), flags, payload...].

    Construct with a list of ordinals, an integer recipient address, or a
    recipient address plus a payload (string or list of ordinals).
    """

    PKT_TYPE = 0x01
    PKT_DESC = "TX Packet: 16 bit address"
    ADDR_SIZE = 2

    def __init__(self, *args):
        if len(args) == 1:
            # numbers.Integral also accepts Python 2 "long" addresses.
            if isinstance(args[0], numbers.Integral):
                super(TX_16_Bit, self).__init__([])
                self.recipient = args[0]
            else:
                super(TX_16_Bit, self).__init__(args[0])
        elif len(args) == 2:
            super(TX_16_Bit, self).__init__([])
            self.recipient = args[0]
            self.payload = args[1]
        else:
            raise TypeError("__init__ takes 1 list of ordinals or 2 strings")

        self.resize(self.ADDR_SIZE + 2)
        self.data[0] = 1  # XXX: could be smarter about Frame ID
        # The flags byte follows the address; a fixed index of 3 is only
        # right for 2-byte addresses and clobbered 8-byte ones.
        self.data[self.ADDR_SIZE + 1] = 0  # XXX: should be able to set flags

    def getrecipient(self):
        """Return the big-endian recipient address as an integer."""
        return _be_to_int(self.data[1:self.ADDR_SIZE + 1])

    def setrecipient(self, value):
        """Store value as the big-endian recipient address (and set frame id)."""
        self.resize(self.ADDR_SIZE + 1)
        self.data[0] = 1  # XXX: could be smarter about Frame ID
        self.data[1:self.ADDR_SIZE + 1] = _int_to_be(value, self.ADDR_SIZE)
    recipient = property(getrecipient, setrecipient)

    def setpayload(self, value):
        """Replace the payload; characters are stored as their ordinals."""
        self.resize(self.ADDR_SIZE + 2)
        self.data[self.ADDR_SIZE + 2:] = _to_ordinals(value)
    payload = property(lambda s: s.data[s.ADDR_SIZE + 2:], setpayload)


class TX_64_Bit(TX_16_Bit):
    PKT_TYPE = 0x00
    PKT_DESC = "TX Packet: 64 bit address"
    ADDR_SIZE = 8


class TX_Status(PktBase):
    """Transmit status: data = [frame id, status code]."""

    PKT_TYPE = 0x89
    PKT_DESC = "TX Status"
    statusTxt = ['OK', 'No Ack', 'CCA failure', 'Purged']
    frameid = property(lambda s: s.data[0], None)
    status = property(lambda s: s.data[1], None)
    statusMsg = property(lambda s: s.statusTxt[s.data[1]], None)


class Packets(object):
    """Frame encoder and incremental frame parser.

    s is the serial-like object used by getdata()/writedata(); it needs
    read() and write(). Parsed packets are appended to self.pktq.
    """

    # Lazily built list of every PktBase subclass found in this class's module.
    PKT_CLASSES = None

    @classmethod
    def Build(cls, data):
        """Return a packet object for data = [packet type, body bytes...].

        Raises ValueError if no packet class has a matching PKT_TYPE.
        """
        if cls.PKT_CLASSES is None:
            module = inspect.getmodule(cls)
            # Must be a real list: it is cached and scanned on every call.
            cls.PKT_CLASSES = [
                obj for obj in list(vars(module).values())
                if inspect.isclass(obj)
                and issubclass(obj, module.PktBase)
                and obj is not module.PktBase
            ]

        for pktclass in cls.PKT_CLASSES:
            if pktclass.PKT_TYPE == data[0]:
                return pktclass(data[1:])

        raise ValueError("Unknown packet type 0x%02x" % (data[0]))

    @staticmethod
    def Encapsulate(data):
        """Frame data (a list of byte values) and return a list of 1-char strings.

        The frame is 0x7e, 2-byte big-endian length, data, checksum.
        """
        pktsum = sum(data) & 0xff
        pkt = [0x7e] + [len(data) >> 8] + [len(data) & 0xff] + data + [0xff - pktsum]
        return [chr(octet) for octet in pkt]

    def __init__(self, s):
        print(str(inspect.getmodule(self)))
        self.buffer = []        # Data bytes of the frame being received
        self.state = 'init'     # Parser state, see process()
        self.packets = []
        self.bufmsb = 0         # Length MSB of the frame being received
        self.dataleft = 0       # Data bytes still expected
        self.fr_err = 0         # Framing error
        self.ck_err = 0         # Checksum error
        self.rx_cnt = 0         # Packet count
        self.pktq = []          # Successfully parsed packets
        self.s = s

    def writedata(self, data):
        """Write the items of data to the serial object as one string."""
        self.s.write("".join(str(item) for item in data))

    def getdata(self):
        """Read from the serial object until a read returns nothing, then parse.

        Returns the number of packets parsed (see process()).
        """
        received = []
        while True:
            chunk = self.s.read()
            if not chunk:
                break
            received.append(ord(chunk))
        return self.process(received)

    def process(self, data):
        """Feed byte values into the frame parser.

        Parser state is kept between calls, so frames may be split across
        calls. Returns the number of packets appended to self.pktq. Raises
        ValueError (from Build) for a well-formed frame of unknown type; the
        parser is left ready for the next frame in that case.
        """
        pktcount = 0
        for d in data:
            if self.state == 'init':
                if d != 0x7e:
                    print("Framing error, got 0x%02x, expected 0x7e" % (d))
                    self.fr_err += 1
                    continue
                self.state = 'sizemsb'
            elif self.state == 'sizemsb':
                self.bufmsb = d
                self.state = 'sizelsb'
            elif self.state == 'sizelsb':
                self.dataleft = self.bufmsb << 8 | d
                if self.dataleft == 0:
                    # A frame with no type byte cannot be decoded, and
                    # entering 'data' with nothing left would never finish.
                    print("Framing error, zero length frame")
                    self.fr_err += 1
                    self.state = 'init'
                else:
                    self.state = 'data'
            elif self.state == 'data':
                self.buffer.append(d)
                self.dataleft = self.dataleft - 1
                if self.dataleft == 0:
                    self.state = 'cksum'
            elif self.state == 'cksum':
                # Detach the frame first so the parser is clean even if
                # Build() raises below.
                frame = self.buffer
                self.buffer = []
                self.state = 'init'
                pktsum = sum(frame) & 0xff
                rxcksum = d
                if pktsum + rxcksum != 0xff:
                    self.ck_err += 1
                    print("Checksum error, got 0x%02x, expected 0x%02x" %
                          (rxcksum, 0xff - pktsum))
                else:
                    print("Got a packet - " + str(frame))
                    self.pktq.append(Packets.Build(frame))
                    pktcount += 1
                    self.rx_cnt += 1
            else:
                print("Invalid state %s! Resetting" % (self.state))
                self.state = 'init'

        return pktcount


# Sample frames for exercising the parser.

# 0x0001 (-36dBm) -> 1 samples, mask 0x000f, DIO - 0x00f
goodtest = [126, 0, 10, 131, 0, 1, 36, 0, 1, 0, 15, 0, 15, 56]

# Checksum error
badtest = [126, 0, 10, 131, 0, 1, 36, 0, 1, 0, 15, 0, 14, 56]

# 0x0005 (-36dBm) -> 1 samples, mask 0x020e, DIO - 0x00e, ADC0 - 0x3ff
adctest = [126, 0, 12, 131, 0, 5, 36, 0, 1, 2, 14, 0, 14, 3, 255, 50]

# Exception
badpkttypetest = [126, 0, 3, 10, 86, 76, 83]


if __name__ == "__main__":
    # Imported here so the packet classes can be used without pyserial or
    # hardware; only this demo talks to a real port.
    import serial

    s = serial.Serial(port='/dev/cuad0', baudrate=9600, bytesize=8,
                      parity='N', stopbits=1, rtscts=0, timeout=0.1)
    #s.write('+++')
    #s.readline(eol='\r')

    up = Packets(s)
    up.process(goodtest)
    up.process(badtest)
    up.process(adctest)
    print(up.pktq.pop(0))
    print(up.pktq.pop(0))