comparison vxi_11.py @ 19:cba1c44060f5

Copied from http://sourceforge.net/projects/pythonlabtools/. I modified them slightly to work, as something is causing pack_uint to be called instead of pack_int...
author Daniel O'Connor <darius@dons.net.au>
date Thu, 11 Aug 2011 17:11:58 +0930
parents
children 91b476ebc0f2
comparison
equal deleted inserted replaced
18:9bb8a9f3df6b 19:cba1c44060f5
1 "The basic infrastructure for maintaining a vxi-11 protocol connection to a remote device"
2 _rcsid="$Id: vxi_11.py 323 2011-04-06 19:10:03Z marcus $"
3
4 import rpc
5 from rpc import TCPClient, RawTCPClient
6 import exceptions
7 import struct
8 import traceback
9 import time
10 import weakref
11 import sys
12 import select
13
# Thread support is switched off in this copy.  The probe that used to set
# the flag is kept here, commented out, for reference:
#     try:
#         import threading
#         threads=1
#     except:
#         threads=0
threads = False
20
# Registry of live links: link id -> (device name, weak reference to the connection).
connection_dict = {}


def close_all_connections():
    """Disconnect and close out all vxi_11 connections created here.

    Works even if the object references have been lost elsewhere, because the
    registry only holds weak references.  Entries whose connection object has
    already been garbage collected are simply dropped.  A failing
    disconnect() is logged through the connection and does not stop the
    remaining connections from being closed.
    """
    # Iterate over a snapshot: disconnect() and the cleanup below both remove
    # entries from connection_dict while we walk it.
    for key in list(connection_dict.keys()):
        entry = connection_dict.get(key)
        if entry is None:
            continue  # already removed by an earlier disconnect()
        name, wconn = entry
        conn = wconn()  # dereference weak ref
        if conn is None:
            connection_dict.pop(key, None)  # how did this happen?
            continue
        try:
            conn.disconnect()
        except Exception:
            conn.log_exception("***vxi_11.close_all_connections exception: ")
36
class Junk_OneWayAbortClient(RawTCPClient):
    """Historical client for the strange, one-way abort rpc of an Agilent E5810.

    Kept for reference only: according to the original author it does not
    even achieve a one-way transmission and loses aborts.
    """

    def do_call(self):
        """Send the packed call, then fake a reply instead of reading one."""
        request = self.packer.get_buf()
        rpc.sendrecord(self.sock, request)
        # put a valid return value into the unpacker
        self.unpacker.reset('\0\0\0\0')
45
class VXI_11_Error(IOError):
    """Base class for all errors raised by this module.

    The exception message is the repr of the error code followed by its
    description from vxi_11_errors.  The code is kept in self.code and any
    extra keyword arguments in self.other_info.
    """

    # Numeric keys are VXI-11 error codes; string keys are conditions
    # detected locally by this module.
    vxi_11_errors = {
        0: "No error", 1: "Syntax error", 3: "Device not accessible",
        4: "Invalid link identifier", 5: "Parameter error", 6: "Channel not established",
        8: "Operation not supported", 9: "Out of resources", 11: "Device locked by another link",
        12: "No lock held by this link", 15: "IO Timeout", 17: "IO Error", 21: "Invalid Address",
        23: "Abort", 29: "Channel already established",
        "eof": "Cut off packet received in rpc.recvfrag()",
        "sync": "stream sync lost",
        "notconnected": "Device not connected",
    }

    def identify_vxi_11_error(self, error):
        """Return '<repr of code>: <description>' for the given error code."""
        description = self.vxi_11_errors.get(error, "Unknown error code")
        return repr(error) + ": " + description

    def __init__(self, code, **other_info):
        IOError.__init__(self, self.identify_vxi_11_error(code))
        self.code = code
        self.other_info = other_info

    def __repr__(self):
        """Return the message, followed by other_info when there is any."""
        if self.other_info:
            return str(self) + ": " + str(self.other_info)
        return str(self)
74
class VXI_11_Device_Not_Connected(VXI_11_Error):
    """Raised when a command is attempted while the device is not connected."""

    def __init__(self):
        # always reports the fixed 'notconnected' condition
        VXI_11_Error.__init__(self, code='notconnected')
78
class VXI_11_Device_Not_Locked(VXI_11_Error):
    """Error code 12: no lock is held by this link."""
