view zb.py @ 7:579dedf5a1f1

Rejoin line break, left over from removal of struct.
author darius@inchoate.localdomain
date Thu, 01 Nov 2007 16:32:17 +1030
parents 9c499d923544
children 9f0808b13454
line wrap: on
line source

import serial, inspect

class PktBase(object):
    """Common base class for every API packet type.

    ``data`` is the packet body as a list of byte values, NOT including the
    leading packet-type byte; each subclass supplies that byte through its
    ``PKT_TYPE`` class attribute.
    """

    def __init__(self, data=None):
        """Store the packet body.

        data -- list of ordinals that follow the packet-type byte.  When
                omitted a fresh empty list is created for this instance
                (a literal ``[]`` default would be shared by every
                instance constructed without an argument).
        """
        if data is None:
            data = []
        self.data = data
        print("Constructing " + self.__class__.__name__)

    def __repr__(self):
        """Show the class and the full body, type byte included."""
        return str(self.__class__) + "(" + str([self.PKT_TYPE] + self.data) + ")"

    def Encapsulate(self):
        """Return this packet framed for transmission (see Packets.Encapsulate)."""
        return Packets.Encapsulate([self.PKT_TYPE] + self.data)

    def resize(self, dlen):
        """Ensure the data list holds at least dlen elements (0 fill).

        The list is never shortened.
        """
        shortfall = dlen - len(self.data)
        if shortfall > 0:
            # Rebind rather than extend in place, so a list handed to the
            # constructor by the caller is not modified behind its back.
            self.data = self.data + [0] * shortfall
    
class AT_Cmd(PktBase):
    """AT Command request (packet type 0x08).

    Body layout in ``data``:
        [0]   frame ID
        [1:3] the two command characters
        [3:]  optional command argument bytes
    """
    PKT_TYPE = 0x08
    PKT_DESC = "AT Command"

    def __init__(self, *args):
        """Create the packet from raw bytes or from a command name.

        Accepted call forms:
            AT_Cmd(list_of_ordinals)  -- raw body (without the type byte)
            AT_Cmd(cmd)               -- two-character command string
            AT_Cmd(cmd, cmdarg)       -- command string plus argument string

        Raises TypeError for any other number of arguments.
        """
        if len(args) == 1:
            if isinstance(args[0], str):
                super(AT_Cmd, self).__init__([])
                self.cmd = args[0]
            else:
                super(AT_Cmd, self).__init__(args[0])
        elif len(args) == 2:
            super(AT_Cmd, self).__init__([])
            self.cmd = args[0]
            self.cmdarg = args[1]
        else:
            raise TypeError("__init__ takes 1 list of ordinals, 1 string, or 2 strings")

        # A raw body may be empty; make sure the frame ID slot exists
        # before writing to it.
        self.resize(1)
        self.data[0] = 1 # XXX: could be smarter about Frame ID

    def setcmd(self, value):
        """Store the first two characters of value as the command."""
        self.resize(3)
        self.data[1] = ord(value[0])
        self.data[2] = ord(value[1])
    cmd = property(lambda s: chr(s.data[1]) + chr(s.data[2]), setcmd)

    def setcmdarg(self, value):
        """Replace everything after the command with the bytes of value."""
        self.resize(3)
        # Build a real list: list + map(...) only works where map()
        # returns a list.
        self.data = self.data[0:3] + [ord(ch) for ch in value]

    def delcmdarg(self):
        """Drop the argument, keeping frame ID and command."""
        self.data = self.data[0:3]
    # Reading cmdarg yields a list of single characters.
    cmdarg = property(lambda s: [chr(b) for b in s.data[3:]], setcmdarg, delcmdarg)
        
class AT_Cmd_Queue(AT_Cmd):
    """Queued AT command: inherits all of AT_Cmd, differing only in type byte."""
    PKT_DESC = "AT Command (queued)"
    PKT_TYPE = 0x09

