diff options
Diffstat (limited to 'python/qemu/qmp/__init__.py')
-rw-r--r-- | python/qemu/qmp/__init__.py | 396 |
1 files changed, 0 insertions, 396 deletions
diff --git a/python/qemu/qmp/__init__.py b/python/qemu/qmp/__init__.py deleted file mode 100644 index 4e08641154..0000000000 --- a/python/qemu/qmp/__init__.py +++ /dev/null @@ -1,396 +0,0 @@ -""" -QEMU Monitor Protocol (QMP) development library & tooling. - -This package provides a fairly low-level class for communicating to QMP -protocol servers, as implemented by QEMU, the QEMU Guest Agent, and the -QEMU Storage Daemon. This library is not intended for production use. - -`QEMUMonitorProtocol` is the primary class of interest, and all errors -raised derive from `QMPError`. -""" - -# Copyright (C) 2009, 2010 Red Hat Inc. -# -# Authors: -# Luiz Capitulino <lcapitulino@redhat.com> -# -# This work is licensed under the terms of the GNU GPL, version 2. See -# the COPYING file in the top-level directory. - -import errno -import json -import logging -import socket -import struct -from types import TracebackType -from typing import ( - Any, - Dict, - List, - Optional, - TextIO, - Tuple, - Type, - TypeVar, - Union, - cast, -) - - -#: QMPMessage is an entire QMP message of any kind. -QMPMessage = Dict[str, Any] - -#: QMPReturnValue is the 'return' value of a command. -QMPReturnValue = object - -#: QMPObject is any object in a QMP message. -QMPObject = Dict[str, object] - -# QMPMessage can be outgoing commands or incoming events/returns. -# QMPReturnValue is usually a dict/json object, but due to QAPI's -# 'returns-whitelist', it can actually be anything. -# -# {'return': {}} is a QMPMessage, -# {} is the QMPReturnValue. - - -InternetAddrT = Tuple[str, int] -UnixAddrT = str -SocketAddrT = Union[InternetAddrT, UnixAddrT] - - -class QMPError(Exception): - """ - QMP base exception - """ - - -class QMPConnectError(QMPError): - """ - QMP connection exception - """ - - -class QMPCapabilitiesError(QMPError): - """ - QMP negotiate capabilities exception - """ - - -class QMPTimeoutError(QMPError): - """ - QMP timeout exception - """ - - -class QMPProtocolError(QMPError): - """ - QMP protocol error; unexpected response - """ - - -class QMPResponseError(QMPError): - """ - Represents erroneous QMP monitor reply - """ - def __init__(self, reply: QMPMessage): - try: - desc = reply['error']['desc'] - except KeyError: - desc = reply - super().__init__(desc) - self.reply = reply - - -class QEMUMonitorProtocol: - """ - Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) and then - allow to handle commands and events. - """ - - #: Logger object for debugging messages - logger = logging.getLogger('QMP') - - def __init__(self, address: SocketAddrT, - server: bool = False, - nickname: Optional[str] = None): - """ - Create a QEMUMonitorProtocol class. - - @param address: QEMU address, can be either a unix socket path (string) - or a tuple in the form ( address, port ) for a TCP - connection - @param server: server mode listens on the socket (bool) - @raise OSError on socket connection errors - @note No connection is established, this is done by the connect() or - accept() methods - """ - self.__events: List[QMPMessage] = [] - self.__address = address - self.__sock = self.__get_sock() - self.__sockfile: Optional[TextIO] = None - self._nickname = nickname - if self._nickname: - self.logger = logging.getLogger('QMP').getChild(self._nickname) - if server: - self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.__sock.bind(self.__address) - self.__sock.listen(1) - - def __get_sock(self) -> socket.socket: - if isinstance(self.__address, tuple): - family = socket.AF_INET - else: - family = socket.AF_UNIX - return socket.socket(family, socket.SOCK_STREAM) - - def __negotiate_capabilities(self) -> QMPMessage: - greeting = self.__json_read() - if greeting is None or "QMP" not in greeting: - raise QMPConnectError - # Greeting seems ok, negotiate capabilities - resp = self.cmd('qmp_capabilities') - if resp and "return" in resp: - return greeting - raise QMPCapabilitiesError - - def __json_read(self, only_event: bool = False) -> Optional[QMPMessage]: - assert self.__sockfile is not None - while True: - data = self.__sockfile.readline() - if not data: - return None - # By definition, any JSON received from QMP is a QMPMessage, - # and we are asserting only at static analysis time that it - # has a particular shape. - resp: QMPMessage = json.loads(data) - if 'event' in resp: - self.logger.debug("<<< %s", resp) - self.__events.append(resp) - if not only_event: - continue - return resp - - def __get_events(self, wait: Union[bool, float] = False) -> None: - """ - Check for new events in the stream and cache them in __events. - - @param wait (bool): block until an event is available. - @param wait (float): If wait is a float, treat it as a timeout value. - - @raise QMPTimeoutError: If a timeout float is provided and the timeout - period elapses. - @raise QMPConnectError: If wait is True but no events could be - retrieved or if some other error occurred. - """ - - # Current timeout and blocking status - current_timeout = self.__sock.gettimeout() - - # Check for new events regardless and pull them into the cache: - self.__sock.settimeout(0) # i.e. setblocking(False) - try: - self.__json_read() - except OSError as err: - # EAGAIN: No data available; not critical - if err.errno != errno.EAGAIN: - raise - finally: - self.__sock.settimeout(current_timeout) - - # Wait for new events, if needed. - # if wait is 0.0, this means "no wait" and is also implicitly false. - if not self.__events and wait: - if isinstance(wait, float): - self.__sock.settimeout(wait) - try: - ret = self.__json_read(only_event=True) - except socket.timeout as err: - raise QMPTimeoutError("Timeout waiting for event") from err - except Exception as err: - msg = "Error while reading from socket" - raise QMPConnectError(msg) from err - finally: - self.__sock.settimeout(current_timeout) - - if ret is None: - raise QMPConnectError("Error while reading from socket") - - T = TypeVar('T') - - def __enter__(self: T) -> T: - # Implement context manager enter function. - return self - - def __exit__(self, - # pylint: disable=duplicate-code - # see https://github.com/PyCQA/pylint/issues/3619 - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType]) -> None: - # Implement context manager exit function. - self.close() - - def connect(self, negotiate: bool = True) -> Optional[QMPMessage]: - """ - Connect to the QMP Monitor and perform capabilities negotiation. - - @return QMP greeting dict, or None if negotiate is false - @raise OSError on socket connection errors - @raise QMPConnectError if the greeting is not received - @raise QMPCapabilitiesError if fails to negotiate capabilities - """ - self.__sock.connect(self.__address) - self.__sockfile = self.__sock.makefile(mode='r') - if negotiate: - return self.__negotiate_capabilities() - return None - - def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage: - """ - Await connection from QMP Monitor and perform capabilities negotiation. - - @param timeout: timeout in seconds (nonnegative float number, or - None). The value passed will set the behavior of the - underneath QMP socket as described in [1]. - Default value is set to 15.0. - - @return QMP greeting dict - @raise OSError on socket connection errors - @raise QMPConnectError if the greeting is not received - @raise QMPCapabilitiesError if fails to negotiate capabilities - - [1] - https://docs.python.org/3/library/socket.html#socket.socket.settimeout - """ - self.__sock.settimeout(timeout) - self.__sock, _ = self.__sock.accept() - self.__sockfile = self.__sock.makefile(mode='r') - return self.__negotiate_capabilities() - - def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage: - """ - Send a QMP command to the QMP Monitor. - - @param qmp_cmd: QMP command to be sent as a Python dict - @return QMP response as a Python dict - """ - self.logger.debug(">>> %s", qmp_cmd) - self.__sock.sendall(json.dumps(qmp_cmd).encode('utf-8')) - resp = self.__json_read() - if resp is None: - raise QMPConnectError("Unexpected empty reply from server") - self.logger.debug("<<< %s", resp) - return resp - - def cmd(self, name: str, - args: Optional[Dict[str, object]] = None, - cmd_id: Optional[object] = None) -> QMPMessage: - """ - Build a QMP command and send it to the QMP Monitor. - - @param name: command name (string) - @param args: command arguments (dict) - @param cmd_id: command id (dict, list, string or int) - """ - qmp_cmd: QMPMessage = {'execute': name} - if args: - qmp_cmd['arguments'] = args - if cmd_id: - qmp_cmd['id'] = cmd_id - return self.cmd_obj(qmp_cmd) - - def command(self, cmd: str, **kwds: object) -> QMPReturnValue: - """ - Build and send a QMP command to the monitor, report errors if any - """ - ret = self.cmd(cmd, kwds) - if 'error' in ret: - raise QMPResponseError(ret) - if 'return' not in ret: - raise QMPProtocolError( - "'return' key not found in QMP response '{}'".format(str(ret)) - ) - return cast(QMPReturnValue, ret['return']) - - def pull_event(self, - wait: Union[bool, float] = False) -> Optional[QMPMessage]: - """ - Pulls a single event. - - @param wait (bool): block until an event is available. - @param wait (float): If wait is a float, treat it as a timeout value. - - @raise QMPTimeoutError: If a timeout float is provided and the timeout - period elapses. - @raise QMPConnectError: If wait is True but no events could be - retrieved or if some other error occurred. - - @return The first available QMP event, or None. - """ - self.__get_events(wait) - - if self.__events: - return self.__events.pop(0) - return None - - def get_events(self, wait: bool = False) -> List[QMPMessage]: - """ - Get a list of available QMP events and clear all pending events. - - @param wait (bool): block until an event is available. - @param wait (float): If wait is a float, treat it as a timeout value. - - @raise QMPTimeoutError: If a timeout float is provided and the timeout - period elapses. - @raise QMPConnectError: If wait is True but no events could be - retrieved or if some other error occurred. - - @return The list of available QMP events. - """ - self.__get_events(wait) - events = self.__events - self.__events = [] - return events - - def clear_events(self) -> None: - """ - Clear current list of pending events. - """ - self.__events = [] - - def close(self) -> None: - """ - Close the socket and socket file. - """ - if self.__sock: - self.__sock.close() - if self.__sockfile: - self.__sockfile.close() - - def settimeout(self, timeout: Optional[float]) -> None: - """ - Set the socket timeout. - - @param timeout (float): timeout in seconds (non-zero), or None. - @note This is a wrap around socket.settimeout - - @raise ValueError: if timeout was set to 0. - """ - if timeout == 0: - msg = "timeout cannot be 0; this engages non-blocking mode." - msg += " Use 'None' instead to disable timeouts." - raise ValueError(msg) - self.__sock.settimeout(timeout) - - def send_fd_scm(self, fd: int) -> None: - """ - Send a file descriptor to the remote via SCM_RIGHTS. - """ - if self.__sock.family != socket.AF_UNIX: - raise RuntimeError("Can't use SCM_RIGHTS on non-AF_UNIX socket.") - - self.__sock.sendmsg( - [b' '], - [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))] - ) |