81
class VXI_11_Transient_Error(VXI_11_Error):
    """Exceptions having to do with multiple use which might get better."""
84
class VXI_11_Timeout(VXI_11_Transient_Error):
    """Error code 15: IO timeout."""
87
class VXI_11_Locked_Elsewhere(VXI_11_Transient_Error):
    """Error code 11: the device is locked by another link."""
90
class VXI_11_Stream_Sync_Lost(VXI_11_Transient_Error):
    """Raised after the socket was drained because the rpc stream lost sync."""

    def __init__(self, code, bytes):
        VXI_11_Transient_Error.__init__(self, code)
        # replace the keyword dict stored by the base class with a summary
        # of how much data was discarded
        summary = "bytes vacuumed = %d" % bytes
        self.other_info = summary
95
96
class VXI_11_RPC_EOF(VXI_11_Transient_Error):
    """Raised when an rpc call ends with EOFError (cut-off packet)."""
99
# common, correctable exceptions: error code -> dedicated exception class
_VXI_11_enumerated_exceptions = dict([
    (11, VXI_11_Locked_Elsewhere),
    (12, VXI_11_Device_Not_Locked),
    (15, VXI_11_Timeout),
])
105
106 class vxi_11_connection:
107 """vxi_11_connection implements handling of devices compliant with vxi11.1-vxi11.3 protocols, with which
108 the user should have some familiarity"""
109
110 debug_info=0
111 debug_error=1
112 debug_warning=2
113 debug_all=3
114
115 debug_level=debug_info
116
117 OneWayAbort=0 #by default, this class uses two-way aborts, per official vxi-11 standard
118
def _list_packer(self, args):
    """Pack each item of args with the matching function of self.pack_type_list.

    Raises TypeError when the number of arguments does not match the number
    of packing functions, instead of calling a packer with None (or calling
    None) as the old padded pairing did.
    """
    packers = tuple(self.pack_type_list)
    values = tuple(args)
    if len(packers) != len(values):
        raise TypeError("expected %d arguments to pack, got %d"
                        % (len(packers), len(values)))
    for packer, data in zip(packers, values):
        packer(data)
124
def _list_unpacker(self):
    """Call every function in self.unpack_type_list, in order, and return the results as a list."""
    return [unpack() for unpack in self.unpack_type_list]
133
def _link_xdr_defs(self, channel):
    """Build the tables of xdr packing and unpacking functions for a channel.

    Returns (packers, unpackers): two dictionaries mapping a message name to
    a tuple of bound pack/unpack methods of channel.packer and
    channel.unpacker, one per field in wire order.
    """
    packer = channel.packer
    unpacker = channel.unpacker

    pack_int = packer.pack_int
    unpack_int = unpacker.unpack_int
    unpack_uint = unpacker.unpack_uint

    packers = {}
    packers["write"] = (pack_int, pack_int, pack_int, pack_int, packer.pack_opaque)
    packers["read"] = (pack_int,) * 6
    packers["create_link"] = (pack_int, packer.pack_bool, packer.pack_uint, packer.pack_string)
    packers["generic"] = (pack_int,) * 4
    packers["lock"] = (pack_int,) * 3
    packers["id"] = (pack_int,)

    unpackers = {}
    unpackers["write"] = (unpack_int, unpack_int)
    unpackers["read"] = (unpack_int, unpack_int, unpacker.unpack_opaque)
    # *something* is transforming pack_int to pack_uint which (obviously) doesn't like getting negative numbers
    # Since LID is basically opaque unpack it as a uint to paper over the vileness
    unpackers["create_link"] = (unpack_uint, unpack_int, unpack_uint, unpack_uint)
    unpackers["read_stb"] = (unpack_int, unpack_int)
    unpackers["error"] = (unpack_int,)

    return packers, unpackers
159
def _setup_core_packing(self, pack, unpack):
    """Select the packer and unpacker tuples used by the next core call.

    pack and unpack are keys into self._core_packers and
    self._core_unpackers.  Both lookups happen before either attribute is
    assigned, so a bad key leaves the previous selection untouched.
    """
    chosen_packers = self._core_packers[pack]
    chosen_unpackers = self._core_unpackers[unpack]
    self.pack_type_list = chosen_packers
    self.unpack_type_list = chosen_unpackers
165
def post_init(self):
    """Hook called at the end of reconnect(); does nothing by default."""
    return None
