Mercurial > ~darius > hgwebdir.cgi > ZigBee
view EventGhost.py @ 26:ca8993488ac5 default tip
Use Python 2.7.
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Tue, 16 Apr 2013 10:23:29 +0930 |
parents | 2a1cea865cc0 |
children |
line wrap: on
line source
import eg
import zb


# Plugin metadata handed to EventGhost.
class PluginInfo(eg.PluginInfo):
    name = "ZigBee"
    author = "Darius"
    version = "1.0.0"
    description = "Listen for data from a Max Stream ZigBee module"


# Label strings used by Serial.Configure() to build the settings dialog.
class Text:
    port = "Port:"
    baudrate = "Baudrate:"
    bytesize = "Number of bits:"
    parity = "Parity:"
    parities = ['No parity', 'Odd', 'Even']  # 'Mark' and 'Space' not offered
    stopbits = "Stopbits:"
    flowcontrol = "Flow control:"
    handshakes = ['None', 'Xon / Xoff', 'Hardware']
    generateEvents = "Generate events on incoming data"
    eventPrefix = "Event prefix:"


# NOTE(review): these imports sit below PluginInfo/Text exactly as in the
# original file; presumably the ordering matters to the plugin loader, so it
# is preserved -- confirm before moving them to the top.
import wx
import threading
import win32event
import win32file


# Data-bit counts offered in the dialog, in choice order.
BYTESIZES = (5, 6, 7, 8)


def _BytesizeIndex(setting):
    """Return the index into BYTESIZES described by *setting*.

    Configure() stores the selection index of the choice control, while the
    default argument of __start__/Configure is the literal bit count 8.
    Both spellings are accepted here so either one selects the same entry.

    Raises ValueError when *setting* is neither a bit count from BYTESIZES
    nor a valid (possibly negative) tuple index.
    """
    if setting in BYTESIZES:
        return BYTESIZES.index(setting)
    if -len(BYTESIZES) <= setting < len(BYTESIZES):
        # Same result the original tuple indexing gave, negatives included.
        return setting % len(BYTESIZES)
    raise ValueError("Unsupported bytesize setting: %r" % (setting,))


