]>
Commit | Line | Data |
---|---|---|
1 | """ | |
2 | This module supports the thread-safe, asynchronous transmission of data | |
3 | ('delayed results') from a worker (non-GUI) thread to the main thread. Ie you don't | |
4 | need to mutex lock any data, the worker thread doesn't wait (or even check) | |
5 | for the result to be received, and the main thread doesn't wait for the | |
6 | worker thread to send the result. Instead, the consumer will be called | |
7 | automatically by the wx app when the worker thread result is available. | |
8 | ||
9 | In most cases you just need to use startWorker() with the correct parameters | |
10 | (your worker function and your 'consumer' in the simplest of cases). The | |
11 | only requirement on consumer is that it must accept a DelayedResult instance | |
12 | as first arg. | |
13 | ||
14 | In the following example, this will call consumer(delayedResult) with the | |
15 | return value from workerFn:: | |
16 | ||
17 | from delayedresult import startWorker | |
18 | startWorker(consumer, workerFn) | |
19 | ||
20 | More advanced uses: | |
21 | ||
22 | - The other parameters to startWorker() | |
23 | - Derive from Producer to override _extraInfo (e.g. to provide traceback info) | |
24 | - Create your own worker-function-thread wrapper instead of using Producer | |
25 | - Create your own Handler-like wrapper to pre- or post-process the result | |
26 | (see PreProcessChain) | |
27 | - Derive from Sender to use your own way of making result hop over the | |
28 | "thread boundary" (from non-main thread to main thread), e.g. using Queue | |
29 | ||
30 | Thanks to Josiah Carlson for critical feedback/ideas that helped me | |
31 | improve this module. | |
32 | ||
33 | :Copyright: (c) 2006 by Oliver Schoenborn | |
34 | :License: wxWidgets license | |
35 | :Version: 1.0 | |
36 | ||
37 | """ | |
38 | ||
__author__ = 'Oliver Schoenborn at utoronto dot ca'
__version__ = '1.0'

# Names exported by a star-import of this module. AbortedException and
# AbortEvent are listed as well: a worker function needs them to abort a job
# (Producer treats AbortedException as "terminate without sending a result").
__all__ = ('Sender', 'SenderNoWx', 'SenderWxEvent', 'SenderCallAfter',
           'Handler', 'DelayedResult', 'Producer', 'startWorker',
           'PreProcessChain', 'AbortedException', 'AbortEvent')
44 | ||
45 | ||
46 | import wx | |
47 | import threading | |
48 | ||
49 | ||
class Struct:
    """
    Plain attribute bag. Every keyword argument given to the constructor
    becomes an instance attribute of the same name, e.g. after
    ss = Struct(a=1, b='b') both ss.a == 1 and ss.b == 'b' hold.
    """

    def __init__(self, **kwargs):
        """Store each keyword argument as an attribute of this instance."""
        # Fill the instance namespace directly from the keywords.
        vars(self).update(kwargs)
59 | ||
60 | ||
class Handler:
    """
    Wrap a callable ('listener') together with positional and keyword
    arguments bound at construction. Calling the instance, e.g.
    handler(result, **kwargs), invokes the listener with the result as
    first argument, followed by the bound positional arguments, and with
    the call-time keywords merged with the bound keywords. The listener's
    return value is returned.
    """
    def __init__(self, listener, *args, **kwargs ):
        """Remember listener and the args/kwargs to bind to it. """
        self.__listener = listener
        self.__args = args
        self.__kwargs = kwargs

    def __call__(self, result, **moreKwargs):
        """Call listener(result, *args, **merged) and return its result.

        The merged keywords are moreKwargs updated with the keywords given
        at construction, so on a name clash the construction-time value
        wins."""
        merged = dict(moreKwargs)
        merged.update(self.__kwargs)
        return self.__listener(result, *self.__args, **merged)
