Mercurial > ~darius > hgwebdir.cgi > ZigBee
view zb.py @ 17:b0fc5f8da118
Rename to zbmux.tac, and use application framework.
Add emacs magic so it gets edited in Python mode.
Make log file rotate at 1Mb.
author | darius@Inchoate |
---|---|
date | Sun, 18 Jan 2009 13:32:29 +1030 |
parents | 729f2393f296 |
children | c6ee9eae9e49 |
line wrap: on
line source
#
# Code to talk to MaxStream ZigBee modules in API (no escaped
# characters)
#
# Copyright (c) 2009
#      Daniel O'Connor <darius@dons.net.au>.  All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
"""MaxStream ZigBee module API interface

This code expects the module to be in API mode 1 (no escape
characters), ie you have done something like..
+++
ATAP=1
ATWR
ATCN

See here for details
http://www.digi.com/products/wireless/point-multipoint/xbee-series1-moduledocs.jsp
"""

import inspect


def easyord(i):
    """Helper function to return the ordinal of a string, or just the
    passed in value

    Anything that is not a ``str`` (typically an int that already is an
    ordinal) is returned unchanged.
    """
    if isinstance(i, str):
        return ord(i)
    return i


class PktBase(object):
    """Base class for all packet types

    Subclasses set PKT_TYPE to the API identifier byte of the frame they
    represent; Packets.Build() uses it to pick the class for a received
    frame.
    """
    PKT_MAXLEN = 2 ** 16
    PKT_TYPE = None

    def __init__(self):
        pass

    def Encapsulate(self):
        """Encapsulate the packet

        Returns the framed packet as a list of single character strings
        (see Packets.Encapsulate). Requires the subclass to provide a
        ``data`` attribute holding the frame body as a list of ordinals.
        """
        return Packets.Encapsulate([self.PKT_TYPE] + self.data)

    def Pack(self):
        """Return string version of encapsulated packet"""
        return "".join(self.Encapsulate())

    def resize(self, dlen):
        """Ensure the data list can hold at least dlen elements (0 fill)

        A new list is bound to self._data rather than extending in place,
        so a list handed in by the caller is never modified.
        """
        missing = dlen - len(self._data)
        if missing > 0:
            self._data = self._data + [0] * missing

    @staticmethod
    def _checklist(list, min = 0, max = 255, maxlen = None):
        """Validate a sequence of characters and/or ordinals

        list   -- sequence to check (elements may be 1 character strings
                  or ints)
        min    -- smallest permitted ordinal (inclusive)
        max    -- largest permitted ordinal (inclusive)
        maxlen -- maximum number of elements, or None for no limit

        Raises ValueError if the sequence is too long or an element is
        out of range.
        """
        if maxlen is not None and len(list) > maxlen:
            raise ValueError("must have at most %d elements" % (maxlen))
        for i, item in enumerate(list):
            value = easyord(item)
            if value < min or value > max:
                # Report the ordinal, the element itself may already be an
                # int so ord() can not be used on it here
                raise ValueError("element %d (= %d) out of range must be between %d and %d inclusive" %
                                 (i, value, min, max))


class TXPkts(PktBase):
    """Base class for all packets that go to the module"""
    frameidcounter = 0

    def __init__(self):
        super(TXPkts, self).__init__()
        self._frameid = self.getNextFrameId()

    @classmethod
    def getNextFrameId(clss):
        """Generate the next frame ID, skip 0 as it will cause the module
        to not send a response back

        The counter is stored on the class the method is invoked on.
        """
        clss.frameidcounter = (clss.frameidcounter + 1) % 255
        if clss.frameidcounter == 0:
            clss.frameidcounter = 1
        return clss.frameidcounter

    def setframeid(self, value):
        """Set the frame ID, raises ValueError unless it is 0-255"""
        if value < 0 or value > 255:
            raise ValueError("FrameID must be 0-255")
        self._frameid = value
    frameid = property(lambda s: s._frameid, setframeid)


class AT_Cmd(TXPkts):
    """AT command packet"""
    PKT_TYPE = 0x08
    PKT_DESC = "AT Command"

    def __init__(self, cmd = None, cmdarg = None):
        """cmd is the 2 character AT command, cmdarg is an optional
        sequence of characters/ordinals passed as its parameter"""
        super(AT_Cmd, self).__init__()
        self.cmdarg = []
        if cmd is not None:
            self.cmd = cmd
        if cmdarg is not None:
            self.cmdarg = cmdarg

    def setcmd(self, value):
        """Set the command, raises ValueError unless it is 2 elements long
        and each one is between '0' and 'z'"""
        if len(value) != 2:
            raise ValueError("must have 2 elements")
        self._checklist(value, ord('0'), ord('z'))
        self._cmd = value
    cmd = property(lambda s: s._cmd, setcmd)

    def setcmdarg(self, value):
        """Set the command argument, raises ValueError if it is too long
        or holds values outside 0-255"""
        self._checklist(value, maxlen = self.PKT_MAXLEN - 3)
        self._cmdarg = value
    cmdarg = property(lambda s: s._cmdarg, setcmdarg)

    def getdata(self):
        """Return the frame body (frame ID, command, argument) as a list
        of ordinals"""
        return ([self.frameid] +
                [ord(c) for c in self.cmd] +
                [easyord(c) for c in self.cmdarg])
    data = property(getdata)


