--- /dev/null
+"""
+This demonstrates a simple use of delayedresult: get/compute
+something that takes a long time, without hanging the GUI while this
+is taking place.
+
+The top button runs a small GUI that uses wx.lib.delayedresult.startWorker
+to wrap a long-running function into a separate thread. Just click
+Get, and move the slider, and click Get and Abort a few times, and
+observe that GUI responds. The key functions to look for in the code
+are startWorker() and __handleResult().
+
+The second button runs the same GUI, but without delayedresult. Click
+Get: now the get/compute is taking place in main thread, so the GUI
+does not respond to user actions until worker function returns, it's
+not even possible to Abort.
+"""
+
+import wx
+from wx.lib.delayedresult import startWorker
+
+class FrameSimpleDelayedGlade(wx.Frame):
+ def __init__(self, *args, **kwds):
+ # begin wxGlade: FrameSimpleDelayed.__init__
+ kwds["style"] = wx.DEFAULT_FRAME_STYLE
+ wx.Frame.__init__(self, *args, **kwds)
+ self.checkboxUseDelayed = wx.CheckBox(self, -1, "Use delayedresult")
+ self.buttonGet = wx.Button(self, -1, "Get")
+ self.buttonAbort = wx.Button(self, -1, "Abort")
+ self.slider = wx.Slider(self, -1, 0, 0, 10, size=(100,-1), style=wx.SL_HORIZONTAL|wx.SL_AUTOTICKS)
+ self.textCtrlResult = wx.TextCtrl(self, -1, "", style=wx.TE_READONLY)
+
+ self.__set_properties()
+ self.__do_layout()
+
+ self.Bind(wx.EVT_BUTTON, self.handleGet, self.buttonGet)
+ self.Bind(wx.EVT_BUTTON, self.handleAbort, self.buttonAbort)
+ # end wxGlade
+
+ def __set_properties(self):
+ # begin wxGlade: FrameSimpleDelayed.__set_properties
+ self.SetTitle("Simple Examle of Delayed Result")
+ self.checkboxUseDelayed.SetValue(1)
+ self.checkboxUseDelayed.Enable(False)
+ self.buttonAbort.Enable(False)
+ # end wxGlade
+
+ def __do_layout(self):
+ # begin wxGlade: FrameSimpleDelayed.__do_layout
+ sizerFrame = wx.BoxSizer(wx.VERTICAL)
+ sizerGetResult = wx.BoxSizer(wx.HORIZONTAL)
+ sizerUseDelayed = wx.BoxSizer(wx.HORIZONTAL)
+ sizerUseDelayed.Add(self.checkboxUseDelayed, 0, wx.LEFT|wx.ALIGN_CENTER_VERTICAL|wx.ADJUST_MINSIZE, 5)
+ sizerFrame.Add(sizerUseDelayed, 1, wx.EXPAND, 0)
+ sizerGetResult.Add(self.buttonGet, 0, wx.ADJUST_MINSIZE, 0)
+ sizerGetResult.Add(self.buttonAbort, 0, wx.ADJUST_MINSIZE, 0)
+ sizerGetResult.Add(self.slider, 0, wx.ADJUST_MINSIZE, 0)
+ sizerGetResult.Add(self.textCtrlResult, 0, wx.ADJUST_MINSIZE, 0)
+ sizerFrame.Add(sizerGetResult, 1, wx.ALL|wx.EXPAND, 5)
+ self.SetAutoLayout(True)
+ self.SetSizer(sizerFrame)
+ sizerFrame.Fit(self)
+ sizerFrame.SetSizeHints(self)
+ self.Layout()
+ # end wxGlade
+
+
+class FrameSimpleDelayed(FrameSimpleDelayedGlade):
+ """This demos simplistic use of delayedresult module."""
+
+ def __init__(self, *args, **kwargs):
+ self.jobID = 1
+ FrameSimpleDelayedGlade.__init__(self, *args, **kwargs)
+ self.Bind(wx.EVT_CLOSE, self.handleClose)
+
+ def setLog(self, log):
+ self.log = log
+
+ def handleClose(self, event):
+ """Only needed because in demo, closing the window does not kill the
+ app, so worker thread continues and sends result to dead frame; normally
+ your app would exit so this would not happen."""
+ if self.buttonAbort.IsEnabled():
+ self.Hide()
+ import time
+ time.sleep(5)
+ self.Destroy()
+
+ def handleGet(self, event):
+ """Compute result in separate thread, doesn't affect GUI response."""
+ self.buttonGet.Enable(False)
+ self.buttonAbort.Enable(True)
+
+ self.log( "Starting job %s in producer thread: GUI remains responsive" % self.jobID )
+ startWorker(self.__handleResult, self.__resultCreator,
+ wargs=(self.jobID,), jobID=self.jobID)
+
+ def __resultCreator(self, jobID):
+ """Pretend to be a complex worker function or something that takes
+ long time to run due to network access etc. GUI will freeze if this
+ method is not called in separate thread."""
+ import time
+ time.sleep(5)
+ return jobID
+
+ def handleAbort(self, event):
+ """Abort actually just means 'ignore the result when it gets to
+ handler, it is no longer relevant'. We just increase the job ID,
+ this will let handler know that the result has been cancelled."""
+ self.log( "Aborting result for job %s" % self.jobID )
+ self.buttonGet.Enable(True)
+ self.buttonAbort.Enable(False)
+ self.jobID += 1
+
+ def __handleResult(self, delayedResult):
+ # See if we still want the result for last job started
+ jobID = delayedResult.getJobID()
+ if jobID != self.jobID:
+ self.log( "Got obsolete result for job %s, ignored" % jobID )
+ return
+
+ # we do, get result:
+ try:
+ result = delayedResult.get()
+ except Exception, exc:
+ self.log( "Result for job %s raised exception: %s" % (jobID, exc) )
+ self.jobID += 1
+ return
+
+ # output result
+ self.log( "Got result for job %s: %s" % (jobID, result) )
+ self.textCtrlResult.SetValue(str(result))
+
+ # get ready for next job:
+ self.buttonGet.Enable(True)
+ self.buttonAbort.Enable(False)
+ self.jobID += 1
+
+
+class FrameSimpleDirect(FrameSimpleDelayedGlade):
+ """This does not use delayedresult so the GUI will freeze while
+ the GET is taking place."""
+
+ def __init__(self, *args, **kwargs):
+ self.jobID = 1
+ FrameSimpleDelayedGlade.__init__(self, *args, **kwargs)
+ self.checkboxUseDelayed.SetValue(False)
+
+ def setLog(self, log):
+ self.log = log
+
+ def handleGet(self, event):
+ """Use delayedresult, this will compute
+ result in separate thread, and won't affect GUI response. """
+ self.buttonGet.Enable(False)
+ self.buttonAbort.Enable(True)
+
+ self.log( "Doing job %s without delayedresult (same as GUI thread): GUI hangs (for a while)" % self.jobID )
+ result = self.__resultCreator(self.jobID)
+ self.__handleResult( result )
+
+ def __resultCreator(self, jobID):
+ """Pretend to be a complex worker function or something that takes
+ long time to run due to network access etc. GUI will freeze if this
+ method is not called in separate thread."""
+ import time
+ time.sleep(5)
+ return jobID
+
+ def handleAbort(self, event):
+ """can never be called"""
+ pass
+
+ def __handleResult(self, result):
+ # output result
+ self.log( "Got result for job %s: %s" % (self.jobID, result) )
+ self.textCtrlResult.SetValue(str(result))
+
+ # get ready for next job:
+ self.buttonGet.Enable(True)
+ self.buttonAbort.Enable(False)
+ self.jobID += 1
+
+
+#---------------------------------------------------------------------------
+#---------------------------------------------------------------------------
+
+class TestPanel(wx.Panel):
+ def __init__(self, parent, log):
+ self.log = log
+ wx.Panel.__init__(self, parent, -1)
+
+ vsizer = wx.BoxSizer(wx.VERTICAL)
+ b = wx.Button(self, -1, "Long-running function in separate thread")
+ vsizer.Add(b, 0, wx.ALL, 5)
+ self.Bind(wx.EVT_BUTTON, self.OnButton1, b)
+
+ b = wx.Button(self, -1, "Long-running function in GUI thread")
+ vsizer.Add(b, 0, wx.ALL, 5)
+ self.Bind(wx.EVT_BUTTON, self.OnButton2, b)
+
+ bdr = wx.BoxSizer()
+ bdr.Add(vsizer, 0, wx.ALL, 50)
+ self.SetSizer(bdr)
+ self.Layout()
+
+ def OnButton1(self, evt):
+ frame = FrameSimpleDelayed(self, title="Long-running function in separate thread")
+ frame.setLog(self.log.WriteText)
+ frame.Show()
+
+ def OnButton2(self, evt):
+ frame = FrameSimpleDirect(self, title="Long-running function in GUI thread")
+ frame.setLog(self.log.WriteText)
+ frame.Show()
+
+
+#---------------------------------------------------------------------------
+
+
+def runTest(frame, nb, log):
+ win = TestPanel(nb, log)
+ return win
+
+
+#---------------------------------------------------------------------------
+
+
+overview = __doc__
+
+
+if __name__ == '__main__':
+ import sys,os
+ import run
+ run.main(['', os.path.basename(sys.argv[0])] + sys.argv[1:])
+
+
--- /dev/null
+"""
+This module supports the thread-safe, asynchronous transmission of data
+('delayed results') from a worker (non-GUI) thread to the main thread. Ie you don't
+need to mutex lock any data, the worker thread doesn't wait (or even check)
+for the result to be received, and the main thread doesn't wait for the
+worker thread to send the result. Instead, the consumer will be called
+automatically by the wx app when the worker thread result is available.
+
+In most cases you just need to use startWorker() with the correct parameters
+(your worker function and your 'consumer' in the simplest of cases). The
+only requirement on consumer is that it must accept a DelayedResult instance
+as first arg.
+
+In the following example, this will call consumer(delayedResult) with the
+return value from workerFn::
+
+ from delayedresult import startWorker
+ startWorker(consumer, workerFn)
+
+More advanced uses:
+
+- The other parameters to startWorker()
+- Derive from Producer to override _extraInfo (e.g. to provide traceback info)
+- Create your own worker-function-thread wrapper instead of using Producer
+- Create your own Handler-like wrapper to pre- or post-process the result
+ (see PreProcessChain)
+- Derive from Sender to use your own way of making result hop over the
+ "thread boundary" (from non-main thread to main thread), e.g. using Queue
+
+Thanks to Josiah Carlson for critical feedback/ideas that helped me
+improve this module.
+
+:Copyright: (c) 2006 by Oliver Schoenborn
+:License: wxWidgets license
+:Version: 1.0
+
+"""
+
+__author__ = 'Oliver Schoenborn at utoronto dot ca'
+__version__ = '1.0'
+
+__all__ = ('Sender', 'SenderNoWx', 'SenderWxEvent', 'SenderCallAfter',
+ 'Handler', 'DelayedResult', 'Producer', 'startWorker', 'PreProcessChain')
+
+
+import wx
+import threading
+
+
+class Struct:
+ """
+ An object that has attributes built from the dictionary given in
+ constructor. So ss=Struct(a=1, b='b') will satisfy assert ss.a == 1
+ and assert ss.b == 'b'.
+ """
+
+ def __init__(self, **kwargs):
+ self.__dict__.update( kwargs )
+
+
+class Handler:
+ """
+ Bind some of the arguments and keyword arguments of a callable ('listener').
+ Then when the Handler instance is called (e.g. handler(result, **kwargs))
+ the result is passed as first argument to callable, the kwargs is
+ combined with those given at construction, and the args are those
+ given at construction. Its return value is returned.
+ """
+ def __init__(self, listener, *args, **kwargs ):
+ """Bind args and kwargs to listener. """
+ self.__listener = listener
+ self.__args = args
+ self.__kwargs = kwargs
+
+ def __call__(self, result, **moreKwargs):
+ """Listener is assumed to take result as first arg, then *args,
+ then the combination of moreKwargs and the kwargs given at construction."""
+ if moreKwargs:
+ moreKwargs.update(self.__kwargs)
+ else:
+ moreKwargs = self.__kwargs
+ return self.__listener(result, *self.__args, **moreKwargs)
+
+
+class Sender:
+ """
+ Base class for various kinds of senders. A sender sends a result
+ produced by a worker funtion to a result handler (listener). Note
+ that each sender can be given a "job id". This can be anything
+ (number, string, id, and object, etc) and is not used, it is
+ simply added as attribute whenever a DelayedResult is created.
+ This allows you to know, if desired, what result corresponds to
+ which sender. Note that uniqueness is not necessary.
+
+ Derive from this class if none of the existing derived classes
+ are adequate, and override _sendImpl().
+ """
+
+ def __init__(self, jobID=None):
+ """The optional jobID can be anything that you want to use to
+ track which sender particular results come from. """
+ self.__jobID = jobID
+
+ def getJobID(self):
+ """Return the jobID given at construction"""
+ return self.__jobID
+
+ def sendResult(self, result):
+ """This will send the result to handler, using whatever
+ technique the derived class uses. """
+ delayedResult = DelayedResult(result, jobID=self.__jobID)
+ self._sendImpl(delayedResult)
+
+ def sendException(self, exception, extraInfo = None):
+ """Use this when the worker function raised an exception.
+ The *exception* is the instance of Exception caught. The extraInfo
+ could be anything you want (e.g. locals or traceback etc),
+ it will be added to the exception as attribute 'extraInfo'. The
+ exception will be raised when DelayedResult.get() is called."""
+ assert exception is not None
+ delayedResult = DelayedResult(extraInfo,
+ exception=exception, jobID=self.__jobID)
+ self._sendImpl(delayedResult)
+
+ def _sendImpl(self, delayedResult):
+ msg = '_sendImpl() must be implemented in %s' % self.__class__
+ raise NotImplementedError(msg)
+
+
+class SenderNoWx( Sender ):
+ """
+ Sender that works without wx. The results are sent directly, ie
+ the consumer will get them "in the worker thread". So it should
+ only be used for testing.
+ """
+ def __init__(self, consumer, jobID=None, args=(), kwargs={}):
+ """The consumer can be any callable of the form
+ callable(result, *args, **kwargs)"""
+ Sender.__init__(self, jobID)
+ if args or kwargs:
+ self.__consumer = Handler(consumer, *args, **kwargs)
+ else:
+ self.__consumer = consumer
+
+ def _sendImpl(self, delayedResult):
+ self.__consumer(delayedResult)
+
+
+class SenderWxEvent( Sender ):
+ """
+ This sender sends the delayed result produced in the worker thread
+ to an event handler in the main thread, via a wx event of class
+ *eventClass*. The result is an attribute of the event (default:
+ "delayedResult".
+ """
+ def __init__(self, handler, eventClass, resultAttr="delayedResult",
+ jobID=None, **kwargs):
+ """The handler must derive from wx.EvtHandler. The event class
+ is typically the first item in the pair returned by
+ wx.lib.newevent.NewEvent(). You can use the *resultAttr*
+ to change the attribute name of the generated event's
+ delayed result. """
+ Sender.__init__(self, jobID)
+ if not isinstance(handler, wx.EvtHandler):
+ msg = 'SenderWxEvent(handler=%s, ...) not allowed,' % type(handler)
+ msg = '%s handler must derive from wx.EvtHandler' % msg
+ raise ValueError(msg)
+ self.__consumer = Struct(handler=handler, eventClass=eventClass,
+ resultAttr=resultAttr, kwargs=kwargs)
+
+ def _sendImpl(self, delayedResult):
+ """Must not modify the consumer (that was created at construction)
+ since might be shared by several senders, each sending from
+ separate threads."""
+ consumer = self.__consumer
+ kwargs = consumer.kwargs.copy()
+ kwargs[ consumer.resultAttr ] = delayedResult
+ event = consumer.eventClass(** kwargs)
+ wx.PostEvent(consumer.handler, event)
+
+
+class SenderCallAfter( Sender ):
+ """
+ This sender sends the delayed result produced in the worker thread
+ to a callable in the main thread, via wx.CallAfter.
+ """
+ def __init__(self, listener, jobID=None, args=(), kwargs={}):
+ Sender.__init__(self, jobID)
+ if args or kwargs:
+ self.__consumer = Handler(listener, *args, **kwargs)
+ else:
+ self.__consumer = listener
+
+ def _sendImpl(self, delayedResult):
+ wx.CallAfter(self.__consumer, delayedResult)
+
+
+class DelayedResult:
+ """
+ Represent the actual delayed result coming from the non-main thread.
+ An instance of this is given to the result handler. This result is
+ either a (reference to a) the value sent, or an exception.
+ If the latter, the exception is raised when the get() method gets
+ called.
+ """
+
+ def __init__(self, result, jobID=None, exception = None):
+ """You should never have to call this yourself. A DelayedResult
+ is created by a concrete Sender for you."""
+ self.__result = result
+ self.__exception = exception
+ self.__jobID = jobID
+
+ def getJobID(self):
+ """Return the jobID given when Sender initialized,
+ or None if none given. """
+ return self.__jobID
+
+ def get(self):
+ """Get the result. If an exception was sent instead of a result,
+ (via Sender's sendExcept()), that **exception is raised**.
+ Otherwise the result is simply returned. """
+ if self.__exception: # exception was raised!
+ self.__exception.extraInfo = self.__result
+ raise self.__exception
+
+ return self.__result
+
+
+class Producer(threading.Thread):
+ """
+ Represent the worker thread that produces delayed results.
+ It causes the given function to run in a separate thread,
+ and a sender to be used to send the return value of the function.
+ As with any threading.Thread, instantiate and call start().
+ """
+
+ def __init__(self, sender, workerFn, args=(), kwargs={},
+ name=None, group=None, daemon=False,
+ sendReturn=True, senderArg=None):
+ """The sender will send the return value of
+ workerFn(*args, **kwargs) to the main thread. The name and group
+ are same as threading.Thread constructor parameters. Daemon causes
+ setDaemon() to be called. If sendReturn is False, then the return
+ value of workerFn() will not be sent. If senderArg is given, it
+ must be the name of the keyword arg to use to pass the sender into
+ the workerFn, so the function can send (typically many) results."""
+ if senderArg:
+ kwargs[senderArg] = sender
+ def wrapper():
+ try:
+ result = workerFn(*args, **kwargs)
+ except Exception, exc:
+ extraInfo = self._extraInfo(exc)
+ sender.sendException(exc, extraInfo)
+ else:
+ if sendReturn:
+ sender.sendResult(result)
+
+ threading.Thread.__init__(self, name=name, group=group, target=wrapper)
+ if daemon:
+ self.setDaemon(daemon)
+
+ def _extraInfo(self, exception):
+ """This method could be overridden in a derived class to provide
+ extra information when an exception is being sent instead of a
+ result. """
+ return None
+
+
+def startWorker(
+ consumer, workerFn,
+ cargs=(), ckwargs={},
+ wargs=(), wkwargs={},
+ jobID=None, group=None, daemon=False,
+ sendReturn=True, senderArg=None):
+ """
+ Convenience function to send data produced by workerFn(*wargs, **wkwargs)
+ running in separate thread, to a consumer(*cargs, **ckwargs) running in
+ the main thread. This function merely creates a SenderCallAfter (or a
+ SenderWxEvent, if consumer derives from wx.EvtHandler), and a Producer,
+ and returns immediately after starting the Producer thread. The jobID
+ is used for the Sender and as name for the Producer thread. Returns the
+ thread created, in case caller needs join/etc.
+ """
+
+ if isinstance(consumer, wx.EvtHandler):
+ eventClass = cargs[0]
+ sender = SenderWxEvent(consumer, eventClass, jobID=jobID, **ckwargs)
+ else:
+ sender = SenderCallAfter(consumer, jobID, args=cargs, kwargs=ckwargs)
+
+ thread = Producer(
+ sender, workerFn, args=wargs, kwargs=wkwargs,
+ name=jobID, group=group, daemon=daemon,
+ senderArg=senderArg, sendReturn=sendReturn)
+
+ thread.start()
+ return thread
+
+
+class PreProcessChain:
+ """
+ Represent a 'delayed result pre-processing chain', a kind of Handler.
+ Useful when lower-level objects need to apply a sequence of transformations
+ to the delayed result before handing it over to a final handler.
+ This allows the starter of the worker function to not know
+ anything about the lower-level objects.
+ """
+ def __init__(self, handler, *args, **kwargs):
+ """Wrap handler(result, *args, **kwargs) so that the result
+ it receives has been transformed by us. """
+ if handler is None:# assume rhs is a chain
+ self.__chain = args[0]
+ else:
+ if args or kwargs:
+ handler = Handler(handler, *args, **kwargs)
+ self.__chain = [handler]
+
+ def addSub(self, callable, *args, **kwargs):
+ """Add a sub-callable, ie a callable(result, *args, **kwargs)
+ that returns a transformed result to the previously added
+ sub-callable (or the handler given at construction, if this is
+ the first call to addSub). """
+ self.__chain.append( Handler(callable, *args, **kwargs) )
+
+ def clone(self):
+ """Clone the chain. Shallow only. Useful when several threads
+ must be started but have different sub-callables. """
+ return PreProcessChain(None, self.__chain[:] )
+
+ def cloneAddSub(self, callable, *args, **kwargs):
+ """Convenience method that first clones self, then calls addSub()
+ on that clone with given arguments. """
+ cc = self.clone()
+ cc.addSub(callable, *args, **kwargs)
+
+ def count(self):
+ """How many pre-processors in the chain"""
+ return len(self.__chain)
+
+ class Traverser:
+ """
+ Traverses the chain of pre-processors it is given, transforming
+ the original delayedResult along the way. The return value of each
+ callable added via addSub() is given to the previous addSub() callable,
+ until the handler is reached.
+ """
+ def __init__(self, delayedResult, chain):
+ self.__dr = delayedResult
+ self.__chain = chain
+
+ def get(self):
+ """This makes handler think we are a delayedResult."""
+ if not self.__chain:
+ return self.__dr.get()
+
+ handler = self.__chain[0]
+ del self.__chain[0]
+ return handler(self)
+
+ def getJobID(self):
+ """Return the job id for the delayedResult we transform."""
+ return self.__dr.getJobID()
+
+
+ def __call__(self, delayedResult):
+ """This makes us a Handler. We just call handler(Traverser). The
+ handler will think it is getting a delayed result, but in fact
+ will be getting an instance of Traverser, which will take care
+ of properly applying the chain of transformations to delayedResult."""
+ chainTrav = self.Traverser(delayedResult, self.__chain[1:])
+ handler = self.__chain[0]
+ handler( chainTrav )
+
+