Mercurial > ~darius > hgwebdir.cgi > ZigBee
view zb.py @ 1:b5d8ec0ffd21
Lots of changes..
- Create a class for each packet type, needs more fleshing out.
- Make sure we reset state when we get a checksum error.
- Add some more test cases.
- OOify things a bit more.
author | darius@inchoate.localdomain |
---|---|
date | Tue, 30 Oct 2007 17:26:40 +1030 |
parents | bdcdd8380d94 |
children | 7bf4d4265339 |
line wrap: on
line source
"""Framing and decoding of ZigBee modem API packets.

A frame on the wire is: 0x7e, length MSB, length LSB, <length> payload
bytes, then a checksum byte chosen so that (sum(payload) + checksum) & 0xff
equals 0xff.  The first payload byte selects the packet class.
"""

try:
    import serial
except ImportError:
    # pyserial is only needed to open the modem port when this file is run
    # as a script; the packet classes and the parser work without it.
    serial = None


class PktBase(object):
    """Base class for all packets; holds the payload after the type byte."""

    def __init__(self, data):
        """Store *data*, a list of integer byte values (type byte removed)."""
        self.data = data
        print("Constructing " + self.__class__.__name__)

    def __repr__(self):
        return str(self.__class__) + "(" + \
            str([self.PKT_TYPE] + self.data) + ")"

    #def __str__(self):
    #    return "%s: %s" % (self.PKT_DESC, str(self.data))


class AT_Cmd(PktBase):
    PKT_TYPE = 0x08
    PKT_DESC = "AT Command"


class AT_Cmd_Queue(PktBase):
    PKT_TYPE = 0x09
    PKT_DESC = "AT Command (queued)"


class AT_Response(PktBase):
    PKT_TYPE = 0x88
    PKT_DESC = "AT Command response"


class Modem_Status(PktBase):
    PKT_TYPE = 0x8a
    PKT_DESC = "Modem Status"


class RX_64_Bit(PktBase):
    PKT_TYPE = 0x80
    PKT_DESC = "RX Packet: 64 bit address"


class RX_16_Bit(PktBase):
    PKT_TYPE = 0x81
    PKT_DESC = "RX Packet: 16 bit address"


class RXIO_64_Bit(PktBase):
    PKT_TYPE = 0x82
    PKT_DESC = "RXIO Packet: 64 bit address"


class RXIO_16_Bit(PktBase):
    PKT_TYPE = 0x83
    PKT_DESC = "RXIO Packet: 16 bit address"

    # Bits of the channel mask that enable digital and analogue lines.
    DIO_MASK = 0x01ff
    ADC_MASK = 0x7e00

    def __str__(self):
        """Return a one-line summary of the I/O sample.

        Payload layout as read here: 16 bit sender address, RSSI
        magnitude, a flags byte (not reported), sample count, 16 bit
        channel mask, then an optional 16 bit DIO word followed by one
        16 bit word per enabled ADC channel.  Only the first sample set
        is decoded, whatever the sample count says.
        """
        sender = self.data[0] << 8 | self.data[1]
        rssi = -1 * self.data[2]
        nsamples = self.data[4]
        mask = self.data[5] << 8 | self.data[6]
        rtn = "0x%04x (%ddBm) -> %d samples, mask 0x%04x" % \
            (sender, rssi, nsamples, mask)

        # Any DIO lines enabled?  The DIO word is only present when at
        # least one DIO bit is set, so this must be a bitwise AND: an OR
        # with a non-zero constant is always true and mis-aligns ADC data.
        if mask & self.DIO_MASK:
            rtn = rtn + ", DIO - 0x%03x" % \
                (self.data[7] << 8 | self.data[8])
            offs = 9
        else:
            offs = 7

        # Any ADC lines enabled?
        if mask & self.ADC_MASK:
            for i in range(6):
                if mask & 1 << (i + 9):
                    rtn = rtn + ", ADC%d - 0x%02x" % \
                        (i, self.data[offs] << 8 | self.data[offs + 1])
                    offs = offs + 2
        return rtn


class TX_64_Bit(PktBase):
    PKT_TYPE = 0x00
    PKT_DESC = "TX Packet: 64 bit address"


class TX_16_Bit(PktBase):
    # Was 0x00, colliding with TX_64_Bit so Build() could never select it.
    # NOTE(review): 0x01 is the 16 bit TX request id in the XBee 802.15.4
    # API frame table - confirm against the modem manual in use.
    PKT_TYPE = 0x01
    PKT_DESC = "TX Packet: 16 bit address"


class TX_Status(PktBase):
    PKT_TYPE = 0x89
    PKT_DESC = "TX Status"


