view usb488.py @ 70:6ffa6fcf278e

Move pylab import to where graphing is so we don't need an X server
author Daniel O'Connor <doconnor@gsoft.com.au>
date Mon, 30 Aug 2021 12:40:43 +0930
parents 7386f2888508
children
line wrap: on
line source

#!/usr/bin/env python

# Copyright (c) 2009
#      Daniel O'Connor <darius@dons.net.au>.  All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

#
# Spec/info..
#
# http://www.usb.org/developers/devclass_docs/USBTMC_1_006a.zip
# http://svn.openmoko.org/developers/werner/ahrt/host/tmc/README
# http://www.home.agilent.com/agilent/redirector.jspx?action=ref&cname=AGILENT_EDITORIAL&ckey=1189335&lc=eng&cc=US&nfr=-35560.0.00
# linux-2.6.29.3/drivers/usb/class/usbtmc.c
# http://sdpha2.ucsd.edu/Lab_Equip_Manuals/usbtmc_usb488_subclass_1_00.pdf
#

import time
import usb
from functools import reduce

#
# The usual SCPI commands are wrapped before being sent.
#
# Write:
# Offset	Field		Size	Value	Description
# 0		MsgID		1	0x01	DEV_DEP_MSG_OUT
# 1		bTag		1	0x01	Varies with each transfer
# 2		bTagInverse	1	0xfe	Inverse of previous field
# 3		Reserved	1	0x00
# 4		TransferSize	4	0x06
# 5		..			0x00
# 6		..			0x00
# 7		..			0x00
# 8		bmTransferAttr	1	0x01	1 == end of msg
# 9		Reserved	1	0x00
# 10		Reserved	1	0x00
# 11		Reserved	1	0x00
# 12		Msg itself	1	0x2a	'*'
# 13				1	0x49	'I'
# 14				1	0x44	'D'
# 15				1	0x4e	'N'
# 16				1	0x3f	'?'
# 17				1	0x0a	'\n'
# 18-19		Alignment	2	0x0000	Bring into 4 byte alignment
#
#
# Send a read request:
# Offset	Field		Size	Value	Description
# 0		MsgID		1	0x02	REQUEST_DEV_DEP_MSG_IN
# 1		bTag		1	0x02	Varies with each transfer
# 2		bTagInverse	1	0xfd	Inverse of previous field
# 3		Reserved	1	0x00
# 4		TransferSize	4	0x64
# 5		..			0x00
# 6		..			0x00
# 7		..			0x00
# 8		bmTransferAttr	1	0x00
# 9		Term char	1	0x00
# 10		Reserved	1	0x00
# 11		Reserved	1	0x00

# No libusb versions of these available
USB_CLASS_APP_SPECIFIC = 254
USB_SUBCLASS_TMC = 3
USB_PROTOCOL_488 = 1

# USB488 message IDs
DEV_DEP_MSG_OUT = 1
REQUEST_DEV_DEP_MSG_IN = 2
DEV_DEP_MSG_IN = 2

# USB TMC control requests
INITIATE_ABORT_BULK_OUT = 1
CHECK_ABORT_BULK_OUT_STATUS = 2
INITIATE_ABORT_BULK_IN = 3
CHECK_ABORT_BULK_IN_STATUS = 4
INITIATE_CLEAR = 5
CHECK_CLEAR_STATUS = 6
GET_CAPABILITIES = 7
INDICATOR_PULSE = 64
# USB488
READ_STATUS_BYTE = 128
REN_CONTROL = 160
GO_TO_LOCAL = 161
LOCAL_LOCKOUT = 162

# Interface capability bits
IF_CAP_LISTEN_ONLY = 0x01
IF_CAP_TALK_ONLY = 0x02
IF_CAP_HAS_INDICATOR = 0x04

# Device capability bits
DEV_CAP_TERM_CHAR = 0x01

# USB488 interface capbility bits
USB488_IFCAP_TRIG = 0x01
USB488_IFCAP_GO_LOC = 0x02
USB488_IFCAP_488_2 = 0x04

# USB488 device capbility bits
USB488_DEVCAP_DT1 = 0x01
USB488_DEVCAP_RL1 = 0x02
USB488_DEVCAP_SR1 = 0x04
USB488_DEVCAP_SCPI = 0x08

# USBTMC status definitions
STATUS_SUCCESS = 0x01
STATUS_PENDING = 0x02
STATUS_FAILED = 0x80
STATUS_TRANSFER_NOT_IN_PROGRESS = 0x81
STATUS_SPLIT_NOT_IN_PROGRESS = 0x82
STATUS_SPLIT_IN_PROGRESS = 0x83

# SCPI error/event queue status register bit
STATUS_EVE_QUEUE = 0x04

