microproduct/atmosphericDelay/ISCEApp/site-packages/tox/action.py

281 lines
11 KiB
Python

from __future__ import absolute_import, unicode_literals
import os
import pipes
import signal
import subprocess
import sys
import time
from contextlib import contextmanager
from threading import Thread
import py
from tox import reporter
from tox.constants import INFO
from tox.exception import InvocationError
from tox.reporter import Verbosity
from tox.util.lock import get_unique_file
from tox.util.stdlib import is_main_thread
class Action(object):
"""Action is an effort to group operations with the same goal (within reporting)"""
def __init__(
self,
name,
msg,
args,
log_dir,
generate_tox_log,
command_log,
popen,
python,
suicide_timeout,
interrupt_timeout,
terminate_timeout,
):
self.name = name
self.args = args
self.msg = msg
self.activity = self.msg.split(" ", 1)[0]
self.log_dir = log_dir
self.generate_tox_log = generate_tox_log
self.via_popen = popen
self.command_log = command_log
self._timed_report = None
self.python = python
self.suicide_timeout = suicide_timeout
self.interrupt_timeout = interrupt_timeout
self.terminate_timeout = terminate_timeout
def __enter__(self):
msg = "{} {}".format(self.msg, " ".join(map(str, self.args)))
self._timed_report = reporter.timed_operation(self.name, msg)
self._timed_report.__enter__()
return self
def __exit__(self, type, value, traceback):
self._timed_report.__exit__(type, value, traceback)
def setactivity(self, name, msg):
self.activity = name
if msg:
reporter.verbosity0("{} {}: {}".format(self.name, name, msg), bold=True)
else:
reporter.verbosity1("{} {}: {}".format(self.name, name, msg), bold=True)
def info(self, name, msg):
reporter.verbosity1("{} {}: {}".format(self.name, name, msg), bold=True)
def popen(
self,
args,
cwd=None,
env=None,
redirect=True,
returnout=False,
ignore_ret=False,
capture_err=True,
callback=None,
report_fail=True,
):
"""this drives an interaction with a subprocess"""
cwd = py.path.local() if cwd is None else cwd
cmd_args = [str(x) for x in self._rewrite_args(cwd, args)]
cmd_args_shell = " ".join(pipes.quote(i) for i in cmd_args)
stream_getter = self._get_standard_streams(
capture_err,
cmd_args_shell,
redirect,
returnout,
cwd,
)
exit_code, output = None, None
with stream_getter as (fin, out_path, stderr, stdout):
try:
process = self.via_popen(
cmd_args,
stdout=stdout,
stderr=stderr,
cwd=str(cwd),
env=os.environ.copy() if env is None else env,
universal_newlines=True,
shell=False,
creationflags=(
subprocess.CREATE_NEW_PROCESS_GROUP
if sys.platform == "win32"
else 0
# needed for Windows signal send ability (CTRL+C)
),
)
except OSError as exception:
exit_code = exception.errno
else:
if callback is not None:
callback(process)
reporter.log_popen(cwd, out_path, cmd_args_shell, process.pid)
output = self.evaluate_cmd(fin, process, redirect)
exit_code = process.returncode
finally:
if out_path is not None and out_path.exists():
lines = out_path.read_text("UTF-8").split("\n")
# first three lines are the action, cwd, and cmd - remove it
output = "\n".join(lines[3:])
try:
if exit_code and not ignore_ret:
if report_fail:
msg = "invocation failed (exit code {:d})".format(exit_code)
if out_path is not None:
msg += ", logfile: {}".format(out_path)
if not out_path.exists():
msg += " warning log file missing"
reporter.error(msg)
if out_path is not None and out_path.exists():
reporter.separator("=", "log start", Verbosity.QUIET)
reporter.quiet(output)
reporter.separator("=", "log end", Verbosity.QUIET)
raise InvocationError(cmd_args_shell, exit_code, output)
finally:
self.command_log.add_command(cmd_args, output, exit_code)
return output
def evaluate_cmd(self, input_file_handler, process, redirect):
try:
if self.generate_tox_log and not redirect:
if process.stderr is not None:
# prevent deadlock
raise ValueError("stderr must not be piped here")
# we read binary from the process and must write using a binary stream
buf = getattr(sys.stdout, "buffer", sys.stdout)
last_time = time.time()
while True:
# we have to read one byte at a time, otherwise there
# might be no output for a long time with slow tests
data = input_file_handler.read(1)
if data:
buf.write(data)
if b"\n" in data or (time.time() - last_time) > 1:
# we flush on newlines or after 1 second to
# provide quick enough feedback to the user
# when printing a dot per test
buf.flush()
last_time = time.time()
elif process.poll() is not None:
if process.stdout is not None:
process.stdout.close()
break
else:
time.sleep(0.1)
# the seek updates internal read buffers
input_file_handler.seek(0, 1)
input_file_handler.close()
out, _ = process.communicate() # wait to finish
except KeyboardInterrupt as exception:
reporter.error("got KeyboardInterrupt signal")
main_thread = is_main_thread()
while True:
try:
if main_thread:
# spin up a new thread to disable further interrupt on main thread
stopper = Thread(target=self.handle_interrupt, args=(process,))
stopper.start()
stopper.join()
else:
self.handle_interrupt(process)
except KeyboardInterrupt:
continue
break
raise exception
return out
def handle_interrupt(self, process):
"""A three level stop mechanism for children - INT -> TERM -> KILL"""
msg = "from {} {{}} pid {}".format(os.getpid(), process.pid)
if self._wait(process, self.suicide_timeout) is None:
self.info("KeyboardInterrupt", msg.format("SIGINT"))
process.send_signal(signal.CTRL_C_EVENT if sys.platform == "win32" else signal.SIGINT)
if self._wait(process, self.interrupt_timeout) is None:
self.info("KeyboardInterrupt", msg.format("SIGTERM"))
process.terminate()
if self._wait(process, self.terminate_timeout) is None:
self.info("KeyboardInterrupt", msg.format("SIGKILL"))
process.kill()
process.communicate()
@staticmethod
def _wait(process, timeout):
if sys.version_info >= (3, 3):
# python 3 has timeout feature built-in
try:
process.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
pass
else:
# on Python 2 we need to simulate it
delay = 0.01
while process.poll() is None and timeout > 0:
time.sleep(delay)
timeout -= delay
return process.poll()
@contextmanager
def _get_standard_streams(self, capture_err, cmd_args_shell, redirect, returnout, cwd):
stdout = out_path = input_file_handler = None
stderr = subprocess.STDOUT if capture_err else None
if self.generate_tox_log or redirect:
out_path = self.get_log_path(self.name)
with out_path.open("wt") as stdout, out_path.open("rb") as input_file_handler:
msg = "action: {}, msg: {}\ncwd: {}\ncmd: {}\n".format(
self.name.replace("\n", " "),
self.msg.replace("\n", " "),
str(cwd).replace("\n", " "),
cmd_args_shell.replace("\n", " "),
)
stdout.write(msg)
stdout.flush()
input_file_handler.read() # read the header, so it won't be written to stdout
yield input_file_handler, out_path, stderr, stdout
return
if returnout:
stdout = subprocess.PIPE
yield input_file_handler, out_path, stderr, stdout
def get_log_path(self, actionid):
log_file = get_unique_file(self.log_dir, prefix=actionid, suffix=".log")
return log_file
def _rewrite_args(self, cwd, args):
executable = None
if INFO.IS_WIN:
# shebang lines are not adhered on Windows so if it's a python script
# pre-pend the interpreter
ext = os.path.splitext(str(args[0]))[1].lower()
if ext == ".py":
executable = str(self.python)
if executable is None:
executable = args[0]
args = args[1:]
new_args = [executable]
# to make the command shorter try to use relative paths for all subsequent arguments
# note the executable cannot be relative as the Windows applies cwd after invocation
for arg in args:
if arg and os.path.isabs(str(arg)):
arg_path = py.path.local(arg)
if arg_path.exists() and arg_path.common(cwd) is not None:
potential_arg = cwd.bestrelpath(arg_path)
if len(potential_arg.split("..")) < 2:
# just one parent directory accepted as relative path
arg = potential_arg
new_args.append(str(arg))
return new_args