diff options
Diffstat (limited to 'tests/qemu-iotests/iotests.py')
-rw-r--r-- | tests/qemu-iotests/iotests.py | 128 |
1 files changed, 106 insertions, 22 deletions
diff --git a/tests/qemu-iotests/iotests.py b/tests/qemu-iotests/iotests.py index b25d48a91b..fdbdd8b300 100644 --- a/tests/qemu-iotests/iotests.py +++ b/tests/qemu-iotests/iotests.py @@ -109,6 +109,20 @@ def qemu_img_pipe(*args): sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args)))) return subp.communicate()[0] +def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]): + args = [ 'info' ] + if imgopts: + args.append('--image-opts') + else: + args += [ '-f', imgfmt ] + args += extra_args + args.append(filename) + + output = qemu_img_pipe(*args) + if not filter_path: + filter_path = filename + log(filter_img_info(output, filter_path)) + def qemu_io(*args): '''Run qemu-io and return the stdout data''' args = qemu_io_args + list(args) @@ -206,6 +220,22 @@ def filter_qmp_event(event): event['timestamp']['microseconds'] = 'USECS' return event +def filter_testfiles(msg): + prefix = os.path.join(test_dir, "%s-" % (os.getpid())) + return msg.replace(prefix, 'TEST_DIR/PID-') + +def filter_img_info(output, filename): + lines = [] + for line in output.split('\n'): + if 'disk size' in line or 'actual-size' in line: + continue + line = line.replace(filename, 'TEST_IMG') \ + .replace(imgfmt, 'IMGFMT') + line = re.sub('iters: [0-9]+', 'iters: XXX', line) + line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line) + lines.append(line) + return '\n'.join(lines) + def log(msg, filters=[]): for flt in filters: msg = flt(msg) @@ -281,6 +311,13 @@ def file_path(*names): return paths[0] if len(paths) == 1 else paths +def remote_filename(path): + if imgproto == 'file': + return path + elif imgproto == 'ssh': + return "ssh://127.0.0.1%s" % (path) + else: + raise Exception("Protocol %s not supported" % (imgproto)) class VM(qtest.QEMUQtestMachine): '''A QEMU VM''' @@ -363,6 +400,58 @@ class VM(qtest.QEMUQtestMachine): return self.qmp('human-monitor-command', command_line='qemu-io %s "%s"' % (drive, cmd)) + def flatten_qmp_object(self, obj, output=None, basestr=''): + if output is None: + output = dict() + if isinstance(obj, list): + for i in range(len(obj)): + self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.') + elif isinstance(obj, dict): + for key in obj: + self.flatten_qmp_object(obj[key], output, basestr + key + '.') + else: + output[basestr[:-1]] = obj # Strip trailing '.' + return output + + def qmp_to_opts(self, obj): + obj = self.flatten_qmp_object(obj) + output_list = list() + for key in obj: + output_list += [key + '=' + obj[key]] + return ','.join(output_list) + + def get_qmp_events_filtered(self, wait=True): + result = [] + for ev in self.get_qmp_events(wait=wait): + result.append(filter_qmp_event(ev)) + return result + + def qmp_log(self, cmd, filters=[filter_testfiles], **kwargs): + logmsg = "{'execute': '%s', 'arguments': %s}" % (cmd, kwargs) + log(logmsg, filters) + result = self.qmp(cmd, **kwargs) + log(str(result), filters) + return result + + def run_job(self, job, auto_finalize=True, auto_dismiss=False): + while True: + for ev in self.get_qmp_events_filtered(wait=True): + if ev['event'] == 'JOB_STATUS_CHANGE': + status = ev['data']['status'] + if status == 'aborting': + result = self.qmp('query-jobs') + for j in result['return']: + if j['id'] == job: + log('Job failed: %s' % (j['error'])) + elif status == 'pending' and not auto_finalize: + self.qmp_log('job-finalize', id=job) + elif status == 'concluded' and not auto_dismiss: + self.qmp_log('job-dismiss', id=job) + elif status == 'null': + return + else: + iotests.log(ev) + index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') @@ -390,26 +479,6 @@ class QMPTestCase(unittest.TestCase): self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d))) return d - def flatten_qmp_object(self, obj, output=None, basestr=''): - if output is None: - output = dict() - if isinstance(obj, list): - for i in range(len(obj)): - self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.') - elif isinstance(obj, dict): - for key in obj: - self.flatten_qmp_object(obj[key], output, basestr + key + '.') - else: - output[basestr[:-1]] = obj # Strip trailing '.' - return output - - def qmp_to_opts(self, obj): - obj = self.flatten_qmp_object(obj) - output_list = list() - for key in obj: - output_list += [key + '=' + obj[key]] - return ','.join(output_list) - def assert_qmp_absent(self, d, path): try: result = self.dictpath(d, path) @@ -444,8 +513,8 @@ class QMPTestCase(unittest.TestCase): '''Asserts that the given filename is a json: filename and that its content is equal to the given reference object''' self.assertEqual(json_filename[:5], 'json:') - self.assertEqual(self.flatten_qmp_object(json.loads(json_filename[5:])), - self.flatten_qmp_object(reference)) + self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])), + self.vm.flatten_qmp_object(reference)) def cancel_and_wait(self, drive='drive0', force=False, resume=False): '''Cancel a block job and wait for it to finish, returning the event''' @@ -464,6 +533,9 @@ class QMPTestCase(unittest.TestCase): self.assert_qmp(event, 'data/device', drive) result = event cancelled = True + elif event['event'] == 'JOB_STATUS_CHANGE': + self.assert_qmp(event, 'data/id', drive) + self.assert_no_active_block_jobs() return result @@ -479,6 +551,8 @@ class QMPTestCase(unittest.TestCase): self.assert_qmp(event, 'data/offset', event['data']['len']) self.assert_no_active_block_jobs() return event + elif event['event'] == 'JOB_STATUS_CHANGE': + self.assert_qmp(event, 'data/id', drive) def wait_ready(self, drive='drive0'): '''Wait until a block job BLOCK_JOB_READY event''' @@ -542,6 +616,16 @@ def verify_image_format(supported_fmts=[], unsupported_fmts=[]): if not_sup or (imgfmt in unsupported_fmts): notrun('not suitable for this image format: %s' % imgfmt) +def verify_protocol(supported=[], unsupported=[]): + assert not (supported and unsupported) + + if 'generic' in supported: + return + + not_sup = supported and (imgproto not in supported) + if not_sup or (imgproto in unsupported): + notrun('not suitable for this protocol: %s' % imgproto) + def verify_platform(supported_oses=['linux']): if True not in [sys.platform.startswith(x) for x in supported_oses]: notrun('not suitable for this OS: %s' % sys.platform) |