Mercurial > ~darius > hgwebdir.cgi > sirf
view sirf.py @ 0:6503256a3fc4
Prototype code for parsing SiRF messages.
Seems to work, but needs fleshing out for actual use.
author | darius@Inchoate |
---|---|
date | Sun, 22 Feb 2009 21:26:49 +1030 |
parents | |
children | d6d9fba5464d |
line wrap: on
line source
#!/usr/bin/env python import time, struct # SiRF at 9600 baud nmea2sirf = '$PSRF100,0,9600,8,1,0*0C\r\n' # NMEA at 4800 baud # Use like so # s.write(sirf.Parser.OrdLsttoStr(sirf.Parser.Encap(sirf.sirf2nmea))) sirf2nmea = [ 0x81, 0x02, 0x01, 0x01, 0x00, 0x00, 0x01, 0x01, 0x05, 0x01, 0x01, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x12, 0xc0 ] #s.write(sirf.Parser.OrdLsttoStr(sirf.Parser.Encap(sirf.getver))) getver = [ 0x84, 0x00 ] # Messages from engine sirfoutpktdsc = { 0x01 : "Reference Navigation Data", 0x02 : "Measured Navigation Data", 0x03 : "True Tracker Data", 0x04 : "Measured Tracking Data", 0x05 : "Raw Track Data", 0x06 : "SW Version", 0x07 : "Clock Status", 0x08 : "50 BPS Subframe Data", 0x09 : "Throughput", 0x0a : "Error ID", 0x0b : "Command Acknowledgment", 0x0c : "Command NAcknowledgment", 0x0d : "Visible List", 0x0e : "Almanac Data", 0x0f : "Ephemeris Data", 0x10 : "Test Mode 1", 0x11 : "Differential Corrections", 0x12 : "OkToSend", 0x13 : "Navigation Parameters", 0x14 : "Test Mode 2/3/4", 0x1b : "DGPS Status", 0x1c : "Nav. Lib. Measurement Data", 0x1d : "Nav. Lib. DGPS Data", 0x1e : "Nav. Lib. SV State Data", 0x1f : "Nav. Lib. Initialization Data", 0x29 : "Geodetic Navigation Data", 0x2d : "Raw DR Data", 0x2e : "Test Mode 3", 0x30 : "SiRFDRive-specific Class of Output Messages", 0x31 : "Test Mode 4 for SiRFLoc v2.x only", 0x32 : "SBAS Parameters", 0x34 : "1 PPS Time Message", 0x37 : "Test Mode 4", 0x38 : "Extended Ephemeris Data", 0xe1 : "SiRF internal message", 0xff : "Development Data" } # Messages to engine sirfinpktdsc = { 0x35 : "Advanced Power Management", 0x80 : "Initialize Data Source", 0x81 : "Switch to NMEA Protocol", 0x82 : "Set Almanac (upload)", 0x83 : "Handle Formatted Dump Data", 0x84 : "Poll Software Version", 0x85 : "DGPS Source Control", 0x86 : "Set Main Serial Port", 0x87 : "Switch Protocol", 0x88 : "Mode Control", 0x89 : "DOP Mask Control", 0x8a : "DGPS Mode", 0x8b : "Elevation Mask", 0x8c : "Power Mask", 0x8d : "Editing Residual", 0x8e : "Steady-State Detection", 0x8f : "Static Navigation", 0x90 : "Poll Clock Status", 0x91 : "Set DGPS Serial Port", 0x92 : "Poll Almanac", 0x93 : "Poll Ephemeris", 0x94 : "Flash Update", 0x95 : "Set Ephemeris (upload)", 0x96 : "Switch Operating Mode", 0x97 : "Set TricklePower Parameters", 0x98 : "Poll Navigation Parameters", 0xa5 : "Set UART Configuration", 0xa6 : "Set Message Rate", 0xa7 : "Set Low Power Acquisition Parameters", 0xa8 : "Poll Command Parameters", 0xaa : "Set SBAS Parameters", 0xac : "SiRFDRive-specific", 0xb4 : "Marketing Software Configuration", 0xb6 : "Set UART Configuration", 0xe4 : "SiRF internal message", 0xe8 : "Extended Ephemeris Proprietary" } def fmtlatlong(x): deg = int(x) min = abs((x - deg) * 60) sec = abs((min - int(min)) * 60) return "%d d %d m %.3f s" % (deg, int(min), sec) class Parser(object): def __init__(self, s = None): self.buffer = [] self.state = 'init1' self.fr_err = 0 # Framing error self.ck_err = 0 # Checksum error self.rx_cnt = 0 # Packet count self.pktq = [] self.s = s def processstr(self, data): return self.process(map(ord, data)) def process(self, data): pktcount = 0 for d in data: #print "Looking at 0x%02x, state = %s" % (d, self.state) if (self.state == 'init1'): self.buffer = [] if (d != 0xa0): print "Start1 framing error, got 0x%02x, expected 0xa0" % d self.fr_err += 1 continue self.state = 'init2' elif (self.state == 'init2'): if (d != 0xa2): print "Start2 framing error, got 0x%02x, expected 0xa2" % d self.fr_err += 1 self.state = 'init1' continue self.state = 'sizemsb' elif (self.state == 'sizemsb'): #print "Size1 - 0x%02x" % (d) if d > 0x7f: print "size msb too high (0x%02x)" % (d) self.fr_err += 1 self.state = 'init1' continue self.sizemsb = d self.state = 'sizelsb' elif (self.state == 'sizelsb'): #print "Size2 - 0x%02x" % (d) self.dataleft = self.sizemsb << 8 | d if self.dataleft < 1: print "size is too small (0x%04x)" % (self.dataleft) self.state = 'init1' continue if self.dataleft > 1024: print "size too large (0x%04x)" % (self.dataleft) self.fr_err += 1 self.state = 'init1' continue #print "Pkt size - 0x%04x" % (self.dataleft) self.state = 'data' elif (self.state == 'data'): self.buffer.append(d) self.dataleft = self.dataleft - 1 if self.dataleft == 0: self.state = 'cksum1' elif (self.state == 'cksum1'): self.cksummsb = d self.state = 'cksum2' elif (self.state == 'cksum2'): self.rxcksum = self.cksummsb << 8 | d self.state = 'end1' elif (self.state == 'end1'): if (d != 0xb0): print "End1 framing error, got 0x%02x, expected 0xb0" % d self.state = 'init1' self.fr_err += 1 continue self.state = 'end2' elif (self.state == 'end2'): if (d != 0xb3): print "End2 framing error, got 0x%02x, expected 0xb3" % d self.fr_err += 1 else: pktsum = reduce(lambda x, y: x + y, self.buffer) & 0x7fff if (pktsum != self.rxcksum): print "Checksum error: got 0x%04x, expected 0x%04x" % \ (self.rxcksum, pktsum) print "buffer is %s" % (str(self.buffer)) self.state = 'init1' self.ck_err += 1 else: p = Parser.Build(self.buffer) #self.pktq.append(p) pktcount += 1 self.rx_cnt += 1 self.state = 'init1' else: print "Invalid state %s! Resetting" % (self.state) self.state = 'init1' return pktcount def dumpmsgs(self, s): s.setTimeout(0.1) while True: self.processstr(s.read(100)) @classmethod def Encap(self, data): cksum = reduce(lambda x, y: x + y, data) dlen = len(data) out = [ 0xa0, 0xa2 ] out.append((dlen & 0xff00) >> 8) out.append(dlen & 0xff) out.extend(data) out.append((cksum & 0xff00) >> 8) out.append(cksum & 0xff) out.extend([0xb0, 0xb3]) return out @classmethod def OrdLsttoStr(self, data): return reduce(lambda x, y: x + y, map(chr, data)) @classmethod def Build(self, data): t = time.time() t1 = time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(t)) t2 = (t - int(t)) * 1e3 tfmt = "%s.%03d" % (t1, t2) if data[0] in sirfoutpktdsc: print "%s: Got out packet 0x%02x : %s" % (tfmt, data[0], sirfoutpktdsc[data[0]]) elif data[0] in sirfinpktdsc: print "%s: Got in packet 0x%02x : %s" % (tfmt, data[0], sirfinpktdsc[data[0]]) else: print "%s: Unknown packet type 0x%02x" % (tfmt, data[0]) print "Payload - " + str(data[1:]) if data[0] == 0x02: fmt = '>iiihhhBBBhIB' datastr = reduce(lambda x, y: x + y, map(chr, data[1:struct.calcsize(fmt) + 1])) (xpos, ypos, zpos, xvel, yvel, zvel, mode1, hdop, mode2, gpsweek, gpstow, nsats) = \ struct.unpack(fmt, datastr) satprn = [] for i in xrange(nsats): satprn.append(data[struct.calcsize(fmt) + i]) xvel = float(xvel) / 8 yvel = float(yvel) / 8 zvel = float(zvel) / 8 print " Position: X: %d, Y: %d, Z: %d" % (xpos, ypos, zpos) print " Velocity: X: %.2f, Y: %.2f, Z: %.2f" % (xvel, yvel, zvel) elif data[0] == 0x06: nulidx = data.index(0) print " SW Ver : %s" % (reduce(lambda x, y: x + y, map(chr, data[1:nulidx]))) elif data[0] == 0x0a: errid = data[1] << 8 | data[2] dlen = (data[3] << 8 | data[4]) * 4 print " Error ID : 0x%04x" % (errid) if dlen > 0: print " Length : 0x%04x" % (dlen) print " Payload : %s" % (data[5:]) elif data[0] == 0x0b: print " Cmd Ack : 0x%02x" % (data[1]) elif data[0] == 0x0c: print " Cmd NAck : 0x%02x" % (data[1]) elif data[0] == 0x29: fixtype = { 0 : "none", 1 : "1-SV KF", 2 : "2-SV KF", 3 : "3-SV KF", 4 : "4+-SV KF", 5 : "2D", 6 : "3D", 7 : "DR" } fmt = '>HHHIHBBBBHIiiiiBHHHHHIIIHIIIIIHHBBB' datastr = reduce(lambda x, y: x + y, map(chr, data[1:struct.calcsize(fmt) + 1])) (navval, navtype, ewn, tow, year, month, day, hour, minute, second, satlst, latitude, longitude, alt_elip, alt_msl, datum, sog, cog, magvar, climbrate, headrate, estHPE, estVPE, estTE, estHVE, clockbias, CBerr, clockdrift, CDerr, distance, distanceErr, headErr, numsvs, hdop, addmodeinfo) = \ struct.unpack(fmt, datastr) tow = float(tow) / 1e3 latitude = float(latitude) / 1e7 longitude = float(longitude) / 1e7 alt_elip = float(alt_elip) / 1e2 alt_msl = float(alt_msl) / 1e2 sog = float(sog) / 1e2 cog = float(cog) / 1e2 climbrate = float(climbrate) / 1e2 headrate = float(headrate) / 1e2 estHPE = float(estHPE) / 1e2 estVPE = float(estVPE) / 1e2 estTE = float(estTE) / 1e2 estHVE = float(estHVE) / 1e2 clockbias = float(clockbias) / 1e2 CBerr = float(CBerr) / 1e2 clockdrift = float(clockdrift) / 1e2 CDerr = float(CDerr) / 1e2 headErr = float(headErr) / 1e2 hdop = float(hdop) / 5 print " Fix : %s, Sats : %d, Lat: %s, Long: %s, Alt: %.2fm" % \ (fixtype[navtype & 0x7], numsvs, fmtlatlong(latitude), fmtlatlong(longitude), \ alt_msl) elif data[0] == 0xa6: print " Message rate : MID 0x%02x, rate 0x%02x" % (data[2], data[3]) def enablemsgs(s): s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x02, 0x01, 0x00, 0x00, 0x00, 0x00]))) s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00]))) s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x07, 0x01, 0x00, 0x00, 0x00, 0x00]))) s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x09, 0x01, 0x00, 0x00, 0x00, 0x00]))) s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x0d, 0x01, 0x00, 0x00, 0x00, 0x00]))) s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x29, 0x01, 0x00, 0x00, 0x00, 0x00]))) #s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x34, 0x01, 0x00, 0x00, 0x00, 0x00]))) s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x1b, 0x01, 0x00, 0x00, 0x00, 0x00]))) def disablemsgs(s): s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00]))) s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]))) s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00]))) s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00]))) s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x0d, 0x00, 0x00, 0x00, 0x00, 0x00]))) #s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x29, 0x00, 0x00, 0x00, 0x00, 0x00]))) s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x32, 0x00, 0x00, 0x00, 0x00, 0x00]))) s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x34, 0x00, 0x00, 0x00, 0x00, 0x00]))) s.write(Parser.OrdLsttoStr(Parser.Encap([0xa6, 0x00, 0x1b, 0x00, 0x00, 0x00, 0x00, 0x00])))