view usb488.py @ 79:84f96c5fe791

Use different message ID that does not result in "Query UNTERMINATE" messages in the error log. Fix testsrq. Rename queryrsb to querystb as that matches the operating manual.
author Daniel O'Connor <doconnor@gsoft.com.au>
date Fri, 27 Sep 2024 16:53:43 +0930
parents 7386f2888508
children
line wrap: on
line source

#!/usr/bin/env python

# Copyright (c) 2009
#      Daniel O'Connor <darius@dons.net.au>.  All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

#
# Spec/info..
#
# http://www.usb.org/developers/devclass_docs/USBTMC_1_006a.zip
# http://svn.openmoko.org/developers/werner/ahrt/host/tmc/README
# http://www.home.agilent.com/agilent/redirector.jspx?action=ref&cname=AGILENT_EDITORIAL&ckey=1189335&lc=eng&cc=US&nfr=-35560.0.00
# linux-2.6.29.3/drivers/usb/class/usbtmc.c
# http://sdpha2.ucsd.edu/Lab_Equip_Manuals/usbtmc_usb488_subclass_1_00.pdf
#

import time
import usb
from functools import reduce

#
# The usual SCPI commands are wrapped before being sent.
#
# Write:
# Offset	Field		Size	Value	Description
# 0		MsgID		1	0x01	DEV_DEP_MSG_OUT
# 1		bTag		1	0x01	Varies with each transfer
# 2		bTagInverse	1	0xfe	Inverse of previous field
# 3		Reserved	1	0x00
# 4		TransferSize	4	0x06
# 5		..			0x00
# 6		..			0x00
# 7		..			0x00
# 8		bmTransferAttr	1	0x01	1 == end of msg
# 9		Reserved	1	0x00
# 10		Reserved	1	0x00
# 11		Reserved	1	0x00
# 12		Msg itself	1	0x2a	'*'
# 13				1	0x49	'I'
# 14				1	0x44	'D'
# 15				1	0x4e	'N'
# 16				1	0x3f	'?'
# 17				1	0x0a	'\n'
# 18-19		Alignment	2	0x0000	Bring into 4 byte alignment
#
#
# Send a read request:
# Offset	Field		Size	Value	Description
# 0		MsgID		1	0x02	REQUEST_DEV_DEP_MSG_IN
# 1		bTag		1	0x02	Varies with each transfer
# 2		bTagInverse	1	0xfd	Inverse of previous field
# 3		Reserved	1	0x00
# 4		TransferSize	4	0x64
# 5		..			0x00
# 6		..			0x00
# 7		..			0x00
# 8		bmTransferAttr	1	0x00
# 9		Term char	1	0x00
# 10		Reserved	1	0x00
# 11		Reserved	1	0x00

# No libusb versions of these available
USB_CLASS_APP_SPECIFIC = 254
USB_SUBCLASS_TMC = 3
USB_PROTOCOL_488 = 1

# USB488 message IDs
DEV_DEP_MSG_OUT = 1
REQUEST_DEV_DEP_MSG_IN = 2
DEV_DEP_MSG_IN = 2

# USB TMC control requests
INITIATE_ABORT_BULK_OUT = 1
CHECK_ABORT_BULK_OUT_STATUS = 2
INITIATE_ABORT_BULK_IN = 3
CHECK_ABORT_BULK_IN_STATUS = 4
INITIATE_CLEAR = 5
CHECK_CLEAR_STATUS = 6
GET_CAPABILITIES = 7
INDICATOR_PULSE = 64
# USB488
READ_STATUS_BYTE = 128
REN_CONTROL = 160
GO_TO_LOCAL = 161
LOCAL_LOCKOUT = 162

# Interface capability bits
IF_CAP_LISTEN_ONLY = 0x01
IF_CAP_TALK_ONLY = 0x02
IF_CAP_HAS_INDICATOR = 0x04

