Mercurial > ~darius > hgwebdir.cgi > ZigBee
comparison zb.py @ 9:d147529ad2db
Switch to only reading RX pkts and writing TX pkts to make things simpler.
Add AT reply test case.
author | darius@inchoate.localdomain |
---|---|
date | Mon, 05 Nov 2007 23:35:52 +1030 |
parents | 9f0808b13454 |
children | 4c91fdfc862e |
comparison
equal
deleted
inserted
replaced
8:9f0808b13454 | 9:d147529ad2db |
---|---|
1 import serial, inspect | 1 import serial, inspect |
2 | 2 |
3 def easyord(i): | |
4 if (type(i) != type(str())): | |
5 return i | |
6 else: | |
7 return ord(i) | |
8 | |
3 class PktBase(object): | 9 class PktBase(object): |
10 PKT_MAXLEN = 2 ** 16 | |
11 | |
4 def __init__(self, data = []): | 12 def __init__(self, data = []): |
5 self.data = data | 13 self._data = data |
6 print "Constructing " + self.__class__.__name__ | 14 print "Constructing " + self.__class__.__name__ |
7 | |
8 def __repr__(self): | |
9 return str(self.__class__) + "(" + str([self.PKT_TYPE] + self.data) + ")" | |
10 | 15 |
11 def Encapsulate(self): | 16 def Encapsulate(self): |
12 return Packets.Encapsulate([self.PKT_TYPE] + self.data) | 17 return Packets.Encapsulate([self.PKT_TYPE] + self.data) |
13 | 18 |
14 def resize(self, dlen): | 19 def resize(self, dlen): |
15 """Ensure the data list can hold at least len elements (0 fill)""" | 20 """Ensure the data list can hold at least len elements (0 fill)""" |
16 if (len(self.data) < dlen): | 21 if (len(self._data) < dlen): |
17 self.data = (self.data + [0] * dlen)[0:dlen] | 22 self._data = (self._data + [0] * dlen)[0:dlen] |
18 | 23 |
19 class AT_Cmd(PktBase): | 24 def _checklist(list, min = 0, max = 255, maxlen = None): |
25 if (maxlen != None and len(list) > maxlen): | |
26 raise ValueError("must have %d elements" % (maxlen)) | |
27 | |
28 for i in xrange(len(list)): | |
29 if (easyord(list[i]) < min or easyord(list[i]) > max): | |
30 raise ValueError("element %d (= %d) out of range must be between %d and %d inclusive" % | |
31 (i, ord(list[i]), min, max)) | |
32 _checklist = staticmethod(_checklist) | |
33 | |
34 class TXPkts(PktBase): | |
35 """Base class for all packets that go to the module""" | |
36 def setframeid(self, value): | |
37 if (value < 0 or value > 255): | |
38 raise ValueError("FrameID must be 0-255") | |
39 self._frameid = value | |
40 frameid = property(lambda s: s._frameid, setframeid) | |
41 | |
42 | |
43 class AT_Cmd(TXPkts): | |
20 PKT_TYPE = 0x08 | 44 PKT_TYPE = 0x08 |
21 PKT_DESC = "AT Command" | 45 PKT_DESC = "AT Command" |
22 | 46 |
23 def __init__(self, *args): | 47 def __init__(self, *args): |
24 if (len(args) == 1): | 48 if (len(args) == 1): |
31 super(AT_Cmd, self).__init__([]) | 55 super(AT_Cmd, self).__init__([]) |
32 self.cmd = args[0] | 56 self.cmd = args[0] |
33 self.cmdarg = args[1] | 57 self.cmdarg = args[1] |
34 else: | 58 else: |
35 raise TypeError("__init__ takes 1 list of ordinals, 1 string, or 2 strings") | 59 raise TypeError("__init__ takes 1 list of ordinals, 1 string, or 2 strings") |
36 | 60 self.frameid = 0 |
37 self.data[0] = 1 # XXX: could be smarter about Frame ID | 61 self.cmdarg = [] |
38 | 62 |
39 def setcmd(self, value): | 63 def setcmd(self, value): |
40 self.resize(3) | 64 if (len(value) != 2): |
41 self.data[1] = ord(value[0]) | 65 raise ValueError("must have 2 elements") |
42 self.data[2] = ord(value[1]) | 66 self._checklist(value, ord('0'), ord('z')) |
43 cmd = property(lambda s: chr(s.data[1]) + chr(s.data[2]), setcmd) | 67 self._cmd = value |
68 cmd = property(lambda s: s._cmd, setcmd) | |
44 | 69 |
45 def setcmdarg(self, value): | 70 def setcmdarg(self, value): |
46 self.resize(3 + len(value)) | 71 self._checklist(value, maxlen = self.PKT_MAXLEN - 3) |
47 self.data = self.data[0:3] + map(ord, value) | 72 self._cmdarg = value |
48 def delcmdarg(self): | 73 cmdarg = property(lambda s: s._cmdarg, setcmdarg) |
49 self.data = self.data[0:3] | 74 |
50 cmdarg = property(lambda s: map(chr, s.data[3:]), setcmdarg, delcmdarg) | 75 def getdata(self): |
51 | 76 return([self.frameid] + map(ord, self.cmd) + map(easyord, self.cmdarg)) |
77 data = property(getdata) | |
78 | |
52 class AT_Cmd_Queue(AT_Cmd): | 79 class AT_Cmd_Queue(AT_Cmd): |
53 PKT_TYPE = 0x09 | 80 PKT_TYPE = 0x09 |
54 PKT_DESC = "AT Command (queued)" | 81 PKT_DESC = "AT Command (queued)" |
55 | 82 |
56 class AT_Response(PktBase): | 83 class AT_Response(PktBase): |
57 PKT_TYPE = 0x88 | 84 PKT_TYPE = 0x88 |
58 PKT_DESC = "AT Command response" | 85 PKT_DESC = "AT Command response" |
59 frameid = property(lambda s: s.data[0], None) | 86 frameid = property(lambda s: s._data[0], None) |
60 cmd = property(lambda s: chr(s.data[1]) + chr(s.data[2]), None) | 87 cmd = property(lambda s: chr(s._data[1]) + chr(s._data[2]), None) |
61 statusOK = property(lambda s: s.data[3] == 0, None) | 88 statusOK = property(lambda s: s._data[3] == 0, None) |
62 payload = property(lambda s: s.data[4:], None) | 89 payload = property(lambda s: s._data[4:], None) |
63 | 90 |
64 class Modem_Status(PktBase): | 91 class Modem_Status(PktBase): |
65 PKT_TYPE = 0x8a | 92 PKT_TYPE = 0x8a |
66 PKT_DESC = "Modem Status" | 93 PKT_DESC = "Modem Status" |
67 | 94 |
75 self.rssi, str(self.payload)) | 102 self.rssi, str(self.payload)) |
76 | 103 |
77 def getsender(self): | 104 def getsender(self): |
78 value = 0 | 105 value = 0 |
79 for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): | 106 for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): |
80 value |= self.data[i] << j | 107 value |= self._data[i] << j |
81 return value | 108 return value |
82 sender = property(getsender, None) | 109 sender = property(getsender, None) |
83 | 110 |
84 rssi = property(lambda s: -1 * s.data[s.ADDR_SIZE], None) | 111 rssi = property(lambda s: -1 * s._data[s.ADDR_SIZE], None) |
85 | 112 |
86 flags = property(lambda s: s.data[s.ADDR_SIZE + 1], None) | 113 flags = property(lambda s: s._data[s.ADDR_SIZE + 1], None) |
87 | 114 |
88 payload = property(lambda s: s.data[s.ADDR_SIZE + 2:], None) | 115 payload = property(lambda s: s._data[s.ADDR_SIZE + 2:], None) |
89 | 116 |
90 class RX_64_Bit(RX_16_Bit): | 117 class RX_64_Bit(RX_16_Bit): |
91 PKT_TYPE = 0x80 | 118 PKT_TYPE = 0x80 |
92 PKT_DESC = "RX Packet: 64 bit address" | 119 PKT_DESC = "RX Packet: 64 bit address" |
93 ADDR_SIZE = 8 | 120 ADDR_SIZE = 8 |
94 | 121 |
95 class RXIO_16_Bit(RX_16_Bit): | 122 class RXIO_16_Bit(RX_16_Bit): |
96 PKT_TYPE = 0x83 | 123 PKT_TYPE = 0x83 |
97 PKT_DESC = "RXIO Packet: 16 bit address" | 124 PKT_DESC = "RXIO Packet: 16 bit address" |
98 | 125 |
99 def setnsamples(self, value): | 126 nsamples = property(lambda s: s._data[s.ADDR_SIZE + 2]) |
100 self.resize(self.ADDR_SIZE + 3) | 127 |
101 self.data[self.ADDR_SIZE + 2] = value | 128 mask = property(lambda s: s._data[s.ADDR_SIZE + 3] << 8 | s._data[s.ADDR_SIZE + 4]) |
102 | |
103 def getnsamples(self): | |
104 return self.data[self.ADDR_SIZE + 2] | |
105 nsamples = property(getnsamples, setnsamples) | |
106 | |
107 def setmask(self, value): | |
108 self.resize(self.ADDR_SIZE + 5) | |
109 self.data[self.ADDR_SIZE + 3] = (value & 0xff00) >> 8 | |
110 self.data[self.ADDR_SIZE + 4] = value & 0xff | |
111 mask = property(lambda s: s.data[s.ADDR_SIZE + 3] << 8 | s.data[s.ADDR_SIZE + 4], setmask) | |
112 | 129 |
113 def __str__(self): | 130 def __str__(self): |
114 rtn = "0x%0*x (%ddBm) -> %d samples, mask 0x%04x" % (self.ADDR_SIZE * 2, self.sender, | 131 rtn = "0x%0*x (%ddBm) -> %d samples, mask 0x%04x" % (self.ADDR_SIZE * 2, self.sender, |
115 self.rssi, self.nsamples, self.mask) | 132 self.rssi, self.nsamples, self.mask) |
116 # Any DIO lines enabled? | 133 # Any DIO lines enabled? |
117 if (self.mask | 0x01ff): | 134 if (self.mask | 0x01ff): |
118 rtn = rtn + ", DIO - 0x%03x" % (self.data[self.ADDR_SIZE + 5] << 8 | | 135 rtn = rtn + ", DIO - 0x%03x" % (self._data[self.ADDR_SIZE + 5] << 8 | |
119 self.data[self.ADDR_SIZE + 6]) | 136 self._data[self.ADDR_SIZE + 6]) |
120 offs = self.ADDR_SIZE + 7 | 137 offs = self.ADDR_SIZE + 7 |
121 else: | 138 else: |
122 offs = self.ADDR_SIZE + 5 | 139 offs = self.ADDR_SIZE + 5 |
123 | 140 |
124 # Any ADC lines enabled? | 141 # Any ADC lines enabled? |
125 if (self.mask | 0x7e00): | 142 if (self.mask | 0x7e00): |
126 for i in range(6): | 143 for i in range(6): |
127 if (self.mask & 1 << (i + 9)): | 144 if (self.mask & 1 << (i + 9)): |
128 rtn = rtn + ", ADC%d - 0x%02x" % (i, self.data[offs] << 8 | | 145 rtn = rtn + ", ADC%d - 0x%02x" % (i, self._data[offs] << 8 | |
129 self.data[offs + 1]) | 146 self._data[offs + 1]) |
130 offs = offs + 2 | 147 offs = offs + 2 |
131 | |
132 return rtn | 148 return rtn |
133 | 149 |
134 class RXIO_64_Bit(RXIO_16_Bit): | 150 class RXIO_64_Bit(RXIO_16_Bit): |
135 PKT_TYPE = 0x82 | 151 PKT_TYPE = 0x82 |
136 PKT_DESC = "RXIO Packet: 64 bit address" | 152 PKT_DESC = "RXIO Packet: 64 bit address" |
137 ADDR_SIZE = 8 | 153 ADDR_SIZE = 8 |
138 | 154 |
139 class TX_16_Bit(RX_64_Bit): | 155 class TX_16_Bit(TXPkts): |
140 PKT_TYPE = 0x01 | 156 PKT_TYPE = 0x01 |
141 PKT_DESC = "TX Packet: 16 bit address" | 157 PKT_DESC = "TX Packet: 16 bit address" |
142 ADDR_SIZE = 2 | 158 ADDR_SIZE = 2 |
143 | 159 FLG_DISABLE_ACK = 0x01 |
160 FLG_BCAST_PANID = 0x04 | |
161 PKT_MAX_PAYLOAD = 100 | |
162 | |
144 def __init__(self, *args): | 163 def __init__(self, *args): |
145 if (len(args) == 1): | 164 if (len(args) == 1): |
146 if (type(args[0]) == type(int())): | 165 super(TX_16_Bit, self).__init__() |
147 super(TX_16_Bit, self).__init__([]) | 166 self.recipient = args[0] |
148 self.recipient = args[0] | |
149 else: | |
150 super(TX_16_Bit, self).__init__(args[0]) | |
151 elif (len(args) == 2): | 167 elif (len(args) == 2): |
152 super(TX_16_Bit, self).__init__([]) | 168 super(TX_16_Bit, self).__init__([]) |
153 self.recipient = args[0] | 169 self.recipient = args[0] |
154 self.payload = args[1] | 170 self.payload = args[1] |
155 else: | 171 else: |
156 raise TypeError("__init__ takes 1 list of ordinals or 2 strings") | 172 raise TypeError("__init__ takes 1 list of ordinals or 2 strings") |
157 | 173 |
158 self.resize(self.ADDR_SIZE + 2) | 174 self.frameid = 0 |
159 self.data[0] = 1 # XXX: could be smarter about Frame ID | 175 self.flags = 0 |
160 self.data[3] = 0 # XXX: should be able to set flags | 176 self.payload = [] |
161 | 177 |
162 def getrecipient(self): | 178 def setrecipient(self, value): |
163 value = 0 | 179 if (value < 0 or value > 2 ** (self.ADDR_SIZE * 8)): |
180 raise ValueError("value out of range must be between 0 and %d" % (2 ** self.ADDR_SIZE)) | |
181 | |
182 self._recipient = value | |
183 recipient = property(lambda s: s._recipient, setrecipient) | |
184 | |
185 def setflags(self, value): | |
186 if (value < 0 or value > 255): | |
187 raise ValueError("Value must be between 0 and 255 inclusive") | |
188 | |
189 self._flags = value | |
190 flags = property(lambda s: s._flags, setflags) | |
191 | |
192 def setpayload(self, value): | |
193 self._checklist(value, maxlen = self.PKT_MAX_PAYLOAD) | |
194 self._payload = value | |
195 payload = property(lambda s: s._payload, setpayload) | |
196 | |
197 def getdata(self): | |
198 data = [self.frameid] | |
164 for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): | 199 for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): |
165 value |= self.data[i + 1] << j | 200 data.append((self.recipient & (0xff << j)) >> j) |
166 return value | 201 data.append(self.flags) |
167 | 202 data.extend(map(easyord, self.payload)) |
168 def setrecipient(self, value): | 203 return(data) |
169 self.resize(self.ADDR_SIZE + 1) | 204 data = property(getdata) |
170 self.data[0] = 1 # XXX: could be smarter about Frame ID | 205 |
171 for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): | |
172 self.data[i + 1] = (value & (0xff << j)) >> j | |
173 recipient = property(getrecipient, setrecipient) | |
174 | |
175 def setpayload(self, value): | |
176 self.resize(self.ADDR_SIZE + 2) | |
177 self.data[self.ADDR_SIZE + 2:] = value | |
178 payload = property(lambda s: s.data[self.ADDR_SIZE + 2:], setpayload) | |
179 | |
180 class TX_64_Bit(TX_16_Bit): | 206 class TX_64_Bit(TX_16_Bit): |
181 PKT_TYPE = 0x00 | 207 PKT_TYPE = 0x00 |
182 PKT_DESC = "TX Packet: 64 bit address" | 208 PKT_DESC = "TX Packet: 64 bit address" |
183 ADDR_SIZE = 8 | 209 ADDR_SIZE = 8 |
184 | 210 |
185 class TX_Status(PktBase): | 211 class TX_Status(PktBase): |
186 PKT_TYPE = 0x89 | 212 PKT_TYPE = 0x89 |
187 PKT_DESC = "TX Status" | 213 PKT_DESC = "TX Status" |
188 statusTxt = ['OK', 'No Ack', 'CCA failure', 'Purged'] | 214 statusTxt = ['OK', 'No Ack', 'CCA failure', 'Purged'] |
189 frameid = property(lambda s: s.data[0], None) | 215 frameid = property(lambda s: s._data[0], None) |
190 status = property(lambda s: s.data[1], None) | 216 status = property(lambda s: s._data[1], None) |
191 statusMsg = property(lambda s: s.statusTxt[s.data[1]], None) | 217 statusMsg = property(lambda s: s.statusTxt[s._data[1]], None) |
192 | 218 |
193 class Packets(object): | 219 class Packets(object): |
194 PKT_CLASSES = None | 220 PKT_CLASSES = None |
195 | 221 |
196 def Build(self, data): | 222 def Build(self, data): |
223 self.buffer = [] | 249 self.buffer = [] |
224 self.state = 'init' | 250 self.state = 'init' |
225 self.packets = [] | 251 self.packets = [] |
226 | 252 |
227 self.bufmsb = 0 | 253 self.bufmsb = 0 |
228 self.dataleft = 0 | 254 self._dataleft = 0 |
229 | 255 |
230 self.fr_err = 0 # Framing error | 256 self.fr_err = 0 # Framing error |
231 self.ck_err = 0 # Checksum error | 257 self.ck_err = 0 # Checksum error |
232 self.rx_cnt = 0 # Packet count | 258 self.rx_cnt = 0 # Packet count |
233 | 259 |
312 adctest = [126, 0, 12, 131, 0, 5, 36, 0, 1, 2, 14, 0, 14, 3, 255, 50] | 338 adctest = [126, 0, 12, 131, 0, 5, 36, 0, 1, 2, 14, 0, 14, 3, 255, 50] |
313 | 339 |
314 # Exception | 340 # Exception |
315 badpkttypetest = [126, 0, 3, 10, 86, 76, 83] | 341 badpkttypetest = [126, 0, 3, 10, 86, 76, 83] |
316 | 342 |
343 # Frame ID = 0, Cmd = 'VL', Status = OK, Value = 'VL Result' | |
344 atreply = [126, 0, 14, 136, 0, 86, 76, 0, 86, 76, 32, 82, 101, 115, 117, 108, 116, 148] | |
345 | |
317 up = Packets(s) | 346 up = Packets(s) |
318 up.process(goodtest) | 347 up.process(goodtest) |
319 up.process(badtest) | 348 up.process(badtest) |
320 up.process(adctest) | 349 up.process(adctest) |
321 print up.pktq.pop(0) | 350 print up.pktq.pop(0) |