]> git.saurik.com Git - wxWidgets.git/blame - wxPython/distrib/all/taskrunner.py
fixed deadlock when calling wxPostEvent() from worker thread
[wxWidgets.git] / wxPython / distrib / all / taskrunner.py
CommitLineData
e4bb5998
RD
1#----------------------------------------------------------------------
2# Name: taskrunner.py
3# Purpose: Classes that can manage running of external processes,
4# either consecutively, simultaneously, or both, and can
5# log the output of those jobs
6#
7# Author: Robin Dunn
8#
9# Created: 05-Nov-2004
10# RCS-ID: $Id$
11# Copyright: (c) 2004 by Total Control Software
12# Licence: wxWindows license
13#----------------------------------------------------------------------
14
15import sys
16import os
17import signal
18import select
3a6e9820
KO
19import time
20
e4bb5998
RD
21from subprocess import Popen, PIPE, STDOUT
22
23
3a6e9820 24__all__ = ["Job", "Task", "TaskRunner", "TaskRunnerThread"]
e4bb5998
RD
25
26#----------------------------------------------------------------------
27
f1a9f331
KO
28# For environment settings
class Config:
    """
    Simple attribute bag for environment settings.

    Settings live as plain instance attributes.  They can be exported
    as a dictionary, written out as ``key="value"`` lines, or loaded
    from a file of ``key=value`` lines.
    """

    def asDict(self):
        """Return a shallow copy of the settings as a dictionary."""
        return self.__dict__.copy()

    def write(self, filename="config", outfile=None):
        """
        Write every setting as a ``key="value"`` line.

        If outfile is given, the lines are written to it and it is left
        open for the caller.  Otherwise filename is opened for writing,
        filled, and closed again.
        """
        if outfile is None:
            f = open(filename, "w")
        else:
            f = outfile
        try:
            for k, v in self.__dict__.items():
                f.write('%s="%s"\n' % (k, v))
        finally:
            # Only close what was opened here; a caller-supplied file
            # object stays under the caller's control.
            if outfile is None:
                f.close()

    def read(self, filename="config"):
        """
        Load settings from the ``key=value`` lines of filename.

        Lines starting with "#" and lines without an "=" are ignored.
        Keys and values are stripped of surrounding whitespace only, so
        quotes around a value are kept as part of the value.
        """
        myfile = open(filename, "r")
        try:
            for line in myfile:
                line = line.strip()
                if line.startswith("#"):
                    continue  # it's a comment, move on
                # Split on the first "=" only, so a value may itself
                # contain "=" characters.
                data = line.split("=", 1)
                if len(data) == 2:
                    self.__dict__[data[0].strip()] = data[1].strip()
        finally:
            myfile.close()
