Mercurial > ~darius > hgwebdir.cgi > pyinst
view sitesurvey.py @ 72:da3558dec4e3
Fetch large data blocks correctly.
Add ask function.
author | Daniel O'Connor <doconnor@gsoft.com.au> |
---|---|
date | Wed, 25 Sep 2024 20:56:39 +0930 |
parents | 91b476ebc0f2 |
children |
line wrap: on
line source
#!/usr/bin/env python # Copyright (c) 2011 # Daniel O'Connor <darius@dons.net.au>. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # import calendar import configparser import datetime import exceptions import numpy import os import scipy.interpolate import scpi import specan import sys import time defaults = {} confname = "sitesurvey.ini" confpaths = [ ".", os.path.dirname(os.path.realpath(sys.argv[0]))] class Experiment(object): def __init__(self, conf, name): if not conf.has_section(name): raise exceptions.KeyError("No section for experiment " + name) self.name = name self.recurrence = None self.opts = {} for k, v in conf.items(name): if k == "recurrence": # In seconds self.recurrence = int(v) continue try: self.opts[k] = int(v) except exceptions.ValueError as e: try: self.opts[k] = float(v) except exceptions.ValueError as e: self.opts[k] = v if self.recurrence == None: raise exceptions.KeyError("Mandatory parameter 'recurrence' is missing") self.recurrence = datetime.timedelta(seconds = self.recurrence) self.last_run = None def __repr__(self): return "<" + self.name + ">" class CalFile(object): def __init__(self, fname): f = file(fname) if f.readline().strip() != "Frequencies": raise exceptions.SyntaxError("Format of cal file incorrect (frequencies header missing)") freqs = numpy.fromstring(f.readline().strip(), sep = ',') if f.readline().strip() != "Gain": raise exceptions.SyntaxError("Format of cal file incorrect (gains header missing)") gains = numpy.fromstring(f.readline().strip(), sep = ',') if len(gains) != len(freqs): raise exceptions.SyntaxError("Format of cal file incorrect (length of gain and freqs aren't equal)") self.calfreqs = freqs self.calgains = gains # Create interpolation function self._interp = scipy.interpolate.interp1d(self.calfreqs, self.calgains) def interp(self, freqs): if numpy.any(freqs < self.calfreqs[0]) or numpy.any(freqs > self.calfreqs[-1]): raise exceptions.SyntaxError("Frequency out of range of calibration %f - %f" % (calfreqs[0], calfreqs[-1])) return self._interp(freqs) def getexpt(sequence): '''Given a sequence return the experiment which should be run next and how long until it should start''' now = datetime.datetime.utcnow() #print "now is " + str(now) soonestdly = None soonestexp = None for e in sequence: #print "Looking at " + str(e) # If an experiment has ever run do it now if e.last_run == None: return e, datetime.timedelta(0) # Time until this experiment should be run nextrun = e.last_run + e.recurrence dly = nextrun - now #print "Last run was at %s, nextrun at %s, rec = %s, dly = %s / %f" % (str(e.last_run), str(nextrun), str(e.recurrence), str(dly), dly.total_seconds()) # Haven't looked at an experiment yet or this one is sooner if soonestdly == None or dly < soonestdly: #print "sooner" soonestdly = dly soonestexp = e if soonestdly < datetime.timedelta(0): #print "Capping " + e.name + " to run now" soonestdly = datetime.timedelta(0) #print "Returning " + str(soonestexp) return soonestexp, soonestdly def getsweep(inst, conf): print(" Sending configuration") for k in conf: #time.sleep(0.3) inst.setconf(k, conf[k]) # Otherwise the R&S doens't respond.. #time.sleep(0.3) rconf = inst.dumpconf() fstart = rconf['fstart'] fstop = rconf['fstop'] print(" Configuration is " + str(rconf)) print(" Fetching trace") yaxis = inst.gettrace() xaxis = numpy.arange(fstart, fstop, (fstop - fstart) / yaxis.shape[0]) return xaxis, yaxis, rconf def savesweep(fname, exp, x, y): print(" Saving to " + fname) f = open(fname, 'wb') for k in exp: f.write("%s %s\n" % (k.upper(), str(exp[k]))) f.write("XDATA ") numpy.savetxt(f, [x], delimiter = ', ', fmt = '%.3f') # Produces a trailing \n f.write("YDATA ") numpy.savetxt(f, [y], delimiter = ', ', fmt = '%.3f') del f def total_seconds(td): return (td.microseconds + (td.seconds + td.days * 24.0 * 3600.0) * 10.0**6) / 10.0**6 if __name__ == '__main__': # Read in config file(s) conf = configparser.SafeConfigParser(defaults) r = conf.read([os.path.join(a, confname) for a in confpaths]) if len(r) == 0: print("Unable to find any configuration file(s)") sys.exit(1) if not conf.has_section('general'): print("Configuration file doesn't have a 'general' section") sys.exit(1) if not conf.has_option('general', 'url'): print("Configuration file doesn't have a 'url' option in the 'general' section") sys.exit(1) if not conf.has_option('general', 'type'): print("Configuration file doesn't have a 'type' option in the 'general' section") sys.exit(1) if not conf.has_option('general', 'sequence'): print("Configuration file doesn't have a 'sequence' option in the 'general' section") sys.exit(1) if not conf.has_option('general', 'fname'): print("Configuration file doesn't have a 'fname' option in the 'general' section") sys.exit(1) if conf.has_option('general', 'ampcal'): ampcal = CalFile(conf.get('general', 'ampcal')) else: ampcal = None if conf.has_option('general', 'antcal'): antcal = CalFile(conf.get('general', 'antcal')) else: antcal = None sequence = [] seqnames = conf.get('general', 'sequence').split() for e in seqnames: sequence.append(Experiment(conf, e)) url = conf.get('general', 'url') insttype = conf.get('general', 'type') fnamefmt = conf.get('general', 'fname') # Connect to the instrument print("Connecting to " + url) con = scpi.instURL(url) con.write("*IDN?") idn = con.read() print("Instrument is a " + idn) # Get class for this instrument & instantiate it inst = specan.getInst(insttype)(con) while True: # Find the next experiment to run exp, dly = getexpt(sequence) # Sleep if necessary dly = total_seconds(dly) if dly > 1: print("Sleeping for %.1f seconds" % (dly)) time.sleep(dly) # Run it print("--> Running experiment " + str(exp)) freqs, power, opts = getsweep(inst, exp.opts) # Adjust power based on amplifier and antenna calibration if ampcal != None: adj = ampcal.interp(freqs) power = power - adj if antcal != None: adj = antcal.interp(freqs) power = power - adj # Update last run time exp.last_run = datetime.datetime.utcnow() # Add some informative params tsepoch = calendar.timegm(exp.last_run.utctimetuple()) extras = { 'timestamp' : exp.last_run, 'tag' : exp.name, } opts = dict(list(opts.items()) + list(extras.items())) fmtextras = { 'timestamp_hex' : '%08x' % (tsepoch), 'timestamp_dec' : '%d' % (tsepoch), 'fstart_mhz' : '%.1f' % (float(opts['fstart']) / 1e6), 'fstop_mhz' : '%.1f' % (float(opts['fstop']) / 1e6), } fmtopts = dict(list(opts.items()) + list(fmtextras.items())) fname = fnamefmt % fmtopts # Save data savesweep(fname, opts, freqs, power)