Mercurial > ~darius > hgwebdir.cgi > ZigBee
changeset 11:75f785a09e2e
Add license.
Split test stuff into a separate file.
Fix some bugs..
- Default frame ID to 1 so we get status replies by default.
- Correct address generation routine for TX packets.
Add doc strings and epydoc variable comments.
author | darius@Inchoate |
---|---|
date | Tue, 13 Jan 2009 12:14:13 +1030 |
parents | 4c91fdfc862e |
children | a3ec3f2988ac |
files | zb.py zbtest.py |
diffstat | 2 files changed, 244 insertions(+), 80 deletions(-) [+] |
line wrap: on
line diff
--- a/zb.py Wed Nov 07 17:10:08 2007 +1030 +++ b/zb.py Tue Jan 13 12:14:13 2009 +1030 @@ -1,25 +1,77 @@ -import serial, inspect, time +# +# Code to talk to MaxStream ZigBee modules in API (no escaped +# characters) +# +# Copyright (c) 2009 +# Daniel O'Connor <darius@dons.net.au>. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. +# +"""MaxStream ZigBee module API interface + +This code expects the module to be in API mode 1 (no escape +characters), ie you have done something like.. ++++ +ATAP=1 +ATWR +ATCN + +See here for details +http://www.digi.com/products/wireless/point-multipoint/xbee-series1-moduledocs.jsp +""" + +import inspect def easyord(i): + """Helper function to return the ordinal of a string, or just the +passed in value""" if (type(i) != type(str())): return i else: return ord(i) class PktBase(object): + """Base class for all packet types""" PKT_MAXLEN = 2 ** 16 + PKT_TYPE = None def __init__(self): - print "Constructing " + self.__class__.__name__ + #print "Constructing " + self.__class__.__name__ + pass + + def Encapsulate(self): + """Encapsulate the packet""" + return Packets.Encapsulate([self.PKT_TYPE] + self.data) - def Encapsulate(self): - return Packets.Encapsulate([self.PKT_TYPE] + self.data) + def Pack(self): + """Return string version of encapsulated packet""" + return reduce(lambda x, y: str(x) + str(y), self.Encapsulate()) def resize(self, dlen): """Ensure the data list can hold at least len elements (0 fill)""" if (len(self._data) < dlen): self._data = (self._data + [0] * dlen)[0:dlen] + @staticmethod def _checklist(list, min = 0, max = 255, maxlen = None): if (maxlen != None and len(list) > maxlen): raise ValueError("must have %d elements" % (maxlen)) @@ -28,31 +80,35 @@ if (easyord(list[i]) < min or easyord(list[i]) > max): raise ValueError("element %d (= %d) out of range must be between %d and %d inclusive" % (i, ord(list[i]), min, max)) - _checklist = staticmethod(_checklist) class TXPkts(PktBase): """Base class for all packets that go to the module""" + def __init__(self): + # Frame ID of 0 will prevent TX status pakets being sent + self._frameid = 1 + def setframeid(self, value): if (value < 0 or value > 255): raise ValueError("FrameID must be 0-255") self._frameid = value frameid = property(lambda s: s._frameid, setframeid) +class AT_Cmd(TXPkts): + """AT command packet""" -class AT_Cmd(TXPkts): PKT_TYPE = 0x08 PKT_DESC = "AT Command" def __init__(self, cmd = None, cmdarg = None): + self.frameid = 1 # XXX: why do I need to dupe this? + self.cmdarg = [] + super(AT_Cmd, self).__init__() if (cmd != None): self.cmd = cmd if (cmdarg != None): self.cmdarg = cmdarg - - self.frameid = 0 - self.cmdarg = [] def setcmd(self, value): if (len(value) != 2): @@ -71,10 +127,14 @@ data = property(getdata) class AT_Cmd_Queue(AT_Cmd): + """Queued AT command packet""" + PKT_TYPE = 0x09 PKT_DESC = "AT Command (queued)" class AT_Response(PktBase): + """Response from an AT command packet""" + PKT_TYPE = 0x88 PKT_DESC = "AT Command response" frameid = property(lambda s: s._data[0], None) @@ -82,42 +142,71 @@ statusOK = property(lambda s: s._data[3] == 0, None) payload = property(lambda s: s._data[4:], None) + def __init__(self, data = []): + super(AT_Response, self).__init__() + self._data = data + class Modem_Status(PktBase): PKT_TYPE = 0x8a PKT_DESC = "Modem Status" class RX_16_Bit(PktBase): + """RX packet from a remote module (16 bit)""" PKT_TYPE = 0x81 PKT_DESC = "RX Packet: 16 bit address" ADDR_SIZE = 2 + ADRBCASTMSK = 0x01 + PANBCASTMSK = 0x02 def __init__(self, data = []): super(RX_16_Bit, self).__init__() self._data = data - + def __str__(self): - return "0x%0*x (%ddBm) -> %s" % (self.ADDR_SIZE * 2, self.sender, - self.rssi, str(self.payload)) - + return "RX_%d_Bit 0x%0*x (%ddBm) -> %s" % (self.ADDR_SIZE * 8, self.ADDR_SIZE * 2, + self.sender, self.rssi, self.payloadstr) def getsender(self): value = 0 + # Done this way so we can reuse the code for the 64 bit version for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): value |= self._data[i] << j return value + #: Source module address sender = property(getsender, None) + def isAdrBcast(self): + """Is this an address broadcast packet?""" + return self.flags & self.ADRBCASTMSK + + def isPanBcast(self): + """Is this an PAN broadcast packet?""" + return self.flags & self.PANBCASTMSK + + #: RX signal strength (dBm) rssi = property(lambda s: -1 * s._data[s.ADDR_SIZE], None) + #: Return flag byte flags = property(lambda s: s._data[s.ADDR_SIZE + 1], None) + #: Payload (list of ords) payload = property(lambda s: s._data[s.ADDR_SIZE + 2:], None) - + + #: String version of payload + def payloadstr(self): + return reduce(lambda a, b: a + chr(b), self.payload, "") + payloadstr = property(payloadstr, None) + + class RX_64_Bit(RX_16_Bit): PKT_TYPE = 0x80 PKT_DESC = "RX Packet: 64 bit address" ADDR_SIZE = 8 class RXIO_16_Bit(RX_16_Bit): + """RX I/O packet from remote module (16 bit). + +This is sent when a remote module is configured to send data based on its IO or DAC pins +""" PKT_TYPE = 0x83 PKT_DESC = "RXIO Packet: 16 bit address" @@ -151,33 +240,45 @@ ADDR_SIZE = 8 class TX_16_Bit(TXPkts): + """Transmit to a 16 bit destination""" PKT_TYPE = 0x01 PKT_DESC = "TX Packet: 16 bit address" ADDR_SIZE = 2 + #: Flag to disable ACK FLG_DISABLE_ACK = 0x01 + #: Send to broadcast PAN ID FLG_BCAST_PANID = 0x04 + #: Maximum size payload we can send PKT_MAX_PAYLOAD = 100 def __init__(self, *args): - if (len(args) == 1): + """Takes 0 to 2 arguments. First is the recipient, the second is the payload (string)""" + self._flags = 0 + self.payload = [] + + if len(args) == 0: + pass + if len(args) == 1: super(TX_16_Bit, self).__init__() self.recipient = args[0] - elif (len(args) == 2): - super(TX_16_Bit, self).__init__([]) + elif len(args) == 2: + super(TX_16_Bit, self).__init__() self.recipient = args[0] self.payload = args[1] else: - raise TypeError("__init__ takes 1 list of ordinals or 2 strings") + raise TypeError("incorrect number of arguments"); - self.frameid = 0 - self.flags = 0 - self.payload = [] - + def __str__(self): + return "TX_%d_Bit 0x%0*x <- %s" % (self.ADDR_SIZE * 8, self.ADDR_SIZE * 2, self.recipient, + self.payload) + def setrecipient(self, value): if (value < 0 or value > 2 ** (self.ADDR_SIZE * 8)): raise ValueError("value out of range must be between 0 and %d" % (2 ** self.ADDR_SIZE)) self._recipient = value + + """Destination address of the packet""" recipient = property(lambda s: s._recipient, setrecipient) def setflags(self, value): @@ -192,9 +293,13 @@ self._payload = value payload = property(lambda s: s._payload, setpayload) + def payloadstr(self): + return reduce(lambda a, b: a + chr(b), self.payload, "") + payloadstr = property(payloadstr, None) + def getdata(self): data = [self.frameid] - for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): + for i, j in zip(reversed(range(self.ADDR_SIZE)), reversed(range(0, self.ADDR_SIZE * 8, 8))): data.append((self.recipient & (0xff << j)) >> j) data.append(self.flags) data.extend(map(easyord, self.payload)) @@ -213,11 +318,18 @@ frameid = property(lambda s: s._data[0], None) status = property(lambda s: s._data[1], None) statusMsg = property(lambda s: s.statusTxt[s._data[1]], None) - + + def __init__(self, data = []): + super(TX_Status, self).__init__() + self._data = data + class Packets(object): + """Packet parsing class (misnamed)""" PKT_CLASSES = None - + + @classmethod def Build(self, data): + """Build a packet from data""" if (self.PKT_CLASSES == None): m = inspect.getmodule(self) # Generate list of objects from their names @@ -230,20 +342,21 @@ for p in self.PKT_CLASSES: if (p.PKT_TYPE == data[0]): + #print "Matched " + str(p.PKT_TYPE) return(p(data[1:])) raise ValueError("Unknown packet type 0x%02x" % (data[0])) - Build = classmethod(Build) + @staticmethod def Encapsulate(data): + """Encapsulate a packet so it can be sent to the module. Calculates checksum etc..""" pktsum = reduce(lambda x, y: x + y, data) & 0xff pkt = [0x7e] + [len(data) >> 8] + [len(data) & 0xff] + data + [0xff - pktsum] return(map(chr, pkt)) - Encapsulate = staticmethod(Encapsulate) - def __init__(self, s = None): - print str(inspect.getmodule(self)) + """Init class, if s is passed in it is used for reading & writing data""" + #print str(inspect.getmodule(self)) self.buffer = [] self.state = 'init' self.packets = [] @@ -256,27 +369,38 @@ self.rx_cnt = 0 # Packet count self.pktq = [] - self.s = s + self.s = s # Output handle for convenience methods def writedata(self, data): + """Convenience method to write data""" self.s.write("".join(map(str, data))) def getdata(self): + """Read data until nothing is available (assumes non-blocking) and process it""" l = [] - while (1): + while 1: a = self.s.read() - if (a == ''): + if a == '': break l.append(ord(a)) return self.process(l) + def processstr(self, data): + """Process a string of data""" + return self.process(map(ord, data)) + def process(self, data): + """Process (ordinal) data through the state machine. + +Returns the number of packets in the queue when finished. Updates +various internal counters too. +""" pktcount = 0 for d in data: if (self.state == 'init'): if (d != 0x7e): - print "Framing error, got 0x%02x, expected 0x7e" % (d) + print "Framing error, got 0x%02x, expected 0x7e" % d self.fr_err += 1 continue @@ -302,7 +426,7 @@ print "Checksum error, got 0x%02x, expected 0x%02x" % \ (rxcksum, 0xff - pktsum) else: - print "Got a packet - " + str(self.buffer) + #print "Got a packet - " + str(self.buffer) p = Packets.Build(self.buffer) self.pktq.append(p) self.buffer = [] @@ -314,49 +438,3 @@ return pktcount -def polldev(): - while (1): - foo = up.getdata() - for p in up.pktq: - print p - - if (foo == 0): - time.sleep(0.1) - -#for c in dir(): -# if (issubclass(c, PktBase)): -# print .. - -try: - s = serial.Serial(port='/dev/cuaU0', baudrate=9600, bytesize=8, parity='N', \ - stopbits=1, rtscts=0) - # Non-blocking - s.timeout = 0 - #s.write('+++') - #s.readline(eol='\r') -except serial.serialutil.SerialException, e: - print "Can't open serial port - " + str(e) - s = None - -# 0x0001 (-36dBm) -> 1 samples, mask 0x000f, DIO - 0x00f -goodtest = [126, 0, 10, 131, 0, 1, 36, 0, 1, 0, 15, 0, 15, 56] - -# Checksum error -badtest = [126, 0, 10, 131, 0, 1, 36, 0, 1, 0, 15, 0, 14, 56] - -#0x0005 (-36dBm) -> 1 samples, mask 0x020e, DIO - 0x00e, ADC0 - 0x3ff -adctest = [126, 0, 12, 131, 0, 5, 36, 0, 1, 2, 14, 0, 14, 3, 255, 50] - -# Exception -badpkttypetest = [126, 0, 3, 10, 86, 76, 83] - -# Frame ID = 0, Cmd = 'VL', Status = OK, Value = 'VL Result' -atreply = [126, 0, 14, 136, 0, 86, 76, 0, 86, 76, 32, 82, 101, 115, 117, 108, 116, 148] - -up = Packets(s) -up.process(goodtest) -up.process(badtest) -up.process(adctest) -print up.pktq.pop(0) -print up.pktq.pop(0) -
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/zbtest.py Tue Jan 13 12:14:13 2009 +1030 @@ -0,0 +1,86 @@ +# +# Test code for ZigBee code +# +# Copyright (c) 2009 +# Daniel O'Connor <darius@dons.net.au>. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. +# + +import zb, serial, time + +def polldev(): + while (1): + foo = up.getdata() + for p in up.pktq: + print p + + if (foo == 0): + time.sleep(0.1) + +#for c in dir(): +# if (issubclass(c, PktBase)): +# print .. + +try: + s = serial.Serial(port='/dev/cuaU0', baudrate=38400, bytesize=8, parity='N', \ + stopbits=1, rtscts=0) + # Non-blocking + s.timeout = 0 + #s.write('+++') + #s.readline(eol='\r') +except serial.serialutil.SerialException, e: + print "Can't open serial port - " + str(e) + s = None + +# 0x0001 (-36dBm) -> 1 samples, mask 0x000f, DIO - 0x00f +goodtest = [126, 0, 10, 131, 0, 1, 36, 0, 1, 0, 15, 0, 15, 56] + +# Checksum error +badtest = [126, 0, 10, 131, 0, 1, 36, 0, 1, 0, 15, 0, 14, 56] + +#0x0005 (-36dBm) -> 1 samples, mask 0x020e, DIO - 0x00e, ADC0 - 0x3ff +adctest = [126, 0, 12, 131, 0, 5, 36, 0, 1, 2, 14, 0, 14, 3, 255, 50] + +# Exception +badpkttypetest = [126, 0, 3, 10, 86, 76, 83] + +# Frame ID = 0, Cmd = 'VL', Status = OK, Value = 'VL Result' +atreply = [126, 0, 14, 136, 0, 86, 76, 0, 86, 76, 32, 82, 101, 115, 117, 108, 116, 148] + +# Do some basic tests +up = zb.Packets(s) +up.process(goodtest) +up.process(badtest) +up.process(adctest) +p = up.pktq.pop(0) +assert(p.sender == 0x1 and p.rssi == -36 and p.nsamples == 1 and p.mask == 0xf) + +p = up.pktq.pop(0) +assert(p.sender == 0x5 and p.rssi == -36 and p.nsamples == 1 and p.mask == 0x20e) + +assert len(up.pktq) == 0 + +assert(up.rx_cnt == 2) +assert(up.ck_err == 1) +assert(up.fr_err == 0) +