X-Git-Url: https://git.saurik.com/wxWidgets.git/blobdiff_plain/510bb7480c5138dd5127ed3d8b1d9cbab39983c9..1f780e48af479e7bf9a07eaaa1ab6b41f1ffb17b:/wxPython/samples/ide/activegrid/tool/process.py?ds=sidebyside diff --git a/wxPython/samples/ide/activegrid/tool/process.py b/wxPython/samples/ide/activegrid/tool/process.py new file mode 100644 index 0000000000..a521f0cf50 --- /dev/null +++ b/wxPython/samples/ide/activegrid/tool/process.py @@ -0,0 +1,2364 @@ +#!/usr/bin/env python +# Copyright (c) 2002-2003 ActiveState +# See LICENSE.txt for license details. +""" Contents of LICENSE.txt: +Permission is hereby granted, free of charge, to any person obtaining a +copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be included +in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +""" + +r""" + Python interface for process control. + + This module defines three Process classes for spawning, + communicating and control processes. They are: Process, ProcessOpen, + ProcessProxy. 
All of the classes allow one to specify the command (cmd), + starting working directory (cwd), and environment to create for the + new process (env) and to "wait" for termination of the child and + "kill" the child. + + Process: + Use this class to simply launch a process (either a GUI app or a + console app in a new console) with which you do not intend to + communicate via its std handles. + + ProcessOpen: + Think of this as a super version of Python's os.popen3() method. + This spawns the given command and sets up pipes for + stdin/stdout/stderr which can then be used to communicate with + the child. + + ProcessProxy: + This is a heavy-weight class that, similar to ProcessOpen, + spawns the given commands and sets up pipes to the child's + stdin/stdout/stderr. However, it also starts three threads to + proxy communication between each of the child's and parent's std + handles. At the parent end of this communication are, by + default, IOBuffer objects. You may specify your own objects here + (usually sub-classing from IOBuffer, which handles some + synchronization issues for you). The result is that it is + possible to have your own IOBuffer instance that gets, say, a + .write() "event" for every write that the child does on its + stdout. + + Understanding ProcessProxy is pretty complex. Some examples + below attempt to help show some uses. Here is a diagram of the + communication: + + + ,---->->->------' ^ `------>->->----, + | | v + IOBuffer IOBuffer IOBuffer + (p.stdout) (p.stderr) (p.stdin) + | | | + _OutFileProxy _OutFileProxy _InFileProxy + thread thread thread + | ^ | + `----<-<-<------, | ,------<-<-<----' + + + Usage: + import process + p = process.<Process class>(cmd='echo hi', ...) + #... 
use the various methods and attributes + + Examples: + A simple 'hello world': + >>> import process + >>> p = process.ProcessOpen(['echo', 'hello']) + >>> p.stdout.read() + 'hello\r\n' + >>> p.wait() # .wait() returns the child's exit status + 0 + + Redirecting the stdout handler: + >>> import sys + >>> p = process.ProcessProxy(['echo', 'hello'], stdout=sys.stdout) + hello + + Using stdin (need to use ProcessProxy here because it defaults to + text-mode translation on Windows, ProcessOpen does not support + this): + >>> p = process.ProcessProxy(['sort']) + >>> p.stdin.write('5\n') + >>> p.stdin.write('2\n') + >>> p.stdin.write('7\n') + >>> p.stdin.close() + >>> p.stdout.read() + '2\n5\n7\n' + + Specifying environment variables: + >>> p = process.ProcessOpen(['perl', '-e', 'print $ENV{FOO}']) + >>> p.stdout.read() + '' + >>> p = process.ProcessOpen(['perl', '-e', 'print $ENV{FOO}'], + ... env={'FOO':'bar'}) + >>> p.stdout.read() + 'bar' + + Killing a long running process (On Linux, to poll you must use + p.wait(os.WNOHANG)): + >>> p = ProcessOpen(['perl', '-e', 'while (1) {}']) + >>> try: + ... p.wait(os.WNOHANG) # poll to see if is process still running + ... except ProcessError, ex: + ... if ex.errno == ProcessProxy.WAIT_TIMEOUT: + ... print "process is still running" + ... + process is still running + >>> p.kill(42) + >>> p.wait() + 42 + + Providing objects for stdin/stdout/stderr: + XXX write this, mention IOBuffer subclassing. +""" +#TODO: +# - Discuss the decision to NOT have the stdout/stderr _OutFileProxy's +# wait for process termination before closing stdin. It will just +# close stdin when stdout is seen to have been closed. That is +# considered Good Enough (tm). Theoretically it would be nice to +# only abort the stdin proxying when the process terminates, but +# watching for process termination in any of the parent's thread +# adds the undesired condition that the parent cannot exit with the +# child still running. That sucks. 
+# XXX Note that I don't even know if the current stdout proxy even +# closes the stdin proxy at all. +# - DavidA: if I specify "unbuffered" for my stdin handler (in the +# ProcessProxy constructor) then the stdin IOBuffer should do a +# fparent.read() rather than a fparent.readline(). TrentM: can I do +# that? What happens? +# + +import os +import sys +import threading +import types +import pprint +if sys.platform.startswith("win"): + import msvcrt + import win32api + import win32file + import win32pipe + import pywintypes + import win32process + import win32event + # constants pulled from win32con to save memory + VER_PLATFORM_WIN32_WINDOWS = 1 + CTRL_BREAK_EVENT = 1 + SW_SHOWDEFAULT = 10 + WM_CLOSE = 0x10 + DUPLICATE_SAME_ACCESS = 2 + +else: + import signal + + +#---- exceptions + +class ProcessError(Exception): + def __init__(self, msg, errno=-1): + Exception.__init__(self, msg) + self.errno = errno + + +#---- internal logging facility + +class Logger: + DEBUG, INFO, WARN, ERROR, FATAL = range(5) + def __init__(self, name, level=None, streamOrFileName=sys.stderr): + self.name = name + if level is None: + self.level = self.WARN + else: + self.level = level + if type(streamOrFileName) == types.StringType: + self.stream = open(streamOrFileName, 'w') + self._opennedStream = 1 + else: + self.stream = streamOrFileName + self._opennedStream = 0 + def __del__(self): + if self._opennedStream: + self.stream.close() + def _getLevelName(self, level): + levelNameMap = { + self.DEBUG: "DEBUG", + self.INFO: "INFO", + self.WARN: "WARN", + self.ERROR: "ERROR", + self.FATAL: "FATAL", + } + return levelNameMap[level] + def log(self, level, msg, *args): + if level < self.level: + return + message = "%s: %s:" % (self.name, self._getLevelName(level).lower()) + message = message + (msg % args) + "\n" + self.stream.write(message) + self.stream.flush() + def debug(self, msg, *args): + self.log(self.DEBUG, msg, *args) + def info(self, msg, *args): + self.log(self.INFO, msg, *args) + def 
warn(self, msg, *args): + self.log(self.WARN, msg, *args) + def error(self, msg, *args): + self.log(self.ERROR, msg, *args) + def fatal(self, msg, *args): + self.log(self.FATAL, msg, *args) + +# Loggers: +# - 'log' to log normal process handling +# - 'logres' to track system resource life +# - 'logfix' to track wait/kill proxying in _ThreadFixer +if 1: # normal/production usage + log = Logger("process", Logger.WARN) +else: # development/debugging usage + log = Logger("process", Logger.DEBUG, sys.stdout) +if 1: # normal/production usage + logres = Logger("process.res", Logger.WARN) +else: # development/debugging usage + logres = Logger("process.res", Logger.DEBUG, sys.stdout) +if 1: # normal/production usage + logfix = Logger("process.waitfix", Logger.WARN) +else: # development/debugging usage + logfix = Logger("process.waitfix", Logger.DEBUG, sys.stdout) + + + +#---- globals + +_version_ = (0, 5, 0) + +# List of registered processes (see _(un)registerProcess). +_processes = [] + + + +#---- internal support routines + +def _escapeArg(arg): + """Escape the given command line argument for the shell.""" + #XXX There is a probably more that we should escape here. + return arg.replace('"', r'\"') + + +def _joinArgv(argv): + r"""Join an arglist to a string appropriate for running. + + >>> import os + >>> _joinArgv(['foo', 'bar "baz']) + 'foo "bar \\"baz"' + """ + cmdstr = "" + for arg in argv: + if ' ' in arg or ';' in arg: + cmdstr += '"%s"' % _escapeArg(arg) + else: + cmdstr += _escapeArg(arg) + cmdstr += ' ' + if cmdstr.endswith(' '): cmdstr = cmdstr[:-1] # strip trailing space + return cmdstr + + +def _getPathFromEnv(env): + """Return the PATH environment variable or None. + + Do the right thing for case sensitivity per platform. + XXX Icky. This guarantee of proper case sensitivity of environment + variables should be done more fundamentally in this module. 
+ """ + if sys.platform.startswith("win"): + for key in env.keys(): + if key.upper() == "PATH": + return env[key] + else: + return None + else: + if env.has_key("PATH"): + return env["PATH"] + else: + return None + + +def _whichFirstArg(cmd, env=None): + """Return the given command ensuring that the first arg (the command to + launch) is a full path to an existing file. + + Raise a ProcessError if no such executable could be found. + """ + # Parse out the first arg. + if cmd.startswith('"'): + # The .replace() is to ensure it does not mistakenly find the + # second '"' in, say (escaped quote): + # "C:\foo\"bar" arg1 arg2 + idx = cmd.replace('\\"', 'XX').find('"', 1) + if idx == -1: + raise ProcessError("Malformed command: %r" % cmd) + first, rest = cmd[1:idx], cmd[idx+1:] + rest = rest.lstrip() + else: + if ' ' in cmd: + first, rest = cmd.split(' ', 1) + else: + first, rest = cmd, "" + + # Ensure the first arg is a valid path to the appropriate file. + import which + if os.sep in first: + altpath = [os.path.dirname(first)] + firstbase = os.path.basename(first) + candidates = list(which.which(firstbase, path=altpath)) + elif env: + altpath = _getPathFromEnv(env) + if altpath: + candidates = list(which.which(first, altpath.split(os.pathsep))) + else: + candidates = list(which.which(first)) + else: + candidates = list(which.which(first)) + if candidates: + return _joinArgv( [candidates[0]] ) + ' ' + rest + else: + raise ProcessError("Could not find an appropriate leading command "\ + "for: %r" % cmd) + + +if sys.platform.startswith("win"): + def _SaferCreateProcess(appName, # app name + cmd, # command line + processSA, # process security attributes + threadSA, # thread security attributes + inheritHandles, # are handles are inherited + creationFlags, # creation flags + env, # environment + cwd, # current working directory + si): # STARTUPINFO pointer + """If CreateProcess fails from environment type inconsistency then + fix that and try again. 
+ + win32process.CreateProcess requires that all environment keys and + values either be all ASCII or all unicode. Try to remove this burden + from the user of process.py. + """ + isWin9x = win32api.GetVersionEx()[3] == VER_PLATFORM_WIN32_WINDOWS + # On Win9x all keys and values of 'env' must be ASCII (XXX + # Actually this is probably only true if the Unicode support + # libraries, which are not installed by default, are not + # installed). On other Windows flavours all keys and values of + # 'env' must all be ASCII *or* all Unicode. We will try to + # automatically convert to the appropriate type, issuing a + # warning if such an automatic conversion is necessary. + + #XXX Komodo 2.0 Beta 1 hack. This requirement should be + # pushed out to Komodo code using process.py. Or should it? + if isWin9x and env: + aenv = {} + for key, value in env.items(): + aenv[str(key)] = str(value) + env = aenv + + log.debug("""\ +_SaferCreateProcess(appName=%r, + cmd=%r, + env=%r, + cwd=%r) + os.getcwd(): %r +""", appName, cmd, env, cwd, os.getcwd()) + try: + hProcess, hThread, processId, threadId\ + = win32process.CreateProcess(appName, cmd, processSA, + threadSA, inheritHandles, + creationFlags, env, cwd, si) + except TypeError, ex: + if ex.args == ('All dictionary items must be strings, or all must be unicode',): + # Try again with an all unicode environment. + #XXX Would be nice if didn't have to depend on the error + # string to catch this. + #XXX Removing this warning for 2.3 release. See bug + # 23215. The right fix is to correct the PHPAppInfo + # stuff to heed the warning. 
+ #import warnings + #warnings.warn('env: ' + str(ex), stacklevel=4) + if isWin9x and env: + aenv = {} + try: + for key, value in env.items(): + aenv[str(key)] = str(value) + except UnicodeError, ex: + raise ProcessError(str(ex)) + env = aenv + elif env: + uenv = {} + for key, val in env.items(): + uenv[unicode(key)] = unicode(val) + env = uenv + hProcess, hThread, processId, threadId\ + = win32process.CreateProcess(appName, cmd, processSA, + threadSA, inheritHandles, + creationFlags, env, cwd, + si) + else: + raise + return hProcess, hThread, processId, threadId + + +# Maintain references to all spawned ProcessProxy objects to avoid hangs. +# Otherwise, if the user lets the a ProcessProxy object go out of +# scope before the process has terminated, it is possible to get a +# hang (at least it *used* to be so when we had the +# win32api.CloseHandle() call in the __del__() method). +# XXX Is this hang possible on Linux as well? +# A reference is removed from this list when the process's .wait or +# .kill method is called. +# XXX Should an atexit() handler be registered to kill all curently +# running processes? Else *could* get hangs, n'est ce pas? +def _registerProcess(process): + global _processes + log.info("_registerprocess(process=%r)", process) + + # Clean up zombie processes. + # If the user does not call .wait() or .kill() on processes then + # the ProcessProxy object will not get cleaned up until Python + # exits and _processes goes out of scope. Under heavy usage that + # is a big memory waste. Cleaning up here alleviates that. 
+ for p in _processes[:]: # use copy of _process, because we may modifiy it + try: + # poll to see if is process still running + if sys.platform.startswith("win"): + timeout = 0 + else: + timeout = os.WNOHANG + p.wait(timeout) + _unregisterProcess(p) + except ProcessError, ex: + if ex.errno == ProcessProxy.WAIT_TIMEOUT: + pass + else: + raise + + _processes.append(process) + +def _unregisterProcess(process): + global _processes + log.info("_unregisterProcess(process=%r)", process) + try: + _processes.remove(process) + del process + except ValueError: + pass + + +def _fixupCommand(cmd, env=None): + """Fixup the command string so it is launchable via CreateProcess. + + One cannot just launch, say "python", via CreateProcess. A full path + to an executable is required. In general there are two choices: + 1. Launch the command string via the shell. The shell will find + the fullpath to the appropriate executable. This shell will + also be able to execute special shell commands, like "dir", + which don't map to an actual executable. + 2. Find the fullpath to the appropriate executable manually and + launch that exe. + + Option (1) is preferred because you don't have to worry about not + exactly duplicating shell behaviour and you get the added bonus of + being able to launch "dir" and friends. + + However, (1) is not always an option. Doing so when the shell is + command.com (as on all Win9x boxes) or when using WinNT's cmd.exe, + problems are created with .kill() because these shells seem to eat + up Ctrl-C's and Ctrl-Break's sent via + win32api.GenerateConsoleCtrlEvent(). Strangely this only happens + when spawn via this Python interface. For example, Ctrl-C get + through to hang.exe here: + C:\> ...\w9xpopen.exe "C:\WINDOWS\COMMAND.COM /c hang.exe" + ^C + but not here: + >>> p = ProcessOpen('hang.exe') + # This results in the same command to CreateProcess as + # above. + >>> p.kill() + + Hence, for these platforms we fallback to option (2). 
Cons: + - cannot spawn shell commands like 'dir' directly + - cannot spawn batch files + """ + if sys.platform.startswith("win"): + # Fixup the command string to spawn. (Lifted from + # posixmodule.c::_PyPopenCreateProcess() with some modifications) + comspec = os.environ.get("COMSPEC", None) + win32Version = win32api.GetVersion() + if comspec is None: + raise ProcessError("Cannot locate a COMSPEC environment "\ + "variable to use as the shell") + # Explicitly check if we are using COMMAND.COM. If we + # are then use the w9xpopen hack. + elif (win32Version & 0x80000000L == 0) and\ + (win32Version & 0x5L >= 5) and\ + os.path.basename(comspec).lower() != "command.com": + # 2000/XP and not using command.com. + if '"' in cmd or "'" in cmd: + cmd = comspec + ' /c "%s"' % cmd + else: + cmd = comspec + ' /c ' + cmd + elif (win32Version & 0x80000000L == 0) and\ + (win32Version & 0x5L < 5) and\ + os.path.basename(comspec).lower() != "command.com": + # NT and not using command.com. + try: + cmd = _whichFirstArg(cmd, env) + except ProcessError: + raise ProcessError("Could not find a suitable executable "\ + "to launch for '%s'. On WinNT you must manually prefix "\ + "shell commands and batch files with 'cmd.exe /c' to "\ + "have the shell run them." % cmd) + else: + # Oh gag, we're on Win9x and/or using COMMAND.COM. 
Use the + # workaround listed in KB: Q150956 + w9xpopen = os.path.join( + os.path.dirname(win32api.GetModuleFileName(0)), + 'w9xpopen.exe') + if not os.path.exists(w9xpopen): + # Eeek - file-not-found - possibly an embedding + # situation - see if we can locate it in sys.exec_prefix + w9xpopen = os.path.join(os.path.dirname(sys.exec_prefix), + 'w9xpopen.exe') + if not os.path.exists(w9xpopen): + raise ProcessError(\ + "Can not locate 'w9xpopen.exe' which is needed "\ + "for ProcessOpen to work with your shell or "\ + "platform.") + ## This would be option (1): + #cmd = '%s "%s /c %s"'\ + # % (w9xpopen, comspec, cmd.replace('"', '\\"')) + try: + cmd = _whichFirstArg(cmd, env) + except ProcessError: + raise ProcessError("Could not find a suitable executable "\ + "to launch for '%s'. On Win9x you must manually prefix "\ + "shell commands and batch files with 'command.com /c' "\ + "to have the shell run them." % cmd) + cmd = '%s "%s"' % (w9xpopen, cmd.replace('"', '\\"')) + return cmd + +class _FileWrapper: + """Wrap a system file object, hiding some nitpicky details. + + This class provides a Python file-like interface to either a Python + file object (pretty easy job), a file descriptor, or an OS-specific + file handle (e.g. Win32 handles to file objects on Windows). Any or + all of these object types may be passed to this wrapper. If more + than one is specified this wrapper prefers to work with certain one + in this order: + - file descriptor (because usually this allows for + return-immediately-on-read-if-anything-available semantics and + also provides text mode translation on Windows) + - OS-specific handle (allows for the above read semantics) + - file object (buffering can cause difficulty for interacting + with spawned programs) + + It also provides a place where related such objects can be kept + alive together to prevent premature ref-counted collection. (E.g. on + Windows a Python file object may be associated with a Win32 file + handle. 
If the file handle is not kept alive the Python file object + will cease to function.) + """ + def __init__(self, file=None, descriptor=None, handle=None): + self._file = file + self._descriptor = descriptor + self._handle = handle + self._closed = 0 + if self._descriptor is not None or self._handle is not None: + self._lineBuf = "" # to support .readline() + + def __del__(self): + self.close() + + def __getattr__(self, name): + """Forward to the underlying file object.""" + if self._file is not None: + return getattr(self._file, name) + else: + raise ProcessError("no file object to pass '%s' attribute to" + % name) + + def _win32Read(self, nBytes): + try: + log.info("[%s] _FileWrapper.read: waiting for read on pipe", + id(self)) + errCode, text = win32file.ReadFile(self._handle, nBytes) + except pywintypes.error, ex: + # Ignore errors for now, like "The pipe is being closed.", + # etc. XXX There *may* be errors we don't want to avoid. + log.info("[%s] _FileWrapper.read: error reading from pipe: %s", + id(self), ex) + return "" + assert errCode == 0,\ + "Why is 'errCode' from ReadFile non-zero? %r" % errCode + if not text: + # Empty text signifies that the pipe has been closed on + # the parent's end. + log.info("[%s] _FileWrapper.read: observed close of parent", + id(self)) + # Signal the child so it knows to stop listening. + self.close() + return "" + else: + log.info("[%s] _FileWrapper.read: read %d bytes from pipe: %r", + id(self), len(text), text) + return text + + def read(self, nBytes=-1): + # nBytes <= 0 means "read everything" + # Note that we are changing the "read everything" cue to + # include 0, because actually doing + # win32file.ReadFile(, 0) results in every subsequent + # read returning 0, i.e. it shuts down the pipe. 
+ if self._descriptor is not None: + if nBytes <= 0: + text, self._lineBuf = self._lineBuf, "" + while 1: + t = os.read(self._descriptor, 4092) + if not t: + break + else: + text += t + else: + if len(self._lineBuf) >= nBytes: + text, self._lineBuf =\ + self._lineBuf[:nBytes], self._lineBuf[nBytes:] + else: + nBytesToGo = nBytes - len(self._lineBuf) + text = self._lineBuf + os.read(self._descriptor, + nBytesToGo) + self._lineBuf = "" + return text + elif self._handle is not None: + if nBytes <= 0: + text, self._lineBuf = self._lineBuf, "" + while 1: + t = self._win32Read(4092) + if not t: + break + else: + text += t + else: + if len(self._lineBuf) >= nBytes: + text, self._lineBuf =\ + self._lineBuf[:nBytes], self._lineBuf[nBytes:] + else: + nBytesToGo = nBytes - len(self._lineBuf) + text, self._lineBuf =\ + self._lineBuf + self._win32Read(nBytesToGo), "" + return text + elif self._file is not None: + return self._file.read(nBytes) + else: + raise "FileHandle.read: no handle to read with" + + def readline(self): + if self._descriptor is not None or self._handle is not None: + while 1: + #XXX This is not portable to the Mac. 
+ idx = self._lineBuf.find('\n') + if idx != -1: + line, self._lineBuf =\ + self._lineBuf[:idx+1], self._lineBuf[idx+1:] + break + else: + lengthBefore = len(self._lineBuf) + t = self.read(4092) + if len(t) <= lengthBefore: # no new data was read + line, self._lineBuf = self._lineBuf, "" + break + else: + self._lineBuf += t + return line + elif self._file is not None: + return self._file.readline() + else: + raise "FileHandle.readline: no handle to read with" + + def readlines(self): + if self._descriptor is not None or self._handle is not None: + lines = [] + while 1: + line = self.readline() + if line: + lines.append(line) + else: + break + return lines + elif self._file is not None: + return self._file.readlines() + else: + raise "FileHandle.readline: no handle to read with" + + def write(self, text): + if self._descriptor is not None: + os.write(self._descriptor, text) + elif self._handle is not None: + try: + errCode, nBytesWritten = win32file.WriteFile(self._handle, text) + except pywintypes.error, ex: + # Ingore errors like "The pipe is being closed.", for + # now. + log.info("[%s] _FileWrapper.write: error writing to pipe, "\ + "ignored", id(self)) + return + assert errCode == 0,\ + "Why is 'errCode' from WriteFile non-zero? %r" % errCode + if not nBytesWritten: + # No bytes written signifies that the pipe has been + # closed on the child's end. 
+ log.info("[%s] _FileWrapper.write: observed close of pipe", + id(self)) + return + else: + log.info("[%s] _FileWrapper.write: wrote %d bytes to pipe: %r", + id(self), len(text), text) + elif self._file is not None: + self._file.write(text) + else: + raise "FileHandle.write: nothing to write with" + + def close(self): + """Close all associated file objects and handles.""" + log.debug("[%s] _FileWrapper.close()", id(self)) + if not self._closed: + self._closed = 1 + if self._file is not None: + log.debug("[%s] _FileWrapper.close: close file", id(self)) + self._file.close() + log.debug("[%s] _FileWrapper.close: done file close", id(self)) + if self._descriptor is not None: + try: + os.close(self._descriptor) + except OSError, ex: + if ex.errno == 9: + # Ignore: OSError: [Errno 9] Bad file descriptor + # XXX *Should* we be ignoring this? It appears very + # *in*frequently in test_wait.py. + log.debug("[%s] _FileWrapper.close: closing "\ + "descriptor raised OSError", id(self)) + else: + raise + if self._handle is not None: + log.debug("[%s] _FileWrapper.close: close handle", id(self)) + try: + win32api.CloseHandle(self._handle) + except win32api.error: + log.debug("[%s] _FileWrapper.close: closing handle raised", + id(self)) + pass + log.debug("[%s] _FileWrapper.close: done closing handle", + id(self)) + + def __repr__(self): + return "<_FileWrapper: file:%r fd:%r os_handle:%r>"\ + % (self._file, self._descriptor, self._handle) + + +class _CountingCloser: + """Call .close() on the given object after own .close() is called + the precribed number of times. + """ + def __init__(self, objectsToClose, count): + """ + "objectsToClose" is a list of object on which to call .close(). + "count" is the number of times this object's .close() method + must be called before .close() is called on the given objects. 
+ """ + self.objectsToClose = objectsToClose + self.count = count + if self.count <= 0: + raise ProcessError("illegal 'count' value: %s" % self.count) + + def close(self): + self.count -= 1 + log.debug("[%d] _CountingCloser.close(): count=%d", id(self), + self.count) + if self.count == 0: + for objectToClose in self.objectsToClose: + objectToClose.close() + + + +#---- public interface + +class Process: + """Create a process. + + One can optionally specify the starting working directory, the + process environment, and std handles to have the child process + inherit (all defaults are the parent's current settings). 'wait' and + 'kill' method allow for control of the child's termination. + """ + # TODO: + # - Rename this or merge it with ProcessOpen somehow. + # + if sys.platform.startswith("win"): + # .wait() argument constants + INFINITE = win32event.INFINITE + # .wait() return error codes + WAIT_FAILED = win32event.WAIT_FAILED + WAIT_TIMEOUT = win32event.WAIT_TIMEOUT + # creation "flags" constants + # XXX Should drop these and just document usage of + # win32process.CREATE_* constants on windows. + CREATE_NEW_CONSOLE = win32process.CREATE_NEW_CONSOLE + else: + # .wait() argument constants + INFINITE = 0 + # .wait() return error codes + WAIT_TIMEOUT = 258 + WAIT_FAILED = -1 + # creation "flags" constants + CREATE_NEW_CONSOLE = 0x10 # same as win32process.CREATE_NEW_CONSOLE + + def __init__(self, cmd, cwd=None, env=None, flags=0): + """Create a child process. + + "cmd" is a command string or argument vector to spawn. + "cwd" is a working directory in which to start the child process. + "env" is an environment dictionary for the child. + "flags" are system-specific process creation flags. On Windows + this can be a bitwise-OR of any of the win32process.CREATE_* + constants (Note: win32process.CREATE_NEW_PROCESS_GROUP is always + OR'd in). On Unix, this is currently ignored. 
+ """ + log.info("Process.__init__(cmd=%r, cwd=%r, env=%r, flags=%r)", + cmd, cwd, env, flags) + self._cmd = cmd + if not self._cmd: + raise ProcessError("You must specify a command.") + self._cwd = cwd + self._env = env + self._flags = flags + if sys.platform.startswith("win"): + self._flags |= win32process.CREATE_NEW_PROCESS_GROUP + + if sys.platform.startswith("win"): + self._startOnWindows() + else: + self.__retvalCache = None + self._startOnUnix() + + def _runChildOnUnix(self): + #XXX Errors running the child do *not* get communicated back. + + #XXX Perhaps we should *always* prefix with '/bin/sh -c'? There is a + # disparity btwn how this works on Linux and Windows. + if isinstance(self._cmd, types.StringTypes): + # This is easier than trying to reproduce shell interpretation to + # separate the arguments. + cmd = ['/bin/sh', '-c', self._cmd] + else: + cmd = self._cmd + + # Close all file descriptors (except std*) inherited from the parent. + MAXFD = 256 # Max number of file descriptors (os.getdtablesize()???) + for i in range(3, MAXFD): + try: + os.close(i) + except OSError: + pass + + try: + if self._env: + os.execvpe(cmd[0], cmd, self._env) + else: + os.execvp(cmd[0], cmd) + finally: + os._exit(1) # Should never get here. + + def _forkAndExecChildOnUnix(self): + """Fork and start the child process. + + Sets self._pid as a side effect. + """ + pid = os.fork() + if pid == 0: # child + self._runChildOnUnix() + # parent + self._pid = pid + + def _startOnUnix(self): + if self._cwd: + oldDir = os.getcwd() + try: + os.chdir(self._cwd) + except OSError, ex: + raise ProcessError(msg=str(ex), errno=ex.errno) + self._forkAndExecChildOnUnix() + + # parent + if self._cwd: + os.chdir(oldDir) + + def _startOnWindows(self): + if type(self._cmd) in (types.ListType, types.TupleType): + # And arg vector was passed in. 
def wait(self, timeout=None):
    """Wait for the started process to complete.

    "timeout" (on Windows) is a number of seconds (may be fractional)
        after which to time out. Default is win32event.INFINITE.
    "timeout" (on Unix) is akin to the os.waitpid() "options" argument
        (os.WNOHANG may be used to return immediately if the process
        has not exited). Default is 0, i.e. wait forever.

    If the wait times out a ProcessError is raised. Otherwise the
    child's exit value (on Windows) or the child's exit status encoded
    as per os.waitpid() (on Unix) is returned:
        "a 16-bit number, whose low byte is the signal number that killed
        the process, and whose high byte is the exit status (if the
        signal number is zero); the high bit of the low byte is set if a
        core file was produced."
    In the latter case, use the os.W*() methods to interpret the return
    value.
    """
    # XXX Or should returning the exit value be moved out to another
    #     function as on Win32 process control? If so, then should
    #     perhaps not make WaitForSingleObject semantic transformation.
    if sys.platform.startswith("win"):
        if timeout is None:
            timeout = win32event.INFINITE
        else:
            # Win32 API's timeout is in millisecs and is passed on as an
            # integer (kill() hands in a float grace period).
            timeout = int(timeout * 1000.0)

        rc = win32event.WaitForSingleObject(self._hProcess, timeout)
        # The command is wrapped in a 1-tuple so that an argv given as a
        # tuple is formatted as a whole instead of being unpacked by '%'.
        if rc == win32event.WAIT_FAILED:
            raise ProcessError("'WAIT_FAILED' when waiting for process to "\
                               "terminate: %r" % (self._cmd,), rc)
        elif rc == win32event.WAIT_TIMEOUT:
            raise ProcessError("'WAIT_TIMEOUT' when waiting for process to "\
                               "terminate: %r" % (self._cmd,), rc)

        retval = win32process.GetExitCodeProcess(self._hProcess)
    else:
        # os.waitpid() will raise:
        #       OSError: [Errno 10] No child processes
        # on subsequent .wait() calls. Change these semantics to have
        # subsequent .wait() calls return the exit status and return
        # immediately without raising an exception.
        # (XXX It would require synchronization code to handle the case
        # of multiple simultaneous .wait() requests, however we can punt
        # on that because it is moot while Linux still has the problem
        # for which _ThreadFixer() exists.)
        if self.__retvalCache is not None:
            retval = self.__retvalCache
        else:
            if timeout is None:
                timeout = 0
            pid, sts = os.waitpid(self._pid, timeout)
            if pid == self._pid:
                self.__retvalCache = retval = sts
            else:
                raise ProcessError("Wait for process timed out.",
                                   self.WAIT_TIMEOUT)
    return retval
def _close_(self, hwnd, dummy):
    """EnumWindows callback used by .kill() on Windows.

    "hwnd" is the handle of the window being enumerated.
    "dummy" is the extra argument given to EnumWindows; it is not used.

    Posts WM_CLOSE to the window when it is owned by this process.
    """
    ownerThread, ownerPid = win32process.GetWindowThreadProcessId(hwnd)
    if ownerPid != self._processId:
        # Some other process' window: leave it alone.
        return
    import win32gui
    win32gui.PostMessage(hwnd, WM_CLOSE, 0, 0)
def _startOnUnix(self):
    """Create the std pipes, fork the child and wrap the parent's ends.

    Sets self.stdin, self.stdout and self.stderr to _FileWrapper objects
    around the parent's ends of the pipes. If self._cwd is set, the
    child is forked while that directory is the current one and the
    previous working directory is restored afterwards.

    Raises ProcessError if self._cwd cannot be entered. No pipe
    descriptor is left open when starting the child fails.
    """
    # Create pipes for std handles.
    fdChildStdinRd, fdChildStdinWr = os.pipe()
    fdChildStdoutRd, fdChildStdoutWr = os.pipe()
    fdChildStderrRd, fdChildStderrWr = os.pipe()
    childEnds = (fdChildStdinRd, fdChildStdoutWr, fdChildStderrWr)
    parentEnds = (fdChildStdinWr, fdChildStdoutRd, fdChildStderrRd)

    started = 0
    try:
        oldDir = None
        if self._cwd:
            oldDir = os.getcwd()
            try:
                os.chdir(self._cwd)
            except OSError:
                # sys.exc_info() instead of binding the exception in the
                # 'except' clause: valid on every Python version.
                ex = sys.exc_info()[1]
                raise ProcessError(msg=str(ex), errno=ex.errno)
        try:
            self._forkAndExecChildOnUnix(fdChildStdinRd, fdChildStdoutWr,
                                         fdChildStderrWr)
        finally:
            # Always go back, even when the fork itself failed.
            if oldDir is not None:
                os.chdir(oldDir)
        started = 1
    finally:
        # The parent never uses the child's ends of the pipes.
        for fd in childEnds:
            os.close(fd)
        # Without a child the parent's ends are useless as well.
        if not started:
            for fd in parentEnds:
                os.close(fd)

    self.stdin = _FileWrapper(descriptor=fdChildStdinWr)
    logres.info("[%s] ProcessOpen._start(): create child stdin: %r",
                id(self), self.stdin)
    self.stdout = _FileWrapper(descriptor=fdChildStdoutRd)
    logres.info("[%s] ProcessOpen._start(): create child stdout: %r",
                id(self), self.stdout)
    self.stderr = _FileWrapper(descriptor=fdChildStderrRd)
    logres.info("[%s] ProcessOpen._start(): create child stderr: %r",
                id(self), self.stderr)
+ hChildStdinWrDup = win32api.DuplicateHandle( + win32api.GetCurrentProcess(), + hChildStdinWr, + win32api.GetCurrentProcess(), + 0, + 0, # not inherited + DUPLICATE_SAME_ACCESS) + win32api.CloseHandle(hChildStdinWr) + self._hChildStdinWr = hChildStdinWrDup + hChildStdoutRdDup = win32api.DuplicateHandle( + win32api.GetCurrentProcess(), + hChildStdoutRd, + win32api.GetCurrentProcess(), + 0, + 0, # not inherited + DUPLICATE_SAME_ACCESS) + win32api.CloseHandle(hChildStdoutRd) + self._hChildStdoutRd = hChildStdoutRdDup + hChildStderrRdDup = win32api.DuplicateHandle( + win32api.GetCurrentProcess(), + hChildStderrRd, + win32api.GetCurrentProcess(), + 0, + 0, # not inherited + DUPLICATE_SAME_ACCESS) + win32api.CloseHandle(hChildStderrRd) + self._hChildStderrRd = hChildStderrRdDup + + # Set the translation mode and buffering. + if self._mode == 't': + flags = os.O_TEXT + else: + flags = 0 + fdChildStdinWr = msvcrt.open_osfhandle(self._hChildStdinWr, flags) + fdChildStdoutRd = msvcrt.open_osfhandle(self._hChildStdoutRd, flags) + fdChildStderrRd = msvcrt.open_osfhandle(self._hChildStderrRd, flags) + + self.stdin = _FileWrapper(descriptor=fdChildStdinWr, + handle=self._hChildStdinWr) + logres.info("[%s] ProcessOpen._start(): create child stdin: %r", + id(self), self.stdin) + self.stdout = _FileWrapper(descriptor=fdChildStdoutRd, + handle=self._hChildStdoutRd) + logres.info("[%s] ProcessOpen._start(): create child stdout: %r", + id(self), self.stdout) + self.stderr = _FileWrapper(descriptor=fdChildStderrRd, + handle=self._hChildStderrRd) + logres.info("[%s] ProcessOpen._start(): create child stderr: %r", + id(self), self.stderr) + + # Start the child process. 
def wait(self, timeout=None):
    """Wait for the started process to complete.

    "timeout" (on Windows) is a number of seconds (may be fractional)
        after which to time out. Default is win32event.INFINITE.
    "timeout" (on Unix) is akin to the os.waitpid() "options" argument
        (os.WNOHANG may be used to return immediately if the process
        has not exited). Default is 0, i.e. wait forever.

    If the wait times out a ProcessError is raised. Otherwise the
    process is unregistered and the child's exit value (on Windows) or
    the child's exit status encoded as per os.waitpid() (on Unix) is
    returned:
        "a 16-bit number, whose low byte is the signal number that killed
        the process, and whose high byte is the exit status (if the
        signal number is zero); the high bit of the low byte is set if a
        core file was produced."
    In the latter case, use the os.W*() methods to interpret the return
    value.
    """
    # XXX Or should returning the exit value be moved out to another
    #     function as on Win32 process control? If so, then should
    #     perhaps not make WaitForSingleObject semantic
    #     transformation.
    # TODO:
    #   - Need to rationalize the .wait() API for Windows vs. Unix.
    #     It is a real pain in the current situation.
    if sys.platform.startswith("win"):
        if timeout is None:
            timeout = win32event.INFINITE
        else:
            # Win32 API's timeout is in millisecs and is passed on as an
            # integer.
            timeout = int(timeout * 1000.0)

        rc = win32event.WaitForSingleObject(self._hProcess, timeout)
        # The command is wrapped in a 1-tuple so that an argv given as a
        # tuple is formatted as a whole instead of being unpacked by '%'.
        if rc == win32event.WAIT_FAILED:
            raise ProcessError("'WAIT_FAILED' when waiting for process to "\
                               "terminate: %r" % (self._cmd,), rc)
        elif rc == win32event.WAIT_TIMEOUT:
            raise ProcessError("'WAIT_TIMEOUT' when waiting for process to "\
                               "terminate: %r" % (self._cmd,), rc)

        retval = win32process.GetExitCodeProcess(self._hProcess)
    else:
        # os.waitpid() will raise:
        #       OSError: [Errno 10] No child processes
        # on subsequent .wait() calls. Change these semantics to have
        # subsequent .wait() calls return the exit status and return
        # immediately without raising an exception.
        # (XXX It would require synchronization code to handle the case
        # of multiple simultaneous .wait() requests, however we can punt
        # on that because it is moot while Linux still has the problem
        # for which _ThreadFixer() exists.)
        if self.__retvalCache is not None:
            retval = self.__retvalCache
        else:
            if timeout is None:
                timeout = 0
            pid, sts = os.waitpid(self._pid, timeout)
            if pid == self._pid:
                self.__retvalCache = retval = sts
            else:
                raise ProcessError("Wait for process timed out.",
                                   self.WAIT_TIMEOUT)
    _unregisterProcess(self)
    return retval
+ + "exitCode" [deprecated, not supported] (Windows only) is the + code the terminated process should exit with. + "gracePeriod" (Windows only) is a number of seconds the process is + allowed to shutdown with a WM_CLOSE signal before a hard + terminate is called. + "sig" (Unix only) is the signal to use to kill the process. Defaults + to signal.SIGKILL. See os.kill() for more information. + + Windows: + Try for an orderly shutdown via WM_CLOSE. If still running + after gracePeriod (1 sec. default), terminate. + """ + if sys.platform.startswith("win"): + import win32gui + # Send WM_CLOSE to windows in this process group. + win32gui.EnumWindows(self._close_, 0) + + # Send Ctrl-Break signal to all processes attached to this + # console. This is supposed to trigger shutdown handlers in + # each of the processes. + try: + win32api.GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, + self._processId) + except AttributeError: + log.warn("The win32api module does not have "\ + "GenerateConsoleCtrlEvent(). This may mean that "\ + "parts of this process group have NOT been killed.") + except win32api.error, ex: + if ex.args[0] not in (6, 87): + # Ignore the following: + # api_error: (87, 'GenerateConsoleCtrlEvent', 'The parameter is incorrect.') + # api_error: (6, 'GenerateConsoleCtrlEvent', 'The handle is invalid.') + # Get error 6 if there is no console. + raise + + # Last resort: call TerminateProcess if it has not yet. + retval = 0 + try: + self.wait(gracePeriod) + except ProcessError, ex: + log.info("[%s] Process.kill: calling TerminateProcess", id(self)) + win32process.TerminateProcess(self._hProcess, -1) + win32api.Sleep(100) # wait for resources to be released + + else: + if sig is None: + sig = signal.SIGKILL + try: + os.kill(self._pid, sig) + except OSError, ex: + if ex.errno != 3: + # Ignore: OSError: [Errno 3] No such process + raise + + _unregisterProcess(self) + + def _close_(self, hwnd, dummy): + """Callback used by .kill() on Windows. 
def __init__(self, cmd, mode='t', cwd=None, env=None,
             stdin=None, stdout=None, stderr=None):
    """Create a Process with proxy threads for each std handle.

    "cmd" is the command string or argument vector to run.
    "mode" (Windows only) specifies whether the pipes used to communicate
        with the child are opened in text, 't', or binary, 'b', mode.
        This is ignored on platforms other than Windows. Default is 't'.
    "cwd" optionally specifies the directory in which the child process
        should be started. Default is None, a.k.a. inherits the cwd from
        the parent.
    "env" is optionally a mapping specifying the environment in which to
        start the child. Default is None, a.k.a. inherits the environment
        of the parent.
    "stdin", "stdout", "stderr" can be used to specify objects with
        file-like interfaces to handle read (stdout/stderr) and write
        (stdin) events from the child. By default a process.IOBuffer
        instance is assigned to each handler. IOBuffer may be
        sub-classed. See the IOBuffer doc string for more information.

    Raises ProcessError if "cmd" is empty or "mode" is not 't' or 'b'.
    """
    # Keep a reference to ensure it is around for this object's destruction.
    self.__log = log
    # There is nothing to close until the std handlers are set up; this
    # keeps close() (called from __del__) a no-op when a check below
    # raises.
    self._closed = 1
    log.info("ProcessProxy.__init__(cmd=%r, mode=%r, cwd=%r, env=%r, "\
             "stdin=%r, stdout=%r, stderr=%r)",
             cmd, mode, cwd, env, stdin, stdout, stderr)
    self._cmd = cmd
    if not self._cmd:
        raise ProcessError("You must specify a command.")
    self._mode = mode
    if self._mode not in ('t', 'b'):
        raise ProcessError("'mode' must be 't' or 'b'.")
    self._cwd = cwd
    self._env = env
    # Default buffers get distinct names: the name is what tells them
    # apart in the IOBuffer log messages.
    if stdin is None:
        self.stdin = IOBuffer(name='<stdin>')
    else:
        self.stdin = stdin
    if stdout is None:
        self.stdout = IOBuffer(name='<stdout>')
    else:
        self.stdout = stdout
    if stderr is None:
        self.stderr = IOBuffer(name='<stderr>')
    else:
        self.stderr = stderr
    self._closed = 0

    if sys.platform.startswith("win"):
        self._startOnWindows()
    else:
        self.__retvalCache = None
        self._startOnUnix()

    _registerProcess(self)
def _startOnUnix(self):
    """Create the std pipes, fork the child and start the proxy threads.

    The parent's ends of the pipes are wrapped in _FileWrapper objects
    and connected to self.stdin, self.stdout and self.stderr through
    one proxy thread each (self._stdinProxy, self._stdoutProxy and
    self._stderrProxy). If self._cwd is set, the child is forked while
    that directory is the current one and the previous working
    directory is restored afterwards.

    Raises ProcessError if self._cwd cannot be entered. No pipe
    descriptor is left open when starting the child fails.
    """
    # Create pipes for std handles.
    fdChildStdinRd, fdChildStdinWr = os.pipe()
    fdChildStdoutRd, fdChildStdoutWr = os.pipe()
    fdChildStderrRd, fdChildStderrWr = os.pipe()
    childEnds = (fdChildStdinRd, fdChildStdoutWr, fdChildStderrWr)
    parentEnds = (fdChildStdinWr, fdChildStdoutRd, fdChildStderrRd)

    started = 0
    try:
        oldDir = None
        if self._cwd:
            oldDir = os.getcwd()
            try:
                os.chdir(self._cwd)
            except OSError:
                # sys.exc_info() instead of binding the exception in the
                # 'except' clause: valid on every Python version.
                ex = sys.exc_info()[1]
                raise ProcessError(msg=str(ex), errno=ex.errno)
        try:
            self._forkAndExecChildOnUnix(fdChildStdinRd, fdChildStdoutWr,
                                         fdChildStderrWr)
        finally:
            # Always go back, even when the fork itself failed.
            if oldDir is not None:
                os.chdir(oldDir)
        started = 1
    finally:
        # The parent never uses the child's ends of the pipes.
        for fd in childEnds:
            os.close(fd)
        # Without a child the parent's ends are useless as well.
        if not started:
            for fd in parentEnds:
                os.close(fd)

    childStdin = _FileWrapper(descriptor=fdChildStdinWr)
    logres.info("[%s] ProcessProxy._start(): create child stdin: %r",
                id(self), childStdin)
    childStdout = _FileWrapper(descriptor=fdChildStdoutRd)
    logres.info("[%s] ProcessProxy._start(): create child stdout: %r",
                id(self), childStdout)
    childStderr = _FileWrapper(descriptor=fdChildStderrRd)
    logres.info("[%s] ProcessProxy._start(): create child stderr: %r",
                id(self), childStderr)

    # Create proxy threads for the pipes. Each thread gets its own name
    # so that the proxies can be told apart in the log messages.
    self._stdinProxy = _InFileProxy(self.stdin, childStdin, name='<stdin>')
    self._stdinProxy.start()
    # Clean up the parent's side of <stdin> when it is observed that
    # the child has closed its side of <stdout> and <stderr>. (This
    # is one way of determining when it is appropriate to clean up
    # this pipe, with compromises. See the discussion at the top of
    # this module.)
    closer = _CountingCloser([self.stdin, childStdin, self], 2)
    self._stdoutProxy = _OutFileProxy(childStdout, self.stdout,
                                      [closer],
                                      name='<stdout>')
    self._stdoutProxy.start()
    self._stderrProxy = _OutFileProxy(childStderr, self.stderr,
                                      [closer],
                                      name='<stderr>')
    self._stderrProxy.start()
+ if self._mode == 't': + flags = os.O_TEXT + mode = '' + else: + flags = 0 + mode = 'b' + fdChildStdinWr = msvcrt.open_osfhandle(self._hChildStdinWr, flags) + fdChildStdoutRd = msvcrt.open_osfhandle(self._hChildStdoutRd, flags) + fdChildStderrRd = msvcrt.open_osfhandle(self._hChildStderrRd, flags) + + childStdin = _FileWrapper(descriptor=fdChildStdinWr, + handle=self._hChildStdinWr) + logres.info("[%s] ProcessProxy._start(): create child stdin: %r", + id(self), childStdin) + childStdout = _FileWrapper(descriptor=fdChildStdoutRd, + handle=self._hChildStdoutRd) + logres.info("[%s] ProcessProxy._start(): create child stdout: %r", + id(self), childStdout) + childStderr = _FileWrapper(descriptor=fdChildStderrRd, + handle=self._hChildStderrRd) + logres.info("[%s] ProcessProxy._start(): create child stderr: %r", + id(self), childStderr) + + # Start the child process. + si = win32process.STARTUPINFO() + si.dwFlags = win32process.STARTF_USESHOWWINDOW + si.wShowWindow = 0 # SW_HIDE + si.hStdInput = hChildStdinRd + si.hStdOutput = hChildStdoutWr + si.hStdError = hChildStderrWr + si.dwFlags |= win32process.STARTF_USESTDHANDLES + + cmd = _fixupCommand(cmd, self._env) + log.debug("cmd = %r", cmd) + + creationFlags = win32process.CREATE_NEW_PROCESS_GROUP + try: + self._hProcess, hThread, self._processId, threadId\ + = _SaferCreateProcess( + None, # app name + cmd, # command line + None, # process security attributes + None, # primary thread security attributes + 1, # handles are inherited + creationFlags, # creation flags + self._env, # environment + self._cwd, # current working directory + si) # STARTUPINFO pointer + except win32api.error, ex: + raise ProcessError(msg=ex.args[2], errno=ex.args[0]) + win32api.CloseHandle(hThread) + + finally: + # Close child ends of pipes on the parent's side (the + # parent's ends of the pipe are closed in the _FileWrappers.) 
def wait(self, timeout=None):
    """Wait for the started process to complete.

    "timeout" (on Windows) is a number of seconds (may be fractional)
        after which to time out. Default is win32event.INFINITE.
    "timeout" (on Unix) is akin to the os.waitpid() "options" argument
        (os.WNOHANG may be used to return immediately if the process
        has not exited). Default is 0, i.e. wait forever.

    If the wait times out a ProcessError is raised. Otherwise the
    process is unregistered and the child's exit value (on Windows) or
    the child's exit status encoded as per os.waitpid() (on Unix) is
    returned:
        "a 16-bit number, whose low byte is the signal number that killed
        the process, and whose high byte is the exit status (if the
        signal number is zero); the high bit of the low byte is set if a
        core file was produced."
    In the latter case, use the os.W*() methods to interpret the return
    value.
    """
    # XXX Or should returning the exit value be moved out to another
    #     function as on Win32 process control? If so, then should
    #     perhaps not make WaitForSingleObject semantic transformation.
    if sys.platform.startswith("win"):
        if timeout is None:
            timeout = win32event.INFINITE
        else:
            # Win32 API's timeout is in millisecs and is passed on as an
            # integer (kill() hands in a float grace period).
            timeout = int(timeout * 1000.0)

        rc = win32event.WaitForSingleObject(self._hProcess, timeout)
        # The command is wrapped in a 1-tuple so that an argv given as a
        # tuple is formatted as a whole instead of being unpacked by '%'.
        if rc == win32event.WAIT_FAILED:
            raise ProcessError("'WAIT_FAILED' when waiting for process to "\
                               "terminate: %r" % (self._cmd,), rc)
        elif rc == win32event.WAIT_TIMEOUT:
            raise ProcessError("'WAIT_TIMEOUT' when waiting for process to "\
                               "terminate: %r" % (self._cmd,), rc)

        retval = win32process.GetExitCodeProcess(self._hProcess)
    else:
        # os.waitpid() will raise:
        #       OSError: [Errno 10] No child processes
        # on subsequent .wait() calls. Change these semantics to have
        # subsequent .wait() calls return the exit status and return
        # immediately without raising an exception.
        # (XXX It would require synchronization code to handle the case
        # of multiple simultaneous .wait() requests, however we can punt
        # on that because it is moot while Linux still has the problem
        # for which _ThreadFixer() exists.)
        if self.__retvalCache is not None:
            retval = self.__retvalCache
        else:
            if timeout is None:
                timeout = 0
            pid, sts = os.waitpid(self._pid, timeout)
            if pid == self._pid:
                self.__retvalCache = retval = sts
            else:
                raise ProcessError("Wait for process timed out.",
                                   self.WAIT_TIMEOUT)
    _unregisterProcess(self)
    return retval
+ """ + if sys.platform.startswith("win"): + import win32gui + # Send WM_CLOSE to windows in this process group. + win32gui.EnumWindows(self._close_, 0) + + # Send Ctrl-Break signal to all processes attached to this + # console. This is supposed to trigger shutdown handlers in + # each of the processes. + try: + win32api.GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, + self._processId) + except AttributeError: + log.warn("The win32api module does not have "\ + "GenerateConsoleCtrlEvent(). This may mean that "\ + "parts of this process group have NOT been killed.") + except win32api.error, ex: + if ex.args[0] not in (6, 87): + # Ignore the following: + # api_error: (87, 'GenerateConsoleCtrlEvent', 'The parameter is incorrect.') + # api_error: (6, 'GenerateConsoleCtrlEvent', 'The handle is invalid.') + # Get error 6 if there is no console. + raise + + # Last resort: call TerminateProcess if it has not yet. + retval = 0 + try: + self.wait(gracePeriod) + except ProcessError, ex: + log.info("[%s] Process.kill: calling TerminateProcess", id(self)) + win32process.TerminateProcess(self._hProcess, -1) + win32api.Sleep(100) # wait for resources to be released + + else: + if sig is None: + sig = signal.SIGKILL + try: + os.kill(self._pid, sig) + except OSError, ex: + if ex.errno != 3: + # Ignore: OSError: [Errno 3] No such process + raise + + _unregisterProcess(self) + + def _close_(self, hwnd, dummy): + """Callback used by .kill() on Windows. + + EnumWindows callback - sends WM_CLOSE to any window owned by this + process. + """ + threadId, processId = win32process.GetWindowThreadProcessId(hwnd) + if processId == self._processId: + import win32gui + win32gui.PostMessage(hwnd, WM_CLOSE, 0, 0) + + +class IOBuffer: + """Want to be able to both read and write to this buffer from + difference threads and have the same read/write semantics as for a + std handler. + + This class is subclass-able. 
_doRead(), _doWrite(), _doReadline(), + _doClose(), _haveLine(), and _haveNumBytes() can be overridden for + specific functionality. The synchronization issues (block on read + until write provides the needed data, termination) are handled for + free. + + Cannot support: + .seek() # Because we are managing *two* positions (one each + .tell() # for reading and writing), these do not make + # sense. + """ + #TODO: + # - Is performance a problem? This will likely be slower that + # StringIO.StringIO(). + # + def __init__(self, mutex=None, stateChange=None, name=None): + """'name' can be set for debugging, it will be used in log messages.""" + if name is not None: + self._name = name + else: + self._name = id(self) + log.info("[%s] IOBuffer.__init__()" % self._name) + + self.__buf = '' + # A state change is defined as the buffer being closed or a + # write occuring. + if mutex is not None: + self._mutex = mutex + else: + self._mutex = threading.Lock() + if stateChange is not None: + self._stateChange = stateChange + else: + self._stateChange = threading.Condition() + self._closed = 0 + + def _doWrite(self, s): + self.__buf += s # Append to buffer. + + def write(self, s): + log.info("[%s] IOBuffer.write(s=%r)", self._name, s) + # Silently drop writes after the buffer has been close()'d. + if self._closed: + return + # If empty write, close buffer (mimicking behaviour from + # koprocess.cpp.) + if not s: + self.close() + return + + self._mutex.acquire() + self._doWrite(s) + self._stateChange.acquire() + self._stateChange.notifyAll() # Notify of the write(). 
+ self._stateChange.release() + self._mutex.release() + + def writelines(self, list): + self.write(''.join(list)) + + def _doRead(self, n): + """Pop 'n' bytes from the internal buffer and return them.""" + if n < 0: + idx = len(self.__buf) + else: + idx = min(n, len(self.__buf)) + retval, self.__buf = self.__buf[:idx], self.__buf[idx:] + return retval + + def read(self, n=-1): + log.info("[%s] IOBuffer.read(n=%r)" % (self._name, n)) + log.info("[%s] IOBuffer.read(): wait for data" % self._name) + if n < 0: + # Wait until the buffer is closed, i.e. no more writes will + # come. + while 1: + if self._closed: break + #log.debug("[%s] <<< IOBuffer.read: state change .wait()"\ + # % self._name) + self._stateChange.acquire() + self._stateChange.wait() + self._stateChange.release() + #log.debug("[%s] >>> IOBuffer.read: done change .wait()"\ + # % self._name) + else: + # Wait until there are the requested number of bytes to read + # (or until the buffer is closed, i.e. no more writes will + # come). + # XXX WARNING: I *think* there is a race condition around + # here whereby self.fparent.read() in _InFileProxy can + # hang. *Sometime* test_stdin::test_stdin_buffer() will + # hang. This was *before* I moved the + # _stateChange.acquire() and .release() calls out side + # of the 'while 1:' here. ...and now they are back + # inside. + while 1: + if self._closed: break + if self._haveNumBytes(n): break + #log.debug("[%s] <<< IOBuffer.read: state change .wait()"\ + # % self._name) + self._stateChange.acquire() + self._stateChange.wait() + self._stateChange.release() + #log.debug("[%s] >>> IOBuffer.read: done change .wait()"\ + # % self._name) + log.info("[%s] IOBuffer.read(): done waiting for data" % self._name) + + self._mutex.acquire() + retval = self._doRead(n) + self._mutex.release() + return retval + + def _doReadline(self, n): + """Pop the front line (or n bytes of it, whichever is less) from + the internal buffer and return it. 
def readline(self, n=None):
    """Read and remove one line from the buffer, blocking until ready.

    Wait until there is a full line (or, when 'n' is given, at least
    'n' bytes) in the buffer or until the buffer is closed, i.e. no
    more writes will come. Then hand back what _doReadline(n) pops.
    """
    log.info("[%s] IOBuffer.readline(n=%r)" % (self._name, n))

    log.info("[%s] IOBuffer.readline(): wait for data" % self._name)
    # The wake-up condition must be tested while holding _stateChange.
    # write() and close() change the buffer state *before* taking that
    # lock to notify, so holding it from the test until .wait() means
    # a notification can never slip in between the two. Testing
    # outside the lock allowed exactly that, which left the reader
    # blocked forever on a notification it had already missed.
    self._stateChange.acquire()
    try:
        while not self._closed:
            if self._haveLine():
                break
            if n is not None and self._haveNumBytes(n):
                break
            self._stateChange.wait()
    finally:
        self._stateChange.release()
    log.info("[%s] IOBuffer.readline(): done waiting for data"\
             % self._name)

    # Always give the mutex back, even if an overridden _doReadline()
    # raises.
    self._mutex.acquire()
    try:
        return self._doReadline(n)
    finally:
        self._mutex.release()
def run(self):
    """Thread body: run the parent-to-child proxy loop, then close the
    parent end.

    The parent file object is closed even when the proxy loop raises.
    An IOError from that close is ignored.
    """
    log.info("[%s] _InFileProxy: start", self.getName())
    try:
        self._proxyFromParentToChild()
    finally:
        parent = self.fParent
        log.info("[%s] _InFileProxy: closing parent (%r)",
                 self.getName(), parent)
        try:
            parent.close()
        except IOError:
            # Ignore: IOError: [Errno 4] Interrupted system call
            pass
    log.info("[%s] _InFileProxy: done", self.getName())
def __init__(self, fChild, fParent, toClose=None, name=None):
    """
    "fChild" is the child process' output pipe. This proxy calls
        .read() and .close() on it.
    "fParent" is a Python file-like object setup for writing. This
        proxy calls .write() and .close() on it.
    "toClose" is a list of objects on which to call .close when this
        proxy is terminating. The list is used as given (not copied)
        and is emptied as each object is closed. When omitted, a new
        empty list is created for this instance.
    "name" can be set for debugging, it will be used in log messages.
    """
    if toClose is None:
        # A default of [] in the signature would be one list shared by
        # every proxy created without 'toClose'.
        toClose = []
    log.info("[%s] _OutFileProxy.__init__(fChild=%r, fParent=%r, "\
             "toClose=%r)", name, fChild, fParent, toClose)
    threading.Thread.__init__(self, name=name)
    self.fChild = fChild
    self.fParent = fParent
    self.toClose = toClose
if sys.platform.startswith("linux"):
    class _ThreadFixer:
        """Mixin class for various classes in the Process hierarchy to
        work around the known LinuxThreads bug where one cannot .wait()
        on a created process from a subthread of the thread that created
        the process.

        Usage:
            class ProcessXXX(_ThreadFixer, BrokenProcessXXX):
                _pclass = BrokenProcessXXX

        Details:
            Because we must do all real os.wait() calls on the child
            process from the thread that spawned it, we use a proxy
            thread whose only responsibility is just that. The proxy
            thread just starts the child and then immediately waits for
            the child to terminate. On termination it stores the exit
            status (for use by the main thread) and notifies any thread
            waiting for this termination (possibly the main thread). The
            overridden .wait() uses this stored exit status and the
            termination notification to simulate the .wait().
        """
        def __init__(self, *args, **kwargs):
            # Keep a reference to 'log' to ensure it is around for this
            # object's destruction.
            self.__log = log
            self.__waiter = None
            self.__hasTerminated = threading.Condition()
            self.__terminationResult = None
            self.__childStarted = threading.Condition()
            # Exception raised by the launch attempt in the proxy
            # thread, if any.
            self.__launchError = None
            # All of the state above is created before delegating to the
            # real constructor.
            # NOTE(review): presumably that constructor is what ends up
            # calling _forkAndExecChildOnUnix() -- confirm.
            self._pclass.__init__(self, *args, **kwargs)

        def _forkAndExecChildOnUnix(self, *args, **kwargs):
            """Fork and start the child process, doing it in a special
            subthread that will negotiate subsequent .wait()'s.

            Blocks until the subthread reports that the launch attempt
            is over. An exception raised by the real launch is re-raised
            here, in the calling thread.

            Sets self._pid as a side effect.
            """
            self.__launchError = None
            self.__waiter = threading.Thread(
                target=self.__launchAndWait, args=args, kwargs=kwargs)

            # Start subthread that will launch child and wait until it
            # *has* started. The lock is taken before the subthread is
            # started so that its notification cannot be sent before we
            # are waiting for it.
            self.__childStarted.acquire()
            try:
                self.__waiter.start()
                self.__childStarted.wait()
            finally:
                self.__childStarted.release()

            if self.__launchError is not None:
                raise self.__launchError

        def __launchAndWait(self, *args, **kwargs):
            """Launch the given command and wait for it to terminate.

            When the process has terminated then store its exit value
            and finish. If the launch itself fails, the exception is
            stored for _forkAndExecChildOnUnix() to re-raise and this
            thread finishes without waiting.
            """
            logfix.info("start child in thread %s",
                        threading.currentThread().getName())

            # Spawn the child process and notify the main thread of
            # this. The notification is sent even if the launch raises;
            # otherwise the main thread would block forever.
            launchError = None
            self.__childStarted.acquire()
            try:
                try:
                    self._pclass._forkAndExecChildOnUnix(self, *args,
                                                         **kwargs)
                except Exception:
                    # sys.exc_info() instead of "except X, ex" so this
                    # parses on every Python version.
                    launchError = self.__launchError = sys.exc_info()[1]
            finally:
                self.__childStarted.notifyAll()
                self.__childStarted.release()
            if launchError is not None:
                self.__waiter = None # drop ref that would keep instance alive
                return

            # Wait on the thread and store appropriate results when
            # finished.
            try:
                waitResult = self._pclass.wait(self)
            except ProcessError:
                waitResult = sys.exc_info()[1]
            self.__hasTerminated.acquire()
            try:
                self.__terminationResult = waitResult
                self.__hasTerminated.notifyAll()
            finally:
                self.__hasTerminated.release()

            self.__waiter = None # drop ref that would keep instance alive

        def wait(self, timeout=None):
            """Return the result stored by the proxy thread's real wait.

            "timeout" equal to os.WNOHANG polls; any other value blocks
                until the proxy thread reports termination.

            Raises ProcessError (with self.WAIT_TIMEOUT) if no result
            is available yet, and re-raises an exception that the real
            wait raised in the proxy thread.
            """
            # If the process __hasTerminated then return the exit
            # status. Otherwise simulate the wait as appropriate.
            # Note:
            #   - This class is only used on linux so 'timeout' has the
            #     Unix 'timeout' semantics.
            self.__hasTerminated.acquire()
            try:
                if self.__terminationResult is None:
                    if timeout == os.WNOHANG:   # Poll.
                        self.__hasTerminated.wait(0)
                    else:                       # Block until process finishes.
                        self.__hasTerminated.wait()
                terminationResult = self.__terminationResult
            finally:
                self.__hasTerminated.release()

            if terminationResult is None:
                # process has not finished yet
                raise ProcessError("Wait for process timed out.",
                                   self.WAIT_TIMEOUT)
            elif isinstance(terminationResult, Exception):
                # some error waiting for process termination
                raise terminationResult
            else:
                # the process terminated
                return terminationResult

    # Re-bind the public class names to thread-fixed versions. Each
    # original class stays reachable through its _ThreadBroken* alias
    # and through the '_pclass' attribute that _ThreadFixer delegates to.
    _ThreadBrokenProcess = Process
    class Process(_ThreadFixer, _ThreadBrokenProcess):
        _pclass = _ThreadBrokenProcess

    _ThreadBrokenProcessOpen = ProcessOpen
    class ProcessOpen(_ThreadFixer, _ThreadBrokenProcessOpen):
        _pclass = _ThreadBrokenProcessOpen

    _ThreadBrokenProcessProxy = ProcessProxy
    class ProcessProxy(_ThreadFixer, _ThreadBrokenProcessProxy):
        _pclass = _ThreadBrokenProcessProxy