From 0347c4c4cfed47e54d9dc275ceb28d35b250749f Mon Sep 17 00:00:00 2001 From: John Snow Date: Mon, 10 Jan 2022 18:28:54 -0500 Subject: python: move qmp utilities to python/qemu/utils In order to upload a QMP package to PyPI, I want to remove any scripts that I am not 100% confident I want to support upstream, beyond our castle walls. Move most of our QMP utilities into the utils package so we can split them out from the PyPI upload. Signed-off-by: John Snow Reviewed-by: Vladimir Sementsov-Ogievskiy Reviewed-by: Beraldo Leal --- python/qemu/qmp/qemu_ga_client.py | 323 ------------------------------------ python/qemu/qmp/qom.py | 273 ------------------------------ python/qemu/qmp/qom_common.py | 175 ------------------- python/qemu/qmp/qom_fuse.py | 207 ----------------------- python/qemu/utils/qemu_ga_client.py | 323 ++++++++++++++++++++++++++++++++++++ python/qemu/utils/qom.py | 273 ++++++++++++++++++++++++++++++ python/qemu/utils/qom_common.py | 175 +++++++++++++++++++ python/qemu/utils/qom_fuse.py | 207 +++++++++++++++++++++++ python/setup.cfg | 16 +- 9 files changed, 986 insertions(+), 986 deletions(-) delete mode 100644 python/qemu/qmp/qemu_ga_client.py delete mode 100644 python/qemu/qmp/qom.py delete mode 100644 python/qemu/qmp/qom_common.py delete mode 100644 python/qemu/qmp/qom_fuse.py create mode 100644 python/qemu/utils/qemu_ga_client.py create mode 100644 python/qemu/utils/qom.py create mode 100644 python/qemu/utils/qom_common.py create mode 100644 python/qemu/utils/qom_fuse.py (limited to 'python') diff --git a/python/qemu/qmp/qemu_ga_client.py b/python/qemu/qmp/qemu_ga_client.py deleted file mode 100644 index 15ed430c61..0000000000 --- a/python/qemu/qmp/qemu_ga_client.py +++ /dev/null @@ -1,323 +0,0 @@ -""" -QEMU Guest Agent Client - -Usage: - -Start QEMU with: - -# qemu [...] -chardev socket,path=/tmp/qga.sock,server=on,wait=off,id=qga0 \ - -device virtio-serial \ - -device virtserialport,chardev=qga0,name=org.qemu.guest_agent.0 - -Run the script: - -$ qemu-ga-client --address=/tmp/qga.sock [args...] - -or - -$ export QGA_CLIENT_ADDRESS=/tmp/qga.sock -$ qemu-ga-client [args...] - -For example: - -$ qemu-ga-client cat /etc/resolv.conf -# Generated by NetworkManager -nameserver 10.0.2.3 -$ qemu-ga-client fsfreeze status -thawed -$ qemu-ga-client fsfreeze freeze -2 filesystems frozen - -See also: https://wiki.qemu.org/Features/QAPI/GuestAgent -""" - -# Copyright (C) 2012 Ryota Ozaki -# -# This work is licensed under the terms of the GNU GPL, version 2. See -# the COPYING file in the top-level directory. - -import argparse -import asyncio -import base64 -import os -import random -import sys -from typing import ( - Any, - Callable, - Dict, - Optional, - Sequence, -) - -from qemu.aqmp import ConnectError, SocketAddrT -from qemu.aqmp.legacy import QEMUMonitorProtocol - - -# This script has not seen many patches or careful attention in quite -# some time. If you would like to improve it, please review the design -# carefully and add docstrings at that point in time. Until then: - -# pylint: disable=missing-docstring - - -class QemuGuestAgent(QEMUMonitorProtocol): - def __getattr__(self, name: str) -> Callable[..., Any]: - def wrapper(**kwds: object) -> object: - return self.command('guest-' + name.replace('_', '-'), **kwds) - return wrapper - - -class QemuGuestAgentClient: - def __init__(self, address: SocketAddrT): - self.qga = QemuGuestAgent(address) - self.qga.connect(negotiate=False) - - def sync(self, timeout: Optional[float] = 3) -> None: - # Avoid being blocked forever - if not self.ping(timeout): - raise EnvironmentError('Agent seems not alive') - uid = random.randint(0, (1 << 32) - 1) - while True: - ret = self.qga.sync(id=uid) - if isinstance(ret, int) and int(ret) == uid: - break - - def __file_read_all(self, handle: int) -> bytes: - eof = False - data = b'' - while not eof: - ret = self.qga.file_read(handle=handle, count=1024) - _data = base64.b64decode(ret['buf-b64']) - data += _data - eof = ret['eof'] - return data - - def read(self, path: str) -> bytes: - handle = self.qga.file_open(path=path) - try: - data = self.__file_read_all(handle) - finally: - self.qga.file_close(handle=handle) - return data - - def info(self) -> str: - info = self.qga.info() - - msgs = [] - msgs.append('version: ' + info['version']) - msgs.append('supported_commands:') - enabled = [c['name'] for c in info['supported_commands'] - if c['enabled']] - msgs.append('\tenabled: ' + ', '.join(enabled)) - disabled = [c['name'] for c in info['supported_commands'] - if not c['enabled']] - msgs.append('\tdisabled: ' + ', '.join(disabled)) - - return '\n'.join(msgs) - - @classmethod - def __gen_ipv4_netmask(cls, prefixlen: int) -> str: - mask = int('1' * prefixlen + '0' * (32 - prefixlen), 2) - return '.'.join([str(mask >> 24), - str((mask >> 16) & 0xff), - str((mask >> 8) & 0xff), - str(mask & 0xff)]) - - def ifconfig(self) -> str: - nifs = self.qga.network_get_interfaces() - - msgs = [] - for nif in nifs: - msgs.append(nif['name'] + ':') - if 'ip-addresses' in nif: - for ipaddr in nif['ip-addresses']: - if ipaddr['ip-address-type'] == 'ipv4': - addr = ipaddr['ip-address'] - mask = self.__gen_ipv4_netmask(int(ipaddr['prefix'])) - msgs.append(f"\tinet {addr} netmask {mask}") - elif ipaddr['ip-address-type'] == 'ipv6': - addr = ipaddr['ip-address'] - prefix = ipaddr['prefix'] - msgs.append(f"\tinet6 {addr} prefixlen {prefix}") - if nif['hardware-address'] != '00:00:00:00:00:00': - msgs.append("\tether " + nif['hardware-address']) - - return '\n'.join(msgs) - - def ping(self, timeout: Optional[float]) -> bool: - self.qga.settimeout(timeout) - try: - self.qga.ping() - except asyncio.TimeoutError: - return False - return True - - def fsfreeze(self, cmd: str) -> object: - if cmd not in ['status', 'freeze', 'thaw']: - raise Exception('Invalid command: ' + cmd) - # Can be int (freeze, thaw) or GuestFsfreezeStatus (status) - return getattr(self.qga, 'fsfreeze' + '_' + cmd)() - - def fstrim(self, minimum: int) -> Dict[str, object]: - # returns GuestFilesystemTrimResponse - ret = getattr(self.qga, 'fstrim')(minimum=minimum) - assert isinstance(ret, dict) - return ret - - def suspend(self, mode: str) -> None: - if mode not in ['disk', 'ram', 'hybrid']: - raise Exception('Invalid mode: ' + mode) - - try: - getattr(self.qga, 'suspend' + '_' + mode)() - # On error exception will raise - except asyncio.TimeoutError: - # On success command will timed out - return - - def shutdown(self, mode: str = 'powerdown') -> None: - if mode not in ['powerdown', 'halt', 'reboot']: - raise Exception('Invalid mode: ' + mode) - - try: - self.qga.shutdown(mode=mode) - except asyncio.TimeoutError: - pass - - -def _cmd_cat(client: QemuGuestAgentClient, args: Sequence[str]) -> None: - if len(args) != 1: - print('Invalid argument') - print('Usage: cat ') - sys.exit(1) - print(client.read(args[0])) - - -def _cmd_fsfreeze(client: QemuGuestAgentClient, args: Sequence[str]) -> None: - usage = 'Usage: fsfreeze status|freeze|thaw' - if len(args) != 1: - print('Invalid argument') - print(usage) - sys.exit(1) - if args[0] not in ['status', 'freeze', 'thaw']: - print('Invalid command: ' + args[0]) - print(usage) - sys.exit(1) - cmd = args[0] - ret = client.fsfreeze(cmd) - if cmd == 'status': - print(ret) - return - - assert isinstance(ret, int) - verb = 'frozen' if cmd == 'freeze' else 'thawed' - print(f"{ret:d} filesystems {verb}") - - -def _cmd_fstrim(client: QemuGuestAgentClient, args: Sequence[str]) -> None: - if len(args) == 0: - minimum = 0 - else: - minimum = int(args[0]) - print(client.fstrim(minimum)) - - -def _cmd_ifconfig(client: QemuGuestAgentClient, args: Sequence[str]) -> None: - assert not args - print(client.ifconfig()) - - -def _cmd_info(client: QemuGuestAgentClient, args: Sequence[str]) -> None: - assert not args - print(client.info()) - - -def _cmd_ping(client: QemuGuestAgentClient, args: Sequence[str]) -> None: - timeout = 3.0 if len(args) == 0 else float(args[0]) - alive = client.ping(timeout) - if not alive: - print("Not responded in %s sec" % args[0]) - sys.exit(1) - - -def _cmd_suspend(client: QemuGuestAgentClient, args: Sequence[str]) -> None: - usage = 'Usage: suspend disk|ram|hybrid' - if len(args) != 1: - print('Less argument') - print(usage) - sys.exit(1) - if args[0] not in ['disk', 'ram', 'hybrid']: - print('Invalid command: ' + args[0]) - print(usage) - sys.exit(1) - client.suspend(args[0]) - - -def _cmd_shutdown(client: QemuGuestAgentClient, args: Sequence[str]) -> None: - assert not args - client.shutdown() - - -_cmd_powerdown = _cmd_shutdown - - -def _cmd_halt(client: QemuGuestAgentClient, args: Sequence[str]) -> None: - assert not args - client.shutdown('halt') - - -def _cmd_reboot(client: QemuGuestAgentClient, args: Sequence[str]) -> None: - assert not args - client.shutdown('reboot') - - -commands = [m.replace('_cmd_', '') for m in dir() if '_cmd_' in m] - - -def send_command(address: str, cmd: str, args: Sequence[str]) -> None: - if not os.path.exists(address): - print(f"'{address}' not found. (Is QEMU running?)") - sys.exit(1) - - if cmd not in commands: - print('Invalid command: ' + cmd) - print('Available commands: ' + ', '.join(commands)) - sys.exit(1) - - try: - client = QemuGuestAgentClient(address) - except ConnectError as err: - print(err) - if isinstance(err.exc, ConnectionError): - print('(Is QEMU running?)') - sys.exit(1) - - if cmd == 'fsfreeze' and args[0] == 'freeze': - client.sync(60) - elif cmd != 'ping': - client.sync() - - globals()['_cmd_' + cmd](client, args) - - -def main() -> None: - address = os.environ.get('QGA_CLIENT_ADDRESS') - - parser = argparse.ArgumentParser() - parser.add_argument('--address', action='store', - default=address, - help='Specify a ip:port pair or a unix socket path') - parser.add_argument('command', choices=commands) - parser.add_argument('args', nargs='*') - - args = parser.parse_args() - if args.address is None: - parser.error('address is not specified') - sys.exit(1) - - send_command(args.address, args.command, args.args) - - -if __name__ == '__main__': - main() diff --git a/python/qemu/qmp/qom.py b/python/qemu/qmp/qom.py deleted file mode 100644 index bb5d1a78f5..0000000000 --- a/python/qemu/qmp/qom.py +++ /dev/null @@ -1,273 +0,0 @@ -""" -QEMU Object Model testing tools. - -usage: qom [-h] {set,get,list,tree,fuse} ... - -Query and manipulate QOM data - -optional arguments: - -h, --help show this help message and exit - -QOM commands: - {set,get,list,tree,fuse} - set Set a QOM property value - get Get a QOM property value - list List QOM properties at a given path - tree Show QOM tree from a given path - fuse Mount a QOM tree as a FUSE filesystem -""" -## -# Copyright John Snow 2020, for Red Hat, Inc. -# Copyright IBM, Corp. 2011 -# -# Authors: -# John Snow -# Anthony Liguori -# -# This work is licensed under the terms of the GNU GPL, version 2 or later. -# See the COPYING file in the top-level directory. -# -# Based on ./scripts/qmp/qom-[set|get|tree|list] -## - -import argparse - -from qemu.aqmp import ExecuteError - -from .qom_common import QOMCommand - - -try: - from .qom_fuse import QOMFuse -except ModuleNotFoundError as _err: - if _err.name != 'fuse': - raise -else: - assert issubclass(QOMFuse, QOMCommand) - - -class QOMSet(QOMCommand): - """ - QOM Command - Set a property to a given value. - - usage: qom-set [-h] [--socket SOCKET] . - - Set a QOM property value - - positional arguments: - . QOM path and property, separated by a period '.' - new QOM property value - - optional arguments: - -h, --help show this help message and exit - --socket SOCKET, -s SOCKET - QMP socket path or address (addr:port). May also be - set via QMP_SOCKET environment variable. - """ - name = 'set' - help = 'Set a QOM property value' - - @classmethod - def configure_parser(cls, parser: argparse.ArgumentParser) -> None: - super().configure_parser(parser) - cls.add_path_prop_arg(parser) - parser.add_argument( - 'value', - metavar='', - action='store', - help='new QOM property value' - ) - - def __init__(self, args: argparse.Namespace): - super().__init__(args) - self.path, self.prop = args.path_prop.rsplit('.', 1) - self.value = args.value - - def run(self) -> int: - rsp = self.qmp.command( - 'qom-set', - path=self.path, - property=self.prop, - value=self.value - ) - print(rsp) - return 0 - - -class QOMGet(QOMCommand): - """ - QOM Command - Get a property's current value. - - usage: qom-get [-h] [--socket SOCKET] . - - Get a QOM property value - - positional arguments: - . QOM path and property, separated by a period '.' - - optional arguments: - -h, --help show this help message and exit - --socket SOCKET, -s SOCKET - QMP socket path or address (addr:port). May also be - set via QMP_SOCKET environment variable. - """ - name = 'get' - help = 'Get a QOM property value' - - @classmethod - def configure_parser(cls, parser: argparse.ArgumentParser) -> None: - super().configure_parser(parser) - cls.add_path_prop_arg(parser) - - def __init__(self, args: argparse.Namespace): - super().__init__(args) - try: - tmp = args.path_prop.rsplit('.', 1) - except ValueError as err: - raise ValueError('Invalid format for .') from err - self.path = tmp[0] - self.prop = tmp[1] - - def run(self) -> int: - rsp = self.qmp.command( - 'qom-get', - path=self.path, - property=self.prop - ) - if isinstance(rsp, dict): - for key, value in rsp.items(): - print(f"{key}: {value}") - else: - print(rsp) - return 0 - - -class QOMList(QOMCommand): - """ - QOM Command - List the properties at a given path. - - usage: qom-list [-h] [--socket SOCKET] - - List QOM properties at a given path - - positional arguments: - QOM path - - optional arguments: - -h, --help show this help message and exit - --socket SOCKET, -s SOCKET - QMP socket path or address (addr:port). May also be - set via QMP_SOCKET environment variable. - """ - name = 'list' - help = 'List QOM properties at a given path' - - @classmethod - def configure_parser(cls, parser: argparse.ArgumentParser) -> None: - super().configure_parser(parser) - parser.add_argument( - 'path', - metavar='', - action='store', - help='QOM path', - ) - - def __init__(self, args: argparse.Namespace): - super().__init__(args) - self.path = args.path - - def run(self) -> int: - rsp = self.qom_list(self.path) - for item in rsp: - if item.child: - print(f"{item.name}/") - elif item.link: - print(f"@{item.name}/") - else: - print(item.name) - return 0 - - -class QOMTree(QOMCommand): - """ - QOM Command - Show the full tree below a given path. - - usage: qom-tree [-h] [--socket SOCKET] [] - - Show QOM tree from a given path - - positional arguments: - QOM path - - optional arguments: - -h, --help show this help message and exit - --socket SOCKET, -s SOCKET - QMP socket path or address (addr:port). May also be - set via QMP_SOCKET environment variable. - """ - name = 'tree' - help = 'Show QOM tree from a given path' - - @classmethod - def configure_parser(cls, parser: argparse.ArgumentParser) -> None: - super().configure_parser(parser) - parser.add_argument( - 'path', - metavar='', - action='store', - help='QOM path', - nargs='?', - default='/' - ) - - def __init__(self, args: argparse.Namespace): - super().__init__(args) - self.path = args.path - - def _list_node(self, path: str) -> None: - print(path) - items = self.qom_list(path) - for item in items: - if item.child: - continue - try: - rsp = self.qmp.command('qom-get', path=path, - property=item.name) - print(f" {item.name}: {rsp} ({item.type})") - except ExecuteError as err: - print(f" {item.name}: ({item.type})") - print('') - for item in items: - if not item.child: - continue - if path == '/': - path = '' - self._list_node(f"{path}/{item.name}") - - def run(self) -> int: - self._list_node(self.path) - return 0 - - -def main() -> int: - """QOM script main entry point.""" - parser = argparse.ArgumentParser( - description='Query and manipulate QOM data' - ) - subparsers = parser.add_subparsers( - title='QOM commands', - dest='command' - ) - - for command in QOMCommand.__subclasses__(): - command.register(subparsers) - - args = parser.parse_args() - - if args.command is None: - parser.error('Command not specified.') - return 1 - - cmd_class = args.cmd_class - assert isinstance(cmd_class, type(QOMCommand)) - return cmd_class.command_runner(args) diff --git a/python/qemu/qmp/qom_common.py b/python/qemu/qmp/qom_common.py deleted file mode 100644 index e034a6f247..0000000000 --- a/python/qemu/qmp/qom_common.py +++ /dev/null @@ -1,175 +0,0 @@ -""" -QOM Command abstractions. -""" -## -# Copyright John Snow 2020, for Red Hat, Inc. -# Copyright IBM, Corp. 2011 -# -# Authors: -# John Snow -# Anthony Liguori -# -# This work is licensed under the terms of the GNU GPL, version 2 or later. -# See the COPYING file in the top-level directory. -# -# Based on ./scripts/qmp/qom-[set|get|tree|list] -## - -import argparse -import os -import sys -from typing import ( - Any, - Dict, - List, - Optional, - Type, - TypeVar, -) - -from qemu.aqmp import QMPError -from qemu.aqmp.legacy import QEMUMonitorProtocol - - -class ObjectPropertyInfo: - """ - Represents the return type from e.g. qom-list. - """ - def __init__(self, name: str, type_: str, - description: Optional[str] = None, - default_value: Optional[object] = None): - self.name = name - self.type = type_ - self.description = description - self.default_value = default_value - - @classmethod - def make(cls, value: Dict[str, Any]) -> 'ObjectPropertyInfo': - """ - Build an ObjectPropertyInfo from a Dict with an unknown shape. - """ - assert value.keys() >= {'name', 'type'} - assert value.keys() <= {'name', 'type', 'description', 'default-value'} - return cls(value['name'], value['type'], - value.get('description'), - value.get('default-value')) - - @property - def child(self) -> bool: - """Is this property a child property?""" - return self.type.startswith('child<') - - @property - def link(self) -> bool: - """Is this property a link property?""" - return self.type.startswith('link<') - - -CommandT = TypeVar('CommandT', bound='QOMCommand') - - -class QOMCommand: - """ - Represents a QOM sub-command. - - :param args: Parsed arguments, as returned from parser.parse_args. - """ - name: str - help: str - - def __init__(self, args: argparse.Namespace): - if args.socket is None: - raise QMPError("No QMP socket path or address given") - self.qmp = QEMUMonitorProtocol( - QEMUMonitorProtocol.parse_address(args.socket) - ) - self.qmp.connect() - - @classmethod - def register(cls, subparsers: Any) -> None: - """ - Register this command with the argument parser. - - :param subparsers: argparse subparsers object, from "add_subparsers". - """ - subparser = subparsers.add_parser(cls.name, help=cls.help, - description=cls.help) - cls.configure_parser(subparser) - - @classmethod - def configure_parser(cls, parser: argparse.ArgumentParser) -> None: - """ - Configure a parser with this command's arguments. - - :param parser: argparse parser or subparser object. - """ - default_path = os.environ.get('QMP_SOCKET') - parser.add_argument( - '--socket', '-s', - dest='socket', - action='store', - help='QMP socket path or address (addr:port).' - ' May also be set via QMP_SOCKET environment variable.', - default=default_path - ) - parser.set_defaults(cmd_class=cls) - - @classmethod - def add_path_prop_arg(cls, parser: argparse.ArgumentParser) -> None: - """ - Add the . positional argument to this command. - - :param parser: The parser to add the argument to. - """ - parser.add_argument( - 'path_prop', - metavar='.', - action='store', - help="QOM path and property, separated by a period '.'" - ) - - def run(self) -> int: - """ - Run this command. - - :return: 0 on success, 1 otherwise. - """ - raise NotImplementedError - - def qom_list(self, path: str) -> List[ObjectPropertyInfo]: - """ - :return: a strongly typed list from the 'qom-list' command. - """ - rsp = self.qmp.command('qom-list', path=path) - # qom-list returns List[ObjectPropertyInfo] - assert isinstance(rsp, list) - return [ObjectPropertyInfo.make(x) for x in rsp] - - @classmethod - def command_runner( - cls: Type[CommandT], - args: argparse.Namespace - ) -> int: - """ - Run a fully-parsed subcommand, with error-handling for the CLI. - - :return: The return code from `run()`. - """ - try: - cmd = cls(args) - return cmd.run() - except QMPError as err: - print(f"{type(err).__name__}: {err!s}", file=sys.stderr) - return -1 - - @classmethod - def entry_point(cls) -> int: - """ - Build this command's parser, parse arguments, and run the command. - - :return: `run`'s return code. - """ - parser = argparse.ArgumentParser(description=cls.help) - cls.configure_parser(parser) - args = parser.parse_args() - return cls.command_runner(args) diff --git a/python/qemu/qmp/qom_fuse.py b/python/qemu/qmp/qom_fuse.py deleted file mode 100644 index 653a76b93b..0000000000 --- a/python/qemu/qmp/qom_fuse.py +++ /dev/null @@ -1,207 +0,0 @@ -""" -QEMU Object Model FUSE filesystem tool - -This script offers a simple FUSE filesystem within which the QOM tree -may be browsed, queried and edited using traditional shell tooling. - -This script requires the 'fusepy' python package. - - -usage: qom-fuse [-h] [--socket SOCKET] - -Mount a QOM tree as a FUSE filesystem - -positional arguments: - Mount point - -optional arguments: - -h, --help show this help message and exit - --socket SOCKET, -s SOCKET - QMP socket path or address (addr:port). May also be - set via QMP_SOCKET environment variable. -""" -## -# Copyright IBM, Corp. 2012 -# Copyright (C) 2020 Red Hat, Inc. -# -# Authors: -# Anthony Liguori -# Markus Armbruster -# -# This work is licensed under the terms of the GNU GPL, version 2 or later. -# See the COPYING file in the top-level directory. -## - -import argparse -from errno import ENOENT, EPERM -import stat -import sys -from typing import ( - IO, - Dict, - Iterator, - Mapping, - Optional, - Union, -) - -import fuse -from fuse import FUSE, FuseOSError, Operations - -from qemu.aqmp import ExecuteError - -from .qom_common import QOMCommand - - -fuse.fuse_python_api = (0, 2) - - -class QOMFuse(QOMCommand, Operations): - """ - QOMFuse implements both fuse.Operations and QOMCommand. - - Operations implements the FS, and QOMCommand implements the CLI command. - """ - name = 'fuse' - help = 'Mount a QOM tree as a FUSE filesystem' - fuse: FUSE - - @classmethod - def configure_parser(cls, parser: argparse.ArgumentParser) -> None: - super().configure_parser(parser) - parser.add_argument( - 'mount', - metavar='', - action='store', - help="Mount point", - ) - - def __init__(self, args: argparse.Namespace): - super().__init__(args) - self.mount = args.mount - self.ino_map: Dict[str, int] = {} - self.ino_count = 1 - - def run(self) -> int: - print(f"Mounting QOMFS to '{self.mount}'", file=sys.stderr) - self.fuse = FUSE(self, self.mount, foreground=True) - return 0 - - def get_ino(self, path: str) -> int: - """Get an inode number for a given QOM path.""" - if path in self.ino_map: - return self.ino_map[path] - self.ino_map[path] = self.ino_count - self.ino_count += 1 - return self.ino_map[path] - - def is_object(self, path: str) -> bool: - """Is the given QOM path an object?""" - try: - self.qom_list(path) - return True - except ExecuteError: - return False - - def is_property(self, path: str) -> bool: - """Is the given QOM path a property?""" - path, prop = path.rsplit('/', 1) - if path == '': - path = '/' - try: - for item in self.qom_list(path): - if item.name == prop: - return True - return False - except ExecuteError: - return False - - def is_link(self, path: str) -> bool: - """Is the given QOM path a link?""" - path, prop = path.rsplit('/', 1) - if path == '': - path = '/' - try: - for item in self.qom_list(path): - if item.name == prop and item.link: - return True - return False - except ExecuteError: - return False - - def read(self, path: str, size: int, offset: int, fh: IO[bytes]) -> bytes: - if not self.is_property(path): - raise FuseOSError(ENOENT) - - path, prop = path.rsplit('/', 1) - if path == '': - path = '/' - try: - data = str(self.qmp.command('qom-get', path=path, property=prop)) - data += '\n' # make values shell friendly - except ExecuteError as err: - raise FuseOSError(EPERM) from err - - if offset > len(data): - return b'' - - return bytes(data[offset:][:size], encoding='utf-8') - - def readlink(self, path: str) -> Union[bool, str]: - if not self.is_link(path): - return False - path, prop = path.rsplit('/', 1) - prefix = '/'.join(['..'] * (len(path.split('/')) - 1)) - return prefix + str(self.qmp.command('qom-get', path=path, - property=prop)) - - def getattr(self, path: str, - fh: Optional[IO[bytes]] = None) -> Mapping[str, object]: - if self.is_link(path): - value = { - 'st_mode': 0o755 | stat.S_IFLNK, - 'st_ino': self.get_ino(path), - 'st_dev': 0, - 'st_nlink': 2, - 'st_uid': 1000, - 'st_gid': 1000, - 'st_size': 4096, - 'st_atime': 0, - 'st_mtime': 0, - 'st_ctime': 0 - } - elif self.is_object(path): - value = { - 'st_mode': 0o755 | stat.S_IFDIR, - 'st_ino': self.get_ino(path), - 'st_dev': 0, - 'st_nlink': 2, - 'st_uid': 1000, - 'st_gid': 1000, - 'st_size': 4096, - 'st_atime': 0, - 'st_mtime': 0, - 'st_ctime': 0 - } - elif self.is_property(path): - value = { - 'st_mode': 0o644 | stat.S_IFREG, - 'st_ino': self.get_ino(path), - 'st_dev': 0, - 'st_nlink': 1, - 'st_uid': 1000, - 'st_gid': 1000, - 'st_size': 4096, - 'st_atime': 0, - 'st_mtime': 0, - 'st_ctime': 0 - } - else: - raise FuseOSError(ENOENT) - return value - - def readdir(self, path: str, fh: IO[bytes]) -> Iterator[str]: - yield '.' - yield '..' - for item in self.qom_list(path): - yield item.name diff --git a/python/qemu/utils/qemu_ga_client.py b/python/qemu/utils/qemu_ga_client.py new file mode 100644 index 0000000000..15ed430c61 --- /dev/null +++ b/python/qemu/utils/qemu_ga_client.py @@ -0,0 +1,323 @@ +""" +QEMU Guest Agent Client + +Usage: + +Start QEMU with: + +# qemu [...] -chardev socket,path=/tmp/qga.sock,server=on,wait=off,id=qga0 \ + -device virtio-serial \ + -device virtserialport,chardev=qga0,name=org.qemu.guest_agent.0 + +Run the script: + +$ qemu-ga-client --address=/tmp/qga.sock [args...] + +or + +$ export QGA_CLIENT_ADDRESS=/tmp/qga.sock +$ qemu-ga-client [args...] + +For example: + +$ qemu-ga-client cat /etc/resolv.conf +# Generated by NetworkManager +nameserver 10.0.2.3 +$ qemu-ga-client fsfreeze status +thawed +$ qemu-ga-client fsfreeze freeze +2 filesystems frozen + +See also: https://wiki.qemu.org/Features/QAPI/GuestAgent +""" + +# Copyright (C) 2012 Ryota Ozaki +# +# This work is licensed under the terms of the GNU GPL, version 2. See +# the COPYING file in the top-level directory. + +import argparse +import asyncio +import base64 +import os +import random +import sys +from typing import ( + Any, + Callable, + Dict, + Optional, + Sequence, +) + +from qemu.aqmp import ConnectError, SocketAddrT +from qemu.aqmp.legacy import QEMUMonitorProtocol + + +# This script has not seen many patches or careful attention in quite +# some time. If you would like to improve it, please review the design +# carefully and add docstrings at that point in time. Until then: + +# pylint: disable=missing-docstring + + +class QemuGuestAgent(QEMUMonitorProtocol): + def __getattr__(self, name: str) -> Callable[..., Any]: + def wrapper(**kwds: object) -> object: + return self.command('guest-' + name.replace('_', '-'), **kwds) + return wrapper + + +class QemuGuestAgentClient: + def __init__(self, address: SocketAddrT): + self.qga = QemuGuestAgent(address) + self.qga.connect(negotiate=False) + + def sync(self, timeout: Optional[float] = 3) -> None: + # Avoid being blocked forever + if not self.ping(timeout): + raise EnvironmentError('Agent seems not alive') + uid = random.randint(0, (1 << 32) - 1) + while True: + ret = self.qga.sync(id=uid) + if isinstance(ret, int) and int(ret) == uid: + break + + def __file_read_all(self, handle: int) -> bytes: + eof = False + data = b'' + while not eof: + ret = self.qga.file_read(handle=handle, count=1024) + _data = base64.b64decode(ret['buf-b64']) + data += _data + eof = ret['eof'] + return data + + def read(self, path: str) -> bytes: + handle = self.qga.file_open(path=path) + try: + data = self.__file_read_all(handle) + finally: + self.qga.file_close(handle=handle) + return data + + def info(self) -> str: + info = self.qga.info() + + msgs = [] + msgs.append('version: ' + info['version']) + msgs.append('supported_commands:') + enabled = [c['name'] for c in info['supported_commands'] + if c['enabled']] + msgs.append('\tenabled: ' + ', '.join(enabled)) + disabled = [c['name'] for c in info['supported_commands'] + if not c['enabled']] + msgs.append('\tdisabled: ' + ', '.join(disabled)) + + return '\n'.join(msgs) + + @classmethod + def __gen_ipv4_netmask(cls, prefixlen: int) -> str: + mask = int('1' * prefixlen + '0' * (32 - prefixlen), 2) + return '.'.join([str(mask >> 24), + str((mask >> 16) & 0xff), + str((mask >> 8) & 0xff), + str(mask & 0xff)]) + + def ifconfig(self) -> str: + nifs = self.qga.network_get_interfaces() + + msgs = [] + for nif in nifs: + msgs.append(nif['name'] + ':') + if 'ip-addresses' in nif: + for ipaddr in nif['ip-addresses']: + if ipaddr['ip-address-type'] == 'ipv4': + addr = ipaddr['ip-address'] + mask = self.__gen_ipv4_netmask(int(ipaddr['prefix'])) + msgs.append(f"\tinet {addr} netmask {mask}") + elif ipaddr['ip-address-type'] == 'ipv6': + addr = ipaddr['ip-address'] + prefix = ipaddr['prefix'] + msgs.append(f"\tinet6 {addr} prefixlen {prefix}") + if nif['hardware-address'] != '00:00:00:00:00:00': + msgs.append("\tether " + nif['hardware-address']) + + return '\n'.join(msgs) + + def ping(self, timeout: Optional[float]) -> bool: + self.qga.settimeout(timeout) + try: + self.qga.ping() + except asyncio.TimeoutError: + return False + return True + + def fsfreeze(self, cmd: str) -> object: + if cmd not in ['status', 'freeze', 'thaw']: + raise Exception('Invalid command: ' + cmd) + # Can be int (freeze, thaw) or GuestFsfreezeStatus (status) + return getattr(self.qga, 'fsfreeze' + '_' + cmd)() + + def fstrim(self, minimum: int) -> Dict[str, object]: + # returns GuestFilesystemTrimResponse + ret = getattr(self.qga, 'fstrim')(minimum=minimum) + assert isinstance(ret, dict) + return ret + + def suspend(self, mode: str) -> None: + if mode not in ['disk', 'ram', 'hybrid']: + raise Exception('Invalid mode: ' + mode) + + try: + getattr(self.qga, 'suspend' + '_' + mode)() + # On error exception will raise + except asyncio.TimeoutError: + # On success command will timed out + return + + def shutdown(self, mode: str = 'powerdown') -> None: + if mode not in ['powerdown', 'halt', 'reboot']: + raise Exception('Invalid mode: ' + mode) + + try: + self.qga.shutdown(mode=mode) + except asyncio.TimeoutError: + pass + + +def _cmd_cat(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + if len(args) != 1: + print('Invalid argument') + print('Usage: cat ') + sys.exit(1) + print(client.read(args[0])) + + +def _cmd_fsfreeze(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + usage = 'Usage: fsfreeze status|freeze|thaw' + if len(args) != 1: + print('Invalid argument') + print(usage) + sys.exit(1) + if args[0] not in ['status', 'freeze', 'thaw']: + print('Invalid command: ' + args[0]) + print(usage) + sys.exit(1) + cmd = args[0] + ret = client.fsfreeze(cmd) + if cmd == 'status': + print(ret) + return + + assert isinstance(ret, int) + verb = 'frozen' if cmd == 'freeze' else 'thawed' + print(f"{ret:d} filesystems {verb}") + + +def _cmd_fstrim(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + if len(args) == 0: + minimum = 0 + else: + minimum = int(args[0]) + print(client.fstrim(minimum)) + + +def _cmd_ifconfig(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + assert not args + print(client.ifconfig()) + + +def _cmd_info(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + assert not args + print(client.info()) + + +def _cmd_ping(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + timeout = 3.0 if len(args) == 0 else float(args[0]) + alive = client.ping(timeout) + if not alive: + print("Not responded in %s sec" % args[0]) + sys.exit(1) + + +def _cmd_suspend(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + usage = 'Usage: suspend disk|ram|hybrid' + if len(args) != 1: + print('Less argument') + print(usage) + sys.exit(1) + if args[0] not in ['disk', 'ram', 'hybrid']: + print('Invalid command: ' + args[0]) + print(usage) + sys.exit(1) + client.suspend(args[0]) + + +def _cmd_shutdown(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + assert not args + client.shutdown() + + +_cmd_powerdown = _cmd_shutdown + + +def _cmd_halt(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + assert not args + client.shutdown('halt') + + +def _cmd_reboot(client: QemuGuestAgentClient, args: Sequence[str]) -> None: + assert not args + client.shutdown('reboot') + + +commands = [m.replace('_cmd_', '') for m in dir() if '_cmd_' in m] + + +def send_command(address: str, cmd: str, args: Sequence[str]) -> None: + if not os.path.exists(address): + print(f"'{address}' not found. (Is QEMU running?)") + sys.exit(1) + + if cmd not in commands: + print('Invalid command: ' + cmd) + print('Available commands: ' + ', '.join(commands)) + sys.exit(1) + + try: + client = QemuGuestAgentClient(address) + except ConnectError as err: + print(err) + if isinstance(err.exc, ConnectionError): + print('(Is QEMU running?)') + sys.exit(1) + + if cmd == 'fsfreeze' and args[0] == 'freeze': + client.sync(60) + elif cmd != 'ping': + client.sync() + + globals()['_cmd_' + cmd](client, args) + + +def main() -> None: + address = os.environ.get('QGA_CLIENT_ADDRESS') + + parser = argparse.ArgumentParser() + parser.add_argument('--address', action='store', + default=address, + help='Specify a ip:port pair or a unix socket path') + parser.add_argument('command', choices=commands) + parser.add_argument('args', nargs='*') + + args = parser.parse_args() + if args.address is None: + parser.error('address is not specified') + sys.exit(1) + + send_command(args.address, args.command, args.args) + + +if __name__ == '__main__': + main() diff --git a/python/qemu/utils/qom.py b/python/qemu/utils/qom.py new file mode 100644 index 0000000000..bb5d1a78f5 --- /dev/null +++ b/python/qemu/utils/qom.py @@ -0,0 +1,273 @@ +""" +QEMU Object Model testing tools. + +usage: qom [-h] {set,get,list,tree,fuse} ... + +Query and manipulate QOM data + +optional arguments: + -h, --help show this help message and exit + +QOM commands: + {set,get,list,tree,fuse} + set Set a QOM property value + get Get a QOM property value + list List QOM properties at a given path + tree Show QOM tree from a given path + fuse Mount a QOM tree as a FUSE filesystem +""" +## +# Copyright John Snow 2020, for Red Hat, Inc. +# Copyright IBM, Corp. 2011 +# +# Authors: +# John Snow +# Anthony Liguori +# +# This work is licensed under the terms of the GNU GPL, version 2 or later. +# See the COPYING file in the top-level directory. +# +# Based on ./scripts/qmp/qom-[set|get|tree|list] +## + +import argparse + +from qemu.aqmp import ExecuteError + +from .qom_common import QOMCommand + + +try: + from .qom_fuse import QOMFuse +except ModuleNotFoundError as _err: + if _err.name != 'fuse': + raise +else: + assert issubclass(QOMFuse, QOMCommand) + + +class QOMSet(QOMCommand): + """ + QOM Command - Set a property to a given value. + + usage: qom-set [-h] [--socket SOCKET] . + + Set a QOM property value + + positional arguments: + . QOM path and property, separated by a period '.' + new QOM property value + + optional arguments: + -h, --help show this help message and exit + --socket SOCKET, -s SOCKET + QMP socket path or address (addr:port). May also be + set via QMP_SOCKET environment variable. + """ + name = 'set' + help = 'Set a QOM property value' + + @classmethod + def configure_parser(cls, parser: argparse.ArgumentParser) -> None: + super().configure_parser(parser) + cls.add_path_prop_arg(parser) + parser.add_argument( + 'value', + metavar='', + action='store', + help='new QOM property value' + ) + + def __init__(self, args: argparse.Namespace): + super().__init__(args) + self.path, self.prop = args.path_prop.rsplit('.', 1) + self.value = args.value + + def run(self) -> int: + rsp = self.qmp.command( + 'qom-set', + path=self.path, + property=self.prop, + value=self.value + ) + print(rsp) + return 0 + + +class QOMGet(QOMCommand): + """ + QOM Command - Get a property's current value. + + usage: qom-get [-h] [--socket SOCKET] . + + Get a QOM property value + + positional arguments: + . QOM path and property, separated by a period '.' + + optional arguments: + -h, --help show this help message and exit + --socket SOCKET, -s SOCKET + QMP socket path or address (addr:port). May also be + set via QMP_SOCKET environment variable. + """ + name = 'get' + help = 'Get a QOM property value' + + @classmethod + def configure_parser(cls, parser: argparse.ArgumentParser) -> None: + super().configure_parser(parser) + cls.add_path_prop_arg(parser) + + def __init__(self, args: argparse.Namespace): + super().__init__(args) + try: + tmp = args.path_prop.rsplit('.', 1) + except ValueError as err: + raise ValueError('Invalid format for .') from err + self.path = tmp[0] + self.prop = tmp[1] + + def run(self) -> int: + rsp = self.qmp.command( + 'qom-get', + path=self.path, + property=self.prop + ) + if isinstance(rsp, dict): + for key, value in rsp.items(): + print(f"{key}: {value}") + else: + print(rsp) + return 0 + + +class QOMList(QOMCommand): + """ + QOM Command - List the properties at a given path. + + usage: qom-list [-h] [--socket SOCKET] + + List QOM properties at a given path + + positional arguments: + QOM path + + optional arguments: + -h, --help show this help message and exit + --socket SOCKET, -s SOCKET + QMP socket path or address (addr:port). May also be + set via QMP_SOCKET environment variable. + """ + name = 'list' + help = 'List QOM properties at a given path' + + @classmethod + def configure_parser(cls, parser: argparse.ArgumentParser) -> None: + super().configure_parser(parser) + parser.add_argument( + 'path', + metavar='', + action='store', + help='QOM path', + ) + + def __init__(self, args: argparse.Namespace): + super().__init__(args) + self.path = args.path + + def run(self) -> int: + rsp = self.qom_list(self.path) + for item in rsp: + if item.child: + print(f"{item.name}/") + elif item.link: + print(f"@{item.name}/") + else: + print(item.name) + return 0 + + +class QOMTree(QOMCommand): + """ + QOM Command - Show the full tree below a given path. + + usage: qom-tree [-h] [--socket SOCKET] [] + + Show QOM tree from a given path + + positional arguments: + QOM path + + optional arguments: + -h, --help show this help message and exit + --socket SOCKET, -s SOCKET + QMP socket path or address (addr:port). May also be + set via QMP_SOCKET environment variable. + """ + name = 'tree' + help = 'Show QOM tree from a given path' + + @classmethod + def configure_parser(cls, parser: argparse.ArgumentParser) -> None: + super().configure_parser(parser) + parser.add_argument( + 'path', + metavar='', + action='store', + help='QOM path', + nargs='?', + default='/' + ) + + def __init__(self, args: argparse.Namespace): + super().__init__(args) + self.path = args.path + + def _list_node(self, path: str) -> None: + print(path) + items = self.qom_list(path) + for item in items: + if item.child: + continue + try: + rsp = self.qmp.command('qom-get', path=path, + property=item.name) + print(f" {item.name}: {rsp} ({item.type})") + except ExecuteError as err: + print(f" {item.name}: ({item.type})") + print('') + for item in items: + if not item.child: + continue + if path == '/': + path = '' + self._list_node(f"{path}/{item.name}") + + def run(self) -> int: + self._list_node(self.path) + return 0 + + +def main() -> int: + """QOM script main entry point.""" + parser = argparse.ArgumentParser( + description='Query and manipulate QOM data' + ) + subparsers = parser.add_subparsers( + title='QOM commands', + dest='command' + ) + + for command in QOMCommand.__subclasses__(): + command.register(subparsers) + + args = parser.parse_args() + + if args.command is None: + parser.error('Command not specified.') + return 1 + + cmd_class = args.cmd_class + assert isinstance(cmd_class, type(QOMCommand)) + return cmd_class.command_runner(args) diff --git a/python/qemu/utils/qom_common.py b/python/qemu/utils/qom_common.py new file mode 100644 index 0000000000..e034a6f247 --- /dev/null +++ b/python/qemu/utils/qom_common.py @@ -0,0 +1,175 @@ +""" +QOM Command abstractions. +""" +## +# Copyright John Snow 2020, for Red Hat, Inc. +# Copyright IBM, Corp. 2011 +# +# Authors: +# John Snow +# Anthony Liguori +# +# This work is licensed under the terms of the GNU GPL, version 2 or later. +# See the COPYING file in the top-level directory. +# +# Based on ./scripts/qmp/qom-[set|get|tree|list] +## + +import argparse +import os +import sys +from typing import ( + Any, + Dict, + List, + Optional, + Type, + TypeVar, +) + +from qemu.aqmp import QMPError +from qemu.aqmp.legacy import QEMUMonitorProtocol + + +class ObjectPropertyInfo: + """ + Represents the return type from e.g. qom-list. + """ + def __init__(self, name: str, type_: str, + description: Optional[str] = None, + default_value: Optional[object] = None): + self.name = name + self.type = type_ + self.description = description + self.default_value = default_value + + @classmethod + def make(cls, value: Dict[str, Any]) -> 'ObjectPropertyInfo': + """ + Build an ObjectPropertyInfo from a Dict with an unknown shape. + """ + assert value.keys() >= {'name', 'type'} + assert value.keys() <= {'name', 'type', 'description', 'default-value'} + return cls(value['name'], value['type'], + value.get('description'), + value.get('default-value')) + + @property + def child(self) -> bool: + """Is this property a child property?""" + return self.type.startswith('child<') + + @property + def link(self) -> bool: + """Is this property a link property?""" + return self.type.startswith('link<') + + +CommandT = TypeVar('CommandT', bound='QOMCommand') + + +class QOMCommand: + """ + Represents a QOM sub-command. + + :param args: Parsed arguments, as returned from parser.parse_args. + """ + name: str + help: str + + def __init__(self, args: argparse.Namespace): + if args.socket is None: + raise QMPError("No QMP socket path or address given") + self.qmp = QEMUMonitorProtocol( + QEMUMonitorProtocol.parse_address(args.socket) + ) + self.qmp.connect() + + @classmethod + def register(cls, subparsers: Any) -> None: + """ + Register this command with the argument parser. + + :param subparsers: argparse subparsers object, from "add_subparsers". + """ + subparser = subparsers.add_parser(cls.name, help=cls.help, + description=cls.help) + cls.configure_parser(subparser) + + @classmethod + def configure_parser(cls, parser: argparse.ArgumentParser) -> None: + """ + Configure a parser with this command's arguments. + + :param parser: argparse parser or subparser object. + """ + default_path = os.environ.get('QMP_SOCKET') + parser.add_argument( + '--socket', '-s', + dest='socket', + action='store', + help='QMP socket path or address (addr:port).' + ' May also be set via QMP_SOCKET environment variable.', + default=default_path + ) + parser.set_defaults(cmd_class=cls) + + @classmethod + def add_path_prop_arg(cls, parser: argparse.ArgumentParser) -> None: + """ + Add the . positional argument to this command. + + :param parser: The parser to add the argument to. + """ + parser.add_argument( + 'path_prop', + metavar='.', + action='store', + help="QOM path and property, separated by a period '.'" + ) + + def run(self) -> int: + """ + Run this command. + + :return: 0 on success, 1 otherwise. + """ + raise NotImplementedError + + def qom_list(self, path: str) -> List[ObjectPropertyInfo]: + """ + :return: a strongly typed list from the 'qom-list' command. + """ + rsp = self.qmp.command('qom-list', path=path) + # qom-list returns List[ObjectPropertyInfo] + assert isinstance(rsp, list) + return [ObjectPropertyInfo.make(x) for x in rsp] + + @classmethod + def command_runner( + cls: Type[CommandT], + args: argparse.Namespace + ) -> int: + """ + Run a fully-parsed subcommand, with error-handling for the CLI. + + :return: The return code from `run()`. + """ + try: + cmd = cls(args) + return cmd.run() + except QMPError as err: + print(f"{type(err).__name__}: {err!s}", file=sys.stderr) + return -1 + + @classmethod + def entry_point(cls) -> int: + """ + Build this command's parser, parse arguments, and run the command. + + :return: `run`'s return code. + """ + parser = argparse.ArgumentParser(description=cls.help) + cls.configure_parser(parser) + args = parser.parse_args() + return cls.command_runner(args) diff --git a/python/qemu/utils/qom_fuse.py b/python/qemu/utils/qom_fuse.py new file mode 100644 index 0000000000..653a76b93b --- /dev/null +++ b/python/qemu/utils/qom_fuse.py @@ -0,0 +1,207 @@ +""" +QEMU Object Model FUSE filesystem tool + +This script offers a simple FUSE filesystem within which the QOM tree +may be browsed, queried and edited using traditional shell tooling. + +This script requires the 'fusepy' python package. + + +usage: qom-fuse [-h] [--socket SOCKET] + +Mount a QOM tree as a FUSE filesystem + +positional arguments: + Mount point + +optional arguments: + -h, --help show this help message and exit + --socket SOCKET, -s SOCKET + QMP socket path or address (addr:port). May also be + set via QMP_SOCKET environment variable. +""" +## +# Copyright IBM, Corp. 2012 +# Copyright (C) 2020 Red Hat, Inc. +# +# Authors: +# Anthony Liguori +# Markus Armbruster +# +# This work is licensed under the terms of the GNU GPL, version 2 or later. +# See the COPYING file in the top-level directory. +## + +import argparse +from errno import ENOENT, EPERM +import stat +import sys +from typing import ( + IO, + Dict, + Iterator, + Mapping, + Optional, + Union, +) + +import fuse +from fuse import FUSE, FuseOSError, Operations + +from qemu.aqmp import ExecuteError + +from .qom_common import QOMCommand + + +fuse.fuse_python_api = (0, 2) + + +class QOMFuse(QOMCommand, Operations): + """ + QOMFuse implements both fuse.Operations and QOMCommand. + + Operations implements the FS, and QOMCommand implements the CLI command. + """ + name = 'fuse' + help = 'Mount a QOM tree as a FUSE filesystem' + fuse: FUSE + + @classmethod + def configure_parser(cls, parser: argparse.ArgumentParser) -> None: + super().configure_parser(parser) + parser.add_argument( + 'mount', + metavar='', + action='store', + help="Mount point", + ) + + def __init__(self, args: argparse.Namespace): + super().__init__(args) + self.mount = args.mount + self.ino_map: Dict[str, int] = {} + self.ino_count = 1 + + def run(self) -> int: + print(f"Mounting QOMFS to '{self.mount}'", file=sys.stderr) + self.fuse = FUSE(self, self.mount, foreground=True) + return 0 + + def get_ino(self, path: str) -> int: + """Get an inode number for a given QOM path.""" + if path in self.ino_map: + return self.ino_map[path] + self.ino_map[path] = self.ino_count + self.ino_count += 1 + return self.ino_map[path] + + def is_object(self, path: str) -> bool: + """Is the given QOM path an object?""" + try: + self.qom_list(path) + return True + except ExecuteError: + return False + + def is_property(self, path: str) -> bool: + """Is the given QOM path a property?""" + path, prop = path.rsplit('/', 1) + if path == '': + path = '/' + try: + for item in self.qom_list(path): + if item.name == prop: + return True + return False + except ExecuteError: + return False + + def is_link(self, path: str) -> bool: + """Is the given QOM path a link?""" + path, prop = path.rsplit('/', 1) + if path == '': + path = '/' + try: + for item in self.qom_list(path): + if item.name == prop and item.link: + return True + return False + except ExecuteError: + return False + + def read(self, path: str, size: int, offset: int, fh: IO[bytes]) -> bytes: + if not self.is_property(path): + raise FuseOSError(ENOENT) + + path, prop = path.rsplit('/', 1) + if path == '': + path = '/' + try: + data = str(self.qmp.command('qom-get', path=path, property=prop)) + data += '\n' # make values shell friendly + except ExecuteError as err: + raise FuseOSError(EPERM) from err + + if offset > len(data): + return b'' + + return bytes(data[offset:][:size], encoding='utf-8') + + def readlink(self, path: str) -> Union[bool, str]: + if not self.is_link(path): + return False + path, prop = path.rsplit('/', 1) + prefix = '/'.join(['..'] * (len(path.split('/')) - 1)) + return prefix + str(self.qmp.command('qom-get', path=path, + property=prop)) + + def getattr(self, path: str, + fh: Optional[IO[bytes]] = None) -> Mapping[str, object]: + if self.is_link(path): + value = { + 'st_mode': 0o755 | stat.S_IFLNK, + 'st_ino': self.get_ino(path), + 'st_dev': 0, + 'st_nlink': 2, + 'st_uid': 1000, + 'st_gid': 1000, + 'st_size': 4096, + 'st_atime': 0, + 'st_mtime': 0, + 'st_ctime': 0 + } + elif self.is_object(path): + value = { + 'st_mode': 0o755 | stat.S_IFDIR, + 'st_ino': self.get_ino(path), + 'st_dev': 0, + 'st_nlink': 2, + 'st_uid': 1000, + 'st_gid': 1000, + 'st_size': 4096, + 'st_atime': 0, + 'st_mtime': 0, + 'st_ctime': 0 + } + elif self.is_property(path): + value = { + 'st_mode': 0o644 | stat.S_IFREG, + 'st_ino': self.get_ino(path), + 'st_dev': 0, + 'st_nlink': 1, + 'st_uid': 1000, + 'st_gid': 1000, + 'st_size': 4096, + 'st_atime': 0, + 'st_mtime': 0, + 'st_ctime': 0 + } + else: + raise FuseOSError(ENOENT) + return value + + def readdir(self, path: str, fh: IO[bytes]) -> Iterator[str]: + yield '.' + yield '..' + for item in self.qom_list(path): + yield item.name diff --git a/python/setup.cfg b/python/setup.cfg index aa238d8bc9..04a41ef1a0 100644 --- a/python/setup.cfg +++ b/python/setup.cfg @@ -60,13 +60,13 @@ tui = [options.entry_points] console_scripts = - qom = qemu.qmp.qom:main - qom-set = qemu.qmp.qom:QOMSet.entry_point - qom-get = qemu.qmp.qom:QOMGet.entry_point - qom-list = qemu.qmp.qom:QOMList.entry_point - qom-tree = qemu.qmp.qom:QOMTree.entry_point - qom-fuse = qemu.qmp.qom_fuse:QOMFuse.entry_point [fuse] - qemu-ga-client = qemu.qmp.qemu_ga_client:main + qom = qemu.utils.qom:main + qom-set = qemu.utils.qom:QOMSet.entry_point + qom-get = qemu.utils.qom:QOMGet.entry_point + qom-list = qemu.utils.qom:QOMList.entry_point + qom-tree = qemu.utils.qom:QOMTree.entry_point + qom-fuse = qemu.utils.qom_fuse:QOMFuse.entry_point [fuse] + qemu-ga-client = qemu.utils.qemu_ga_client:main qmp-shell = qemu.qmp.qmp_shell:main aqmp-tui = qemu.aqmp.aqmp_tui:main [tui] @@ -80,7 +80,7 @@ python_version = 3.6 warn_unused_configs = True namespace_packages = True -[mypy-qemu.qmp.qom_fuse] +[mypy-qemu.utils.qom_fuse] # fusepy has no type stubs: allow_subclassing_any = True -- cgit v1.2.3-55-g7522