Mercurial > ~darius > hgwebdir.cgi > pyinst
view usb488.py @ 74:b6ebe05f250f
Add some commentary about what it works with
author | Daniel O'Connor <doconnor@gsoft.com.au> |
---|---|
date | Wed, 25 Sep 2024 21:10:01 +0930 |
parents | 7386f2888508 |
children |
line wrap: on
line source
#!/usr/bin/env python

# Copyright (c) 2009
#      Daniel O'Connor <darius@dons.net.au>.  All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

#
# Spec/info..
#
# http://www.usb.org/developers/devclass_docs/USBTMC_1_006a.zip
# http://svn.openmoko.org/developers/werner/ahrt/host/tmc/README
# http://www.home.agilent.com/agilent/redirector.jspx?action=ref&cname=AGILENT_EDITORIAL&ckey=1189335&lc=eng&cc=US&nfr=-35560.0.00
# linux-2.6.29.3/drivers/usb/class/usbtmc.c
# http://sdpha2.ucsd.edu/Lab_Equip_Manuals/usbtmc_usb488_subclass_1_00.pdf
#

import time
from functools import reduce

import usb

#
# The usual SCPI commands are wrapped before being sent.
#
# Write:
# Offset  Field           Size    Value   Description
# 0       MsgID           1       0x01    DEV_DEP_MSG_OUT
# 1       bTag            1       0x01    Varies with each transfer
# 2       bTagInverse     1       0xfe    Inverse of previous field
# 3       Reserved        1       0x00
# 4       TransferSize    4       0x06
# 5       ..                      0x00
# 6       ..                      0x00
# 7       ..                      0x00
# 8       bmTransferAttr  1       0x01    1 == end of msg
# 9       Reserved        1       0x00
# 10      Reserved        1       0x00
# 11      Reserved        1       0x00
# 12      Msg itself      1       0x2a    '*'
# 13                      1       0x49    'I'
# 14                      1       0x44    'D'
# 15                      1       0x4e    'N'
# 16                      1       0x3f    '?'
# 17                      1       0x0a    '\n'
# 18-19   Alignment       2       0x0000  Bring into 4 byte alignment
#
#
# Send a read request:
# Offset  Field           Size    Value   Description
# 0       MsgID           1       0x02    REQUEST_DEV_DEP_MSG_IN
# 1       bTag            1       0x02    Varies with each transfer
# 2       bTagInverse     1       0xfd    Inverse of previous field
# 3       Reserved        1       0x00
# 4       TransferSize    4       0x64
# 5       ..                      0x00
# 6       ..                      0x00
# 7       ..                      0x00
# 8       bmTransferAttr  1       0x00
# 9       Term char       1       0x00
# 10      Reserved        1       0x00
# 11      Reserved        1       0x00

# No libusb versions of these available
USB_CLASS_APP_SPECIFIC = 254
USB_SUBCLASS_TMC = 3
USB_PROTOCOL_488 = 1

# USB488 message IDs (the request and its reply share the same ID)
DEV_DEP_MSG_OUT = 1
REQUEST_DEV_DEP_MSG_IN = 2
DEV_DEP_MSG_IN = 2

# USB TMC control requests
INITIATE_ABORT_BULK_OUT = 1
CHECK_ABORT_BULK_OUT_STATUS = 2
INITIATE_ABORT_BULK_IN = 3
CHECK_ABORT_BULK_IN_STATUS = 4
INITIATE_CLEAR = 5
CHECK_CLEAR_STATUS = 6
GET_CAPABILITIES = 7
INDICATOR_PULSE = 64

# USB488
READ_STATUS_BYTE = 128
REN_CONTROL = 160
GO_TO_LOCAL = 161
LOCAL_LOCKOUT = 162

# Interface capability bits
IF_CAP_LISTEN_ONLY = 0x01
IF_CAP_TALK_ONLY = 0x02
IF_CAP_HAS_INDICATOR = 0x04

# Device capability bits
DEV_CAP_TERM_CHAR = 0x01

# USB488 interface capability bits
USB488_IFCAP_TRIG = 0x01
USB488_IFCAP_GO_LOC = 0x02
USB488_IFCAP_488_2 = 0x04

# USB488 device capability bits
USB488_DEVCAP_DT1 = 0x01
USB488_DEVCAP_RL1 = 0x02
USB488_DEVCAP_SR1 = 0x04
USB488_DEVCAP_SCPI = 0x08

