]>
Commit | Line | Data |
---|---|---|
1 | #---------------------------------------------------------------------- | |
2 | # Name: taskrunner.py | |
3 | # Purpose: Classes that can manage running of external processes, | |
4 | # either consecutively, simultaneously, or both, and can | |
5 | # log the output of those jobs | |
6 | # | |
7 | # Author: Robin Dunn | |
8 | # | |
9 | # Created: 05-Nov-2004 | |
10 | # RCS-ID: $Id$ | |
11 | # Copyright: (c) 2004 by Total Control Software | |
12 | # Licence: wxWindows license | |
13 | #---------------------------------------------------------------------- | |
14 | ||
15 | import sys | |
16 | import os | |
17 | import signal | |
18 | import select | |
19 | import fcntl | |
20 | from subprocess import Popen, PIPE, STDOUT | |
21 | ||
22 | ||
23 | __all__ = ["Job", "Task", "TaskRunner"] | |
24 | ||
25 | #---------------------------------------------------------------------- | |
26 | ||
27 | ||
class Job(object):
    """
    Each Job is a monitor wrapped around an externally executing
    process.  It handles starting the process, polling if it is still
    running, reading and logging its output, and killing it if
    needed.

    Every line read from the process is echoed to sys.stdout.  When
    the Job has a label, the raw line is also written to the file
    "<LOGBASE>/<label>.log" and the echoed copy is prefixed with
    "** <label>: ".
    """

    # Directory in which the per-job log files are created.
    LOGBASE = "."

    def __init__(self, label, args):
        """
        label: name used for the log file and the echo prefix; a false
               value (None or "") means no log file and no prefix.
        args:  the command and its arguments, handed unchanged to Popen.
        """
        self.label = label
        self.args = args
        self.proc = None
        # Always defined, so the attribute also exists for unlabeled jobs.
        self.log = None
        if self.label:
            # Unbuffered binary mode: the bytes read from the child are
            # written straight through to the log file.
            self.log = open("%s/%s.log" % (self.LOGBASE, label), "wb", 0)

    def start(self):
        """Launch the process with stderr merged into a piped stdout."""
        self.proc = Popen(self.args,  # the command and args to execute
                          stdout=PIPE, stderr=STDOUT,
                          bufsize=0,  # unbuffered pipe
                          )

    def stop(self):
        """
        Send SIGTERM to the process if it has been started and no
        return code has been recorded yet, then log whatever output
        is already available.
        """
        if self.proc is not None and self.proc.returncode is None:
            os.kill(self.proc.pid, signal.SIGTERM)
            self.logLines()

    def fileno(self):
        """
        Return the file descriptor of the process's stdout pipe, or -1
        if the process has not been started.  Having this method lets
        a Job be passed directly to select.select().
        """
        if self.proc is not None:
            return self.proc.stdout.fileno()
        return -1

    def logLines(self):
        """
        Read lines from the process for as long as select() reports
        the pipe readable, echoing each one (and logging it when the
        Job has a label).  Stops at end-of-file.
        """
        if self.proc is None:
            return
        while self.linesAvailable():
            raw = self.proc.stdout.readline()
            if not raw:
                break  # end of file
            # On Python 3 the pipe yields bytes; sys.stdout wants text.
            if isinstance(raw, str):
                text = raw
            else:
                text = raw.decode("utf-8", "replace")
            if self.label:
                self.log.write(raw)
                text = "** %s: %s" % (self.label, text)
            sys.stdout.write(text)

    def linesAvailable(self):
        """
        Return True if the process has been started and its stdout
        pipe is readable right now (zero-timeout select), else False.
        """
        if self.proc is None:
            return False
        readable, _, _ = select.select([self], [], [], 0)
        return bool(readable)

    def finished(self):
        """Return True once the started process has exited."""
        if self.proc is None:
            return False
        return self.proc.poll() is not None

    def wait(self):
        """
        Block until the process exits and return its return code, or
        return None if the process was never started.
        """
        if self.proc is None:
            return None
        return self.proc.wait()

    def poll(self):
        """
        Return the process's return code if it has exited, otherwise
        None (also None if the process was never started).
        """
        if self.proc is None:
            return None
        return self.proc.poll()

    def returnCode(self):
        """
        Return the return code last recorded for the process without
        polling it again, or None if the process was never started.
        """
        if self.proc is None:
            return None
        return self.proc.returncode
