changeset 19:cba1c44060f5

Copied from http://sourceforge.net/projects/pythonlabtools/ I modified them slightly to work as something is causing pack_uint to be called instead of pack_int...
author Daniel O'Connor <darius@dons.net.au>
date Thu, 11 Aug 2011 17:11:58 +0930 (2011-08-11)
parents 9bb8a9f3df6b
children a124aa7067e7
files rpc.py vxi_11.py
diffstat 2 files changed, 1637 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rpc.py	Thu Aug 11 17:11:58 2011 +0930
@@ -0,0 +1,965 @@
+"A an RPC module, with optimizations for VXI-11 rpc calls"
+_rcsid="$Id: rpc.py 323 2011-04-06 19:10:03Z marcus $"
+
+#this module is 99% from the python distribution, Demo/rpc/rpc.py, with a few modifications 
+#by Marcus Mendenhall to improve timeout handling, etc.  It probably should follow any licensing 
+#intent of whoever originated it.
+
+# Sun RPC version 2 -- RFC1057.
+
+# XXX There should be separate exceptions for the various reasons why
+# XXX an RPC can fail, rather than using RuntimeError for everything
+
+# XXX The UDP version of the protocol resends requests when it does
+# XXX not receive a timely reply -- use only for idempotent calls!
+
+# XXX There is no provision for call timeout on TCP connections
+# Now there is, on receives. 2003.02.20 Marcus Mendenhall, Vanderbilt University marcus.h.mendenhall@vanderbilt.edu
+
+import xdrlib as xdr
+import socket
+import os
+from exceptions import *
+
+RPCVERSION = 2
+
+CALL = 0
+REPLY = 1
+
+AUTH_NULL = 0
+AUTH_UNIX = 1
+AUTH_SHORT = 2
+AUTH_DES = 3
+
+MSG_ACCEPTED = 0
+MSG_DENIED = 1
+
+SUCCESS = 0				# RPC executed successfully
+PROG_UNAVAIL  = 1			# remote hasn't exported program
+PROG_MISMATCH = 2			# remote can't support version #
+PROC_UNAVAIL  = 3			# program can't support procedure
+GARBAGE_ARGS  = 4			# procedure can't decode params
+
+RPC_MISMATCH = 0			# RPC version number != 2
+AUTH_ERROR = 1				# remote can't authenticate caller
+
+AUTH_BADCRED      = 1			# bad credentials (seal broken)
+AUTH_REJECTEDCRED = 2			# client must begin new session
+AUTH_BADVERF      = 3			# bad verifier (seal broken)
+AUTH_REJECTEDVERF = 4			# verifier expired or replayed
+AUTH_TOOWEAK      = 5			# rejected for security reasons
+
+
+class Packer(xdr.Packer):
+
+	def pack_auth(self, auth):
+		flavor, stuff = auth
+		self.pack_enum(flavor)
+		self.pack_opaque(stuff)
+
+	def pack_auth_unix(self, stamp, machinename, uid, gid, gids):
+		self.pack_uint(stamp)
+		self.pack_string(machinename)
+		self.pack_uint(uid)
+		self.pack_uint(gid)
+		self.pack_uint(len(gids))
+		for i in gids:
+			self.pack_uint(i)
+
+	def pack_callheader(self, xid, prog, vers, proc, cred, verf):
+		self.pack_uint(xid)
+		self.pack_enum(CALL)
+		self.pack_uint(RPCVERSION)
+		self.pack_uint(prog)
+		self.pack_uint(vers)
+		self.pack_uint(proc)
+		self.pack_auth(cred)
+		self.pack_auth(verf)
+		# Caller must add procedure-specific part of call
+
+	def pack_replyheader(self, xid, verf):
+		self.pack_uint(xid)
+		self.pack_enum(REPLY)
+		self.pack_uint(MSG_ACCEPTED)
+		self.pack_auth(verf)
+		self.pack_enum(SUCCESS)
+		# Caller must add procedure-specific part of reply
+
+
+# Exceptions
+BadRPCFormat = 'rpc.BadRPCFormat'
+BadRPCVersion = 'rpc.BadRPCVersion'
+GarbageArgs = 'rpc.GarbageArgs'
+
+class Unpacker(xdr.Unpacker):
+
+	def unpack_auth(self):
+		flavor = self.unpack_enum()
+		stuff = self.unpack_opaque()
+		return (flavor, stuff)
+
+	def unpack_callheader(self):
+		xid = self.unpack_uint(xid)
+		temp = self.unpack_enum()
+		if temp <> CALL:
+			raise BadRPCFormat, 'no CALL but ' + `temp`
+		temp = self.unpack_uint()
+		if temp <> RPCVERSION:
+			raise BadRPCVerspion, 'bad RPC version ' + `temp`
+		prog = self.unpack_uint()
+		vers = self.unpack_uint()
+		proc = self.unpack_uint()
+		cred = self.unpack_auth()
+		verf = self.unpack_auth()
+		return xid, prog, vers, proc, cred, verf
+		# Caller must add procedure-specific part of call
+
+	def unpack_replyheader(self):
+		xid = self.unpack_uint()
+		mtype = self.unpack_enum()
+		if mtype <> REPLY:
+			raise RuntimeError, 'no REPLY but ' + `mtype`
+		stat = self.unpack_enum()
+		if stat == MSG_DENIED:
+			stat = self.unpack_enum()
+			if stat == RPC_MISMATCH:
+				low = self.unpack_uint()
+				high = self.unpack_uint()
+				raise RuntimeError, \
+				  'MSG_DENIED: RPC_MISMATCH: ' + `low, high`
+			if stat == AUTH_ERROR:
+				stat = self.unpack_uint()
+				raise RuntimeError, \
+					'MSG_DENIED: AUTH_ERROR: ' + `stat`
+			raise RuntimeError, 'MSG_DENIED: ' + `stat`
+		if stat <> MSG_ACCEPTED:
+			raise RuntimeError, \
+			  'Neither MSG_DENIED nor MSG_ACCEPTED: ' + `stat`
+		verf = self.unpack_auth()
+		stat = self.unpack_enum()
+		if stat == PROG_UNAVAIL:
+			raise RuntimeError, 'call failed: PROG_UNAVAIL'
+		if stat == PROG_MISMATCH:
+			low = self.unpack_uint()
+			high = self.unpack_uint()
+			raise RuntimeError, \
+				'call failed: PROG_MISMATCH: ' + `low, high`
+		if stat == PROC_UNAVAIL:
+			raise RuntimeError, 'call failed: PROC_UNAVAIL'
+		if stat == GARBAGE_ARGS:
+			raise RuntimeError, 'call failed: GARBAGE_ARGS'
+		if stat <> SUCCESS:
+			raise RuntimeError, 'call failed: ' + `stat`
+		return xid, verf
+		# Caller must get procedure-specific part of reply
+
+
+# Subroutines to create opaque authentication objects
+
+def make_auth_null():
+	return ''
+
+def make_auth_unix(seed, host, uid, gid, groups):
+	p = Packer()
+	p.pack_auth_unix(seed, host, uid, gid, groups)
+	return p.get_buf()
+
+def make_auth_unix_default():
+	try:
+		from os import getuid, getgid
+		uid = getuid()
+		gid = getgid()
+	except ImportError:
+		uid = gid = 0
+	import time
+	return make_auth_unix(int(time.time()-unix_epoch()), \
+		  socket.gethostname(), uid, gid, [])
+
+_unix_epoch = -1
+def unix_epoch():
+    """Very painful calculation of when the Unix Epoch is.
+
+    This is defined as the return value of time.time() on Jan 1st,
+    1970, 00:00:00 GMT.
+
+    On a Unix system, this should always return 0.0.  On a Mac, the
+    calculations are needed -- and hard because of integer overflow
+    and other limitations.
+
+    """
+    global _unix_epoch
+    if _unix_epoch >= 0: return _unix_epoch
+    import time
+    now = time.time()
+    localt = time.localtime(now)	# (y, m, d, hh, mm, ss, ..., ..., ...)
+    gmt = time.gmtime(now)
+    offset = time.mktime(localt) - time.mktime(gmt)
+    y, m, d, hh, mm, ss = 1970, 1, 1, 0, 0, 0
+    offset, ss = divmod(ss + offset, 60)
+    offset, mm = divmod(mm + offset, 60)
+    offset, hh = divmod(hh + offset, 24)
+    d = d + offset
+    _unix_epoch = time.mktime((y, m, d, hh, mm, ss, 0, 0, 0))
+    print "Unix epoch:", time.ctime(_unix_epoch)
+    return _unix_epoch
+
+
+# Common base class for clients
+
+class Client:
+
+	def __init__(self, host, prog, vers, port):
+		self.host = host
+		self.prog = prog
+		self.vers = vers
+		self.port = port
+		self.makesocket() # Assigns to self.sock
+		self.bindsocket()
+		self.connsocket()
+		self.lastxid = 0 # XXX should be more random?
+		self.addpackers()
+		self.cred = None
+		self.verf = None
+
+	def close(self):
+		self.sock.close()
+
+	def makesocket(self):
+		# This MUST be overridden
+		raise RuntimeError, 'makesocket not defined'
+
+	def connsocket(self):
+		# Override this if you don't want/need a connection
+		self.sock.connect((self.host, self.port))
+
+	def bindsocket(self):
+		# Override this to bind to a different port (e.g. reserved)
+		self.sock.bind(('', 0))
+
+	def addpackers(self):
+		# Override this to use derived classes from Packer/Unpacker
+		self.packer = Packer()
+		self.unpacker = Unpacker('')
+
+	def make_call(self, proc, args, pack_func, unpack_func):
+		#print "make_call() args = " + str(args)
+		# Don't normally override this (but see Broadcast)
+		if pack_func is None and args is not None:
+			raise TypeError, 'non-null args with null pack_func'
+		#print "packed args"
+		self.start_call(proc)
+		if pack_func:
+			pack_func(args)
+		self.do_call()
+		if unpack_func:
+			result = unpack_func()
+		else:
+			result = None
+		#print "result = " + str(result)
+		self.unpacker.done()
+		return result
+
+	def start_call(self, proc):
+		# Don't override this
+		self.lastxid = xid = self.lastxid + 1
+		cred = self.mkcred()
+		verf = self.mkverf()
+		p = self.packer
+		p.reset()
+		p.pack_callheader(xid, self.prog, self.vers, proc, cred, verf)
+
+	def do_call(self):
+		# This MUST be overridden
+		raise RuntimeError, 'do_call not defined'
+
+	def mkcred(self):
+		# Override this to use more powerful credentials
+		if self.cred == None:
+			self.cred = (AUTH_NULL, make_auth_null())
+		return self.cred
+
+	def mkverf(self):
+		# Override this to use a more powerful verifier
+		if self.verf == None:
+			self.verf = (AUTH_NULL, make_auth_null())
+		return self.verf
+
+	def call_0(self):		# Procedure 0 is always like this
+		return self.make_call(0, None, None, None)
+
+
+# Record-Marking standard support
+
+try:
+	from select import select as _select
+except:
+	_select=None
+
+def sendfrag_with_timeout(sock,  last, frag, timeout_seconds=None):
+	x = len(frag)
+	if last: x = x | 0x80000000L
+	header = (chr(int(x>>24 & 0xff)) + chr(int(x>>16 & 0xff)) + \
+		  chr(int(x>>8 & 0xff)) + chr(int(x & 0xff)))
+	block=header+frag
+	n=len(block)
+	nsent=0
+	while(nsent<n):
+		if _select and timeout_seconds:
+			rlist, wlist, xlist=_select([],[sock],[], timeout_seconds)
+			if not wlist:
+				raise EOFError, "Blocked write in sendfrag()"		
+		nsent+=sock.send(block[nsent:])
+	
+def recvfrag_with_timeout(sock, timeout_seconds=None):
+	#print "receiving from ", sock
+	if _select and timeout_seconds:
+		#print "Selecting with timeout...", timeout_seconds
+		rlist, wlist, xlist=_select([sock],[],[], timeout_seconds)
+		if not rlist:
+			raise EOFError, "No header at all in recvfrag()"
+
+	header = sock.recv(4)
+	if len(header) < 4:
+		raise EOFError
+	x = long(ord(header[0]))<<24 | ord(header[1])<<16 | \
+	    ord(header[2])<<8 | ord(header[3])
+	last = ((x & 0x80000000L) != 0)
+	n = int(x & 0x7fffffff)
+	
+	frag=''
+	
+	while(len(frag) < n):	
+		if _select and timeout_seconds:
+			#print "Selecting with timeout...", timeout_seconds
+			rlist, wlist, xlist=_select([sock],[],[], timeout_seconds)
+			if not rlist:
+				raise EOFError, "No data after header in recvfrag()"
+		frag += sock.recv(n-len(frag))
+				
+	return last, frag
+
+
+def recvrecord(sock, timeout_seconds=None):
+	record = ''
+	last = 0
+	while not last:
+		last, frag = recvfrag_with_timeout(sock, timeout_seconds)
+		record = record + frag
+	return record
+
+def sendrecord(sock, record, timeout_seconds=None):
+	sendfrag_with_timeout(sock, 1, record, timeout_seconds)
+
+
+
+# Try to bind to a reserved port (must be root)
+
+last_resv_port_tried = None
+def bindresvport(sock, host):
+	global last_resv_port_tried
+	FIRST, LAST = 600, 1024 # Range of ports to try
+	if last_resv_port_tried == None:
+		import os
+		last_resv_port_tried = FIRST + os.getpid() % (LAST-FIRST)
+	for i in range(last_resv_port_tried, LAST) + \
+		  range(FIRST, last_resv_port_tried):
+		last_resv_port_tried = i
+		try:
+			sock.bind((host, i))
+			return last_resv_port_tried
+		except socket.error, (errno, msg):
+			if errno <> 114:
+				raise socket.error, (errno, msg)
+	raise RuntimeError, 'can\'t assign reserved port'
+
+
+# Client using TCP to a specific port
+
+class RawTCPClient(Client):
+
+	select_timeout_seconds=None
+	
+	def makesocket(self):
+		self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+	def do_call(self):
+		call = self.packer.get_buf()
+		sendrecord(self.sock, call, self.select_timeout_seconds)
+		reply = recvrecord(self.sock, self.select_timeout_seconds)
+		u = self.unpacker
+		u.reset(reply)
+		xid, verf = u.unpack_replyheader()
+		if xid <> self.lastxid:
+			# Can't really happen since this is TCP...
+			raise RuntimeError, 'wrong xid in reply ' + `xid` + \
+				' instead of ' + `self.lastxid`
+
+
+# Client using UDP to a specific port
+
+class RawUDPClient(Client):
+
+	def makesocket(self):
+		self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+
+	def do_call(self):
+		call = self.packer.get_buf()
+		self.sock.send(call)
+		try:
+			from select import select
+		except ImportError:
+			print 'WARNING: select not found, RPC may hang'
+			select = None
+		BUFSIZE = 8192 # Max UDP buffer size
+		timeout = 1
+		count = 5
+		while 1:
+			r, w, x = [self.sock], [], []
+			if select:
+				r, w, x = select(r, w, x, timeout)
+			if self.sock not in r:
+				count = count - 1
+				if count < 0: raise RuntimeError, 'timeout'
+				if timeout < 25: timeout = timeout *2
+##				print 'RESEND', timeout, count
+				self.sock.send(call)
+				continue
+			reply = self.sock.recv(BUFSIZE)
+			u = self.unpacker
+			u.reset(reply)
+			xid, verf = u.unpack_replyheader()
+			if xid <> self.lastxid:
+##				print 'BAD xid'
+				continue
+			break
+
+
+# Client using UDP broadcast to a specific port
+
+class RawBroadcastUDPClient(RawUDPClient):
+
+	def __init__(self, bcastaddr, prog, vers, port):
+		RawUDPClient.__init__(self, bcastaddr, prog, vers, port)
+		self.reply_handler = None
+		self.timeout = 30
+
+	def connsocket(self):
+		# Don't connect -- use sendto
+		self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
+
+	def set_reply_handler(self, reply_handler):
+		self.reply_handler = reply_handler
+
+	def set_timeout(self, timeout):
+		self.timeout = timeout # Use None for infinite timeout
+
+	def make_call(self, proc, args, pack_func, unpack_func):
+		if pack_func is None and args is not None:
+			raise TypeError, 'non-null args with null pack_func'
+		self.start_call(proc)
+		if pack_func:
+			pack_func(args)
+		call = self.packer.get_buf()
+		self.sock.sendto(call, (self.host, self.port))
+		try:
+			from select import select
+		except ImportError:
+			print 'WARNING: select not found, broadcast will hang'
+			select = None
+		BUFSIZE = 8192 # Max UDP buffer size (for reply)
+		replies = []
+		if unpack_func is None:
+			def dummy(): pass
+			unpack_func = dummy
+		while 1:
+			r, w, x = [self.sock], [], []
+			if select:
+				if self.timeout is None:
+					r, w, x = select(r, w, x)
+				else:
+					r, w, x = select(r, w, x, self.timeout)
+			if self.sock not in r:
+				break
+			reply, fromaddr = self.sock.recvfrom(BUFSIZE)
+			u = self.unpacker
+			u.reset(reply)
+			xid, verf = u.unpack_replyheader()
+			if xid <> self.lastxid:
+##				print 'BAD xid'
+				continue
+			reply = unpack_func()
+			self.unpacker.done()
+			replies.append((reply, fromaddr))
+			if self.reply_handler:
+				self.reply_handler(reply, fromaddr)
+		return replies
+
+
+# Port mapper interface
+
+# Program number, version and (fixed!) port number
+PMAP_PROG = 100000
+PMAP_VERS = 2
+PMAP_PORT = 111
+
+# Procedure numbers
+PMAPPROC_NULL = 0			# (void) -> void
+PMAPPROC_SET = 1			# (mapping) -> bool
+PMAPPROC_UNSET = 2			# (mapping) -> bool
+PMAPPROC_GETPORT = 3			# (mapping) -> unsigned int
+PMAPPROC_DUMP = 4			# (void) -> pmaplist
+PMAPPROC_CALLIT = 5			# (call_args) -> call_result
+
+# A mapping is (prog, vers, prot, port) and prot is one of:
+
+IPPROTO_TCP = 6
+IPPROTO_UDP = 17
+
+# A pmaplist is a variable-length list of mappings, as follows:
+# either (1, mapping, pmaplist) or (0).
+
+# A call_args is (prog, vers, proc, args) where args is opaque;
+# a call_result is (port, res) where res is opaque.
+
+
+class PortMapperPacker(Packer):
+
+	def pack_mapping(self, mapping):
+		prog, vers, prot, port = mapping
+		self.pack_uint(prog)
+		self.pack_uint(vers)
+		self.pack_uint(prot)
+		self.pack_uint(port)
+
+	def pack_pmaplist(self, list):
+		self.pack_list(list, self.pack_mapping)
+
+	def pack_call_args(self, ca):
+		prog, vers, proc, args = ca
+		self.pack_uint(prog)
+		self.pack_uint(vers)
+		self.pack_uint(proc)
+		self.pack_opaque(args)
+
+
+class PortMapperUnpacker(Unpacker):
+
+	def unpack_mapping(self):
+		prog = self.unpack_uint()
+		vers = self.unpack_uint()
+		prot = self.unpack_uint()
+		port = self.unpack_uint()
+		return prog, vers, prot, port
+
+	def unpack_pmaplist(self):
+		return self.unpack_list(self.unpack_mapping)
+
+	def unpack_call_result(self):
+		port = self.unpack_uint()
+		res = self.unpack_opaque()
+		return port, res
+
+
+class PartialPortMapperClient:
+
+	def addpackers(self):
+		self.packer = PortMapperPacker()
+		self.unpacker = PortMapperUnpacker('')
+
+	def Set(self, mapping):
+		return self.make_call(PMAPPROC_SET, mapping, \
+			self.packer.pack_mapping, \
+			self.unpacker.unpack_uint)
+
+	def Unset(self, mapping):
+		return self.make_call(PMAPPROC_UNSET, mapping, \
+			self.packer.pack_mapping, \
+			self.unpacker.unpack_uint)
+
+	def Getport(self, mapping):
+		return self.make_call(PMAPPROC_GETPORT, mapping, \
+			self.packer.pack_mapping, \
+			self.unpacker.unpack_uint)
+
+	def Dump(self):
+		return self.make_call(PMAPPROC_DUMP, None, \
+			None, \
+			self.unpacker.unpack_pmaplist)
+
+	def Callit(self, ca):
+		return self.make_call(PMAPPROC_CALLIT, ca, \
+			self.packer.pack_call_args, \
+			self.unpacker.unpack_call_result)
+
+
+class TCPPortMapperClient(PartialPortMapperClient, RawTCPClient):
+
+	def __init__(self, host, port=PMAP_PORT, timeout_seconds=None):
+		RawTCPClient.__init__(self, \
+			host, PMAP_PROG, PMAP_VERS, port)
+		self.select_timeout_seconds=timeout_seconds
+
+
+class UDPPortMapperClient(PartialPortMapperClient, RawUDPClient):
+
+	def __init__(self, host, port=PMAP_PORT):
+		RawUDPClient.__init__(self, \
+			host, PMAP_PROG, PMAP_VERS, port)
+
+class BroadcastUDPPortMapperClient(PartialPortMapperClient, \
+				   RawBroadcastUDPClient):
+
+	def __init__(self, bcastaddr):
+		RawBroadcastUDPClient.__init__(self, \
+			bcastaddr, PMAP_PROG, PMAP_VERS, PMAP_PORT)
+
+
+# Generic clients that find their server through the Port mapper
+
+class TCPClient(RawTCPClient):
+
+	def __init__(self, host, prog, vers, portmap_proxy_host=None, portmap_proxy_port=PMAP_PORT, timeout_seconds=None):
+		
+		self.select_timeout_seconds=timeout_seconds
+		if portmap_proxy_host is None:
+			portmap_proxy_host=host #use a proxy to get around firewalled portmappers
+		pmap = TCPPortMapperClient(portmap_proxy_host, portmap_proxy_port,timeout_seconds)
+		port = pmap.Getport((prog, vers, IPPROTO_TCP, 0))
+		pmap.close()
+		if port == 0:
+			raise RuntimeError, 'program not registered'
+		RawTCPClient.__init__(self, host, prog, vers, port)
+
+
+class UDPClient(RawUDPClient):
+
+	def __init__(self, host, prog, vers, portmap_proxy_host=None, portmap_proxy_port=PMAP_PORT):
+		if portmap_proxy_host is None:
+			portmap_proxy_host=host #use a proxy to get around firewalled portmappers
+		pmap = UDPPortMapperClient(portmap_proxy_host, portmap_proxy_port)
+		port = pmap.Getport((prog, vers, IPPROTO_UDP, 0))
+		pmap.close()
+		if port == 0:
+			raise RuntimeError, 'program not registered'
+		RawUDPClient.__init__(self, host, prog, vers, port)
+
+
+class BroadcastUDPClient(Client):
+
+	def __init__(self, bcastaddr, prog, vers):
+		self.pmap = BroadcastUDPPortMapperClient(bcastaddr)
+		self.pmap.set_reply_handler(self.my_reply_handler)
+		self.prog = prog
+		self.vers = vers
+		self.user_reply_handler = None
+		self.addpackers()
+
+	def close(self):
+		self.pmap.close()
+
+	def set_reply_handler(self, reply_handler):
+		self.user_reply_handler = reply_handler
+
+	def set_timeout(self, timeout):
+		self.pmap.set_timeout(timeout)
+
+	def my_reply_handler(self, reply, fromaddr):
+		port, res = reply
+		self.unpacker.reset(res)
+		result = self.unpack_func()
+		self.unpacker.done()
+		self.replies.append((result, fromaddr))
+		if self.user_reply_handler is not None:
+			self.user_reply_handler(result, fromaddr)
+
+	def make_call(self, proc, args, pack_func, unpack_func):
+		self.packer.reset()
+		if pack_func:
+			pack_func(args)
+		if unpack_func is None:
+			def dummy(): pass
+			self.unpack_func = dummy
+		else:
+			self.unpack_func = unpack_func
+		self.replies = []
+		packed_args = self.packer.get_buf()
+		dummy_replies = self.pmap.Callit( \
+			(self.prog, self.vers, proc, packed_args))
+		return self.replies
+
+
+# Server classes
+
+# These are not symmetric to the Client classes
+# XXX No attempt is made to provide authorization hooks yet
+
+class Server:
+
+	def __init__(self, host, prog, vers, port):
+		self.host = host # Should normally be '' for default interface
+		self.prog = prog
+		self.vers = vers
+		self.port = port # Should normally be 0 for random port
+		self.makesocket() # Assigns to self.sock and self.prot
+		self.bindsocket()
+		self.host, self.port = self.sock.getsockname()
+		self.addpackers()
+
+	def handle(self, call):
+		# Don't use unpack_header but parse the header piecewise
+		# XXX I have no idea if I am using the right error responses!
+		self.unpacker.reset(call)
+		self.packer.reset()
+		xid = self.unpacker.unpack_uint()
+		self.packer.pack_uint(xid)
+		temp = self.unpacker.unpack_enum()
+		if temp <> CALL:
+			return None # Not worthy of a reply
+		self.packer.pack_uint(REPLY)
+		temp = self.unpacker.unpack_uint()
+		if temp <> RPCVERSION:
+			self.packer.pack_uint(MSG_DENIED)
+			self.packer.pack_uint(RPC_MISMATCH)
+			self.packer.pack_uint(RPCVERSION)
+			self.packer.pack_uint(RPCVERSION)
+			return self.packer.get_buf()
+		self.packer.pack_uint(MSG_ACCEPTED)
+		self.packer.pack_auth((AUTH_NULL, make_auth_null()))
+		prog = self.unpacker.unpack_uint()
+		if prog <> self.prog:
+			self.packer.pack_uint(PROG_UNAVAIL)
+			return self.packer.get_buf()
+		vers = self.unpacker.unpack_uint()
+		if vers <> self.vers:
+			self.packer.pack_uint(PROG_MISMATCH)
+			self.packer.pack_uint(self.vers)
+			self.packer.pack_uint(self.vers)
+			return self.packer.get_buf()
+		proc = self.unpacker.unpack_uint()
+		methname = 'handle_' + `proc`
+		try:
+			meth = getattr(self, methname)
+		except AttributeError:
+			self.packer.pack_uint(PROC_UNAVAIL)
+			return self.packer.get_buf()
+		cred = self.unpacker.unpack_auth()
+		verf = self.unpacker.unpack_auth()
+		try:
+			meth() # Unpack args, call turn_around(), pack reply
+		except (EOFError, GarbageArgs):
+			# Too few or too many arguments
+			self.packer.reset()
+			self.packer.pack_uint(xid)
+			self.packer.pack_uint(REPLY)
+			self.packer.pack_uint(MSG_ACCEPTED)
+			self.packer.pack_auth((AUTH_NULL, make_auth_null()))
+			self.packer.pack_uint(GARBAGE_ARGS)
+		return self.packer.get_buf()
+
+	def turn_around(self):
+		try:
+			self.unpacker.done()
+		except RuntimeError:
+			raise GarbageArgs
+		self.packer.pack_uint(SUCCESS)
+
+	def handle_0(self): # Handle NULL message
+		self.turn_around()
+
+	def makesocket(self):
+		# This MUST be overridden
+		raise RuntimeError, 'makesocket not defined'
+
+	def bindsocket(self):
+		# Override this to bind to a different port (e.g. reserved)
+		self.sock.bind((self.host, self.port))
+
+	def addpackers(self):
+		# Override this to use derived classes from Packer/Unpacker
+		self.packer = Packer()
+		self.unpacker = Unpacker('')
+
+
+class TCPServer(Server):
+
+	def register(self):
+		mapping = self.prog, self.vers, self.prot, self.port
+		p = TCPPortMapperClient(self.host)
+		if not p.Set(mapping):
+			raise RuntimeError, 'register failed'
+
+	def unregister(self):
+		mapping = self.prog, self.vers, self.prot, self.port
+		p = TCPPortMapperClient(self.host)
+		if not p.Unset(mapping):
+			raise RuntimeError, 'unregister failed'
+
+	def makesocket(self):
+		self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+		self.prot = IPPROTO_TCP
+
+	def loop(self):
+		self.sock.listen(0)
+		while 1:
+			self.session(self.sock.accept())
+
+	def session(self, connection):
+		sock, (host, port) = connection
+		while 1:
+			try:
+				call = recvrecord(sock)
+			except EOFError:
+				break
+			except socket.error, msg:
+				print 'socket error:', msg
+				break
+			reply = self.handle(call)
+			if reply is not None:
+				sendrecord(sock, reply)
+
+	def forkingloop(self):
+		# Like loop but uses forksession()
+		self.sock.listen(0)
+		while 1:
+			self.forksession(self.sock.accept())
+
+	def forksession(self, connection):
+		# Like session but forks off a subprocess
+		import os
+		# Wait for deceased children
+		try:
+			while 1:
+				pid, sts = os.waitpid(0, 1)
+		except os.error:
+			pass
+		pid = None
+		try:
+			pid = os.fork()
+			if pid: # Parent
+				connection[0].close()
+				return
+			# Child
+			self.session(connection)
+		finally:
+			# Make sure we don't fall through in the parent
+			if pid == 0:
+				os._exit(0)
+
+
+class UDPServer(Server):
+
+	def register(self):
+		mapping = self.prog, self.vers, self.prot, self.port
+		p = UDPPortMapperClient(self.host)
+		if not p.Set(mapping):
+			raise RuntimeError, 'register failed'
+
+	def unregister(self):
+		mapping = self.prog, self.vers, self.prot, self.port
+		p = UDPPortMapperClient(self.host)
+		if not p.Unset(mapping):
+			raise RuntimeError, 'unregister failed'
+
+	def makesocket(self):
+		self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+		self.prot = IPPROTO_UDP
+
+	def loop(self):
+		while 1:
+			self.session()
+
+	def session(self):
+		call, host_port = self.sock.recvfrom(8192)
+		reply = self.handle(call)
+		if reply <> None:
+			self.sock.sendto(reply, host_port)
+
+
+# Simple test program -- dump local portmapper status
+
+def test():
+	pmap = UDPPortMapperClient('')
+	list = pmap.Dump()
+	list.sort()
+	for prog, vers, prot, port in list:
+		print prog, vers,
+		if prot == IPPROTO_TCP: print 'tcp',
+		elif prot == IPPROTO_UDP: print 'udp',
+		else: print prot,
+		print port
+
+
+# Test program for broadcast operation -- dump everybody's portmapper status
+
+def testbcast():
+	import sys
+	if sys.argv[1:]:
+		bcastaddr = sys.argv[1]
+	else:
+		bcastaddr = '<broadcast>'
+	def rh(reply, fromaddr):
+		host, port = fromaddr
+		print host + '\t' + `reply`
+	pmap = BroadcastUDPPortMapperClient(bcastaddr)
+	pmap.set_reply_handler(rh)
+	pmap.set_timeout(5)
+	replies = pmap.Getport((100002, 1, IPPROTO_UDP, 0))
+
+
+# Test program for server, with corresponding client
+# On machine A: python -c 'import rpc; rpc.testsvr()'
+# On machine B: python -c 'import rpc; rpc.testclt()' A
+# (A may be == B)
+
+def testsvr():
+	# Simple test class -- proc 1 doubles its string argument as reply
+	class S(UDPServer):
+		def handle_1(self):
+			arg = self.unpacker.unpack_string()
+			self.turn_around()
+			print 'RPC function 1 called, arg', `arg`
+			self.packer.pack_string(arg + arg)
+	#
+	s = S('', 0x20000000, 1, 0)
+	try:
+		s.unregister()
+	except RuntimeError, msg:
+		print 'RuntimeError:', msg, '(ignored)'
+	s.register()
+	print 'Service started...'
+	try:
+		s.loop()
+	finally:
+		s.unregister()
+		print 'Service interrupted.'
+
+
+def testclt():
+	import sys
+	if sys.argv[1:]: host = sys.argv[1]
+	else: host = ''
+	# Client for above server
+	class C(UDPClient):
+		def call_1(self, arg):
+			return self.make_call(1, arg, \
+				self.packer.pack_string, \
+				self.unpacker.unpack_string)
+	c = C(host, 0x20000000, 1)
+	print 'making call...'
+	reply = c.call_1('hello, world, ')
+	print 'call returned', `reply`
+
+def testclt2():
+	import sys
+	host = '127.0.0.1'
+	# Client for above server
+	class C(UDPClient):
+		def call_1(self, arg):
+			return self.make_call(1, arg, \
+				self.packer.pack_string, \
+				self.unpacker.unpack_string)
+	c = C(host, 0x20000000, 1)
+	print 'making call...'
+	reply = c.call_1('hello, world, ')
+	print 'call returned', `reply`
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/vxi_11.py	Thu Aug 11 17:11:58 2011 +0930
@@ -0,0 +1,672 @@
+"The basic infrastructure for maintaining a vxi-11 protocol connection to a remote device"
+_rcsid="$Id: vxi_11.py 323 2011-04-06 19:10:03Z marcus $"
+
+import rpc
+from rpc import TCPClient, RawTCPClient
+import exceptions
+import struct
+import traceback
+import time
+import weakref
+import sys
+import select
+
+#try:
+#	import threading
+#	threads=1
+#except:
+#	threads=0
+threads=False
+
+connection_dict={}
+
+def close_all_connections():
+	"disconnect and close out all vxi_11 connections created here, even if their object references have been lost" 
+	for wobj in connection_dict.keys():
+		name, wconn=connection_dict[wobj]
+		conn=wconn() #dereference weak ref
+		if conn is not None:
+			try:
+				conn.disconnect()
+			except:
+				conn.log_exception("***vxi_11.close_all_connections exception: ")
+				
+		else:
+			del connection_dict[wobj] #how did this happen?
+
+class Junk_OneWayAbortClient(RawTCPClient):
+	"""OneWayAbortClient allows one to handle the strange, one-way abort rpc from an Agilent E5810.
+	Really, it doesn't even do a one-way transmission... it loses aborts, so this is history """
+	
+	def do_call(self):
+		call = self.packer.get_buf()
+		rpc.sendrecord(self.sock, call)
+		self.unpacker.reset('\0\0\0\0') #put a valid return value into the unpacker
+
+class VXI_11_Error(IOError):
+	vxi_11_errors={
+				0:"No error", 1:"Syntax error", 3:"Device not accessible",
+				4:"Invalid link identifier", 5:"Parameter error", 6:"Channel not established",
+				8:"Operation not supported", 9:"Out of resources", 11:"Device locked by another link",
+				12:"No lock held by this link", 15:"IO Timeout", 17:"IO Error",  21:"Invalid Address",
+				23:"Abort", 29:"Channel already established" ,
+				"eof": "Cut off packet received in rpc.recvfrag()",
+				"sync":"stream sync lost",
+				"notconnected": "Device not connected"}
+	
+	def identify_vxi_11_error(self, error):
+		if self.vxi_11_errors.has_key(error):
+			return `error`+": "+self.vxi_11_errors[error]
+		else:
+			return `error`+": Unknown error code"
+	
+
+	def __init__(self, code,  **other_info):
+		IOError.__init__(self, self.identify_vxi_11_error(code))
+		self.code=code
+		self.other_info=other_info
+	
+	def __repr__(self):
+		if self.other_info:
+			return str(self)+": "+str(self.other_info)
+		else:
+			return str(self)
+
+class VXI_11_Device_Not_Connected(VXI_11_Error):
+	def __init__(self):
+		VXI_11_Error.__init__(self,'notconnected')
+
+class VXI_11_Device_Not_Locked(VXI_11_Error):
+	pass
+		
+class VXI_11_Transient_Error(VXI_11_Error): #exceptions having to do with multiple use which might get better
+	pass
+	
+class VXI_11_Timeout(VXI_11_Transient_Error):
+	pass
+
+class VXI_11_Locked_Elsewhere(VXI_11_Transient_Error):
+	pass
+
+class VXI_11_Stream_Sync_Lost(VXI_11_Transient_Error):
+	def __init__(self, code, bytes):
+		VXI_11_Transient_Error.__init__(self, code)
+		self.other_info="bytes  vacuumed = %d" % bytes
+
+	
+class VXI_11_RPC_EOF(VXI_11_Transient_Error):
+	pass
+
+_VXI_11_enumerated_exceptions={ #common, correctable exceptions
+		15:VXI_11_Timeout,
+		11:VXI_11_Locked_Elsewhere,
+		12:VXI_11_Device_Not_Locked
+}
+
+class vxi_11_connection:
+	"""vxi_11_connection implements handling of devices compliant with vxi11.1-vxi11.3 protocols, with which
+	the user should have some familiarity"""
+	
+	debug_info=0
+	debug_error=1
+	debug_warning=2
+	debug_all=3
+	
+	debug_level=debug_info
+	
+	OneWayAbort=0 #by default, this class uses two-way aborts, per official vxi-11 standard
+	
+	def _list_packer(self, args):
+		l=map(None, self.pack_type_list, args) # combine lists
+		for packer, data in l:
+			#print "packing " + str(data) + " with " + str(packer)
+			packer(data)
+
+	def _list_unpacker(self):
+		res = []
+		for f in self.unpack_type_list:
+			a = f()
+			#print "Unpacked " + str(a) + " with " + str(f)
+			res.append(a)
+		return res
+		#return [func() for func in self.unpack_type_list]
+ 
+	def _link_xdr_defs(self, channel):
+		"self.link_xdr_defs() creates dictionaries of functions for packing and unpacking the various data types"
+		p=channel.packer
+		u=channel.unpacker
+
+		xdr_packer_defs={
+			"write":  (p.pack_int, p.pack_int, p.pack_int, p.pack_int, p.pack_opaque),
+			"read":  (p.pack_int, p.pack_int, p.pack_int, p.pack_int, p.pack_int, p.pack_int),
+			"create_link": (p.pack_int, p.pack_bool, p.pack_uint, p.pack_string), 
+			"generic": (p.pack_int, p.pack_int, p.pack_int, p.pack_int),
+			"lock": (p.pack_int, p.pack_int, p.pack_int), 
+			"id": (p.pack_int,)
+		}
+		
+		xdr_unpacker_defs={
+			"write": (u.unpack_int, u.unpack_int),
+			"read": (u.unpack_int, u.unpack_int, u.unpack_opaque), 
+			# *something* is transforming pack_int to pack_uint which (obviously) doesn't like getting negative numbers
+			# Since LID is basically opaque unpack it as a uint to paper over the vileness
+			"create_link":  (u.unpack_uint, u.unpack_int, u.unpack_uint, u.unpack_uint), 
+			"read_stb":(u.unpack_int, u.unpack_int), 
+			"error": (u.unpack_int,)
+		}
+		
+		return xdr_packer_defs, xdr_unpacker_defs
+	
+	def _setup_core_packing(self, pack, unpack):
+		#print "setting up packing with " + pack
+		self.pack_type_list, self.unpack_type_list=self._core_packers[pack],self._core_unpackers[unpack]
+		#print "pack_type_list now " + str(self.pack_type_list)
+		#print "unpack_type_list now " + str(self.unpack_type_list)
+		
+	def post_init(self):
+		pass
+	 
+	def simple_log_error(self, message, level=debug_error, file=None):
+		if level <= self.debug_level:
+			if file is None:
+				file=sys.stderr
+			print >> file, self.device_name, message
+			
+	def fancy_log_error(self, message, level=debug_error, file=None):
+		if level <= self.debug_level:
+			message=str(message).strip()
+			level_str=("**INFO*****", "**ERROR****", "**WARNING**", "**DEBUG****")[level]
+			if file is None:
+				file=sys.stderr
+			print >> file, time.asctime().strip(), '\t', level_str, '\t', self.shortname, '\t', \
+				message.replace('\n','\n\t** ').replace('\r','\n\t** ')
+
+	def log_error(self, message, level=debug_error, file=None):
+		"override log_error() for sending messages to special places or formatting differently"
+		self.fancy_log_error(message, level, file)
+		
+	def log_traceback(self, main_message='', file=None):
+		exlist=traceback.format_exception(*sys.exc_info())
+		s=main_message+'\n'
+		for i in exlist:
+			s=s+i
+			
+		self.log_error(s, self.debug_error, file)
+	
+	def log_info(self, message, file=None):
+		self.log_error(message, self.debug_info, file)
+	
+	def log_warning(self, message, file=None):
+		self.log_error(message, self.debug_warning, file)
+
+	def log_debug(self, message, file=None):
+		self.log_error(message, self.debug_all, file)
+
+	def log_exception(self, main_message='', file=None):
+		self.log_error(main_message+traceback.format_exception_only(*(sys.exc_info()[:2]))[0], self.debug_error, file)
+
+	def __init__(self, host='127.0.0.1', device="inst0", timeout=1000, raise_on_err=None, device_name="Network Device", shortname=None,
+			portmap_proxy_host=None, portmap_proxy_port=rpc.PMAP_PORT, use_vxi_locking=True):
+		
+		self.raise_on_err=raise_on_err
+		self.lid=None
+		self.timeout=timeout
+		self.device_name=device_name
+		self.device_sicl_name=device
+		self.host=host
+		self.portmap_proxy_host=portmap_proxy_host
+		self.portmap_proxy_port=portmap_proxy_port
+		self.core=None
+		self.abortChannel=None
+		self.mux=None #default is no multiplexer active
+		self.use_vxi_locking=use_vxi_locking
+		
+		if shortname is None:
+			self.shortname=device_name.strip().replace(' ','').replace('\t','')     
+		else:
+			self.shortname=shortname.strip().replace(' ','').replace('\t','')   
+					
+		if threads:
+			self.threadlock=threading.RLock()
+	
+		try:                            
+			self.reconnect()    
+			
+		except VXI_11_Transient_Error:
+			self.log_exception("Initial connect failed... retry later")
+	
+	def setup_mux(self, mux=None, global_name=None):
+			self.mux=mux
+			self.global_mux_name=global_name
+		
+	def command(self, id, pack, unpack, arglist, ignore_connect=0):
+		#print "command() id = " + str(id) + ", pack = " + pack
+		if not (ignore_connect or self.connected):
+			raise VXI_11_Device_Not_Connected
+
+		#command has been made atomic, so that things like get_status_byte can be done 
+		#in a multi-threaded environment without needed a full vxi-11 lock to make it safe
+		if threads:
+			self.threadlock.acquire() #make this atomic
+		
+		self._setup_core_packing(pack, unpack)
+
+		try:
+			try:
+				result= self.core.make_call(id, arglist, self._list_packer, self._list_unpacker)
+			except (RuntimeError, EOFError):
+				#RuntimeError is thrown by recvfrag if the xid is off... it means we lost data in the pipe
+				#EOFError is thrown if the packet isn't full length, as usually happens when ther is garbage in the pipe read as a length
+				#so vacuum out the socket, and raise a transient error
+				rlist=1
+				ntotal=0
+				while(rlist):
+					rlist, wlist, xlist=select.select([self.core.sock],[],[], 1.0)
+					if rlist:
+						ntotal+=len(self.core.sock.recv(10000) )#get some data from it
+				raise VXI_11_Stream_Sync_Lost("sync", ntotal)
+		finally:
+			if threads:
+				self.threadlock.release() #let go
+
+		err=result[0]
+		
+		if err and self.raise_on_err:
+			e=_VXI_11_enumerated_exceptions #common, correctable exceptions
+			if e.has_key(err):
+				raise e[err](err) #raise these exceptions explicitly
+			else:
+				raise VXI_11_Error(err) #raise generic VXI_11 exception
+				
+		return result
+				
+	def do_timeouts(self, timeout, lock_timeout, channel=None):
+		
+		if channel is None:
+			channel=self.core
+			
+		flags=0
+		if  timeout is  None:
+			timeout=self.timeout
+		
+		if not lock_timeout and hasattr(self,"default_lock_timeout"):
+			lock_timeout=self.default_lock_timeout
+	
+		if  lock_timeout:
+			flags |=  1 # append waitlock bit
+		
+		if channel:
+			channel.select_timeout_seconds=0.5+1.5*max(timeout, lock_timeout)/1000.0 #convert ms to sec, and be generous on hard timeout
+		
+		return flags, timeout, lock_timeout
+
+	def reconnect(self): #recreate a broken connection
+		"""reconnect() creates or recreates our main connection.  Useful in __init__ and in complete communications breakdowns.
+		If it throws a VXI_11_Transient_Error, the connection exists, but the check_idn() handshake or post_init() failed."""
+		
+		self.connected=0
+				
+		if self.core:
+			self.core.close() #if this is a reconnect, break old connection the hard way
+		if self.abortChannel:
+			self.abortChannel.close()
+			
+		self.core=rpc.TCPClient(self.host, 395183, 1, 
+				portmap_proxy_host=self.portmap_proxy_host, 
+				portmap_proxy_port=self.portmap_proxy_port)
+				
+		self._core_packers, self._core_unpackers=self._link_xdr_defs(self.core) #construct xdr data type definitions for the core
+		#print "_core_packers now " + str(self._core_packers)
+		
+		err, self.lid, self.abortPort, self.maxRecvSize=self.command(
+			10, "create_link","create_link", (0, 0, self.timeout, self.device_sicl_name), ignore_connect=1) #execute create_link
+		if err: #at this stage, we always raise exceptions since there isn't any way to bail out or retry reasonably
+			raise VXI_11_Error(err)
+		
+		self.maxRecvSize=min(self.maxRecvSize, 1048576) #never transfer more than 1MB at a shot
+					
+		if self.OneWayAbort:
+			#self.abort_channel=OneWayAbortClient(self.host, 395184, 1, self.abortPort)
+			self.abort_channel=rpc.RawUDPClient(self.host, 395184, 1, self.abortPort)
+		else:
+			self.abort_channel=RawTCPClient(self.host, 395184, 1, self.abortPort)
+			
+		connection_dict[self.lid]=(self.device_name, weakref.ref(self))
+
+		self.locklevel=0
+
+		self.connected=1
+
+		self.check_idn()
+		self.post_init()            
+
+
+	def abort(self):
+		
+		self.abort_channel.select_timeout_seconds=self.timeout/1000.0 #convert to seconds
+		try:
+			err=self.abort_channel.make_call(1, self.lid, self.abort_channel.packer.pack_int, self.abort_channel.unpacker.unpack_int) #abort
+		except EOFError:
+			raise VXI_11_RPC_EOF("eof")
+			
+		if err and self.raise_on_err:
+			raise VXI_11_Error( err)
+		return err
+
+	def disconnect(self):
+		if self.connected:
+			try:
+				err, =self.command(23,  "id", "error", (self.lid,)) #execute destroy_link
+			except:
+				self.log_traceback() #if we can't close nicely, we'll close anyway
+			
+			self.connected=0
+			del connection_dict[self.lid]
+			self.lid=None
+			self.core.close()
+			self.abort_channel.close()
+			del self.core, self.abort_channel
+			self.core=None
+			self.abortChannel=None
+		
+	def __del__(self):
+		if self.lid is not None:
+			self.raise_on_err=0 #no exceptions here from simple errors
+			try:
+				self.abort()
+			except VXI_11_Error:
+				pass
+			try:
+				self.disconnect()
+			except VXI_11_Error:
+				pass            
+
+
+				
+	def write(self, data, timeout=None, lock_timeout=0):
+		"""err, bytes_sent=write(data [, timeout] [,lock_timeout]) sends data to device.  See do_timeouts() for 
+		semantics of timeout and lock_timeout"""
+		
+		flags, timeout, lock_timeout=self.do_timeouts(timeout, lock_timeout)
+		base=0
+		end=len(data)
+		while base<end:
+			n=end-base
+			if n>self.maxRecvSize:
+				xfer=self.maxRecvSize
+			else:
+				xfer=n
+				flags |= 8 #write end on last byte          
+				
+			err, count=self.command(11, "write", "write",  (self.lid, timeout, lock_timeout, flags, data[base:base+xfer]))
+			if  err: break  
+			base+=count
+		return err, base
+		
+	def read(self, timeout=None, lock_timeout=0, count=None, termChar=None):
+		"""err, reason, result=read([timeout] [,lock_timeout] [,count] [,termChar]) reads up to count bytes from the device,
+		ending on count, EOI or termChar (if specified).  See do_timeouts() for semantics of the timeouts. \n
+		the returned reason is an inclusive OR of 3 bits (per the VXI-11 specs section B.6.4.device_read):
+			Bit 2 = END/EOI received,
+			bit 1 = Terminating Character received,
+			bit 0 = full requested byte count received. 
+		"""
+		flags, timeout, lock_timeout=self.do_timeouts(timeout, lock_timeout)
+
+		if termChar is not None:
+			flags |= 128 # append termchrset bit
+			act_term=termChar
+		else:
+			act_term=0
+		
+		accumdata=""
+		reason=0
+		err=0
+		accumlen=0
+		
+		while  ( (not err) and (not (reason & 6) ) and 
+			( (count is None) or (accumlen < count))  ):  #wait for END or CHR reason flag or count
+                        
+			readcount=self.maxRecvSize
+			if count is not None:
+				readcount=min(readcount, count-accumlen)
+			err, reason, data = self.command(12, "read","read", (self.lid,  readcount, timeout, lock_timeout, flags, act_term))
+			accumdata+=data     
+			accumlen+=len(data)
+			#print err, reason, len(data), len(accumdata)
+		
+		return err, reason, accumdata
+	
+	def generic(self, code, timeout, lock_timeout):
+		flags, timeout, lock_timeout=self.do_timeouts(timeout, lock_timeout)
+
+		err, = self.command(code, "generic", "error", (self.lid, flags, timeout, lock_timeout))
+
+		return err
+
+	def trigger(self, timeout=None, lock_timeout=0):
+		return self.generic(14, timeout, lock_timeout)
+
+	def clear(self, timeout=None, lock_timeout=0):
+		return self.generic(15, timeout, lock_timeout)
+		
+	def remote(self, timeout=None, lock_timeout=0):
+		return self.generic(16, timeout, lock_timeout)
+	
+	def local(self, timeout=None, lock_timeout=0):
+		return self.generic(17, timeout, lock_timeout)
+	
+	def read_status_byte(self, timeout=None, lock_timeout=0):
+		flags, timeout, lock_timeout=self.do_timeouts(timeout, lock_timeout)
+
+		err, status = self.command(13, "generic","read_stb", (self.lid, flags, timeout, lock_timeout))
+
+		return err, status 
+	
+	def lock(self,  lock_timeout=0):
+		"""lock() acquires a lock on a device and the threadlock.  If it fails it leaves the connection cleanly unlocked.
+		If self.use_vxi_locking is false, it acquires only a thread lock locally, and does not really lock the vxi-11 device.
+		This is useful if only one process is talking to a given device, and saves time."""
+		err=0
+		if threads:
+			self.threadlock.acquire()
+		
+		if self.use_vxi_locking and self.locklevel==0:
+			flags, timeout, lock_timeout=self.do_timeouts(0, lock_timeout)
+			try:
+				if self.mux: self.mux.lock_connection(self.global_mux_name)
+				try:
+					err, = self.command(18, "lock","error", (self.lid, flags, lock_timeout))
+				except:
+					if self.mux: self.mux.unlock_connection(self.global_mux_name)
+					raise                   
+			except:
+				if threads:
+					self.threadlock.release()
+				raise
+		
+		if err:
+			if threads:
+				self.threadlock.release()
+		else:
+			self.locklevel+=1
+		return err
+	
+	def is_locked(self):
+		return self.locklevel > 0
+		
+	def unlock(self, priority=0):
+		"""unlock(priority=0) unwinds one level of locking, and if the level is zero, really unlocks the device.
+		Calls to lock() and unlock() should be matched.  If there is a danger that they are not, due to bad
+		exception handling, unlock_completely() should be used as a final cleanup for a series of operations.
+		Setting priority to non-zero will bias the apparent last-used time in a multiplexer (if one is used),
+		so setting priority to -10 will effectively mark this channel least-recently-used, while setting it to 
+		+2 will post-date the last-used time 2 seconds, so for the next 2 seconds, the device will be hard to kick
+		out of the channel cache (but not impossible).
+		"""
+		
+		self.locklevel-=1
+		assert self.locklevel>=0, "Too many unlocks on device: "+self.device_name
+			
+		err=0
+		try:
+			if self.use_vxi_locking and self.locklevel==0:
+				try:
+					err, = self.command(19, "id", "error", (self.lid,  ))   
+				finally:
+					if self.mux: 
+						self.mux.unlock_connection(self.global_mux_name, priority) #this cannot fail, no try needed (??)
+			elif priority and self.mux:
+				#even on a non-final unlock, a request for changed priority is always remembered
+				self.mux.adjust_priority(self.global_mux_name, priority)
+		finally:            
+			if threads:
+				self.threadlock.release()
+
+		return err
+
+	def unlock_completely(self, priority=0):
+		"unlock_completely() forces an unwind of any locks all the way back to zero for error cleanup.  Only exceptions thrown are fatal."
+		if threads:
+			self.threadlock.acquire() #make sure we have the threadlock before we try a (possibly failing) full lock
+		try:
+			self.lock() #just to be safe, we should already hold one level of lock!
+		except VXI_11_Locked_Elsewhere: 
+			pass #this is often called on error cleanup when we don't already have a lock, and we don't really care if we can't get it
+		except VXI_11_Error:
+			self.log_exception("Unexpected trouble locking in unlock_completely(): ")
+	
+		if threads:
+			self.threadlock._RLock__count += (1-self.threadlock._RLock__count)
+			#unwind to single lock the fast way, and make sure this variable    really existed, to shield against internal changes
+		self.locklevel=1 #unwind our own counter, too           
+		try:
+			self.unlock(priority)
+		except VXI_11_Device_Not_Locked:
+			pass #if we couldn't lock above, we will probably get another exception here, and don't care
+		except VXI_11_Transient_Error:
+			self.log_exception("Unexpected trouble unlocking in unlock_completely(): ")
+		except VXI_11_Error:
+			self.log_exception("Unexpected trouble unlocking in unlock_completely(): ")
+			raise
+	
+	def transaction(self, data, count=None, lock_timeout=0):
+		"""err, reason, result=transaction(data, [, count] [,lock_timeout]) sends data and waits for a response. 
+		It is guaranteed to leave the lock level at its original value on exit,
+		unless KeyboardInterrupt breaks the normal flow.  If count isn't provided, there is no limit to how much data will be accepted.
+		See do_timeouts() for semantics on lock_timeout."""
+		
+		self.lock(lock_timeout)
+		reason=None
+		result=None
+		try:
+			err,  write_count = self.write(data)
+			
+			if not err:
+				err, reason, result = self.read(count=count)
+		finally:        
+			self.unlock()
+
+		return err, reason, result
+
+	def check_idn(self):
+		'check_idn() executes "*idn?" and aborts if the result does not start with self.idn_head'
+		if hasattr(self,"idn"):
+			return #already done
+		if hasattr(self,"idn_head") and self.idn_head is not None:
+
+			self.lock()
+			try:
+				self.clear()
+				err, reason, idn = self.transaction("*idn?")
+			finally:
+				self.unlock()
+
+			check=idn.find(self.idn_head)
+			self.idn=idn.strip() #save for future reference info    
+			if  check:
+				self.disconnect()               
+				assert check==0, "Wrong device type! expecting: "+self.idn_head+"... got: "+self.idn
+		else:
+			self.idn="Device *idn? not checked!"
+
+import copy
+
+class device_thread:
+	
+	if threads:
+		Thread=threading.Thread #by default, use canonical threads
+	
+	def __init__(self,  connection, main_sleep=1.0, name="Device"):
+		self.running=0
+		self.main_sleep=main_sleep
+		self.__thread=None
+		self.__name=copy.copy(name) #make a new copy to avoid a possible circular reference
+		self.__wait_event=threading.Event()
+		self.set_connection(connection)
+
+	def set_connection(self, connection):
+		#keep only a weak reference, so the thread cannot prevent the device from being deleted
+		#such deletion creates an error when the thread tries to run, but that's OK
+		#this allows the device_thread to be used as a clean mix-in class to a vxi_11 connection
+		self.__weak_connection=weakref.ref(connection)
+	
+	def connection(self):
+		return self.__weak_connection() #dereference weak reference
+					
+	def handle_lock_error(self):
+		"handle_lock_error can be overridden to count failures and do something if there are too many"
+		self.connection().log_exception(self.name+": Error while locking device")
+
+	def onepass(self):
+		connection=self.connection()
+
+		try:
+			connection.lock()
+		except VXI_11_Transient_Error:
+			self.handle_lock_error()
+			return
+		
+		try:
+			self.get_data()
+		except:
+			connection.log_traceback('Uncaught exception in get_data()')
+			try:
+				connection.clear()
+			except:
+				connection.log_exception('failed to clear connection after error')              
+			self.run=0
+
+		connection.unlock()
+					
+	def monitor(self):
+		self.connection().log_info("Monitor loop entered")
+		while(self.run):
+			try:
+				self.onepass()
+				self.__wait_event.wait(self.main_sleep) #wait until timeout or we are cancelled
+			except KeyboardInterrupt:
+				self.connection().log_error("Keyboard Interrupt... terminating")
+				self.run=0
+			except:
+				self.connection().log_traceback()
+				self.run=0
+				
+		self.running=0
+		self.connection().unlock_completely() 
+		
+	def run_thread(self):
+		if not self.running: #if it's already running, just keep it up.
+			self.run=1
+			self.__thread=self.Thread(target=self.monitor, name=self.__name)
+			self.__wait_event.clear() #make sure we don't fall through immediately
+			self.__thread.start()
+			self.running=1
+
+	def get_monitor_thread(self):
+		return self.__thread
+
+	def stop_thread(self):
+		if self.running:
+			self.run=0
+			self.__wait_event.set() #cancel any waiting