]> git.saurik.com Git - wxWidgets.git/blob - wxPython/distrib/all/taskrunner.py
reduce the number of objects for a quicker startup time
[wxWidgets.git] / wxPython / distrib / all / taskrunner.py
1 #----------------------------------------------------------------------
2 # Name: taskrunner.py
3 # Purpose: Classes that can manage running of external processes,
4 # either consecutively, simultaneously, or both, and can
5 # log the output of those jobs
6 #
7 # Author: Robin Dunn
8 #
9 # Created: 05-Nov-2004
10 # RCS-ID: $Id$
11 # Copyright: (c) 2004 by Total Control Software
12 # Licence: wxWindows license
13 #----------------------------------------------------------------------
14
15 import sys
16 import os
17 import signal
18 import select
19 import time
20
21 from subprocess import Popen, PIPE, STDOUT
22
23
# Names exported by "from taskrunner import *".  The Config helper class
# defined below is not included in this list.
__all__ = ["Job", "Task", "TaskRunner", "TaskRunnerThread"]
25
26 #----------------------------------------------------------------------
27
28 # For environment settings
class Config:
    """
    A plain attribute bag for environment settings.  Every instance
    attribute is treated as one setting; the class can dump them to, and
    load them from, a simple ``key=value`` text file.
    """

    def asDict(self):
        """Return a shallow copy of the settings as a dictionary."""
        return self.__dict__.copy()

    def write(self, filename="config", outfile=None):
        """
        Write every setting as a ``key="value"`` line.

        If `outfile` is given it must be a file-like object with a
        ``write`` method; it is written to and left open for the caller.
        Otherwise `filename` is created (or truncated), written, and
        closed again before returning.
        """
        opened_here = outfile is None
        if opened_here:
            f = open(filename, "w")
        else:
            f = outfile
        try:
            for k, v in self.__dict__.items():
                f.write('%s="%s"\n' % (k, v))
        finally:
            # Only close what we opened; a caller-supplied stream stays
            # under the caller's control.
            if opened_here:
                f.close()

    def read(self, filename="config"):
        """
        Load settings from `filename`.

        Lines starting with ``#`` are comments.  Any other line that
        contains ``=`` is split at the first ``=`` into a key and a value,
        both stripped of surrounding whitespace, and stored as an
        attribute.  Lines without ``=`` are ignored.  Values are stored
        exactly as they appear in the file (surrounding quotes are kept).
        """
        myfile = open(filename, "r")
        try:
            for line in myfile:
                line = line.strip()
                if line.startswith("#"):
                    continue  # it's a comment, move on
                # Split only on the first "=" so that values which
                # themselves contain "=" are not silently dropped.
                data = line.split("=", 1)
                if len(data) == 2:
                    self.__dict__[data[0].strip()] = data[1].strip()
        finally:
            myfile.close()
