]> git.saurik.com Git - wxWidgets.git/blob - wxPython/distrib/all/taskrunner.py
renamed arg[cv] parameters to not hide wxApp members
[wxWidgets.git] / wxPython / distrib / all / taskrunner.py
1 #----------------------------------------------------------------------
2 # Name: taskrunner.py
3 # Purpose: Classes that can manage running of external processes,
4 # either consecutively, simultaneously, or both, and can
5 # log the output of those jobs
6 #
7 # Author: Robin Dunn
8 #
9 # Created: 05-Nov-2004
10 # RCS-ID: $Id$
11 # Copyright: (c) 2004 by Total Control Software
12 # Licence: wxWindows license
13 #----------------------------------------------------------------------
14
15 import sys
16 import os
17 import signal
18 import select
19 import fcntl
20 from subprocess import Popen, PIPE, STDOUT
21
22
23 __all__ = ["Job", "Task", "TaskRunner"]
24
25 #----------------------------------------------------------------------
26
27
class Job(object):
    """
    Each Job is a monitor wrapped around an externally executing
    process.  It handles starting the process, polling if it is still
    running, reading and logging its output, and killing it if
    needed.

    A Job provides fileno(), so a started Job can be handed directly
    to select.select() to wait for output.
    """

    # Directory in which the "<label>.log" files are created.
    LOGBASE = "."

    def __init__(self, label, args):
        """
        label -- used as the log file name and as the prefix of every
                 echoed line; a false value (e.g. "") disables both.
        args  -- the command and its arguments, passed to Popen as-is.

        The process is not launched here; call start() for that.
        """
        self.label = label
        self.args = args
        self.proc = None
        # Always define the attribute so that code inspecting job.log
        # does not fail for unlabelled jobs.
        self.log = None
        if self.label:
            # open() replaces the obsolete file() builtin.  The log is
            # unbuffered and opened in binary mode so the raw lines read
            # from the pipe can be written unchanged; on POSIX (which the
            # module's use of fcntl/select on pipes already requires)
            # "wb" and "w" are equivalent under Python 2.
            self.log = open("%s/%s.log" % (self.LOGBASE, label), "wb", 0)

    def start(self):
        """Launch the process with stderr merged into a stdout pipe."""
        self.proc = Popen(self.args,   # the command and args to execute
                          stdout=PIPE, stderr=STDOUT,
                          bufsize=0,   # unbuffered, so select() reflects
                                       # exactly what readline() can get
                          )

    def stop(self):
        """
        Send SIGTERM to the process if it has been started and no exit
        status has been collected yet, then log any output that is
        already waiting in the pipe.
        """
        if self.proc is not None and self.proc.returncode is None:
            os.kill(self.proc.pid, signal.SIGTERM)
        self.logLines()

    def fileno(self):
        """
        Return the file descriptor of the process's output pipe, or -1
        if the process has not been started.
        """
        if self.proc is None:
            return -1
        return self.proc.stdout.fileno()

    def logLines(self):
        """
        Read every line that is currently available from the process.
        Each line is echoed to sys.stdout; for labelled jobs it is also
        written to the log file and the echo is prefixed with
        "** <label>: ".  Does nothing if the process was never started.
        """
        if self.proc is None:
            return
        while self.linesAvailable():
            line = self.proc.stdout.readline()
            if not line:
                # readable but empty means end-of-file on the pipe
                break
            if self.label:
                self.log.write(line)
            # The pipe yields bytes on interpreters where str is a text
            # type; decode so the formatting and echo below still work.
            if not isinstance(line, str):
                line = line.decode("utf-8", "replace")
            if self.label:
                line = "** %s: %s" % (self.label, line)
            sys.stdout.write(line)

    def linesAvailable(self):
        """
        Return True if the output pipe is readable right now (pending
        data or end-of-file), without blocking.
        """
        if self.proc is None:
            return False
        # A zero timeout turns select() into a non-blocking poll.
        ready, _unused_w, _unused_x = select.select([self], [], [], 0)
        return bool(ready)

    def finished(self):
        """
        Return True once the process has exited.  A job that was never
        started is reported as not finished.  Pending output is not
        taken into account.
        """
        if self.proc is None:
            return False
        return self.proc.poll() is not None

    def wait(self):
        """Block until the process exits and return its exit status
        (None if it was never started)."""
        if self.proc is None:
            return None
        return self.proc.wait()

    def poll(self):
        """Return the exit status if the process has exited, else None
        (also None if it was never started)."""
        if self.proc is None:
            return None
        return self.proc.poll()

    def returnCode(self):
        """Return the last collected exit status without polling
        (None if not started or not yet collected)."""
        if self.proc is None:
            return None
        return self.proc.returncode