168
def simple_log_error(self, message, level=debug_error, file=None):
    """Write '<device_name> <message>' as one line if level passes the filter.

    Nothing is written when level is greater than self.debug_level.  Output
    goes to file, or to sys.stderr when file is None.
    """
    if level > self.debug_level:
        return
    if file is None:
        file = sys.stderr
    # file.write() instead of the Python 2 only "print >> file" statement;
    # the text is what print produced for a name that does not end in a
    # tab or newline.
    file.write("%s %s\n" % (self.device_name, message))
174
def fancy_log_error(self, message, level=debug_error, file=None):
    """Write a timestamped, tab-separated log line if level passes the filter.

    Nothing is written when level is greater than self.debug_level.  The
    line holds the current time, a level banner, self.shortname and the
    stripped message; line breaks inside the message are continued on
    indented '** ' lines.  Output goes to file, or to sys.stderr when file
    is None.
    """
    if level > self.debug_level:
        return
    text = str(message).strip()
    level_str = ("**INFO*****", "**ERROR****", "**WARNING**", "**DEBUG****")[level]
    if file is None:
        file = sys.stderr
    body = text.replace('\n', '\n\t** ').replace('\r', '\n\t** ')
    # file.write() instead of the Python 2 only "print >> file" statement.
    # print put a space before each '\t' item but none after it, hence the
    # " \t" separators.
    file.write("%s \t%s \t%s \t%s\n"
               % (time.asctime().strip(), level_str, self.shortname, body))
183
def log_error(self, message, level=debug_error, file=None):
    """Central logging entry point; forwards to fancy_log_error().

    Override log_error() for sending messages to special places or
    formatting differently.
    """
    emit = self.fancy_log_error
    emit(message, level, file)
187
def log_traceback(self, main_message='', file=None):
    """Log main_message plus the full traceback of the exception being handled, at error level."""
    frames = traceback.format_exception(*sys.exc_info())
    report = main_message + '\n' + ''.join(frames)
    self.log_error(report, self.debug_error, file)
195
def log_info(self, message, file=None):
    """Log message through log_error() at the debug_info level."""
    level = self.debug_info
    self.log_error(message, level, file)
198
def log_warning(self, message, file=None):
    """Log message through log_error() at the debug_warning level."""
    level = self.debug_warning
    self.log_error(message, level, file)
201
def log_debug(self, message, file=None):
    """Log message through log_error() at the debug_all level."""
    level = self.debug_all
    self.log_error(message, level, file)
204
def log_exception(self, main_message='', file=None):
    """Log main_message followed by the one-line summary of the exception being handled, at error level."""
    exc_type, exc_value = sys.exc_info()[:2]
    summary = traceback.format_exception_only(exc_type, exc_value)[0]
    self.log_error(main_message + summary, self.debug_error, file)
207
def __init__(self, host='127.0.0.1', device="inst0", timeout=1000, raise_on_err=None, device_name="Network Device", shortname=None,
        portmap_proxy_host=None, portmap_proxy_port=rpc.PMAP_PORT, use_vxi_locking=True):
    """Store the connection settings and make a first connection attempt.

    A VXI_11_Transient_Error during that first reconnect() is logged and
    swallowed so the caller can retry later; any other exception propagates.
    """
    # where and how to reach the device
    self.host = host
    self.device_sicl_name = device
    self.portmap_proxy_host = portmap_proxy_host
    self.portmap_proxy_port = portmap_proxy_port
    self.timeout = timeout

    # behaviour switches
    self.raise_on_err = raise_on_err
    self.use_vxi_locking = use_vxi_locking

    # connection state, filled in by reconnect()
    self.lid = None
    self.core = None
    self.abortChannel = None
    self.mux = None  # default is no multiplexer active

    # names used in log output; shortname is derived from device_name when not given
    self.device_name = device_name
    label = device_name if shortname is None else shortname
    self.shortname = label.strip().replace(' ', '').replace('\t', '')

    if threads:
        self.threadlock = threading.RLock()

    try:
        self.reconnect()
    except VXI_11_Transient_Error:
        self.log_exception("Initial connect failed... retry later")
237
def setup_mux(self, mux=None, global_name=None):
    """Remember the multiplexer and the name this connection has in it."""
    self.global_mux_name = global_name
    self.mux = mux
241
def command(self, id, pack, unpack, arglist, ignore_connect=0):
    """Execute one rpc call on the core channel and return the unpacked result list.

    id is the procedure number; pack and unpack are keys into the packing
    tables selected by _setup_core_packing(); arglist holds the arguments.

    Raises VXI_11_Device_Not_Connected when not connected (unless
    ignore_connect is true), VXI_11_Stream_Sync_Lost when the reply stream
    was garbled, and - only when self.raise_on_err is true - the exception
    matching a non-zero error code in the first result field.
    """
    if not (ignore_connect or self.connected):
        raise VXI_11_Device_Not_Connected

    #command has been made atomic, so that things like get_status_byte can be done
    #in a multi-threaded environment without needing a full vxi-11 lock to make it safe
    if threads:
        self.threadlock.acquire()  #make this atomic

    try:
        # inside the try so that a bad pack/unpack key cannot leave the thread lock held
        self._setup_core_packing(pack, unpack)
        try:
            result = self.core.make_call(id, arglist, self._list_packer, self._list_unpacker)
        except (RuntimeError, EOFError):
            #RuntimeError is thrown by recvfrag if the xid is off... it means we lost data in the pipe
            #EOFError is thrown if the packet isn't full length, as usually happens when there is garbage in the pipe read as a length
            #so vacuum out the socket, and raise a transient error
            ntotal = 0
            while True:
                rlist, wlist, xlist = select.select([self.core.sock], [], [], 1.0)
                if not rlist:
                    break  # nothing more arrived within a second
                chunk = self.core.sock.recv(10000)  #get some data from it
                if not chunk:
                    # empty read: the peer closed the socket.  It would stay
                    # "readable" forever, so stop instead of spinning.
                    break
                ntotal += len(chunk)
            raise VXI_11_Stream_Sync_Lost("sync", ntotal)
    finally:
        if threads:
            self.threadlock.release()  #let go

    err = result[0]

    if err and self.raise_on_err:
        # common, correctable errors get their own class; everything else is generic
        raise _VXI_11_enumerated_exceptions.get(err, VXI_11_Error)(err)

    return result
282
def do_timeouts(self, timeout, lock_timeout, channel=None):
    """Resolve the timeouts for a call and return (flags, timeout, lock_timeout).

    timeout falls back to self.timeout when it is None.  A false
    lock_timeout falls back to self.default_lock_timeout when that
    attribute exists.  flags has the waitlock bit (1) set when the
    resulting lock_timeout is true.  Both timeouts are in milliseconds.

    As a side effect the channel (self.core when none is given) gets its
    select_timeout_seconds set generously above the larger of the two.
    """
    target = self.core if channel is None else channel
    effective_timeout = self.timeout if timeout is None else timeout

    effective_lock = lock_timeout
    if not effective_lock and hasattr(self, "default_lock_timeout"):
        effective_lock = self.default_lock_timeout

    flags = 1 if effective_lock else 0  # waitlock bit

    if target:
        #convert ms to sec, and be generous on hard timeout
        longest = max(effective_timeout, effective_lock)
        target.select_timeout_seconds = 0.5 + 1.5*longest/1000.0

    return flags, effective_timeout, effective_lock
302
def reconnect(self): #recreate a broken connection
    """reconnect() creates or recreates our main connection. Useful in __init__ and in complete communications breakdowns.
    If it throws a VXI_11_Transient_Error, the connection exists, but the check_idn() handshake or post_init() failed.
    A failing create_link always raises VXI_11_Error, whatever raise_on_err says."""

    self.connected = 0

    if self.core:
        self.core.close()  #if this is a reconnect, break old connection the hard way

    # The abort channel is stored as abort_channel further down; the code
    # visible here only ever sets abortChannel to None.  Close whichever is set.
    old_abort = getattr(self, "abort_channel", None) or self.abortChannel
    if old_abort:
        old_abort.close()
    self.abort_channel = None
    self.abortChannel = None

    # The old link is dead once its channels are closed: drop our registry
    # entry (only if it is really ours) and forget the link id.
    if self.lid is not None:
        entry = connection_dict.get(self.lid)
        if entry is not None and entry[1]() is self:
            del connection_dict[self.lid]
        self.lid = None

    self.core = rpc.TCPClient(self.host, 395183, 1,
        portmap_proxy_host=self.portmap_proxy_host,
        portmap_proxy_port=self.portmap_proxy_port)

    self._core_packers, self._core_unpackers = self._link_xdr_defs(self.core)  #construct xdr data type definitions for the core

    err, lid, self.abortPort, self.maxRecvSize = self.command(
        10, "create_link", "create_link", (0, 0, self.timeout, self.device_sicl_name), ignore_connect=1)  #execute create_link
    if err:  #at this stage, we always raise exceptions since there isn't any way to bail out or retry reasonably
        raise VXI_11_Error(err)
    self.lid = lid  # only a successful create_link yields a usable link id

    self.maxRecvSize = min(self.maxRecvSize, 1048576)  #never transfer more than 1MB at a shot

    if self.OneWayAbort:
        #self.abort_channel=OneWayAbortClient(self.host, 395184, 1, self.abortPort)
        self.abort_channel = rpc.RawUDPClient(self.host, 395184, 1, self.abortPort)
    else:
        self.abort_channel = RawTCPClient(self.host, 395184, 1, self.abortPort)

    connection_dict[self.lid] = (self.device_name, weakref.ref(self))

    self.locklevel = 0

    self.connected = 1

    self.check_idn()
    self.post_init()
342
343
def abort(self):
    """Send an abort for our link over the abort channel and return its error code.

    Raises VXI_11_RPC_EOF when the call ends with EOFError, and
    VXI_11_Error for a non-zero error code when self.raise_on_err is true.
    """
    channel = self.abort_channel
    channel.select_timeout_seconds = self.timeout/1000.0  #convert to seconds
    try:
        err = channel.make_call(1, self.lid, channel.packer.pack_int, channel.unpacker.unpack_int)  #abort
    except EOFError:
        raise VXI_11_RPC_EOF("eof")

    if err and self.raise_on_err:
        raise VXI_11_Error(err)
    return err
355
def disconnect(self):
    """Destroy the link (best effort) and close both channels.

    Safe to call more than once: the second call finds nothing left to
    close.  A failing destroy_link is logged and otherwise ignored.
    """
    if self.connected:
        try:
            err, = self.command(23, "id", "error", (self.lid,))  #execute destroy_link
        except Exception:
            self.log_traceback()  #if we can't close nicely, we'll close anyway

    self.connected = 0
    if self.lid is not None:
        connection_dict.pop(self.lid, None)  # entry may already be gone
        self.lid = None

    core = getattr(self, "core", None)
    abort_channel = getattr(self, "abort_channel", None)
    # forget the channels first so a failing close() cannot leave stale references
    self.core = None
    self.abort_channel = None
    self.abortChannel = None
    try:
        if core is not None:
            core.close()
    finally:
        if abort_channel is not None:
            abort_channel.close()
371
def __del__(self):
    """On destruction, abort and disconnect a link that is still open, ignoring VXI_11_Error."""
    if self.lid is None:
        return
    self.raise_on_err = 0  #no exceptions here from simple errors
    for teardown in (self.abort, self.disconnect):
        try:
            teardown()
        except VXI_11_Error:
            pass
383
384
385
def write(self, data, timeout=None, lock_timeout=0):
    """err, bytes_sent=write(data [, timeout] [,lock_timeout]) sends data to device. See do_timeouts() for
    semantics of timeout and lock_timeout.

    Data longer than self.maxRecvSize is sent in several chunks; the END flag
    (8) is set on the last chunk only.  Sending stops at the first non-zero
    error code.  Empty data sends nothing and returns (0, 0)."""

    flags, timeout, lock_timeout = self.do_timeouts(timeout, lock_timeout)
    err = 0  # stays 0 when there is nothing to send
    base = 0
    end = len(data)
    while base < end:
        remaining = end - base
        if remaining > self.maxRecvSize:
            xfer = self.maxRecvSize
        else:
            xfer = remaining
            flags |= 8  #write end on last byte

        err, count = self.command(11, "write", "write", (self.lid, timeout, lock_timeout, flags, data[base:base+xfer]))
        if err:
            break
        base += count
    return err, base
