# HG changeset patch # User darius@inchoate.localdomain # Date 1193565268 -37800 # Node ID bdcdd8380d94b6240c76a5e3bdda9012eab689dc Some test routines & framework for talking to MaxStream ZigBee modules. diff -r 000000000000 -r bdcdd8380d94 zb.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/zb.py Sun Oct 28 20:24:28 2007 +1030 @@ -0,0 +1,72 @@ +import serial, struct + +s = serial.Serial(port='/dev/cuad0', baudrate=9600, bytesize=8, parity='N', \ + stopbits=1, rtscts=0) +s.setTimeout(0.1) +#s.write('+++') +#s.readline(eol='\r') + +testdata = ['~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x07', '@', '~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x0f', '8'] + +def readpkt(s): + l = [] + while (1): + a = s.read() + if (a == ''): + break + l.append(a) + + return l + +class Packet: + def __init__(self): + pass + +class unPacker: + def __init__(self): + self.buffer = [] + self.state = 'init' + self.packets = [] + + self.bufmsb = 0 + self.dataleft = 0 + + self.fr_err = 0 # Framing error + self.ck_err = 0 # Checksum error + + def process(self, data): + for d in data: + dint = struct.unpack('B', d)[0] + if (self.state == 'init'): + if (d != '\x7e'): + print "Framing error, got 0x%02x, expected 0x7f" % (dint) + self.fr_err = self.fr_err + 1 + continue + + self.state = 'sizemsb' + elif (self.state == 'sizemsb'): + self.bufmsb = dint + self.state = 'sizelsb' + elif (self.state == 'sizelsb'): + self.dataleft = self.bufmsb << 8 | \ + dint + self.state = 'data' + elif (self.state == 'data'): + self.buffer.append(dint) + self.dataleft = self.dataleft - 1 + if (self.dataleft == 0): + self.state = 'cksum' + elif (self.state == 'cksum'): + pktsum = reduce(lambda x, y: x + y, self.buffer) & 0xff + rxcksum = dint + if (pktsum + rxcksum != 0xff): + self.ck_err = self.ck_err + 1 + print "Checksum error, got 0x%02x, expected 0x%02x" % \ + (rxcksum, 0xff - pktsum) + else: + print "Got a packet - " + str(self.buffer) + self.state = 'init' + self.buffer = [] + else: + print "Invalid state %s! Resetting" % (self.state) + self.state = 'init'