class AT_Response(PktBase):
    """AT Command response (packet type 0x88); all fields are read-only."""
    PKT_DESC = "AT Command response"
    PKT_TYPE = 0x88

    @property
    def frameid(self):
        """Byte 0 of the body."""
        return self.data[0]

    @property
    def cmd(self):
        """Bytes 1 and 2 of the body as a two-character string."""
        first = chr(self.data[1])
        second = chr(self.data[2])
        return first + second

    @property
    def statusOK(self):
        """True when byte 3 of the body is zero."""
        return self.data[3] == 0

    @property
    def payload(self):
        """Everything from byte 4 onwards."""
        return self.data[4:]

class Modem_Status(PktBase):
    """Modem Status packet (type 0x8a); no field accessors beyond PktBase."""
    PKT_DESC = "Modem Status"
    PKT_TYPE = 0x8a

class RX_16_Bit(PktBase):
    """Received data packet carrying a 16 bit sender address (type 0x81).

    Body layout in ``data``: ADDR_SIZE address bytes (most significant
    first), one RSSI byte, one flags byte, then the payload.
    """
    PKT_DESC = "RX Packet: 16 bit address"
    PKT_TYPE = 0x81
    ADDR_SIZE = 2

    def getsender(self):
        """Return the sender address assembled from the leading bytes."""
        address = 0
        # Most significant byte comes first, so shift in one byte at a time.
        for idx in range(self.ADDR_SIZE):
            address = (address << 8) | self.data[idx]
        return address
    sender = property(getsender, None)

    @property
    def rssi(self):
        """The byte after the address, negated."""
        return -self.data[self.ADDR_SIZE]

    @property
    def flags(self):
        """The byte after the RSSI byte."""
        return self.data[self.ADDR_SIZE + 1]

    @property
    def payload(self):
        """All bytes after the flags byte."""
        start = self.ADDR_SIZE + 2
        return self.data[start:]
                       
class RX_64_Bit(RX_16_Bit):
    """Same as RX_16_Bit but with an 8 byte sender address (type 0x80)."""
    ADDR_SIZE = 8
    PKT_DESC = "RX Packet: 64 bit address"
    PKT_TYPE = 0x80

class RXIO_16_Bit(RX_16_Bit):
    """Received I/O sample packet with a 16 bit sender address (type 0x83).

    Body layout after the address, RSSI and flags bytes:
        [ADDR_SIZE + 2]           number of samples
        [ADDR_SIZE + 3 .. + 4]    channel mask, most significant byte first
        [ADDR_SIZE + 5 ..]        sample data: a 2 byte DIO word if any DIO
                                  bit is set in the mask, then one 2 byte
                                  word per enabled ADC channel
    """
    PKT_TYPE = 0x83
    PKT_DESC = "RXIO Packet: 16 bit address"

    def setnsamples(self, value):
        """Store the sample count, growing the body if needed."""
        self.resize(self.ADDR_SIZE + 3)
        self.data[self.ADDR_SIZE + 2] = value

    def getnsamples(self):
        """Return the sample count byte."""
        return self.data[self.ADDR_SIZE + 2]
    nsamples = property(getnsamples, setnsamples)

    def setmask(self, value):
        """Store the low 16 bits of value as the channel mask."""
        self.resize(self.ADDR_SIZE + 5)
        self.data[self.ADDR_SIZE + 3] = (value & 0xff00) >> 8
        self.data[self.ADDR_SIZE + 4] = value & 0xff
    mask = property(lambda s: s.data[s.ADDR_SIZE + 3] << 8 | s.data[s.ADDR_SIZE + 4], setmask)

    def __str__(self):
        """Describe the sender, RSSI, mask and the sampled values.

        NOTE(review): only one set of samples is decoded, whatever
        nsamples says -- confirm whether multi-sample frames matter.
        """
        rtn = "0x%0*x (%ddBm) -> %d samples, mask 0x%04x" % (self.ADDR_SIZE * 2, self.sender,
                                                             self.rssi, self.nsamples, self.mask)
        mask = self.mask
        offs = self.ADDR_SIZE + 5

        # Any DIO lines enabled?  This must be a bitwise AND: OR-ing with
        # the constant is always non-zero, which made ADC-only frames
        # print a bogus DIO word and shifted every ADC offset by two.
        if mask & 0x01ff:
            rtn = rtn + ", DIO - 0x%03x" % (self.data[offs] << 8 |
                                            self.data[offs + 1])
            offs = offs + 2

        # One 2 byte word per enabled ADC channel (mask bits 9..14); the
        # per-bit test also covers the "no ADC lines enabled" case.
        for i in range(6):
            if mask & (1 << (i + 9)):
                rtn = rtn + ", ADC%d - 0x%02x" % (i, self.data[offs] << 8 |
                                                  self.data[offs + 1])
                offs = offs + 2

        return rtn