class USB488Device(object):
    def __init__(self, vendor = None, product = None, serial = None, path = None):
        """Search for a USB488 class device, if specified vendor,
        product, serial and path will refine the search"""

        busses = usb.busses()

        #
        # Search for the device we want
        #
        found = False
        for bus in busses:
            for dev in bus.devices:
                # Skip ones that don't match
                if vendor != None and dev.idVendor != vendor:
                    continue
                if product != None and dev.idProduct != product:
                    continue
                if serial != None and dev.idSerialNumber != serial:
                    continue
                if path != None and dev.filename != path:
                    continue

                # The libusb examples say you can check for device
                # class and then open, however in that case you can't
                # find the endpoint number which seems pretty useless
                # unless you want to hard code everything.
                for confidx in range(len(dev.configurations)):
                    for iface in dev.configurations[confidx].interfaces:
                        for altif in iface:
                            # Check if this is a USB488 capable interface
                            if altif.interfaceClass == USB_CLASS_APP_SPECIFIC and \
                                   altif.interfaceSubClass == USB_SUBCLASS_TMC and \
                                   altif.interfaceProtocol == USB_PROTOCOL_488:
                                found = True
                                break
                        if found:
                            break
                    if found:
                        break
                if found:
                    break
            if found:
                break
        if not found:
            raise BaseException("Could not find a suitable USB device")

        # Open the device and claim the USB interface that supports the spec
        handle = dev.open()
        handle.setConfiguration(dev.configurations[confidx].value)
        handle.claimInterface(altif.interfaceNumber)
        handle.setAltInterface(altif.alternateSetting)
        self.dev = dev
        self.handle = handle

        # Get some info for humans
        try:
            self.vendname = handle.getString(dev.iManufacturer, 1024)
        except ValueError:
            self.vendname = None
        try:
            self.prodname = handle.getString(dev.iProduct, 1024)
        except ValueError:
            self.prodname = None
        try:
            self.serial = handle.getString(dev.iSerialNumber, 1024)
        except ValueError:
            self.serial = None

        # Determine the endpoints for each operation type
        self.intrep = self.bulkinep = self.bulkoutep = None

        for ep in altif.endpoints:
            if ep.type == usb.ENDPOINT_TYPE_INTERRUPT and \
                   ep.address & usb.ENDPOINT_IN == usb.ENDPOINT_IN:
                self.intrep = ep.address

            if ep.type == usb.ENDPOINT_TYPE_BULK:
                if ep.address & usb.ENDPOINT_IN == usb.ENDPOINT_IN:
                    self.bulkinep = ep.address
                else:
                    self.bulkoutep = ep.address
                    self.maxPacket = ep.maxPacketSize

        # Required for 488.2 devices, optional otherwise
        if self.intrep == None:
            print("Can't find interrupt endpoint")

        # Data from the device (mandatory)
        if self.bulkinep == None:
            raise BaseException("Can't find bulk-in endpoint")

        # Data to the device (mandatory)
        if self.bulkoutep == None:
            raise BaseException("Can't find bulk-out endpoint")

        self.tag = 1
        #self.init()

    def init(self):
        # Flush out any pending data
        self.initiateClear()
        # Perform dummy write/read otherwise the next read times out
        try:
            self.ask('*STB?', timeout = 0.001)
        except usb.USBError:
            pass
        # Clear status register
        for i in range(10):
            self.write('*CLS')
            if int(self.ask('*STB?')) & STATUS_EVE_QUEUE == 0:
                break
        else:
            raise BaseException('Unable to clear status register')

    def __str__(self):
        rtn = "Mfg: %s Prod: %s" % (self.vendname, self.prodname)
        if self.serial != "":
            rtn += " S/N: " + self.serial

        return rtn

    def gettag(self):
        tag = self.tag
        self.tag = (self.tag + 1) % 255
        if self.tag == 0:
            self.tag += 1
        return tag

    def write(self, data):
        """Send data (string) to the instrument"""

        orddata = list(map(ord, data))
        # The device needs a \n at the end, enfore this
        if orddata[-1] != '\n':
            orddata += [ord('\n')]
        datalen = len(orddata)

        # Build the packet
        tag = self.gettag()
        pkt = [ DEV_DEP_MSG_OUT, tag, ~tag & 0xff, 0x00,
                datalen & 0xff, datalen >> 8 & 0xff, datalen >> 16 & 0xff,
                datalen >> 24 & 0xff, 1, 0, 0, 0 ]

        # Add the data
        pkt = pkt + orddata

        # Align to 4 bytes
        alignlen = ((len(pkt) // 4) + 1) * 4
        pkt = pkt + [0] * (alignlen - len(pkt))

        # Split it up into maxPacket sized chunks and send..
        # XXX; this is not correct, need a header for each one
        while len(pkt) > 0:
            chunk = pkt[0:self.maxPacket]
            pkt = pkt[self.maxPacket:]

            #print("Sending %d bytes of data: %s" % (len(chunk), chunk))
            wrote = self.handle.bulkWrite(self.bulkoutep, chunk)
            if wrote != len(chunk):
                raise BaseException("Short write, got %d, expected %d" % (wrote, len(chunk)))

    def read(self, timeout = None):
        """Read data from the device, waits for up to timeout seconds for each USB transaction"""

        if timeout == None:
            timeout = 1

        # Mangle into milliseconds
        _timeout = int(timeout * 1000.0)

        # Maximum we accept at once
        # Was 2^31 - 1 but that seems to make things take too long to
        # read (perhaps libusb tries to malloc it..)
        datalen = 10240
        data = []
        
        while True:
            # Ask the device to send us something
            tag = self.gettag()
            pkt = [ REQUEST_DEV_DEP_MSG_IN, tag, ~tag & 0xff, 0x00,
                    datalen & 0xff, datalen >> 8 & 0xff, datalen >> 16 & 0xff,
                    datalen >> 24 & 0xff, 0, 0, 0, 0]

            # Send it
            #print("Sending " + str(pkt))
            wrote = self.handle.bulkWrite(self.bulkoutep, pkt, _timeout)
            if wrote != len(pkt):
                print("Short write, got %d, expected %d" % (wrote, len(pkt)))

            #print("Reading..")
            read = self.handle.bulkRead(self.bulkinep, datalen, _timeout)
            #print("Read %s bytes: %s" % (len(read), str(read)))

            if read[0] != DEV_DEP_MSG_IN:
                raise BaseException("Unexpected Msg ID, got %s expected %d" % (read[0], DEV_DEP_MSG_IN))
            if read[1] != tag:
                raise BaseException("Unexpected tag, got %d expected %d" % (read[1], tag))
            if read[2] != ~tag & 0xff:
                raise BaseException("Unexpected tag inverse, got %d expected %d" % (read[1], ~tag & 0xff))

            actualdata = read[4] | read[5] << 8 | read[6] << 16 | read[7] << 24
            #print("Computed datalen is %d" % (actualdata))
            data += read[12:12 + actualdata]
            if read[8] & 0x01:
                #print("Breaking out due to EOM")
                break

        # Stringify result for easier consumption
        result = reduce(lambda x, y: x+y, list(map(chr, data)))
        # Trim off \n if present
        if result[-1] == '\n':
            result = result[0:-1]

        return result

    def ask(self, s, timeout = None):
        '''Wrapper to send a command and wait for a reply'''
        self.write(s)
        return self.read(timeout = timeout)

    def chkcmd(self, cmd):
        '''Wrapper to send a command and check for an error'''
        self.write(cmd)
        if int(self.ask('*STB?')) & STATUS_EVE_QUEUE != 0:
            self.write('*CLS')
            raise BaseException('Command failed')

    def isConnected(self):
        """Check if the device is present"""

        # libusb makes it very hard (at least on FreeBSD) to determine if we're still connected.
        # This is a reasonable proxy..
        try:
            self.handle.getString(self.dev.iManufacturer, 100)
        except USBError as e:
            return False

        return True

    def getCapabilities(self):
        '''Returns interface, device and USB488 capability bytes (see IF_CAP_*, DEV_CAP_*, USB488_IFCAP_* and USB488_DEVCAP_*)'''
        res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE, GET_CAPABILITIES, 0x18)
        return res[4], res[5], res[14], res[15]

    def indicatorPulse(self):
        '''Send an indicator pulse request'''
        res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE, INDICATOR_PULSE, 0x01)

    def initiateClear(self):
        '''Send request to clear all transfers and wait until the device reports it is done and clear stalls'''
        res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE, INITIATE_CLEAR, 0x01)
        if res[0] == STATUS_SUCCESS:
            while True:
                res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE, CHECK_CLEAR_STATUS, 0x02)
                if res[0] != STATUS_PENDING:
                    break
                time.sleep(0.1)
        else:
            raise BaseException('INITIATE_CLEAR returned 0x%02x' % (res[0]))
        self.handle.clearHalt(self.bulkinep)
        self.handle.clearHalt(self.bulkoutep)

    def renControl(self):
        '''Send enable remote control message'''
        res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE, REN_CONTROL, 1, 0xff)
        return res[0]

    def getStatus(self):
        '''Returns IEEE 488 status byte'''
        tag = self.gettag()
        res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE, READ_STATUS_BYTE, 3, tag)
        if res[1] != tag:
            raise BaseException('Tag mismatch, got 0x%02x, expected 0x%02x' % (res[1], tag))
        if res[0] != STATUS_SUCCESS:
            raise BaseException('Unit returned invalid USBTMC status: %d' % (res[0],))
        return res[2]

    def abortIO(self, tag, isout):
        if isout:
            req = INITIATE_ABORT_BULK_OUT
            chkreq = CHECK_ABORT_BULK_OUT_STATUS
            ep = self.bulkoutep
            name = 'out'
        else:
            req = INITIATE_ABORT_BULK_IN
            chkreq = CHECK_ABORT_BULK_IN_STATUS
            ep = self.bulkinep
            name = 'in'
        res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_ENDPOINT,
                                req, 2, value = tag, index = ep)
        print('Initiate abort returned ' + str(res))
        while True:
            res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_ENDPOINT,
                                         chkreq, 8, value = 0x00, index = ep)
            print('Check abort returned ' + str(res))
            if res[0] == STATUS_PENDING:
                print('Status pending for %s abort' % (name,))
                time.sleep(1)
            elif res[0] == STATUS_SUCCESS or res[0] == STATUS_TRANSFER_NOT_IN_PROGRESS:
                break
            else:
                raise BaseException('Invalid status reply to check abort %s 0x%02x' % (name, res[0]))