e4bb5998
RD
51
class Job(object):
    """
    Each Job is a monitor wrapped around an externally executing
    process.  It handles starting the process, polling if it is still
    running, reading and logging its output, and killing it if
    needed.
    """

    # Directory in which the "<label>.log" files are created.
    LOGBASE = "."

    def __init__(self, label, command, args=None, env=os.environ, verbose=True):
        """
        label   -- names the log file and prefixes echoed output; a
                   false value disables both
        command -- the program to execute
        args    -- sequence of arguments appended to command
        env     -- environment mapping handed to the child process
        verbose -- when true, the job's output is echoed to sys.stdout
        """
        if args is None:
            args = []
        self.label = label
        self.command = command
        self.args = args
        self.env = env
        self.verbose = verbose
        self.proc = None
        self.log = None
        self.startTime = None
        self.stopTime = None

    def start(self):
        """Launch the process and, if the job has a label, open its log."""
        self.proc = Popen([self.command] + list(self.args),  # the command and args to execute
                          stdout=PIPE, stderr=STDOUT, env=self.env,
                          bufsize=0  # unbuffered
                          )
        self.startTime = time.time()
        if self.label:
            if not os.path.exists(self.LOGBASE):
                os.makedirs(self.LOGBASE)
            self.log = open("%s/%s.log" % (self.LOGBASE, self.label), "w", 0)

    def stop(self):
        """
        Send SIGTERM to the process if it has no return code yet, log
        whatever output is still pending, close the log file and record
        the stop time.
        """
        if self.proc is not None and self.proc.returncode is None:
            os.kill(self.proc.pid, signal.SIGTERM)
            self.logLines()
        if self.log is not None:
            self.log.close()
            self.log = None
        self.stopTime = time.time()

    def fileno(self):
        """
        Return the descriptor of the process's output pipe, or -1 if
        the job has not been started.  This lets a Job be passed
        directly to select.select().
        """
        if self.proc is not None:
            return self.proc.stdout.fileno()
        else:
            return -1

    def elapsedTime(self):
        """
        Return the run time as an "hours:minutes:seconds" string.

        The time is measured from start() up to stop(), or up to now if
        the job has not been stopped.  A job that was never started
        reports "0:0:0".
        """
        if self.startTime is None:
            return "0:0:0"
        now = self.stopTime
        if now is None:
            now = time.time()
        mins, seconds = divmod(int(now - self.startTime), 60)
        hours, mins = divmod(mins, 60)
        return "%d:%d:%d" % (hours, mins, seconds)

    def logLines(self):
        """
        Read the output lines that are currently available.  Each line
        is written to the log file when one is open, and echoed to
        sys.stdout (prefixed with the label, if any) when verbose.
        """
        if self.proc is None:
            return
        while self.linesAvailable():
            line = self.proc.stdout.readline()
            if not line:
                break  # end of file
            if self.log is not None:
                self.log.write(line)
            if self.label:
                line = "** %s: %s" % (self.label, line)
            if self.verbose:
                sys.stdout.write(line)

    def linesAvailable(self):
        """Return True if the output pipe is readable without waiting."""
        if self.proc is None:
            return False
        # A zero timeout makes select() a non-blocking poll.
        ind, outd, err = select.select([self], [], [], 0)
        if ind:
            return True
        else:
            return False

    def finished(self):
        """Return True once the started process has terminated."""
        if self.proc is None:
            return False
        return self.proc.poll() is not None

    def wait(self):
        """Wait for the process to end and return its return code (None if not started)."""
        if self.proc is None:
            return None
        return self.proc.wait()

    def poll(self):
        """Return the return code if the process has ended, otherwise None."""
        if self.proc is None:
            return None
        return self.proc.poll()

    def returnCode(self):
        """Return the last known return code without polling the process."""
        if self.proc is None:
            return None
        return self.proc.returncode
154
155
156#----------------------------------------------------------------------
157
class Task(object):
    """
    This class helps manage the running of a Task, which is simply a
    sequence of one or more Jobs, where subsequent jobs are not
    started until prior ones are completed.
    """
    def __init__(self, jobs=None):
        """
        jobs may be omitted, a single job, or a list or tuple of jobs.
        The sequence is copied, so later changes to the caller's
        container do not affect this Task.
        """
        if jobs is None:
            jobs = []
        elif not isinstance(jobs, (list, tuple)):
            jobs = [jobs]
        self.jobs = list(jobs)
        # Index into self.jobs of the job currently being run.
        self.active = 0

    def append(self, job):
        """Add job to the end of the sequence."""
        self.jobs.append(job)

    def activeJob(self):
        """Return the current job, or None once all jobs have been passed."""
        if self.active > len(self.jobs) - 1:
            return None
        else:
            return self.jobs[self.active]

    def next(self):
        """Advance to the following job and start it, if there is one."""
        self.active += 1
        if self.active < len(self.jobs):
            self.jobs[self.active].start()