# Plugin that reads bytes from a serial port on a background thread, feeds
# them to a zb.Packets parser and triggers an event for RXIO 16-bit packets.
class Serial(eg.RawReceiverPlugin):
    canMultiLoad = True
    text = Text

    def __init__(self):
        eg.RawReceiverPlugin.__init__(self)
        self.serial = None
        # Initialised here so __stop__ can test them even when __start__
        # failed part-way through.
        self.receiveThread = None
        self.stopEvent = None
        self.up = zb.Packets()

    def __start__(
        self,
        port,
        baudrate,
        bytesize=8,
        parity=0,
        stopbits=0,
        handshake=0,
        generateEvents=False,
        prefix="Serial",
    ):
        """Open the serial port and start the receive thread.

        port, baudrate -- passed straight to eg.SerialPort.
        bytesize       -- index into (5, 6, 7, 8) as stored by Configure();
                          a literal bit count 5..8 is accepted as well.
        parity         -- index into ('N', 'O', 'E').
        stopbits       -- index into (1, 2).
        handshake      -- 0 none, 1 Xon/Xoff, 2 hardware (RTS/CTS).
        generateEvents -- accepted but not used by this method.
        prefix         -- stored as the plugin's event prefix.

        Raises eg.Exception if the port cannot be opened with these settings.
        """
        xonxoff = 1 if handshake == 1 else 0
        rtscts = 1 if handshake == 2 else 0
        try:
            self.serial = eg.SerialPort(
                port,
                baudrate=baudrate,
                bytesize=BYTESIZES[_BytesizeIndex(bytesize)],
                stopbits=(1, 2)[stopbits],
                parity=('N', 'O', 'E')[parity],
                xonxoff=xonxoff,
                rtscts=rtscts,
            )
        except Exception:
            # Bad setting indexes and driver errors are both reported as a
            # failure to open the port, as before.
            self.serial = None
            raise eg.Exception("Can't open COM port.")
        self.serial.timeout = 1.0
        self.serial.setRTS()
        self.info.eventPrefix = prefix
        # Manual-reset, initially unsignalled event used to stop the thread.
        self.stopEvent = win32event.CreateEvent(None, 1, 0, None)
        self.receiveThread = threading.Thread(target=self.ReceiveThread)
        self.receiveThread.start()

    def __stop__(self):
        """Signal the receive thread to stop, then close the serial port."""
        if self.serial is not None:
            if self.receiveThread:
                win32event.SetEvent(self.stopEvent)
                # Wait at most one second for the thread to finish.
                self.receiveThread.join(1.0)
                self.receiveThread = None
            self.serial.close()
            self.serial = None

    def HandleChar(self, ch):
        """Feed received data to the packet parser and handle whole packets.

        Every packet queued by the parser is removed from the queue.  RXIO
        16-bit packets trigger an event carrying sender and masked DIO bits;
        any other packet type is reported through PrintError.
        """
        self.up.process(ch)
        while len(self.up.pktq) > 0:
            p = self.up.pktq.pop(0)
            if p.PKT_TYPE == zb.RXIO_16_Bit.PKT_TYPE:
                # The offsets and raw data belong to the packet, not to the
                # plugin: the original read them from `self`, which has no
                # ADDR_SIZE or _data attribute.
                # NOTE(review): assumes the packet exposes ADDR_SIZE, as the
                # original expression implies -- confirm against zb.py.
                hiByte = p._data[p.ADDR_SIZE + 5]
                loByte = p._data[p.ADDR_SIZE + 6]
                dios = (hiByte << 8 | loByte) & p.mask
                self.TriggerEvent("0x%02x -> 0x%03x" % (p.sender, dios))
                # Kept from the original: the packet text is also logged.
                self.PrintError(str(p))
            else:
                # The original referenced a non-existent self.buffer here;
                # report the offending packet instead.
                self.PrintError("Got unknown packet: " + str(p))

    def ReceiveThread(self):
        """Thread body: overlapped one-byte reads until stopEvent is set."""
        from win32event import (
            ResetEvent,
            MsgWaitForMultipleObjects,
            QS_ALLINPUT,
            WAIT_OBJECT_0,
            WAIT_TIMEOUT,
        )
        from win32file import ReadFile, AllocateReadBuffer, GetOverlappedResult

        # Win32 ERROR_IO_PENDING: the overlapped read was queued and will
        # complete later.
        ERROR_IO_PENDING = 997

        continueLoop = True
        overlapped = self.serial._overlappedRead
        hComPort = self.serial.hComPort
        hEvent = overlapped.hEvent
        stopEvent = self.stopEvent
        n = 1
        waitingOnRead = False
        buf = AllocateReadBuffer(n)
        while continueLoop:
            if not waitingOnRead:
                ResetEvent(hEvent)
                hr, _ = ReadFile(hComPort, buf, overlapped)
                if hr == ERROR_IO_PENDING:
                    waitingOnRead = True
                elif hr == 0:
                    # Nothing to do here; the result is collected below once
                    # the wait reports hEvent.
                    pass
                else:
                    self.PrintError("error")
                    # The original used a bare `raise` with no active
                    # exception, which fails with an unrelated TypeError.
                    raise eg.Exception(
                        "ReadFile on the serial port failed (code %r)." % (hr,)
                    )
            rc = MsgWaitForMultipleObjects(
                (hEvent, stopEvent),
                0,
                1000,
                QS_ALLINPUT
            )
            if rc == WAIT_OBJECT_0:
                # hEvent signalled: the pending read has finished.
                n = GetOverlappedResult(hComPort, overlapped, 1)
                if n:
                    self.HandleChar(str(buf))
                waitingOnRead = False
            elif rc == WAIT_OBJECT_0 + 1:
                # stopEvent signalled by __stop__.
                continueLoop = False
            elif rc == WAIT_TIMEOUT:
                pass
            else:
                self.PrintError("unknown message")

    def Configure(
        self,
        port=0,
        baudrate=9600,
        bytesize=8,
        parity=0,
        stopbits=0,
        handshake=0,
        generateEvents=False,
        prefix="ZB",
    ):
        """Show the settings dialog.

        The arguments are the current settings.  When the dialog is affirmed
        a tuple is returned in the parameter order of __start__: port,
        baudrate (int), bytesize index, parity index, stopbits index,
        handshake index, generateEvents flag and event prefix.
        """
        text = self.text
        dialog = eg.ConfigurationDialog(self)
        portCtrl = eg.SerialPortChoice(dialog, value=port)

        baudrateCtrl = wx.ComboBox(
            dialog,
            value=str(baudrate),
            choices=[
                '110', '300', '600', '1200', '2400', '4800', '9600',
                '14400', '19200', '38400', '57600', '115200',
                '128000', '256000'
            ],
            style=wx.CB_DROPDOWN,
            validator=eg.DigitOnlyValidator()
        )

        bytesizeCtrl = wx.Choice(dialog, choices=[str(b) for b in BYTESIZES])
        # Show the stored setting; the original always selected 8 bits and
        # ignored the `bytesize` argument.
        try:
            bytesizeSelection = _BytesizeIndex(bytesize)
        except (TypeError, ValueError):
            bytesizeSelection = BYTESIZES.index(8)
        bytesizeCtrl.SetSelection(bytesizeSelection)

        parityCtrl = wx.Choice(dialog, choices=text.parities)
        parityCtrl.SetSelection(parity)

        stopbitsCtrl = wx.Choice(dialog, choices=['1', '2'])
        stopbitsCtrl.SetSelection(stopbits)

        handshakeCtrl = wx.Choice(dialog, choices=text.handshakes)
        handshakeCtrl.SetSelection(handshake)

        generateEventsCtrl = wx.CheckBox(dialog, label=text.generateEvents)
        generateEventsCtrl.SetValue(generateEvents)

        prefixCtrl = wx.TextCtrl(dialog)
        prefixCtrl.SetValue(prefix)
        prefixCtrl.Enable(generateEvents)

        def OnCheckBox(event):
            # The prefix field is editable only while events are enabled.
            flag = generateEventsCtrl.GetValue()
            prefixCtrl.Enable(flag)
        generateEventsCtrl.Bind(wx.EVT_CHECKBOX, OnCheckBox)

        flags = wx.ALIGN_RIGHT | wx.ALIGN_CENTER_VERTICAL
        mySizer = wx.GridBagSizer(5, 5)
        Add = mySizer.Add
        Add(wx.StaticText(dialog, -1, text.port), (0, 0), flag=flags)
        Add(portCtrl, (0, 1), flag=wx.EXPAND)
        Add(wx.StaticText(dialog, -1, text.baudrate), (1, 0), flag=flags)
        Add(baudrateCtrl, (1, 1), flag=wx.EXPAND)
        Add(wx.StaticText(dialog, -1, text.bytesize), (2, 0), flag=flags)
        Add(bytesizeCtrl, (2, 1), flag=wx.EXPAND)
        Add(wx.StaticText(dialog, -1, text.parity), (3, 0), flag=flags)
        Add(parityCtrl, (3, 1), flag=wx.EXPAND)
        Add(wx.StaticText(dialog, -1, text.stopbits), (4, 0), flag=flags)
        Add(stopbitsCtrl, (4, 1), flag=wx.EXPAND)
        Add(wx.StaticText(dialog, -1, text.flowcontrol), (5, 0), flag=flags)
        Add(handshakeCtrl, (5, 1), flag=wx.EXPAND)

        # Spacer row between the port settings and the event settings.
        Add((5, 5), (6, 0), (1, 2), flag=flags)
        Add(generateEventsCtrl, (7, 0), (1, 2), flag=flags)
        Add(wx.StaticText(dialog, -1, text.eventPrefix), (9, 0), flag=flags)
        Add(prefixCtrl, (9, 1), flag=wx.EXPAND)
        dialog.sizer.Add(mySizer)

        if dialog.AffirmedShowModal():
            return (
                portCtrl.GetValue(),
                int(baudrateCtrl.GetValue()),
                bytesizeCtrl.GetSelection(),
                parityCtrl.GetSelection(),
                stopbitsCtrl.GetSelection(),
                handshakeCtrl.GetSelection(),
                generateEventsCtrl.GetValue(),
                prefixCtrl.GetValue(),
            )