class RXIO_64_Bit(RXIO_16_Bit):
    """Same as RXIO_16_Bit but with an 8 byte sender address (type 0x82)."""
    ADDR_SIZE = 8
    PKT_DESC = "RXIO Packet: 64 bit address"
    PKT_TYPE = 0x82
    
class TX_16_Bit(RX_64_Bit):
    """Transmit request with a 16 bit recipient address (type 0x01).

    Body layout in ``data``:
        [0]                     frame ID
        [1 .. ADDR_SIZE]        recipient address, most significant byte first
        [ADDR_SIZE + 1]         options byte
        [ADDR_SIZE + 2 ..]      payload

    NOTE(review): the RX_64_Bit base also contributes sender/rssi/flags
    properties, which read this body at receive-packet offsets; the base
    is left unchanged so existing issubclass() relationships still hold.
    """
    PKT_TYPE = 0x01
    PKT_DESC = "TX Packet: 16 bit address"
    ADDR_SIZE = 2

    # Integer types accepted as a recipient address.  Python 2 promotes
    # large values (e.g. 64 bit addresses) to long, which is not an int.
    try:
        _INT_TYPES = (int, long)
    except NameError:
        _INT_TYPES = (int,)

    def __init__(self, *args):
        """Create the packet from raw bytes or from an address.

        Accepted call forms:
            TX_16_Bit(list_of_ordinals)    -- raw body (without the type byte)
            TX_16_Bit(recipient)           -- integer address, empty payload
            TX_16_Bit(recipient, payload)  -- integer address plus payload

        Raises TypeError for any other number of arguments.
        """
        if len(args) == 1:
            if isinstance(args[0], self._INT_TYPES):
                super(TX_16_Bit, self).__init__([])
                self.recipient = args[0]
            else:
                super(TX_16_Bit, self).__init__(args[0])
        elif len(args) == 2:
            super(TX_16_Bit, self).__init__([])
            self.recipient = args[0]
            self.payload = args[1]
        else:
            raise TypeError("__init__ takes 1 list of ordinals, 1 recipient address, "
                            "or a recipient address and a payload")

        self.resize(self.ADDR_SIZE + 2)
        self.data[0] = 1 # XXX: could be smarter about Frame ID
        # The options byte follows the address; a fixed index of 3 is only
        # right for 2 byte addresses and overwrites an address byte when
        # ADDR_SIZE is 8.
        self.data[self.ADDR_SIZE + 1] = 0 # XXX: should be able to set flags

    def getrecipient(self):
        """Return the recipient address assembled from the address bytes."""
        value = 0
        for i in range(self.ADDR_SIZE):
            value = (value << 8) | self.data[i + 1]
        return value

    def setrecipient(self, value):
        """Store value as the recipient address, most significant byte first."""
        self.resize(self.ADDR_SIZE + 1)
        self.data[0] = 1 # XXX: could be smarter about Frame ID
        for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)):
            self.data[i + 1] = (value >> j) & 0xff
    recipient = property(getrecipient, setrecipient)

    def setpayload(self, value):
        """Replace the payload with value (a string or a sequence of ordinals).

        Characters are converted to ordinals so the body stays a list of
        integers, which is what checksum calculation requires.
        """
        self.resize(self.ADDR_SIZE + 2)
        self.data[self.ADDR_SIZE + 2:] = [ord(b) if isinstance(b, str) else b
                                          for b in value]
    # The getter must use its own argument; the lambda has no `self`.
    payload = property(lambda s: s.data[s.ADDR_SIZE + 2:], setpayload)
                       
