]> git.saurik.com Git - wxWidgets.git/blob - wxPython/wx/lib/delayedresult.py
added more properties
[wxWidgets.git] / wxPython / wx / lib / delayedresult.py
1 """
2 This module supports the thread-safe, asynchronous transmission of data
3 ('delayed results') from a worker (non-GUI) thread to the main thread. I.e., you don't
4 need to mutex-lock any data, the worker thread doesn't wait (or even check)
5 for the result to be received, and the main thread doesn't wait for the
6 worker thread to send the result. Instead, the consumer will be called
7 automatically by the wx app when the worker thread result is available.
8
9 In most cases you just need to use startWorker() with the correct parameters
10 (your worker function and your 'consumer' in the simplest of cases). The
11 only requirement on consumer is that it must accept a DelayedResult instance
12 as first arg.
13
14 In the following example, this will call consumer(delayedResult) with the
15 return value from workerFn::
16
17 from delayedresult import startWorker
18 startWorker(consumer, workerFn)
19
20 More advanced uses:
21
22 - The other parameters to startWorker()
23 - Derive from Producer to override _extraInfo (e.g. to provide traceback info)
24 - Create your own worker-function-thread wrapper instead of using Producer
25 - Create your own Handler-like wrapper to pre- or post-process the result
26 (see PreProcessChain)
27 - Derive from Sender to use your own way of making result hop over the
28 "thread boundary" (from non-main thread to main thread), e.g. using Queue
29
30 Thanks to Josiah Carlson for critical feedback/ideas that helped me
31 improve this module.
32
33 :Copyright: (c) 2006 by Oliver Schoenborn
34 :License: wxWidgets license
35 :Version: 1.0
36
37 """
38
# Module metadata.
__version__ = '1.0'
__author__ = 'Oliver Schoenborn at utoronto dot ca'

# Names exported by ``from delayedresult import *``.
__all__ = (
    'Sender',
    'SenderNoWx',
    'SenderWxEvent',
    'SenderCallAfter',
    'Handler',
    'DelayedResult',
    'Producer',
    'startWorker',
    'PreProcessChain',
)
44
45
46 import wx
47 import threading
48
49
class Struct:
    """
    Simple attribute bag: every keyword argument given to the constructor
    becomes an instance attribute of the same name. For example, after
    ``ss = Struct(a=1, b='b')`` both ``ss.a == 1`` and ``ss.b == 'b'`` hold.
    """

    def __init__(self, **kwargs):
        """Store each keyword argument as an attribute of this instance."""
        vars(self).update(kwargs)
59
60
class Handler:
    """
    Callable wrapper that pre-binds positional and keyword arguments to a
    'listener'. Calling the instance as ``handler(result, **kwargs)`` invokes
    ``listener(result, *args, **merged)`` where *args* are the positional
    arguments given at construction and *merged* combines the call-time
    kwargs with the construction-time kwargs. The listener's return value
    is passed back to the caller.
    """

    def __init__(self, listener, *args, **kwargs):
        """Remember the listener together with the arguments to bind to it."""
        self.__listener = listener
        self.__args = args
        self.__kwargs = kwargs

    def __call__(self, result, **moreKwargs):
        """Call the listener with result first, then the bound positional
        args, then the merged keyword args. On a key clash the value given
        at construction wins over the one given in this call."""
        if not moreKwargs:
            merged = self.__kwargs
        else:
            # moreKwargs is a fresh dict for this call, so updating it
            # in place does not touch the kwargs stored at construction.
            merged = moreKwargs
            merged.update(self.__kwargs)
        return self.__listener(result, *self.__args, **merged)
