Mercurial > ~darius > hgwebdir.cgi > ZigBee
annotate zb.py @ 5:5d5963d542bc
More tidyups & fixes.
- Remove ability to set stuff like RSSI - it is pointless.
- Rejig class hierarchy so it makes more sense.
- Convert to list of ordinals earlier.
author | darius@inchoate.localdomain |
---|---|
date | Wed, 31 Oct 2007 20:47:32 +1030 |
parents | ed7abe6f59c2 |
children | 9c499d923544 |
rev | line source |
---|---|
4 | 1 import serial, inspect |
1 | 2 |
3 class PktBase(object): | |
4 | 4 def __init__(self, data = []): |
1 | 5 self.data = data |
6 print "Constructing " + self.__class__.__name__ | |
7 | |
8 def __repr__(self): | |
9 return str(self.__class__) + "(" + str([self.PKT_TYPE] + self.data) + ")" | |
10 | |
4 | 11 def Encapsulate(self): |
12 return Packets.Encapsulate([self.PKT_TYPE] + self.data) | |
1 | 13 |
4 | 14 def resize(self, dlen): |
15 """Ensure the data list can hold at least len elements (0 fill)""" | |
16 if (len(self.data) < dlen): | |
17 self.data = (self.data + [0] * dlen)[0:dlen] | |
18 | |
1 | 19 class AT_Cmd(PktBase): |
20 PKT_TYPE = 0x08 | |
21 PKT_DESC = "AT Command" | |
22 | |
4 | 23 def __init__(self, *args): |
24 if (len(args) == 1): | |
25 super(AT_Cmd, self).__init__(args[0]) | |
26 elif (len(args) == 2): | |
27 super(AT_Cmd, self).__init__([]) | |
28 self.cmd = args[0] | |
29 if (args[1] != None): | |
30 self.cmdarg = args[1] | |
31 else: | |
32 raise TypeError("__init__ takes 1 list of ordinals or 2 strings") | |
33 | |
34 def setcmd(self, value): | |
35 self.resize(2) | |
36 self.data[0] = ord(value[0]) | |
37 self.data[1] = ord(value[1]) | |
38 cmd = property(lambda s: chr(s.data[0]) + chr(s.data[1]), setcmd) | |
39 | |
40 def setcmdarg(self, value): | |
41 self.resize(2 + len(value)) | |
42 self.data = self.data[0:2] + map(ord, value) | |
43 def delcmdarg(self): | |
44 self.data = self.data[0:2] | |
45 cmdarg = property(lambda s: map(chr, s.data[2:]), setcmdarg, delcmdarg) | |
46 | |
47 class AT_Cmd_Queue(AT_Cmd): | |
1 | 48 PKT_TYPE = 0x09 |
49 PKT_DESC = "AT Command (queued)" | |
50 | |
51 class AT_Response(PktBase): | |
52 PKT_TYPE = 0x88 | |
53 PKT_DESC = "AT Command response" | |
54 | |
55 class Modem_Status(PktBase): | |
56 PKT_TYPE = 0x8a | |
57 PKT_DESC = "Modem Status" | |
58 | |
59 class RX_16_Bit(PktBase): | |
60 PKT_TYPE = 0x81 | |
61 PKT_DESC = "RX Packet: 16 bit address" | |
4 | 62 ADDR_SIZE = 2 |
63 | |
64 def getsender(self): | |
65 value = 0 | |
66 for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): | |
67 value |= self.data[i] << j | |
68 return value | |
5 | 69 sender = property(getsender, None) |
70 | |
71 rssi = property(lambda s: -1 * s.data[s.ADDR_SIZE], None) | |
1 | 72 |
5 | 73 flags = property(lambda s: s.data[s.ADDR_SIZE + 1], None) |
4 | 74 |
5 | 75 payload = property(lambda s: s.data[s.ADDR_SIZE + 2:], None) |
76 | |
4 | 77 class RX_64_Bit(RX_16_Bit): |
78 PKT_TYPE = 0x80 | |
79 PKT_DESC = "RX Packet: 64 bit address" | |
80 ADDR_SIZE = 8 | |
81 | |
82 class RXIO_16_Bit(RX_16_Bit): | |
1 | 83 PKT_TYPE = 0x83 |
84 PKT_DESC = "RXIO Packet: 16 bit address" | |
85 | |
4 | 86 def setnsamples(self, value): |
87 self.resize(self.ADDR_SIZE + 3) | |
88 self.data[self.ADDR_SIZE + 2] = value | |
5 | 89 |
90 def getnsamples(self): | |
91 return self.data[self.ADDR_SIZE + 2] | |
92 nsamples = property(getnsamples, setnsamples) | |
4 | 93 |
94 def setmask(self, value): | |
95 self.resize(self.ADDR_SIZE + 5) | |
96 self.data[self.ADDR_SIZE + 3] = (value & 0xff00) >> 8 | |
97 self.data[self.ADDR_SIZE + 4] = value & 0xff | |
98 mask = property(lambda s: s.data[s.ADDR_SIZE + 3] << 8 | s.data[s.ADDR_SIZE + 4], setmask) | |
99 | |
1 | 100 def __str__(self): |
4 | 101 rtn = "0x%0*x (%ddBm) -> %d samples, mask 0x%04x" % (self.ADDR_SIZE * 2, self.sender, |
102 self.rssi, self.nsamples, self.mask) | |
1 | 103 # Any DIO lines enabled? |
4 | 104 if (self.mask | 0x01ff): |
105 rtn = rtn + ", DIO - 0x%03x" % (self.data[self.ADDR_SIZE + 5] << 8 | | |
106 self.data[self.ADDR_SIZE + 6]) | |
107 offs = self.ADDR_SIZE + 7 | |
1 | 108 else: |
4 | 109 offs = self.ADDR_SIZE + 5 |
1 | 110 |
111 # Any ADC lines enabled? | |
4 | 112 if (self.mask | 0x7e00): |
1 | 113 for i in range(6): |
4 | 114 if (self.mask & 1 << (i + 9)): |
1 | 115 rtn = rtn + ", ADC%d - 0x%02x" % (i, self.data[offs] << 8 | |
116 self.data[offs + 1]) | |
117 offs = offs + 2 | |
118 | |
119 return rtn | |
120 | |
5 | 121 class RXIO_64_Bit(RXIO_16_Bit): |
4 | 122 PKT_TYPE = 0x82 |
123 PKT_DESC = "RXIO Packet: 64 bit address" | |
124 ADDR_SIZE = 8 | |
125 | |
5 | 126 class TX_16_Bit(RX_64_Bit): |
4 | 127 PKT_TYPE = 0x00 |
128 PKT_DESC = "TX Packet: 16 bit address" | |
129 | |
5 | 130 def setsender(self, value): |
131 self.resize(self.ADDR_SIZE) | |
132 for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): | |
133 self.data[i] = (value & (0xff << j)) >> j | |
134 sender = property(RX_64_Bit.getsender, setsender) | |
135 | |
136 class TX_64_Bit(RX_64_Bit): | |
1 | 137 PKT_TYPE = 0x00 |
138 PKT_DESC = "TX Packet: 64 bit address" | |
139 | |
140 class TX_Status(PktBase): | |
141 PKT_TYPE = 0x89 | |
142 PKT_DESC = "TX Status" | |
143 | |
144 class Packets(object): | |
4 | 145 PKT_CLASSES = None |
1 | 146 |
4 | 147 def Build(self, data): |
148 if (self.PKT_CLASSES == None): | |
149 m = inspect.getmodule(self) | |
150 # Generate list of objects from their names | |
151 mobjs = map(lambda n: m.__dict__[n], m.__dict__) | |
152 # Find all the classes | |
153 pktclasses = filter(inspect.isclass, mobjs) | |
154 # Find all subclasses of PktBase (but not PktBase itself) | |
155 pktclasses = filter(lambda s: issubclass(s, m.PktBase) and s != m.PktBase, pktclasses) | |
156 self.PKT_CLASSES = pktclasses | |
157 | |
158 for p in self.PKT_CLASSES: | |
1 | 159 if (p.PKT_TYPE == data[0]): |
160 return(p(data[1:])) | |
4 | 161 |
162 raise ValueError("Unknown packet type 0x%02x" % (data[0])) | |
163 Build = classmethod(Build) | |
1 | 164 |
165 def Encapsulate(data): | |
166 pktsum = reduce(lambda x, y: x + y, data) & 0xff | |
167 pkt = [0x7e] + [len(data) >> 8] + [len(data) & 0xff] + data + [0xff - pktsum] | |
168 return(map(chr, pkt)) | |
169 | |
170 Encapsulate = staticmethod(Encapsulate) | |
171 | |
172 def __init__(self): | |
4 | 173 print str(inspect.getmodule(self)) |
1 | 174 self.buffer = [] |
175 self.state = 'init' | |
176 self.packets = [] | |
177 | |
178 self.bufmsb = 0 | |
179 self.dataleft = 0 | |
180 | |
181 self.fr_err = 0 # Framing error | |
182 self.ck_err = 0 # Checksum error | |
183 self.rx_cnt = 0 # Packet count | |
184 | |
185 self.pktq = [] | |
5 | 186 |
187 def writedata(self, s, data): | |
188 s.write("".join(map(str, data))) | |
189 | |
1 | 190 def getdata(self, s): |
191 l = [] | |
192 while (1): | |
193 a = s.read() | |
194 if (a == ''): | |
195 break | |
5 | 196 l.append(ord(a)) |
1 | 197 |
198 return self.process(l) | |
199 | |
200 def process(self, data): | |
201 pktcount = 0 | |
202 for d in data: | |
203 if (self.state == 'init'): | |
5 | 204 if (d != 0x7e): |
205 print "Framing error, got 0x%02x, expected 0x7e" % (d) | |
4 | 206 self.fr_err += 1 |
1 | 207 continue |
208 | |
209 self.state = 'sizemsb' | |
210 elif (self.state == 'sizemsb'): | |
5 | 211 self.bufmsb = d |
1 | 212 self.state = 'sizelsb' |
213 elif (self.state == 'sizelsb'): | |
214 self.dataleft = self.bufmsb << 8 | \ | |
5 | 215 d |
1 | 216 self.state = 'data' |
217 elif (self.state == 'data'): | |
5 | 218 self.buffer.append(d) |
1 | 219 self.dataleft = self.dataleft - 1 |
220 if (self.dataleft == 0): | |
221 self.state = 'cksum' | |
222 elif (self.state == 'cksum'): | |
223 pktsum = reduce(lambda x, y: x + y, self.buffer) & 0xff | |
5 | 224 rxcksum = d |
1 | 225 self.state = 'init' |
226 if (pktsum + rxcksum != 0xff): | |
227 self.buffer = [] | |
4 | 228 self.ck_err += 1 |
1 | 229 print "Checksum error, got 0x%02x, expected 0x%02x" % \ |
230 (rxcksum, 0xff - pktsum) | |
231 else: | |
5 | 232 #print "Got a packet - " + str(self.buffer) |
1 | 233 p = Packets.Build(self.buffer) |
234 self.pktq.append(p) | |
235 self.buffer = [] | |
4 | 236 pktcount += 1 |
237 self.rx_cnt += 1 | |
1 | 238 else: |
239 print "Invalid state %s! Resetting" % (self.state) | |
240 self.state = 'init' | |
241 | |
242 return pktcount | |
243 | |
244 #for c in dir(): | |
245 # if (issubclass(c, PktBase)): | |
246 # print .. | |
0
bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
darius@inchoate.localdomain
parents:
diff
changeset
|
247 |
bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
darius@inchoate.localdomain
parents:
diff
changeset
|
248 s = serial.Serial(port='/dev/cuad0', baudrate=9600, bytesize=8, parity='N', \ |
bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
darius@inchoate.localdomain
parents:
diff
changeset
|
249 stopbits=1, rtscts=0) |
bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
darius@inchoate.localdomain
parents:
diff
changeset
|
250 s.setTimeout(0.1) |
bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
darius@inchoate.localdomain
parents:
diff
changeset
|
251 #s.write('+++') |
bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
darius@inchoate.localdomain
parents:
diff
changeset
|
252 #s.readline(eol='\r') |
bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
darius@inchoate.localdomain
parents:
diff
changeset
|
253 |
4 | 254 |
255 # 0x0001 (-36dBm) -> 1 samples, mask 0x000f, DIO - 0x00f | |
5 | 256 goodtest = [126, 0, 10, 131, 0, 1, 36, 0, 1, 0, 15, 0, 15, 56] |
4 | 257 |
258 # Checksum error | |
5 | 259 badtest = [126, 0, 10, 131, 0, 1, 36, 0, 1, 0, 15, 0, 14, 56] |
4 | 260 |
261 #0x0005 (-36dBm) -> 1 samples, mask 0x020e, DIO - 0x00e, ADC0 - 0x3ff | |
5 | 262 adctest = [126, 0, 12, 131, 0, 5, 36, 0, 1, 2, 14, 0, 14, 3, 255, 50] |
4 | 263 |
264 # Exception | |
5 | 265 badpkttypetest = [126, 0, 3, 10, 86, 76, 83] |
4 | 266 |
1 | 267 up = Packets() |
268 up.process(goodtest) | |
3
7bf4d4265339
Pop off the packets we test with.
darius@inchoate.localdomain
parents:
1
diff
changeset
|
269 up.process(badtest) |
7bf4d4265339
Pop off the packets we test with.
darius@inchoate.localdomain
parents:
1
diff
changeset
|
270 up.process(adctest) |
7bf4d4265339
Pop off the packets we test with.
darius@inchoate.localdomain
parents:
1
diff
changeset
|
271 print up.pktq.pop(0) |
7bf4d4265339
Pop off the packets we test with.
darius@inchoate.localdomain
parents:
1
diff
changeset
|
272 print up.pktq.pop(0) |
0
bdcdd8380d94
Some test routines & framework for talking to MaxStream ZigBee modules.
darius@inchoate.localdomain
parents:
diff
changeset
|
273 |