diff options
-rw-r--r-- | tests/qemu-iotests/iotests.py | 39 |
1 files changed, 24 insertions, 15 deletions
diff --git a/tests/qemu-iotests/iotests.py b/tests/qemu-iotests/iotests.py index 9f5da32dae..cf10c428b5 100644 --- a/tests/qemu-iotests/iotests.py +++ b/tests/qemu-iotests/iotests.py @@ -37,6 +37,10 @@ from qemu import qtest assert sys.version_info >= (3, 6) +# Type Aliases +QMPResponse = Dict[str, Any] + + faulthandler.enable() # This will not work if arguments contain spaces but is necessary if we @@ -541,25 +545,30 @@ class VM(qtest.QEMUQtestMachine): self._args.append(addr) return self - def pause_drive(self, drive, event=None): - '''Pause drive r/w operations''' + def hmp(self, command_line: str, use_log: bool = False) -> QMPResponse: + cmd = 'human-monitor-command' + kwargs = {'command-line': command_line} + if use_log: + return self.qmp_log(cmd, **kwargs) + else: + return self.qmp(cmd, **kwargs) + + def pause_drive(self, drive: str, event: Optional[str] = None) -> None: + """Pause drive r/w operations""" if not event: self.pause_drive(drive, "read_aio") self.pause_drive(drive, "write_aio") return - self.qmp('human-monitor-command', - command_line='qemu-io %s "break %s bp_%s"' - % (drive, event, drive)) - - def resume_drive(self, drive): - self.qmp('human-monitor-command', - command_line='qemu-io %s "remove_break bp_%s"' - % (drive, drive)) - - def hmp_qemu_io(self, drive, cmd): - '''Write to a given drive using an HMP command''' - return self.qmp('human-monitor-command', - command_line='qemu-io %s "%s"' % (drive, cmd)) + self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') + + def resume_drive(self, drive: str) -> None: + """Resume drive r/w operations""" + self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') + + def hmp_qemu_io(self, drive: str, cmd: str, + use_log: bool = False) -> QMPResponse: + """Write to a given drive using an HMP command""" + return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) def flatten_qmp_object(self, obj, output=None, basestr=''): if output is None: |