class AT_Cmd_Queue(AT_Cmd):
    """Queued AT command packet"""
    PKT_TYPE = 0x09
    PKT_DESC = "AT Command (queued)"


class AT_Response(PktBase):
    """Response from an AT command packet"""
    PKT_TYPE = 0x88
    PKT_DESC = "AT Command response"
    frameid = property(lambda s: s._data[0], None)
    cmd = property(lambda s: chr(s._data[1]) + chr(s._data[2]), None)
    statusOK = property(lambda s: s._data[3] == 0, None)
    payload = property(lambda s: s._data[4:], None)

    def __init__(self, data = None):
        """data is the frame body (list of ordinals) following the API
        identifier byte"""
        super(AT_Response, self).__init__()
        # A fresh list per instance, a [] default would be shared
        self._data = [] if data is None else data


class Modem_Status(PktBase):
    """Modem status packet sent by the module"""
    PKT_TYPE = 0x8a
    PKT_DESC = "Modem Status"

    #: First byte of the frame body
    status = property(lambda s: s._data[0], None)

    def __init__(self, data = None):
        """data is the frame body (list of ordinals) following the API
        identifier byte

        Packets.Build() constructs every packet class with the frame
        body, so this class has to accept it as well.
        """
        super(Modem_Status, self).__init__()
        self._data = [] if data is None else data


class RX_16_Bit(PktBase):
    """RX packet from a remote module (16 bit)"""
    PKT_TYPE = 0x81
    PKT_DESC = "RX Packet: 16 bit address"
    ADDR_SIZE = 2
    ADRBCASTMSK = 0x01
    PANBCASTMSK = 0x02

    def __init__(self, data = None):
        """data is the frame body (list of ordinals) following the API
        identifier byte"""
        super(RX_16_Bit, self).__init__()
        # A fresh list per instance, a [] default would be shared
        self._data = [] if data is None else data

    def __str__(self):
        return "RX_%d_Bit 0x%0*x (%ddBm) -> %s" % (self.ADDR_SIZE * 8, self.ADDR_SIZE * 2,
                                                   self.sender, self.rssi, self.payloadstr)

    def getsender(self):
        """Return the source address, built from the first ADDR_SIZE
        bytes of the frame body (most significant byte first)"""
        value = 0
        # Done this way so we can reuse the code for the 64 bit version
        for i in range(self.ADDR_SIZE):
            value = (value << 8) | self._data[i]
        return value
    #: Source module address
    sender = property(getsender, None)

    def isAdrBcast(self):
        """Is this an address broadcast packet?"""
        return self.flags & self.ADRBCASTMSK

    def isPanBcast(self):
        """Is this an PAN broadcast packet?"""
        return self.flags & self.PANBCASTMSK

    #: RX signal strength (dBm)
    rssi = property(lambda s: -1 * s._data[s.ADDR_SIZE], None)

    #: Return flag byte
    flags = property(lambda s: s._data[s.ADDR_SIZE + 1], None)

    #: Payload (list of ords)
    payload = property(lambda s: s._data[s.ADDR_SIZE + 2:], None)

    #: String version of payload
    def payloadstr(self):
        return "".join([chr(b) for b in self.payload])
    payloadstr = property(payloadstr, None)


class RX_64_Bit(RX_16_Bit):
    """RX packet from a remote module (64 bit)"""
    PKT_TYPE = 0x80
    PKT_DESC = "RX Packet: 64 bit address"
    ADDR_SIZE = 8


