Mercurial > ~darius > hgwebdir.cgi > ZigBee
view zb.py @ 0:bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
author | darius@inchoate.localdomain |
---|---|
date | Sun, 28 Oct 2007 20:24:28 +1030 |
parents | |
children | b5d8ec0ffd21 |
line wrap: on
line source
import serial, struct s = serial.Serial(port='/dev/cuad0', baudrate=9600, bytesize=8, parity='N', \ stopbits=1, rtscts=0) s.setTimeout(0.1) #s.write('+++') #s.readline(eol='\r') testdata = ['~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x07', '@', '~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x0f', '8'] def readpkt(s): l = [] while (1): a = s.read() if (a == ''): break l.append(a) return l class Packet: def __init__(self): pass class unPacker: def __init__(self): self.buffer = [] self.state = 'init' self.packets = [] self.bufmsb = 0 self.dataleft = 0 self.fr_err = 0 # Framing error self.ck_err = 0 # Checksum error def process(self, data): for d in data: dint = struct.unpack('B', d)[0] if (self.state == 'init'): if (d != '\x7e'): print "Framing error, got 0x%02x, expected 0x7f" % (dint) self.fr_err = self.fr_err + 1 continue self.state = 'sizemsb' elif (self.state == 'sizemsb'): self.bufmsb = dint self.state = 'sizelsb' elif (self.state == 'sizelsb'): self.dataleft = self.bufmsb << 8 | \ dint self.state = 'data' elif (self.state == 'data'): self.buffer.append(dint) self.dataleft = self.dataleft - 1 if (self.dataleft == 0): self.state = 'cksum' elif (self.state == 'cksum'): pktsum = reduce(lambda x, y: x + y, self.buffer) & 0xff rxcksum = dint if (pktsum + rxcksum != 0xff): self.ck_err = self.ck_err + 1 print "Checksum error, got 0x%02x, expected 0x%02x" % \ (rxcksum, 0xff - pktsum) else: print "Got a packet - " + str(self.buffer) self.state = 'init' self.buffer = [] else: print "Invalid state %s! Resetting" % (self.state) self.state = 'init'