110 | ||
111 | #---------------------------------------------------------------------- | |
112 | ||
class Task(object):
    """
    This class helps manage the running of a Task, which is simply a
    sequence of one or more Jobs, where subsequent jobs are not
    started until prior ones are completed.
    """

    def __init__(self, jobs=[]):
        """
        jobs: a list (or tuple) of jobs, or a single job, which is
              wrapped in a one-element list.
        """
        # isinstance (rather than an exact type comparison) so that
        # list subclasses and tuples are treated as sequences of jobs
        # instead of being wrapped as one bogus job.
        if not isinstance(jobs, (list, tuple)):
            jobs = [jobs]
        # Store a copy: neither the caller's sequence nor the shared
        # default list is ever mutated by append().
        self.jobs = list(jobs)
        # Index into self.jobs of the job currently being run.
        self.active = 0

    def append(self, job):
        """Add a job to the end of the sequence."""
        self.jobs.append(job)

    def activeJob(self):
        """Return the current job, or None once all jobs are used up."""
        if self.active < len(self.jobs):
            return self.jobs[self.active]
        return None

    def next(self):
        """Advance to the following job and start it, if there is one."""
        self.active += 1
        if self.active < len(self.jobs):
            self.jobs[self.active].start()
138 | ||
139 | #---------------------------------------------------------------------- | |
140 | ||
class TaskRunner(object):
    """
    Manages the running of multiple tasks.  The active job of every
    task is run at the same time; within a task the next job is
    started only after the previous one has finished successfully.
    """

    def __init__(self, tasks=[]):
        """
        tasks: a list (or tuple) of tasks, or a single task, which is
               wrapped in a one-element list.
        """
        if not isinstance(tasks, (list, tuple)):
            tasks = [tasks]
        # Store a copy so append() never touches the caller's sequence
        # or the shared default list.
        self.tasks = list(tasks)

    def append(self, task):
        """Add a task to the set of tasks to run."""
        self.tasks.append(task)

    def run(self):
        """
        Start the first job of every task, then log job output and
        advance each task as its jobs finish.

        Returns 0 when every job has finished with return code 0.
        As soon as a job finishes with a non-zero return code the
        remaining active jobs are stopped and that code is returned.
        Returns 1 after a KeyboardInterrupt.  Any other exception
        stops the active jobs and is re-raised.
        """
        # start all the active jobs; a task without any jobs has no
        # active job and is skipped
        for task in self.tasks:
            first = task.activeJob()
            if first:
                first.start()

        try:
            # loop, getting output from the jobs, etc.
            while True:
                # get all active Jobs
                active = []
                for task in self.tasks:
                    job = task.activeJob()
                    if job:
                        active.append(job)
                if not active:
                    break

                # wait (at most one second) for a job to have output
                # ready, then log it
                ready, _, _ = select.select(active, [], [], 1)
                for job in ready:
                    job.logLines()

                # check for finished jobs
                for task in self.tasks:
                    job = task.activeJob()
                    if not (job and job.finished()):
                        continue
                    # Log anything the job wrote after the select()
                    # above; once the task advances (or we bail out)
                    # this job is never read from again.
                    job.logLines()
                    rc = job.returnCode()
                    if rc != 0:
                        print("JOB RETURNED FAILURE CODE! (%d)" % rc)
                        self.stopAllJobs()
                        return rc
                    task.next()
        except KeyboardInterrupt:
            print("STOPPING JOBS...")
            self.stopAllJobs()
            return 1

        except:
            # Clean up the running jobs, then let the error propagate.
            print("Unknown exception...")
            self.stopAllJobs()
            raise

        return 0

    def stopAllJobs(self):
        """Stop the active job, if any, of every task."""
        for task in self.tasks:
            job = task.activeJob()
            if job:
                job.stop()
200 | ||
201 | #---------------------------------------------------------------------- | |
202 | ||
203 | ||
if __name__ == "__main__":
    # Small demonstration: three tasks run side by side.  The job
    # scripts are looked up under ./tmp relative to the current
    # directory.
    # NOTE(review): judging by its label, job-3.sh is presumably meant
    # to fail -- confirm against the script itself.

    j1 = Job("label1", ["./tmp/job-1.py", "TEST-1"])
    j2 = Job("label2", ["./tmp/job-2.sh", "TEST-2"])

    t1 = Task()
    t1.append(j1)
    t1.append(j2)

    j3 = Job("task2a", ["./tmp/job-1.py", "TASK-2a"])
    j4 = Job("task2b", ["./tmp/job-2.sh", "TASK-2b"])

    # the second task runs its jobs in the order j4, then j3
    t2 = Task()
    t2.append(j4)
    t2.append(j3)

    t3 = Task([Job("error", ["./tmp/job-3.sh", "TASK-3"])])

    tr = TaskRunner()
    tr.append(t1)
    tr.append(t2)
    tr.append(t3)

    # list the label of every job before running anything
    for task in tr.tasks:
        for job in task.jobs:
            print(job.label)

    # run everything and show the overall result code
    print(tr.run())
232 |