Mercurial > ~darius > hgwebdir.cgi > EG-ZigBee
view __init__.py @ 3:afd270964f5b default tip
Lots of changes.. Reformatting, culled unused stuff, fixed checksums, changed output
format (so RSSI isn't part of the trigger(!)), etc..
author | darius@inchoate.localdomain |
---|---|
date | Sat, 03 Nov 2007 10:56:05 +1030 |
parents | 1e65aca0f39c |
children |
line wrap: on
line source
import eg


class PluginInfo(eg.PluginInfo):
    """Static plugin metadata handed to EventGhost."""
    name = "ZigBee"
    author = "Darius"
    version = "1.0.0"
    description = "Listen for data from a Max Stream ZigBee module"


class Text:
    """User-visible labels used by the configuration dialog."""
    port = "Port:"
    baudrate = "Baudrate:"
    bytesize = "Number of bits:"
    parity = "Parity:"
    # 'Mark' and 'Space' parity are intentionally not offered.
    parities = ['No parity', 'Odd', 'Even']
    stopbits = "Stopbits:"
    flowcontrol = "Flow control:"
    handshakes = ['None', 'Xon / Xoff', 'Hardware']
    generateEvents = "Generate events on incoming data"
    eventPrefix = "Event prefix:"


# NOTE(review): these imports sit below PluginInfo in the original file; the
# position is preserved in case the host relies on it -- confirm before moving.
import wx
import threading
import win32event
import win32file


# Byte that marks the start of a frame on the wire.
_FRAME_DELIMITER = '\x7e'
# First payload byte of the only frame type this plugin decodes.
_API_IO_SAMPLE = 0x83
# Bits of the channel mask that flag digital I/O lines (the remaining sample
# data is ADC readings, which this plugin ignores).
_DIO_MASK = 0x01ff
# Win32 ERROR_IO_PENDING: the overlapped read was queued, not completed.
_ERROR_IO_PENDING = 997
# Character sizes selectable in the dialog, in choice order.
_BYTESIZES = (5, 6, 7, 8)


def _BytesizeIndex(bytesize):
    """Return the index into _BYTESIZES for *bytesize*.

    The configuration dialog stores the selection index (0-3), but the
    declared defaults use the literal bit count (8).  Both spellings are
    accepted; any other value is returned unchanged so the caller can decide
    how to treat it.
    """
    if bytesize in _BYTESIZES:
        return _BYTESIZES.index(bytesize)
    return bytesize


class Serial(eg.RawReceiverPlugin):
    """Receive API frames from a serial port and turn them into events.

    Bytes are read one at a time on a background thread and fed to a small
    state machine (HandleChar).  A frame is: start delimiter 0x7e, two length
    bytes (MSB first), <length> payload bytes, one checksum byte.
    """
    canMultiLoad = True
    text = Text

    def __init__(self):
        eg.RawReceiverPlugin.__init__(self)
        self.serial = None
        self.buffer = []
        self.state = 'init'
        # Defined up front so __stop__ never meets a missing attribute.
        self.receiveThread = None
        self.stopEvent = None

    def __start__(
        self,
        port,
        baudrate,
        bytesize=8,
        parity=0,
        stopbits=0,
        handshake=0,
        generateEvents=False,
        prefix="Serial",
    ):
        """Open the serial port and start the receive thread.

        bytesize may be a dialog index (0-3) or a bit count (5-8); parity,
        stopbits and handshake are dialog indices.  generateEvents is accepted
        for interface compatibility but not used here.

        Raises eg.Exception if the port cannot be opened.
        """
        xonxoff = 0
        rtscts = 0
        if handshake == 1:
            xonxoff = 1
        elif handshake == 2:
            rtscts = 1

        try:
            self.serial = eg.SerialPort(
                port,
                baudrate=baudrate,
                bytesize=_BYTESIZES[_BytesizeIndex(bytesize)],
                stopbits=(1, 2)[stopbits],
                parity=('N', 'O', 'E')[parity],
                xonxoff=xonxoff,
                rtscts=rtscts,
            )
        except:
            # Deliberately broad: every failure while opening the port
            # (including an out-of-range setting index) is reported uniformly.
            self.serial = None
            raise eg.Exception("Can't open COM port.")

        self.serial.timeout = 1.0
        self.serial.setRTS()
        self.info.eventPrefix = prefix

        # Begin every session with a clean parser, even after a restart.
        self.buffer = []
        self.state = 'init'

        # Manual-reset, initially non-signalled event used to stop the thread.
        self.stopEvent = win32event.CreateEvent(None, 1, 0, None)
        self.receiveThread = threading.Thread(target=self.ReceiveThread)
        self.receiveThread.start()

    def __stop__(self):
        """Signal the receive thread to finish and close the port."""
        if self.serial is not None:
            if self.receiveThread:
                win32event.SetEvent(self.stopEvent)
                # Bounded wait so a stuck thread cannot block shutdown.
                self.receiveThread.join(1.0)
            self.serial.close()
            self.serial = None

    def HandleChar(self, ch):
        """Advance the frame parser by one received character.

        States: 'init' (wait for delimiter) -> 'sizemsb' -> 'sizelsb' ->
        'data' -> 'cksum' -> back to 'init'.
        """
        state = self.state
        if state == 'init':
            if ch == _FRAME_DELIMITER:
                # Drop anything left over from an aborted frame.
                self.buffer = []
                self.state = 'sizemsb'
        elif state == 'sizemsb':
            self.bufszmsb = ord(ch)
            self.state = 'sizelsb'
        elif state == 'sizelsb':
            self.bufszlsb = ord(ch)
            self.dataleft = self.bufszmsb << 8 | self.bufszlsb
            # A zero-length frame has no payload: go straight to the checksum,
            # otherwise the countdown below would run negative and never end.
            if self.dataleft == 0:
                self.state = 'cksum'
            else:
                self.state = 'data'
        elif state == 'data':
            self.buffer.append(ord(ch))
            self.dataleft = self.dataleft - 1
            if self.dataleft == 0:
                self.state = 'cksum'
        elif state == 'cksum':
            # Reset parser state before decoding so a failure while handling
            # this frame cannot corrupt the next one.
            payload = self.buffer
            self.buffer = []
            self.state = 'init'
            self._HandleFrame(payload, ord(ch))
        else:
            self.PrintError("Internal error: bad state: " + str(state))
            self.state = 'init'

    def _HandleFrame(self, payload, rxcksum):
        """Validate and decode one complete frame.

        payload is the list of payload byte values, rxcksum the received
        checksum byte.  A frame is valid when the low byte of the payload sum
        plus the checksum equals 0xff.
        """
        pktsum = sum(payload) & 0xff
        if pktsum + rxcksum != 0xff:
            self.PrintError(
                "Bad checksum, got 0x%02x, expected 0x%02x"
                % (rxcksum, 0xff - pktsum)
            )
            return

        if not payload or payload[0] != _API_IO_SAMPLE:
            self.PrintError("Got unknown packet: " + str(payload))
            return

        # Header: id, 2-byte source, RSSI, flags, sample count, 2-byte mask.
        if len(payload) < 8:
            self.PrintError("Got truncated packet: " + str(payload))
            return

        src = payload[1] << 8 | payload[2]
        rssi = -1 * payload[3]
        flags = payload[4]
        nsamples = payload[5]
        mask = payload[6] << 8 | payload[7]

        # The digital-line word is only present when a DIO bit is set in the
        # mask; test with '&' (a bitwise OR with a non-zero constant is
        # always true).
        if mask & _DIO_MASK:
            if len(payload) < 10:
                self.PrintError("Got truncated packet: " + str(payload))
                return
            dios = payload[8] << 8 | payload[9]
        else:
            dios = 0

        # ADC samples would follow, we ignore them, also
        # ignore multiple samples
        # RSSI is kept out of the event string so that it does not become
        # part of the trigger.
        self.TriggerEvent("0x%02x -> 0x%03x" % (src, dios))
        self.PrintError(
            "0x%02x %d dBm, flags 0x%02x, nsamples %d, mask 0x%03x -> 0x%03x"
            % (src, rssi, flags, nsamples, mask, dios)
        )

    def ReceiveThread(self):
        """Background loop: read single bytes with overlapped I/O.

        Runs until the stop event is signalled.  Each received byte is passed
        to HandleChar.
        """
        from win32event import (
            ResetEvent,
            MsgWaitForMultipleObjects,
            QS_ALLINPUT,
            WAIT_OBJECT_0,
            WAIT_TIMEOUT,
        )
        from win32file import ReadFile, AllocateReadBuffer, GetOverlappedResult

        overlapped = self.serial._overlappedRead
        hComPort = self.serial.hComPort
        hEvent = overlapped.hEvent
        stopEvent = self.stopEvent
        waitingOnRead = False
        buf = AllocateReadBuffer(1)

        while True:
            if not waitingOnRead:
                ResetEvent(hEvent)
                hr, _ = ReadFile(hComPort, buf, overlapped)
                if hr == _ERROR_IO_PENDING:
                    waitingOnRead = True
                elif hr != 0:
                    # Unexpected status: report it and end the thread.  (The
                    # original used a bare 'raise' here, which has no active
                    # exception to re-raise.)
                    self.PrintError("error")
                    return
                # hr == 0: the read completed at once; the event is signalled
                # and the wait below picks the byte up.

            rc = MsgWaitForMultipleObjects(
                (hEvent, stopEvent),
                0,
                1000,
                QS_ALLINPUT
            )
            if rc == WAIT_OBJECT_0:
                # Read event fired: collect the byte count and hand it on.
                n = GetOverlappedResult(hComPort, overlapped, 1)
                if n:
                    self.HandleChar(str(buf))
                waitingOnRead = False
            elif rc == WAIT_OBJECT_0 + 1:
                # Stop event fired.
                break
            elif rc == WAIT_TIMEOUT:
                pass
            else:
                self.PrintError("unknown message")

    def Configure(
        self,
        port=0,
        baudrate=9600,
        bytesize=8,
        parity=0,
        stopbits=0,
        handshake=0,
        generateEvents=False,
        prefix="ZB",
    ):
        """Show the settings dialog.

        Returns the tuple (port, baudrate, bytesize index, parity index,
        stopbits index, handshake index, generateEvents, prefix) when the
        dialog is affirmed, otherwise None.
        """
        text = self.text
        dialog = eg.ConfigurationDialog(self)
        portCtrl = eg.SerialPortChoice(dialog, value=port)

        baudrateCtrl = wx.ComboBox(
            dialog,
            value=str(baudrate),
            choices=[
                '110', '300', '600', '1200', '2400', '4800', '9600',
                '14400', '19200', '38400', '57600', '115200',
                '128000', '256000'
            ],
            style=wx.CB_DROPDOWN,
            validator=eg.DigitOnlyValidator()
        )

        bytesizeCtrl = wx.Choice(dialog, choices=['5', '6', '7', '8'])
        # Honour the stored setting (index or bit count); fall back to the
        # last entry ('8') for anything unrecognised.
        bytesizeIndex = _BytesizeIndex(bytesize)
        if bytesizeIndex not in range(len(_BYTESIZES)):
            bytesizeIndex = len(_BYTESIZES) - 1
        bytesizeCtrl.SetSelection(bytesizeIndex)

        parityCtrl = wx.Choice(dialog, choices=text.parities)
        parityCtrl.SetSelection(parity)

        stopbitsCtrl = wx.Choice(dialog, choices=['1', '2'])
        stopbitsCtrl.SetSelection(stopbits)

        handshakeCtrl = wx.Choice(dialog, choices=text.handshakes)
        handshakeCtrl.SetSelection(handshake)

        generateEventsCtrl = wx.CheckBox(dialog, label=text.generateEvents)
        generateEventsCtrl.SetValue(generateEvents)

        prefixCtrl = wx.TextCtrl(dialog)
        prefixCtrl.SetValue(prefix)
        prefixCtrl.Enable(generateEvents)

        def OnCheckBox(event):
            # The prefix is only editable while event generation is on.
            flag = generateEventsCtrl.GetValue()
            prefixCtrl.Enable(flag)
        generateEventsCtrl.Bind(wx.EVT_CHECKBOX, OnCheckBox)

        flags = wx.ALIGN_RIGHT | wx.ALIGN_CENTER_VERTICAL
        mySizer = wx.GridBagSizer(5, 5)
        Add = mySizer.Add

        # Rows 0-5: a right-aligned label next to its control.
        labelledRows = (
            (text.port, portCtrl),
            (text.baudrate, baudrateCtrl),
            (text.bytesize, bytesizeCtrl),
            (text.parity, parityCtrl),
            (text.stopbits, stopbitsCtrl),
            (text.flowcontrol, handshakeCtrl),
        )
        for row, (label, ctrl) in enumerate(labelledRows):
            Add(wx.StaticText(dialog, -1, label), (row, 0), flag=flags)
            Add(ctrl, (row, 1), flag=wx.EXPAND)

        # Row 6 is a spacer; the event options follow it.
        Add((5, 5), (6, 0), (1, 2), flag=flags)
        Add(generateEventsCtrl, (7, 0), (1, 2), flag=flags)
        Add(wx.StaticText(dialog, -1, text.eventPrefix), (9, 0), flag=flags)
        Add(prefixCtrl, (9, 1), flag=wx.EXPAND)
        dialog.sizer.Add(mySizer)

        if dialog.AffirmedShowModal():
            return (
                portCtrl.GetValue(),
                int(baudrateCtrl.GetValue()),
                bytesizeCtrl.GetSelection(),
                parityCtrl.GetSelection(),
                stopbitsCtrl.GetSelection(),
                handshakeCtrl.GetSelection(),
                generateEventsCtrl.GetValue(),
                prefixCtrl.GetValue(),
            )