class Packets(object):
    """Incremental frame parser plus helpers to build and wrap packets."""

    PKT_CLASSES = [AT_Cmd, AT_Cmd_Queue, AT_Response, Modem_Status,
                   RX_64_Bit, RX_16_Bit, RXIO_64_Bit, RXIO_16_Bit,
                   TX_64_Bit, TX_16_Bit, TX_Status]

    def Build(data):
        """Return a packet object for payload *data* (type byte first).

        Returns None when *data* is empty or the type byte is unknown.
        """
        if not data:
            return None
        for p in Packets.PKT_CLASSES:
            if p.PKT_TYPE == data[0]:
                return p(data[1:])
        return None
    Build = staticmethod(Build)

    def Encapsulate(data):
        """Wrap payload *data* (list of ints) in a frame.

        Returns a list of single-character strings: start byte, 16 bit
        length, payload and checksum.
        """
        # sum() rather than reduce() so an empty payload is not an error.
        pktsum = sum(data) & 0xff
        pkt = [0x7e] + [len(data) >> 8] + [len(data) & 0xff] + data + \
            [0xff - pktsum]
        return [chr(b) for b in pkt]
    Encapsulate = staticmethod(Encapsulate)

    def __init__(self):
        self.buffer = []
        self.state = 'init'
        self.packets = []

        self.bufmsb = 0
        self.dataleft = 0

        self.fr_err = 0     # Framing error
        self.ck_err = 0     # Checksum error
        self.rx_cnt = 0     # Packet count

        self.pktq = []

    def getdata(self, s):
        """Read from *s* until a read returns nothing, then parse it.

        *s* only needs a read() method returning one chunk per call (an
        empty result ends the loop).  Returns what process() returns.
        """
        l = []
        while True:
            a = s.read()
            # Truthiness rather than == '' so an empty bytes object also
            # terminates the loop.
            if not a:
                break
            l.append(a)

        return self.process(l)

    def process(self, data):
        """Feed *data* through the framing state machine.

        *data* is an iterable of single characters (or integer byte
        values).  Parser state is kept on the instance, so a frame may be
        split across calls.  Each frame with a valid checksum is passed to
        Build() and the result appended to self.pktq.  Returns the number
        of frames completed during this call.
        """
        pktcount = 0
        for d in data:
            if isinstance(d, int):
                dord = d
            else:
                dord = ord(d)

            if self.state == 'init':
                if dord != 0x7e:
                    print("Framing error, got 0x%02x, expected 0x7e" %
                          (dord))
                    self.fr_err = self.fr_err + 1
                    continue

                self.state = 'sizemsb'
            elif self.state == 'sizemsb':
                self.bufmsb = dord
                self.state = 'sizelsb'
            elif self.state == 'sizelsb':
                self.dataleft = self.bufmsb << 8 | dord
                # A zero length frame has no payload; going to 'data'
                # would count down past zero and never leave that state.
                if self.dataleft == 0:
                    self.state = 'cksum'
                else:
                    self.state = 'data'
            elif self.state == 'data':
                self.buffer.append(dord)
                self.dataleft = self.dataleft - 1
                if self.dataleft == 0:
                    self.state = 'cksum'
            elif self.state == 'cksum':
                pktsum = sum(self.buffer) & 0xff
                rxcksum = dord
                self.state = 'init'
                if pktsum + rxcksum != 0xff:
                    self.ck_err = self.ck_err + 1
                    print("Checksum error, got 0x%02x, expected 0x%02x" %
                          (rxcksum, 0xff - pktsum))
                else:
                    print("Got a packet - " + str(self.buffer))
                    # Build() returns None for an unknown type; it is
                    # still queued so the queue matches the frame count.
                    p = Packets.Build(self.buffer)
                    self.pktq.append(p)
                    pktcount = pktcount + 1
                    self.rx_cnt = self.rx_cnt + 1
                # Drop the payload either way so the next frame starts
                # from a clean buffer.
                self.buffer = []
            else:
                print("Invalid state %s! Resetting" % (self.state))
                self.state = 'init'

        return pktcount

#for c in dir():
#    if (issubclass(c, PktBase)):
#        print ..


# Captured frames used as test vectors: a valid DIO sample, the same frame
# with one payload byte altered (bad checksum), and a DIO + ADC0 sample.
goodtest = ['~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01',
            '\x00', '\x0f', '\x00', '\x0f', '8']
badtest = ['~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01',
           '\x00', '\x0f', '\x00', '\x0e', '8']
adctest = ['~', '\x00', '\x0c', '\x83', '\x00', '\x05', '$', '\x00', '\x01',
           '\x02', '\x0e', '\x00', '\x0e', '\x03', '\xff', '2']

if __name__ == "__main__":
    # Only touch the hardware when run as a script, so importing this
    # module does not require the modem to be attached.
    if serial is None:
        raise ImportError("pyserial is required to open the modem port")

    # timeout is given to the constructor instead of calling setTimeout().
    s = serial.Serial(port='/dev/cuad0', baudrate=9600, bytesize=8,
                      parity='N', stopbits=1, rtscts=0, timeout=0.1)
    #s.write('+++')
    #s.readline(eol='\r')

    up = Packets()
    up.process(goodtest)
    #up.process(badtest)
    p = up.pktq.pop(0)