83
84
class Sender:
    """
    Common base for all senders. A sender takes what a worker function
    produced (a value or an exception), wraps it in a DelayedResult and
    delivers it to a result handler (listener).

    Each sender may carry a "job id". It can be any object (number, string,
    instance, ...); the sender does not interpret it, it is only attached to
    every DelayedResult created so that the receiver can tell which sender a
    result came from. Job ids need not be unique.

    To implement a new delivery mechanism, derive from this class and
    override _sendImpl().
    """

    def __init__(self, jobID=None):
        """Store the optional jobID, which is attached to every
        DelayedResult this sender creates."""
        self.__jobID = jobID

    def getJobID(self):
        """Return the jobID that was given to the constructor."""
        return self.__jobID

    def sendResult(self, result):
        """Wrap *result* in a DelayedResult and deliver it through the
        derived class's _sendImpl()."""
        self._sendImpl(DelayedResult(result, jobID=self.__jobID))

    def sendException(self, exception, extraInfo = None):
        """Deliver an exception raised by the worker function.

        *exception* is the exception instance that was caught; it must not
        be None. *extraInfo* may be anything (e.g. locals or a traceback);
        it is stored in the DelayedResult, whose get() attaches it to the
        exception as attribute 'extraInfo' and then raises the exception."""
        assert exception is not None
        wrapped = DelayedResult(
            extraInfo, exception=exception, jobID=self.__jobID)
        self._sendImpl(wrapped)

    def _sendImpl(self, delayedResult):
        """Delivery hook: derived classes must override this. The base
        implementation always raises NotImplementedError."""
        raise NotImplementedError(
            '_sendImpl() must be implemented in %s' % self.__class__)
128
129
class SenderNoWx( Sender ):
    """
    Sender that does not use wx at all. The delayed result is handed to the
    consumer by a direct call, so the consumer runs in whatever thread
    performs the send (i.e. "in the worker thread"). Intended for testing
    only.
    """

    def __init__(self, consumer, jobID=None, args=(), kwargs={}):
        """*consumer* is any callable of the form
        callable(result, *args, **kwargs). When args or kwargs are given
        they are bound to the consumer through a Handler."""
        Sender.__init__(self, jobID)
        target = consumer
        if args or kwargs:
            target = Handler(consumer, *args, **kwargs)
        self.__consumer = target

    def _sendImpl(self, delayedResult):
        """Call the consumer directly with the delayed result."""
        self.__consumer(delayedResult)
147
148
class SenderWxEvent( Sender ):
    """
    Sender that delivers the delayed result produced in the worker thread
    to an event handler in the main thread, by posting a wx event of class
    *eventClass*. The delayed result is carried as an attribute of the
    event (named "delayedResult" by default).
    """

    def __init__(self, handler, eventClass, resultAttr="delayedResult",
                 jobID=None, **kwargs):
        """*handler* must derive from wx.EvtHandler, otherwise ValueError
        is raised. *eventClass* is typically the first item of the pair
        returned by wx.lib.newevent.NewEvent(). *resultAttr* names the
        event attribute that carries the delayed result. Any extra keyword
        arguments are passed to *eventClass* whenever an event is created."""
        Sender.__init__(self, jobID)
        if not isinstance(handler, wx.EvtHandler):
            prefix = 'SenderWxEvent(handler=%s, ...) not allowed,' % type(handler)
            raise ValueError('%s handler must derive from wx.EvtHandler' % prefix)
        self.__consumer = Struct(
            handler=handler,
            eventClass=eventClass,
            resultAttr=resultAttr,
            kwargs=kwargs)

    def _sendImpl(self, delayedResult):
        """Build the event and post it to the handler. The stored consumer
        must not be modified here, since it may be shared by several
        senders each sending from a separate thread; hence the event
        kwargs are copied before the delayed result is added."""
        target = self.__consumer
        eventArgs = dict(target.kwargs)
        eventArgs[target.resultAttr] = delayedResult
        wx.PostEvent(target.handler, target.eventClass(**eventArgs))
180
181
class SenderCallAfter( Sender ):
    """
    Sender that delivers the delayed result produced in the worker thread
    to a callable in the main thread, using wx.CallAfter.
    """

    def __init__(self, listener, jobID=None, args=(), kwargs={}):
        """*listener* is the callable that will receive the delayed result
        as first argument. When args or kwargs are given they are bound to
        the listener through a Handler."""
        Sender.__init__(self, jobID)
        target = listener
        if args or kwargs:
            target = Handler(listener, *args, **kwargs)
        self.__consumer = target

    def _sendImpl(self, delayedResult):
        """Schedule the consumer call with the delayed result via
        wx.CallAfter."""
        wx.CallAfter(self.__consumer, delayedResult)
