* fix path to task executable in pyton tests The current approach would copy the current files into the `build/test` directory. Updating the paths according to the custom user setup. By the copy I appended `.py` to have a clear visible distingtion which ones are the python tests. As soon as a source file in the normal directory is changed, it is copied over and the corresponding file is updated. From now on the python tests would need to get run in the according build directory. * reflect the current build instruction in PR template * update paths and globing in run_all * add line break for every cpp test * remove .gitignore in test folder As now all the auxillary files such as `all.log` as well as the executables are present in the `build` directory there is no longer a need to ignore them. * update paths in python test scripts and enable deactivated * remove .py extension when copy to build Further remove glob pattern for `*.t.py` tests. * remove accidentally added template.t from test files
240 lines
6.8 KiB
Python
Executable File
240 lines
6.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from __future__ import print_function
|
|
import os
|
|
import sys
|
|
import glob
|
|
import argparse
|
|
import logging
|
|
import time
|
|
from multiprocessing import cpu_count
|
|
from threading import Thread
|
|
from subprocess import call, Popen, PIPE
|
|
|
|
if sys.version_info > (3,):
|
|
import codecs
|
|
|
|
try:
|
|
# python 2
|
|
from Queue import Queue, Empty
|
|
except ImportError:
|
|
# python 3
|
|
from queue import Queue, Empty
|
|
|
|
TIMEOUT = .2
|
|
|
|
|
|
def run_test(testqueue, outqueue, threadname):
|
|
start = time.time()
|
|
while True:
|
|
try:
|
|
test = testqueue.get(block=True, timeout=TIMEOUT)
|
|
except Empty:
|
|
break
|
|
|
|
log.info("Running test %s", test)
|
|
|
|
try:
|
|
p = Popen(os.path.abspath(test), stdout=PIPE, stderr=PIPE,
|
|
env=os.environ)
|
|
out, err = p.communicate()
|
|
except Exception as e:
|
|
log.exception(e)
|
|
# Premature end
|
|
break
|
|
|
|
if sys.version_info > (3,):
|
|
out, err = out.decode('utf-8'), err.decode('utf-8')
|
|
|
|
output = ("# {0}\n".format(os.path.basename(test)), out, err)
|
|
log.debug("Collected output %s", output)
|
|
outqueue.put(output)
|
|
|
|
testqueue.task_done()
|
|
|
|
log.warning("Finished %s thread after %s seconds",
|
|
threadname, round(time.time() - start, 3))
|
|
|
|
|
|
class TestRunner(object):
|
|
def __init__(self):
|
|
self.threads = []
|
|
if sys.version_info > (3,):
|
|
self.tap = open(cmd_args.tapfile, 'w', errors='ignore')
|
|
else:
|
|
self.tap = open(cmd_args.tapfile, 'w')
|
|
|
|
self._parallelq = Queue()
|
|
self._serialq = Queue()
|
|
self._outputq = Queue()
|
|
|
|
def _find_tests(self):
|
|
for test in glob.glob("*.t"):
|
|
if os.access(test, os.X_OK):
|
|
# Executables only
|
|
if self._is_parallelizable(test):
|
|
log.debug("Treating as parallel: %s", test)
|
|
self._parallelq.put(test)
|
|
else:
|
|
log.debug("Treating as serial: %s", test)
|
|
self._serialq.put(test)
|
|
else:
|
|
log.debug("Ignored test %s as it is not executable", test)
|
|
|
|
log.info("Parallel tests: %s", self._parallelq.qsize())
|
|
log.info("Serial tests: %s", self._serialq.qsize())
|
|
|
|
def _prepare_threads(self):
|
|
# Serial thread
|
|
self.threads.append(
|
|
Thread(target=run_test, args=(self._serialq, self._outputq, "Serial"))
|
|
)
|
|
# Parallel threads
|
|
self.threads.extend([
|
|
Thread(target=run_test, args=(self._parallelq, self._outputq, "Parallel"))
|
|
for i in range(cpu_count())
|
|
])
|
|
log.info("Spawned %s threads to run tests", len(self.threads))
|
|
|
|
def _start_threads(self):
|
|
for thread in self.threads:
|
|
# Threads die when main thread dies
|
|
log.debug("Starting thread %s", thread)
|
|
thread.daemon = True
|
|
thread.start()
|
|
|
|
def _print_timestamp_to_tap(self):
|
|
now = time.time()
|
|
timestamp = "# {0} ==> {1}\n".format(
|
|
now,
|
|
time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(now)),
|
|
)
|
|
|
|
log.debug("Adding timestamp %s to TAP file", timestamp)
|
|
self.tap.write(timestamp)
|
|
|
|
def _is_parallelizable(self, test):
|
|
if cmd_args.serial:
|
|
return False
|
|
|
|
# This is a pretty weird way to do it, and not realiable.
|
|
# We are dealing with some binary tests though.
|
|
with open(test, 'rb') as fh:
|
|
header = fh.read(100).split(b"\n")
|
|
if len(header) >= 2 and \
|
|
((b"!#/usr/bin/env python3" in header[0]) or \
|
|
(header[1][-14:] == b"bash_tap_tw.sh")):
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def _get_remaining_tests(self):
|
|
return self._parallelq.qsize() + self._serialq.qsize()
|
|
|
|
def is_running(self):
|
|
for thread in self.threads:
|
|
if thread.is_alive():
|
|
return True
|
|
|
|
return False
|
|
|
|
def start(self):
|
|
self._find_tests()
|
|
self._prepare_threads()
|
|
|
|
self._print_timestamp_to_tap()
|
|
|
|
finished = 0
|
|
total = self._get_remaining_tests()
|
|
|
|
self._start_threads()
|
|
|
|
while self.is_running() or not self._outputq.empty():
|
|
try:
|
|
outputs = self._outputq.get(block=True, timeout=TIMEOUT)
|
|
except Empty:
|
|
continue
|
|
|
|
log.debug("Outputting to TAP: %s", outputs)
|
|
|
|
for output in outputs:
|
|
self.tap.write(output)
|
|
|
|
if cmd_args.verbose:
|
|
sys.stdout.write(output)
|
|
|
|
self._outputq.task_done()
|
|
finished += 1
|
|
|
|
log.warning("Finished %s out of %s tests", finished, total)
|
|
|
|
self._print_timestamp_to_tap()
|
|
|
|
if not self._parallelq.empty() or not self._serialq.empty():
|
|
raise RuntimeError(
|
|
"Something went wrong, not all tests were ran. {0} "
|
|
"remaining.".format(self._get_remaining_tests()))
|
|
|
|
def show_report(self):
|
|
self.tap.flush()
|
|
sys.stdout.flush()
|
|
sys.stderr.flush()
|
|
|
|
log.debug("Calling 'problems --summary' for report")
|
|
return call([os.path.abspath("problems"), "--summary", cmd_args.tapfile])
|
|
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser(description="Run Taskwarrior tests")
|
|
parser.add_argument('--verbose', '-v', action="store_true",
|
|
help="Also send TAP output to stdout")
|
|
parser.add_argument('--logging', '-l', action="count",
|
|
default=0,
|
|
help="Logging level. -lll is the highest level")
|
|
parser.add_argument('--serial', action="store_true",
|
|
help="Do not run tests in parallel")
|
|
parser.add_argument('--tapfile', default="all.log",
|
|
help="File to use for TAP output")
|
|
return parser.parse_args()
|
|
|
|
|
|
def main():
|
|
if sys.version_info > (3,):
|
|
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())
|
|
|
|
runner = TestRunner()
|
|
runner.start()
|
|
|
|
# Propagate the return code
|
|
return runner.show_report()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
cmd_args = parse_args()
|
|
|
|
if cmd_args.logging == 1:
|
|
level = logging.WARN
|
|
elif cmd_args.logging == 2:
|
|
level = logging.INFO
|
|
elif cmd_args.logging >= 3:
|
|
level = logging.DEBUG
|
|
else:
|
|
level = logging.ERROR
|
|
|
|
logging.basicConfig(
|
|
format="%(asctime)s - %(levelname)s - %(message)s",
|
|
level=level,
|
|
)
|
|
log = logging.getLogger(__name__)
|
|
|
|
log.debug("Parsed commandline arguments: %s", cmd_args)
|
|
|
|
try:
|
|
sys.exit(main())
|
|
except Exception as e:
|
|
log.exception(e)
|
|
sys.exit(1)
|
|
|
|
# vim: ai sts=4 et sw=4
|