Merge pull request #11 in TM/task from ~UNODE/task:2.4.0 to 2.4.0

* commit 'c5d4efd4d2cdc48489193eee45c2a9566dc6b2fd':
  Unittest - No longer necessary to prefix strings with hash
  Unittest - Buffer stdout/stderr until after test status is printed
  Unittest - Use stderr for diagnostics too
  Unittest - Memoize utils.which for faketime speedups
  Unittest - Add a memoize function for caching of function results
  Unittest - Skip tests that use faketime if not available
  Unittest - Add test example using libfaketime
  Unittest - Implement faketime for use in taskw
  Unittest - Make is easier to reset env vars of task and taskd
  Unittest - Add mechanism to skip task and taskd tests
  Unittest - Add prefixes to task and taskd temporary folders
This commit is contained in:
Paul Beckingham
2014-07-18 19:19:00 +00:00
7 changed files with 194 additions and 57 deletions

View File

@@ -2,6 +2,6 @@
from .task import Task from .task import Task
from .taskd import Taskd from .taskd import Taskd
from .testing import TestCase from .testing import TestCase, ServerTestCase
# vim: ai sts=4 et sw=4 # vim: ai sts=4 et sw=4

View File

@@ -4,7 +4,8 @@ import os
import tempfile import tempfile
import shutil import shutil
import atexit import atexit
from .utils import run_cmd_wait, run_cmd_wait_nofail import unittest
from .utils import run_cmd_wait, run_cmd_wait_nofail, which
from .exceptions import CommandError from .exceptions import CommandError
@@ -29,21 +30,18 @@ class Task(object):
self.taskw = taskw self.taskw = taskw
self.taskd = taskd self.taskd = taskd
# Used to specify what command to launch (and to inject faketime)
self._command = [self.taskw]
# Configuration of the isolated environment # Configuration of the isolated environment
self._original_pwd = os.getcwd() self._original_pwd = os.getcwd()
self.datadir = tempfile.mkdtemp() self.datadir = tempfile.mkdtemp(prefix="task_")
self.taskrc = os.path.join(self.datadir, "test.rc") self.taskrc = os.path.join(self.datadir, "test.rc")
# Ensure any instance is properly destroyed at session end # Ensure any instance is properly destroyed at session end
atexit.register(lambda: self.destroy()) atexit.register(lambda: self.destroy())
# Copy all env variables to avoid clashing subprocess environments self.reset_env()
self.env = os.environ.copy()
# Make sure no TASKDDATA is isolated
self.env["TASKDATA"] = self.datadir
# As well as TASKRC
self.env["TASKRC"] = self.taskrc
# Cannot call self.config until confirmation is disabled # Cannot call self.config until confirmation is disabled
with open(self.taskrc, 'w') as rc: with open(self.taskrc, 'w') as rc:
@@ -58,6 +56,17 @@ class Task(object):
txt = super(Task, self).__repr__() txt = super(Task, self).__repr__()
return "{0} running from {1}>".format(txt[:-1], self.datadir) return "{0} running from {1}>".format(txt[:-1], self.datadir)
def reset_env(self):
"""Set a new environment derived from the one used to launch the test
"""
# Copy all env variables to avoid clashing subprocess environments
self.env = os.environ.copy()
# Make sure no TASKDDATA is isolated
self.env["TASKDATA"] = self.datadir
# As well as TASKRC
self.env["TASKRC"] = self.taskrc
def __call__(self, *args, **kwargs): def __call__(self, *args, **kwargs):
"aka t = Task() ; t() which is now an alias to t.runSuccess()" "aka t = Task() ; t() which is now an alias to t.runSuccess()"
return self.runSuccess(*args, **kwargs) return self.runSuccess(*args, **kwargs)
@@ -120,7 +129,8 @@ class Task(object):
Returns (exit_code, stdout, stderr) Returns (exit_code, stdout, stderr)
""" """
command = [self.taskw] # Create a copy of the command
command = self._command[:]
command.extend(args) command.extend(args)
output = run_cmd_wait_nofail(command, input, output = run_cmd_wait_nofail(command, input,
@@ -145,7 +155,8 @@ class Task(object):
Returns (exit_code, stdout, stderr) Returns (exit_code, stdout, stderr)
""" """
command = [self.taskw] # Create a copy of the command
command = self._command[:]
command.extend(args) command.extend(args)
output = run_cmd_wait_nofail(command, input, output = run_cmd_wait_nofail(command, input,
@@ -226,4 +237,21 @@ class Task(object):
return code, newout, newerr return code, newout, newerr
def faketime(self, faketime=None):
"""Set a faketime using libfaketime that will affect the following
command calls.
If faketime is None, faketime settings will be disabled.
"""
cmd = which("faketime")
if cmd is None:
raise unittest.SkipTest("libfaketime/faketime is not installed")
if self._command[0] == cmd:
self._command = self._command[3:]
if faketime is not None:
# Use advanced time format
self._command = [cmd, "-f", faketime] + self._command
# vim: ai sts=4 et sw=4 # vim: ai sts=4 et sw=4

View File

@@ -50,17 +50,14 @@ class Taskd(object):
# Will hold the taskd subprocess if it's running # Will hold the taskd subprocess if it's running
self.proc = None self.proc = None
self.datadir = tempfile.mkdtemp() self.datadir = tempfile.mkdtemp(prefix="taskd_")
self.tasklog = os.path.join(self.datadir, "taskd.log") self.tasklog = os.path.join(self.datadir, "taskd.log")
self.taskpid = os.path.join(self.datadir, "taskd.pid") self.taskpid = os.path.join(self.datadir, "taskd.pid")
# Ensure any instance is properly destroyed at session end # Ensure any instance is properly destroyed at session end
atexit.register(lambda: self.destroy()) atexit.register(lambda: self.destroy())
# Copy all env variables to avoid clashing subprocess environments self.reset_env()
self.env = os.environ.copy()
# Make sure TASKDDATA points to the temporary folder
self.env["TASKDATA"] = self.datadir
if certpath is None: if certpath is None:
certpath = DEFAULT_CERT_PATH certpath = DEFAULT_CERT_PATH
@@ -101,6 +98,15 @@ class Taskd(object):
txt = super(Taskd, self).__repr__() txt = super(Taskd, self).__repr__()
return "{0} running from {1}>".format(txt[:-1], self.datadir) return "{0} running from {1}>".format(txt[:-1], self.datadir)
def reset_env(self):
"""Set a new environment derived from the one used to launch the test
"""
# Copy all env variables to avoid clashing subprocess environments
self.env = os.environ.copy()
# Make sure TASKDDATA points to the temporary folder
self.env["TASKDATA"] = self.datadir
def create_user(self, user=None, group=None, org=None): def create_user(self, user=None, group=None, org=None):
"""Create a user/group in the server and return the user """Create a user/group in the server and return the user
credentials to use in a taskw client. credentials to use in a taskw client.

View File

@@ -2,14 +2,31 @@
import unittest import unittest
import sys import sys
from .utils import TASKW_SKIP, TASKD_SKIP
from .taskd import Taskd
class TestCase(unittest.TestCase): class BaseTestCase(unittest.TestCase):
def diag(self, out): def diag(self, out):
sys.stdout.write("# --- diag start ---\n") sys.stderr.write("--- diag start ---\n")
for line in out.split("\n"): for line in out.splitlines():
sys.stdout.write("# " + line + "\n") sys.stderr.write(line + '\n')
sys.stdout.write("# --- diag end ---\n") sys.stderr.write("--- diag end ---\n")
@unittest.skipIf(TASKW_SKIP, "TASKW_SKIP set, skipping task tests.")
class TestCase(BaseTestCase):
"""Automatically skips tests if TASKW_SKIP is present in the environment
"""
pass
@unittest.skipIf(TASKD_SKIP, "TASKD_SKIP set, skipping taskd tests.")
@unittest.skipIf(Taskd.not_available(), "Taskd binary not available")
class ServerTestCase(BaseTestCase):
"""Automatically skips tests if TASKD_SKIP is present in the environment
"""
pass
# vim: ai sts=4 et sw=4 # vim: ai sts=4 et sw=4

View File

@@ -4,6 +4,7 @@ import os
import sys import sys
import socket import socket
import signal import signal
import functools
from subprocess import Popen, PIPE, STDOUT from subprocess import Popen, PIPE, STDOUT
from threading import Thread from threading import Thread
from Queue import Queue, Empty from Queue import Queue, Empty
@@ -13,6 +14,10 @@ from .exceptions import CommandError
USED_PORTS = set() USED_PORTS = set()
ON_POSIX = 'posix' in sys.builtin_module_names ON_POSIX = 'posix' in sys.builtin_module_names
# Environment flags to control skipping of task and taskd tests
TASKW_SKIP = os.environ.get("TASKW_SKIP", False)
TASKD_SKIP = os.environ.get("TASKD_SKIP", False)
def wait_process(proc, timeout=1): def wait_process(proc, timeout=1):
"""Wait for process to finish """Wait for process to finish
@@ -160,10 +165,26 @@ def release_port(port):
pass pass
def memoize(obj):
"""Keep an in-memory cache of function results given it's inputs
"""
cache = obj.cache = {}
@functools.wraps(obj)
def memoizer(*args, **kwargs):
key = str(args) + str(kwargs)
if key not in cache:
cache[key] = obj(*args, **kwargs)
return cache[key]
return memoizer
try: try:
from shutil import which from shutil import which
which = memoize(which)
except ImportError: except ImportError:
# NOTE: This is shutil.which backported from python-3.3.3 # NOTE: This is shutil.which backported from python-3.3.3
@memoize
def which(cmd, mode=os.F_OK | os.X_OK, path=None): def which(cmd, mode=os.F_OK | os.X_OK, path=None):
"""Given a command, mode, and a PATH string, return the path which """Given a command, mode, and a PATH string, return the path which
conforms to the given mode on the PATH, or None if there is no such conforms to the given mode on the PATH, or None if there is no such

View File

@@ -1,34 +1,33 @@
################################################################################ ###############################################################################
## taskwarrior - a command line task list manager. # taskwarrior - a command line task list manager.
## #
## Copyright 2006-2014, Paul Beckingham, Federico Hernandez. # Copyright 2006-2014, Paul Beckingham, Federico Hernandez.
## #
## Permission is hereby granted, free of charge, to any person obtaining a copy # Permission is hereby granted, free of charge, to any person obtaining a copy
## of this software and associated documentation files (the "Software"), to deal # of this software and associated documentation files (the "Software"), to deal
## in the Software without restriction, including without limitation the rights # in the Software without restriction, including without limitation the rights
## to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
## copies of the Software, and to permit persons to whom the Software is # copies of the Software, and to permit persons to whom the Software is
## furnished to do so, subject to the following conditions: # furnished to do so, subject to the following conditions:
## #
## The above copyright notice and this permission notice shall be included # The above copyright notice and this permission notice shall be included
## in all copies or substantial portions of the Software. # in all copies or substantial portions of the Software.
## #
## THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
## OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
## FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
## THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
## LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
## OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
## SOFTWARE. # SOFTWARE.
## #
## http://www.opensource.org/licenses/mit-license.php # http://www.opensource.org/licenses/mit-license.php
## #
################################################################################ ###############################################################################
# Original version by Renato Alves # Original version by Renato Alves
import sys import sys
import time
import unittest import unittest
import warnings import warnings
@@ -39,6 +38,8 @@ class TAPTestResult(unittest.result.TestResult):
self.stream = stream self.stream = stream
self.descriptions = descriptions self.descriptions = descriptions
self.verbosity = verbosity self.verbosity = verbosity
# Buffer stdout and stderr
self.buffer = True
def getDescription(self, test): def getDescription(self, test):
doc_first_line = test.shortDescription() doc_first_line = test.shortDescription()
@@ -50,7 +51,54 @@ class TAPTestResult(unittest.result.TestResult):
def startTestRun(self, total="unk"): def startTestRun(self, total="unk"):
self.stream.writeln("1..{0}".format(total)) self.stream.writeln("1..{0}".format(total))
def stopTest(self, test):
"""Prevent flushing of stdout/stderr buffers until later"""
pass
def _restoreStdout(self):
"""Restore sys.stdout and sys.stderr, don't merge buffered output yet
"""
if self.buffer:
sys.stdout = self._original_stdout
sys.stderr = self._original_stderr
@staticmethod
def _do_stream(data, stream):
"""Helper function for _mergeStdout"""
for line in data.splitlines(True):
# Add a comment sign before each line
if line.startswith("#"):
stream.write(line)
else:
stream.write("# " + line)
if not line.endswith('\n'):
stream.write('\n')
def _mergeStdout(self):
"""Merge buffered output with main streams
"""
if self.buffer:
output = self._stdout_buffer.getvalue()
error = self._stderr_buffer.getvalue()
if output:
self._do_stream(output, sys.stdout)
if error:
self._do_stream(error, sys.stderr)
self._stdout_buffer.seek(0)
self._stdout_buffer.truncate()
self._stderr_buffer.seek(0)
self._stderr_buffer.truncate()
# Needed to fix the stopTest override
self._mirrorOutput = False
def report(self, test, status=None, err=None): def report(self, test, status=None, err=None):
# Restore stdout/stderr but don't flush just yet
self._restoreStdout()
desc = self.getDescription(test) desc = self.getDescription(test)
try: try:
exception, msg, _ = err exception, msg, _ = err
@@ -77,7 +125,8 @@ class TAPTestResult(unittest.result.TestResult):
else: else:
self.stream.writeln("ok {0} - {1}".format(self.testsRun, desc)) self.stream.writeln("ok {0} - {1}".format(self.testsRun, desc))
self.stream.flush() # Flush all buffers to stdout
self._mergeStdout()
def addSuccess(self, test): def addSuccess(self, test):
super(TAPTestResult, self).addSuccess(test) super(TAPTestResult, self).addSuccess(test)
@@ -108,7 +157,6 @@ class TAPTestRunner(unittest.runner.TextTestRunner):
result = self._makeResult() result = self._makeResult()
unittest.signals.registerResult(result) unittest.signals.registerResult(result)
result.failfast = self.failfast result.failfast = self.failfast
result.buffer = self.buffer
with warnings.catch_warnings(): with warnings.catch_warnings():
if getattr(self, "warnings", None): if getattr(self, "warnings", None):
@@ -120,9 +168,10 @@ class TAPTestRunner(unittest.runner.TextTestRunner):
# noisy. The -Wd and -Wa flags can be used to bypass this # noisy. The -Wd and -Wa flags can be used to bypass this
# only when self.warnings is None. # only when self.warnings is None.
if self.warnings in ['default', 'always']: if self.warnings in ['default', 'always']:
warnings.filterwarnings('module', warnings.filterwarnings(
category=DeprecationWarning, 'module',
message='Please use assert\w+ instead.') category=DeprecationWarning,
message='Please use assert\w+ instead.')
startTestRun = getattr(result, 'startTestRun', None) startTestRun = getattr(result, 'startTestRun', None)
if startTestRun is not None: if startTestRun is not None:
startTestRun(test.countTestCases()) startTestRun(test.countTestCases())

View File

@@ -8,7 +8,7 @@ from datetime import datetime
# Ensure python finds the local simpletap module # Ensure python finds the local simpletap module
sys.path.append(os.path.dirname(os.path.abspath(__file__))) sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from basetest import Task, Taskd, TestCase from basetest import Task, TestCase, Taskd, ServerTestCase
class TestBugNumber(TestCase): class TestBugNumber(TestCase):
@@ -37,6 +37,23 @@ class TestBugNumber(TestCase):
# TAP diagnostics on the bas # TAP diagnostics on the bas
self.diag("Yay TAP diagnostics") self.diag("Yay TAP diagnostics")
def test_faketime(self):
"""Running tests using libfaketime"""
self.t.faketime("-2y")
command = ("add", "Testing")
self.t(command)
# Remove FAKETIME settings
self.t.faketime()
command = ("list",)
code, out, err = self.t(command)
# Task should be 2 years old
expected = "2.0y"
self.assertIn(expected, out)
def test_fail_other(self): def test_fail_other(self):
"""Nothing to do with Copyright""" """Nothing to do with Copyright"""
self.assertEqual("I like to code", "I like\nto code\n") self.assertEqual("I like to code", "I like\nto code\n")
@@ -53,8 +70,7 @@ class TestBugNumber(TestCase):
"""Executed once after all tests in the class""" """Executed once after all tests in the class"""
@unittest.skipIf(Taskd.not_available(), "Taskd binary not available") class ServerTestBugNumber(ServerTestCase):
class ServerTestCase(TestCase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
cls.taskd = Taskd() cls.taskd = Taskd()