class TX_64_Bit(TX_16_Bit):
    """Same as TX_16_Bit but with an 8 byte recipient address (type 0x00)."""
    ADDR_SIZE = 8
    PKT_DESC = "TX Packet: 64 bit address"
    PKT_TYPE = 0x00

class TX_Status(PktBase):
    """Transmit status report (packet type 0x89); all fields are read-only."""
    PKT_DESC = "TX Status"
    PKT_TYPE = 0x89
    # Text for each status code, indexed by the code itself.
    statusTxt = ['OK', 'No Ack', 'CCA failure', 'Purged']

    @property
    def frameid(self):
        """Byte 0 of the body."""
        return self.data[0]

    @property
    def status(self):
        """Byte 1 of the body (numeric status code)."""
        return self.data[1]

    @property
    def statusMsg(self):
        """The statusTxt entry selected by the status code."""
        code = self.data[1]
        return self.statusTxt[code]
    
class Packets(object):
    """Frame encoder and receive-side state machine for API packets.

    Frame format handled here:
        0x7e, length MSB, length LSB, <length> body bytes, checksum
    where checksum = 0xff - (sum of body bytes & 0xff).
    """
    # Cache of the PktBase subclasses found in this module; filled in by
    # the first Build() call.
    PKT_CLASSES = None

    @classmethod
    def Build(cls, data):
        """Return a packet object for a received body.

        data -- list of ordinals; data[0] is the packet type, the rest is
                handed to the matching class's constructor.

        Raises ValueError when data is empty or the type is not known.
        """
        if not data:
            raise ValueError("Empty packet")

        if cls.PKT_CLASSES is None:
            module = inspect.getmodule(cls)
            base = module.PktBase
            # Every class in the module derived from PktBase (but not
            # PktBase itself).  Stored as a real list so the cache can be
            # iterated on every later call.
            cls.PKT_CLASSES = [obj for obj in vars(module).values()
                               if inspect.isclass(obj)
                               and issubclass(obj, base) and obj is not base]

        for pktclass in cls.PKT_CLASSES:
            if pktclass.PKT_TYPE == data[0]:
                return pktclass(data[1:])

        raise ValueError("Unknown packet type 0x%02x" % (data[0]))

    @staticmethod
    def Encapsulate(data):
        """Frame a body for transmission.

        data -- list of ordinals, packet type first.

        Returns the complete frame as a list of single characters.
        """
        # sum() also copes with an empty body, unlike reduce().
        pktsum = sum(data) & 0xff
        pkt = [0x7e, len(data) >> 8, len(data) & 0xff] + data + [0xff - pktsum]
        return [chr(b) for b in pkt]

    def __init__(self, s):
        """Set up an idle parser.

        s -- object used for I/O; writedata() calls s.write() and
             getdata() calls s.read().
        """
        print(str(inspect.getmodule(self)))
        self.buffer = []      # body bytes of the frame being received
        self.state = 'init'   # current parser state
        self.packets = []

        self.bufmsb = 0       # length MSB, held until the LSB arrives
        self.dataleft = 0     # body bytes still expected

        self.fr_err = 0 # Framing error
        self.ck_err = 0 # Checksum error
        self.rx_cnt = 0 # Packet count

        self.pktq = []        # successfully decoded packets, oldest first
        self.s = s

    def writedata(self, data):
        """Write a sequence of items to the port as one string."""
        self.s.write("".join(map(str, data)))

    def getdata(self):
        """Read until s.read() returns nothing, then parse what arrived.

        Returns the number of packets decoded (see process()).
        """
        received = []
        while True:
            a = self.s.read()
            # An empty result means nothing more is available right now.
            if not a:
                break
            received.append(ord(a))

        return self.process(received)

    def process(self, data):
        """Feed received bytes through the frame parser.

        data -- iterable of ordinals; a frame may be split across calls.

        Decoded packets are appended to self.pktq.  Returns the number of
        packets completed by this call.  A ValueError from Build() (empty
        or unknown packet) propagates to the caller; the parser is left
        ready for the next frame, but the remaining bytes of data are not
        processed.
        """
        pktcount = 0
        for d in data:
            if self.state == 'init':
                if d != 0x7e:
                    print("Framing error, got 0x%02x, expected 0x7e" % (d))
                    self.fr_err += 1
                    continue

                self.state = 'sizemsb'
            elif self.state == 'sizemsb':
                self.bufmsb = d
                self.state = 'sizelsb'
            elif self.state == 'sizelsb':
                self.dataleft = self.bufmsb << 8 | d
                # A zero-length frame has no body bytes; entering 'data'
                # would drive dataleft negative and never leave that state.
                if self.dataleft == 0:
                    self.state = 'cksum'
                else:
                    self.state = 'data'
            elif self.state == 'data':
                self.buffer.append(d)
                self.dataleft = self.dataleft - 1
                if self.dataleft == 0:
                    self.state = 'cksum'
            elif self.state == 'cksum':
                # Detach the body and reset the parser before anything can
                # raise, so a failed Build() cannot leave stale bytes to
                # corrupt the next frame.
                body = self.buffer
                self.buffer = []
                self.state = 'init'

                pktsum = sum(body) & 0xff
                rxcksum = d
                if pktsum + rxcksum != 0xff:
                    self.ck_err += 1
                    print("Checksum error, got 0x%02x, expected 0x%02x" %
                          (rxcksum, 0xff - pktsum))
                else:
                    print("Got a packet - " + str(body))
                    p = Packets.Build(body)
                    self.pktq.append(p)
                    pktcount += 1
                    self.rx_cnt += 1
            else:
                print("Invalid state %s! Resetting" % (self.state))
                self.state = 'init'

        return pktcount

