Tests - Rework how taskw is launched from python

* Should avoid some odd race conditions
This commit is contained in:
Renato Alves
2015-02-16 01:21:04 +00:00
parent 880ab5d665
commit 1e1bd32c42
2 changed files with 126 additions and 47 deletions

View File

@@ -1,23 +1,28 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import signal import signal
sig_names = dict((k, v) for v, k in reversed(sorted(signal.__dict__.items()))
if v.startswith('SIG') and not v.startswith('SIG_'))
class CommandError(Exception): class CommandError(Exception):
def __init__(self, cmd, code, out, err=None, msg=None): def __init__(self, cmd, code, out, err=None, msg=None):
DEFAULT = ("Command '{{0}}' was {signal}'ed. "
"SIGABRT usually means task timed out.\n")
if msg is None: if msg is None:
msg_suffix = "\n*** Start STDOUT ***\n{2}\n*** End STDOUT ***\n" msg_suffix = "\n*** Start STDOUT ***\n{2}\n*** End STDOUT ***\n"
if err is not None: if err is not None:
msg_suffix += ( msg_suffix += (
"\n*** Start STDERR ***\n{3}\n*** End STDERR ***\n" "\n*** Start STDERR ***\n{3}\n*** End STDERR ***\n"
) )
if code == -signal.SIGABRT:
self.msg = ("Command '{0}' was aborted, likely due to not " if code < 0:
"finishing in due time. The exit code was '{1}'.\n" self.msg = DEFAULT.format(signal=sig_names[abs(code)])
) + msg_suffix
else: else:
self.msg = ("Command '{0}' finished with unexpected exit " self.msg = ("Command '{0}' finished with unexpected exit "
"code '{1}'.\n" "code '{1}'.\n")
) + msg_suffix
self.msg += msg_suffix
else: else:
self.msg = msg self.msg = msg
@@ -34,7 +39,7 @@ class HookError(Exception):
pass pass
class TimeoutWaitingForStream(object): class TimeoutWaitingFor(object):
def __init__(self, name): def __init__(self, name):
self.name = name self.name = name

View File

