view fsp7_phasenoise.py @ 79:84f96c5fe791

Use different message ID that does not result in "Query UNTERMINATE" messages in the error log. Fix testsrq. Rename queryrsb to querystb as that matches the operating manual.
author Daniel O'Connor <doconnor@gsoft.com.au>
date Fri, 27 Sep 2024 16:53:43 +0930
parents 23c96322cfb6
children 1947d10f9395
line wrap: on
line source

#!/usr/bin/env python

# Copyright (c) 2024
#      Daniel O'Connor <darius@dons.net.au>.  All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

import argparse
import math
import numpy
import os
import PIL
import rsib
import scipy
import scpi
import sys
import time

def sifmt(_v, dp = 3, unit = 'Hz', sp = ' '):
    '''Format a number using SI prefixes'''
    si_prefixes = ('T', 'G', 'M', 'k', '', 'm', 'ยต', 'n', 'p')
    scale = 10 ** 12
    v = abs(_v)
    if v == 0:
        sip = ""
        scale = 0
    for i, sip in enumerate(si_prefixes):
        if v >= scale:
            break
        scale /= 1e3
    return ('%.' + str(dp) + 'f%s%s%s') % (_v / scale, sp, si_prefixes[i], unit)

def setupsweep(r, rbw, vbw, centre = None, span = None, start = None, stop = None):
    '''Helper function to set various sweep parameters'''
    if centre is not None:
        r.write('SENSE1:FREQ:CENT %f Hz' % (centre))
    if span is not None:
        r.write('SENSE1:FREQ:SPAN %f Hz' % (span))
    if start is not None:
        r.write('SENSE1:FREQ:START %f Hz' % (start))
    if stop is not None:
        r.write('SENSE1:FREQ:STOP %f Hz' % (stop))
    if rbw is not None:
        r.write('SENS1:BAND:RES %f Hz' % (rbw))
    if vbw is not None:
        r.write('SENS1:BAND:VID %f Hz' % (vbw))

def dosweep(r, sweeps):
    '''Helper function to trigger a sweep and wait for it to finish'''
    swt = float(r.ask('SWE:TIME?'))
    tout = swt * 5 * sweeps
    if tout < 1:
        tout = 1
    #print('Sweep time', swt)

    # Trigger the sweep
    r.write('INIT;*WAI')

    # Wait for it to be done
    opc = int(r.ask('*OPC?', timeout = tout))
    assert(opc == 1)

def findpeaks(r, maxpeak = 5, minpwr = None):
    '''Ask instrument to find maxpeaks peaks and return a list of frequencies and powers'''
    peaks_f = []
    peaks_pwr = []
    r.write('CALC:MARK1:MAX')
    for i in range(maxpeak):
        frq = float(r.ask('CALC:MARK1:X?'))
        pwr = float(r.ask('CALC:MARK1:Y?'))
        if minpwr is not None and pwr < minpwr:
            break
        if i > 0:
            if frq == peaks_f[i - 1]:
                break
        peaks_f.append(frq)
        peaks_pwr.append(pwr)
        r.write('CALC:MARK1:MAX:NEXT')

    return peaks_f, peaks_pwr

