]>
git.saurik.com Git - wxWidgets.git/blob - wxPython/wx/lib/delayedresult.py
2 This module supports the thread-safe, asynchronous transmission of data
3 ('delayed results') from a worker (non-GUI) thread to the main thread. I.e., you don't
4 need to mutex lock any data, the worker thread doesn't wait (or even check)
5 for the result to be received, and the main thread doesn't wait for the
6 worker thread to send the result. Instead, the consumer will be called
7 automatically by the wx app when the worker thread result is available.
9 In most cases you just need to use startWorker() with the correct parameters
10 (your worker function and your 'consumer' in the simplest of cases). The
11 only requirement on consumer is that it must accept a DelayedResult instance
14 In the following example, this will call consumer(delayedResult) with the
15 return value from workerFn::
17 from delayedresult import startWorker
18 startWorker(consumer, workerFn)
22 - The other parameters to startWorker()
23 - Derive from Producer to override _extraInfo (e.g. to provide traceback info)
24 - Create your own worker-function-thread wrapper instead of using Producer
25 - Create your own Handler-like wrapper to pre- or post-process the result
27 - Derive from Sender to use your own way of making result hop over the
28 "thread boundary" (from non-main thread to main thread), e.g. using Queue
30 Thanks to Josiah Carlson for critical feedback/ideas that helped me
33 :Copyright: (c) 2006 by Oliver Schoenborn
34 :License: wxWidgets license
# Module metadata: author contact and the names exported by
# "from delayedresult import *".
__author__ = 'Oliver Schoenborn at utoronto dot ca'

__all__ = ('Sender', 'SenderNoWx', 'SenderWxEvent', 'SenderCallAfter',
           'Handler', 'DelayedResult', 'Producer', 'startWorker', 'PreProcessChain')
52 An object that has attributes built from the dictionary given in
53 constructor. So ss=Struct(a=1, b='b') will satisfy assert ss.a == 1
54 and assert ss.b == 'b'.
def __init__(self, **kwargs):
    """Expose every keyword argument as an attribute of the instance."""
    # Writing straight into the instance dictionary turns each keyword
    # into an attribute, e.g. Struct(a=1).a == 1.
    self.__dict__.update(kwargs)
63 Bind some of the arguments and keyword arguments of a callable ('listener').
64 Then when the Handler instance is called (e.g. handler(result, **kwargs))
65 the result is passed as first argument to callable, the kwargs is
66 combined with those given at construction, and the args are those
67 given at construction. Its return value is returned.
def __init__(self, listener, *args, **kwargs):
    """Bind args and kwargs to listener.

    listener -- callable later invoked as listener(result, *args, **kw)
    args     -- positional arguments passed after the result
    kwargs   -- keyword arguments combined with those given at call time
    """
    self.__listener = listener
    # __call__ unpacks self.__args, so the positional arguments must be
    # stored here next to the keyword arguments.
    self.__args = args
    self.__kwargs = kwargs
def __call__(self, result, **moreKwargs):
    """Listener is assumed to take result as first arg, then *args,
    then the combination of moreKwargs and the kwargs given at construction.

    Returns whatever the listener returns. When the same keyword is given
    both at call time and at construction, the construction value wins.
    """
    # Merge into a fresh dict: the call-time keywords must not be dropped,
    # and the kwargs stored at construction must not be modified or aliased.
    merged = dict(moreKwargs)
    merged.update(self.__kwargs)
    return self.__listener(result, *self.__args, **merged)
87 Base class for various kinds of senders. A sender sends a result
88 produced by a worker function to a result handler (listener). Note
89 that each sender can be given a "job id". This can be anything
90 (number, string, id, an object, etc.) and is not used; it is
91 simply added as attribute whenever a DelayedResult is created.
92 This allows you to know, if desired, what result corresponds to
93 which sender. Note that uniqueness is not necessary.
95 Derive from this class if none of the existing derived classes
96 are adequate, and override _sendImpl().
def __init__(self, jobID=None):
    """The optional jobID can be anything that you want to use to
    track which sender particular results come from. """
    # sendResult() and sendException() attach this id to every
    # DelayedResult they create, so it has to be remembered here.
    self.__jobID = jobID