183
184#----------------------------------------------------------------------
185
class TaskRunner(object):
    """
    Manages the running of multiple tasks.  Name can be used to identify
    a specific TaskRunner instance when reporting information back to the user.
    """
    def __init__(self, tasks=None, name="TaskRunner Tasks"):
        """
        tasks may be omitted, a single task, or a list or tuple of
        tasks; the sequence is copied.
        """
        if tasks is None:
            tasks = []
        elif not isinstance(tasks, (list, tuple)):
            tasks = [tasks]
        self.tasks = list(tasks)
        self.name = name
        # Result of the last run(); non-zero means a failure occurred.
        self.rc = 0

    def append(self, task):
        """Add task to the set of tasks handled by run()."""
        self.tasks.append(task)

    def errorOccurred(self):
        """
        Only used for threaded TR instances.  Once all TR tasks have completed,
        we'll want to check to make sure there were no errors in the process.
        """
        return self.rc != 0

    def run(self):
        """
        Run all tasks side by side, each task executing its jobs one
        after another, and log the jobs' output as it arrives.

        Returns 0 when every job succeeded, the return code of the
        first failing job, or 1 after a KeyboardInterrupt.  In the two
        failure cases the remaining active jobs are stopped and the
        code is also stored in self.rc.  Any other exception stops the
        active jobs and is re-raised.
        """
        try:
            # Start the first job of every task.  This happens inside
            # the try block so that a job failing to start does not
            # leave the already started ones running.
            for task in self.tasks:
                job = task.activeJob()
                if job:
                    job.start()

            # loop, getting output from the jobs, etc.
            while True:
                # get all active Jobs
                jobs = [t.activeJob() for t in self.tasks if t.activeJob()]
                if not jobs:
                    break

                # wait (at most a second) for a job to have output
                # ready, then log it
                ready, _unused_w, _unused_x = select.select(jobs, [], [], 1)
                for job in ready:
                    job.logLines()

                # check for finished jobs
                for task in self.tasks:
                    job = task.activeJob()
                    if job and job.finished():
                        # Pick up output written between the select()
                        # above and the process exiting.
                        job.logLines()
                        rc = job.returnCode()
                        if rc != 0:
                            print("JOB RETURNED FAILURE CODE! (%d)" % rc)
                            self.rc = rc
                            self.stopAllJobs()
                            return rc
                        # The process is already gone; stop() just
                        # records the job's stop time.
                        job.stop()
                        task.next()
        except KeyboardInterrupt:
            print("STOPPING JOBS...")
            self.rc = 1
            self.stopAllJobs()
            return 1

        except:
            # Deliberately broad: clean up, then let the error propagate.
            print("Unknown exception...")
            self.stopAllJobs()
            raise

        return 0

    def stopAllJobs(self):
        """Stop the currently active job of every task."""
        for task in self.tasks:
            job = task.activeJob()
            if job:
                job.stop()
256
3a6e9820
KO
257
258import threading
259
class TaskRunnerThread(threading.Thread):
    """
    Thread that executes a TaskRunner's run() method and records when
    the run started and ended.
    """
    def __init__(self, taskRunner, callback=None):
        threading.Thread.__init__(self)
        self.taskRunner = taskRunner
        self.startTime = None
        self.stopTime = None
        # NOTE(review): callback is stored but run() never invokes it;
        # confirm the intended call signature before wiring it up.
        self.callback = callback

    def run(self):
        """Run the task runner, timing it even if it raises."""
        self.startTime = time.time()
        try:
            self.taskRunner.run()
        finally:
            self.stopTime = time.time()

    def elapsedTime(self):
        """
        Return the run time as an "hours:minutes:seconds" string,
        measured up to the end of the run or, while it is still going,
        up to now.  Before the run has started this is "0:0:0".
        """
        if self.startTime is None:
            return "0:0:0"
        now = self.stopTime
        if now is None:
            now = time.time()
        mins, seconds = divmod(int(now - self.startTime), 60)
        hours, mins = divmod(mins, 60)
        return "%d:%d:%d" % (hours, mins, seconds)

    def totalTime(self):
        """Return elapsedTime() once the run has ended, otherwise None."""
        if self.stopTime is not None:
            return self.elapsedTime()
        else:
            return None
290
e4bb5998
RD
291#----------------------------------------------------------------------
292
293
if __name__ == "__main__":

    # Small self-test: three tasks run side by side, the jobs inside
    # each task one after another.

    # Task 1: the Python job first, then the shell job.
    t1 = Task([
        Job("label1", "./tmp/job-1.py", ["TEST-1"]),
        Job("label2", "./tmp/job-2.sh", ["TEST-2"]),
    ])

    # Task 2: the same two scripts, shell job first.
    t2 = Task([
        Job("task2b", "./tmp/job-2.sh", ["TASK-2b"]),
        Job("task2a", "./tmp/job-1.py", ["TASK-2a"]),
    ])

    # Task 3: a single job labelled "error".
    t3 = Task([Job("error", "./tmp/job-3.sh", ["TASK-3"])])

    tr = TaskRunner([t1, t2, t3])

    # List the label of every job, then run everything and show the
    # runner's return code.
    for task in tr.tasks:
        for job in task.jobs:
            print(job.label)

    print(tr.run())
322