]>
git.saurik.com Git - wxWidgets.git/blob - wxPython/distrib/all/taskrunner.py
1 #----------------------------------------------------------------------
3 # Purpose: Classes that can manage running of external processes,
4 # either consecutively, simultaneously, or both, and can
5 # log the output of those jobs
11 # Copyright: (c) 2004 by Total Control Software
12 # Licence: wxWindows license
13 #----------------------------------------------------------------------
21 from subprocess
import Popen
, PIPE
, STDOUT
# Public API of this module: the names exported by a star-import.
24 __all__
= ["Job", "Task", "TaskRunner", "TaskRunnerThread"]
26 #----------------------------------------------------------------------
28 # For environment settings
31 return self
.__dict
__.copy()
33 def write(self
, filename
="config", outfile
=None):
35 f
= file(filename
, "w")
38 for k
, v
in self
.__dict
__.items():
39 f
.write('%s="%s"\n' % (k
, v
))
41 def read(self
, filename
="config"):
42 myfile
= open(filename
, "r")
43 for line
in myfile
.readlines():
45 if len(line
) > 0 and line
[0] == "#":
46 continue # it's a comment, move on
47 data
= line
.split("=")
49 self
.__dict
__[data
[0].strip()] = data
[1].strip()
54 Each Job is a monitor wrapped around an externally executing
55 process. It handles starting the process, polling if it is still
56 running, reading and logging its output, and killing it if
62 def __init__(self
, label
, command
, args
=[], env
=os
.environ
, verbose
=True):
64 self
.command
= command
70 self
.verbose
= verbose
74 self
.proc
= Popen([self
.command
] + self
.args
, # the command and args to execute
75 stdout
=PIPE
, stderr
=STDOUT
, env
=self
.env
,
76 bufsize
=0 # unbuffered (a bufsize of 1 would be line-buffered)
78 self
.startTime
= time
.time()
80 if not os
.path
.exists(self
.LOGBASE
):
81 os
.makedirs(self
.LOGBASE
)
82 self
.log
= file("%s/%s.log" % (self
.LOGBASE
, self
.label
), "w", 0)
84 # put the file in non-blocking mode
85 #flags = fcntl.fcntl (self.proc.stdout, fcntl.F_GETFL, 0)
86 #flags = flags | os.O_NONBLOCK
87 #fcntl.fcntl (self.proc.stdout, fcntl.F_SETFL, flags)
91 if self
.proc
is not None and self
.proc
.returncode
is None:
92 os
.kill(self
.proc
.pid
, signal
.SIGTERM
)
94 self
.stopTime
= time
.time()
98 if self
.proc
is not None:
99 return self
.proc
.stdout
.fileno()
103 def elapsedTime(self
):
107 elapsed_time
= now
-self
.startTime
108 mins
= elapsed_time
/60
110 seconds
= (elapsed_time
- mins
) % 60
111 return "%d:%d:%d" % (hours
, mins
, seconds
)
114 if self
.proc
is not None:
115 while self
.linesAvailable():
116 line
= self
.proc
.stdout
.readline()
120 line
= "** %s: %s" % (self
.label
, line
)
122 sys
.stdout
.write(line
)
125 def linesAvailable(self
):
126 if self
.proc
is None:
128 ind
, outd
, err
= select
.select([self
], [], [], 0)
136 if self
.proc
is None:# or self.linesAvailable():
138 return self
.proc
.poll() is not None
142 if self
.proc
is None: return None
143 return self
.proc
.wait()
147 if self
.proc
is None: return None
148 return self
.proc
.poll()
# Return the `returncode` attribute of the wrapped process object, or
# None when no process has been created yet (self.proc is None).
# NOTE(review): this reads the stored attribute only; it does not poll
# the process itself -- presumably callers use poll()/wait() first.
151 def returnCode(self
):
152 if self
.proc
is None: return None
153 return self
.proc
.returncode
156 #----------------------------------------------------------------------
160 This class helps manage the running of a Task, which is simply a
161 sequence of one or more Jobs, where subsequent jobs are not
162 started until prior ones are completed.
164 def __init__(self
, jobs
=[]):
165 if type(jobs
) != list:
# Add `job` to the end of this task's job list (self.jobs).
170 def append(self
, job
):
171 self
.jobs
.append(job
)
174 if self
.active
> len(self
.jobs
)-1:
177 return self
.jobs
[self
.active
]
181 if self
.active
< len(self
.jobs
):
182 self
.jobs
[self
.active
].start()
184 #----------------------------------------------------------------------
186 class TaskRunner(object):
188 Manages the running of multiple tasks. Name can be used to identify
189 a specific TaskRunner instance when reporting information back to the user.
191 def __init__(self
, tasks
=[], name
="TaskRunner Tasks"):
192 if type(tasks
) != list:
194 self
.tasks
= tasks
[:]
# Add `task` to the end of this runner's task list (self.tasks).
198 def append(self
, task
):
199 self
.tasks
.append(task
)
201 def errorOccurred(self
):
203 Only used for threaded TR instances. Once all TR tasks have completed,
204 we'll want to check to make sure there were no errors in the process.
209 # start all the active jobs
210 for task
in self
.tasks
:
211 task
.activeJob().start()
214 # loop, getting output from the jobs, etc.
216 # get all active Jobs
217 jobs
= [t
.activeJob() for t
in self
.tasks
if t
.activeJob()]
221 # wait for a job to have output ready, then log it
222 input, output
, err
= select
.select(jobs
, [], [], 1)
226 # check for finished jobs
227 for task
in self
.tasks
:
228 job
= task
.activeJob()
229 if job
and job
.finished():
230 if job
.returnCode() != 0:
231 rc
= job
.returnCode()
232 print "JOB RETURNED FAILURE CODE! (%d)" % rc
238 except KeyboardInterrupt:
239 print "STOPPING JOBS..."
244 print "Unknown exception..."
251 def stopAllJobs(self
):
252 for task
in self
.tasks
:
253 job
= task
.activeJob()
260 class TaskRunnerThread(threading
.Thread
):
261 def __init__(self
, taskRunner
, callback
=None):
262 self
.taskRunner
= taskRunner
263 self
.startTime
= None
265 self
.callback
= callback
266 threading
.Thread
.__init
__ ( self
)
269 self
.startTime
= time
.time()
270 self
.taskRunner
.run()
271 self
.stopTime
= time
.time()
275 def elapsedTime(self
):
279 elapsed_time
= now
-self
.startTime
280 mins
= elapsed_time
/60
282 seconds
= (elapsed_time
- mins
) % 60
283 return "%d:%d:%d" % (hours
, mins
, seconds
)
287 return self
.elapsedTime()
291 #----------------------------------------------------------------------
294 if __name__
== "__main__":
296 j1
= Job("label1", "./tmp/job-1.py", ["TEST-1"])
297 j2
= Job("label2", "./tmp/job-2.sh", ["TEST-2"])
303 j3
= Job("task2a", "./tmp/job-1.py", ["TASK-2a"])
304 j4
= Job("task2b", "./tmp/job-2.sh", ["TASK-2b"])
310 t3
= Task([Job("error", "./tmp/job-3.sh", ["TASK-3"])])
317 for task
in tr
.tasks
:
318 for job
in task
.jobs
: