diff --git a/test/basetest/__init__.py b/test/basetest/__init__.py index 5933ba5c8..4dd647039 100644 --- a/test/basetest/__init__.py +++ b/test/basetest/__init__.py @@ -2,6 +2,6 @@ from .task import Task from .taskd import Taskd -from .testing import TestCase +from .testing import TestCase, ServerTestCase # vim: ai sts=4 et sw=4 diff --git a/test/basetest/task.py b/test/basetest/task.py index 72286f22d..f6c263637 100644 --- a/test/basetest/task.py +++ b/test/basetest/task.py @@ -4,7 +4,8 @@ import os import tempfile import shutil import atexit -from .utils import run_cmd_wait, run_cmd_wait_nofail +import unittest +from .utils import run_cmd_wait, run_cmd_wait_nofail, which from .exceptions import CommandError @@ -29,21 +30,18 @@ class Task(object): self.taskw = taskw self.taskd = taskd + # Used to specify what command to launch (and to inject faketime) + self._command = [self.taskw] + # Configuration of the isolated environment self._original_pwd = os.getcwd() - self.datadir = tempfile.mkdtemp() + self.datadir = tempfile.mkdtemp(prefix="task_") self.taskrc = os.path.join(self.datadir, "test.rc") # Ensure any instance is properly destroyed at session end atexit.register(lambda: self.destroy()) - # Copy all env variables to avoid clashing subprocess environments - self.env = os.environ.copy() - - # Make sure no TASKDDATA is isolated - self.env["TASKDATA"] = self.datadir - # As well as TASKRC - self.env["TASKRC"] = self.taskrc + self.reset_env() # Cannot call self.config until confirmation is disabled with open(self.taskrc, 'w') as rc: @@ -58,6 +56,17 @@ class Task(object): txt = super(Task, self).__repr__() return "{0} running from {1}>".format(txt[:-1], self.datadir) + def reset_env(self): + """Set a new environment derived from the one used to launch the test + """ + # Copy all env variables to avoid clashing subprocess environments + self.env = os.environ.copy() + + # Make sure no TASKDDATA is isolated + self.env["TASKDATA"] = self.datadir + # As well as TASKRC + self.env["TASKRC"] = self.taskrc + def __call__(self, *args, **kwargs): "aka t = Task() ; t() which is now an alias to t.runSuccess()" return self.runSuccess(*args, **kwargs) @@ -120,7 +129,8 @@ class Task(object): Returns (exit_code, stdout, stderr) """ - command = [self.taskw] + # Create a copy of the command + command = self._command[:] command.extend(args) output = run_cmd_wait_nofail(command, input, @@ -145,7 +155,8 @@ class Task(object): Returns (exit_code, stdout, stderr) """ - command = [self.taskw] + # Create a copy of the command + command = self._command[:] command.extend(args) output = run_cmd_wait_nofail(command, input, @@ -226,4 +237,21 @@ class Task(object): return code, newout, newerr + def faketime(self, faketime=None): + """Set a faketime using libfaketime that will affect the following + command calls. + + If faketime is None, faketime settings will be disabled. + """ + cmd = which("faketime") + if cmd is None: + raise unittest.SkipTest("libfaketime/faketime is not installed") + + if self._command[0] == cmd: + self._command = self._command[3:] + + if faketime is not None: + # Use advanced time format + self._command = [cmd, "-f", faketime] + self._command + # vim: ai sts=4 et sw=4 diff --git a/test/basetest/taskd.py b/test/basetest/taskd.py index 7616d7d39..b2e414999 100644 --- a/test/basetest/taskd.py +++ b/test/basetest/taskd.py @@ -50,17 +50,14 @@ class Taskd(object): # Will hold the taskd subprocess if it's running self.proc = None - self.datadir = tempfile.mkdtemp() + self.datadir = tempfile.mkdtemp(prefix="taskd_") self.tasklog = os.path.join(self.datadir, "taskd.log") self.taskpid = os.path.join(self.datadir, "taskd.pid") # Ensure any instance is properly destroyed at session end atexit.register(lambda: self.destroy()) - # Copy all env variables to avoid clashing subprocess environments - self.env = os.environ.copy() - # Make sure TASKDDATA points to the temporary folder - self.env["TASKDATA"] = self.datadir + self.reset_env() if certpath is None: certpath = DEFAULT_CERT_PATH @@ -101,6 +98,15 @@ class Taskd(object): txt = super(Taskd, self).__repr__() return "{0} running from {1}>".format(txt[:-1], self.datadir) + def reset_env(self): + """Set a new environment derived from the one used to launch the test + """ + # Copy all env variables to avoid clashing subprocess environments + self.env = os.environ.copy() + + # Make sure TASKDDATA points to the temporary folder + self.env["TASKDATA"] = self.datadir + def create_user(self, user=None, group=None, org=None): """Create a user/group in the server and return the user credentials to use in a taskw client. diff --git a/test/basetest/testing.py b/test/basetest/testing.py index e90f81b46..5b2d14124 100644 --- a/test/basetest/testing.py +++ b/test/basetest/testing.py @@ -2,14 +2,31 @@ import unittest import sys +from .utils import TASKW_SKIP, TASKD_SKIP +from .taskd import Taskd -class TestCase(unittest.TestCase): +class BaseTestCase(unittest.TestCase): def diag(self, out): - sys.stdout.write("# --- diag start ---\n") - for line in out.split("\n"): - sys.stdout.write("# " + line + "\n") - sys.stdout.write("# --- diag end ---\n") + sys.stderr.write("--- diag start ---\n") + for line in out.splitlines(): + sys.stderr.write(line + '\n') + sys.stderr.write("--- diag end ---\n") + + +@unittest.skipIf(TASKW_SKIP, "TASKW_SKIP set, skipping task tests.") +class TestCase(BaseTestCase): + """Automatically skips tests if TASKW_SKIP is present in the environment + """ + pass + + +@unittest.skipIf(TASKD_SKIP, "TASKD_SKIP set, skipping taskd tests.") +@unittest.skipIf(Taskd.not_available(), "Taskd binary not available") +class ServerTestCase(BaseTestCase): + """Automatically skips tests if TASKD_SKIP is present in the environment + """ + pass # vim: ai sts=4 et sw=4 diff --git a/test/basetest/utils.py b/test/basetest/utils.py index ecd1c2257..6483feee7 100644 --- a/test/basetest/utils.py +++ b/test/basetest/utils.py @@ -4,6 +4,7 @@ import os import sys import socket import signal +import functools from subprocess import Popen, PIPE, STDOUT from threading import Thread from Queue import Queue, Empty @@ -13,6 +14,10 @@ from .exceptions import CommandError USED_PORTS = set() ON_POSIX = 'posix' in sys.builtin_module_names +# Environment flags to control skipping of task and taskd tests +TASKW_SKIP = os.environ.get("TASKW_SKIP", False) +TASKD_SKIP = os.environ.get("TASKD_SKIP", False) + def wait_process(proc, timeout=1): """Wait for process to finish @@ -160,10 +165,26 @@ def release_port(port): pass +def memoize(obj): + """Keep an in-memory cache of function results given it's inputs + """ + cache = obj.cache = {} + + @functools.wraps(obj) + def memoizer(*args, **kwargs): + key = str(args) + str(kwargs) + if key not in cache: + cache[key] = obj(*args, **kwargs) + return cache[key] + return memoizer + + try: from shutil import which + which = memoize(which) except ImportError: # NOTE: This is shutil.which backported from python-3.3.3 + @memoize def which(cmd, mode=os.F_OK | os.X_OK, path=None): """Given a command, mode, and a PATH string, return the path which conforms to the given mode on the PATH, or None if there is no such diff --git a/test/simpletap/__init__.py b/test/simpletap/__init__.py index bc540f0b6..afaacc87e 100644 --- a/test/simpletap/__init__.py +++ b/test/simpletap/__init__.py @@ -1,34 +1,33 @@ -################################################################################ -## taskwarrior - a command line task list manager. -## -## Copyright 2006-2014, Paul Beckingham, Federico Hernandez. -## -## Permission is hereby granted, free of charge, to any person obtaining a copy -## of this software and associated documentation files (the "Software"), to deal -## in the Software without restriction, including without limitation the rights -## to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -## copies of the Software, and to permit persons to whom the Software is -## furnished to do so, subject to the following conditions: -## -## The above copyright notice and this permission notice shall be included -## in all copies or substantial portions of the Software. -## -## THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -## OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -## FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL -## THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -## LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -## OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -## SOFTWARE. -## -## http://www.opensource.org/licenses/mit-license.php -## -################################################################################ +############################################################################### +# taskwarrior - a command line task list manager. +# +# Copyright 2006-2014, Paul Beckingham, Federico Hernandez. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +# http://www.opensource.org/licenses/mit-license.php +# +############################################################################### # Original version by Renato Alves import sys -import time import unittest import warnings @@ -39,6 +38,8 @@ class TAPTestResult(unittest.result.TestResult): self.stream = stream self.descriptions = descriptions self.verbosity = verbosity + # Buffer stdout and stderr + self.buffer = True def getDescription(self, test): doc_first_line = test.shortDescription() @@ -50,7 +51,54 @@ class TAPTestResult(unittest.result.TestResult): def startTestRun(self, total="unk"): self.stream.writeln("1..{0}".format(total)) + def stopTest(self, test): + """Prevent flushing of stdout/stderr buffers until later""" + pass + + def _restoreStdout(self): + """Restore sys.stdout and sys.stderr, don't merge buffered output yet + """ + if self.buffer: + sys.stdout = self._original_stdout + sys.stderr = self._original_stderr + + @staticmethod + def _do_stream(data, stream): + """Helper function for _mergeStdout""" + for line in data.splitlines(True): + # Add a comment sign before each line + if line.startswith("#"): + stream.write(line) + else: + stream.write("# " + line) + + if not line.endswith('\n'): + stream.write('\n') + + def _mergeStdout(self): + """Merge buffered output with main streams + """ + + if self.buffer: + output = self._stdout_buffer.getvalue() + error = self._stderr_buffer.getvalue() + if output: + self._do_stream(output, sys.stdout) + if error: + self._do_stream(error, sys.stderr) + + self._stdout_buffer.seek(0) + self._stdout_buffer.truncate() + self._stderr_buffer.seek(0) + self._stderr_buffer.truncate() + + # Needed to fix the stopTest override + self._mirrorOutput = False + def report(self, test, status=None, err=None): + # Restore stdout/stderr but don't flush just yet + self._restoreStdout() + desc = self.getDescription(test) try: exception, msg, _ = err @@ -77,7 +125,8 @@ class TAPTestResult(unittest.result.TestResult): else: self.stream.writeln("ok {0} - {1}".format(self.testsRun, desc)) - self.stream.flush() + # Flush all buffers to stdout + self._mergeStdout() def addSuccess(self, test): super(TAPTestResult, self).addSuccess(test) @@ -108,7 +157,6 @@ class TAPTestRunner(unittest.runner.TextTestRunner): result = self._makeResult() unittest.signals.registerResult(result) result.failfast = self.failfast - result.buffer = self.buffer with warnings.catch_warnings(): if getattr(self, "warnings", None): @@ -120,9 +168,10 @@ class TAPTestRunner(unittest.runner.TextTestRunner): # noisy. The -Wd and -Wa flags can be used to bypass this # only when self.warnings is None. if self.warnings in ['default', 'always']: - warnings.filterwarnings('module', - category=DeprecationWarning, - message='Please use assert\w+ instead.') + warnings.filterwarnings( + 'module', + category=DeprecationWarning, + message='Please use assert\w+ instead.') startTestRun = getattr(result, 'startTestRun', None) if startTestRun is not None: startTestRun(test.countTestCases()) diff --git a/test/template.t b/test/template.t index cb081eaf9..4122cb2b4 100755 --- a/test/template.t +++ b/test/template.t @@ -8,7 +8,7 @@ from datetime import datetime # Ensure python finds the local simpletap module sys.path.append(os.path.dirname(os.path.abspath(__file__))) -from basetest import Task, Taskd, TestCase +from basetest import Task, TestCase, Taskd, ServerTestCase class TestBugNumber(TestCase): @@ -37,6 +37,23 @@ class TestBugNumber(TestCase): # TAP diagnostics on the bas self.diag("Yay TAP diagnostics") + def test_faketime(self): + """Running tests using libfaketime""" + self.t.faketime("-2y") + + command = ("add", "Testing") + self.t(command) + + # Remove FAKETIME settings + self.t.faketime() + + command = ("list",) + code, out, err = self.t(command) + + # Task should be 2 years old + expected = "2.0y" + self.assertIn(expected, out) + def test_fail_other(self): """Nothing to do with Copyright""" self.assertEqual("I like to code", "I like\nto code\n") @@ -53,8 +70,7 @@ class TestBugNumber(TestCase): """Executed once after all tests in the class""" -@unittest.skipIf(Taskd.not_available(), "Taskd binary not available") -class ServerTestCase(TestCase): +class ServerTestBugNumber(ServerTestCase): @classmethod def setUpClass(cls): cls.taskd = Taskd()