51
class Job(object):
    """
    Each Job is a monitor wrapped around an externally executing
    process.  It handles starting the process, polling if it is still
    running, reading and logging its output, and killing it if
    needed.

    A Job exposes ``fileno()``, so it can be passed directly to
    ``select.select`` to wait for output from the child process.
    """

    # Directory in which the per-job "<label>.log" files are created.
    LOGBASE = "."

    def __init__(self, label, command, args=[], env=os.environ, verbose=True):
        """
        label   -- name used for the log file and as the prefix of echoed
                   output lines; a false value disables both
        command -- the executable to run
        args    -- list of arguments passed to the command
        env     -- environment mapping handed to the child process
        verbose -- if true, the child's output is echoed to sys.stdout
        """
        self.label = label
        self.command = command
        # Copy the list so the shared default (and the caller's own list)
        # is never aliased by this instance.
        self.args = list(args)
        self.env = env
        self.proc = None
        self.log = None
        self.startTime = None
        self.stopTime = None
        self.verbose = verbose

    def start(self):
        """
        Launch the process with stderr merged into stdout, and, when the
        job has a label, open ``LOGBASE/<label>.log`` for its output
        (creating LOGBASE first if it does not exist).
        """
        self.proc = Popen([self.command] + self.args,  # the command and args to execute
                          stdout=PIPE, stderr=STDOUT, env=self.env,
                          bufsize=0  # 0 means unbuffered
                          )
        self.startTime = time.time()
        if self.label:
            if not os.path.exists(self.LOGBASE):
                os.makedirs(self.LOGBASE)
            # Buffering of 0 keeps the log file up to date line by line.
            self.log = open("%s/%s.log" % (self.LOGBASE, self.label), "w", 0)

        # The pipe is left in blocking mode; logLines() checks with
        # select() before each read.

    def stop(self):
        """
        Send SIGTERM to the process if it has not been seen to exit yet,
        log whatever output is currently available, and record the stop
        time.
        """
        if self.proc is not None and self.proc.returncode is None:
            os.kill(self.proc.pid, signal.SIGTERM)
        # logLines() does nothing when there is no process, so it is safe
        # to call unconditionally.
        self.logLines()
        if self.stopTime is None:
            self.stopTime = time.time()

    def fileno(self):
        """
        Return the file descriptor of the process's output pipe, or -1 if
        the process has not been started.
        """
        if self.proc is not None:
            return self.proc.stdout.fileno()
        else:
            return -1

    def elapsedTime(self):
        """
        Return the running time as an "hours:minutes:seconds" string.

        The time is measured from start() up to the recorded stop time,
        or up to now if no stop time has been recorded.  A job that was
        never started reports "0:0:0".
        """
        if self.startTime is None:
            return "0:0:0"
        now = self.stopTime
        if not now:
            now = time.time()
        total = int(now - self.startTime)
        mins, seconds = divmod(total, 60)
        hours, mins = divmod(mins, 60)
        return "%d:%d:%d" % (hours, mins, seconds)

    def logLines(self):
        """
        Read every line that is currently available from the process.
        Each line is written to the log file (labelled jobs only) and,
        if verbose, echoed to sys.stdout prefixed with "** <label>: ".
        """
        if self.proc is None:
            return
        while self.linesAvailable():
            line = self.proc.stdout.readline()
            if not line:
                break  # end of file: the pipe has been closed
            if self.label:
                self.log.write(line)
                line = "** %s: %s" % (self.label, line)
            if self.verbose:
                sys.stdout.write(line)

    def linesAvailable(self):
        """
        Return True if the output pipe is readable right now (a
        zero-timeout select), False otherwise or if not started.
        """
        if self.proc is None:
            return False
        ind, outd, err = select.select([self], [], [], 0)
        return bool(ind)

    def finished(self):
        """Return True once the started process has exited."""
        if self.proc is None:
            return False
        return self._recordExit(self.proc.poll()) is not None

    def wait(self):
        """
        Block until the process exits and return its exit code, or
        return None if it was never started.
        """
        if self.proc is None:
            return None
        return self._recordExit(self.proc.wait())

    def poll(self):
        """
        Return the exit code if the process has exited, else None.
        Also returns None if the process was never started.
        """
        if self.proc is None:
            return None
        return self._recordExit(self.proc.poll())

    def returnCode(self):
        """
        Return the exit code last observed by poll()/wait()/finished(),
        or None if there is none yet.
        """
        if self.proc is None:
            return None
        return self.proc.returncode

    def _recordExit(self, rc):
        """
        Remember when the process was first seen to have exited, so that
        elapsedTime() stops growing for jobs that end on their own.
        Returns `rc` unchanged.
        """
        if rc is not None and self.stopTime is None:
            self.stopTime = time.time()
        return rc
154
155
156 #----------------------------------------------------------------------
157
class Task(object):
    """
    This class helps manage the running of a Task, which is simply a
    sequence of one or more Jobs, where subsequent jobs are not
    started until prior ones are completed.
    """

    def __init__(self, jobs=[]):
        """
        jobs -- either a list of jobs or a single job.  A list is copied,
                so later appends do not affect the caller's list.
        """
        # isinstance() also accepts list subclasses, which a strict type
        # comparison would wrongly wrap as one single job.
        if not isinstance(jobs, list):
            jobs = [jobs]
        self.jobs = list(jobs)
        self.active = 0  # index of the job currently being run

    def append(self, job):
        """Add a job to the end of the sequence."""
        self.jobs.append(job)

    def activeJob(self):
        """
        Return the job at the current position, or None once the
        position has moved past the last job (or there are no jobs).
        """
        if self.active > len(self.jobs) - 1:
            return None
        return self.jobs[self.active]

    def next(self):
        """
        Advance to the following job and start it.  Nothing is started
        when the sequence is exhausted.
        """
        self.active += 1
        if self.active < len(self.jobs):
            self.jobs[self.active].start()