class RXIO_16_Bit(RX_16_Bit):
    """RX I/O packet from remote module (16 bit).

    This is sent when a remote module is configured to send data based
    on its IO or DAC pins
    """
    PKT_TYPE = 0x83
    PKT_DESC = "RXIO Packet: 16 bit address"

    #: Mask bits selecting the DIO lines
    DIOMASK = 0x01ff
    #: Mask bits selecting the ADC lines
    ADCMASK = 0x7e00

    nsamples = property(lambda s: s._data[s.ADDR_SIZE + 2])
    mask = property(lambda s: s._data[s.ADDR_SIZE + 3] << 8 | s._data[s.ADDR_SIZE + 4])

    def __str__(self):
        rtn = "0x%0*x (%ddBm) -> %d samples, mask 0x%04x" % (self.ADDR_SIZE * 2, self.sender,
                                                             self.rssi, self.nsamples, self.mask)
        # Any DIO lines enabled?  (Must be a bitwise AND, an OR with a
        # non-zero constant is always true and would shift the ADC offset
        # even when no DIO word is present.)
        if self.mask & self.DIOMASK:
            rtn = rtn + ", DIO - 0x%03x" % (self._data[self.ADDR_SIZE + 5] << 8 |
                                            self._data[self.ADDR_SIZE + 6])
            offs = self.ADDR_SIZE + 7
        else:
            offs = self.ADDR_SIZE + 5

        # Any ADC lines enabled?
        if self.mask & self.ADCMASK:
            for i in range(6):
                if self.mask & 1 << (i + 9):
                    rtn = rtn + ", ADC%d - 0x%02x" % (i, self._data[offs] << 8 |
                                                      self._data[offs + 1])
                    offs = offs + 2
        return rtn


class RXIO_64_Bit(RXIO_16_Bit):
    """RX I/O packet from remote module (64 bit)"""
    PKT_TYPE = 0x82
    PKT_DESC = "RXIO Packet: 64 bit address"
    ADDR_SIZE = 8


class TX_16_Bit(TXPkts):
    """Transmit to a 16 bit destination"""
    PKT_TYPE = 0x01
    PKT_DESC = "TX Packet: 16 bit address"
    ADDR_SIZE = 2
    #: Flag to disable ACK
    FLG_DISABLE_ACK = 0x01
    #: Send to broadcast PAN ID
    FLG_BCAST_PANID = 0x04
    #: Maximum size payload we can send
    PKT_MAX_PAYLOAD = 100

    def __init__(self, *args):
        """Takes 0 to 2 arguments. First is the recipient, the second is
        the payload (string)

        With no arguments the recipient defaults to 0 and the payload is
        empty; both can be assigned afterwards. Raises TypeError when
        more than 2 arguments are given.
        """
        if len(args) > 2:
            raise TypeError("incorrect number of arguments")
        # Always initialise the parent so every instance has a frame ID
        super(TX_16_Bit, self).__init__()
        self._flags = 0
        self._recipient = 0
        self.payload = []
        if len(args) >= 1:
            self.recipient = args[0]
        if len(args) == 2:
            self.payload = args[1]

    def __str__(self):
        return "TX_%d_Bit ID: %d 0x%0*x <- %s" % (self.ADDR_SIZE * 8, self._frameid,
                                                  self.ADDR_SIZE * 2, self.recipient,
                                                  self.payload)

    def setrecipient(self, value):
        """Set the destination address, raises ValueError unless it fits
        in ADDR_SIZE bytes"""
        limit = 2 ** (self.ADDR_SIZE * 8) - 1
        if value < 0 or value > limit:
            raise ValueError("value out of range must be between 0 and %d" % (limit))
        self._recipient = value

    #: Destination address of the packet
    recipient = property(lambda s: s._recipient, setrecipient)

    def setflags(self, value):
        """Set the option flags byte, raises ValueError unless 0-255"""
        if value < 0 or value > 255:
            raise ValueError("Value must be between 0 and 255 inclusive")
        self._flags = value
    flags = property(lambda s: s._flags, setflags)

    def setpayload(self, value):
        """Set the payload (string or list of ordinals), raises
        ValueError if it is longer than PKT_MAX_PAYLOAD or holds values
        outside 0-255"""
        self._checklist(value, maxlen = self.PKT_MAX_PAYLOAD)
        self._payload = value
    payload = property(lambda s: s._payload, setpayload)

    def payloadstr(self):
        # The payload may be a string or a list of ordinals, normalise
        # each element through easyord() before converting
        return "".join([chr(easyord(b)) for b in self.payload])
    #: String version of payload
    payloadstr = property(payloadstr, None)

    def getdata(self):
        """Return the frame body (frame ID, address most significant byte
        first, flags, payload) as a list of ordinals"""
        data = [self.frameid]
        for shift in reversed(range(0, self.ADDR_SIZE * 8, 8)):
            data.append((self.recipient >> shift) & 0xff)
        data.append(self.flags)
        data.extend([easyord(b) for b in self.payload])
        return data
    data = property(getdata)


class TX_64_Bit(TX_16_Bit):
    """Transmit to a 64 bit destination"""
    PKT_TYPE = 0x00
    PKT_DESC = "TX Packet: 64 bit address"
    ADDR_SIZE = 8