def phasenoise(r, nominal, sweeps, atten, rlev, yscale, ssprefix):
    '''Main function to find a signal, do some phase noise measurements and wideband sweeps'''
    cpwrlim = -30
    measurements = (
        { 'offset' : 10,    'span' : 100,   'rbw' : 10,  'vbw' : 10 },
        { 'offset' : 100,   'span' : 250,   'rbw' : 10,  'vbw' : 10 },
        { 'offset' : 1e3,   'span' : 2.5e3, 'rbw' : 10,  'vbw' : 30 },
        { 'offset' : 10e3,  'span' : 25e3,  'rbw' : 100, 'vbw' : 300 },
        { 'offset' : 100e3, 'span' : 250e3, 'rbw' : 100, 'vbw' : 300 },
        { 'offset' : 1e6,   'span' : 2.1e6, 'rbw' : 1e3, 'vbw' : 3e3 },
    )
    # Reset to defaults
    r.write('*RST')

    # Set to single sweep mode
    r.write('INIT:CONT OFF')

    # Enable display updates
    r.write('SYST:DISP:UPD ON')

    # Set attenuation
    if atten is None:
        r.write('INP:ATT AUTO')
    else:
        r.write('INP:ATT %s' % atten)

    # Set Y scale
    if yscale is not None:
        r.write('DISP:TRAC:Y %s dB' % (yscale))

    # Set reference level
    if rlev is None:
        r.write('DISP:WIND:TRAC:Y:RLEV AUTO')
    else:
        r.write('DISP:WIND:TRAC:Y:RLEV %f' % (rlev))

    #
    # Look for signal
    #
    # Set frequency range etc
    setupsweep(r, 10, 30, centre = nominal, span = 1e3)

    # Switch marker 1 on in screen A
    r.write('CALC:MARK1 ON')
    print('Looking for signal (%.1f seconds)' % (float(r.ask('SWE:TIME?'))))

    # Do the sweep
    dosweep(r, 1)

    # Check the instrument is happy
    status = int(r.ask('STAT:QUES:COND?'))
    pwrstat = int(r.ask('STAT:QUES:POW:COND?'))
    if status != 0 or pwrstat != 0:
        print('Instrument warning, status %s power status %s' %
              (bin(status), bin(pwrstat)))
    # Find the peak
    r.write('CALC:MARK1:MAX')
    cpwr = float(r.ask('CALC:MARK1:Y?'))
    if cpwr < cpwrlim:
         Exception('Power too low / not found: %.1f dBm vs %.1f dBm' % (cpwr, cpwrlim))
    cmeas = float(r.ask('CALC:MARK1:X?'))
    print('Found signal at %s with power %.1f dBm' % (sifmt(cmeas, dp = 6), cpwr))

    # Centre peak
    r.write('CALC:MARK1:FUNC:CENT')

    # Set number of sweeps
    r.write('SWE:COUN %d' % (sweeps))

    # Enable phase noise measurement
    r.write('CALC:MARK:AOFF')
    r.write('CALC:DELT1:FUNC:PNO')

    for idx, m in enumerate(measurements):
        # Setup measurement
        setupsweep(r, m['rbw'], m['vbw'], span = m['span'])

        # Do the sweep
        dosweep(r, sweeps)

        # Set offset of phase noise measurement
        r.write('CALC:MARK1:MAX')
        r.write('CALC:DELT1:MOD REL')
        r.write('CALC:DELT1:X %f Hz' % (m['offset']))
        meas = float(r.ask('CALC:DELT:FUNC:PNO:RES?'))
        print('Offset %7s %7.2f dBc/Hz' % (sifmt(m['offset'], dp = 0), meas))

        # Take screen shot and save it
        if ssprefix is not None:
            screenshot(r, '%s-%d_%s_offset.png' % (
                ssprefix, idx, sifmt(m['offset'], dp = 0, sp = '')))

    # Do a wide band scan
    if ssprefix is not None:
        start = 1e6
        stop = 100e6
        setupsweep(r, 1e3, 3e3, start = start, stop = stop)
        print('Scanning %s - %s (%.1f seconds)' % (sifmt(start, dp = 0), sifmt(stop, dp = 0),
                                                   float(r.ask('SWE:TIME?'))))
        dosweep(r, sweeps)
        # Show peaks
        r.write('CALC:MARK:AOFF')
        for i, (f, p) in enumerate(zip(*findpeaks(r, 4))):
            r.write('CALC:MARK%d:X %f Hz' % (i + 1, f))
            screenshot(r, '%s-sweep-%.1f-%.1fMHz.png' % (
                ssprefix, start / 1e6, stop / 1e6))

        # Look from just before signal to just after second harmonic
        start = nominal * 0.9
        stop = 2 * nominal + nominal * 0.1
        setupsweep(r, 1e3, 3e3, start = start, stop = stop)
        print('Scanning %s - %s (%.1f seconds)' % (sifmt(start, dp = 0), sifmt(stop, dp = 0),
                                               float(r.ask('SWE:TIME?'))))
        dosweep(r, sweeps)
        # Show peaks
        r.write('CALC:MARK:AOFF')
        for i, (f, p) in enumerate(zip(*findpeaks(r, 4))):
            r.write('CALC:MARK%d:X %f Hz' % (i + 1, f))
        screenshot(r, '%s-sweep-%.1f-%.1fMHz.png' % (
            ssprefix, start / 1e6, stop / 1e6))

def screenshot(r, ssname, sapath = '\'c:\\temp\\pntmp.bmp\''):
    # Setup & take screen shot
    r.write('HCOPY:DEV:LANG1 BMP')
    r.write('HCOPY:DEST1 MMEM')
    r.write('HCOPY:CMAP:DEF2')
    r.write('HCOPY:ITEM:ALL')
    r.write('MMEM:NAME %s' % (sapath,))
    r.write('HCOPY:IMM1')
    time.sleep(5)

    # Grab it
    r.write('MMEM:DATA? %s' % (sapath,))
    bmpdat = scpi.getbin(r.read())
    bmpname = ssname[0:-4] + '.bmp'

    # Convert to whatever format
    open(bmpname, 'wb').write(bmpdat)
    img = PIL.Image.open(bmpname)
    img.save(ssname)
    os.unlink(bmpname)
    r.write('MMEM:DEL %s' % (sapath,))

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description = 'Configures a Rohde Schwarz FSP7 spectrum analyser to do a phase noise measurement')
    parser.add_argument('-a', '--atten', default = None, help = 'Attenuation level (default: autoamatic)')
    parser.add_argument('-r', '--rlevel', default = 10, help = 'Reference level (default: %default dBm)', type = float)
    parser.add_argument('-s', '--ssprefix', default = None, help = 'Path name to save screenshots to (default: none)', type = str)
    parser.add_argument('-w', '--sweeps', default = 1, help = 'Number of sweeps (default: %default)', type = int)
    parser.add_argument('-y', '--yscale', default = 120, help = 'Y-scale extent (default: %default dB)', type = float)
    parser.add_argument('address', help = 'Address of analyser', type = str)
    parser.add_argument('nominal', help = 'Nominal frequency of measurement (Hz)', type = float)

    args = parser.parse_args()

    # Connect to the analyser
    r = rsib.RSIBDevice(args.address)

    # ID instrument
    print('Device ID is', r.ask('*IDN?').decode('ascii'))

    # Do measurements
    phasenoise(r, args.nominal, args.sweeps, args.atten, args.rlevel, args.yscale, args.ssprefix)