Mercurial > ~darius > hgwebdir.cgi > pyinst
view fsp7_phasenoise.py @ 79:84f96c5fe791
Use different message ID that does not result in
"Query UNTERMINATE" messages in the error log.
Fix testsrq.
Rename queryrsb to querystb as that matches the operating manual.
author | Daniel O'Connor <doconnor@gsoft.com.au> |
---|---|
date | Fri, 27 Sep 2024 16:53:43 +0930 |
parents | 23c96322cfb6 |
children | 1947d10f9395 |
line wrap: on
line source
#!/usr/bin/env python

# Copyright (c) 2024
#      Daniel O'Connor <darius@dons.net.au>.  All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

import argparse
import io
import math
import numpy
import os
import PIL
import PIL.Image
import rsib
import scipy
import scpi
import sys
import time

def sifmt(_v, dp = 3, unit = 'Hz', sp = ' '):
    '''Format a number using SI prefixes

    _v is the value to format, dp the number of decimal places, unit the
    unit string appended after the prefix and sp the separator placed
    between the number and the prefix.

    The prefix is chosen from T (1e12) down to p (1e-12) as the largest
    one not exceeding abs(_v). Zero is printed with no prefix and
    magnitudes below 1e-12 are expressed in p.'''
    si_prefixes = ('T', 'G', 'M', 'k', '', 'm', 'µ', 'n', 'p')
    v = abs(_v)
    if v == 0:
        # Zero has no magnitude to pick a prefix from, show it unscaled
        scale = 1
        sip = ''
    else:
        scale = 10 ** 12
        # The last prefix is the fallback so scale always matches the
        # prefix that is printed, even for values smaller than 1e-12
        for sip in si_prefixes[:-1]:
            if v >= scale:
                break
            scale /= 1e3
        else:
            sip = si_prefixes[-1]
    return ('%.' + str(dp) + 'f%s%s%s') % (_v / scale, sp, sip, unit)

def setupsweep(r, rbw, vbw, centre = None, span = None, start = None, stop = None):
    '''Helper function to set various sweep parameters

    r is the instrument connection (only its write method is used). Each
    of rbw, vbw, centre, span, start and stop is a value in Hz, any that
    is None is not sent to the instrument.'''
    if centre is not None:
        r.write('SENSE1:FREQ:CENT %f Hz' % (centre))
    if span is not None:
        r.write('SENSE1:FREQ:SPAN %f Hz' % (span))
    if start is not None:
        r.write('SENSE1:FREQ:START %f Hz' % (start))
    if stop is not None:
        r.write('SENSE1:FREQ:STOP %f Hz' % (stop))
    if rbw is not None:
        r.write('SENS1:BAND:RES %f Hz' % (rbw))
    if vbw is not None:
        r.write('SENS1:BAND:VID %f Hz' % (vbw))

def dosweep(r, sweeps):
    '''Helper function to trigger a sweep and wait for it to finish

    The timeout used while waiting is 5 times the sweep time reported by
    the instrument multiplied by sweeps, with a minimum of 1.

    Raises RuntimeError if the reply to *OPC? is not 1.'''
    swt = float(r.ask('SWE:TIME?'))
    tout = max(swt * 5 * sweeps, 1)

    # Trigger the sweep
    r.write('INIT;*WAI')

    # Wait for it to be done
    opc = int(r.ask('*OPC?', timeout = tout))
    # Explicit check rather than assert so it is not removed by python -O
    if opc != 1:
        raise RuntimeError('Sweep did not complete (*OPC? returned %d)' % (opc))

def findpeaks(r, maxpeak = 5, minpwr = None):
    '''Ask instrument to find up to maxpeak peaks and return a list of frequencies and powers

    Marker 1 is moved to the maximum and then stepped with MAX:NEXT.
    The search stops early when a peak is below minpwr (if given) or
    when the marker reports the same frequency as the previous peak.

    Returns a tuple (peaks_f, peaks_pwr) of equal length lists.'''
    peaks_f = []
    peaks_pwr = []
    r.write('CALC:MARK1:MAX')
    for _ in range(maxpeak):
        frq = float(r.ask('CALC:MARK1:X?'))
        pwr = float(r.ask('CALC:MARK1:Y?'))
        if minpwr is not None and pwr < minpwr:
            break
        # Marker did not move, so there are no more peaks to find
        if peaks_f and frq == peaks_f[-1]:
            break
        peaks_f.append(frq)
        peaks_pwr.append(pwr)
        r.write('CALC:MARK1:MAX:NEXT')
    return peaks_f, peaks_pwr

