]> git.saurik.com Git - wxWidgets.git/blob - wxPython/wx/lib/delayedresult.py
Bug fix from Pierre
[wxWidgets.git] / wxPython / wx / lib / delayedresult.py
1 """
2 This module supports the thread-safe, asynchronous transmission of data
3 ('delayed results') from a worker (non-GUI) thread to the main thread. Ie you don't
4 need to mutex lock any data, the worker thread doesn't wait (or even check)
5 for the result to be received, and the main thread doesn't wait for the
6 worker thread to send the result. Instead, the consumer will be called
7 automatically by the wx app when the worker thread result is available.
8
9 In most cases you just need to use startWorker() with the correct parameters
10 (your worker function and your 'consumer' in the simplest of cases). The
11 only requirement on consumer is that it must accept a DelayedResult instance
12 as first arg.
13
14 In the following example, this will call consumer(delayedResult) with the
15 return value from workerFn::
16
17 from delayedresult import startWorker
18 startWorker(consumer, workerFn)
19
20 More advanced uses:
21
22 - The other parameters to startWorker()
23 - Derive from Producer to override _extraInfo (e.g. to provide traceback info)
24 - Create your own worker-function-thread wrapper instead of using Producer
25 - Create your own Handler-like wrapper to pre- or post-process the result
26 (see PreProcessChain)
27 - Derive from Sender to use your own way of making result hop over the
28 "thread boundary" (from non-main thread to main thread), e.g. using Queue
29
30 Thanks to Josiah Carlson for critical feedback/ideas that helped me
31 improve this module.
32
33 :Copyright: (c) 2006 by Oliver Schoenborn
34 :License: wxWidgets license
35 :Version: 1.0
36
37 """
38
39 __author__ = 'Oliver Schoenborn at utoronto dot ca'
40 __version__ = '1.0'
41
42 __all__ = ('Sender', 'SenderNoWx', 'SenderWxEvent', 'SenderCallAfter',
43 'Handler', 'DelayedResult', 'Producer', 'startWorker', 'PreProcessChain')
44
45
46 import wx
47 import threading
48
49
50 class Struct:
51 """
52 An object that has attributes built from the dictionary given in
53 constructor. So ss=Struct(a=1, b='b') will satisfy assert ss.a == 1
54 and assert ss.b == 'b'.
55 """
56
57 def __init__(self, **kwargs):
58 self.__dict__.update( kwargs )
59
60
61 class Handler:
62 """
63 Bind some of the arguments and keyword arguments of a callable ('listener').
64 Then when the Handler instance is called (e.g. handler(result, **kwargs))
65 the result is passed as first argument to callable, the kwargs is
66 combined with those given at construction, and the args are those
67 given at construction. Its return value is returned.
68 """
69 def __init__(self, listener, *args, **kwargs ):
70 """Bind args and kwargs to listener. """
71 self.__listener = listener
72 self.__args = args
73 self.__kwargs = kwargs
74
75 def __call__(self, result, **moreKwargs):
76 """Listener is assumed to take result as first arg, then *args,
77 then the combination of moreKwargs and the kwargs given at construction."""
78 if moreKwargs:
79 moreKwargs.update(self.__kwargs)
80 else:
81 moreKwargs = self.__kwargs
82 return self.__listener(result, *self.__args, **moreKwargs)
83
84
85 class Sender:
86 """
87 Base class for various kinds of senders. A sender sends a result
88 produced by a worker funtion to a result handler (listener). Note
89 that each sender can be given a "job id". This can be anything
90 (number, string, id, and object, etc) and is not used, it is
91 simply added as attribute whenever a DelayedResult is created.
92 This allows you to know, if desired, what result corresponds to
93 which sender. Note that uniqueness is not necessary.
94
95 Derive from this class if none of the existing derived classes
96 are adequate, and override _sendImpl().
97 """
98
99 def __init__(self, jobID=None):
100 """The optional jobID can be anything that you want to use to
101 track which sender particular results come from. """
102 self.__jobID = jobID
103
104 def getJobID(self):
105 """Return the jobID given at construction"""
106 return self.__jobID
107
108 def sendResult(self, result):
109 """This will send the result to handler, using whatever
110 technique the derived class uses. """
111 delayedResult = DelayedResult(result, jobID=self.__jobID)
112 self._sendImpl(delayedResult)
113
114 def sendException(self, exception, extraInfo = None):
115 """Use this when the worker function raised an exception.
116 The *exception* is the instance of Exception caught. The extraInfo
117 could be anything you want (e.g. locals or traceback etc),
118 it will be added to the exception as attribute 'extraInfo'. The
119 exception will be raised when DelayedResult.get() is called."""
120 assert exception is not None
121 delayedResult = DelayedResult(extraInfo,
122 exception=exception, jobID=self.__jobID)
123 self._sendImpl(delayedResult)
124
125 def _sendImpl(self, delayedResult):
126 msg = '_sendImpl() must be implemented in %s' % self.__class__
127 raise NotImplementedError(msg)
128
129
130 class SenderNoWx( Sender ):
131 """
132 Sender that works without wx. The results are sent directly, ie
133 the consumer will get them "in the worker thread". So it should
134 only be used for testing.
135 """
136 def __init__(self, consumer, jobID=None, args=(), kwargs={}):
137 """The consumer can be any callable of the form
138 callable(result, *args, **kwargs)"""
139 Sender.__init__(self, jobID)
140 if args or kwargs:
141 self.__consumer = Handler(consumer, *args, **kwargs)
142 else:
143 self.__consumer = consumer
144
145 def _sendImpl(self, delayedResult):
146 self.__consumer(delayedResult)
147
148
149 class SenderWxEvent( Sender ):
150 """
151 This sender sends the delayed result produced in the worker thread
152 to an event handler in the main thread, via a wx event of class
153 *eventClass*. The result is an attribute of the event (default:
154 "delayedResult".
155 """
156 def __init__(self, handler, eventClass, resultAttr="delayedResult",
157 jobID=None, **kwargs):
158 """The handler must derive from wx.EvtHandler. The event class
159 is typically the first item in the pair returned by
160 wx.lib.newevent.NewEvent(). You can use the *resultAttr*
161 to change the attribute name of the generated event's
162 delayed result. """
163 Sender.__init__(self, jobID)
164 if not isinstance(handler, wx.EvtHandler):
165 msg = 'SenderWxEvent(handler=%s, ...) not allowed,' % type(handler)
166 msg = '%s handler must derive from wx.EvtHandler' % msg
167 raise ValueError(msg)
168 self.__consumer = Struct(handler=handler, eventClass=eventClass,
169 resultAttr=resultAttr, kwargs=kwargs)
170
171 def _sendImpl(self, delayedResult):
172 """Must not modify the consumer (that was created at construction)
173 since might be shared by several senders, each sending from
174 separate threads."""
175 consumer = self.__consumer
176 kwargs = consumer.kwargs.copy()
177 kwargs[ consumer.resultAttr ] = delayedResult
178 event = consumer.eventClass(** kwargs)
179 wx.PostEvent(consumer.handler, event)
180
181
182 class SenderCallAfter( Sender ):
183 """
184 This sender sends the delayed result produced in the worker thread
185 to a callable in the main thread, via wx.CallAfter.
186 """
187 def __init__(self, listener, jobID=None, args=(), kwargs={}):
188 Sender.__init__(self, jobID)
189 if args or kwargs:
190 self.__consumer = Handler(listener, *args, **kwargs)
191 else:
192 self.__consumer = listener
193
194 def _sendImpl(self, delayedResult):
195 wx.CallAfter(self.__consumer, delayedResult)
196
197
198 class DelayedResult:
199 """
200 Represent the actual delayed result coming from the non-main thread.
201 An instance of this is given to the result handler. This result is
202 either a (reference to a) the value sent, or an exception.
203 If the latter, the exception is raised when the get() method gets
204 called.
205 """
206
207 def __init__(self, result, jobID=None, exception = None):
208 """You should never have to call this yourself. A DelayedResult
209 is created by a concrete Sender for you."""
210 self.__result = result
211 self.__exception = exception
212 self.__jobID = jobID
213
214 def getJobID(self):
215 """Return the jobID given when Sender initialized,
216 or None if none given. """
217 return self.__jobID
218
219 def get(self):
220 """Get the result. If an exception was sent instead of a result,
221 (via Sender's sendExcept()), that **exception is raised**.
222 Otherwise the result is simply returned. """
223 if self.__exception: # exception was raised!
224 self.__exception.extraInfo = self.__result
225 raise self.__exception
226
227 return self.__result
228
229
230 class AbortedException(Exception):
231 """Raise this in your worker function so that the sender knows
232 not to send a result to handler."""
233 pass
234
235
236 class Producer(threading.Thread):
237 """
238 Represent the worker thread that produces delayed results.
239 It causes the given function to run in a separate thread,
240 and a sender to be used to send the return value of the function.
241 As with any threading.Thread, instantiate and call start().
242 Note that if the workerFn raises AbortedException, the result is not
243 sent and the thread terminates gracefully.
244 """
245
246 def __init__(self, sender, workerFn, args=(), kwargs={},
247 name=None, group=None, daemon=False,
248 sendReturn=True, senderArg=None):
249 """The sender will send the return value of
250 workerFn(*args, **kwargs) to the main thread. The name and group
251 are same as threading.Thread constructor parameters. Daemon causes
252 setDaemon() to be called. If sendReturn is False, then the return
253 value of workerFn() will not be sent. If senderArg is given, it
254 must be the name of the keyword arg to use to pass the sender into
255 the workerFn, so the function can send (typically many) results."""
256 if senderArg:
257 kwargs[senderArg] = sender
258 def wrapper():
259 try:
260 result = workerFn(*args, **kwargs)
261 except AbortedException:
262 pass
263 except Exception, exc:
264 extraInfo = self._extraInfo(exc)
265 sender.sendException(exc, extraInfo)
266 else:
267 if sendReturn:
268 sender.sendResult(result)
269
270 threading.Thread.__init__(self, name=name, group=group, target=wrapper)
271 if daemon:
272 self.setDaemon(daemon)
273
274 def _extraInfo(self, exception):
275 """This method could be overridden in a derived class to provide
276 extra information when an exception is being sent instead of a
277 result. """
278 return None
279
280
281 class AbortEvent:
282 """
283 Convenience class that represents a kind of threading.Event that
284 raises AbortedException when called (see the __call__ method, everything
285 else is just to make it look like threading.Event).
286 """
287
288 def __init__(self):
289 self.__ev = threading.Event()
290
291 def __call__(self, timeout=None):
292 """See if event has been set (wait at most timeout if given). If so,
293 raise AbortedException. Otherwise return None. Allows you to do
294 'while not event():' which will always succeed unless the event
295 has been set (then AbortedException will cause while to exit)."""
296 if timeout:
297 self.__ev.wait(timeout)
298 if self.__ev.isSet():
299 raise AbortedException()
300 return None
301
302 def __getattr__(self, name):
303 """This allows us to be a kind of threading.Event."""
304 if name in ('set','clear','wait','isSet'):
305 return getattr(self.__ev, name)
306
307
308 def startWorker(
309 consumer, workerFn,
310 cargs=(), ckwargs={},
311 wargs=(), wkwargs={},
312 jobID=None, group=None, daemon=False,
313 sendReturn=True, senderArg=None):
314 """
315 Convenience function to send data produced by workerFn(*wargs, **wkwargs)
316 running in separate thread, to a consumer(*cargs, **ckwargs) running in
317 the main thread. This function merely creates a SenderCallAfter (or a
318 SenderWxEvent, if consumer derives from wx.EvtHandler), and a Producer,
319 and returns immediately after starting the Producer thread. The jobID
320 is used for the Sender and as name for the Producer thread. Returns the
321 thread created, in case caller needs join/etc.
322 """
323
324 if isinstance(consumer, wx.EvtHandler):
325 eventClass = cargs[0]
326 sender = SenderWxEvent(consumer, eventClass, jobID=jobID, **ckwargs)
327 else:
328 sender = SenderCallAfter(consumer, jobID, args=cargs, kwargs=ckwargs)
329
330 thread = Producer(
331 sender, workerFn, args=wargs, kwargs=wkwargs,
332 name=jobID, group=group, daemon=daemon,
333 senderArg=senderArg, sendReturn=sendReturn)
334
335 thread.start()
336 return thread
337
338
339 class PreProcessChain:
340 """
341 Represent a 'delayed result pre-processing chain', a kind of Handler.
342 Useful when lower-level objects need to apply a sequence of transformations
343 to the delayed result before handing it over to a final handler.
344 This allows the starter of the worker function to not know
345 anything about the lower-level objects.
346 """
347 def __init__(self, handler, *args, **kwargs):
348 """Wrap handler(result, *args, **kwargs) so that the result
349 it receives has been transformed by us. """
350 if handler is None:# assume rhs is a chain
351 self.__chain = args[0]
352 else:
353 if args or kwargs:
354 handler = Handler(handler, *args, **kwargs)
355 self.__chain = [handler]
356
357 def addSub(self, callable, *args, **kwargs):
358 """Add a sub-callable, ie a callable(result, *args, **kwargs)
359 that returns a transformed result to the previously added
360 sub-callable (or the handler given at construction, if this is
361 the first call to addSub). """
362 self.__chain.append( Handler(callable, *args, **kwargs) )
363
364 def clone(self):
365 """Clone the chain. Shallow only. Useful when several threads
366 must be started but have different sub-callables. """
367 return PreProcessChain(None, self.__chain[:] )
368
369 def cloneAddSub(self, callable, *args, **kwargs):
370 """Convenience method that first clones self, then calls addSub()
371 on that clone with given arguments. """
372 cc = self.clone()
373 cc.addSub(callable, *args, **kwargs)
374
375 def count(self):
376 """How many pre-processors in the chain"""
377 return len(self.__chain)
378
379 class Traverser:
380 """
381 Traverses the chain of pre-processors it is given, transforming
382 the original delayedResult along the way. The return value of each
383 callable added via addSub() is given to the previous addSub() callable,
384 until the handler is reached.
385 """
386 def __init__(self, delayedResult, chain):
387 self.__dr = delayedResult
388 self.__chain = chain
389
390 def get(self):
391 """This makes handler think we are a delayedResult."""
392 if not self.__chain:
393 return self.__dr.get()
394
395 handler = self.__chain[0]
396 del self.__chain[0]
397 return handler(self)
398
399 def getJobID(self):
400 """Return the job id for the delayedResult we transform."""
401 return self.__dr.getJobID()
402
403
404 def __call__(self, delayedResult):
405 """This makes us a Handler. We just call handler(Traverser). The
406 handler will think it is getting a delayed result, but in fact
407 will be getting an instance of Traverser, which will take care
408 of properly applying the chain of transformations to delayedResult."""
409 chainTrav = self.Traverser(delayedResult, self.__chain[1:])
410 handler = self.__chain[0]
411 handler( chainTrav )
412