105 """Return the jobID given at construction"""
def sendResult(self, result):
    """This will send the result to handler, using whatever
    technique the derived class uses.

    The result is wrapped in a DelayedResult tagged with this sender's
    job id, then handed to _sendImpl()."""
    delayedResult = DelayedResult(result, jobID=self.__jobID)
    self._sendImpl(delayedResult)
def sendException(self, exception, extraInfo=None):
    """Use this when the worker function raised an exception.
    The *exception* is the instance of Exception caught. The extraInfo
    could be anything you want (e.g. locals or traceback etc),
    it will be added to the exception as attribute 'extraInfo'. The
    exception will be raised when DelayedResult.get() is called."""
    assert exception is not None
    # The extra info travels in the "result" slot of the DelayedResult;
    # the exception itself goes in the dedicated keyword argument.
    delayedResult = DelayedResult(extraInfo,
                                  exception=exception, jobID=self.__jobID)
    self._sendImpl(delayedResult)
def _sendImpl(self, delayedResult):
    """Deliver delayedResult to the consumer; derived classes must override.

    Raises NotImplementedError naming the class that failed to override."""
    msg = '_sendImpl() must be implemented in %s' % self.__class__
    raise NotImplementedError(msg)
class SenderNoWx(Sender):
    """
    Sender that works without wx. The results are sent directly, i.e.
    the consumer will get them "in the worker thread". So it should
    only be used for testing.
    """

    def __init__(self, consumer, jobID=None, args=(), kwargs={}):
        """The consumer can be any callable of the form
        callable(result, *args, **kwargs)"""
        Sender.__init__(self, jobID)
        # Only wrap the consumer when there is something to bind; the bare
        # consumer must not overwrite the wrapper, otherwise args/kwargs
        # would be silently dropped. kwargs is only unpacked, never
        # mutated, so the shared default dict is harmless here.
        if args or kwargs:
            self.__consumer = Handler(consumer, *args, **kwargs)
        else:
            self.__consumer = consumer

    def _sendImpl(self, delayedResult):
        """Call the consumer directly, in the calling thread."""
        self.__consumer(delayedResult)
class SenderWxEvent(Sender):
    """
    This sender sends the delayed result produced in the worker thread
    to an event handler in the main thread, via a wx event of class
    *eventClass*. The result is an attribute of the event (default:
    "delayedResult").
    """

    def __init__(self, handler, eventClass, resultAttr="delayedResult",
                 jobID=None, **kwargs):
        """The handler must derive from wx.EvtHandler. The event class
        is typically the first item in the pair returned by
        wx.lib.newevent.NewEvent(). You can use the *resultAttr*
        to change the name of the event attribute that carries the
        delayed result. Any extra keyword arguments are passed to
        eventClass each time an event is created.

        Raises ValueError if handler is not a wx.EvtHandler."""
        Sender.__init__(self, jobID)
        if not isinstance(handler, wx.EvtHandler):
            msg = 'SenderWxEvent(handler=%s, ...) not allowed,' % type(handler)
            msg = '%s handler must derive from wx.EvtHandler' % msg
            raise ValueError(msg)
        self.__consumer = Struct(handler=handler, eventClass=eventClass,
                                 resultAttr=resultAttr, kwargs=kwargs)

    def _sendImpl(self, delayedResult):
        """Post an eventClass event carrying delayedResult to the handler.

        Must not modify the consumer (that was created at construction)
        since it might be shared by several senders; each send therefore
        works on its own copy of the keyword arguments."""
        consumer = self.__consumer
        kwargs = consumer.kwargs.copy()
        kwargs[consumer.resultAttr] = delayedResult
        event = consumer.eventClass(**kwargs)
        wx.PostEvent(consumer.handler, event)