def _sweepshot(r, sweeps, ssprefix, start, stop):
    '''Sweep from start to stop (Hz), place markers on the peaks found and save a screenshot'''
    setupsweep(r, 1e3, 3e3, start = start, stop = stop)
    print('Scanning %s - %s (%.1f seconds)' % (sifmt(start, dp = 0), sifmt(stop, dp = 0), float(r.ask('SWE:TIME?'))))
    dosweep(r, sweeps)

    # Show peaks
    r.write('CALC:MARK:AOFF')
    for i, f in enumerate(findpeaks(r, 4)[0]):
        r.write('CALC:MARK%d:X %f Hz' % (i + 1, f))

    screenshot(r, '%s-sweep-%.1f-%.1fMHz.png' % (
        ssprefix, start / 1e6, stop / 1e6))

def phasenoise(r, nominal, sweeps, atten, rlev, yscale, ssprefix):
    '''Main function to find a signal, do some phase noise measurements and wideband sweeps

    r is the instrument connection, nominal the expected carrier
    frequency in Hz and sweeps the sweep count used for each measurement.
    atten, rlev and yscale set attenuation, reference level and Y scale
    (None selects the AUTO command for atten and rlev, and leaves the Y
    scale alone). If ssprefix is not None a screenshot is saved for each
    offset and two wide band sweeps are done and saved as well.

    Raises RuntimeError if the carrier found is below -30 dBm.'''
    cpwrlim = -30
    measurements = (
        { 'offset' : 10,    'span' : 100,   'rbw' : 10,  'vbw' : 10 },
        { 'offset' : 100,   'span' : 250,   'rbw' : 10,  'vbw' : 10 },
        { 'offset' : 1e3,   'span' : 2.5e3, 'rbw' : 10,  'vbw' : 30 },
        { 'offset' : 10e3,  'span' : 25e3,  'rbw' : 100, 'vbw' : 300 },
        { 'offset' : 100e3, 'span' : 250e3, 'rbw' : 100, 'vbw' : 300 },
        { 'offset' : 1e6,   'span' : 2.1e6, 'rbw' : 1e3, 'vbw' : 3e3 },
    )

    # Reset to defaults
    r.write('*RST')

    # Set to single sweep mode
    r.write('INIT:CONT OFF')

    # Enable display updates
    r.write('SYST:DISP:UPD ON')

    # Set attenuation
    # NOTE(review): confirm the analyser accepts 'INP:ATT AUTO' and
    # 'DISP:WIND:TRAC:Y:RLEV AUTO' as written, left unchanged here
    if atten is None:
        r.write('INP:ATT AUTO')
    else:
        r.write('INP:ATT %s' % atten)

    # Set Y scale
    if yscale is not None:
        r.write('DISP:TRAC:Y %s dB' % (yscale))

    # Set reference level
    if rlev is None:
        r.write('DISP:WIND:TRAC:Y:RLEV AUTO')
    else:
        r.write('DISP:WIND:TRAC:Y:RLEV %f' % (rlev))

    #
    # Look for signal
    #
    # Set frequency range etc
    setupsweep(r, 10, 30, centre = nominal, span = 1e3)

    # Switch marker 1 on in screen A
    r.write('CALC:MARK1 ON')

    print('Looking for signal (%.1f seconds)' % (float(r.ask('SWE:TIME?'))))

    # Do the sweep
    dosweep(r, 1)

    # Check the instrument is happy
    status = int(r.ask('STAT:QUES:COND?'))
    pwrstat = int(r.ask('STAT:QUES:POW:COND?'))
    if status != 0 or pwrstat != 0:
        print('Instrument warning, status %s power status %s' % (bin(status), bin(pwrstat)))

    # Find the peak
    r.write('CALC:MARK1:MAX')
    cpwr = float(r.ask('CALC:MARK1:Y?'))
    if cpwr < cpwrlim:
        # Without a carrier the phase noise results would be meaningless
        raise RuntimeError('Power too low / not found: %.1f dBm vs %.1f dBm' % (cpwr, cpwrlim))

    cmeas = float(r.ask('CALC:MARK1:X?'))
    print('Found signal at %s with power %.1f dBm' % (sifmt(cmeas, dp = 6), cpwr))

    # Centre peak
    r.write('CALC:MARK1:FUNC:CENT')

    # Set number of sweeps
    r.write('SWE:COUN %d' % (sweeps))

    # Enable phase noise measurement
    r.write('CALC:MARK:AOFF')
    r.write('CALC:DELT1:FUNC:PNO')

    for idx, m in enumerate(measurements):
        # Setup measurement
        setupsweep(r, m['rbw'], m['vbw'], span = m['span'])

        # Do the sweep
        dosweep(r, sweeps)

        # Set offset of phase noise measurement
        r.write('CALC:MARK1:MAX')
        r.write('CALC:DELT1:MOD REL')
        r.write('CALC:DELT1:X %f Hz' % (m['offset']))
        meas = float(r.ask('CALC:DELT:FUNC:PNO:RES?'))
        print('Offset %7s %7.2f dBc/Hz' % (sifmt(m['offset'], dp = 0), meas))

        # Take screen shot and save it
        if ssprefix is not None:
            screenshot(r, '%s-%d_%s_offset.png' % (
                ssprefix, idx, sifmt(m['offset'], dp = 0, sp = '')))

    # The wide band scans only produce screenshots so skip them if
    # there is nowhere to save to
    if ssprefix is not None:
        # Do a wide band scan
        _sweepshot(r, sweeps, ssprefix, 1e6, 100e6)

        # Look from just before signal to just after second harmonic
        _sweepshot(r, sweeps, ssprefix, nominal * 0.9, 2 * nominal + nominal * 0.1)

