]>
Commit | Line | Data |
---|---|---|
1 | #---------------------------------------------------------------------- | |
2 | # Name: taskrunner.py | |
3 | # Purpose: Classes that can manage running of external processes, | |
4 | # either consecutively, simultaneously, or both, and can | |
5 | # log the output of those jobs | |
6 | # | |
7 | # Author: Robin Dunn | |
8 | # | |
9 | # Created: 05-Nov-2004 | |
10 | # RCS-ID: $Id$ | |
11 | # Copyright: (c) 2004 by Total Control Software | |
12 | # Licence: wxWindows license | |
13 | #---------------------------------------------------------------------- | |
14 | ||
15 | import sys | |
16 | import os | |
17 | import signal | |
18 | import select | |
19 | import time | |
20 | ||
21 | from subprocess import Popen, PIPE, STDOUT | |
22 | ||
23 | ||
24 | __all__ = ["Job", "Task", "TaskRunner", "TaskRunnerThread"] | |
25 | ||
26 | #---------------------------------------------------------------------- | |
27 | ||
28 | # For environment settings | |
class Config:
    """
    A simple bag of environment settings kept as instance attributes.

    The settings can be written to, and loaded from, a text file made
    of ``key="value"`` lines.
    """

    def asDict(self):
        """
        Return a shallow copy of the settings as a plain dictionary.
        """
        return self.__dict__.copy()

    def write(self, filename="config", outfile=None):
        """
        Write every setting as a ``key="value"`` line.

        When `outfile` (any object with a ``write`` method) is given the
        lines are written to it and it is left open, since the caller
        owns it.  Otherwise `filename` is created or truncated, written
        and closed again.
        """
        if outfile is not None:
            self._writeSettings(outfile)
            return

        f = open(filename, "w")
        try:
            self._writeSettings(f)
        finally:
            # we opened it, so we are responsible for closing it
            f.close()

    def _writeSettings(self, f):
        """
        Emit one ``key="value"`` line per setting to the file object `f`.
        """
        for k, v in self.__dict__.items():
            f.write('%s="%s"\n' % (k, v))

    def read(self, filename="config"):
        """
        Load settings from `filename` into this object.

        Lines whose first non-blank character is "#" are comments, and
        lines without an "=" are ignored.  Only the first "=" separates
        the key from the value, so values may themselves contain "=".
        Keys and values are stripped of surrounding whitespace and are
        otherwise stored exactly as written (quotes are not removed).
        """
        myfile = open(filename, "r")
        try:
            for line in myfile:
                line = line.strip()
                if line.startswith("#"):
                    continue  # it's a comment, move on
                data = line.split("=", 1)
                if len(data) == 2:
                    self.__dict__[data[0].strip()] = data[1].strip()
        finally:
            myfile.close()