405
def read(self, timeout=None, lock_timeout=0, count=None, termChar=None):
    """err, reason, result=read([timeout] [,lock_timeout] [,count] [,termChar]) reads up to count bytes from the device,
    ending on count, EOI or termChar (if specified). See do_timeouts() for semantics of the timeouts. \n
    the returned reason is an inclusive OR of 3 bits (per the VXI-11 specs section B.6.4.device_read):
        Bit 2 = END/EOI received,
        bit 1 = Terminating Character received,
        bit 0 = full requested byte count received.
    """
    flags, timeout, lock_timeout = self.do_timeouts(timeout, lock_timeout)

    act_term = 0
    if termChar is not None:
        flags |= 128  # append termchrset bit
        act_term = termChar

    accumdata = ""
    received = 0
    reason = 0
    err = 0

    while True:
        if err or (reason & 6):
            break  # error, or END / CHR reason flag seen
        if count is not None and received >= count:
            break  # requested byte count reached

        readcount = self.maxRecvSize
        if count is not None:
            readcount = min(readcount, count - received)
        err, reason, data = self.command(12, "read", "read", (self.lid, readcount, timeout, lock_timeout, flags, act_term))
        accumdata += data
        received += len(data)

    return err, reason, accumdata
439
def generic(self, code, timeout, lock_timeout):
    """Run procedure code with the standard (lid, flags, timeout, lock_timeout) arguments and return its error code."""
    flags, timeout, lock_timeout = self.do_timeouts(timeout, lock_timeout)
    arguments = (self.lid, flags, timeout, lock_timeout)
    err, = self.command(code, "generic", "error", arguments)
    return err
446
def trigger(self, timeout=None, lock_timeout=0):
    """Run procedure 14 through generic() and return its error code."""
    procedure = 14
    return self.generic(procedure, timeout, lock_timeout)
449
def clear(self, timeout=None, lock_timeout=0):
    """Run procedure 15 through generic() and return its error code."""
    procedure = 15
    return self.generic(procedure, timeout, lock_timeout)
452
def remote(self, timeout=None, lock_timeout=0):
    """Run procedure 16 through generic() and return its error code."""
    procedure = 16
    return self.generic(procedure, timeout, lock_timeout)
455
def local(self, timeout=None, lock_timeout=0):
    """Run procedure 17 through generic() and return its error code."""
    procedure = 17
    return self.generic(procedure, timeout, lock_timeout)
458
def read_status_byte(self, timeout=None, lock_timeout=0):
    """err, status=read_status_byte([timeout] [,lock_timeout]) runs procedure 13 and returns its two result fields."""
    flags, timeout, lock_timeout = self.do_timeouts(timeout, lock_timeout)
    arguments = (self.lid, flags, timeout, lock_timeout)
    err, status = self.command(13, "generic", "read_stb", arguments)
    return err, status
465
def lock(self, lock_timeout=0):
    """lock() acquires a lock on a device and the threadlock. If it fails it leaves the connection cleanly unlocked.
    If self.use_vxi_locking is false, it acquires only a thread lock locally, and does not really lock the vxi-11 device.
    This is useful if only one process is talking to a given device, and saves time.

    Returns the error code of the lock call (0 on success).  The lock level
    is only raised on success, so a caller that gets a non-zero code must
    not call unlock()."""
    err = 0
    if threads:
        self.threadlock.acquire()

    mux_locked = False
    if self.use_vxi_locking and self.locklevel == 0:
        try:
            flags, timeout, lock_timeout = self.do_timeouts(0, lock_timeout)
            if self.mux:
                self.mux.lock_connection(self.global_mux_name)
                mux_locked = True
            try:
                err, = self.command(18, "lock", "error", (self.lid, flags, lock_timeout))
            except:
                if mux_locked:
                    self.mux.unlock_connection(self.global_mux_name)
                raise
        except:
            if threads:
                self.threadlock.release()
            raise

    if err:
        # The device refused the lock without raising: undo everything taken
        # above, including the multiplexer lock, so that we really do leave
        # the connection cleanly unlocked.
        try:
            if mux_locked:
                self.mux.unlock_connection(self.global_mux_name)
        finally:
            if threads:
                self.threadlock.release()
    else:
        self.locklevel += 1
    return err
494
def is_locked(self):
    """Return True while at least one level of lock() is outstanding."""
    depth = self.locklevel
    return depth > 0