109
110
111 #----------------------------------------------------------------------
112
class Task(object):
    """
    This class helps manage the running of a Task, which is simply a
    sequence of one or more Jobs, where subsequent jobs are not
    started until prior ones are completed.
    """

    def __init__(self, jobs=None):
        """
        jobs -- a list or tuple of jobs, a single job, or nothing at
                all for an initially empty Task.  The sequence is
                copied, so later append() calls never modify the
                caller's object.
        """
        if jobs is None:
            jobs = []
        elif not isinstance(jobs, (list, tuple)):
            # a lone job was passed; treat it as a one-element sequence
            jobs = [jobs]
        self.jobs = list(jobs)
        # index into self.jobs of the job that is currently active
        self.active = 0

    def append(self, job):
        """Add a job to the end of the sequence."""
        self.jobs.append(job)

    def activeJob(self):
        """Return the currently active job, or None when the sequence
        is empty or every job has already been passed."""
        if self.active >= len(self.jobs):
            return None
        return self.jobs[self.active]

    def next(self):
        """Advance to the following job and start it, if there is one."""
        self.active += 1
        if self.active < len(self.jobs):
            self.jobs[self.active].start()
138
139 #----------------------------------------------------------------------
140
class TaskRunner(object):
    """
    Manages the running of multiple tasks.  The active job of every
    task runs simultaneously; within a task the jobs run one after
    another.
    """

    def __init__(self, tasks=None):
        """
        tasks -- a list or tuple of tasks, a single task, or nothing
                 for an initially empty runner.  The sequence is copied.
        """
        if tasks is None:
            tasks = []
        elif not isinstance(tasks, (list, tuple)):
            # a lone task was passed; treat it as a one-element sequence
            tasks = [tasks]
        self.tasks = list(tasks)

    def append(self, task):
        """Add a task to the set of tasks to run."""
        self.tasks.append(task)

    def run(self):
        """
        Run all tasks to completion, echoing job output as it arrives.

        Returns 0 when every job exited with status 0.  As soon as a
        job exits with a non-zero status, all active jobs are stopped
        and that status is returned.  On KeyboardInterrupt the active
        jobs are stopped and 1 is returned.  Any other exception stops
        the active jobs and is re-raised.
        """
        try:
            # Start the first job of every task.  This is done inside
            # the try block so that a failed launch still stops the
            # jobs that were already started.  Tasks without any jobs
            # have no active job and are simply skipped.
            for task in self.tasks:
                job = task.activeJob()
                if job:
                    job.start()

            # loop, getting output from the jobs, etc.
            while True:
                # get all active Jobs
                candidates = [t.activeJob() for t in self.tasks]
                jobs = [j for j in candidates if j]
                if not jobs:
                    break

                # wait (at most one second) for a job to have output
                # ready, then log it
                ready, _unused_w, _unused_x = select.select(jobs, [], [], 1)
                for job in ready:
                    job.logLines()

                # check for finished jobs
                for task in self.tasks:
                    job = task.activeJob()
                    if job and job.finished():
                        # The job may have written more output between
                        # the select() above and its exit; log it now,
                        # because the job is never looked at again once
                        # the task has moved on.
                        job.logLines()
                        rc = job.returnCode()
                        if rc != 0:
                            print("JOB RETURNED FAILURE CODE! (%d)" % rc)
                            self.stopAllJobs()
                            return rc
                        task.next()

        except KeyboardInterrupt:
            print("STOPPING JOBS...")
            self.stopAllJobs()
            return 1

        except:
            # Deliberately catches everything: the jobs must be stopped
            # whatever went wrong, and the exception is re-raised.
            print("Unknown exception...")
            self.stopAllJobs()
            raise

        return 0

    def stopAllJobs(self):
        """Stop the active job of every task."""
        for task in self.tasks:
            job = task.activeJob()
            if job:
                job.stop()
200
201 #----------------------------------------------------------------------
202
203
if __name__ == "__main__":
    # Small self-test: three tasks run side by side.  The first two
    # each run a pair of helper scripts back to back; the third runs a
    # single script.  The scripts are expected under ./tmp.

    first = Task()
    for member in (Job("label1", ["./tmp/job-1.py", "TEST-1"]),
                   Job("label2", ["./tmp/job-2.sh", "TEST-2"])):
        first.append(member)

    # The second task runs the shell script before the Python one, but
    # the jobs are created in the order 2a, 2b.
    job_2a = Job("task2a", ["./tmp/job-1.py", "TASK-2a"])
    job_2b = Job("task2b", ["./tmp/job-2.sh", "TASK-2b"])
    second = Task()
    second.append(job_2b)
    second.append(job_2a)

    third = Task([Job("error", ["./tmp/job-3.sh", "TASK-3"])])

    tr = TaskRunner()
    for pending in (first, second, third):
        tr.append(pending)

    # list the label of every job, task by task
    for task in tr.tasks:
        for job in task.jobs:
            print(job.label)

    # run everything and show the overall result code
    result = tr.run()
    print(result)
232