Mercurial > ~darius > hgwebdir.cgi > ZigBee
comparison zb.py @ 0:bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
author | darius@inchoate.localdomain |
---|---|
date | Sun, 28 Oct 2007 20:24:28 +1030 |
parents | |
children | b5d8ec0ffd21 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:bdcdd8380d94 |
---|---|
import struct

import serial

# Serial link to the radio module: 9600 baud, 8 data bits, no parity,
# 1 stop bit, no RTS/CTS flow control.  The 0.1 s read timeout is passed to
# the constructor (instead of the deprecated setTimeout() call) so that a
# read with nothing pending returns an empty result rather than blocking.
s = serial.Serial(port='/dev/cuad0', baudrate=9600, bytesize=8, parity='N',
                  stopbits=1, rtscts=0, timeout=0.1)
# Disabled experiment kept for reference: send '+++' and read the reply
# up to a carriage return.
#s.write('+++')
#s.readline(eol='\r')

# Two sample frames, one byte per element, in the layout that
# unPacker.process() parses: '~' (0x7e), 2-byte length, payload, checksum.
testdata = [
    '~', '\x00', '\n',
    '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x07',
    '@',
    '~', '\x00', '\n',
    '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x0f',
    '8',
]
10 | |
def readpkt(s):
    """Collect everything readable from *s* until a read comes back empty.

    Args:
        s: any object with a no-argument ``read()`` method (for example the
            module-level serial port).

    Returns:
        A list holding each non-empty value returned by ``s.read()``, in
        order.  The list is empty if the very first read returned nothing.
    """
    received = []
    while True:
        chunk = s.read()
        # Truthiness rather than ``== ''`` so that an empty bytes object
        # (what a read returns on Python 3) also ends the loop.
        if not chunk:
            break
        received.append(chunk)

    return received
20 | |
class Packet:
    """Placeholder packet type; it currently carries no data or behaviour."""

    def __init__(self):
        """Create a packet; no attributes are set."""
        pass
24 | |
class unPacker:
    """Incremental, byte-at-a-time decoder for checksummed radio frames.

    Frame layout, as parsed by process():

        0x7e | length MSB | length LSB | <length> payload bytes | checksum

    A frame is accepted when ``(sum(payload) & 0xff) + checksum == 0xff``.
    Decoder state persists between process() calls, so a frame may be fed
    in arbitrary pieces.

    Attributes:
        buffer: payload bytes (ints) of the frame currently being received.
        state: parser state, one of 'init', 'sizemsb', 'sizelsb', 'data',
            'cksum'.
        packets: payloads (lists of ints) of every frame accepted so far,
            oldest first.
        bufmsb: high byte of the length field of the current frame.
        dataleft: payload bytes still expected for the current frame.
        fr_err: number of bytes skipped while looking for the 0x7e start
            delimiter.
        ck_err: number of frames discarded because of a checksum mismatch.
    """

    def __init__(self):
        self.buffer = []
        self.state = 'init'
        self.packets = []

        self.bufmsb = 0
        self.dataleft = 0

        self.fr_err = 0 # Framing error
        self.ck_err = 0 # Checksum error

    def process(self, data):
        """Feed received bytes into the decoder.

        Args:
            data: iterable of bytes.  Each element may be an int (0-255) or
                a one-character string / one-byte bytes object, so lists of
                characters, ``bytes`` and ``bytearray`` are all accepted.

        Every valid frame is reported on stdout and its payload appended to
        ``self.packets``; framing and checksum problems are reported on
        stdout and counted in ``self.fr_err`` / ``self.ck_err``.
        """
        for d in data:
            # Normalise to an int so comparisons work for every input type.
            dint = d if isinstance(d, int) else ord(d)
            if self.state == 'init':
                if dint != 0x7e:
                    print("Framing error, got 0x%02x, expected 0x7e" % (dint))
                    self.fr_err = self.fr_err + 1
                    continue

                self.state = 'sizemsb'
            elif self.state == 'sizemsb':
                self.bufmsb = dint
                self.state = 'sizelsb'
            elif self.state == 'sizelsb':
                self.dataleft = (self.bufmsb << 8) | dint
                # A zero-length frame has no payload: go straight to the
                # checksum, otherwise the countdown below would never hit 0.
                if self.dataleft == 0:
                    self.state = 'cksum'
                else:
                    self.state = 'data'
            elif self.state == 'data':
                self.buffer.append(dint)
                self.dataleft = self.dataleft - 1
                if self.dataleft == 0:
                    self.state = 'cksum'
            elif self.state == 'cksum':
                # sum() copes with an empty payload, unlike reduce().
                pktsum = sum(self.buffer) & 0xff
                rxcksum = dint
                if pktsum + rxcksum != 0xff:
                    self.ck_err = self.ck_err + 1
                    print("Checksum error, got 0x%02x, expected 0x%02x" %
                          (rxcksum, 0xff - pktsum))
                else:
                    print("Got a packet - " + str(self.buffer))
                    # Keep the payload; self.buffer is rebound below, so the
                    # stored list is not modified afterwards.
                    self.packets.append(self.buffer)
                self.state = 'init'
                self.buffer = []
            else:
                print("Invalid state %s! Resetting" % (self.state))
                self.state = 'init'
                # Drop any partial payload so it cannot leak into the next
                # frame.
                self.buffer = []