def screenshot(r, ssname, sapath = '\'c:\\temp\\pntmp.bmp\''):
    '''Take a screenshot on the instrument and save it locally as ssname

    The instrument writes a BMP to sapath (a quoted path on the
    instrument), which is then read back, converted to the format given
    by the extension of ssname and deleted from the instrument.'''
    # Setup & take screen shot
    r.write('HCOPY:DEV:LANG1 BMP')
    r.write('HCOPY:DEST1 MMEM')
    r.write('HCOPY:CMAP:DEF2')
    r.write('HCOPY:ITEM:ALL')
    r.write('MMEM:NAME %s' % (sapath,))
    r.write('HCOPY:IMM1')
    # Fixed wait after triggering the hard copy (presumably to let the
    # instrument finish writing the file - TODO confirm)
    time.sleep(5)

    # Grab it
    r.write('MMEM:DATA? %s' % (sapath,))
    bmpdat = scpi.getbin(r.read())

    # Convert to whatever format, decoding from memory so there is no
    # temporary file to leak or to clash with ssname
    with PIL.Image.open(io.BytesIO(bmpdat)) as img:
        img.save(ssname)

    r.write('MMEM:DEL %s' % (sapath,))

def main():
    '''Parse the command line, connect to the analyser and run the measurements'''
    parser = argparse.ArgumentParser(description = 'Configures a Rohde Schwarz FSP7 spectrum analyser to do a phase noise measurement')
    parser.add_argument('-a', '--atten', default = None, help = 'Attenuation level (default: automatic)')
    parser.add_argument('-r', '--rlevel', default = 10, help = 'Reference level (default: %(default)s dBm)', type = float)
    parser.add_argument('-s', '--ssprefix', default = None, help = 'Path name to save screenshots to (default: none)', type = str)
    parser.add_argument('-w', '--sweeps', default = 1, help = 'Number of sweeps (default: %(default)s)', type = int)
    parser.add_argument('-y', '--yscale', default = 120, help = 'Y-scale extent (default: %(default)s dB)', type = float)
    parser.add_argument('address', help = 'Address of analyser', type = str)
    parser.add_argument('nominal', help = 'Nominal frequency of measurement (Hz)', type = float)

    args = parser.parse_args()

    # Connect to the analyser
    r = rsib.RSIBDevice(args.address)

    # ID instrument
    print('Device ID is', r.ask('*IDN?').decode('ascii'))

    # Do measurements
    phasenoise(r, args.nominal, args.sweeps, args.atten, args.rlevel, args.yscale, args.ssprefix)

if __name__ == '__main__':
    main()