196
197
class DelayedResult:
    """
    Represent the actual delayed result coming from the non-main thread.
    An instance of this is given to the result handler. It holds either
    (a reference to) the value that was sent, or an exception. In the
    latter case the exception is raised when the get() method is called.
    """

    def __init__(self, result, jobID=None, exception = None):
        """You should never have to call this yourself; a DelayedResult is
        created for you by a concrete Sender.

        *result* is the value sent or, when *exception* is given, the extra
        information to attach to that exception. *jobID* is the id of the
        sender that created this instance."""
        self.__result = result
        self.__exception = exception
        self.__jobID = jobID

    def getJobID(self):
        """Return the jobID given when the Sender was initialized,
        or None if none was given."""
        return self.__jobID

    def get(self):
        """Get the result. If an exception was sent instead of a result
        (via Sender.sendException()), that **exception is raised**, after
        the extra information has been attached to it as attribute
        'extraInfo'. Otherwise the result is simply returned."""
        # Compare against None rather than relying on truthiness: an
        # exception class may define __len__/__bool__ and evaluate as false,
        # which would otherwise cause it to be silently swallowed.
        if self.__exception is not None:
            self.__exception.extraInfo = self.__result
            raise self.__exception

        return self.__result
228
229
class Producer(threading.Thread):
    """
    Represent the worker thread that produces delayed results.
    It causes the given function to run in a separate thread,
    and a sender to be used to send the return value of the function.
    As with any threading.Thread, instantiate and call start().
    """

    def __init__(self, sender, workerFn, args=(), kwargs={},
                 name=None, group=None, daemon=False,
                 sendReturn=True, senderArg=None):
        """The sender will send the return value of
        workerFn(*args, **kwargs). The name and group are the same as the
        threading.Thread constructor parameters. If daemon is true the
        thread is marked as a daemon thread. If sendReturn is False, the
        return value of workerFn() is not sent. If senderArg is given, it
        must be the name of the keyword arg to use to pass the sender into
        workerFn, so the function can send (typically many) results itself.

        If workerFn raises an Exception, it is sent via
        sender.sendException() together with whatever _extraInfo() returns
        for it. The kwargs mapping passed in is never modified."""
        if senderArg:
            # Work on a copy: inserting into the caller's dict (or into the
            # shared default {}) would leak the sender into later calls.
            kwargs = dict(kwargs)
            kwargs[senderArg] = sender

        def wrapper():
            try:
                result = workerFn(*args, **kwargs)
            except Exception as exc:
                extraInfo = self._extraInfo(exc)
                sender.sendException(exc, extraInfo)
            else:
                if sendReturn:
                    sender.sendResult(result)

        threading.Thread.__init__(self, name=name, group=group, target=wrapper)
        if daemon:
            self.daemon = True

    def _extraInfo(self, exception):
        """This method could be overridden in a derived class to provide
        extra information when an exception is being sent instead of a
        result. The default implementation returns None."""
        return None
