Unittest - Buffer stdout/stderr until after test status is printed

This commit is contained in:
Renato Alves
2014-07-18 20:06:29 +01:00
parent 97d490e0fd
commit 3fd06257b7

View File

@@ -1,34 +1,33 @@
################################################################################ ###############################################################################
## taskwarrior - a command line task list manager. # taskwarrior - a command line task list manager.
## #
## Copyright 2006-2014, Paul Beckingham, Federico Hernandez. # Copyright 2006-2014, Paul Beckingham, Federico Hernandez.
## #
## Permission is hereby granted, free of charge, to any person obtaining a copy # Permission is hereby granted, free of charge, to any person obtaining a copy
## of this software and associated documentation files (the "Software"), to deal # of this software and associated documentation files (the "Software"), to deal
## in the Software without restriction, including without limitation the rights # in the Software without restriction, including without limitation the rights
## to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
## copies of the Software, and to permit persons to whom the Software is # copies of the Software, and to permit persons to whom the Software is
## furnished to do so, subject to the following conditions: # furnished to do so, subject to the following conditions:
## #
## The above copyright notice and this permission notice shall be included # The above copyright notice and this permission notice shall be included
## in all copies or substantial portions of the Software. # in all copies or substantial portions of the Software.
## #
## THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
## OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
## FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
## THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
## LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
## OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
## SOFTWARE. # SOFTWARE.
## #
## http://www.opensource.org/licenses/mit-license.php # http://www.opensource.org/licenses/mit-license.php
## #
################################################################################ ###############################################################################
# Original version by Renato Alves # Original version by Renato Alves
import sys import sys
import time
import unittest import unittest
import warnings import warnings
@@ -39,6 +38,8 @@ class TAPTestResult(unittest.result.TestResult):
self.stream = stream self.stream = stream
self.descriptions = descriptions self.descriptions = descriptions
self.verbosity = verbosity self.verbosity = verbosity
# Buffer stdout and stderr
self.buffer = True
def getDescription(self, test): def getDescription(self, test):
doc_first_line = test.shortDescription() doc_first_line = test.shortDescription()
@@ -50,7 +51,54 @@ class TAPTestResult(unittest.result.TestResult):
def startTestRun(self, total="unk"): def startTestRun(self, total="unk"):
self.stream.writeln("1..{0}".format(total)) self.stream.writeln("1..{0}".format(total))
def stopTest(self, test):
"""Prevent flushing of stdout/stderr buffers until later"""
pass
def _restoreStdout(self):
"""Restore sys.stdout and sys.stderr, don't merge buffered output yet
"""
if self.buffer:
sys.stdout = self._original_stdout
sys.stderr = self._original_stderr
@staticmethod
def _do_stream(data, stream):
"""Helper function for _mergeStdout"""
for line in data.splitlines(True):
# Add a comment sign before each line
if line.startswith("#"):
stream.write(line)
else:
stream.write("# " + line)
if not line.endswith('\n'):
stream.write('\n')
def _mergeStdout(self):
"""Merge buffered output with main streams
"""
if self.buffer:
output = self._stdout_buffer.getvalue()
error = self._stderr_buffer.getvalue()
if output:
self._do_stream(output, sys.stdout)
if error:
self._do_stream(error, sys.stderr)
self._stdout_buffer.seek(0)
self._stdout_buffer.truncate()
self._stderr_buffer.seek(0)
self._stderr_buffer.truncate()
# Needed to fix the stopTest override
self._mirrorOutput = False
def report(self, test, status=None, err=None): def report(self, test, status=None, err=None):
# Restore stdout/stderr but don't flush just yet
self._restoreStdout()
desc = self.getDescription(test) desc = self.getDescription(test)
try: try:
exception, msg, _ = err exception, msg, _ = err
@@ -77,7 +125,8 @@ class TAPTestResult(unittest.result.TestResult):
else: else:
self.stream.writeln("ok {0} - {1}".format(self.testsRun, desc)) self.stream.writeln("ok {0} - {1}".format(self.testsRun, desc))
self.stream.flush() # Flush all buffers to stdout
self._mergeStdout()
def addSuccess(self, test): def addSuccess(self, test):
super(TAPTestResult, self).addSuccess(test) super(TAPTestResult, self).addSuccess(test)
@@ -108,7 +157,6 @@ class TAPTestRunner(unittest.runner.TextTestRunner):
result = self._makeResult() result = self._makeResult()
unittest.signals.registerResult(result) unittest.signals.registerResult(result)
result.failfast = self.failfast result.failfast = self.failfast
result.buffer = self.buffer
with warnings.catch_warnings(): with warnings.catch_warnings():
if getattr(self, "warnings", None): if getattr(self, "warnings", None):
@@ -120,9 +168,10 @@ class TAPTestRunner(unittest.runner.TextTestRunner):
# noisy. The -Wd and -Wa flags can be used to bypass this # noisy. The -Wd and -Wa flags can be used to bypass this
# only when self.warnings is None. # only when self.warnings is None.
if self.warnings in ['default', 'always']: if self.warnings in ['default', 'always']:
warnings.filterwarnings('module', warnings.filterwarnings(
category=DeprecationWarning, 'module',
message='Please use assert\w+ instead.') category=DeprecationWarning,
message='Please use assert\w+ instead.')
startTestRun = getattr(result, 'startTestRun', None) startTestRun = getattr(result, 'startTestRun', None)
if startTestRun is not None: if startTestRun is not None:
startTestRun(test.countTestCases()) startTestRun(test.countTestCases())