]>
Commit | Line | Data |
---|---|---|
e4bb5998 RD |
1 | #---------------------------------------------------------------------- |
2 | # Name: taskrunner.py | |
3 | # Purpose: Classes that can manage running of external processes, | |
4 | # either consecutively, simultaneously, or both, and can | |
5 | # log the output of those jobs | |
6 | # | |
7 | # Author: Robin Dunn | |
8 | # | |
9 | # Created: 05-Nov-2004 | |
10 | # RCS-ID: $Id$ | |
11 | # Copyright: (c) 2004 by Total Control Software | |
12 | # Licence: wxWindows license | |
13 | #---------------------------------------------------------------------- | |
14 | ||
15 | import sys | |
16 | import os | |
17 | import signal | |
18 | import select | |
19 | import fcntl | |
20 | from subprocess import Popen, PIPE, STDOUT | |
21 | ||
22 | ||
23 | __all__ = ["Job", "Task", "TaskRunner"] | |
24 | ||
25 | #---------------------------------------------------------------------- | |
26 | ||
f1a9f331 KO |
27 | # For environment settings |
28 | class Config: | |
29 | def asDict(self): | |
30 | return self.__dict__.copy() | |
31 | ||
32 | def write(self, filename="config", outfile=None): | |
33 | if outfile is None: | |
34 | f = file(filename, "w") | |
35 | else: | |
36 | f = outfile | |
37 | for k, v in self.__dict__.items(): | |
38 | f.write('%s="%s"\n' % (k, v)) | |
39 | ||
40 | def read(self, filename="config"): | |
41 | myfile = open(filename, "r") | |
42 | for line in myfile.readlines(): | |
43 | line = line.strip() | |
44 | if len(line) > 0 and line[0] == "#": | |
45 | continue # it's a comment, move on | |
46 | data = line.split("=") | |
47 | if len(data) == 2: | |
48 | self.__dict__[data[0]] = data[1] | |
49 | myfile.close() | |
e4bb5998 RD |
50 | |
51 | class Job(object): | |
52 | """ | |
53 | Each Job is a monitor wrapped around an externally executing | |
54 | process. It handles starting the process, polling if it is still | |
55 | running, reading and logging it's output, and killing it if | |
56 | needed. | |
57 | """ | |
58 | ||
59 | LOGBASE="." | |
60 | ||
f1a9f331 | 61 | def __init__(self, label, command, args=[], env=os.environ): |
e4bb5998 | 62 | self.label = label |
f1a9f331 | 63 | self.command = command |
e4bb5998 | 64 | self.args = args |
f1a9f331 | 65 | self.env = env |
e4bb5998 RD |
66 | self.proc = None |
67 | if self.label: | |
f1a9f331 KO |
68 | if not os.path.exists(self.LOGBASE): |
69 | os.mkdirs(self.LOGBASE) | |
e4bb5998 RD |
70 | self.log = file("%s/%s.log" % (self.LOGBASE, label), "w", 0) |
71 | ||
72 | def start(self): | |
f1a9f331 KO |
73 | self.proc = Popen([self.command] + self.args, # the command and args to execute |
74 | stdout=PIPE, stderr=STDOUT, env=self.env, | |
75 | bufsize=0 # line-buffered | |
e4bb5998 RD |
76 | ) |
77 | # put the file in non-blocking mode | |
78 | #flags = fcntl.fcntl (self.proc.stdout, fcntl.F_GETFL, 0) | |
79 | #flags = flags | os.O_NONBLOCK | |
80 | #fcntl.fcntl (self.proc.stdout, fcntl.F_SETFL, flags) | |
81 | ||
82 | ||
83 | def stop(self): | |
84 | if self.proc is not None and self.proc.returncode is None: | |
85 | os.kill(self.proc.pid, signal.SIGTERM) | |
86 | self.logLines() | |
87 | ||
88 | ||
89 | def fileno(self): | |
90 | if self.proc is not None: | |
91 | return self.proc.stdout.fileno() | |
92 | else: | |
93 | return -1 | |
94 | ||
95 | ||
96 | def logLines(self): | |
97 | if self.proc is not None: | |
98 | while self.linesAvailable(): | |
99 | line = self.proc.stdout.readline() | |
100 | if not line: break | |
101 | if self.label: | |
102 | self.log.write(line) | |
103 | line = "** %s: %s" % (self.label, line) | |
104 | sys.stdout.write(line) | |
105 | ||
106 | ||
107 | def linesAvailable(self): | |
108 | if self.proc is None: | |
109 | return False | |
110 | ind, outd, err = select.select([self], [], [], 0) | |
111 | if ind: | |
112 | return True | |
113 | else: | |
114 | return False | |
115 | ||
116 | ||
117 | def finished(self): | |
118 | if self.proc is None:# or self.linesAvailable(): | |
119 | return False | |
120 | return self.proc.poll() is not None | |
121 | ||
122 | ||
123 | def wait(self): | |
124 | if self.proc is None: return None | |
125 | return self.proc.wait() | |
126 | ||
127 | ||
128 | def poll(self): | |
129 | if self.proc is None: return None | |
130 | return self.proc.poll() | |
131 | ||
132 | ||
133 | def returnCode(self): | |
134 | if self.proc is None: return None | |
135 | return self.proc.returncode | |
136 | ||
137 | ||
138 | #---------------------------------------------------------------------- | |
139 | ||
140 | class Task(object): | |
141 | """ | |
142 | This class helps manage the running of a Task, which is a simply a | |
143 | sequence of one or more Jobs, where subesquent jobs are not | |
144 | started until prior ones are completed. | |
145 | """ | |
146 | def __init__(self, jobs=[]): | |
147 | if type(jobs) != list: | |
148 | jobs = [jobs] | |
149 | self.jobs = jobs[:] | |
150 | self.active = 0 | |
151 | ||
152 | def append(self, job): | |
153 | self.jobs.append(job) | |
154 | ||
155 | def activeJob(self): | |
156 | if self.active > len(self.jobs)-1: | |
157 | return None | |
158 | else: | |
159 | return self.jobs[self.active] | |
160 | ||
161 | def next(self): | |
162 | self.active += 1 | |
163 | if self.active < len(self.jobs): | |
164 | self.jobs[self.active].start() | |
165 | ||
166 | #---------------------------------------------------------------------- | |
167 | ||
168 | class TaskRunner(object): | |
169 | """ | |
170 | Manages the running of multiple tasks. | |
171 | """ | |
172 | def __init__(self, tasks=[]): | |
173 | if type(tasks) != list: | |
174 | tasks = [tasks] | |
175 | self.tasks = tasks[:] | |
176 | ||
177 | def append(self, task): | |
178 | self.tasks.append(task) | |
179 | ||
180 | def run(self): | |
181 | # start all the active jobs | |
182 | for task in self.tasks: | |
183 | task.activeJob().start() | |
184 | ||
185 | try: | |
186 | # loop, getting output from the jobs, etc. | |
187 | while True: | |
188 | # get all active Jobs | |
189 | jobs = [t.activeJob() for t in self.tasks if t.activeJob()] | |
190 | if not jobs: | |
191 | break | |
192 | ||
193 | # wait for a job to have output ready, then log it | |
194 | input, output, err = select.select(jobs, [], [], 1) | |
195 | for job in input: | |
196 | job.logLines() | |
197 | ||
198 | # check for finished jobs | |
199 | for task in self.tasks: | |
200 | job = task.activeJob() | |
201 | if job and job.finished(): | |
202 | if job.returnCode() != 0: | |
203 | rc = job.returnCode() | |
204 | print "JOB RETURNED FAILURE CODE! (%d)" % rc | |
205 | self.stopAllJobs() | |
206 | return rc | |
207 | else: | |
208 | task.next() | |
209 | except KeyboardInterrupt: | |
210 | print "STOPPING JOBS..." | |
211 | self.stopAllJobs() | |
2c12daf9 | 212 | return 1 |
e4bb5998 RD |
213 | |
214 | except: | |
215 | print "Unknown exception..." | |
216 | self.stopAllJobs() | |
217 | raise | |
218 | ||
219 | return 0 | |
220 | ||
221 | ||
222 | def stopAllJobs(self): | |
223 | for task in self.tasks: | |
224 | job = task.activeJob() | |
225 | if job: | |
226 | job.stop() | |
227 | ||
228 | #---------------------------------------------------------------------- | |
229 | ||
230 | ||
231 | if __name__ == "__main__": | |
232 | ||
233 | j1 = Job("label1", ["./tmp/job-1.py", "TEST-1"]) | |
234 | j2 = Job("label2", ["./tmp/job-2.sh", "TEST-2"]) | |
235 | ||
236 | t1 = Task() | |
237 | t1.append(j1) | |
238 | t1.append(j2) | |
239 | ||
240 | j3 = Job("task2a", ["./tmp/job-1.py", "TASK-2a"]) | |
241 | j4 = Job("task2b", ["./tmp/job-2.sh", "TASK-2b"]) | |
242 | ||
243 | t2 = Task() | |
244 | t2.append(j4) | |
245 | t2.append(j3) | |
246 | ||
247 | t3 = Task([Job("error", ["./tmp/job-3.sh", "TASK-3"])]) | |
248 | ||
249 | tr = TaskRunner() | |
250 | tr.append(t1) | |
251 | tr.append(t2) | |
252 | tr.append(t3) | |
253 | ||
254 | for task in tr.tasks: | |
255 | for job in task.jobs: | |
256 | print job.label | |
257 | ||
258 | print tr.run() | |
259 |