]>
git.saurik.com Git - wxWidgets.git/blob - wxPython/wx/lib/delayedresult.py
   2 This module supports the thread-safe, asynchronous transmission of data  
   3 ('delayed results') from a worker (non-GUI) thread to the main thread. Ie you don't  
   4 need to mutex lock any data, the worker thread doesn't wait (or even check)  
   5 for the result to be received, and the main thread doesn't wait for the  
   6 worker thread to send the result. Instead, the consumer will be called  
   7 automatically by the wx app when the worker thread result is available.  
   9 In most cases you just need to use startWorker() with the correct parameters 
  10 (your worker function and your 'consumer' in the simplest of cases). The  
  11 only requirement on consumer is that it must accept a DelayedResult instance  
  14 In the following example, this will call consumer(delayedResult) with the  
  15 return value from workerFn:: 
  17     from delayedresult import startWorker 
  18     startWorker(consumer, workerFn) 
  22 - The other parameters to startWorker() 
  23 - Derive from Producer to override _extraInfo (e.g. to provide traceback info) 
  24 - Create your own worker-function-thread wrapper instead of using Producer 
  25 - Create your own Handler-like wrapper to pre- or post-process the result  
  27 - Derive from Sender to use your own way of making result hop over the  
  28   "thread boundary" (from non-main thread to main thread), e.g. using Queue 
  30 Thanks to Josiah Carlson for critical feedback/ideas that helped me  
  33 :Copyright: (c) 2006 by Oliver Schoenborn 
  34 :License: wxWidgets license 
  39 __author__  
= 'Oliver Schoenborn at utoronto dot ca' 
  42 __all__ 
= ('Sender', 'SenderNoWx', 'SenderWxEvent', 'SenderCallAfter',  
  43     'Handler', 'DelayedResult', 'Producer', 'startWorker', 'PreProcessChain') 
  52     An object that has attributes built from the dictionary given in  
  53     constructor. So ss=Struct(a=1, b='b') will satisfy assert ss.a == 1 
  54     and assert ss.b == 'b'. 
  57     def __init__(self
, **kwargs
): 
  58         self
.__dict
__.update( kwargs 
) 
  63     Bind some of the arguments and keyword arguments of a callable ('listener').  
  64     Then when the Handler instance is called (e.g. handler(result, **kwargs)) 
  65     the result is passed as first argument to callable, the kwargs is  
  66     combined with those given at construction, and the args are those 
  67     given at construction. Its return value is returned. 
  69     def __init__(self
, listener
, *args
, **kwargs 
): 
  70         """Bind args and kwargs to listener. """ 
  71         self
.__listener 
= listener
 
  73         self
.__kwargs 
= kwargs
 
  75     def __call__(self
, result
, **moreKwargs
): 
  76         """Listener is assumed to take result as first arg, then *args,  
  77         then the combination of moreKwargs and the kwargs given at construction.""" 
  79             moreKwargs
.update(self
.__kwargs
) 
  81             moreKwargs 
= self
.__kwargs
 
  82         return self
.__listener
(result
, *self
.__args
, **moreKwargs
) 
  87     Base class for various kinds of senders. A sender sends a result 
  88     produced by a worker funtion to a result handler (listener). Note 
  89     that each sender can be given a "job id". This can be anything 
  90     (number, string, id, and object, etc) and is not used, it is  
  91     simply added as attribute whenever a DelayedResult is created.  
  92     This allows you to know, if desired, what result corresponds to  
  93     which sender. Note that uniqueness is not necessary.  
  95     Derive from this class if none of the existing derived classes 
  96     are adequate, and override _sendImpl().  
  99     def __init__(self
, jobID
=None): 
 100         """The optional jobID can be anything that you want to use to  
 101         track which sender particular results come from. """ 
 105         """Return the jobID given at construction""" 
 108     def sendResult(self
, result
): 
 109         """This will send the result to handler, using whatever  
 110         technique the derived class uses. """ 
 111         delayedResult 
= DelayedResult(result
, jobID
=self
.__jobID
) 
 112         self
._sendImpl
(delayedResult
) 
 114     def sendException(self
, exception
, extraInfo 
= None): 
 115         """Use this when the worker function raised an exception. 
 116         The *exception* is the instance of Exception caught. The extraInfo 
 117         could be anything you want (e.g. locals or traceback etc),  
 118         it will be added to the exception as attribute 'extraInfo'. The 
 119         exception will be raised when DelayedResult.get() is called.""" 
 120         assert exception 
is not None 
 121         delayedResult 
= DelayedResult(extraInfo
,  
 122             exception
=exception
, jobID
=self
.__jobID
) 
 123         self
._sendImpl
(delayedResult
) 
 125     def _sendImpl(self
, delayedResult
): 
 126         msg 
= '_sendImpl() must be implemented in %s' % self
.__class
__ 
 127         raise NotImplementedError(msg
) 
 130 class SenderNoWx( Sender 
): 
 132     Sender that works without wx. The results are sent directly, ie 
 133     the consumer will get them "in the worker thread". So it should  
 134     only be used for testing.  
 136     def __init__(self
, consumer
, jobID
=None, args
=(), kwargs
={}): 
 137         """The consumer can be any callable of the form  
 138         callable(result, *args, **kwargs)""" 
 139         Sender
.__init
__(self
, jobID
) 
 141             self
.__consumer 
= Handler(consumer
, *args
, **kwargs
) 
 143             self
.__consumer 
= consumer
 
 145     def _sendImpl(self
, delayedResult
): 
 146         self
.__consumer
(delayedResult
) 
 149 class SenderWxEvent( Sender 
): 
 151     This sender sends the delayed result produced in the worker thread 
 152     to an event handler in the main thread, via a wx event of class  
 153     *eventClass*. The result is an attribute of the event (default:  
 156     def __init__(self
, handler
, eventClass
, resultAttr
="delayedResult",  
 157         jobID
=None, **kwargs
): 
 158         """The handler must derive from wx.EvtHandler. The event class  
 159         is typically the first item in the pair returned by  
 160         wx.lib.newevent.NewEvent(). You can use the *resultAttr*  
 161         to change the attribute name of the generated event's  
 163         Sender
.__init
__(self
, jobID
) 
 164         if not isinstance(handler
, wx
.EvtHandler
): 
 165             msg 
= 'SenderWxEvent(handler=%s, ...) not allowed,' % type(handler
) 
 166             msg 
= '%s handler must derive from wx.EvtHandler' % msg
 
 167             raise ValueError(msg
) 
 168         self
.__consumer 
= Struct(handler
=handler
, eventClass
=eventClass
,  
 169                                  resultAttr
=resultAttr
, kwargs
=kwargs
) 
 171     def _sendImpl(self
, delayedResult
): 
 172         """Must not modify the consumer (that was created at construction)  
 173         since might be shared by several senders, each sending from  
 175         consumer 
= self
.__consumer
 
 176         kwargs 
= consumer
.kwargs
.copy() 
 177         kwargs
[ consumer
.resultAttr 
] = delayedResult
 
 178         event 
= consumer
.eventClass(** kwargs
) 
 179         wx
.PostEvent(consumer
.handler
, event
) 
 182 class SenderCallAfter( Sender 
): 
 184     This sender sends the delayed result produced in the worker thread 
 185     to a callable in the main thread, via wx.CallAfter.  
 187     def __init__(self
, listener
, jobID
=None, args
=(), kwargs
={}): 
 188         Sender
.__init
__(self
, jobID
) 
 190             self
.__consumer 
= Handler(listener
, *args
, **kwargs
) 
 192             self
.__consumer 
= listener
 
 194     def _sendImpl(self
, delayedResult
): 
 195         wx
.CallAfter(self
.__consumer
, delayedResult
) 
 200     Represent the actual delayed result coming from the non-main thread.  
 201     An instance of this is given to the result handler. This result is  
 202     either a (reference to a) the value sent, or an exception.  
 203     If the latter, the exception is raised when the get() method gets 
 207     def __init__(self
, result
, jobID
=None, exception 
= None): 
 208         """You should never have to call this yourself. A DelayedResult  
 209         is created by a concrete Sender for you.""" 
 210         self
.__result 
= result
 
 211         self
.__exception 
= exception
 
 215         """Return the jobID given when Sender initialized,  
 216         or None if none given. """ 
 220         """Get the result. If an exception was sent instead of a result,  
 221         (via Sender's sendExcept()), that **exception is raised**. 
 222         Otherwise the result is simply returned. """ 
 223         if self
.__exception
: # exception was raised! 
 224             self
.__exception
.extraInfo 
= self
.__result
 
 225             raise self
.__exception
 
 230 class AbortedException(Exception): 
 231     """Raise this in your worker function so that the sender knows  
 232     not to send a result to handler.""" 
 236 class Producer(threading
.Thread
): 
 238     Represent the worker thread that produces delayed results.  
 239     It causes the given function to run in a separate thread,  
 240     and a sender to be used to send the return value of the function. 
 241     As with any threading.Thread, instantiate and call start(). 
 242     Note that if the workerFn raises AbortedException, the result is not  
 243     sent and the thread terminates gracefully. 
 246     def __init__(self
, sender
, workerFn
, args
=(), kwargs
={},  
 247                  name
=None, group
=None, daemon
=False,  
 248                  sendReturn
=True, senderArg
=None): 
 249         """The sender will send the return value of  
 250         workerFn(*args, **kwargs) to the main thread. The name and group  
 251         are same as threading.Thread constructor parameters. Daemon causes  
 252         setDaemon() to be called. If sendReturn is False, then the return  
 253         value of workerFn() will not be sent. If senderArg is given, it  
 254         must be the name of the keyword arg to use to pass the sender into  
 255         the workerFn, so the function can send (typically many) results.""" 
 257             kwargs
[senderArg
] = sender
 
 260                 result 
= workerFn(*args
, **kwargs
) 
 261             except AbortedException
: 
 263             except Exception, exc
: 
 264                 extraInfo 
= self
._extraInfo
(exc
) 
 265                 sender
.sendException(exc
, extraInfo
) 
 268                     sender
.sendResult(result
) 
 270         threading
.Thread
.__init
__(self
, name
=name
, group
=group
, target
=wrapper
) 
 272             self
.setDaemon(daemon
) 
 274     def _extraInfo(self
, exception
): 
 275         """This method could be overridden in a derived class to provide  
 276         extra information when an exception is being sent instead of a  
 283     Convenience class that represents a kind of threading.Event that 
 284     raises AbortedException when called (see the __call__ method, everything 
 285     else is just to make it look like threading.Event). 
 289         self
.__ev 
= threading
.Event() 
 291     def __call__(self
, timeout
=None): 
 292         """See if event has been set (wait at most timeout if given).  If so,  
 293         raise AbortedException. Otherwise return None. Allows you to do 
 294         'while not event():' which will always succeed unless the event  
 295         has been set (then AbortedException will cause while to exit).""" 
 297             self
.__ev
.wait(timeout
) 
 298         if self
.__ev
.isSet(): 
 299             raise AbortedException() 
 302     def __getattr__(self
, name
): 
 303         """This allows us to be a kind of threading.Event.""" 
 304         if name 
in ('set','clear','wait','isSet'): 
 305             return getattr(self
.__ev
, name
) 
 310     cargs
=(), ckwargs
={},  
 311     wargs
=(), wkwargs
={}, 
 312     jobID
=None, group
=None, daemon
=False,  
 313     sendReturn
=True, senderArg
=None): 
 315     Convenience function to send data produced by workerFn(*wargs, **wkwargs)  
 316     running in separate thread, to a consumer(*cargs, **ckwargs) running in 
 317     the main thread. This function merely creates a SenderCallAfter (or a 
 318     SenderWxEvent, if consumer derives from wx.EvtHandler), and a Producer, 
 319     and returns immediately after starting the Producer thread. The jobID 
 320     is used for the Sender and as name for the Producer thread. Returns the  
 321     thread created, in case caller needs join/etc. 
 324     if isinstance(consumer
, wx
.EvtHandler
): 
 325         eventClass 
= cargs
[0] 
 326         sender 
= SenderWxEvent(consumer
, eventClass
, jobID
=jobID
, **ckwargs
) 
 328         sender 
= SenderCallAfter(consumer
, jobID
, args
=cargs
, kwargs
=ckwargs
) 
 331         sender
, workerFn
, args
=wargs
, kwargs
=wkwargs
,  
 332         name
=jobID
, group
=group
, daemon
=daemon
,  
 333         senderArg
=senderArg
, sendReturn
=sendReturn
) 
 339 class PreProcessChain
: 
 341     Represent a 'delayed result pre-processing chain', a kind of Handler.  
 342     Useful when lower-level objects need to apply a sequence of transformations  
 343     to the delayed result before handing it over to a final handler.  
 344     This allows the starter of the worker function to not know  
 345     anything about the lower-level objects.  
 347     def __init__(self
, handler
, *args
, **kwargs
): 
 348         """Wrap handler(result, *args, **kwargs) so that the result  
 349         it receives has been transformed by us. """ 
 350         if handler 
is None:# assume rhs is a chain 
 351             self
.__chain 
= args
[0] 
 354                 handler 
= Handler(handler
, *args
, **kwargs
) 
 355             self
.__chain 
= [handler
] 
 357     def addSub(self
, callable, *args
, **kwargs
): 
 358         """Add a sub-callable, ie a callable(result, *args, **kwargs) 
 359         that returns a transformed result to the previously added 
 360         sub-callable (or the handler given at construction, if this is  
 361         the first call to addSub). """ 
 362         self
.__chain
.append( Handler(callable, *args
, **kwargs
) ) 
 365         """Clone the chain. Shallow only. Useful when several threads  
 366         must be started but have different sub-callables. """ 
 367         return PreProcessChain(None, self
.__chain
[:] ) 
 369     def cloneAddSub(self
, callable, *args
, **kwargs
): 
 370         """Convenience method that first clones self, then calls addSub()  
 371         on that clone with given arguments. """ 
 373         cc
.addSub(callable, *args
, **kwargs
) 
 376         """How many pre-processors in the chain""" 
 377         return len(self
.__chain
) 
 381         Traverses the chain of pre-processors it is given, transforming 
 382         the original delayedResult along the way. The return value of each  
 383         callable added via addSub() is given to the previous addSub() callable, 
 384         until the handler is reached.  
 386         def __init__(self
, delayedResult
, chain
): 
 387             self
.__dr 
= delayedResult
 
 391             """This makes handler think we are a delayedResult.""" 
 393                 return self
.__dr
.get() 
 395             handler 
= self
.__chain
[0] 
 400             """Return the job id for the delayedResult we transform.""" 
 401             return self
.__dr
.getJobID() 
 404     def __call__(self
, delayedResult
): 
 405         """This makes us a Handler. We just call handler(Traverser). The 
 406         handler will think it is getting a delayed result, but in fact  
 407         will be getting an instance of Traverser, which will take care 
 408         of properly applying the chain of transformations to delayedResult.""" 
 409         chainTrav 
= self
.Traverser(delayedResult
, self
.__chain
[1:]) 
 410         handler 
= self
.__chain
[0]