# Device capability bits
DEV_CAP_TERM_CHAR = 0x01

# USB488 interface capbility bits
USB488_IFCAP_TRIG = 0x01
USB488_IFCAP_GO_LOC = 0x02
USB488_IFCAP_488_2 = 0x04

# USB488 device capbility bits
USB488_DEVCAP_DT1 = 0x01
USB488_DEVCAP_RL1 = 0x02
USB488_DEVCAP_SR1 = 0x04
USB488_DEVCAP_SCPI = 0x08

# USBTMC status definitions
STATUS_SUCCESS = 0x01
STATUS_PENDING = 0x02
STATUS_FAILED = 0x80
STATUS_TRANSFER_NOT_IN_PROGRESS = 0x81
STATUS_SPLIT_NOT_IN_PROGRESS = 0x82
STATUS_SPLIT_IN_PROGRESS = 0x83

# SCPI error/event queue status register bit
STATUS_EVE_QUEUE = 0x04

class USB488Device(object):
    def __init__(self, vendor = None, product = None, serial = None, path = None):
        """Search for a USB488 class device, if specified vendor,
        product, serial and path will refine the search"""

        busses = usb.busses()

        #
        # Search for the device we want
        #
        found = False
        for bus in busses:
            for dev in bus.devices:
                # Skip ones that don't match
                if vendor != None and dev.idVendor != vendor:
                    continue
                if product != None and dev.idProduct != product:
                    continue
                if serial != None and dev.idSerialNumber != serial:
                    continue
                if path != None and dev.filename != path:
                    continue

                # The libusb examples say you can check for device
                # class and then open, however in that case you can't
                # find the endpoint number which seems pretty useless
                # unless you want to hard code everything.
                for confidx in range(len(dev.configurations)):
                    for iface in dev.configurations[confidx].interfaces:
                        for altif in iface:
                            # Check if this is a USB488 capable interface
                            if altif.interfaceClass == USB_CLASS_APP_SPECIFIC and \
                                   altif.interfaceSubClass == USB_SUBCLASS_TMC and \
                                   altif.interfaceProtocol == USB_PROTOCOL_488:
                                found = True
                                break
                        if found:
                            break
                    if found:
                        break
                if found:
                    break
            if found:
                break
        if not found:
            raise BaseException("Could not find a suitable USB device")

        # Open the device and claim the USB interface that supports the spec
        handle = dev.open()
        handle.setConfiguration(dev.configurations[confidx].value)
        handle.claimInterface(altif.interfaceNumber)
        handle.setAltInterface(altif.alternateSetting)
        self.dev = dev
        self.handle = handle

        # Get some info for humans
        try:
            self.vendname = handle.getString(dev.iManufacturer, 1024)
        except ValueError:
            self.vendname = None
        try:
            self.prodname = handle.getString(dev.iProduct, 1024)
        except ValueError:
            self.prodname = None
        try:
            self.serial = handle.getString(dev.iSerialNumber, 1024)
        except ValueError:
            self.serial = None

        # Determine the endpoints for each operation type
        self.intrep = self.bulkinep = self.bulkoutep = None

        for ep in altif.endpoints:
            if ep.type == usb.ENDPOINT_TYPE_INTERRUPT and \
                   ep.address & usb.ENDPOINT_IN == usb.ENDPOINT_IN:
                self.intrep = ep.address

            if ep.type == usb.ENDPOINT_TYPE_BULK:
                if ep.address & usb.ENDPOINT_IN == usb.ENDPOINT_IN:
                    self.bulkinep = ep.address
                else:
                    self.bulkoutep = ep.address
                    self.maxPacket = ep.maxPacketSize

        # Required for 488.2 devices, optional otherwise
        if self.intrep == None:
            print("Can't find interrupt endpoint")

        # Data from the device (mandatory)
        if self.bulkinep == None:
            raise BaseException("Can't find bulk-in endpoint")

        # Data to the device (mandatory)
        if self.bulkoutep == None:
            raise BaseException("Can't find bulk-out endpoint")

        self.tag = 1
        #self.init()

    def init(self):
        # Flush out any pending data
        self.initiateClear()
        # Perform dummy write/read otherwise the next read times out
        try:
            self.ask('*STB?', timeout = 0.001)
        except usb.USBError:
            pass
        # Clear status register
        for i in range(10):
            self.write('*CLS')
            if int(self.ask('*STB?')) & STATUS_EVE_QUEUE == 0:
                break
        else:
            raise BaseException('Unable to clear status register')

    def __str__(self):
        rtn = "Mfg: %s Prod: %s" % (self.vendname, self.prodname)
        if self.serial != "":
            rtn += " S/N: " + self.serial

        return rtn

    def gettag(self):
        tag = self.tag
        self.tag = (self.tag + 1) % 255
        if self.tag == 0:
            self.tag += 1
        return tag

    def write(self, data):
        """Send data (string) to the instrument"""

        orddata = list(map(ord, data))
        # The device needs a \n at the end, enfore this
        if orddata[-1] != '\n':
            orddata += [ord('\n')]
        datalen = len(orddata)

        # Build the packet
        tag = self.gettag()
        pkt = [ DEV_DEP_MSG_OUT, tag, ~tag & 0xff, 0x00,
                datalen & 0xff, datalen >> 8 & 0xff, datalen >> 16 & 0xff,
                datalen >> 24 & 0xff, 1, 0, 0, 0 ]

        # Add the data
        pkt = pkt + orddata

        # Align to 4 bytes
        alignlen = ((len(pkt) // 4) + 1) * 4
        pkt = pkt + [0] * (alignlen - len(pkt))

        # Split it up into maxPacket sized chunks and send..
        # XXX; this is not correct, need a header for each one
        while len(pkt) > 0:
            chunk = pkt[0:self.maxPacket]
            pkt = pkt[self.maxPacket:]

            #print("Sending %d bytes of data: %s" % (len(chunk), chunk))
            wrote = self.handle.bulkWrite(self.bulkoutep, chunk)
            if wrote != len(chunk):
                raise BaseException("Short write, got %d, expected %d" % (wrote, len(chunk)))

    def read(self, timeout = None):
        """Read data from the device, waits for up to timeout seconds for each USB transaction"""

        if timeout == None:
            timeout = 1

        # Mangle into milliseconds
        _timeout = int(timeout * 1000.0)

        # Maximum we accept at once
        # Was 2^31 - 1 but that seems to make things take too long to
        # read (perhaps libusb tries to malloc it..)
        datalen = 10240
        data = []
        
        while True:
            # Ask the device to send us something
            tag = self.gettag()
            pkt = [ REQUEST_DEV_DEP_MSG_IN, tag, ~tag & 0xff, 0x00,
                    datalen & 0xff, datalen >> 8 & 0xff, datalen >> 16 & 0xff,
                    datalen >> 24 & 0xff, 0, 0, 0, 0]

            # Send it
            #print("Sending " + str(pkt))
            wrote = self.handle.bulkWrite(self.bulkoutep, pkt, _timeout)
            if wrote != len(pkt):
                print("Short write, got %d, expected %d" % (wrote, len(pkt)))

            #print("Reading..")
            read = self.handle.bulkRead(self.bulkinep, datalen, _timeout)
            #print("Read %s bytes: %s" % (len(read), str(read)))

            if read[0] != DEV_DEP_MSG_IN:
                raise BaseException("Unexpected Msg ID, got %s expected %d" % (read[0], DEV_DEP_MSG_IN))
            if read[1] != tag:
                raise BaseException("Unexpected tag, got %d expected %d" % (read[1], tag))
            if read[2] != ~tag & 0xff:
                raise BaseException("Unexpected tag inverse, got %d expected %d" % (read[1], ~tag & 0xff))

            actualdata = read[4] | read[5] << 8 | read[6] << 16 | read[7] << 24
            #print("Computed datalen is %d" % (actualdata))
            data += read[12:12 + actualdata]
            if read[8] & 0x01:
                #print("Breaking out due to EOM")
                break

        # Stringify result for easier consumption
        result = reduce(lambda x, y: x+y, list(map(chr, data)))
        # Trim off \n if present
        if result[-1] == '\n':
            result = result[0:-1]

        return result

    def ask(self, s, timeout = None):
        '''Wrapper to send a command and wait for a reply'''
        self.write(s)
        return self.read(timeout = timeout)

    def chkcmd(self, cmd):
        '''Wrapper to send a command and check for an error'''
        self.write(cmd)
        if int(self.ask('*STB?')) & STATUS_EVE_QUEUE != 0:
            self.write('*CLS')
            raise BaseException('Command failed')

    def isConnected(self):
        """Check if the device is present"""

        # libusb makes it very hard (at least on FreeBSD) to determine if we're still connected.
        # This is a reasonable proxy..
        try:
            self.handle.getString(self.dev.iManufacturer, 100)
        except USBError as e:
            return False

        return True

    def getCapabilities(self):
        '''Returns interface, device and USB488 capability bytes (see IF_CAP_*, DEV_CAP_*, USB488_IFCAP_* and USB488_DEVCAP_*)'''
        res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE, GET_CAPABILITIES, 0x18)
        return res[4], res[5], res[14], res[15]

    def indicatorPulse(self):
        '''Send an indicator pulse request'''
        res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE, INDICATOR_PULSE, 0x01)

    def initiateClear(self):
        '''Send request to clear all transfers and wait until the device reports it is done and clear stalls'''
        res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE, INITIATE_CLEAR, 0x01)
        if res[0] == STATUS_SUCCESS:
            while True:
                res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE, CHECK_CLEAR_STATUS, 0x02)
                if res[0] != STATUS_PENDING:
                    break
                time.sleep(0.1)
        else:
            raise BaseException('INITIATE_CLEAR returned 0x%02x' % (res[0]))
        self.handle.clearHalt(self.bulkinep)
        self.handle.clearHalt(self.bulkoutep)

    def renControl(self):
        '''Send enable remote control message'''
        res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE, REN_CONTROL, 1, 0xff)
        return res[0]

    def getStatus(self):
        '''Returns IEEE 488 status byte'''
        tag = self.gettag()
        res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_INTERFACE, READ_STATUS_BYTE, 3, tag)
        if res[1] != tag:
            raise BaseException('Tag mismatch, got 0x%02x, expected 0x%02x' % (res[1], tag))
        if res[0] != STATUS_SUCCESS:
            raise BaseException('Unit returned invalid USBTMC status: %d' % (res[0],))
        return res[2]

    def abortIO(self, tag, isout):
        if isout:
            req = INITIATE_ABORT_BULK_OUT
            chkreq = CHECK_ABORT_BULK_OUT_STATUS
            ep = self.bulkoutep
            name = 'out'
        else:
            req = INITIATE_ABORT_BULK_IN
            chkreq = CHECK_ABORT_BULK_IN_STATUS
            ep = self.bulkinep
            name = 'in'
        res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_ENDPOINT,
                                req, 2, value = tag, index = ep)
        print('Initiate abort returned ' + str(res))
        while True:
            res = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS | usb.RECIP_ENDPOINT,
                                         chkreq, 8, value = 0x00, index = ep)
            print('Check abort returned ' + str(res))
            if res[0] == STATUS_PENDING:
                print('Status pending for %s abort' % (name,))
                time.sleep(1)
            elif res[0] == STATUS_SUCCESS or res[0] == STATUS_TRANSFER_NOT_IN_PROGRESS:
                break
            else:
                raise BaseException('Invalid status reply to check abort %s 0x%02x' % (name, res[0]))