class TX_Status(PktBase):
    """Status report for a previously transmitted packet"""
    PKT_TYPE = 0x89
    PKT_DESC = "TX Status"
    statusTxt = ['OK', 'No Ack', 'CCA failure', 'Purged']
    frameid = property(lambda s: s._data[0], None)
    status = property(lambda s: s._data[1], None)
    statusMsg = property(lambda s: s.statusTxt[s._data[1]], None)

    def __init__(self, data = None):
        """data is the frame body (list of ordinals) following the API
        identifier byte"""
        super(TX_Status, self).__init__()
        # A fresh list per instance, a [] default would be shared
        self._data = [] if data is None else data


class Packets(object):
    """Packet parsing class (misnamed)"""
    PKT_CLASSES = None

    @classmethod
    def Build(self, data):
        """Build a packet from data

        data is a list of ordinals, the first is the API identifier and
        the rest is passed to the constructor of the matching class.
        Raises ValueError if no packet class has that identifier.
        """
        if self.PKT_CLASSES is None:
            m = inspect.getmodule(self)
            # Find all subclasses of PktBase (but not PktBase itself)
            # defined in the module. Stored as a list so it can be walked
            # on every call.
            self.PKT_CLASSES = [c for c in list(vars(m).values())
                                if inspect.isclass(c) and
                                issubclass(c, m.PktBase) and c != m.PktBase]

        for p in self.PKT_CLASSES:
            if p.PKT_TYPE == data[0]:
                return p(data[1:])

        raise ValueError("Unknown packet type 0x%02x" % (data[0]))

    @staticmethod
    def Encapsulate(data):
        """Encapsulate a packet so it can be sent to the module.
        Calculates checksum etc..

        data is a list of ordinals (API identifier followed by the frame
        body). Returns a list of single character strings: start
        delimiter, 2 length bytes, the data and the checksum.
        """
        pktsum = sum(data) & 0xff
        pkt = [0x7e] + [len(data) >> 8] + [len(data) & 0xff] + data + [0xff - pktsum]
        return [chr(b) for b in pkt]

    def __init__(self, s = None):
        """Init class, if s is passed in it is used for reading & writing
        data"""
        self.buffer = []
        self.state = 'init'
        self.packets = []

        self.bufmsb = 0
        self.dataleft = 0

        self.fr_err = 0 # Framing error
        self.ck_err = 0 # Checksum error
        self.rx_cnt = 0 # Packet count

        self.pktq = []
        self.s = s # Output handle for convenience methods

    def writedata(self, data):
        """Convenience method to write data"""
        self.s.write("".join(map(str, data)))

    def getdata(self):
        """Read data until nothing is available (assumes non-blocking) and
        process it

        Returns whatever process() returns for the data read.
        """
        l = []
        while 1:
            a = self.s.read()
            # Any empty result (not only '') means nothing more to read
            if not a:
                break
            l.append(ord(a))

        return self.process(l)

    def processstr(self, data):
        """Process a string of data"""
        return self.process([easyord(c) for c in data])

    def process(self, data):
        """Process (ordinal) data through the state machine.

        Returns the number of packets added to the queue (self.pktq) by
        this call. Updates various internal counters too.

        A ValueError from Packets.Build() (unknown packet type) is passed
        on to the caller; the receive buffer has already been reset at
        that point so later frames are parsed normally.
        """
        pktcount = 0
        for d in data:
            if self.state == 'init':
                if d != 0x7e:
                    print("Framing error, got 0x%02x, expected 0x7e" % d)
                    self.fr_err += 1
                    continue

                self.state = 'sizemsb'
            elif self.state == 'sizemsb':
                self.bufmsb = d
                self.state = 'sizelsb'
            elif self.state == 'sizelsb':
                self.dataleft = self.bufmsb << 8 | d
                if self.dataleft == 0:
                    # A frame with no data can not be valid and the data
                    # state would never terminate, resynchronise instead
                    print("Framing error, zero length frame")
                    self.fr_err += 1
                    self.state = 'init'
                else:
                    self.state = 'data'
            elif self.state == 'data':
                self.buffer.append(d)
                self.dataleft = self.dataleft - 1
                if self.dataleft == 0:
                    self.state = 'cksum'
            elif self.state == 'cksum':
                # Reset before building so an exception from Build() can
                # not leave stale data behind for the next frame
                frame = self.buffer
                self.buffer = []
                self.state = 'init'
                pktsum = sum(frame) & 0xff
                rxcksum = d
                if pktsum + rxcksum != 0xff:
                    self.ck_err += 1
                    print("Checksum error, got 0x%02x, expected 0x%02x" %
                          (rxcksum, 0xff - pktsum))
                else:
                    p = Packets.Build(frame)
                    self.pktq.append(p)
                    pktcount += 1
                    self.rx_cnt += 1
            else:
                print("Invalid state %s! Resetting" % (self.state))
                self.state = 'init'

        return pktcount