diff options
Diffstat (limited to 'tests/qemu-iotests/iotests.py')
-rw-r--r-- | tests/qemu-iotests/iotests.py | 145 |
1 files changed, 81 insertions, 64 deletions
diff --git a/tests/qemu-iotests/iotests.py b/tests/qemu-iotests/iotests.py index 5af0182895..777fa2ec0e 100644 --- a/tests/qemu-iotests/iotests.py +++ b/tests/qemu-iotests/iotests.py @@ -20,7 +20,6 @@ import atexit import bz2 from collections import OrderedDict import faulthandler -import io import json import logging import os @@ -32,7 +31,7 @@ import subprocess import sys import time from typing import (Any, Callable, Dict, Iterable, - List, Optional, Sequence, Tuple, TypeVar) + List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) import unittest from contextlib import contextmanager @@ -113,15 +112,14 @@ def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], Run a tool and return both its output and its exit code """ stderr = subprocess.STDOUT if connect_stderr else None - subp = subprocess.Popen(args, - stdout=subprocess.PIPE, - stderr=stderr, - universal_newlines=True) - output = subp.communicate()[0] - if subp.returncode < 0: - cmd = ' '.join(args) - sys.stderr.write(f'{tool} received signal {-subp.returncode}: {cmd}\n') - return (output, subp.returncode) + with subprocess.Popen(args, stdout=subprocess.PIPE, + stderr=stderr, universal_newlines=True) as subp: + output = subp.communicate()[0] + if subp.returncode < 0: + cmd = ' '.join(args) + sys.stderr.write(f'{tool} received signal \ + {-subp.returncode}: {cmd}\n') + return (output, subp.returncode) def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: """ @@ -237,6 +235,9 @@ def qemu_io_silent_check(*args): class QemuIoInteractive: def __init__(self, *args): self.args = qemu_io_args_no_fmt + list(args) + # We need to keep the Popen objext around, and not + # close it immediately. Therefore, disable the pylint check: + # pylint: disable=consider-using-with self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -310,22 +311,22 @@ def qemu_nbd_popen(*args): cmd.extend(args) log('Start NBD server') - p = subprocess.Popen(cmd) - try: - while not os.path.exists(pid_file): - if p.poll() is not None: - raise RuntimeError( - "qemu-nbd terminated with exit code {}: {}" - .format(p.returncode, ' '.join(cmd))) - - time.sleep(0.01) - yield - finally: - if os.path.exists(pid_file): - os.remove(pid_file) - log('Kill NBD server') - p.kill() - p.wait() + with subprocess.Popen(cmd) as p: + try: + while not os.path.exists(pid_file): + if p.poll() is not None: + raise RuntimeError( + "qemu-nbd terminated with exit code {}: {}" + .format(p.returncode, ' '.join(cmd))) + + time.sleep(0.01) + yield + finally: + if os.path.exists(pid_file): + os.remove(pid_file) + log('Kill NBD server') + p.kill() + p.wait() def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): '''Return True if two image files are identical''' @@ -334,13 +335,12 @@ def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): def create_image(name, size): '''Create a fully-allocated raw image with sector markers''' - file = open(name, 'wb') - i = 0 - while i < size: - sector = struct.pack('>l504xl', i // 512, i // 512) - file.write(sector) - i = i + 512 - file.close() + with open(name, 'wb') as file: + i = 0 + while i < size: + sector = struct.pack('>l504xl', i // 512, i // 512) + file.write(sector) + i = i + 512 def image_size(img): '''Return image's virtual size''' @@ -1271,37 +1271,54 @@ def skip_if_user_is_root(func): return func(*args, **kwargs) return func_wrapper -def execute_unittest(debug=False): +# We need to filter out the time taken from the output so that +# qemu-iotest can reliably diff the results against master output, +# and hide skipped tests from the reference output. + +class ReproducibleTestResult(unittest.TextTestResult): + def addSkip(self, test, reason): + # Same as TextTestResult, but print dot instead of "s" + unittest.TestResult.addSkip(self, test, reason) + if self.showAll: + self.stream.writeln("skipped {0!r}".format(reason)) + elif self.dots: + self.stream.write(".") + self.stream.flush() + +class ReproducibleStreamWrapper: + def __init__(self, stream: TextIO): + self.stream = stream + + def __getattr__(self, attr): + if attr in ('stream', '__getstate__'): + raise AttributeError(attr) + return getattr(self.stream, attr) + + def write(self, arg=None): + arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg) + arg = re.sub(r' \(skipped=\d+\)', r'', arg) + self.stream.write(arg) + +class ReproducibleTestRunner(unittest.TextTestRunner): + def __init__(self, stream: Optional[TextIO] = None, + resultclass: Type[unittest.TestResult] = ReproducibleTestResult, + **kwargs: Any) -> None: + rstream = ReproducibleStreamWrapper(stream or sys.stdout) + super().__init__(stream=rstream, # type: ignore + descriptions=True, + resultclass=resultclass, + **kwargs) + +def execute_unittest(argv: List[str], debug: bool = False) -> None: """Executes unittests within the calling module.""" - verbosity = 2 if debug else 1 - - if debug: - output = sys.stdout - else: - # We need to filter out the time taken from the output so that - # qemu-iotest can reliably diff the results against master output. - output = io.StringIO() - - runner = unittest.TextTestRunner(stream=output, descriptions=True, - verbosity=verbosity) - try: - # unittest.main() will use sys.exit(); so expect a SystemExit - # exception - unittest.main(testRunner=runner) - finally: - # We need to filter out the time taken from the output so that - # qemu-iotest can reliably diff the results against master output. - if not debug: - out = output.getvalue() - out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out) - - # Hide skipped tests from the reference output - out = re.sub(r'OK \(skipped=\d+\)', 'OK', out) - out_first_line, out_rest = out.split('\n', 1) - out = out_first_line.replace('s', '.') + '\n' + out_rest - - sys.stderr.write(out) + # Some tests have warnings, especially ResourceWarnings for unclosed + # files and sockets. Ignore them for now to ensure reproducibility of + # the test output. + unittest.main(argv=argv, + testRunner=ReproducibleTestRunner, + verbosity=2 if debug else 1, + warnings=None if sys.warnoptions else 'ignore') def execute_setup_common(supported_fmts: Sequence[str] = (), supported_platforms: Sequence[str] = (), @@ -1338,7 +1355,7 @@ def execute_test(*args, test_function=None, **kwargs): debug = execute_setup_common(*args, **kwargs) if not test_function: - execute_unittest(debug) + execute_unittest(sys.argv, debug) else: test_function() |