Mercurial > ~darius > hgwebdir.cgi > EG-ZigBee
view __init__.py @ 0:c978dbb7e4b8
Commit first version of plugin to parse events from MaxStream ZigBee modules.
At the moment it is not very configurable and only listens for evens from
remote modules set up to send IO events.
author | darius@inchoate.localdomain |
---|---|
date | Sun, 28 Oct 2007 11:08:38 +1030 |
parents | |
children | 1e65aca0f39c |
line wrap: on
line source
import eg class PluginInfo(eg.PluginInfo): name = "ZigBee" author = "Darius" version = "1.0.0" description = "Listen for data from a Max Stream ZigBee module" class Text: port = "Port:" baudrate = "Baudrate:" bytesize = "Number of bits:" parity = "Parity:" parities = ['No parity', 'Odd', 'Even'] #, 'Mark', 'Space'] stopbits = "Stopbits:" flowcontrol = "Flow control:" handshakes = ['None', 'Xon / Xoff', 'Hardware'] generateEvents = "Generate events on incoming data" eventPrefix = "Event prefix:" import wx import threading import win32event import win32file import struct class Serial(eg.RawReceiverPlugin): canMultiLoad = True text = Text def __init__(self): eg.RawReceiverPlugin.__init__(self) self.serial = None self.buffer = [] self.state = 'init' def __start__( self, port, baudrate, bytesize=8, parity=0, stopbits=0, handshake=0, generateEvents=False, prefix="Serial", ): xonxoff = 0 rtscts = 0 if handshake == 1: xonxoff = 1 elif handshake == 2: rtscts = 1 try: self.serial = eg.SerialPort( port, baudrate=baudrate, bytesize=(5, 6, 7, 8)[bytesize], stopbits=(1, 2)[stopbits], parity=('N', 'O', 'E')[parity], xonxoff=xonxoff, rtscts=rtscts, ) except: self.serial = None raise eg.Exception("Can't open COM port.") self.serial.timeout = 1.0 self.serial.setRTS() self.info.eventPrefix = prefix self.stopEvent = win32event.CreateEvent(None, 1, 0, None) self.receiveThread = threading.Thread(target=self.ReceiveThread) self.receiveThread.start() def __stop__(self): if self.serial is not None: if self.receiveThread: win32event.SetEvent(self.stopEvent) self.receiveThread.join(1.0) self.serial.close() self.serial = None def HandleChar(self, ch): if (self.state == 'init'): if (ch != '\x7e'): return self.state = 'sizemsb' return elif (self.state == 'sizemsb'): self.bufszmsb = struct.unpack('B', ch)[0] self.state = 'sizelsb' return elif (self.state == 'sizelsb'): self.bufszlsb = struct.unpack('B', ch)[0] self.dataleft = self.bufszmsb << 8 | self.bufszlsb self.state = 'data' return elif (self.state == 'data'): self.buffer.append(struct.unpack('B', ch)[0]) self.dataleft = self.dataleft - 1 if (self.dataleft == 0): self.state = 'cksum' return elif (self.state == 'cksum'): cksum = reduce(lambda x, y: x + y, self.buffer) & 0xff rxcksum = struct.unpack('B', ch)[0] if (cksum + rxcksum != 0xff): self.PrintError("Bad checksum, got 0x%02x, expected 0x%02x" % cksum, 0xff - cksum) else: self.PrintError("Triggered event: " + str(self.buffer)) self.TriggerEvent(str(self.buffer)) self.buffer = [] self.state = 'init' return else: self.PrintError("Internal error: bad state: " + str(self.state)) self.state = 'init' def ReceiveThread(self): from win32event import ( ResetEvent, MsgWaitForMultipleObjects, QS_ALLINPUT, WAIT_OBJECT_0, WAIT_TIMEOUT, ) from win32file import ReadFile, AllocateReadBuffer, GetOverlappedResult from win32api import GetLastError continueLoop = True overlapped = self.serial._overlappedRead hComPort = self.serial.hComPort hEvent = overlapped.hEvent stopEvent = self.stopEvent n = 1 waitingOnRead = False buf = AllocateReadBuffer(n) while continueLoop: if not waitingOnRead: ResetEvent(hEvent) hr, _ = ReadFile(hComPort, buf, overlapped) if hr == 997: waitingOnRead = True elif hr == 0: pass #n = GetOverlappedResult(hComPort, overlapped, 1) #self.HandleChar(str(buf)) else: self.PrintError("error") raise rc = MsgWaitForMultipleObjects( (hEvent, stopEvent), 0, 1000, QS_ALLINPUT ) if rc == WAIT_OBJECT_0: n = GetOverlappedResult(hComPort, overlapped, 1) if n: self.HandleChar(str(buf)) #else: # print "WAIT_OBJECT_0", n, str(buf[:n]) waitingOnRead = False elif rc == WAIT_OBJECT_0+1: continueLoop = False elif rc == WAIT_TIMEOUT: pass else: self.PrintError("unknown message") def Configure( self, port=0, baudrate=9600, bytesize=8, parity=0, stopbits=0, handshake=0, generateEvents=False, prefix="Serial", ): text = self.text dialog = eg.ConfigurationDialog(self) portCtrl = eg.SerialPortChoice(dialog, value=port) baudrateCtrl = wx.ComboBox( dialog, value=str(baudrate), choices=[ '110', '300', '600', '1200', '2400', '4800', '9600', '14400', '19200', '38400', '57600', '115200', '128000', '256000' ], style=wx.CB_DROPDOWN, validator=eg.DigitOnlyValidator() ) bytesizeCtrl = wx.Choice(dialog, choices=['5', '6', '7', '8']) bytesizeCtrl.SetSelection(8 - 5) parityCtrl = wx.Choice(dialog, choices=text.parities) parityCtrl.SetSelection(parity) stopbitsCtrl = wx.Choice(dialog, choices=['1', '2']) stopbitsCtrl.SetSelection(stopbits) handshakeCtrl = wx.Choice(dialog, choices=text.handshakes) handshakeCtrl.SetSelection(handshake) generateEventsCtrl = wx.CheckBox(dialog, label=text.generateEvents) generateEventsCtrl.SetValue(generateEvents) prefixCtrl = wx.TextCtrl(dialog) prefixCtrl.SetValue(prefix) prefixCtrl.Enable(generateEvents) def OnCheckBox(event): flag = generateEventsCtrl.GetValue() prefixCtrl.Enable(flag) generateEventsCtrl.Bind(wx.EVT_CHECKBOX, OnCheckBox) flags = wx.ALIGN_RIGHT|wx.ALIGN_CENTER_VERTICAL mySizer = wx.GridBagSizer(5, 5) Add = mySizer.Add Add(wx.StaticText(dialog, -1, text.port), (0, 0), flag=flags) Add(portCtrl, (0, 1), flag=wx.EXPAND) Add(wx.StaticText(dialog, -1, text.baudrate), (1, 0), flag=flags) Add(baudrateCtrl, (1, 1), flag=wx.EXPAND) Add(wx.StaticText(dialog, -1, text.bytesize), (2, 0), flag=flags) Add(bytesizeCtrl, (2, 1), flag=wx.EXPAND) Add(wx.StaticText(dialog, -1, text.parity), (3, 0), flag=flags) Add(parityCtrl, (3, 1), flag=wx.EXPAND) Add(wx.StaticText(dialog, -1, text.stopbits), (4, 0), flag=flags) Add(stopbitsCtrl, (4, 1), flag=wx.EXPAND) Add(wx.StaticText(dialog, -1, text.flowcontrol), (5, 0), flag=flags) Add(handshakeCtrl, (5, 1), flag=wx.EXPAND) Add((5, 5), (6, 0), (1, 2), flag=flags) Add(generateEventsCtrl, (7, 0), (1, 2), flag=flags) Add(wx.StaticText(dialog, -1, text.eventPrefix), (9, 0), flag=flags) Add(prefixCtrl, (9, 1), flag=wx.EXPAND) dialog.sizer.Add(mySizer) if dialog.AffirmedShowModal(): return ( portCtrl.GetValue(), int(baudrateCtrl.GetValue()), bytesizeCtrl.GetSelection(), parityCtrl.GetSelection(), stopbitsCtrl.GetSelection(), handshakeCtrl.GetSelection(), generateEventsCtrl.GetValue(), prefixCtrl.GetValue(), )