497
def unlock(self, priority=0):
    """unlock(priority=0) unwinds one level of locking, and if the level is zero, really unlocks the device.
    Calls to lock() and unlock() should be matched.  If there is a danger that they are not, due to bad
    exception handling, unlock_completely() should be used as a final cleanup for a series of operations.
    Setting priority to non-zero will bias the apparent last-used time in a multiplexer (if one is used),
    so setting priority to -10 will effectively mark this channel least-recently-used, while setting it to
    +2 will post-date the last-used time 2 seconds, so for the next 2 seconds, the device will be hard to kick
    out of the channel cache (but not impossible).

    Raises AssertionError, leaving the lock level untouched, when no lock is outstanding.
    """

    # Check before decrementing so a stray unlock cannot leave the counter
    # negative; raised explicitly so the check survives "python -O".
    if self.locklevel <= 0:
        raise AssertionError("Too many unlocks on device: " + self.device_name)
    self.locklevel -= 1

    err = 0
    try:
        if self.use_vxi_locking and self.locklevel == 0:
            try:
                err, = self.command(19, "id", "error", (self.lid, ))
            finally:
                if self.mux:
                    self.mux.unlock_connection(self.global_mux_name, priority)  #this cannot fail, no try needed (??)
        elif priority and self.mux:
            #even on a non-final unlock, a request for changed priority is always remembered
            self.mux.adjust_priority(self.global_mux_name, priority)
    finally:
        if threads:
            self.threadlock.release()

    return err
527
def unlock_completely(self, priority=0):
    """unlock_completely() forces an unwind of any locks all the way back to zero for error cleanup. Only exceptions thrown are fatal."""
    if threads:
        self.threadlock.acquire()  #make sure we have the threadlock before we try a (possibly failing) full lock

    try:
        self.lock()  #just to be safe, we should already hold one level of lock!
    except VXI_11_Locked_Elsewhere:
        #this is often called on error cleanup when we don't already have a lock, and we don't really care if we can't get it
        pass
    except VXI_11_Error:
        self.log_exception("Unexpected trouble locking in unlock_completely(): ")

    if threads:
        #unwind to single lock the fast way, and make sure this variable really existed, to shield against internal changes
        rlock = self.threadlock
        rlock._RLock__count += (1 - rlock._RLock__count)

    self.locklevel = 1  #unwind our own counter, too
    try:
        self.unlock(priority)
    except VXI_11_Device_Not_Locked:
        #if we couldn't lock above, we will probably get another exception here, and don't care
        pass
    except VXI_11_Error:
        # transient trouble is only logged; anything else is logged and re-raised
        transient = isinstance(sys.exc_info()[1], VXI_11_Transient_Error)
        self.log_exception("Unexpected trouble unlocking in unlock_completely(): ")
        if not transient:
            raise
552
def transaction(self, data, count=None, lock_timeout=0):
    """err, reason, result=transaction(data, [, count] [,lock_timeout]) sends data and waits for a response.
    It is guaranteed to leave the lock level at its original value on exit,
    unless KeyboardInterrupt breaks the normal flow.  If count isn't provided, there is no limit to how much data will be accepted.
    See do_timeouts() for semantics on lock_timeout.

    If the lock cannot be acquired, nothing is sent and (err, None, None) is returned."""

    err = self.lock(lock_timeout)
    if err:
        # lock() did not raise the lock level, so calling unlock() here
        # would trip its "too many unlocks" check
        return err, None, None

    reason = None
    result = None
    try:
        err, write_count = self.write(data)

        if not err:
            err, reason, result = self.read(count=count)
    finally:
        self.unlock()

    return err, reason, result
571
def check_idn(self):
    """check_idn() executes "*idn?" and aborts if the result does not start with self.idn_head.

    Does nothing when self.idn is already set.  Without an idn_head the
    device is not queried and self.idn gets a placeholder text.  A failing
    lock or query raises the exception matching its error code; a wrong
    device is disconnected and reported with AssertionError.
    """
    if hasattr(self, "idn"):
        return  #already done
    if getattr(self, "idn_head", None) is None:
        self.idn = "Device *idn? not checked!"
        return

    err = self.lock()
    if err:
        # the lock level was not raised, so do not unlock
        raise _VXI_11_enumerated_exceptions.get(err, VXI_11_Error)(err)
    try:
        self.clear()
        err, reason, idn = self.transaction("*idn?")
    finally:
        self.unlock()

    if err:
        # no usable answer; report the device error rather than failing on idn below
        raise _VXI_11_enumerated_exceptions.get(err, VXI_11_Error)(err)

    self.idn = idn.strip()  #save for future reference info
    if idn.find(self.idn_head) != 0:
        self.disconnect()
        # raised explicitly so the check is not stripped by "python -O"
        raise AssertionError("Wrong device type! expecting: " + self.idn_head + "... got: " + self.idn)