269
270
def startWorker(
    consumer, workerFn,
    cargs=(), ckwargs={},
    wargs=(), wkwargs={},
    jobID=None, group=None, daemon=False,
    sendReturn=True, senderArg=None):
    """
    Convenience function to send data produced by workerFn(*wargs, **wkwargs)
    running in a separate thread, to a consumer running in the main thread.

    If consumer derives from wx.EvtHandler, a SenderWxEvent is created:
    cargs[0] must then be the event class to post (IndexError is raised
    if cargs is empty) and ckwargs are passed on as extra event keyword
    arguments. Otherwise a SenderCallAfter is created and the consumer is
    called as consumer(delayedResult, *cargs, **ckwargs).

    A Producer thread is then created and started, and this function
    returns immediately. The jobID is used for the Sender and as name for
    the Producer thread. daemon, sendReturn and senderArg are passed on to
    Producer unchanged. Returns the thread created, in case the caller
    needs join/etc.
    """

    if isinstance(consumer, wx.EvtHandler):
        eventClass = cargs[0]
        sender = SenderWxEvent(consumer, eventClass, jobID=jobID, **ckwargs)
    else:
        sender = SenderCallAfter(consumer, jobID, args=cargs, kwargs=ckwargs)

    # Hand Producer a private copy of wkwargs: when senderArg is set the
    # sender is inserted into that mapping, which must not end up in the
    # caller's dict nor in the shared default {} of this function.
    thread = Producer(
        sender, workerFn, args=wargs, kwargs=dict(wkwargs),
        name=jobID, group=group, daemon=daemon,
        senderArg=senderArg, sendReturn=sendReturn)

    thread.start()
    return thread
300
301
class PreProcessChain:
    """
    Represent a 'delayed result pre-processing chain', a kind of Handler.
    Useful when lower-level objects need to apply a sequence of
    transformations to the delayed result before handing it over to a final
    handler. This allows the starter of the worker function to not know
    anything about the lower-level objects.

    Internally the chain is a list whose first item is the final handler,
    followed by the sub-callables in the order they were added.
    """

    def __init__(self, handler, *args, **kwargs):
        """Wrap handler(result, *args, **kwargs) so that the result
        it receives has been transformed by us.

        If handler is None, args[0] is assumed to be an already built
        chain (a list) and is used as is, without copying."""
        if handler is None:  # assume rhs is a chain
            self.__chain = args[0]
        else:
            if args or kwargs:
                handler = Handler(handler, *args, **kwargs)
            self.__chain = [handler]

    def addSub(self, callable, *args, **kwargs):
        """Add a sub-callable, ie a callable(result, *args, **kwargs)
        that returns a transformed result to the previously added
        sub-callable (or the handler given at construction, if this is
        the first call to addSub)."""
        self.__chain.append(Handler(callable, *args, **kwargs))

    def clone(self):
        """Clone the chain. Shallow only: the new chain has its own list
        but shares the handler objects. Useful when several threads must
        be started but have different sub-callables."""
        return PreProcessChain(None, self.__chain[:])

    def cloneAddSub(self, callable, *args, **kwargs):
        """Convenience method that first clones self, then calls addSub()
        on that clone with the given arguments. Returns the clone; self is
        left unchanged."""
        cc = self.clone()
        cc.addSub(callable, *args, **kwargs)
        # Without this return the freshly built clone would be lost.
        return cc

    def count(self):
        """How many callables are in the chain (final handler included)."""
        return len(self.__chain)

    class Traverser:
        """
        Traverses the chain of pre-processors it is given, transforming
        the original delayedResult along the way. The return value of each
        callable added via addSub() is given to the previous addSub()
        callable, until the handler is reached.
        """

        def __init__(self, delayedResult, chain):
            """*chain* is the list of remaining callables; it is consumed
            (items are removed) as get() is called."""
            self.__dr = delayedResult
            self.__chain = chain

        def get(self):
            """This makes the handler think we are a delayedResult.

            Each call removes the next callable from the chain and returns
            what it returns when called with this traverser; once the chain
            is exhausted, the original delayed result's get() is returned."""
            if not self.__chain:
                return self.__dr.get()

            handler = self.__chain.pop(0)
            return handler(self)

        def getJobID(self):
            """Return the job id for the delayedResult we transform."""
            return self.__dr.getJobID()

    def __call__(self, delayedResult):
        """This makes us a Handler. We just call handler(Traverser). The
        handler will think it is getting a delayed result, but in fact
        will be getting an instance of Traverser, which will take care
        of properly applying the chain of transformations to delayedResult.
        The handler's return value is returned."""
        # The traverser gets its own copy of the sub-callables, so running
        # the chain does not consume the list stored on this instance.
        chainTrav = self.Traverser(delayedResult, self.__chain[1:])
        handler = self.__chain[0]
        return handler(chainTrav)
375
376