diff --git a/test/basetest/exceptions.py b/test/basetest/exceptions.py index f1e227975..c960442bc 100644 --- a/test/basetest/exceptions.py +++ b/test/basetest/exceptions.py @@ -1,23 +1,28 @@ # -*- coding: utf-8 -*- import signal +sig_names = dict((k, v) for v, k in reversed(sorted(signal.__dict__.items())) + if v.startswith('SIG') and not v.startswith('SIG_')) + class CommandError(Exception): def __init__(self, cmd, code, out, err=None, msg=None): + DEFAULT = ("Command '{{0}}' was {signal}'ed. " + "SIGABRT usually means task timed out.\n") if msg is None: msg_suffix = "\n*** Start STDOUT ***\n{2}\n*** End STDOUT ***\n" if err is not None: msg_suffix += ( "\n*** Start STDERR ***\n{3}\n*** End STDERR ***\n" ) - if code == -signal.SIGABRT: - self.msg = ("Command '{0}' was aborted, likely due to not " - "finishing in due time. The exit code was '{1}'.\n" - ) + msg_suffix + + if code < 0: + self.msg = DEFAULT.format(signal=sig_names[abs(code)]) else: self.msg = ("Command '{0}' finished with unexpected exit " - "code '{1}'.\n" - ) + msg_suffix + "code '{1}'.\n") + + self.msg += msg_suffix else: self.msg = msg @@ -34,7 +39,7 @@ class HookError(Exception): pass -class TimeoutWaitingForStream(object): +class TimeoutWaitingFor(object): def __init__(self, name): self.name = name diff --git a/test/basetest/utils.py b/test/basetest/utils.py index 6caec070f..6f93640d5 100644 --- a/test/basetest/utils.py +++ b/test/basetest/utils.py @@ -13,7 +13,7 @@ try: import simplejson as json except ImportError: import json -from .exceptions import CommandError, TimeoutWaitingForStream +from .exceptions import CommandError, TimeoutWaitingFor USED_PORTS = set() ON_POSIX = 'posix' in sys.builtin_module_names @@ -54,76 +54,140 @@ def binary_location(cmd): return os.path.join(BIN_PREFIX, cmd) -def wait_process(proc, timeout=1): - """Wait for process to finish +def wait_condition(cond, timeout=1): + """Wait for condition to return anything other than None """ if timeout is None: timeout = 1 sleeptime = .1 + + if timeout < sleeptime: + print("Warning, timeout cannot be smaller than", sleeptime) + timeout = sleeptime + # Max number of attempts until giving up tries = int(timeout / sleeptime) # Wait for up to a second for the process to finish and avoid zombies for i in range(tries): - exit = proc.poll() + val = cond() - if exit is not None: + if val is not None: break sleep(sleeptime) - return exit + return val -def _get_output(proc, input, timeout=None): +def wait_process(pid, timeout=None): + """Wait for process to finish + """ + def process(): + try: + os.kill(pid, 0) + except OSError: + # Process is dead + return True + else: + # Process is still ticking + return None + + return wait_condition(process, timeout) + + +def _queue_output(arguments, pidq, outputq): + """Read/Write output/input of given process. + This function is meant to be executed in a thread as it may block + """ + kwargs = arguments["process"] + input = arguments["input"] + + proc = Popen(**kwargs) + + # NOTE If for whatever reason pid is None at the time of access, use the + # following line instead + # pid = wait_condition(lambda: proc.pid) + pid = proc.pid + # Put the PID in the queue for main process to know + pidq.put(pid) + + # Send input and wait for finish + out, err = proc.communicate(input) + + # Give the output back to the caller + outputq.put((out, err, proc.returncode)) + + +def _retrieve_output(thread, timeout, queue, thread_error): + """Fetch output from taskw subprocess queues + """ + # Try to join the thread on failure abort + thread.join(timeout) + if thread.isAlive(): + # Join should have killed the thread. This is unexpected + raise TimeoutWaitingFor(thread_error + ". Unexpected error") + + # Thread died so we should have output + try: + # data = (stdout, stderr, exitcode) + data = queue.get(timeout=timeout) + except Empty: + data = TimeoutWaitingFor("streams from TaskWarrior") + + return data + + +def _get_output(arguments, timeout=None): """Collect output from the subprocess without blocking the main process if subprocess hangs. """ - def queue_output(proc, input, outq, errq): - """Read/Write output/input of given process. - This function is meant to be executed in a thread as it may block - """ - # Send input and wait for finish - out, err = proc.communicate(input) - # Give the output back to the caller - outq.put(out) - errq.put(err) + # NOTE Increase this value if tests fail with None being received as + # stdout/stderr instead of the expected content + output_timeout = 0.1 # seconds - outq = Queue() - errq = Queue() + pidq = Queue() + outputq = Queue() - t = Thread(target=queue_output, args=(proc, input, outq, errq)) + t = Thread(target=_queue_output, args=(arguments, pidq, outputq)) t.daemon = True t.start() - # A task process shouldn't take longer than 1 second to finish - exit = wait_process(proc, timeout) + try: + pid = pidq.get(timeout=timeout) + except Empty: + return _retrieve_output(t, output_timeout, outputq, + "TaskWarrior to start") - # If it does take longer than 1 second, abort it - if exit is None: + # Wait for process to finish (normal execution) + state = wait_process(pid, timeout) + + if state: + # Process finished + return _retrieve_output(t, output_timeout, outputq, + "TaskWarrior thread to join") + + # If we reach this point we assume the process got stuck or timed out + for sig in (signal.SIGABRT, signal.SIGTERM, signal.SIGKILL): + # Start with lower signals and escalate if process ignores them try: - proc.send_signal(signal.SIGABRT) + os.kill(pid, signal.SIGABRT) except OSError as e: # 3 means the process finished/died between last check and now if e.errno != 3: raise - exit = wait_process(proc) - # NOTE Increase this value if tests fail with None being received as - # stdout/stderr instead of the expected content - timeout = 0.1 # seconds + # Wait for process to finish (should die/exit after signal) + state = wait_process(pid, timeout) - try: - out = outq.get(timeout=timeout) - except Empty: - out = TimeoutWaitingForStream("stdout") - try: - err = errq.get(timeout=timeout) - except Empty: - err = TimeoutWaitingForStream("stderr") + if state: + # Process finished + return _retrieve_output(t, output_timeout, outputq, + "TaskWarrior to die") - return out, err, exit + # This should never happen but in case something goes really bad + raise OSError("TaskWarrior stopped responding and couldn't be killed") def run_cmd_wait(cmd, input=None, stdout=PIPE, stderr=PIPE, @@ -140,9 +204,19 @@ def run_cmd_wait(cmd, input=None, stdout=PIPE, stderr=PIPE, else: stderr = PIPE - p = Popen(cmd, stdin=stdin, stdout=stdout, stderr=stderr, bufsize=1, - close_fds=ON_POSIX, env=env) - out, err, exit = _get_output(p, input, timeout) + arguments = { + "process": { + "args": cmd, + "stdin": stdin, + "stdout": stdout, + "stderr": stderr, + "bufsize": 1, + "close_fds": ON_POSIX, + "env": env, + }, + "input": input, + } + out, err, exit = _get_output(arguments, timeout) if merge_streams: if exit != 0: