Mercurial > ~darius > hgwebdir.cgi > pyinst
view usb488.py @ 0:a43a47dfc902
First stab at code that actually works!
Example:
import usb488
u = usb488.USB488Device()
u.write("*IDN?\n")
Sending 20 bytes of data: [1, 1, 254, 0, 6, 0, 0, 0, 1, 0, 0, 0, 42, 73, 68, 78, 63, 10, 0, 0]
a = u.read()
Read 60 bytes: (2, 2, 253, 0, 48, 0, 0, 0, 1, 0, 0, 0, 84, 69, 75, 84, 82, 79, 78, 73, 88, 44, 84, 68, 83, 32
, 50, 48, 50, 52, 66, 44, 67, 48, 52, 55, 50, 54, 52, 44, 67, 70, 58, 57, 49, 46, 49, 67, 84, 32, 70, 86, 58,
118, 50, 50, 46, 48, 49, 10)
s = reduce(lambda x, y: x+y, map(chr, a))
print s
xxxxTEKTRONIX,TDS 2024B,C047264,CF:91.1CT FV:v22.01
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Wed, 13 May 2009 14:44:40 +0930 |
parents | |
children | e2089824735a |
line wrap: on
line source
#!/usr/bin/env python # # Spec/info.. # # http://www.usb.org/developers/devclass_docs/USBTMC_1_006a.zip # http://svn.openmoko.org/developers/werner/ahrt/host/tmc/README # http://www.home.agilent.com/agilent/redirector.jspx?action=ref&cname=AGILENT_EDITORIAL&ckey=1189335&lc=eng&cc=US&nfr=-35560.0.00 # linux-2.6.29.3/drivers/usb/class/usbtmc.c # import usb # # The usual SCPI commands are wrapped before being sent. # # Write: # Offset Field Size Value Description # 0 MsgID 1 0x01 DEV_DEP_MSG_OUT # 1 bTag 1 0x01 Varies with each transfer # 2 bTagInverse 1 0xfe Inverse of previous field # 3 Reserved 1 0x00 # 4 TransferSize 4 0x06 # 5 .. 0x00 # 6 .. 0x00 # 7 .. 0x00 # 8 bmTransferAttr 1 0x01 1 == end of msg # 9 Reserved 1 0x00 # 10 Reserved 1 0x00 # 11 Reserved 1 0x00 # 12 Msg itself 1 0x2a '*' # 13 1 0x49 'I' # 14 1 0x44 'D' # 15 1 0x4e 'N' # 16 1 0x3f '?' # 17 1 0x0a '\n' # 18-19 Alignment 2 0x0000 Bring into 4 byte alignment # # # Send a read request: # Offset Field Size Value Description # 0 MsgID 1 0x02 REQUEST_DEV_DEP_MSG_IN # 1 bTag 1 0x02 Varies with each transfer # 2 bTagInverse 1 0xfd Inverse of previous field # 3 Reserved 1 0x00 # 4 TransferSize 4 0x64 # 5 .. 0x00 # 6 .. 0x00 # 7 .. 0x00 # 8 bmTransferAttr 1 0x00 # 9 Term char 1 0x00 # 10 Reserved 1 0x00 # 11 Reserved 1 0x00 # Tektronix TDS2024B USB_VEND_TEKTRONIX = 1689 USB_PROD_TEKTORNIX = 874 # No libusb versions of these available USB_CLASS_APP_SPECIFIC = 254 USB_SUBCLASS_TMC = 3 USB_PROTOCOL_488 = 1 # USB488 message IDs DEV_DEP_MSG_OUT = 1 REQUEST_DEV_DEP_MSG_IN = 2 DEV_DEP_MSG_IN = 2 class USB488Device(object): def __init__(self, vendor = None, product = None, serial = None, path = None): """Search for a USB488 class device, if specified vendor, product, serial and path will refine the search""" busses = usb.busses() # # Search for the device we want # found = False for bus in busses: for dev in bus.devices: # Skip ones that don't match if vendor != None and dev.idVendor != vendor: continue if product != None and dev.idProduct != product: continue if serial != None and dev.idSerialNumber != serial: continue if path != None and dev.filename != path: continue # The libusb examples say you can check for device # class and then open, however in that case you can't # find the endpoint number which seems pretty useless # unless you want to hard code everything. for confidx in xrange(len(dev.configurations)): for iface in dev.configurations[confidx].interfaces: for altif in iface: # Check if this is a USB488 capable interface if altif.interfaceClass == USB_CLASS_APP_SPECIFIC and \ altif.interfaceSubClass == USB_SUBCLASS_TMC and \ altif.interfaceProtocol == USB_PROTOCOL_488: found = True break if found: break if found: break if found: break if found: break if not found: raise "Could not find a suitable USB device" # Open the device and claim the USB interface that supports the spec self.handle = dev.open() self.handle.setConfiguration(dev.configurations[confidx].value) self.handle.claimInterface(altif.interfaceNumber) self.handle.setAltInterface(altif.alternateSetting) # Get some info for humans self.vendname = self.handle.getString(dev.iManufacturer, 1024) self.prodname = self.handle.getString(dev.iProduct, 1024) self.serial = self.handle.getString(dev.iSerialNumber, 1024) # Determine the endpoints for each operation type self.intrep = self.bulkinep = self.bulkoutep = None for ep in altif.endpoints: if ep.type == usb.ENDPOINT_TYPE_INTERRUPT and \ ep.address & usb.ENDPOINT_IN == usb.ENDPOINT_IN: self.intrep = ep.address if ep.type == usb.ENDPOINT_TYPE_BULK: if ep.address & usb.ENDPOINT_IN == usb.ENDPOINT_IN: self.bulkinep = ep.address else: self.bulkoutep = ep.address self.maxPacket = ep.maxPacketSize # Required for 488.2 devices, optional otherwise if self.intrep == None: print "Can't find interrupt endpoint" # Data from the scope (mandatory) if self.bulkinep == None: raise "Can't find bulk-in endpoint" # Data to the scope (mandatory) if self.bulkoutep == None: raise "Can't find bulk-out endpoint" self.tag = 1 def __str__(self): rtn = "Mfg: %s Prod: %s" % (self.vendname, self.prodname) if self.serial != "": rtn += " S/N: " + self.serial return rtn def incrtag(self): self.tag += 1 if self.tag == 0: self.tag += 1 def write(self, data): """Send data (string) to the scope""" orddata = map(ord, data) datalen = len(orddata) # Build the packet pkt = [ DEV_DEP_MSG_OUT, self.tag, ~self.tag & 0xff, 0x00, datalen & 0xff, datalen >> 8 & 0xff, datalen >> 16 & 0xff, datalen >> 24 & 0xff, 1, 0, 0, 0 ] # Add the data pkt = pkt + orddata # Align to 4 bytes alignlen = ((len(pkt) / 4) + 1) * 4 pkt = pkt + [0] * (alignlen - len(pkt)) # Bump the tag self.incrtag() # Split it up into maxPacket sized chunks and send.. while len(pkt) > 0: chunk = pkt[0:self.maxPacket] pkt = pkt[self.maxPacket:] print "Sending %s bytes of data: %s" % (len(chunk), chunk) wrote = self.handle.bulkWrite(self.bulkoutep, chunk) if wrote != len(chunk): raise "Short write, got %d, expected %d" % (wrote, len(chunk)) def read(self): """Read data from the device""" datalen = 1024 # Ask the device to send us something pkt = [ REQUEST_DEV_DEP_MSG_IN, self.tag, ~self.tag & 0xff, 0x00, datalen & 0xff, datalen >> 8 & 0xff, datalen >> 16 & 0xff, datalen >> 24 & 0xff, 0, 0, 0, 0] # Bump tag self.incrtag() # Send it wrote = self.handle.bulkWrite(self.bulkoutep, pkt) if wrote != len(pkt): print "Short write, got %d, expected %d" % (wrote, len(pkt)) read = self.handle.bulkRead(self.bulkinep, datalen) print "Read %s bytes: %s" % (len(read), str(read)) return read def find488(): """Search for a USB488 device, returns a handle, iface, dev tuple for it""" busses = usb.busses() found = False for bus in busses: for dev in bus.devices: for confidx in xrange(len(dev.configurations)): # XXX: what do multi-interface devices look like? iface = dev.configurations[confidx].interfaces[0][0] # Check if this is a USB488 capable interface if iface.interfaceClass == USB_CLASS_APP_SPECIFIC and \ iface.interfaceSubClass == USB_SUBCLASS_TMC and \ iface.interfaceProtocol == USB_PROTOCOL_488: handle = dev.open() handle.setConfiguration(1) handle.claimInterface(0) handle.setAltInterface(0) #handle.setConfiguration(confidx) #handle.claimInterface(0) found = True break if found: break if not found: raise "Could not find scope, check perms" return (handle, iface, dev) def geteps(iface): """Returns a tuple of intr,input,output addresses of endpoints for the interface""" intrep = bulkinep = bulkoutep = None for ep in iface.endpoints: if ep.type == usb.ENDPOINT_TYPE_INTERRUPT and \ ep.address & usb.ENDPOINT_IN == usb.ENDPOINT_IN: intrep = ep.address if ep.type == usb.ENDPOINT_TYPE_BULK: if ep.address & usb.ENDPOINT_IN == usb.ENDPOINT_IN: bulkinep = ep.address else: bulkoutep = ep.address # Required for 488.2 devices, optional otherwise if intrep == None: print "Can't find interrup endpoint" # Data from the scope if bulkinep == None: raise "Can't find bulk-in endpoint" # Data to the scope if bulkoutep == None: raise "Can't find bulk-out endpoint" return intrep, bulkinep, bulkoutep def main(): handle, iface, dev = find488() print "Found device" intrep, bulkinep, bulkoutep = geteps(iface) print "Found endpoints" if __name__ == "__main__": main()