]> git.saurik.com Git - wxWidgets.git/blob - wxPython/distrib/all/taskrunner.py
Sorry, zapping rogue print statement. ;-/
[wxWidgets.git] / wxPython / distrib / all / taskrunner.py
1 #----------------------------------------------------------------------
2 # Name: taskrunner.py
3 # Purpose: Classes that can manage running of external processes,
4 # either consecutively, simultaneously, or both, and can
5 # log the output of those jobs
6 #
7 # Author: Robin Dunn
8 #
9 # Created: 05-Nov-2004
10 # RCS-ID: $Id$
11 # Copyright: (c) 2004 by Total Control Software
12 # Licence: wxWindows license
13 #----------------------------------------------------------------------
14
15 import sys
16 import os
17 import signal
18 import select
19 import fcntl
20 from subprocess import Popen, PIPE, STDOUT
21
22
23 __all__ = ["Job", "Task", "TaskRunner"]
24
25 #----------------------------------------------------------------------
26
27 # For environment settings
28 class Config:
29 def asDict(self):
30 return self.__dict__.copy()
31
32 def write(self, filename="config", outfile=None):
33 if outfile is None:
34 f = file(filename, "w")
35 else:
36 f = outfile
37 for k, v in self.__dict__.items():
38 f.write('%s="%s"\n' % (k, v))
39
40 def read(self, filename="config"):
41 myfile = open(filename, "r")
42 for line in myfile.readlines():
43 line = line.strip()
44 if len(line) > 0 and line[0] == "#":
45 continue # it's a comment, move on
46 data = line.split("=")
47 if len(data) == 2:
48 self.__dict__[data[0]] = data[1]
49 myfile.close()
50
51 class Job(object):
52 """
53 Each Job is a monitor wrapped around an externally executing
54 process. It handles starting the process, polling if it is still
55 running, reading and logging it's output, and killing it if
56 needed.
57 """
58
59 LOGBASE="."
60
61 def __init__(self, label, command, args=[], env=os.environ):
62 self.label = label
63 self.command = command
64 self.args = args
65 self.env = env
66 self.proc = None
67 if self.label:
68 if not os.path.exists(self.LOGBASE):
69 os.mkdirs(self.LOGBASE)
70 self.log = file("%s/%s.log" % (self.LOGBASE, label), "w", 0)
71
72 def start(self):
73 self.proc = Popen([self.command] + self.args, # the command and args to execute
74 stdout=PIPE, stderr=STDOUT, env=self.env,
75 bufsize=0 # line-buffered
76 )
77 # put the file in non-blocking mode
78 #flags = fcntl.fcntl (self.proc.stdout, fcntl.F_GETFL, 0)
79 #flags = flags | os.O_NONBLOCK
80 #fcntl.fcntl (self.proc.stdout, fcntl.F_SETFL, flags)
81
82
83 def stop(self):
84 if self.proc is not None and self.proc.returncode is None:
85 os.kill(self.proc.pid, signal.SIGTERM)
86 self.logLines()
87
88
89 def fileno(self):
90 if self.proc is not None:
91 return self.proc.stdout.fileno()
92 else:
93 return -1
94
95
96 def logLines(self):
97 if self.proc is not None:
98 while self.linesAvailable():
99 line = self.proc.stdout.readline()
100 if not line: break
101 if self.label:
102 self.log.write(line)
103 line = "** %s: %s" % (self.label, line)
104 sys.stdout.write(line)
105
106
107 def linesAvailable(self):
108 if self.proc is None:
109 return False
110 ind, outd, err = select.select([self], [], [], 0)
111 if ind:
112 return True
113 else:
114 return False
115
116
117 def finished(self):
118 if self.proc is None:# or self.linesAvailable():
119 return False
120 return self.proc.poll() is not None
121
122
123 def wait(self):
124 if self.proc is None: return None
125 return self.proc.wait()
126
127
128 def poll(self):
129 if self.proc is None: return None
130 return self.proc.poll()
131
132
133 def returnCode(self):
134 if self.proc is None: return None
135 return self.proc.returncode
136
137
138 #----------------------------------------------------------------------
139
140 class Task(object):
141 """
142 This class helps manage the running of a Task, which is a simply a
143 sequence of one or more Jobs, where subesquent jobs are not
144 started until prior ones are completed.
145 """
146 def __init__(self, jobs=[]):
147 if type(jobs) != list:
148 jobs = [jobs]
149 self.jobs = jobs[:]
150 self.active = 0
151
152 def append(self, job):
153 self.jobs.append(job)
154
155 def activeJob(self):
156 if self.active > len(self.jobs)-1:
157 return None
158 else:
159 return self.jobs[self.active]
160
161 def next(self):
162 self.active += 1
163 if self.active < len(self.jobs):
164 self.jobs[self.active].start()
165
166 #----------------------------------------------------------------------
167
168 class TaskRunner(object):
169 """
170 Manages the running of multiple tasks.
171 """
172 def __init__(self, tasks=[]):
173 if type(tasks) != list:
174 tasks = [tasks]
175 self.tasks = tasks[:]
176
177 def append(self, task):
178 self.tasks.append(task)
179
180 def run(self):
181 # start all the active jobs
182 for task in self.tasks:
183 task.activeJob().start()
184
185 try:
186 # loop, getting output from the jobs, etc.
187 while True:
188 # get all active Jobs
189 jobs = [t.activeJob() for t in self.tasks if t.activeJob()]
190 if not jobs:
191 break
192
193 # wait for a job to have output ready, then log it
194 input, output, err = select.select(jobs, [], [], 1)
195 for job in input:
196 job.logLines()
197
198 # check for finished jobs
199 for task in self.tasks:
200 job = task.activeJob()
201 if job and job.finished():
202 if job.returnCode() != 0:
203 rc = job.returnCode()
204 print "JOB RETURNED FAILURE CODE! (%d)" % rc
205 self.stopAllJobs()
206 return rc
207 else:
208 task.next()
209 except KeyboardInterrupt:
210 print "STOPPING JOBS..."
211 self.stopAllJobs()
212 return 1
213
214 except:
215 print "Unknown exception..."
216 self.stopAllJobs()
217 raise
218
219 return 0
220
221
222 def stopAllJobs(self):
223 for task in self.tasks:
224 job = task.activeJob()
225 if job:
226 job.stop()
227
228 #----------------------------------------------------------------------
229
230
231 if __name__ == "__main__":
232
233 j1 = Job("label1", ["./tmp/job-1.py", "TEST-1"])
234 j2 = Job("label2", ["./tmp/job-2.sh", "TEST-2"])
235
236 t1 = Task()
237 t1.append(j1)
238 t1.append(j2)
239
240 j3 = Job("task2a", ["./tmp/job-1.py", "TASK-2a"])
241 j4 = Job("task2b", ["./tmp/job-2.sh", "TASK-2b"])
242
243 t2 = Task()
244 t2.append(j4)
245 t2.append(j3)
246
247 t3 = Task([Job("error", ["./tmp/job-3.sh", "TASK-3"])])
248
249 tr = TaskRunner()
250 tr.append(t1)
251 tr.append(t2)
252 tr.append(t3)
253
254 for task in tr.tasks:
255 for job in task.jobs:
256 print job.label
257
258 print tr.run()
259