summaryrefslogtreecommitdiffstats
path: root/python/qemu/aqmp/qmp_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qemu/aqmp/qmp_client.py')
-rw-r--r--python/qemu/aqmp/qmp_client.py655
1 files changed, 0 insertions, 655 deletions
diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py
deleted file mode 100644
index 90a8737f03..0000000000
--- a/python/qemu/aqmp/qmp_client.py
+++ /dev/null
@@ -1,655 +0,0 @@
-"""
-QMP Protocol Implementation
-
-This module provides the `QMPClient` class, which can be used to connect
-and send commands to a QMP server such as QEMU. The QMP class can be
-used to either connect to a listening server, or used to listen and
-accept an incoming connection from that server.
-"""
-
-import asyncio
-import logging
-import socket
-import struct
-from typing import (
- Dict,
- List,
- Mapping,
- Optional,
- Union,
- cast,
-)
-
-from .error import ProtocolError, QMPError
-from .events import Events
-from .message import Message
-from .models import ErrorResponse, Greeting
-from .protocol import AsyncProtocol, Runstate, require
-from .util import (
- bottom_half,
- exception_summary,
- pretty_traceback,
- upper_half,
-)
-
-
-class _WrappedProtocolError(ProtocolError):
- """
- Abstract exception class for Protocol errors that wrap an Exception.
-
- :param error_message: Human-readable string describing the error.
- :param exc: The root-cause exception.
- """
- def __init__(self, error_message: str, exc: Exception):
- super().__init__(error_message)
- self.exc = exc
-
- def __str__(self) -> str:
- return f"{self.error_message}: {self.exc!s}"
-
-
-class GreetingError(_WrappedProtocolError):
- """
- An exception occurred during the Greeting phase.
-
- :param error_message: Human-readable string describing the error.
- :param exc: The root-cause exception.
- """
-
-
-class NegotiationError(_WrappedProtocolError):
- """
- An exception occurred during the Negotiation phase.
-
- :param error_message: Human-readable string describing the error.
- :param exc: The root-cause exception.
- """
-
-
-class ExecuteError(QMPError):
- """
- Exception raised by `QMPClient.execute()` on RPC failure.
-
- :param error_response: The RPC error response object.
- :param sent: The sent RPC message that caused the failure.
- :param received: The raw RPC error reply received.
- """
- def __init__(self, error_response: ErrorResponse,
- sent: Message, received: Message):
- super().__init__(error_response.error.desc)
- #: The sent `Message` that caused the failure
- self.sent: Message = sent
- #: The received `Message` that indicated failure
- self.received: Message = received
- #: The parsed error response
- self.error: ErrorResponse = error_response
- #: The QMP error class
- self.error_class: str = error_response.error.class_
-
-
-class ExecInterruptedError(QMPError):
- """
- Exception raised by `execute()` (et al) when an RPC is interrupted.
-
- This error is raised when an `execute()` statement could not be
- completed. This can occur because the connection itself was
- terminated before a reply was received.
-
- The true cause of the interruption will be available via `disconnect()`.
- """
-
-
-class _MsgProtocolError(ProtocolError):
- """
- Abstract error class for protocol errors that have a `Message` object.
-
- This Exception class is used for protocol errors where the `Message`
- was mechanically understood, but was found to be inappropriate or
- malformed.
-
- :param error_message: Human-readable string describing the error.
- :param msg: The QMP `Message` that caused the error.
- """
- def __init__(self, error_message: str, msg: Message):
- super().__init__(error_message)
- #: The received `Message` that caused the error.
- self.msg: Message = msg
-
- def __str__(self) -> str:
- return "\n".join([
- super().__str__(),
- f" Message was: {str(self.msg)}\n",
- ])
-
-
-class ServerParseError(_MsgProtocolError):
- """
- The Server sent a `Message` indicating parsing failure.
-
- i.e. A reply has arrived from the server, but it is missing the "ID"
- field, indicating a parsing error.
-
- :param error_message: Human-readable string describing the error.
- :param msg: The QMP `Message` that caused the error.
- """
-
-
-class BadReplyError(_MsgProtocolError):
- """
- An execution reply was successfully routed, but not understood.
-
- If a QMP message is received with an 'id' field to allow it to be
- routed, but is otherwise malformed, this exception will be raised.
-
- A reply message is malformed if it is missing either the 'return' or
- 'error' keys, or if the 'error' value has missing keys or members of
- the wrong type.
-
- :param error_message: Human-readable string describing the error.
- :param msg: The malformed reply that was received.
- :param sent: The message that was sent that prompted the error.
- """
- def __init__(self, error_message: str, msg: Message, sent: Message):
- super().__init__(error_message, msg)
- #: The sent `Message` that caused the failure
- self.sent = sent
-
-
-class QMPClient(AsyncProtocol[Message], Events):
- """
- Implements a QMP client connection.
-
- QMP can be used to establish a connection as either the transport
- client or server, though this class always acts as the QMP client.
-
- :param name: Optional nickname for the connection, used for logging.
-
- Basic script-style usage looks like this::
-
- qmp = QMPClient('my_virtual_machine_name')
- await qmp.connect(('127.0.0.1', 1234))
- ...
- res = await qmp.execute('block-query')
- ...
- await qmp.disconnect()
-
- Basic async client-style usage looks like this::
-
- class Client:
- def __init__(self, name: str):
- self.qmp = QMPClient(name)
-
- async def watch_events(self):
- try:
- async for event in self.qmp.events:
- print(f"Event: {event['event']}")
- except asyncio.CancelledError:
- return
-
- async def run(self, address='/tmp/qemu.socket'):
- await self.qmp.connect(address)
- asyncio.create_task(self.watch_events())
- await self.qmp.runstate_changed.wait()
- await self.disconnect()
-
- See `aqmp.events` for more detail on event handling patterns.
- """
- #: Logger object used for debugging messages.
- logger = logging.getLogger(__name__)
-
- # Read buffer limit; large enough to accept query-qmp-schema
- _limit = (256 * 1024)
-
- # Type alias for pending execute() result items
- _PendingT = Union[Message, ExecInterruptedError]
-
- def __init__(self, name: Optional[str] = None) -> None:
- super().__init__(name)
- Events.__init__(self)
-
- #: Whether or not to await a greeting after establishing a connection.
- self.await_greeting: bool = True
-
- #: Whether or not to perform capabilities negotiation upon connection.
- #: Implies `await_greeting`.
- self.negotiate: bool = True
-
- # Cached Greeting, if one was awaited.
- self._greeting: Optional[Greeting] = None
-
- # Command ID counter
- self._execute_id = 0
-
- # Incoming RPC reply messages.
- self._pending: Dict[
- Union[str, None],
- 'asyncio.Queue[QMPClient._PendingT]'
- ] = {}
-
- @property
- def greeting(self) -> Optional[Greeting]:
- """The `Greeting` from the QMP server, if any."""
- return self._greeting
-
- @upper_half
- async def _establish_session(self) -> None:
- """
- Initiate the QMP session.
-
- Wait for the QMP greeting and perform capabilities negotiation.
-
- :raise GreetingError: When the greeting is not understood.
- :raise NegotiationError: If the negotiation fails.
- :raise EOFError: When the server unexpectedly hangs up.
- :raise OSError: For underlying stream errors.
- """
- self._greeting = None
- self._pending = {}
-
- if self.await_greeting or self.negotiate:
- self._greeting = await self._get_greeting()
-
- if self.negotiate:
- await self._negotiate()
-
- # This will start the reader/writers:
- await super()._establish_session()
-
- @upper_half
- async def _get_greeting(self) -> Greeting:
- """
- :raise GreetingError: When the greeting is not understood.
- :raise EOFError: When the server unexpectedly hangs up.
- :raise OSError: For underlying stream errors.
-
- :return: the Greeting object given by the server.
- """
- self.logger.debug("Awaiting greeting ...")
-
- try:
- msg = await self._recv()
- return Greeting(msg)
- except (ProtocolError, KeyError, TypeError) as err:
- emsg = "Did not understand Greeting"
- self.logger.error("%s: %s", emsg, exception_summary(err))
- self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
- raise GreetingError(emsg, err) from err
- except BaseException as err:
- # EOFError, OSError, or something unexpected.
- emsg = "Failed to receive Greeting"
- self.logger.error("%s: %s", emsg, exception_summary(err))
- self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
- raise
-
- @upper_half
- async def _negotiate(self) -> None:
- """
- Perform QMP capabilities negotiation.
-
- :raise NegotiationError: When negotiation fails.
- :raise EOFError: When the server unexpectedly hangs up.
- :raise OSError: For underlying stream errors.
- """
- self.logger.debug("Negotiating capabilities ...")
-
- arguments: Dict[str, List[str]] = {}
- if self._greeting and 'oob' in self._greeting.QMP.capabilities:
- arguments.setdefault('enable', []).append('oob')
- msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)
-
- # It's not safe to use execute() here, because the reader/writers
- # aren't running. AsyncProtocol *requires* that a new session
- # does not fail after the reader/writers are running!
- try:
- await self._send(msg)
- reply = await self._recv()
- assert 'return' in reply
- assert 'error' not in reply
- except (ProtocolError, AssertionError) as err:
- emsg = "Negotiation failed"
- self.logger.error("%s: %s", emsg, exception_summary(err))
- self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
- raise NegotiationError(emsg, err) from err
- except BaseException as err:
- # EOFError, OSError, or something unexpected.
- emsg = "Negotiation failed"
- self.logger.error("%s: %s", emsg, exception_summary(err))
- self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
- raise
-
- @bottom_half
- async def _bh_disconnect(self) -> None:
- try:
- await super()._bh_disconnect()
- finally:
- if self._pending:
- self.logger.debug("Cancelling pending executions")
- keys = self._pending.keys()
- for key in keys:
- self.logger.debug("Cancelling execution '%s'", key)
- self._pending[key].put_nowait(
- ExecInterruptedError("Disconnected")
- )
-
- self.logger.debug("QMP Disconnected.")
-
- @upper_half
- def _cleanup(self) -> None:
- super()._cleanup()
- assert not self._pending
-
- @bottom_half
- async def _on_message(self, msg: Message) -> None:
- """
- Add an incoming message to the appropriate queue/handler.
-
- :raise ServerParseError: When Message indicates server parse failure.
- """
- # Incoming messages are not fully parsed/validated here;
- # do only light peeking to know how to route the messages.
-
- if 'event' in msg:
- await self._event_dispatch(msg)
- return
-
- # Below, we assume everything left is an execute/exec-oob response.
-
- exec_id = cast(Optional[str], msg.get('id'))
-
- if exec_id in self._pending:
- await self._pending[exec_id].put(msg)
- return
-
- # We have a message we can't route back to a caller.
-
- is_error = 'error' in msg
- has_id = 'id' in msg
-
- if is_error and not has_id:
- # This is very likely a server parsing error.
- # It doesn't inherently belong to any pending execution.
- # Instead of performing clever recovery, just terminate.
- # See "NOTE" in qmp-spec.txt, section 2.4.2
- raise ServerParseError(
- ("Server sent an error response without an ID, "
- "but there are no ID-less executions pending. "
- "Assuming this is a server parser failure."),
- msg
- )
-
- # qmp-spec.txt, section 2.4:
- # 'Clients should drop all the responses
- # that have an unknown "id" field.'
- self.logger.log(
- logging.ERROR if is_error else logging.WARNING,
- "Unknown ID '%s', message dropped.",
- exec_id,
- )
- self.logger.debug("Unroutable message: %s", str(msg))
-
- @upper_half
- @bottom_half
- async def _do_recv(self) -> Message:
- """
- :raise OSError: When a stream error is encountered.
- :raise EOFError: When the stream is at EOF.
- :raise ProtocolError:
- When the Message is not understood.
- See also `Message._deserialize`.
-
- :return: A single QMP `Message`.
- """
- msg_bytes = await self._readline()
- msg = Message(msg_bytes, eager=True)
- return msg
-
- @upper_half
- @bottom_half
- def _do_send(self, msg: Message) -> None:
- """
- :raise ValueError: JSON serialization failure
- :raise TypeError: JSON serialization failure
- :raise OSError: When a stream error is encountered.
- """
- assert self._writer is not None
- self._writer.write(bytes(msg))
-
- @upper_half
- def _get_exec_id(self) -> str:
- exec_id = f"__aqmp#{self._execute_id:05d}"
- self._execute_id += 1
- return exec_id
-
- @upper_half
- async def _issue(self, msg: Message) -> Union[None, str]:
- """
- Issue a QMP `Message` and do not wait for a reply.
-
- :param msg: The QMP `Message` to send to the server.
-
- :return: The ID of the `Message` sent.
- """
- msg_id: Optional[str] = None
- if 'id' in msg:
- assert isinstance(msg['id'], str)
- msg_id = msg['id']
-
- self._pending[msg_id] = asyncio.Queue(maxsize=1)
- try:
- await self._outgoing.put(msg)
- except:
- del self._pending[msg_id]
- raise
-
- return msg_id
-
- @upper_half
- async def _reply(self, msg_id: Union[str, None]) -> Message:
- """
- Await a reply to a previously issued QMP message.
-
- :param msg_id: The ID of the previously issued message.
-
- :return: The reply from the server.
- :raise ExecInterruptedError:
- When the reply could not be retrieved because the connection
- was lost, or some other problem.
- """
- queue = self._pending[msg_id]
-
- try:
- result = await queue.get()
- if isinstance(result, ExecInterruptedError):
- raise result
- return result
- finally:
- del self._pending[msg_id]
-
- @upper_half
- async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
- """
- Send a QMP `Message` to the server and await a reply.
-
- This method *assumes* you are sending some kind of an execute
- statement that *will* receive a reply.
-
- An execution ID will be assigned if assign_id is `True`. It can be
- disabled, but this requires that an ID is manually assigned
- instead. For manually assigned IDs, you must not use the string
- '__aqmp#' anywhere in the ID.
-
- :param msg: The QMP `Message` to execute.
- :param assign_id: If True, assign a new execution ID.
-
- :return: Execution reply from the server.
- :raise ExecInterruptedError:
- When the reply could not be retrieved because the connection
- was lost, or some other problem.
- """
- if assign_id:
- msg['id'] = self._get_exec_id()
- elif 'id' in msg:
- assert isinstance(msg['id'], str)
- assert '__aqmp#' not in msg['id']
-
- exec_id = await self._issue(msg)
- return await self._reply(exec_id)
-
- @upper_half
- @require(Runstate.RUNNING)
- async def _raw(
- self,
- msg: Union[Message, Mapping[str, object], bytes],
- assign_id: bool = True,
- ) -> Message:
- """
- Issue a raw `Message` to the QMP server and await a reply.
-
- :param msg:
- A Message to send to the server. It may be a `Message`, any
- Mapping (including Dict), or raw bytes.
- :param assign_id:
- Assign an arbitrary execution ID to this message. If
- `False`, the existing id must either be absent (and no other
- such pending execution may omit an ID) or a string. If it is
- a string, it must not start with '__aqmp#' and no other such
- pending execution may currently be using that ID.
-
- :return: Execution reply from the server.
-
- :raise ExecInterruptedError:
- When the reply could not be retrieved because the connection
- was lost, or some other problem.
- :raise TypeError:
- When assign_id is `False`, an ID is given, and it is not a string.
- :raise ValueError:
- When assign_id is `False`, but the ID is not usable;
- Either because it starts with '__aqmp#' or it is already in-use.
- """
- # 1. convert generic Mapping or bytes to a QMP Message
- # 2. copy Message objects so that we assign an ID only to the copy.
- msg = Message(msg)
-
- exec_id = msg.get('id')
- if not assign_id and 'id' in msg:
- if not isinstance(exec_id, str):
- raise TypeError(f"ID ('{exec_id}') must be a string.")
- if exec_id.startswith('__aqmp#'):
- raise ValueError(
- f"ID ('{exec_id}') must not start with '__aqmp#'."
- )
-
- if not assign_id and exec_id in self._pending:
- raise ValueError(
- f"ID '{exec_id}' is in-use and cannot be used."
- )
-
- return await self._execute(msg, assign_id=assign_id)
-
- @upper_half
- @require(Runstate.RUNNING)
- async def execute_msg(self, msg: Message) -> object:
- """
- Execute a QMP command and return its value.
-
- :param msg: The QMP `Message` to execute.
-
- :return:
- The command execution return value from the server. The type of
- object returned depends on the command that was issued,
- though most in QEMU return a `dict`.
- :raise ValueError:
- If the QMP `Message` does not have either the 'execute' or
- 'exec-oob' fields set.
- :raise ExecuteError: When the server returns an error response.
- :raise ExecInterruptedError: if the connection was terminated early.
- """
- if not ('execute' in msg or 'exec-oob' in msg):
- raise ValueError("Requires 'execute' or 'exec-oob' message")
-
- # Copy the Message so that the ID assigned by _execute() is
- # local to this method; allowing the ID to be seen in raised
- # Exceptions but without modifying the caller's held copy.
- msg = Message(msg)
- reply = await self._execute(msg)
-
- if 'error' in reply:
- try:
- error_response = ErrorResponse(reply)
- except (KeyError, TypeError) as err:
- # Error response was malformed.
- raise BadReplyError(
- "QMP error reply is malformed", reply, msg,
- ) from err
-
- raise ExecuteError(error_response, msg, reply)
-
- if 'return' not in reply:
- raise BadReplyError(
- "QMP reply is missing a 'error' or 'return' member",
- reply, msg,
- )
-
- return reply['return']
-
- @classmethod
- def make_execute_msg(cls, cmd: str,
- arguments: Optional[Mapping[str, object]] = None,
- oob: bool = False) -> Message:
- """
- Create an executable message to be sent by `execute_msg` later.
-
- :param cmd: QMP command name.
- :param arguments: Arguments (if any). Must be JSON-serializable.
- :param oob: If `True`, execute "out of band".
-
- :return: An executable QMP `Message`.
- """
- msg = Message({'exec-oob' if oob else 'execute': cmd})
- if arguments is not None:
- msg['arguments'] = arguments
- return msg
-
- @upper_half
- async def execute(self, cmd: str,
- arguments: Optional[Mapping[str, object]] = None,
- oob: bool = False) -> object:
- """
- Execute a QMP command and return its value.
-
- :param cmd: QMP command name.
- :param arguments: Arguments (if any). Must be JSON-serializable.
- :param oob: If `True`, execute "out of band".
-
- :return:
- The command execution return value from the server. The type of
- object returned depends on the command that was issued,
- though most in QEMU return a `dict`.
- :raise ExecuteError: When the server returns an error response.
- :raise ExecInterruptedError: if the connection was terminated early.
- """
- msg = self.make_execute_msg(cmd, arguments, oob=oob)
- return await self.execute_msg(msg)
-
- @upper_half
- @require(Runstate.RUNNING)
- def send_fd_scm(self, fd: int) -> None:
- """
- Send a file descriptor to the remote via SCM_RIGHTS.
- """
- assert self._writer is not None
- sock = self._writer.transport.get_extra_info('socket')
-
- if sock.family != socket.AF_UNIX:
- raise QMPError("Sending file descriptors requires a UNIX socket.")
-
- if not hasattr(sock, 'sendmsg'):
- # We need to void the warranty sticker.
- # Access to sendmsg is scheduled for removal in Python 3.11.
- # Find the real backing socket to use it anyway.
- sock = sock._sock # pylint: disable=protected-access
-
- sock.sendmsg(
- [b' '],
- [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))]
- )