592
593 import copy
594
595 class device_thread:
596
597 if threads:
598 Thread=threading.Thread #by default, use canonical threads
599
def __init__(self, connection, main_sleep=1.0, name="Device"):
    """Prepare a monitor thread for connection; the thread itself is started by run_thread().

    main_sleep is the pause between passes of the monitor loop, in seconds,
    and name is the name given to the thread.
    """
    # threading is not imported at module level in this file (the import is
    # commented out), so bring it in here to keep construction from failing
    import threading

    self.running = 0
    self.main_sleep = main_sleep
    self.__thread = None
    self.__name = copy.copy(name)  #make a new copy to avoid a possible circular reference
    self.__wait_event = threading.Event()
    self.set_connection(connection)
607
def set_connection(self, connection):
    """Remember connection through a weak reference only."""
    #keep only a weak reference, so the thread cannot prevent the device from being deleted
    #such deletion creates an error when the thread tries to run, but that's OK
    #this allows the device_thread to be used as a clean mix-in class to a vxi_11 connection
    reference = weakref.ref(connection)
    self.__weak_connection = reference
613
def connection(self):
    """Return the connection, or None once it has been garbage collected."""
    reference = self.__weak_connection
    return reference()  #dereference weak reference
616
def handle_lock_error(self):
    """handle_lock_error can be overridden to count failures and do something if there are too many.

    By default it logs the failure through the connection, prefixed with the
    thread name (a 'name' attribute supplied by a subclass wins).
    """
    # the constructor only stores the private __name; plain self.name
    # does not exist unless a subclass provides it
    label = getattr(self, "name", None)
    if label is None:
        label = self.__name
    self.connection().log_exception(label + ": Error while locking device")
620
def onepass(self):
    """Run one monitor pass: lock the device, call get_data(), unlock.

    A failed lock - whether raised as VXI_11_Transient_Error or reported
    through lock()'s return code - is handed to handle_lock_error() and the
    pass is skipped.  An exception from get_data() is logged, the
    connection is cleared and the monitor loop is asked to stop.
    """
    connection = self.connection()

    try:
        err = connection.lock()
    except VXI_11_Transient_Error:
        self.handle_lock_error()
        return
    if err:
        # lock() did not raise the lock level, so we must not unlock either
        self.handle_lock_error()
        return

    try:
        try:
            self.get_data()
        except Exception:
            connection.log_traceback('Uncaught exception in get_data()')
            try:
                connection.clear()
            except Exception:
                connection.log_exception('failed to clear connection after error')
            self.run = 0
    finally:
        # always give the lock back, whatever happened above
        connection.unlock()
641
def monitor(self):
    """Thread body: repeat onepass() with main_sleep pauses until self.run is cleared.

    Any exception escaping a pass is logged and stops the loop.  On exit
    the running flag is cleared and the connection is fully unlocked.
    """
    self.connection().log_info("Monitor loop entered")

    while self.run:
        try:
            self.onepass()
            #wait until timeout or we are cancelled
            self.__wait_event.wait(self.main_sleep)
        except KeyboardInterrupt:
            self.connection().log_error("Keyboard Interrupt... terminating")
            self.run = 0
        except:
            self.connection().log_traceback()
            self.run = 0

    self.running = 0
    self.connection().unlock_completely()
657
def run_thread(self):
    """Start the monitor thread unless it is already running.

    The thread class is self.Thread when the class provides one, and
    threading.Thread otherwise.
    """
    if self.running:  #if it's already running, just keep it up.
        return

    # threading is not imported at module level in this file, and the class
    # only defines Thread when the module-level threads flag is set
    import threading
    thread_class = getattr(self, "Thread", None) or threading.Thread

    self.run = 1
    self.__thread = thread_class(target=self.monitor, name=self.__name)
    self.__wait_event.clear()  #make sure we don't fall through immediately
    # Mark as running before starting: monitor() clears the flag when it
    # ends, and a very short run must not be overwritten afterwards.
    self.running = 1
    try:
        self.__thread.start()
    except:
        self.running = 0
        raise
665
def get_monitor_thread(self):
    """Return the thread object made by run_thread(), or None before the first start."""
    monitor_thread = self.__thread
    return monitor_thread
668
669 def stop_thread(self):
670 if self.running:
671 self.run=0
672 self.__wait_event.set() #cancel any waiting