83 | ||
84 | ||
class Sender:
    """
    Base class for the various kinds of senders. A sender delivers a
    result produced by a worker function to a result handler (listener).

    Each sender can be given a "job id". It can be anything (number,
    string, id, an object, etc); the sender does not interpret it, it is
    merely attached to every DelayedResult the sender creates, so that a
    handler can tell which sender a result came from. Job ids do not have
    to be unique.

    Derive from this class if none of the existing derived classes are
    adequate, and override _sendImpl().
    """

    def __init__(self, jobID=None):
        """Remember the optional jobID, which can be anything you want to
        use to track which sender particular results come from. """
        self.__jobID = jobID

    def getJobID(self):
        """Return the jobID given at construction."""
        return self.__jobID

    def sendResult(self, result):
        """Wrap result in a DelayedResult tagged with our job id and hand
        it to _sendImpl(), i.e. to whatever delivery technique the derived
        class implements. """
        self._sendImpl(DelayedResult(result, jobID=self.__jobID))

    def sendException(self, exception, extraInfo = None):
        """Use this when the worker function raised an exception.

        The *exception* is the exception instance that was caught; it must
        not be None. The extraInfo can be anything you want (e.g. locals
        or traceback etc); it travels as the payload of the DelayedResult
        and is set as attribute 'extraInfo' on the exception when
        DelayedResult.get() re-raises it."""
        assert exception is not None
        self._sendImpl(
            DelayedResult(extraInfo, exception=exception, jobID=self.__jobID))

    def _sendImpl(self, delayedResult):
        """Deliver delayedResult to the handler. Derived classes must
        override this; the base implementation always raises
        NotImplementedError."""
        raise NotImplementedError(
            '_sendImpl() must be implemented in %s' % self.__class__)
128 | ||
129 | ||
class SenderNoWx( Sender ):
    """
    Sender that does not involve wx at all. Results are handed to the
    consumer directly by _sendImpl(), i.e. the consumer runs "in the
    worker thread". It should therefore only be used for testing.
    """
    def __init__(self, consumer, jobID=None, args=(), kwargs={}):
        """The consumer can be any callable of the form
        callable(result, *args, **kwargs)."""
        Sender.__init__(self, jobID)
        # A Handler wrapper is only needed when there is something to bind;
        # otherwise the consumer is called as is.
        target = consumer
        if args or kwargs:
            target = Handler(consumer, *args, **kwargs)
        self.__consumer = target

    def _sendImpl(self, delayedResult):
        """Call the consumer synchronously with the delayed result."""
        self.__consumer(delayedResult)
147 | ||
148 | ||
class SenderWxEvent( Sender ):
    """
    This sender sends the delayed result produced in the worker thread
    to an event handler in the main thread, via a wx event of class
    *eventClass*. The result is an attribute of the event (named
    "delayedResult" by default).
    """
    def __init__(self, handler, eventClass, resultAttr="delayedResult",
                 jobID=None, **kwargs):
        """The handler must derive from wx.EvtHandler, otherwise
        ValueError is raised. The event class is typically the first
        item in the pair returned by wx.lib.newevent.NewEvent(). Use
        *resultAttr* to change the name of the event attribute that
        carries the delayed result. Any extra keyword arguments are
        passed to eventClass each time an event is created. """
        Sender.__init__(self, jobID)
        if not isinstance(handler, wx.EvtHandler):
            raise ValueError(
                'SenderWxEvent(handler=%s, ...) not allowed, '
                'handler must derive from wx.EvtHandler' % type(handler))
        self.__consumer = Struct(handler=handler, eventClass=eventClass,
                                 resultAttr=resultAttr, kwargs=kwargs)

    def _sendImpl(self, delayedResult):
        """Post an eventClass event carrying delayedResult to the handler.

        Must not modify the consumer (that was created at construction)
        since it might be shared by several senders, each sending from
        separate threads; hence the keywords are copied before the
        result is added to them."""
        target = self.__consumer
        eventArgs = dict(target.kwargs)
        eventArgs[target.resultAttr] = delayedResult
        event = target.eventClass(**eventArgs)
        wx.PostEvent(target.handler, event)
180 | ||
181 | ||
class SenderCallAfter( Sender ):
    """
    This sender sends the delayed result produced in the worker thread
    to a callable in the main thread, via wx.CallAfter.
    """
    def __init__(self, listener, jobID=None, args=(), kwargs={}):
        """The listener can be any callable of the form
        callable(result, *args, **kwargs)."""
        Sender.__init__(self, jobID)
        # Bind args/kwargs through a Handler only when some were given.
        target = listener
        if args or kwargs:
            target = Handler(listener, *args, **kwargs)
        self.__consumer = target

    def _sendImpl(self, delayedResult):
        """Schedule consumer(delayedResult) through wx.CallAfter."""
        wx.CallAfter(self.__consumer, delayedResult)
