Mercurial > ~darius > hgwebdir.cgi > ZigBee
annotate zb.py @ 4:ed7abe6f59c2
Many fixes (again)
- Worked out how to auto-discover packet type classes.
- Refactor receive class to make it usable for 16 & 64 bit modes.
- Use property() to map to data values.
- Add another test case, document what the others look like.
author | darius@inchoate.localdomain |
---|---|
date | Wed, 31 Oct 2007 20:01:54 +1030 |
parents | 7bf4d4265339 |
children | 5d5963d542bc |
rev | line source |
---|---|
4 | 1 import serial, inspect |
1 | 2 |
3 class PktBase(object): | |
4 | 4 def __init__(self, data = []): |
1 | 5 self.data = data |
6 print "Constructing " + self.__class__.__name__ | |
7 | |
8 def __repr__(self): | |
9 return str(self.__class__) + "(" + str([self.PKT_TYPE] + self.data) + ")" | |
10 | |
4 | 11 def Encapsulate(self): |
12 return Packets.Encapsulate([self.PKT_TYPE] + self.data) | |
1 | 13 |
4 | 14 def resize(self, dlen): |
15 """Ensure the data list can hold at least len elements (0 fill)""" | |
16 if (len(self.data) < dlen): | |
17 self.data = (self.data + [0] * dlen)[0:dlen] | |
18 | |
1 | 19 class AT_Cmd(PktBase): |
20 PKT_TYPE = 0x08 | |
21 PKT_DESC = "AT Command" | |
22 | |
4 | 23 def __init__(self, *args): |
24 if (len(args) == 1): | |
25 super(AT_Cmd, self).__init__(args[0]) | |
26 elif (len(args) == 2): | |
27 super(AT_Cmd, self).__init__([]) | |
28 self.cmd = args[0] | |
29 if (args[1] != None): | |
30 self.cmdarg = args[1] | |
31 else: | |
32 raise TypeError("__init__ takes 1 list of ordinals or 2 strings") | |
33 | |
34 def setcmd(self, value): | |
35 self.resize(2) | |
36 self.data[0] = ord(value[0]) | |
37 self.data[1] = ord(value[1]) | |
38 cmd = property(lambda s: chr(s.data[0]) + chr(s.data[1]), setcmd) | |
39 | |
40 def setcmdarg(self, value): | |
41 self.resize(2 + len(value)) | |
42 self.data = self.data[0:2] + map(ord, value) | |
43 def delcmdarg(self): | |
44 self.data = self.data[0:2] | |
45 cmdarg = property(lambda s: map(chr, s.data[2:]), setcmdarg, delcmdarg) | |
46 | |
47 class AT_Cmd_Queue(AT_Cmd): | |
1 | 48 PKT_TYPE = 0x09 |
49 PKT_DESC = "AT Command (queued)" | |
50 | |
51 class AT_Response(PktBase): | |
52 PKT_TYPE = 0x88 | |
53 PKT_DESC = "AT Command response" | |
54 | |
55 class Modem_Status(PktBase): | |
56 PKT_TYPE = 0x8a | |
57 PKT_DESC = "Modem Status" | |
58 | |
59 class RX_16_Bit(PktBase): | |
60 PKT_TYPE = 0x81 | |
61 PKT_DESC = "RX Packet: 16 bit address" | |
4 | 62 ADDR_SIZE = 2 |
63 | |
64 def setsender(self, value): | |
65 self.resize(self.ADDR_SIZE) | |
66 for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): | |
67 self.data[i] = (value & (0xff << j)) >> j | |
1 | 68 |
4 | 69 def getsender(self): |
70 value = 0 | |
71 for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): | |
72 value |= self.data[i] << j | |
73 return value | |
74 sender = property(getsender, setsender) | |
1 | 75 |
4 | 76 def setrssi(self, value): |
77 self.resize(self.ADDR_SIZE + 1) | |
78 self.data[self.ADDR_SIZE] = -1 * value | |
79 rssi = property(lambda s: -1 * s.data[s.ADDR_SIZE], setrssi) | |
80 | |
81 def setflags(self, value): | |
82 self.resize(self.ADDR_SIZE + 2) | |
83 self.data[self.ADDR_SIZE + 1] = value | |
84 flags = property(lambda s: s.data[s.ADDR_SIZE + 1], setflags) | |
85 | |
86 class RX_64_Bit(RX_16_Bit): | |
87 PKT_TYPE = 0x80 | |
88 PKT_DESC = "RX Packet: 64 bit address" | |
89 ADDR_SIZE = 8 | |
90 | |
91 class RXIO_16_Bit(RX_16_Bit): | |
1 | 92 PKT_TYPE = 0x83 |
93 PKT_DESC = "RXIO Packet: 16 bit address" | |
94 | |
4 | 95 def setnsamples(self, value): |
96 self.resize(self.ADDR_SIZE + 3) | |
97 self.data[self.ADDR_SIZE + 2] = value | |
98 nsamples = property(lambda s: s.data[s.ADDR_SIZE + 2], setnsamples) | |
99 | |
100 def setmask(self, value): | |
101 self.resize(self.ADDR_SIZE + 5) | |
102 self.data[self.ADDR_SIZE + 3] = (value & 0xff00) >> 8 | |
103 self.data[self.ADDR_SIZE + 4] = value & 0xff | |
104 mask = property(lambda s: s.data[s.ADDR_SIZE + 3] << 8 | s.data[s.ADDR_SIZE + 4], setmask) | |
105 | |
1 | 106 def __str__(self): |
4 | 107 rtn = "0x%0*x (%ddBm) -> %d samples, mask 0x%04x" % (self.ADDR_SIZE * 2, self.sender, |
108 self.rssi, self.nsamples, self.mask) | |
1 | 109 # Any DIO lines enabled? |
4 | 110 if (self.mask | 0x01ff): |
111 rtn = rtn + ", DIO - 0x%03x" % (self.data[self.ADDR_SIZE + 5] << 8 | | |
112 self.data[self.ADDR_SIZE + 6]) | |
113 offs = self.ADDR_SIZE + 7 | |
1 | 114 else: |
4 | 115 offs = self.ADDR_SIZE + 5 |
1 | 116 |
117 # Any ADC lines enabled? | |
4 | 118 if (self.mask | 0x7e00): |
1 | 119 for i in range(6): |
4 | 120 if (self.mask & 1 << (i + 9)): |
1 | 121 rtn = rtn + ", ADC%d - 0x%02x" % (i, self.data[offs] << 8 | |
122 self.data[offs + 1]) | |
123 offs = offs + 2 | |
124 | |
125 return rtn | |
126 | |
4 | 127 class RXIO_64_Bit(RX_16_Bit): |
128 PKT_TYPE = 0x82 | |
129 PKT_DESC = "RXIO Packet: 64 bit address" | |
130 ADDR_SIZE = 8 | |
131 | |
132 class TX_16_Bit(PktBase): | |
133 PKT_TYPE = 0x00 | |
134 PKT_DESC = "TX Packet: 16 bit address" | |
135 | |
1 | 136 class TX_64_Bit(PktBase): |
137 PKT_TYPE = 0x00 | |
138 PKT_DESC = "TX Packet: 64 bit address" | |
139 | |
140 class TX_Status(PktBase): | |
141 PKT_TYPE = 0x89 | |
142 PKT_DESC = "TX Status" | |
143 | |
144 class Packets(object): | |
4 | 145 PKT_CLASSES = None |
1 | 146 |
4 | 147 def Build(self, data): |
148 if (self.PKT_CLASSES == None): | |
149 m = inspect.getmodule(self) | |
150 # Generate list of objects from their names | |
151 mobjs = map(lambda n: m.__dict__[n], m.__dict__) | |
152 # Find all the classes | |
153 pktclasses = filter(inspect.isclass, mobjs) | |
154 # Find all subclasses of PktBase (but not PktBase itself) | |
155 pktclasses = filter(lambda s: issubclass(s, m.PktBase) and s != m.PktBase, pktclasses) | |
156 self.PKT_CLASSES = pktclasses | |
157 | |
158 for p in self.PKT_CLASSES: | |
1 | 159 if (p.PKT_TYPE == data[0]): |
160 return(p(data[1:])) | |
4 | 161 |
162 raise ValueError("Unknown packet type 0x%02x" % (data[0])) | |
163 Build = classmethod(Build) | |
1 | 164 |
165 def Encapsulate(data): | |
166 pktsum = reduce(lambda x, y: x + y, data) & 0xff | |
167 pkt = [0x7e] + [len(data) >> 8] + [len(data) & 0xff] + data + [0xff - pktsum] | |
168 return(map(chr, pkt)) | |
169 | |
170 Encapsulate = staticmethod(Encapsulate) | |
171 | |
172 def __init__(self): | |
4 | 173 print str(inspect.getmodule(self)) |
1 | 174 self.buffer = [] |
175 self.state = 'init' | |
176 self.packets = [] | |
177 | |
178 self.bufmsb = 0 | |
179 self.dataleft = 0 | |
180 | |
181 self.fr_err = 0 # Framing error | |
182 self.ck_err = 0 # Checksum error | |
183 self.rx_cnt = 0 # Packet count | |
184 | |
185 self.pktq = [] | |
186 | |
187 def getdata(self, s): | |
188 l = [] | |
189 while (1): | |
190 a = s.read() | |
191 if (a == ''): | |
192 break | |
193 l.append(a) | |
194 | |
195 return self.process(l) | |
196 | |
197 def process(self, data): | |
198 pktcount = 0 | |
199 for d in data: | |
200 if (self.state == 'init'): | |
201 if (d != '\x7e'): | |
4 | 202 print "Framing error, got 0x%02x, expected 0x7f" % (ord(d)) |
203 self.fr_err += 1 | |
1 | 204 continue |
205 | |
206 self.state = 'sizemsb' | |
207 elif (self.state == 'sizemsb'): | |
4 | 208 self.bufmsb = ord(d) |
1 | 209 self.state = 'sizelsb' |
210 elif (self.state == 'sizelsb'): | |
211 self.dataleft = self.bufmsb << 8 | \ | |
4 | 212 ord(d) |
1 | 213 self.state = 'data' |
214 elif (self.state == 'data'): | |
4 | 215 self.buffer.append(ord(d)) |
1 | 216 self.dataleft = self.dataleft - 1 |
217 if (self.dataleft == 0): | |
218 self.state = 'cksum' | |
219 elif (self.state == 'cksum'): | |
220 pktsum = reduce(lambda x, y: x + y, self.buffer) & 0xff | |
4 | 221 rxcksum = ord(d) |
1 | 222 self.state = 'init' |
223 if (pktsum + rxcksum != 0xff): | |
224 self.buffer = [] | |
4 | 225 self.ck_err += 1 |
1 | 226 print "Checksum error, got 0x%02x, expected 0x%02x" % \ |
227 (rxcksum, 0xff - pktsum) | |
228 else: | |
229 print "Got a packet - " + str(self.buffer) | |
230 p = Packets.Build(self.buffer) | |
231 self.pktq.append(p) | |
232 self.buffer = [] | |
4 | 233 pktcount += 1 |
234 self.rx_cnt += 1 | |
1 | 235 else: |
236 print "Invalid state %s! Resetting" % (self.state) | |
237 self.state = 'init' | |
238 | |
239 return pktcount | |
240 | |
241 #for c in dir(): | |
242 # if (issubclass(c, PktBase)): | |
243 # print .. | |
0
bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
darius@inchoate.localdomain
parents:
diff
changeset
|
244 |
bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
darius@inchoate.localdomain
parents:
diff
changeset
|
245 s = serial.Serial(port='/dev/cuad0', baudrate=9600, bytesize=8, parity='N', \ |
bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
darius@inchoate.localdomain
parents:
diff
changeset
|
246 stopbits=1, rtscts=0) |
bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
darius@inchoate.localdomain
parents:
diff
changeset
|
247 s.setTimeout(0.1) |
bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
darius@inchoate.localdomain
parents:
diff
changeset
|
248 #s.write('+++') |
bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
darius@inchoate.localdomain
parents:
diff
changeset
|
249 #s.readline(eol='\r') |
bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
darius@inchoate.localdomain
parents:
diff
changeset
|
250 |
4 | 251 |
252 # 0x0001 (-36dBm) -> 1 samples, mask 0x000f, DIO - 0x00f | |
1 | 253 goodtest = ['~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x0f', '8'] |
4 | 254 |
255 # Checksum error | |
1 | 256 badtest = ['~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x0e', '8'] |
4 | 257 |
258 #0x0005 (-36dBm) -> 1 samples, mask 0x020e, DIO - 0x00e, ADC0 - 0x3ff | |
1 | 259 adctest = ['~', '\x00', '\x0c', '\x83', '\x00', '\x05', '$', '\x00', '\x01', '\x02', '\x0e', '\x00', '\x0e', '\x03', '\xff', '2' ] |
4 | 260 |
261 # Exception | |
262 badpkttypetest = ['~', '\x00', '\x03', '\x0a', 'V', 'L', 'S'] | |
263 | |
1 | 264 up = Packets() |
265 up.process(goodtest) | |
3
7bf4d4265339
Pop off the packets we test with.
darius@inchoate.localdomain
parents:
1
diff
changeset
|
266 up.process(badtest) |
7bf4d4265339
Pop off the packets we test with.
darius@inchoate.localdomain
parents:
1
diff
changeset
|
267 up.process(adctest) |
7bf4d4265339
Pop off the packets we test with.
darius@inchoate.localdomain
parents:
1
diff
changeset
|
268 print up.pktq.pop(0) |
7bf4d4265339
Pop off the packets we test with.
darius@inchoate.localdomain
parents:
1
diff
changeset
|
269 print up.pktq.pop(0) |
0
bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
darius@inchoate.localdomain
parents:
diff
changeset
|
270 |