summaryrefslogtreecommitdiffstats
path: root/tests/qemu-iotests/iotests.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/qemu-iotests/iotests.py')
-rw-r--r--tests/qemu-iotests/iotests.py128
1 files changed, 106 insertions, 22 deletions
diff --git a/tests/qemu-iotests/iotests.py b/tests/qemu-iotests/iotests.py
index b25d48a91b..fdbdd8b300 100644
--- a/tests/qemu-iotests/iotests.py
+++ b/tests/qemu-iotests/iotests.py
@@ -109,6 +109,20 @@ def qemu_img_pipe(*args):
sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
return subp.communicate()[0]
+def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]):
+ args = [ 'info' ]
+ if imgopts:
+ args.append('--image-opts')
+ else:
+ args += [ '-f', imgfmt ]
+ args += extra_args
+ args.append(filename)
+
+ output = qemu_img_pipe(*args)
+ if not filter_path:
+ filter_path = filename
+ log(filter_img_info(output, filter_path))
+
def qemu_io(*args):
'''Run qemu-io and return the stdout data'''
args = qemu_io_args + list(args)
@@ -206,6 +220,22 @@ def filter_qmp_event(event):
event['timestamp']['microseconds'] = 'USECS'
return event
+def filter_testfiles(msg):
+ prefix = os.path.join(test_dir, "%s-" % (os.getpid()))
+ return msg.replace(prefix, 'TEST_DIR/PID-')
+
+def filter_img_info(output, filename):
+ lines = []
+ for line in output.split('\n'):
+ if 'disk size' in line or 'actual-size' in line:
+ continue
+ line = line.replace(filename, 'TEST_IMG') \
+ .replace(imgfmt, 'IMGFMT')
+ line = re.sub('iters: [0-9]+', 'iters: XXX', line)
+ line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line)
+ lines.append(line)
+ return '\n'.join(lines)
+
def log(msg, filters=[]):
for flt in filters:
msg = flt(msg)
@@ -281,6 +311,13 @@ def file_path(*names):
return paths[0] if len(paths) == 1 else paths
+def remote_filename(path):
+ if imgproto == 'file':
+ return path
+ elif imgproto == 'ssh':
+ return "ssh://127.0.0.1%s" % (path)
+ else:
+ raise Exception("Protocol %s not supported" % (imgproto))
class VM(qtest.QEMUQtestMachine):
'''A QEMU VM'''
@@ -363,6 +400,58 @@ class VM(qtest.QEMUQtestMachine):
return self.qmp('human-monitor-command',
command_line='qemu-io %s "%s"' % (drive, cmd))
+ def flatten_qmp_object(self, obj, output=None, basestr=''):
+ if output is None:
+ output = dict()
+ if isinstance(obj, list):
+ for i in range(len(obj)):
+ self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.')
+ elif isinstance(obj, dict):
+ for key in obj:
+ self.flatten_qmp_object(obj[key], output, basestr + key + '.')
+ else:
+ output[basestr[:-1]] = obj # Strip trailing '.'
+ return output
+
+ def qmp_to_opts(self, obj):
+ obj = self.flatten_qmp_object(obj)
+ output_list = list()
+ for key in obj:
+ output_list += [key + '=' + obj[key]]
+ return ','.join(output_list)
+
+ def get_qmp_events_filtered(self, wait=True):
+ result = []
+ for ev in self.get_qmp_events(wait=wait):
+ result.append(filter_qmp_event(ev))
+ return result
+
+ def qmp_log(self, cmd, filters=[filter_testfiles], **kwargs):
+ logmsg = "{'execute': '%s', 'arguments': %s}" % (cmd, kwargs)
+ log(logmsg, filters)
+ result = self.qmp(cmd, **kwargs)
+ log(str(result), filters)
+ return result
+
+ def run_job(self, job, auto_finalize=True, auto_dismiss=False):
+ while True:
+ for ev in self.get_qmp_events_filtered(wait=True):
+ if ev['event'] == 'JOB_STATUS_CHANGE':
+ status = ev['data']['status']
+ if status == 'aborting':
+ result = self.qmp('query-jobs')
+ for j in result['return']:
+ if j['id'] == job:
+ log('Job failed: %s' % (j['error']))
+ elif status == 'pending' and not auto_finalize:
+ self.qmp_log('job-finalize', id=job)
+ elif status == 'concluded' and not auto_dismiss:
+ self.qmp_log('job-dismiss', id=job)
+ elif status == 'null':
+ return
+ else:
+ iotests.log(ev)
+
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
@@ -390,26 +479,6 @@ class QMPTestCase(unittest.TestCase):
self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
return d
- def flatten_qmp_object(self, obj, output=None, basestr=''):
- if output is None:
- output = dict()
- if isinstance(obj, list):
- for i in range(len(obj)):
- self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.')
- elif isinstance(obj, dict):
- for key in obj:
- self.flatten_qmp_object(obj[key], output, basestr + key + '.')
- else:
- output[basestr[:-1]] = obj # Strip trailing '.'
- return output
-
- def qmp_to_opts(self, obj):
- obj = self.flatten_qmp_object(obj)
- output_list = list()
- for key in obj:
- output_list += [key + '=' + obj[key]]
- return ','.join(output_list)
-
def assert_qmp_absent(self, d, path):
try:
result = self.dictpath(d, path)
@@ -444,8 +513,8 @@ class QMPTestCase(unittest.TestCase):
'''Asserts that the given filename is a json: filename and that its
content is equal to the given reference object'''
self.assertEqual(json_filename[:5], 'json:')
- self.assertEqual(self.flatten_qmp_object(json.loads(json_filename[5:])),
- self.flatten_qmp_object(reference))
+ self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
+ self.vm.flatten_qmp_object(reference))
def cancel_and_wait(self, drive='drive0', force=False, resume=False):
'''Cancel a block job and wait for it to finish, returning the event'''
@@ -464,6 +533,9 @@ class QMPTestCase(unittest.TestCase):
self.assert_qmp(event, 'data/device', drive)
result = event
cancelled = True
+ elif event['event'] == 'JOB_STATUS_CHANGE':
+ self.assert_qmp(event, 'data/id', drive)
+
self.assert_no_active_block_jobs()
return result
@@ -479,6 +551,8 @@ class QMPTestCase(unittest.TestCase):
self.assert_qmp(event, 'data/offset', event['data']['len'])
self.assert_no_active_block_jobs()
return event
+ elif event['event'] == 'JOB_STATUS_CHANGE':
+ self.assert_qmp(event, 'data/id', drive)
def wait_ready(self, drive='drive0'):
'''Wait until a block job BLOCK_JOB_READY event'''
@@ -542,6 +616,16 @@ def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
if not_sup or (imgfmt in unsupported_fmts):
notrun('not suitable for this image format: %s' % imgfmt)
+def verify_protocol(supported=[], unsupported=[]):
+ assert not (supported and unsupported)
+
+ if 'generic' in supported:
+ return
+
+ not_sup = supported and (imgproto not in supported)
+ if not_sup or (imgproto in unsupported):
+ notrun('not suitable for this protocol: %s' % imgproto)
+
def verify_platform(supported_oses=['linux']):
if True not in [sys.platform.startswith(x) for x in supported_oses]:
notrun('not suitable for this OS: %s' % sys.platform)