51 | ||
class Job(object):
    """
    Each Job is a monitor wrapped around an externally executing
    process.  It handles starting the process, polling if it is still
    running, reading and logging its output, and killing it if
    needed.
    """

    # Directory the per-job "<label>.log" files are written to.
    LOGBASE = "."

    def __init__(self, label, command, args=None, env=os.environ, verbose=True):
        """
        label   -- names the log file and prefixes echoed output; a
                   false value disables both the log and the prefix
        command -- the executable to run
        args    -- sequence of arguments passed to the command
        env     -- environment mapping handed to the child process
        verbose -- when true, the job's output is echoed to sys.stdout
        """
        self.label = label
        self.command = command
        # A private list per instance: a shared default list would leak
        # arguments from one Job to another if it were ever modified.
        self.args = [] if args is None else list(args)
        self.env = env
        self.verbose = verbose
        self.proc = None
        self.log = None
        self.startTime = None
        self.stopTime = None

    def start(self):
        """
        Launch the process (stderr merged into stdout) and, when the job
        has a label, open its log file under LOGBASE.
        """
        self.proc = Popen([self.command] + self.args,  # the command and args to execute
                          stdout=PIPE, stderr=STDOUT, env=self.env,
                          bufsize=0  # unbuffered, so select() sees exactly what readline() can get
                          )
        self.startTime = time.time()
        self.stopTime = None
        if self.label:
            if not os.path.exists(self.LOGBASE):
                os.makedirs(self.LOGBASE)
            # unbuffered and binary: the child's output is logged as-is
            self.log = open("%s/%s.log" % (self.LOGBASE, self.label), "wb", 0)

    def stop(self):
        """
        Send SIGTERM to the process if it is not known to have exited,
        log whatever output is available, close the log file and record
        the stop time.
        """
        if self.proc is not None:
            if self.proc.returncode is None:
                os.kill(self.proc.pid, signal.SIGTERM)
            self.logLines()
        self._closeLog()
        if self.stopTime is None:
            self.stopTime = time.time()

    def fileno(self):
        """
        Return the descriptor of the process' output pipe, or -1 if the
        job has not been started.  This lets a Job be handed directly to
        select.select().
        """
        if self.proc is None:
            return -1
        return self.proc.stdout.fileno()

    def elapsedTime(self):
        """
        Return the run time so far (or the total, once the job stopped)
        as an "hours:minutes:seconds" string.  A job that has not been
        started reports "0:0:0".
        """
        if self.startTime is None:
            return "0:0:0"
        end = self.stopTime
        if end is None:
            end = time.time()
        total = max(int(end - self.startTime), 0)
        hours, remainder = divmod(total, 3600)
        mins, seconds = divmod(remainder, 60)
        return "%d:%d:%d" % (hours, mins, seconds)

    def logLines(self):
        """
        Read every line that is currently available from the process,
        append it to the log file (if one is open) and, when verbose,
        echo it to sys.stdout prefixed with the job's label.
        """
        if self.proc is None:
            return
        while self.linesAvailable():
            line = self.proc.stdout.readline()
            if not line:
                break  # end of file, the pipe has been closed
            if self.log is not None:
                self.log.write(line)
            if self.verbose:
                if not isinstance(line, str):
                    # the pipe yields bytes where str is a text type
                    line = line.decode("utf-8", "replace")
                if self.label:
                    line = "** %s: %s" % (self.label, line)
                sys.stdout.write(line)

    def linesAvailable(self):
        """
        Return True if the output pipe is readable right now (select
        with a zero timeout), False otherwise or if not started.
        """
        if self.proc is None:
            return False
        readable, _, _ = select.select([self], [], [], 0)
        return bool(readable)

    def finished(self):
        """
        Return True once the process has exited.

        The first time that is detected any output still sitting in the
        pipe is logged, the log file is closed and the stop time is
        recorded, so nothing the job wrote just before exiting is lost.
        """
        if self.proc is None:
            return False
        if self.proc.poll() is None:
            return False
        self.logLines()
        self._closeLog()
        if self.stopTime is None:
            self.stopTime = time.time()
        return True

    def wait(self):
        """
        Block until the process exits and return its exit code, or None
        if the job has not been started.
        """
        if self.proc is None:
            return None
        return self.proc.wait()

    def poll(self):
        """
        Return the exit code if the process has exited, else None.
        """
        if self.proc is None:
            return None
        return self.proc.poll()

    def returnCode(self):
        """
        Return the exit code recorded by the last poll() or wait(), or
        None if there is none yet.
        """
        if self.proc is None:
            return None
        return self.proc.returncode

    def _closeLog(self):
        """
        Close the log file, if one is open.
        """
        if self.log is not None:
            self.log.close()
            self.log = None
154 | ||
155 | ||
156 | #---------------------------------------------------------------------- | |
157 | ||
class Task(object):
    """
    This class helps manage the running of a Task, which is simply a
    sequence of one or more Jobs, where subsequent jobs are not
    started until prior ones are completed.
    """

    def __init__(self, jobs=None):
        """
        `jobs` may be omitted, a single job, or a list/tuple of jobs.
        The sequence is copied, so later changes to the caller's list do
        not affect the task.
        """
        if jobs is None:
            jobs = []
        elif not isinstance(jobs, (list, tuple)):
            jobs = [jobs]
        self.jobs = list(jobs)
        self.active = 0  # index of the job currently being run

    def append(self, job):
        """
        Add a job to the end of the sequence.
        """
        self.jobs.append(job)

    def activeJob(self):
        """
        Return the current job, or None once every job has been used.
        """
        if self.active < len(self.jobs):
            return self.jobs[self.active]
        return None

    def next(self):
        """
        Advance to the following job and start it, if there is one.
        """
        self.active += 1
        if self.active < len(self.jobs):
            self.jobs[self.active].start()