# USBTMC status definitions
STATUS_SUCCESS = 0x01
STATUS_PENDING = 0x02
STATUS_FAILED = 0x80
STATUS_TRANSFER_NOT_IN_PROGRESS = 0x81
STATUS_SPLIT_NOT_IN_PROGRESS = 0x82
STATUS_SPLIT_IN_PROGRESS = 0x83
# SCPI error/event queue status register bit
STATUS_EVE_QUEUE = 0x04


class USB488Error(Exception):
    """Raised when no suitable device is found or a transfer goes wrong"""


class USB488Device(object):
    """A USB488 (USBTMC) instrument accessed through the usb module

    Commands are sent with write(), replies fetched with read(); ask()
    and chkcmd() combine the two.  The remaining methods issue the
    USBTMC/USB488 class control requests."""

    def __init__(self, vendor = None, product = None, serial = None, path = None):
        """Search for a USB488 class device, if specified vendor, product,
        serial and path will refine the search

        The first matching device is opened and its USB488 interface is
        claimed.  Raises USB488Error if nothing matches or the interface
        has no bulk-in or bulk-out endpoint."""
        dev, conf, altif = self._findInterface(vendor, product, serial, path)

        # Open the device and claim the USB interface that supports the spec
        handle = dev.open()
        handle.setConfiguration(conf.value)
        handle.claimInterface(altif.interfaceNumber)
        handle.setAltInterface(altif.alternateSetting)
        self.dev = dev
        self.handle = handle

        # Get some info for humans (None when the string can't be fetched)
        self.vendname = self._getString(handle, dev.iManufacturer)
        self.prodname = self._getString(handle, dev.iProduct)
        self.serial = self._getString(handle, dev.iSerialNumber)

        # Determine the endpoints for each operation type
        self.intrep = self.bulkinep = self.bulkoutep = None
        for ep in altif.endpoints:
            isin = ep.address & usb.ENDPOINT_IN == usb.ENDPOINT_IN
            if ep.type == usb.ENDPOINT_TYPE_INTERRUPT and isin:
                self.intrep = ep.address
            elif ep.type == usb.ENDPOINT_TYPE_BULK:
                if isin:
                    self.bulkinep = ep.address
                else:
                    self.bulkoutep = ep.address
                    # write() splits outgoing packets at this size
                    self.maxPacket = ep.maxPacketSize

        # Required for 488.2 devices, optional otherwise
        if self.intrep is None:
            print("Can't find interrupt endpoint")

        # Data from the device (mandatory)
        if self.bulkinep is None:
            raise USB488Error("Can't find bulk-in endpoint")

        # Data to the device (mandatory)
        if self.bulkoutep is None:
            raise USB488Error("Can't find bulk-out endpoint")

        self.tag = 1
        #self.init()

    @classmethod
    def _findInterface(cls, vendor, product, serial, path):
        """Return (device, configuration, altsetting) for the first USB488
        interface on a device that passes the given filters"""
        for bus in usb.busses():
            for dev in bus.devices:
                # Skip ones that don't match
                if vendor is not None and dev.idVendor != vendor:
                    continue
                if product is not None and dev.idProduct != product:
                    continue
                if path is not None and dev.filename != path:
                    continue
                # Checked last as the device has to be opened to read it
                if serial is not None and cls._readSerial(dev) != serial:
                    continue

                # The libusb examples say you can check for device
                # class and then open, however in that case you can't
                # find the endpoint number which seems pretty useless
                # unless you want to hard code everything.
                for conf in dev.configurations:
                    for iface in conf.interfaces:
                        for altif in iface:
                            # Check if this is a USB488 capable interface
                            if altif.interfaceClass == USB_CLASS_APP_SPECIFIC and \
                               altif.interfaceSubClass == USB_SUBCLASS_TMC and \
                               altif.interfaceProtocol == USB_PROTOCOL_488:
                                return dev, conf, altif

        raise USB488Error("Could not find a suitable USB device")

    @staticmethod
    def _getString(handle, index):
        """Fetch string descriptor index via handle, None if that fails"""
        try:
            return handle.getString(index, 1024)
        except ValueError:
            return None

    @classmethod
    def _readSerial(cls, dev):
        """Return the serial number string of dev, None if it can't be read"""
        try:
            return cls._getString(dev.open(), dev.iSerialNumber)
        except usb.USBError:
            return None

    @staticmethod
    def _header(msgid, tag, size, attr):
        """Build the 12 byte bulk header: message ID, tag, inverted tag, a
        reserved byte, size as 4 little-endian bytes, the transfer
        attribute byte and 3 zero bytes"""
        return [ msgid, tag, ~tag & 0xff, 0x00,
                 size & 0xff, size >> 8 & 0xff, size >> 16 & 0xff,
                 size >> 24 & 0xff, attr, 0, 0, 0 ]

    @staticmethod
    def _pad4(pkt):
        """Return pkt with 0 to 3 zero bytes appended so its length is a
        multiple of 4"""
        return pkt + [0] * (-len(pkt) % 4)

    @staticmethod
    def _encode(data):
        """Turn data (str, bytes or bytearray) into a list of byte values
        ending in exactly one newline-terminated command"""
        if isinstance(data, (bytes, bytearray)) and not isinstance(data, str):
            codes = list(bytearray(data))
        else:
            codes = [ord(ch) for ch in data]

        # The device needs a \n at the end, enforce this
        if not codes or codes[-1] != ord('\n'):
            codes.append(ord('\n'))
        return codes

    def init(self):
        """Clear pending transfers and the instrument's status register

        Raises USB488Error if the error/event queue bit is still set
        after 10 attempts at *CLS."""
        # Flush out any pending data
        self.initiateClear()

        # Perform dummy write/read otherwise the next read times out
        try:
            self.ask('*STB?', timeout = 0.001)
        except usb.USBError:
            pass

        # Clear status register
        for i in range(10):
            self.write('*CLS')
            if int(self.ask('*STB?')) & STATUS_EVE_QUEUE == 0:
                break
        else:
            raise USB488Error('Unable to clear status register')

    def __str__(self):
        rtn = "Mfg: %s Prod: %s" % (self.vendname, self.prodname)
        # serial is None when the descriptor could not be read
        if self.serial:
            rtn += " S/N: %s" % (self.serial,)
        return rtn

    def gettag(self):
        """Return the tag for the next transfer and advance the counter

        Tags run from 1 to 254 and then wrap back to 1, 0 is never used."""
        tag = self.tag
        self.tag = (self.tag + 1) % 255
        if self.tag == 0:
            self.tag += 1
        return tag

    def write(self, data):
        """Send data (string) to the instrument

        A trailing newline is added if data doesn't already end in one.
        Raises USB488Error on a short write."""
        payload = self._encode(data)

        # Build the packet: header (1 == end of message), data, alignment.
        # TransferSize covers the data only, not the alignment bytes.
        pkt = self._header(DEV_DEP_MSG_OUT, self.gettag(), len(payload), 1)
        pkt = self._pad4(pkt + payload)

        # Split it up into maxPacket sized chunks and send..
        # XXX; this is not correct, need a header for each one
        for start in range(0, len(pkt), self.maxPacket):
            chunk = pkt[start:start + self.maxPacket]
            #print("Sending %d bytes of data: %s" % (len(chunk), chunk))
            wrote = self.handle.bulkWrite(self.bulkoutep, chunk)
            if wrote != len(chunk):
                raise USB488Error("Short write, got %d, expected %d" % (wrote, len(chunk)))

    def read(self, timeout = None):
        """Read data from the device, waits for up to timeout seconds for
        each USB transaction

        Returns the reply as a string with one trailing newline removed.
        Raises USB488Error if a reply is too short or its message ID, tag
        or inverted tag is not the expected one."""
        if timeout is None:
            timeout = 1

        # Mangle into milliseconds
        _timeout = int(timeout * 1000.0)

        # Maximum we accept at once
        # Was 2^31 - 1 but that seems to make things take too long to
        # read (perhaps libusb tries to malloc it..)
        datalen = 10240
        data = []

        while True:
            # Ask the device to send us something
            tag = self.gettag()
            pkt = self._header(REQUEST_DEV_DEP_MSG_IN, tag, datalen, 0)

            # Send it
            #print("Sending " + str(pkt))
            wrote = self.handle.bulkWrite(self.bulkoutep, pkt, _timeout)
            if wrote != len(pkt):
                print("Short write, got %d, expected %d" % (wrote, len(pkt)))

            #print("Reading..")
            read = self.handle.bulkRead(self.bulkinep, datalen, _timeout)
            #print("Read %s bytes: %s" % (len(read), str(read)))

            # Everything below indexes into the 12 byte header
            if len(read) < 12:
                raise USB488Error("Short read, got %d bytes, expected at least 12" % (len(read),))

            if read[0] != DEV_DEP_MSG_IN:
                raise USB488Error("Unexpected Msg ID, got %s expected %d" % (read[0], DEV_DEP_MSG_IN))
            if read[1] != tag:
                raise USB488Error("Unexpected tag, got %d expected %d" % (read[1], tag))
            if read[2] != ~tag & 0xff:
                raise USB488Error("Unexpected tag inverse, got %d expected %d" % (read[2], ~tag & 0xff))

            actualdata = read[4] | read[5] << 8 | read[6] << 16 | read[7] << 24
            #print("Computed datalen is %d" % (actualdata))
            data += read[12:12 + actualdata]

            # Bit 0 of the attribute byte marks the end of the message
            if read[8] & 0x01:
                #print("Breaking out due to EOM")
                break

        # Stringify result for easier consumption
        result = ''.join(map(chr, data))

        # Trim off \n if present
        if result.endswith('\n'):
            result = result[:-1]

        return result

    def ask(self, s, timeout = None):
        """Wrapper to send a command and wait for a reply"""
        self.write(s)
        return self.read(timeout = timeout)

    def chkcmd(self, cmd):
        """Wrapper to send a command and check for an error

        Raises USB488Error (after sending *CLS) if the error/event queue
        bit is set in the *STB? reply."""
        self.write(cmd)
        if int(self.ask('*STB?')) & STATUS_EVE_QUEUE != 0:
            self.write('*CLS')
            raise USB488Error('Command failed')

    def isConnected(self):
        """Check if the device is present"""
        # libusb makes it very hard (at least on FreeBSD) to determine if
        # we're still connected.
        # This is a reasonable proxy..
        try:
            self.handle.getString(self.dev.iManufacturer, 100)
        except usb.USBError:
            return False
        return True

    def getCapabilities(self):
        """Returns interface, device and USB488 capability bytes (see
        IF_CAP_*, DEV_CAP_*, USB488_IFCAP_* and USB488_DEVCAP_*)"""
        res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE,
                                     GET_CAPABILITIES, 0x18)
        return res[4], res[5], res[14], res[15]

    def indicatorPulse(self):
        """Send an indicator pulse request"""
        self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE,
                               INDICATOR_PULSE, 0x01)

    def initiateClear(self):
        """Send request to clear all transfers and wait until the device
        reports it is done and clear stalls

        Raises USB488Error if the device doesn't accept the request."""
        reqtype = usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE
        res = self.handle.controlMsg(reqtype, INITIATE_CLEAR, 0x01)
        if res[0] != STATUS_SUCCESS:
            raise USB488Error('INITIATE_CLEAR returned 0x%02x' % (res[0]))

        # Poll until the device stops reporting the clear as pending
        while True:
            res = self.handle.controlMsg(reqtype, CHECK_CLEAR_STATUS, 0x02)
            if res[0] != STATUS_PENDING:
                break
            time.sleep(0.1)

        self.handle.clearHalt(self.bulkinep)
        self.handle.clearHalt(self.bulkoutep)

    def renControl(self):
        """Send enable remote control message, returns the first byte of
        the device's reply"""
        res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE,
                                     REN_CONTROL, 1, 0xff)
        return res[0]

    def getStatus(self):
        """Returns IEEE 488 status byte

        Raises USB488Error if the reply carries the wrong tag or a status
        other than STATUS_SUCCESS."""
        # NOTE(review): the USB488 subclass spec appears to limit the tag
        # of this request to 2..127 while gettag() hands out 1..254 -
        # confirm against the spec and real hardware.
        tag = self.gettag()
        res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE,
                                     READ_STATUS_BYTE, 3, tag)
        if res[1] != tag:
            raise USB488Error('Tag mismatch, got 0x%02x, expected 0x%02x' % (res[1], tag))
        if res[0] != STATUS_SUCCESS:
            raise USB488Error('Unit returned invalid USBTMC status: %d' % (res[0],))
        return res[2]

    def abortIO(self, tag, isout):
        """Abort the transfer identified by tag, on the bulk-out endpoint
        if isout is true otherwise on bulk-in, and wait for it to finish

        Progress is printed.  Raises USB488Error on an unexpected status."""
        if isout:
            req = INITIATE_ABORT_BULK_OUT
            chkreq = CHECK_ABORT_BULK_OUT_STATUS
            ep = self.bulkoutep
            name = 'out'
        else:
            req = INITIATE_ABORT_BULK_IN
            chkreq = CHECK_ABORT_BULK_IN_STATUS
            ep = self.bulkinep
            name = 'in'

        reqtype = usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_ENDPOINT
        res = self.handle.controlMsg(reqtype, req, 2, value = tag, index = ep)
        print('Initiate abort returned ' + str(res))

        while True:
            res = self.handle.controlMsg(reqtype, chkreq, 8, value = 0x00, index = ep)
            print('Check abort returned ' + str(res))
            if res[0] == STATUS_PENDING:
                print('Status pending for %s abort' % (name,))
                time.sleep(1)
            elif res[0] == STATUS_SUCCESS or res[0] == STATUS_TRANSFER_NOT_IN_PROGRESS:
                break
            else:
                raise USB488Error('Invalid status reply to check abort %s 0x%02x' % (name, res[0]))