196 | ||
197 | ||
class DelayedResult:
    """
    Represent the actual delayed result coming from the non-main thread.
    An instance of this is given to the result handler. It carries
    either (a reference to) the value that was sent, or an exception.
    In the latter case the exception is raised when the get() method
    gets called.
    """

    def __init__(self, result, jobID=None, exception = None):
        """You should never have to call this yourself. A DelayedResult
        is created by a concrete Sender for you.

        *result* is the value sent or, when *exception* is given, the
        extra information that goes with that exception. *jobID* is the
        job id of the sender that created this instance."""
        self.__result = result
        self.__exception = exception
        self.__jobID = jobID

    def getJobID(self):
        """Return the jobID given when Sender initialized,
        or None if none given. """
        return self.__jobID

    def get(self):
        """Get the result. If an exception was sent instead of a result
        (via Sender's sendException()), that **exception is raised**,
        after the extra information has been stored on it as attribute
        'extraInfo'. Otherwise the result is simply returned. """
        # Compare against None rather than testing truthiness: an exception
        # class may define __len__/__bool__ and evaluate to False, and such
        # an exception must still be raised instead of being swallowed.
        if self.__exception is not None:
            self.__exception.extraInfo = self.__result
            raise self.__exception

        return self.__result
228 | ||
229 | ||
class AbortedException(Exception):
    """Exception for a worker function to raise when its job was aborted,
    so that the sender knows not to send a result to the handler."""
234 | ||
235 | ||
class Producer(threading.Thread):
    """
    Represent the worker thread that produces delayed results.
    It causes the given function to run in a separate thread,
    and a sender to be used to send the return value of the function.
    As with any threading.Thread, instantiate and call start().
    Note that if the workerFn raises AbortedException, the result is not
    sent and the thread terminates gracefully. Any other Exception raised
    by workerFn is sent to the consumer via sender.sendException().
    """

    def __init__(self, sender, workerFn, args=(), kwargs={},
                 name=None, group=None, daemon=False,
                 sendReturn=True, senderArg=None):
        """The sender will send the return value of
        workerFn(*args, **kwargs) to the main thread. The name and group
        are same as threading.Thread constructor parameters. A true
        daemon marks the thread as a daemon thread. If sendReturn is
        False, then the return value of workerFn() will not be sent. If
        senderArg is given, it must be the name of the keyword arg to use
        to pass the sender into the workerFn, so the function can send
        (typically many) results. The kwargs dict given by the caller is
        copied, never modified."""
        # Work on a private copy: inserting the sender into the dict we were
        # handed would modify the caller's dict or, when the default is used,
        # the single default dict shared by every later call.
        kwargs = dict(kwargs)
        if senderArg:
            kwargs[senderArg] = sender

        def wrapper():
            # Thread body: run the worker and route its outcome to the sender.
            try:
                result = workerFn(*args, **kwargs)
            except AbortedException:
                # Deliberate abort: nothing is sent.
                pass
            except Exception as exc:
                extraInfo = self._extraInfo(exc)
                sender.sendException(exc, extraInfo)
            else:
                if sendReturn:
                    sender.sendResult(result)

        threading.Thread.__init__(self, name=name, group=group, target=wrapper)
        if daemon:
            self.daemon = True

    def _extraInfo(self, exception):
        """This method could be overridden in a derived class to provide
        extra information when an exception is being sent instead of a
        result. The base implementation returns None. """
        return None
279 | ||
280 | ||
class AbortEvent:
    """
    Convenience class that represents a kind of threading.Event that
    raises AbortedException when called (see the __call__ method, everything
    else is just to make it look like threading.Event).
    """

    def __init__(self):
        """Create the wrapped threading.Event, initially not set."""
        self.__ev = threading.Event()

    def __call__(self, timeout=None):
        """See if event has been set (wait at most timeout if given). If so,
        raise AbortedException. Otherwise return None. Allows you to do
        'while not event():' which will always succeed unless the event
        has been set (then AbortedException will cause while to exit)."""
        if timeout:
            self.__ev.wait(timeout)
        if self.__ev.is_set():
            raise AbortedException()
        return None

    def __getattr__(self, name):
        """This allows us to be a kind of threading.Event: 'set', 'clear',
        'wait', 'is_set' and its old spelling 'isSet' are forwarded to the
        wrapped event. Any other name raises AttributeError."""
        if name == 'isSet':
            # Old camelCase spelling; serve it through the current method.
            name = 'is_set'
        if name in ('set', 'clear', 'wait', 'is_set'):
            return getattr(self.__ev, name)
        # Falling off the end would return None for every unknown attribute,
        # which makes hasattr() always true and hides typos.
        raise AttributeError('%s instance has no attribute %r'
                             % (self.__class__.__name__, name))