class SenderCallAfter(Sender):
    """
    This sender sends the delayed result produced in the worker thread
    to a callable in the main thread, via wx.CallAfter.
    """

    def __init__(self, listener, jobID=None, args=(), kwargs={}):
        """The listener can be any callable of the form
        callable(result, *args, **kwargs)"""
        Sender.__init__(self, jobID)
        # Only wrap the listener when there is something to bind; the bare
        # listener must not overwrite the wrapper, otherwise args/kwargs
        # would be silently dropped. kwargs is only unpacked, never
        # mutated, so the shared default dict is harmless here.
        if args or kwargs:
            self.__consumer = Handler(listener, *args, **kwargs)
        else:
            self.__consumer = listener

    def _sendImpl(self, delayedResult):
        """Schedule the listener call through wx.CallAfter."""
        wx.CallAfter(self.__consumer, delayedResult)
200 Represent the actual delayed result coming from the non-main thread.
201 An instance of this is given to the result handler. This result is
202 either (a reference to) the value sent, or an exception.
203 If the latter, the exception is raised when the get() method gets
def __init__(self, result, jobID=None, exception=None):
    """You should never have to call this yourself. A DelayedResult
    is created by a concrete Sender for you.

    result    -- the value sent (or the extra info, when an exception
                 is being sent)
    jobID     -- the id of the Sender that created this object
    exception -- the exception instance to raise from get(), if any
    """
    self.__result = result
    self.__exception = exception
    # Keep the job id so it can be reported back to the result handler.
    self.__jobID = jobID
215 """Return the jobID given when Sender initialized,
216 or None if none given. """
220 """Get the result. If an exception was sent instead of a result,
221 (via Sender's sendException()), that **exception is raised**.
222 Otherwise the result is simply returned. """
223 if self
.__exception
: # exception was raised!
224 self
.__exception
.extraInfo
= self
.__result
225 raise self
.__exception
class Producer(threading.Thread):
    """
    Represent the worker thread that produces delayed results.
    It causes the given function to run in a separate thread,
    and a sender to be used to send the return value of the function.
    As with any threading.Thread, instantiate and call start().
    """

    def __init__(self, sender, workerFn, args=(), kwargs={},
                 name=None, group=None, daemon=False,
                 sendReturn=True, senderArg=None):
        """The sender will send the return value of
        workerFn(*args, **kwargs) to the main thread. The name and group
        are same as threading.Thread constructor parameters. If daemon is
        true the thread is flagged as a daemon thread. If sendReturn is
        False, then the return value of workerFn() will not be sent. If
        senderArg is given, it must be the name of the keyword arg to use
        to pass the sender into the workerFn, so the function can send
        (typically many) results. An exception raised by workerFn is
        forwarded through sender.sendException() together with whatever
        _extraInfo() returns."""
        # Work on a private copy: inserting the sender must never leak into
        # the caller's dict or into the shared default argument.
        kwargs = dict(kwargs)
        if senderArg:
            kwargs[senderArg] = sender

        def wrapper():
            # Runs in the worker thread (it is the thread's target).
            try:
                result = workerFn(*args, **kwargs)
            except Exception as exc:
                extraInfo = self._extraInfo(exc)
                sender.sendException(exc, extraInfo)
            else:
                if sendReturn:
                    sender.sendResult(result)

        threading.Thread.__init__(self, name=name, group=group, target=wrapper)
        # Only touch the flag when asked to, so that the default daemon
        # status chosen by threading.Thread is otherwise left alone.
        if daemon:
            self.daemon = True

    def _extraInfo(self, exception):
        """This method could be overridden in a derived class to provide
        extra information when an exception is being sent instead of a
        result. This base implementation provides none."""
        # NOTE(review): the original body is not visible in this excerpt;
        # None matches the default of Sender.sendException(extraInfo=None).
        return None
