diff options
Diffstat (limited to 'python/qemu/aqmp/legacy.py')
-rw-r--r-- | python/qemu/aqmp/legacy.py | 317 |
1 files changed, 0 insertions, 317 deletions
diff --git a/python/qemu/aqmp/legacy.py b/python/qemu/aqmp/legacy.py deleted file mode 100644 index dfcd20bbd2..0000000000 --- a/python/qemu/aqmp/legacy.py +++ /dev/null @@ -1,317 +0,0 @@ -""" -(Legacy) Sync QMP Wrapper - -This module provides the `QEMUMonitorProtocol` class, which is a -synchronous wrapper around `QMPClient`. - -Its design closely resembles that of the original QEMUMonitorProtocol -class, originally written by Luiz Capitulino. It is provided here for -compatibility with scripts inside the QEMU source tree that expect the -old interface. -""" - -# -# Copyright (C) 2009-2022 Red Hat Inc. -# -# Authors: -# Luiz Capitulino <lcapitulino@redhat.com> -# John Snow <jsnow@redhat.com> -# -# This work is licensed under the terms of the GNU GPL, version 2. See -# the COPYING file in the top-level directory. -# - -import asyncio -from types import TracebackType -from typing import ( - Any, - Awaitable, - Dict, - List, - Optional, - Type, - TypeVar, - Union, -) - -from .error import QMPError -from .protocol import Runstate, SocketAddrT -from .qmp_client import QMPClient - - -#: QMPMessage is an entire QMP message of any kind. -QMPMessage = Dict[str, Any] - -#: QMPReturnValue is the 'return' value of a command. -QMPReturnValue = object - -#: QMPObject is any object in a QMP message. -QMPObject = Dict[str, object] - -# QMPMessage can be outgoing commands or incoming events/returns. -# QMPReturnValue is usually a dict/json object, but due to QAPI's -# 'returns-whitelist', it can actually be anything. -# -# {'return': {}} is a QMPMessage, -# {} is the QMPReturnValue. - - -class QMPBadPortError(QMPError): - """ - Unable to parse socket address: Port was non-numerical. - """ - - -class QEMUMonitorProtocol: - """ - Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) - and then allow to handle commands and events. - - :param address: QEMU address, can be either a unix socket path (string) - or a tuple in the form ( address, port ) for a TCP - connection - :param server: Act as the socket server. (See 'accept') - :param nickname: Optional nickname used for logging. - """ - - def __init__(self, address: SocketAddrT, - server: bool = False, - nickname: Optional[str] = None): - - self._aqmp = QMPClient(nickname) - self._aloop = asyncio.get_event_loop() - self._address = address - self._timeout: Optional[float] = None - - if server: - self._sync(self._aqmp.start_server(self._address)) - - _T = TypeVar('_T') - - def _sync( - self, future: Awaitable[_T], timeout: Optional[float] = None - ) -> _T: - return self._aloop.run_until_complete( - asyncio.wait_for(future, timeout=timeout) - ) - - def _get_greeting(self) -> Optional[QMPMessage]: - if self._aqmp.greeting is not None: - # pylint: disable=protected-access - return self._aqmp.greeting._asdict() - return None - - def __enter__(self: _T) -> _T: - # Implement context manager enter function. - return self - - def __exit__(self, - # pylint: disable=duplicate-code - # see https://github.com/PyCQA/pylint/issues/3619 - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType]) -> None: - # Implement context manager exit function. - self.close() - - @classmethod - def parse_address(cls, address: str) -> SocketAddrT: - """ - Parse a string into a QMP address. - - Figure out if the argument is in the port:host form. - If it's not, it's probably a file path. - """ - components = address.split(':') - if len(components) == 2: - try: - port = int(components[1]) - except ValueError: - msg = f"Bad port: '{components[1]}' in '{address}'." - raise QMPBadPortError(msg) from None - return (components[0], port) - - # Treat as filepath. - return address - - def connect(self, negotiate: bool = True) -> Optional[QMPMessage]: - """ - Connect to the QMP Monitor and perform capabilities negotiation. - - :return: QMP greeting dict, or None if negotiate is false - :raise ConnectError: on connection errors - """ - self._aqmp.await_greeting = negotiate - self._aqmp.negotiate = negotiate - - self._sync( - self._aqmp.connect(self._address) - ) - return self._get_greeting() - - def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage: - """ - Await connection from QMP Monitor and perform capabilities negotiation. - - :param timeout: - timeout in seconds (nonnegative float number, or None). - If None, there is no timeout, and this may block forever. - - :return: QMP greeting dict - :raise ConnectError: on connection errors - """ - self._aqmp.await_greeting = True - self._aqmp.negotiate = True - - self._sync(self._aqmp.accept(), timeout) - - ret = self._get_greeting() - assert ret is not None - return ret - - def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage: - """ - Send a QMP command to the QMP Monitor. - - :param qmp_cmd: QMP command to be sent as a Python dict - :return: QMP response as a Python dict - """ - return dict( - self._sync( - # pylint: disable=protected-access - - # _raw() isn't a public API, because turning off - # automatic ID assignment is discouraged. For - # compatibility with iotests *only*, do it anyway. - self._aqmp._raw(qmp_cmd, assign_id=False), - self._timeout - ) - ) - - def cmd(self, name: str, - args: Optional[Dict[str, object]] = None, - cmd_id: Optional[object] = None) -> QMPMessage: - """ - Build a QMP command and send it to the QMP Monitor. - - :param name: command name (string) - :param args: command arguments (dict) - :param cmd_id: command id (dict, list, string or int) - """ - qmp_cmd: QMPMessage = {'execute': name} - if args: - qmp_cmd['arguments'] = args - if cmd_id: - qmp_cmd['id'] = cmd_id - return self.cmd_obj(qmp_cmd) - - def command(self, cmd: str, **kwds: object) -> QMPReturnValue: - """ - Build and send a QMP command to the monitor, report errors if any - """ - return self._sync( - self._aqmp.execute(cmd, kwds), - self._timeout - ) - - def pull_event(self, - wait: Union[bool, float] = False) -> Optional[QMPMessage]: - """ - Pulls a single event. - - :param wait: - If False or 0, do not wait. Return None if no events ready. - If True, wait forever until the next event. - Otherwise, wait for the specified number of seconds. - - :raise asyncio.TimeoutError: - When a timeout is requested and the timeout period elapses. - - :return: The first available QMP event, or None. - """ - if not wait: - # wait is False/0: "do not wait, do not except." - if self._aqmp.events.empty(): - return None - - # If wait is 'True', wait forever. If wait is False/0, the events - # queue must not be empty; but it still needs some real amount - # of time to complete. - timeout = None - if wait and isinstance(wait, float): - timeout = wait - - return dict( - self._sync( - self._aqmp.events.get(), - timeout - ) - ) - - def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]: - """ - Get a list of QMP events and clear all pending events. - - :param wait: - If False or 0, do not wait. Return None if no events ready. - If True, wait until we have at least one event. - Otherwise, wait for up to the specified number of seconds for at - least one event. - - :raise asyncio.TimeoutError: - When a timeout is requested and the timeout period elapses. - - :return: A list of QMP events. - """ - events = [dict(x) for x in self._aqmp.events.clear()] - if events: - return events - - event = self.pull_event(wait) - return [event] if event is not None else [] - - def clear_events(self) -> None: - """Clear current list of pending events.""" - self._aqmp.events.clear() - - def close(self) -> None: - """Close the connection.""" - self._sync( - self._aqmp.disconnect() - ) - - def settimeout(self, timeout: Optional[float]) -> None: - """ - Set the timeout for QMP RPC execution. - - This timeout affects the `cmd`, `cmd_obj`, and `command` methods. - The `accept`, `pull_event` and `get_event` methods have their - own configurable timeouts. - - :param timeout: - timeout in seconds, or None. - None will wait indefinitely. - """ - self._timeout = timeout - - def send_fd_scm(self, fd: int) -> None: - """ - Send a file descriptor to the remote via SCM_RIGHTS. - """ - self._aqmp.send_fd_scm(fd) - - def __del__(self) -> None: - if self._aqmp.runstate == Runstate.IDLE: - return - - if not self._aloop.is_running(): - self.close() - else: - # Garbage collection ran while the event loop was running. - # Nothing we can do about it now, but if we don't raise our - # own error, the user will be treated to a lot of traceback - # they might not understand. - raise QMPError( - "QEMUMonitorProtocol.close()" - " was not called before object was garbage collected" - ) |