306 | ||
307 | ||
def startWorker(
    consumer, workerFn,
    cargs=(), ckwargs={},
    wargs=(), wkwargs={},
    jobID=None, group=None, daemon=False,
    sendReturn=True, senderArg=None):
    """
    Convenience function to send data produced by workerFn(*wargs, **wkwargs)
    running in separate thread, to a consumer(*cargs, **ckwargs) running in
    the main thread. This function merely creates a SenderCallAfter (or a
    SenderWxEvent, if consumer derives from wx.EvtHandler), and a Producer,
    and returns immediately after starting the Producer thread. The jobID
    is used for the Sender and as name for the Producer thread. Returns the
    thread created, in case caller needs join/etc.

    When consumer derives from wx.EvtHandler, cargs[0] must be the event
    class to post (so cargs must not be empty) and ckwargs are given to
    SenderWxEvent as extra keyword arguments. The wkwargs dict is copied,
    never modified, even when senderArg is given.
    """

    if isinstance(consumer, wx.EvtHandler):
        eventClass = cargs[0]
        sender = SenderWxEvent(consumer, eventClass, jobID=jobID, **ckwargs)
    else:
        sender = SenderCallAfter(consumer, jobID, args=cargs, kwargs=ckwargs)

    # Hand the Producer its own dict: the sender gets inserted under the
    # senderArg key, and that must end up neither in the caller's dict nor
    # in the default dict shared by every call of this function.
    workerKwargs = dict(wkwargs)

    thread = Producer(
        sender, workerFn, args=wargs, kwargs=workerKwargs,
        name=jobID, group=group, daemon=daemon,
        senderArg=senderArg, sendReturn=sendReturn)

    thread.start()
    return thread
337 | ||
338 | ||
class PreProcessChain:
    """
    Represent a 'delayed result pre-processing chain', a kind of Handler.
    Useful when lower-level objects need to apply a sequence of transformations
    to the delayed result before handing it over to a final handler.
    This allows the starter of the worker function to not know
    anything about the lower-level objects.
    """
    def __init__(self, handler, *args, **kwargs):
        """Wrap handler(result, *args, **kwargs) so that the result
        it receives has been transformed by us. If handler is None,
        args[0] is taken to be an already-built chain (a list whose first
        item is the final handler); this form is used by clone(). """
        if handler is None:  # assume args[0] is a chain
            self.__chain = args[0]
        else:
            if args or kwargs:
                handler = Handler(handler, *args, **kwargs)
            self.__chain = [handler]

    def addSub(self, callable, *args, **kwargs):
        """Add a sub-callable, ie a callable(result, *args, **kwargs)
        that returns a transformed result to the previously added
        sub-callable (or the handler given at construction, if this is
        the first call to addSub). """
        self.__chain.append( Handler(callable, *args, **kwargs) )

    def clone(self):
        """Clone the chain. Shallow only. Useful when several threads
        must be started but have different sub-callables. """
        return PreProcessChain(None, self.__chain[:] )

    def cloneAddSub(self, callable, *args, **kwargs):
        """Convenience method that first clones self, then calls addSub()
        on that clone with given arguments. Returns the clone; self is
        left unchanged. """
        cc = self.clone()
        cc.addSub(callable, *args, **kwargs)
        # Without this return the clone would be lost to the caller.
        return cc

    def count(self):
        """Number of callables in the chain, final handler included."""
        return len(self.__chain)

    class Traverser:
        """
        Traverses the chain of pre-processors it is given, transforming
        the original delayedResult along the way. The return value of each
        callable added via addSub() is given to the previous addSub() callable,
        until the handler is reached.
        """
        def __init__(self, delayedResult, chain):
            """Keep the delayed result and the list of sub-callables still
            to be applied. The list is consumed by get(), so it must not
            be shared with the owning PreProcessChain."""
            self.__dr = delayedResult
            self.__chain = chain

        def get(self):
            """This makes handler think we are a delayedResult."""
            if not self.__chain:
                # Every sub-callable has been entered: fetch the real value.
                return self.__dr.get()

            # Remove the next sub-callable and let it pull its own input
            # from us through a further get() call.
            handler = self.__chain[0]
            del self.__chain[0]
            return handler(self)

        def getJobID(self):
            """Return the job id for the delayedResult we transform."""
            return self.__dr.getJobID()


    def __call__(self, delayedResult):
        """This makes us a Handler. We just call handler(Traverser). The
        handler will think it is getting a delayed result, but in fact
        will be getting an instance of Traverser, which will take care
        of properly applying the chain of transformations to delayedResult.
        Like for a Handler, the handler's return value is returned."""
        # The slice gives the traverser its own list to consume.
        chainTrav = self.Traverser(delayedResult, self.__chain[1:])
        handler = self.__chain[0]
        return handler( chainTrav )
412 |