view specan.py @ 79:84f96c5fe791

Use different message ID that does not result in "Query UNTERMINATE" messages in the error log. Fix testsrq. Rename queryrsb to querystb as that matches the operating manual.
author Daniel O'Connor <doconnor@gsoft.com.au>
date Fri, 27 Sep 2024 16:53:43 +0930
parents 23c96322cfb6
children
line wrap: on
line source

import exceptions
import numpy
import scpi

class Traceinst(object):
    '''Generic class for a trace based instrument'''
    attrs = {}
    tracetypename = None
    tracedtype = None
    tracequery = None
    
    def __init__(self, con):
        self.con = con
        # Set trace format
        self.con.write("FORM " + self.tracetypename)

    def setconf(self, name, value):
        if name not in self.attrs:
            raise exceptions.KeyError(name + " not supported")
        # Check value is correct type
        tmp = self.attrs[name][1](value)
        # Run validation function (if necessary)
        if self.attrs[name][2] != None:
            self.attrs[name][2](value)
        #print "Setting %s to %s" % (self.attrs[name][0], str(value))
        self.con.write("%s %s" % (self.attrs[name][0], str(value)))
                       
    def getconf(self, name):
        if name not in self.attrs:
            raise exceptions.KeyError(name + " not supported")
        self.con.write("%s?" % (self.attrs[name][0]))
        r = self.con.read()
        return self.attrs[name][1](r)

    def write(self, *args):
        return self.con.write(*args)

    def read(self, *args):
        return self.con.read(*args)

    def gettrace(self, timeout = 10):
        # Trigger the sweep
        self.con.write("INIT;*WAI")

        # Wait for it to be done
        self.wait(timeout)

        # Grab trace data
        self.con.write(self.tracequery)
        dat = self.con.read(10)

        # Parse into array
        ary = scpi.bindecode(dat, dtype = self.tracedtype)
        return ary

    def wait(self, timeout):
        if self.hasopc:
            self.con.write("*OPC?")
            opc = scpi.getdata(self.con.read(timeout), int)
            if opc != 1:
                return None
        else:
            while True:
                self.con.write(':STATus:OPERation?') 
                i = scpi.getdata(self.con.read(timeout), int)
                if i & 256:
                    break

        
    def dumpconf(self):
        rtn = {}
        for k in self.attrs:
            self.con.write(self.attrs[k][0] + '?')
            res = self.con.read()
            #print "Getting " + k + " / " + self.attrs[k][0] + " = " + res
            rtn[k] = self.attrs[k][1](res)
        return rtn
    
class RSSPA(Traceinst):
    '''Rohde & Schwarz Spectrum Analyzer'''

    attrs = { 'fstart' : ['FREQ:START', float, None], # Page 561
              'fstop' : ['FREQ:STOP', float, None],
              'atten' : ['INP:ATT', float, None], # Page 518
              'resbw' : ['SENSE:BANDWIDTH:RESOLUTION', float, None], # Page 539
              'vidbw' : ['SENSE:BANDWIDTH:VIDEO', float, None], # Page 541
              'sweept' : ['SENSE:SWEEP:TIME', float, None], # Page 599
              #'sweeppts' : ['SWEEP:POINTS', int, RSSPA.sweepptscheck],
              'sweeppts' : ['SWEEP:POINTS', int, None], # Page 599
              'sweepcnt' : ['SENSE1:AVERAGE:COUNT', int, None], # Page 595
              'reflev' : ['DISPLAY:WINDOW1:TRACE1:Y:SCALE:RLEVEL', float, None], # Page 506
              'detector' : ['SENSE1:DETECTOR1:FUNCTION', str, None] , # Page 552
              }

    hasopc = True
    tracetypename = 'REAL,32'
    tracedtype = numpy.float32
    tracequery = 'TRAC1? TRACE1'
    swptslist = [125, 251, 501, 1001, 2001, 4001, 8001]
    
#    def sweepptscheck(npts):
#        if x not in RSSPA.swptslist:
#            raise exceptions.ValueError("Sweep value not supported, must be one of " + str(RSSPA.swptslist))

        
class AnSPA(Traceinst):
    '''Anritsu Spectrum Analyzer'''
    attrs = { 'fstart' : ['FREQ:START', float, None], # Page 26
             'fstop' : ['FREQ:STOP', float, None], # Page 26
             'atten' : ['SENSE:POWER:ATTENUATION', float, None], # Page 27
             'sweept' : ['SENSE:SWEEP:TIME', float, None], # Page 28
             'sweepcnt' : [':SENSe:AVERage:COUNt', int, None], # Page 21
             'tracemode' : [':SENSe:AVERage:TYPE', str, None], # Page 21
             'resbw' : ['SENSE:BANDWIDTH:RESOLUTION', float, None], # Page 22
             'vidbw' : ['SENSE:BANDWIDTH:VIDEO', float, None], # Page 23
             'reflev' : [':DISPLAY:WIND:TRACE:Y:SCALE:RLEVEL', float, None], # Page 15
             'detector' : [':SENSe:DETector:FUNCtion', str, None], # Page 24
    }

    hasopc = False
    tracetypename = 'REAL,32'
    tracedtype = numpy.float32
    tracequery = 'TRACE:DATA?'

def getInst(inst):
    if inst == "RSSPA":
        return RSSPA
    elif inst == "AnSPA":
        return AnSPA
    else:
        raise exceptions.NotImplementedError("unknown instrument type " + inst)