183 | ||
184 | #---------------------------------------------------------------------- | |
185 | ||
class TaskRunner(object):
    """
    Manages the running of multiple tasks.  Name can be used to identify
    a specific TaskRunner instance when reporting information back to the user.
    """

    def __init__(self, tasks=None, name="TaskRunner Tasks"):
        """
        `tasks` may be omitted, a single task, or a list/tuple of tasks;
        the sequence is copied.
        """
        if tasks is None:
            tasks = []
        elif not isinstance(tasks, (list, tuple)):
            tasks = [tasks]
        self.tasks = list(tasks)
        self.name = name
        self.rc = 0  # code of the failure that ended run(), 0 if none

    def append(self, task):
        """
        Add a task to the set that run() will execute.
        """
        self.tasks.append(task)

    def errorOccurred(self):
        """
        Only used for threaded TR instances.  Once all TR tasks have completed,
        we'll want to check to make sure there were no errors in the process.
        """
        return self.rc != 0

    def run(self):
        """
        Run all tasks side by side, each task's jobs one after another,
        logging job output as it arrives.

        Returns 0 when every job succeeded.  As soon as a job exits with
        a non-zero code all active jobs are stopped and that code is
        returned.  A KeyboardInterrupt stops the jobs and returns 1.  In
        both failure cases the code is also kept in self.rc.  Any other
        exception stops the jobs and is re-raised.
        """
        try:
            # start the first job of every task; a task without jobs
            # has nothing to start
            for task in self.tasks:
                job = task.activeJob()
                if job:
                    job.start()

            # loop, getting output from the jobs, etc.
            while True:
                # get all active Jobs
                jobs = [j for j in (t.activeJob() for t in self.tasks) if j]
                if not jobs:
                    break

                # wait for a job to have output ready, then log it
                ready, _, _ = select.select(jobs, [], [], 1)
                for job in ready:
                    job.logLines()

                # check for finished jobs
                for task in self.tasks:
                    job = task.activeJob()
                    if job and job.finished():
                        rc = job.returnCode()
                        if rc != 0:
                            print("JOB RETURNED FAILURE CODE! (%d)" % rc)
                            self.rc = rc
                            self.stopAllJobs()
                            return rc
                        task.next()

        except KeyboardInterrupt:
            print("STOPPING JOBS...")
            # remember the failure so errorOccurred() reports it
            self.rc = 1
            self.stopAllJobs()
            return 1

        except:
            # deliberately catches everything: the jobs must not be left
            # running, and the exception is passed on untouched
            print("Unknown exception...")
            self.stopAllJobs()
            raise

        return 0

    def stopAllJobs(self):
        """
        Stop the active job of every task.
        """
        for task in self.tasks:
            job = task.activeJob()
            if job:
                job.stop()
256 | ||
257 | ||
258 | import threading | |
259 | ||
class TaskRunnerThread(threading.Thread):
    """
    Runs a TaskRunner in its own thread and keeps track of how long
    the run takes.
    """

    def __init__(self, taskRunner, callback=None):
        """
        taskRunner -- object whose run() method is executed in the thread
        callback   -- stored on the instance; run() does not invoke it
        """
        threading.Thread.__init__(self)
        self.taskRunner = taskRunner
        self.startTime = None
        self.stopTime = None
        self.callback = callback

    def run(self):
        """
        Thread body: run the task runner, recording start and stop
        times.  The stop time is recorded even if the runner raises.
        """
        self.startTime = time.time()
        try:
            self.taskRunner.run()
        finally:
            self.stopTime = time.time()

    def elapsedTime(self):
        """
        Return the run time so far (or the total, once finished) as an
        "hours:minutes:seconds" string.  Before the thread has started
        running this is "0:0:0".
        """
        if self.startTime is None:
            return "0:0:0"
        end = self.stopTime
        if end is None:
            end = time.time()
        total = max(int(end - self.startTime), 0)
        hours, remainder = divmod(total, 3600)
        mins, seconds = divmod(remainder, 60)
        return "%d:%d:%d" % (hours, mins, seconds)

    def totalTime(self):
        """
        Return the total run time as a string once the run has ended,
        or None while it has not.
        """
        if self.stopTime:
            return self.elapsedTime()
        return None
290 | ||
291 | #---------------------------------------------------------------------- | |
292 | ||
293 | ||
if __name__ == "__main__":

    # Small self-test: three tasks run side by side; the first two
    # chain two jobs each, the third is a single job labelled "error".
    j1 = Job("label1", "./tmp/job-1.py", ["TEST-1"])
    j2 = Job("label2", "./tmp/job-2.sh", ["TEST-2"])
    j3 = Job("task2a", "./tmp/job-1.py", ["TASK-2a"])
    j4 = Job("task2b", "./tmp/job-2.sh", ["TASK-2b"])

    t1 = Task([j1, j2])
    t2 = Task([j4, j3])  # the second task runs the shell job first
    t3 = Task([Job("error", "./tmp/job-3.sh", ["TASK-3"])])

    tr = TaskRunner()
    for task in (t1, t2, t3):
        tr.append(task)

    # list the label of every job, task by task
    for task in tr.tasks:
        for job in task.jobs:
            print(job.label)

    # run everything and show the overall result code
    print(tr.run())
322 |