183
184 #----------------------------------------------------------------------
185
class TaskRunner(object):
    """
    Manages the running of multiple tasks.  Name can be used to identify
    a specific TaskRunner instance when reporting information back to the user.

    The tasks run side by side; within each task the jobs run one after
    another.  The first job to return a non-zero exit code stops
    everything.
    """

    def __init__(self, tasks=[], name="TaskRunner Tasks"):
        """
        tasks -- either a list of tasks or a single task; a list is copied
        name  -- descriptive name for this runner
        """
        if not isinstance(tasks, list):
            tasks = [tasks]
        self.tasks = list(tasks)
        self.name = name
        self.rc = 0  # result of the last run(); non-zero means failure

    def append(self, task):
        """Add another task to be run."""
        self.tasks.append(task)

    def errorOccurred(self):
        """
        Only used for threaded TR instances.  Once all TR tasks have completed,
        we'll want to check to make sure there were no errors in the process.
        """
        return self.rc != 0

    def run(self):
        """
        Run all tasks to completion.

        Returns 0 when every job succeeded, the exit code of the first
        failing job, or 1 if interrupted from the keyboard.  The same
        value is stored in self.rc.  In the failure and interrupt cases
        all jobs still active are stopped first.  Any other exception
        stops the active jobs, sets self.rc to 1, and is re-raised.
        """
        try:
            # Start the first job of every task.  This happens inside the
            # try block so that a failing start still stops the jobs that
            # were already launched.  Tasks without jobs are skipped.
            for task in self.tasks:
                job = task.activeJob()
                if job:
                    job.start()

            # loop, getting output from the jobs, etc.
            while True:
                # get all active Jobs
                jobs = []
                for task in self.tasks:
                    job = task.activeJob()
                    if job:
                        jobs.append(job)
                if not jobs:
                    break

                # wait (up to one second) for a job to have output ready,
                # then log it
                ready, _unused_out, _unused_err = select.select(jobs, [], [], 1)
                for job in ready:
                    job.logLines()

                # check for finished jobs
                for task in self.tasks:
                    job = task.activeJob()
                    if job and job.finished():
                        # Pick up any output that arrived after the
                        # select() above, before moving on.
                        job.logLines()
                        if job.returnCode() != 0:
                            rc = job.returnCode()
                            print("JOB RETURNED FAILURE CODE! (%d)" % rc)
                            self.rc = rc
                            self.stopAllJobs()
                            return rc
                        else:
                            task.next()
        except KeyboardInterrupt:
            print("STOPPING JOBS...")
            self.rc = 1
            self.stopAllJobs()
            return 1

        except:
            # Deliberately broad: whatever went wrong, do not leave child
            # processes running.  The exception is re-raised unchanged.
            print("Unknown exception...")
            self.rc = 1
            self.stopAllJobs()
            raise

        return 0

    def stopAllJobs(self):
        """Stop the currently active job of every task."""
        for task in self.tasks:
            job = task.activeJob()
            if job:
                job.stop()
256
257
258 import threading
259
class TaskRunnerThread(threading.Thread):
    """
    Runs a TaskRunner in its own thread and keeps track of how long the
    run took.
    """

    def __init__(self, taskRunner, callback=None):
        """
        taskRunner -- object whose run() method is executed in the thread
        callback   -- stored on the instance; this class does not call it
        """
        threading.Thread.__init__(self)
        self.taskRunner = taskRunner
        self.startTime = None
        self.stopTime = None
        self.callback = callback

    def run(self):
        """Thread body: time the call to the task runner's run()."""
        self.startTime = time.time()
        try:
            self.taskRunner.run()
        finally:
            # Record the stop time even when the runner raises, so the
            # elapsed time does not keep growing after the thread died.
            self.stopTime = time.time()

    def elapsedTime(self):
        """
        Return the running time as an "hours:minutes:seconds" string,
        measured up to the stop time, or up to now while still running.
        A thread that was never started reports "0:0:0".
        """
        if self.startTime is None:
            return "0:0:0"
        now = self.stopTime
        if not now:
            now = time.time()
        total = int(now - self.startTime)
        mins, seconds = divmod(total, 60)
        hours, mins = divmod(mins, 60)
        return "%d:%d:%d" % (hours, mins, seconds)

    def totalTime(self):
        """
        Return the final elapsed time once the run has ended, or None
        while no stop time has been recorded.
        """
        if self.stopTime:
            return self.elapsedTime()
        else:
            return None
290
291 #----------------------------------------------------------------------
292
293
if __name__ == "__main__":
    # Manual smoke test: three tasks run side by side.  The job scripts
    # under ./tmp are not part of this file and must be supplied
    # separately.

    # First task: the python job, then the shell job.
    first = Task([
        Job("label1", "./tmp/job-1.py", ["TEST-1"]),
        Job("label2", "./tmp/job-2.sh", ["TEST-2"]),
    ])

    # Second task: the same two scripts, shell job first.
    second = Task([
        Job("task2b", "./tmp/job-2.sh", ["TASK-2b"]),
        Job("task2a", "./tmp/job-1.py", ["TASK-2a"]),
    ])

    # Third task: a single job labelled "error".
    third = Task([Job("error", "./tmp/job-3.sh", ["TASK-3"])])

    tr = TaskRunner([first, second, third])

    # List the label of every job, task by task, before running.
    for task in tr.tasks:
        for job in task.jobs:
            print(job.label)

    # Run everything and show the overall result code.
    print(tr.run())
322