@@ -13,7 +13,7 @@ try:
import simplejson as json import simplejson as json
except ImportError: except ImportError:
import json import json
from .exceptions import CommandError, TimeoutWaitingForStream from .exceptions import CommandError, TimeoutWaitingFor
USED_PORTS = set() USED_PORTS = set()
ON_POSIX = 'posix' in sys.builtin_module_names ON_POSIX = 'posix' in sys.builtin_module_names
@@ -54,76 +54,140 @@ def binary_location(cmd):
return os.path.join(BIN_PREFIX, cmd) return os.path.join(BIN_PREFIX, cmd)
def wait_process(proc, timeout=1): def wait_condition(cond, timeout=1):
"""Wait for process to finish """Wait for condition to return anything other than None
""" """
if timeout is None: if timeout is None:
timeout = 1 timeout = 1
sleeptime = .1 sleeptime = .1
if timeout < sleeptime:
print("Warning, timeout cannot be smaller than", sleeptime)
timeout = sleeptime
# Max number of attempts until giving up # Max number of attempts until giving up
tries = int(timeout / sleeptime) tries = int(timeout / sleeptime)
# Wait for up to a second for the process to finish and avoid zombies # Wait for up to a second for the process to finish and avoid zombies
for i in range(tries): for i in range(tries):
exit = proc.poll() val = cond()
if exit is not None: if val is not None:
break break
sleep(sleeptime) sleep(sleeptime)
return exit return val
def _get_output(proc, input, timeout=None): def wait_process(pid, timeout=None):
"""Wait for process to finish
"""
def process():
try:
os.kill(pid, 0)
except OSError:
# Process is dead
return True
else:
# Process is still ticking
return None
return wait_condition(process, timeout)
def _queue_output(arguments, pidq, outputq):
"""Read/Write output/input of given process.
This function is meant to be executed in a thread as it may block
"""
kwargs = arguments["process"]
input = arguments["input"]
proc = Popen(**kwargs)
# NOTE If for whatever reason pid is None at the time of access, use the
# following line instead
# pid = wait_condition(lambda: proc.pid)
pid = proc.pid
# Put the PID in the queue for main process to know
pidq.put(pid)
# Send input and wait for finish
out, err = proc.communicate(input)
# Give the output back to the caller
outputq.put((out, err, proc.returncode))
def _retrieve_output(thread, timeout, queue, thread_error):
"""Fetch output from taskw subprocess queues
"""
# Try to join the thread on failure abort
thread.join(timeout)
if thread.isAlive():
# Join should have killed the thread. This is unexpected
raise TimeoutWaitingFor(thread_error + ". Unexpected error")
# Thread died so we should have output
try:
# data = (stdout, stderr, exitcode)
data = queue.get(timeout=timeout)
except Empty:
data = TimeoutWaitingFor("streams from TaskWarrior")
return data
def _get_output(arguments, timeout=None):
"""Collect output from the subprocess without blocking the main process if """Collect output from the subprocess without blocking the main process if
subprocess hangs. subprocess hangs.
""" """
def queue_output(proc, input, outq, errq): # NOTE Increase this value if tests fail with None being received as
"""Read/Write output/input of given process. # stdout/stderr instead of the expected content
This function is meant to be executed in a thread as it may block output_timeout = 0.1 # seconds
"""
# Send input and wait for finish
out, err = proc.communicate(input)
# Give the output back to the caller
outq.put(out)
errq.put(err)
outq = Queue() pidq = Queue()
errq = Queue() outputq = Queue()
t = Thread(target=queue_output, args=(proc, input, outq, errq)) t = Thread(target=_queue_output, args=(arguments, pidq, outputq))
t.daemon = True t.daemon = True
t.start() t.start()
# A task process shouldn't take longer than 1 second to finish try:
exit = wait_process(proc, timeout) pid = pidq.get(timeout=timeout)
except Empty:
return _retrieve_output(t, output_timeout, outputq,
"TaskWarrior to start")
# If it does take longer than 1 second, abort it # Wait for process to finish (normal execution)
if exit is None: state = wait_process(pid, timeout)
if state:
# Process finished
return _retrieve_output(t, output_timeout, outputq,
"TaskWarrior thread to join")
# If we reach this point we assume the process got stuck or timed out
for sig in (signal.SIGABRT, signal.SIGTERM, signal.SIGKILL):
# Start with lower signals and escalate if process ignores them
try: try:
proc.send_signal(signal.SIGABRT) os.kill(pid, signal.SIGABRT)
except OSError as e: except OSError as e:
# 3 means the process finished/died between last check and now # 3 means the process finished/died between last check and now
if e.errno != 3: if e.errno != 3:
raise raise
exit = wait_process(proc)
# NOTE Increase this value if tests fail with None being received as # Wait for process to finish (should die/exit after signal)
# stdout/stderr instead of the expected content state = wait_process(pid, timeout)
timeout = 0.1 # seconds
try: if state:
out = outq.get(timeout=timeout) # Process finished
except Empty: return _retrieve_output(t, output_timeout, outputq,
out = TimeoutWaitingForStream("stdout") "TaskWarrior to die")
try:
err = errq.get(timeout=timeout)
except Empty:
err = TimeoutWaitingForStream("stderr")
return out, err, exit # This should never happen but in case something goes really bad
raise OSError("TaskWarrior stopped responding and couldn't be killed")
def run_cmd_wait(cmd, input=None, stdout=PIPE, stderr=PIPE, def run_cmd_wait(cmd, input=None, stdout=PIPE, stderr=PIPE,
@@ -140,9 +204,19 @@ def run_cmd_wait(cmd, input=None, stdout=PIPE, stderr=PIPE,
else: else:
stderr = PIPE stderr = PIPE
p = Popen(cmd, stdin=stdin, stdout=stdout, stderr=stderr, bufsize=1, arguments = {
close_fds=ON_POSIX, env=env) "process": {
out, err, exit = _get_output(p, input, timeout) "args": cmd,
"stdin": stdin,
"stdout": stdout,
"stderr": stderr,
"bufsize": 1,
"close_fds": ON_POSIX,
"env": env,
},
"input": input,
}
out, err, exit = _get_output(arguments, timeout)
if merge_streams: if merge_streams:
if exit != 0: if exit != 0: