+"""
+This module supports the thread-safe, asynchronous transmission of data
+('delayed results') from a worker (non-GUI) thread to the main thread. Ie you don't
+need to mutex lock any data, the worker thread doesn't wait (or even check)
+for the result to be received, and the main thread doesn't wait for the
+worker thread to send the result. Instead, the consumer will be called
+automatically by the wx app when the worker thread result is available.
+
+In most cases you just need to use startWorker() with the correct parameters
+(your worker function and your 'consumer' in the simplest of cases). The
+only requirement on consumer is that it must accept a DelayedResult instance
+as first arg.
+
+In the following example, this will call consumer(delayedResult) with the
+return value from workerFn::
+
+ from delayedresult import startWorker
+ startWorker(consumer, workerFn)
+
+More advanced uses:
+
+- The other parameters to startWorker()
+- Derive from Producer to override _extraInfo (e.g. to provide traceback info)
+- Create your own worker-function-thread wrapper instead of using Producer
+- Create your own Handler-like wrapper to pre- or post-process the result
+ (see PreProcessChain)
+- Derive from Sender to use your own way of making result hop over the
+ "thread boundary" (from non-main thread to main thread), e.g. using Queue
+
+Thanks to Josiah Carlson for critical feedback/ideas that helped me
+improve this module.
+
+:Copyright: (c) 2006 by Oliver Schoenborn
+:License: wxWidgets license
+:Version: 1.0
+
+"""
+
+__author__ = 'Oliver Schoenborn at utoronto dot ca'
+__version__ = '1.0'
+
+__all__ = ('Sender', 'SenderNoWx', 'SenderWxEvent', 'SenderCallAfter',
+ 'Handler', 'DelayedResult', 'Producer', 'startWorker', 'PreProcessChain')
+
+
+import wx
+import threading
+
+
+class Struct:
+ """
+ An object that has attributes built from the dictionary given in
+ constructor. So ss=Struct(a=1, b='b') will satisfy assert ss.a == 1
+ and assert ss.b == 'b'.
+ """
+
+ def __init__(self, **kwargs):
+ self.__dict__.update( kwargs )
+
+
+class Handler:
+ """
+ Bind some of the arguments and keyword arguments of a callable ('listener').
+ Then when the Handler instance is called (e.g. handler(result, **kwargs))
+ the result is passed as first argument to callable, the kwargs is
+ combined with those given at construction, and the args are those
+ given at construction. Its return value is returned.
+ """
+ def __init__(self, listener, *args, **kwargs ):
+ """Bind args and kwargs to listener. """
+ self.__listener = listener
+ self.__args = args
+ self.__kwargs = kwargs
+
+ def __call__(self, result, **moreKwargs):
+ """Listener is assumed to take result as first arg, then *args,
+ then the combination of moreKwargs and the kwargs given at construction."""
+ if moreKwargs:
+ moreKwargs.update(self.__kwargs)
+ else:
+ moreKwargs = self.__kwargs
+ return self.__listener(result, *self.__args, **moreKwargs)
+
+
+class Sender:
+ """
+ Base class for various kinds of senders. A sender sends a result
+ produced by a worker funtion to a result handler (listener). Note
+ that each sender can be given a "job id". This can be anything
+ (number, string, id, and object, etc) and is not used, it is
+ simply added as attribute whenever a DelayedResult is created.
+ This allows you to know, if desired, what result corresponds to
+ which sender. Note that uniqueness is not necessary.
+
+ Derive from this class if none of the existing derived classes
+ are adequate, and override _sendImpl().
+ """
+
+ def __init__(self, jobID=None):
+ """The optional jobID can be anything that you want to use to
+ track which sender particular results come from. """
+ self.__jobID = jobID
+
+ def getJobID(self):
+ """Return the jobID given at construction"""
+ return self.__jobID
+
+ def sendResult(self, result):
+ """This will send the result to handler, using whatever
+ technique the derived class uses. """
+ delayedResult = DelayedResult(result, jobID=self.__jobID)
+ self._sendImpl(delayedResult)
+
+ def sendException(self, exception, extraInfo = None):
+ """Use this when the worker function raised an exception.
+ The *exception* is the instance of Exception caught. The extraInfo
+ could be anything you want (e.g. locals or traceback etc),
+ it will be added to the exception as attribute 'extraInfo'. The
+ exception will be raised when DelayedResult.get() is called."""
+ assert exception is not None
+ delayedResult = DelayedResult(extraInfo,
+ exception=exception, jobID=self.__jobID)
+ self._sendImpl(delayedResult)
+
+ def _sendImpl(self, delayedResult):
+ msg = '_sendImpl() must be implemented in %s' % self.__class__
+ raise NotImplementedError(msg)
+
+
+class SenderNoWx( Sender ):
+ """
+ Sender that works without wx. The results are sent directly, ie
+ the consumer will get them "in the worker thread". So it should
+ only be used for testing.
+ """
+ def __init__(self, consumer, jobID=None, args=(), kwargs={}):
+ """The consumer can be any callable of the form
+ callable(result, *args, **kwargs)"""
+ Sender.__init__(self, jobID)
+ if args or kwargs:
+ self.__consumer = Handler(consumer, *args, **kwargs)
+ else:
+ self.__consumer = consumer
+
+ def _sendImpl(self, delayedResult):
+ self.__consumer(delayedResult)
+
+
+class SenderWxEvent( Sender ):
+ """
+ This sender sends the delayed result produced in the worker thread
+ to an event handler in the main thread, via a wx event of class
+ *eventClass*. The result is an attribute of the event (default:
+ "delayedResult".
+ """
+ def __init__(self, handler, eventClass, resultAttr="delayedResult",
+ jobID=None, **kwargs):
+ """The handler must derive from wx.EvtHandler. The event class
+ is typically the first item in the pair returned by
+ wx.lib.newevent.NewEvent(). You can use the *resultAttr*
+ to change the attribute name of the generated event's
+ delayed result. """
+ Sender.__init__(self, jobID)
+ if not isinstance(handler, wx.EvtHandler):
+ msg = 'SenderWxEvent(handler=%s, ...) not allowed,' % type(handler)
+ msg = '%s handler must derive from wx.EvtHandler' % msg
+ raise ValueError(msg)
+ self.__consumer = Struct(handler=handler, eventClass=eventClass,
+ resultAttr=resultAttr, kwargs=kwargs)
+
+ def _sendImpl(self, delayedResult):
+ """Must not modify the consumer (that was created at construction)
+ since might be shared by several senders, each sending from
+ separate threads."""
+ consumer = self.__consumer
+ kwargs = consumer.kwargs.copy()
+ kwargs[ consumer.resultAttr ] = delayedResult
+ event = consumer.eventClass(** kwargs)
+ wx.PostEvent(consumer.handler, event)
+
+
+class SenderCallAfter( Sender ):
+ """
+ This sender sends the delayed result produced in the worker thread
+ to a callable in the main thread, via wx.CallAfter.
+ """
+ def __init__(self, listener, jobID=None, args=(), kwargs={}):
+ Sender.__init__(self, jobID)
+ if args or kwargs:
+ self.__consumer = Handler(listener, *args, **kwargs)
+ else:
+ self.__consumer = listener
+
+ def _sendImpl(self, delayedResult):
+ wx.CallAfter(self.__consumer, delayedResult)
+
+
+class DelayedResult:
+ """
+ Represent the actual delayed result coming from the non-main thread.
+ An instance of this is given to the result handler. This result is
+ either a (reference to a) the value sent, or an exception.
+ If the latter, the exception is raised when the get() method gets
+ called.
+ """
+
+ def __init__(self, result, jobID=None, exception = None):
+ """You should never have to call this yourself. A DelayedResult
+ is created by a concrete Sender for you."""
+ self.__result = result
+ self.__exception = exception
+ self.__jobID = jobID
+
+ def getJobID(self):
+ """Return the jobID given when Sender initialized,
+ or None if none given. """
+ return self.__jobID
+
+ def get(self):
+ """Get the result. If an exception was sent instead of a result,
+ (via Sender's sendExcept()), that **exception is raised**.
+ Otherwise the result is simply returned. """
+ if self.__exception: # exception was raised!
+ self.__exception.extraInfo = self.__result
+ raise self.__exception
+
+ return self.__result
+
+
+class Producer(threading.Thread):
+ """
+ Represent the worker thread that produces delayed results.
+ It causes the given function to run in a separate thread,
+ and a sender to be used to send the return value of the function.
+ As with any threading.Thread, instantiate and call start().
+ """
+
+ def __init__(self, sender, workerFn, args=(), kwargs={},
+ name=None, group=None, daemon=False,
+ sendReturn=True, senderArg=None):
+ """The sender will send the return value of
+ workerFn(*args, **kwargs) to the main thread. The name and group
+ are same as threading.Thread constructor parameters. Daemon causes
+ setDaemon() to be called. If sendReturn is False, then the return
+ value of workerFn() will not be sent. If senderArg is given, it
+ must be the name of the keyword arg to use to pass the sender into
+ the workerFn, so the function can send (typically many) results."""
+ if senderArg:
+ kwargs[senderArg] = sender
+ def wrapper():
+ try:
+ result = workerFn(*args, **kwargs)
+ except Exception, exc:
+ extraInfo = self._extraInfo(exc)
+ sender.sendException(exc, extraInfo)
+ else:
+ if sendReturn:
+ sender.sendResult(result)
+
+ threading.Thread.__init__(self, name=name, group=group, target=wrapper)
+ if daemon:
+ self.setDaemon(daemon)
+
+ def _extraInfo(self, exception):
+ """This method could be overridden in a derived class to provide
+ extra information when an exception is being sent instead of a
+ result. """
+ return None
+
+
+def startWorker(
+ consumer, workerFn,
+ cargs=(), ckwargs={},
+ wargs=(), wkwargs={},
+ jobID=None, group=None, daemon=False,
+ sendReturn=True, senderArg=None):
+ """
+ Convenience function to send data produced by workerFn(*wargs, **wkwargs)
+ running in separate thread, to a consumer(*cargs, **ckwargs) running in
+ the main thread. This function merely creates a SenderCallAfter (or a
+ SenderWxEvent, if consumer derives from wx.EvtHandler), and a Producer,
+ and returns immediately after starting the Producer thread. The jobID
+ is used for the Sender and as name for the Producer thread. Returns the
+ thread created, in case caller needs join/etc.
+ """
+
+ if isinstance(consumer, wx.EvtHandler):
+ eventClass = cargs[0]
+ sender = SenderWxEvent(consumer, eventClass, jobID=jobID, **ckwargs)
+ else:
+ sender = SenderCallAfter(consumer, jobID, args=cargs, kwargs=ckwargs)
+
+ thread = Producer(
+ sender, workerFn, args=wargs, kwargs=wkwargs,
+ name=jobID, group=group, daemon=daemon,
+ senderArg=senderArg, sendReturn=sendReturn)
+
+ thread.start()
+ return thread
+
+
+class PreProcessChain:
+ """
+ Represent a 'delayed result pre-processing chain', a kind of Handler.
+ Useful when lower-level objects need to apply a sequence of transformations
+ to the delayed result before handing it over to a final handler.
+ This allows the starter of the worker function to not know
+ anything about the lower-level objects.
+ """
+ def __init__(self, handler, *args, **kwargs):
+ """Wrap handler(result, *args, **kwargs) so that the result
+ it receives has been transformed by us. """
+ if handler is None:# assume rhs is a chain
+ self.__chain = args[0]
+ else:
+ if args or kwargs:
+ handler = Handler(handler, *args, **kwargs)
+ self.__chain = [handler]
+
+ def addSub(self, callable, *args, **kwargs):
+ """Add a sub-callable, ie a callable(result, *args, **kwargs)
+ that returns a transformed result to the previously added
+ sub-callable (or the handler given at construction, if this is
+ the first call to addSub). """
+ self.__chain.append( Handler(callable, *args, **kwargs) )
+
+ def clone(self):
+ """Clone the chain. Shallow only. Useful when several threads
+ must be started but have different sub-callables. """
+ return PreProcessChain(None, self.__chain[:] )
+
+ def cloneAddSub(self, callable, *args, **kwargs):
+ """Convenience method that first clones self, then calls addSub()
+ on that clone with given arguments. """
+ cc = self.clone()
+ cc.addSub(callable, *args, **kwargs)
+
+ def count(self):
+ """How many pre-processors in the chain"""
+ return len(self.__chain)
+
+ class Traverser:
+ """
+ Traverses the chain of pre-processors it is given, transforming
+ the original delayedResult along the way. The return value of each
+ callable added via addSub() is given to the previous addSub() callable,
+ until the handler is reached.
+ """
+ def __init__(self, delayedResult, chain):
+ self.__dr = delayedResult
+ self.__chain = chain
+
+ def get(self):
+ """This makes handler think we are a delayedResult."""
+ if not self.__chain:
+ return self.__dr.get()
+
+ handler = self.__chain[0]
+ del self.__chain[0]
+ return handler(self)
+
+ def getJobID(self):
+ """Return the job id for the delayedResult we transform."""
+ return self.__dr.getJobID()
+
+
+ def __call__(self, delayedResult):
+ """This makes us a Handler. We just call handler(Traverser). The
+ handler will think it is getting a delayed result, but in fact
+ will be getting an instance of Traverser, which will take care
+ of properly applying the chain of transformations to delayedResult."""
+ chainTrav = self.Traverser(delayedResult, self.__chain[1:])
+ handler = self.__chain[0]
+ handler( chainTrav )
+
+