273 cargs
=(), ckwargs
={},
274 wargs
=(), wkwargs
={},
275 jobID
=None, group
=None, daemon
=False,
276 sendReturn
=True, senderArg
=None):
278 Convenience function to send data produced by workerFn(*wargs, **wkwargs)
279 running in separate thread, to a consumer(*cargs, **ckwargs) running in
280 the main thread. This function merely creates a SenderCallAfter (or a
281 SenderWxEvent, if consumer derives from wx.EvtHandler), and a Producer,
282 and returns immediately after starting the Producer thread. The jobID
283 is used for the Sender and as name for the Producer thread. Returns the
284 thread created, in case caller needs join/etc.
287 if isinstance(consumer
, wx
.EvtHandler
):
288 eventClass
= cargs
[0]
289 sender
= SenderWxEvent(consumer
, eventClass
, jobID
=jobID
, **ckwargs
)
291 sender
= SenderCallAfter(consumer
, jobID
, args
=cargs
, kwargs
=ckwargs
)
294 sender
, workerFn
, args
=wargs
, kwargs
=wkwargs
,
295 name
=jobID
, group
=group
, daemon
=daemon
,
296 senderArg
=senderArg
, sendReturn
=sendReturn
)
302 class PreProcessChain
:
304 Represent a 'delayed result pre-processing chain', a kind of Handler.
305 Useful when lower-level objects need to apply a sequence of transformations
306 to the delayed result before handing it over to a final handler.
307 This allows the starter of the worker function to not know
308 anything about the lower-level objects.
def __init__(self, handler, *args, **kwargs):
    """Wrap handler(result, *args, **kwargs) so that the result
    it receives has been transformed by us.

    When handler is None, the first positional argument is taken to be
    an already-built chain and is used as is."""
    if handler is None:  # assume rhs is a chain
        self.__chain = args[0]
    else:
        # A new chain starts with the final handler; the two cases are
        # exclusive so a chain passed in is never overwritten.
        handler = Handler(handler, *args, **kwargs)
        self.__chain = [handler]
def addSub(self, callable, *args, **kwargs):
    """Add a sub-callable, ie a callable(result, *args, **kwargs)
    that returns a transformed result to the previously added
    sub-callable (or the handler given at construction, if this is
    the first call to addSub). """
    # The parameter name shadows the builtin but is part of the public
    # signature, so it is kept.
    self.__chain.append(Handler(callable, *args, **kwargs))
328 """Clone the chain. Shallow only. Useful when several threads
329 must be started but have different sub-callables. """
330 return PreProcessChain(None, self
.__chain
[:] )
def cloneAddSub(self, callable, *args, **kwargs):
    """Convenience method that first clones self, then calls addSub()
    on that clone with given arguments. Returns the clone."""
    cc = self.clone()
    cc.addSub(callable, *args, **kwargs)
    # Without returning it the extended clone would be unreachable.
    return cc
339 """How many pre-processors in the chain"""
340 return len(self
.__chain
)
344 Traverses the chain of pre-processors it is given, transforming
345 the original delayedResult along the way. The return value of each
346 callable added via addSub() is given to the previous addSub() callable,
347 until the handler is reached.
def __init__(self, delayedResult, chain):
    """Remember the delayed result to transform and the chain of
    pre-processors to apply to it."""
    self.__dr = delayedResult
    # The traversal reads self.__chain, so the chain must be kept too.
    self.__chain = chain
354 """This makes handler think we are a delayedResult."""
356 return self
.__dr
.get()
358 handler
= self
.__chain
[0]
363 """Return the job id for the delayedResult we transform."""
364 return self
.__dr
.getJobID()
367 def __call__(self
, delayedResult
):
368 """This makes us a Handler. We just call handler(Traverser). The
369 handler will think it is getting a delayed result, but in fact
370 will be getting an instance of Traverser, which will take care
371 of properly applying the chain of transformations to delayedResult."""
372 chainTrav
= self
.Traverser(delayedResult
, self
.__chain
[1:])
373 handler
= self
.__chain
[0]