#for c in dir():
#    if (issubclass(c, PktBase)):
#        print ..

# Open the serial port.  The read timeout is given to the constructor
# instead of calling the deprecated setTimeout() method, which later
# pySerial releases removed.
s = serial.Serial(port='/dev/cuad0', baudrate=9600, bytesize=8, parity='N',
                  stopbits=1, rtscts=0, timeout=0.1)
#s.write('+++')
#s.readline(eol='\r')


# Canned frames for exercising the parser without live serial traffic.

# 0x0001 (-36dBm) -> 1 samples, mask 0x000f, DIO - 0x00f
goodtest = [126, 0, 10, 131, 0, 1, 36, 0, 1, 0, 15, 0, 15, 56]

# Checksum error
badtest = [126, 0, 10, 131, 0, 1, 36, 0, 1, 0, 15, 0, 14, 56]

#0x0005 (-36dBm) -> 1 samples, mask 0x020e, DIO - 0x00e, ADC0 - 0x3ff
adctest = [126, 0, 12, 131, 0, 5, 36, 0, 1, 2, 14, 0, 14, 3, 255, 50]

# Exception
badpkttypetest = [126, 0, 3, 10, 86, 76, 83]

# Parse the canned frames; goodtest and adctest each queue one packet,
# badtest only bumps the checksum error count.
up = Packets(s)
up.process(goodtest)
up.process(badtest)
up.process(adctest)
print(up.pktq.pop(0))
print(up.pktq.pop(0))