diff options
Diffstat (limited to 'python/qemu/aqmp')
-rw-r--r-- | python/qemu/aqmp/__init__.py | 59 | ||||
-rw-r--r-- | python/qemu/aqmp/aqmp_tui.py | 652 | ||||
-rw-r--r-- | python/qemu/aqmp/error.py | 50 | ||||
-rw-r--r-- | python/qemu/aqmp/events.py | 717 | ||||
-rw-r--r-- | python/qemu/aqmp/legacy.py | 317 | ||||
-rw-r--r-- | python/qemu/aqmp/message.py | 209 | ||||
-rw-r--r-- | python/qemu/aqmp/models.py | 146 | ||||
-rw-r--r-- | python/qemu/aqmp/protocol.py | 1048 | ||||
-rw-r--r-- | python/qemu/aqmp/py.typed | 0 | ||||
-rw-r--r-- | python/qemu/aqmp/qmp_client.py | 655 | ||||
-rw-r--r-- | python/qemu/aqmp/qmp_shell.py | 610 | ||||
-rw-r--r-- | python/qemu/aqmp/util.py | 217 |
12 files changed, 0 insertions, 4680 deletions
diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py deleted file mode 100644 index 2b69b264f4..0000000000 --- a/python/qemu/aqmp/__init__.py +++ /dev/null @@ -1,59 +0,0 @@ -""" -QEMU Monitor Protocol (QMP) development library & tooling. - -This package provides a fairly low-level class for communicating -asynchronously with QMP protocol servers, as implemented by QEMU, the -QEMU Guest Agent, and the QEMU Storage Daemon. - -`QMPClient` provides the main functionality of this package. All errors -raised by this library derive from `QMPError`, see `aqmp.error` for -additional detail. See `aqmp.events` for an in-depth tutorial on -managing QMP events. -""" - -# Copyright (C) 2020-2022 John Snow for Red Hat, Inc. -# -# Authors: -# John Snow <jsnow@redhat.com> -# -# Based on earlier work by Luiz Capitulino <lcapitulino@redhat.com>. -# -# This work is licensed under the terms of the GNU LGPL, version 2 or -# later. See the COPYING file in the top-level directory. - -import logging - -from .error import QMPError -from .events import EventListener -from .message import Message -from .protocol import ( - ConnectError, - Runstate, - SocketAddrT, - StateError, -) -from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient - - -# Suppress logging unless an application engages it. -logging.getLogger('qemu.aqmp').addHandler(logging.NullHandler()) - - -# The order of these fields impact the Sphinx documentation order. -__all__ = ( - # Classes, most to least important - 'QMPClient', - 'Message', - 'EventListener', - 'Runstate', - - # Exceptions, most generic to most explicit - 'QMPError', - 'StateError', - 'ConnectError', - 'ExecuteError', - 'ExecInterruptedError', - - # Type aliases - 'SocketAddrT', -) diff --git a/python/qemu/aqmp/aqmp_tui.py b/python/qemu/aqmp/aqmp_tui.py deleted file mode 100644 index 59d3036be3..0000000000 --- a/python/qemu/aqmp/aqmp_tui.py +++ /dev/null @@ -1,652 +0,0 @@ -# Copyright (c) 2021 -# -# Authors: -# Niteesh Babu G S <niteesh.gs@gmail.com> -# -# This work is licensed under the terms of the GNU LGPL, version 2 or -# later. See the COPYING file in the top-level directory. -""" -AQMP TUI - -AQMP TUI is an asynchronous interface built on top the of the AQMP library. -It is the successor of QMP-shell and is bought-in as a replacement for it. - -Example Usage: aqmp-tui <SOCKET | TCP IP:PORT> -Full Usage: aqmp-tui --help -""" - -import argparse -import asyncio -import json -import logging -from logging import Handler, LogRecord -import signal -from typing import ( - List, - Optional, - Tuple, - Type, - Union, - cast, -) - -from pygments import lexers -from pygments import token as Token -import urwid -import urwid_readline - -from .error import ProtocolError -from .legacy import QEMUMonitorProtocol, QMPBadPortError -from .message import DeserializationError, Message, UnexpectedTypeError -from .protocol import ConnectError, Runstate -from .qmp_client import ExecInterruptedError, QMPClient -from .util import create_task, pretty_traceback - - -# The name of the signal that is used to update the history list -UPDATE_MSG: str = 'UPDATE_MSG' - - -palette = [ - (Token.Punctuation, '', '', '', 'h15,bold', 'g7'), - (Token.Text, '', '', '', '', 'g7'), - (Token.Name.Tag, '', '', '', 'bold,#f88', 'g7'), - (Token.Literal.Number.Integer, '', '', '', '#fa0', 'g7'), - (Token.Literal.String.Double, '', '', '', '#6f6', 'g7'), - (Token.Keyword.Constant, '', '', '', '#6af', 'g7'), - ('DEBUG', '', '', '', '#ddf', 'g7'), - ('INFO', '', '', '', 'g100', 'g7'), - ('WARNING', '', '', '', '#ff6', 'g7'), - ('ERROR', '', '', '', '#a00', 'g7'), - ('CRITICAL', '', '', '', '#a00', 'g7'), - ('background', '', 'black', '', '', 'g7'), -] - - -def format_json(msg: str) -> str: - """ - Formats valid/invalid multi-line JSON message into a single-line message. - - Formatting is first tried using the standard json module. If that fails - due to an decoding error then a simple string manipulation is done to - achieve a single line JSON string. - - Converting into single line is more asthetically pleasing when looking - along with error messages. - - Eg: - Input: - [ 1, - true, - 3 ] - The above input is not a valid QMP message and produces the following error - "QMP message is not a JSON object." - When displaying this in TUI in multiline mode we get - - [ 1, - true, - 3 ]: QMP message is not a JSON object. - - whereas in singleline mode we get the following - - [1, true, 3]: QMP message is not a JSON object. - - The single line mode is more asthetically pleasing. - - :param msg: - The message to formatted into single line. - - :return: Formatted singleline message. - """ - try: - msg = json.loads(msg) - return str(json.dumps(msg)) - except json.decoder.JSONDecodeError: - msg = msg.replace('\n', '') - words = msg.split(' ') - words = list(filter(None, words)) - return ' '.join(words) - - -def has_handler_type(logger: logging.Logger, - handler_type: Type[Handler]) -> bool: - """ - The Logger class has no interface to check if a certain type of handler is - installed or not. So we provide an interface to do so. - - :param logger: - Logger object - :param handler_type: - The type of the handler to be checked. - - :return: returns True if handler of type `handler_type`. - """ - for handler in logger.handlers: - if isinstance(handler, handler_type): - return True - return False - - -class App(QMPClient): - """ - Implements the AQMP TUI. - - Initializes the widgets and starts the urwid event loop. - - :param address: - Address of the server to connect to. - :param num_retries: - The number of times to retry before stopping to reconnect. - :param retry_delay: - The delay(sec) before each retry - """ - def __init__(self, address: Union[str, Tuple[str, int]], num_retries: int, - retry_delay: Optional[int]) -> None: - urwid.register_signal(type(self), UPDATE_MSG) - self.window = Window(self) - self.address = address - self.aloop: Optional[asyncio.AbstractEventLoop] = None - self.num_retries = num_retries - self.retry_delay = retry_delay if retry_delay else 2 - self.retry: bool = False - self.exiting: bool = False - super().__init__() - - def add_to_history(self, msg: str, level: Optional[str] = None) -> None: - """ - Appends the msg to the history list. - - :param msg: - The raw message to be appended in string type. - """ - urwid.emit_signal(self, UPDATE_MSG, msg, level) - - def _cb_outbound(self, msg: Message) -> Message: - """ - Callback: outbound message hook. - - Appends the outgoing messages to the history box. - - :param msg: raw outbound message. - :return: final outbound message. - """ - str_msg = str(msg) - - if not has_handler_type(logging.getLogger(), TUILogHandler): - logging.debug('Request: %s', str_msg) - self.add_to_history('<-- ' + str_msg) - return msg - - def _cb_inbound(self, msg: Message) -> Message: - """ - Callback: outbound message hook. - - Appends the incoming messages to the history box. - - :param msg: raw inbound message. - :return: final inbound message. - """ - str_msg = str(msg) - - if not has_handler_type(logging.getLogger(), TUILogHandler): - logging.debug('Request: %s', str_msg) - self.add_to_history('--> ' + str_msg) - return msg - - async def _send_to_server(self, msg: Message) -> None: - """ - This coroutine sends the message to the server. - The message has to be pre-validated. - - :param msg: - Pre-validated message to be to sent to the server. - - :raise Exception: When an unhandled exception is caught. - """ - try: - await self._raw(msg, assign_id='id' not in msg) - except ExecInterruptedError as err: - logging.info('Error server disconnected before reply %s', str(err)) - self.add_to_history('Server disconnected before reply', 'ERROR') - except Exception as err: - logging.error('Exception from _send_to_server: %s', str(err)) - raise err - - def cb_send_to_server(self, raw_msg: str) -> None: - """ - Validates and sends the message to the server. - The raw string message is first converted into a Message object - and is then sent to the server. - - :param raw_msg: - The raw string message to be sent to the server. - - :raise Exception: When an unhandled exception is caught. - """ - try: - msg = Message(bytes(raw_msg, encoding='utf-8')) - create_task(self._send_to_server(msg)) - except (DeserializationError, UnexpectedTypeError) as err: - raw_msg = format_json(raw_msg) - logging.info('Invalid message: %s', err.error_message) - self.add_to_history(f'{raw_msg}: {err.error_message}', 'ERROR') - - def unhandled_input(self, key: str) -> None: - """ - Handle's keys which haven't been handled by the child widgets. - - :param key: - Unhandled key - """ - if key == 'esc': - self.kill_app() - - def kill_app(self) -> None: - """ - Initiates killing of app. A bridge between asynchronous and synchronous - code. - """ - create_task(self._kill_app()) - - async def _kill_app(self) -> None: - """ - This coroutine initiates the actual disconnect process and calls - urwid.ExitMainLoop() to kill the TUI. - - :raise Exception: When an unhandled exception is caught. - """ - self.exiting = True - await self.disconnect() - logging.debug('Disconnect finished. Exiting app') - raise urwid.ExitMainLoop() - - async def disconnect(self) -> None: - """ - Overrides the disconnect method to handle the errors locally. - """ - try: - await super().disconnect() - except (OSError, EOFError) as err: - logging.info('disconnect: %s', str(err)) - self.retry = True - except ProtocolError as err: - logging.info('disconnect: %s', str(err)) - except Exception as err: - logging.error('disconnect: Unhandled exception %s', str(err)) - raise err - - def _set_status(self, msg: str) -> None: - """ - Sets the message as the status. - - :param msg: - The message to be displayed in the status bar. - """ - self.window.footer.set_text(msg) - - def _get_formatted_address(self) -> str: - """ - Returns a formatted version of the server's address. - - :return: formatted address - """ - if isinstance(self.address, tuple): - host, port = self.address - addr = f'{host}:{port}' - else: - addr = f'{self.address}' - return addr - - async def _initiate_connection(self) -> Optional[ConnectError]: - """ - Tries connecting to a server a number of times with a delay between - each try. If all retries failed then return the error faced during - the last retry. - - :return: Error faced during last retry. - """ - current_retries = 0 - err = None - - # initial try - await self.connect_server() - while self.retry and current_retries < self.num_retries: - logging.info('Connection Failed, retrying in %d', self.retry_delay) - status = f'[Retry #{current_retries} ({self.retry_delay}s)]' - self._set_status(status) - - await asyncio.sleep(self.retry_delay) - - err = await self.connect_server() - current_retries += 1 - # If all retries failed report the last error - if err: - logging.info('All retries failed: %s', err) - return err - return None - - async def manage_connection(self) -> None: - """ - Manage the connection based on the current run state. - - A reconnect is issued when the current state is IDLE and the number - of retries is not exhausted. - A disconnect is issued when the current state is DISCONNECTING. - """ - while not self.exiting: - if self.runstate == Runstate.IDLE: - err = await self._initiate_connection() - # If retry is still true then, we have exhausted all our tries. - if err: - self._set_status(f'[Error: {err.error_message}]') - else: - addr = self._get_formatted_address() - self._set_status(f'[Connected {addr}]') - elif self.runstate == Runstate.DISCONNECTING: - self._set_status('[Disconnected]') - await self.disconnect() - # check if a retry is needed - if self.runstate == Runstate.IDLE: - continue - await self.runstate_changed() - - async def connect_server(self) -> Optional[ConnectError]: - """ - Initiates a connection to the server at address `self.address` - and in case of a failure, sets the status to the respective error. - """ - try: - await self.connect(self.address) - self.retry = False - except ConnectError as err: - logging.info('connect_server: ConnectError %s', str(err)) - self.retry = True - return err - return None - - def run(self, debug: bool = False) -> None: - """ - Starts the long running co-routines and the urwid event loop. - - :param debug: - Enables/Disables asyncio event loop debugging - """ - screen = urwid.raw_display.Screen() - screen.set_terminal_properties(256) - - self.aloop = asyncio.get_event_loop() - self.aloop.set_debug(debug) - - # Gracefully handle SIGTERM and SIGINT signals - cancel_signals = [signal.SIGTERM, signal.SIGINT] - for sig in cancel_signals: - self.aloop.add_signal_handler(sig, self.kill_app) - - event_loop = urwid.AsyncioEventLoop(loop=self.aloop) - main_loop = urwid.MainLoop(urwid.AttrMap(self.window, 'background'), - unhandled_input=self.unhandled_input, - screen=screen, - palette=palette, - handle_mouse=True, - event_loop=event_loop) - - create_task(self.manage_connection(), self.aloop) - try: - main_loop.run() - except Exception as err: - logging.error('%s\n%s\n', str(err), pretty_traceback()) - raise err - - -class StatusBar(urwid.Text): - """ - A simple statusbar modelled using the Text widget. The status can be - set using the set_text function. All text set is aligned to right. - - :param text: Initial text to be displayed. Default is empty str. - """ - def __init__(self, text: str = ''): - super().__init__(text, align='right') - - -class Editor(urwid_readline.ReadlineEdit): - """ - A simple editor modelled using the urwid_readline.ReadlineEdit widget. - Mimcs GNU readline shortcuts and provides history support. - - The readline shortcuts can be found below: - https://github.com/rr-/urwid_readline#features - - Along with the readline features, this editor also has support for - history. Pressing the 'up'/'down' switches between the prev/next messages - available in the history. - - Currently there is no support to save the history to a file. The history of - previous commands is lost on exit. - - :param parent: Reference to the TUI object. - """ - def __init__(self, parent: App) -> None: - super().__init__(caption='> ', multiline=True) - self.parent = parent - self.history: List[str] = [] - self.last_index: int = -1 - self.show_history: bool = False - - def keypress(self, size: Tuple[int, int], key: str) -> Optional[str]: - """ - Handles the keypress on this widget. - - :param size: - The current size of the widget. - :param key: - The key to be handled. - - :return: Unhandled key if any. - """ - msg = self.get_edit_text() - if key == 'up' and not msg: - # Show the history when 'up arrow' is pressed with no input text. - # NOTE: The show_history logic is necessary because in 'multiline' - # mode (which we use) 'up arrow' is used to move between lines. - if not self.history: - return None - self.show_history = True - last_msg = self.history[self.last_index] - self.set_edit_text(last_msg) - self.edit_pos = len(last_msg) - elif key == 'up' and self.show_history: - self.last_index = max(self.last_index - 1, -len(self.history)) - self.set_edit_text(self.history[self.last_index]) - self.edit_pos = len(self.history[self.last_index]) - elif key == 'down' and self.show_history: - if self.last_index == -1: - self.set_edit_text('') - self.show_history = False - else: - self.last_index += 1 - self.set_edit_text(self.history[self.last_index]) - self.edit_pos = len(self.history[self.last_index]) - elif key == 'meta enter': - # When using multiline, enter inserts a new line into the editor - # send the input to the server on alt + enter - self.parent.cb_send_to_server(msg) - self.history.append(msg) - self.set_edit_text('') - self.last_index = -1 - self.show_history = False - else: - self.show_history = False - self.last_index = -1 - return cast(Optional[str], super().keypress(size, key)) - return None - - -class EditorWidget(urwid.Filler): - """ - Wrapper around the editor widget. - - The Editor is a flow widget and has to wrapped inside a box widget. - This class wraps the Editor inside filler widget. - - :param parent: Reference to the TUI object. - """ - def __init__(self, parent: App) -> None: - super().__init__(Editor(parent), valign='top') - - -class HistoryBox(urwid.ListBox): - """ - This widget is modelled using the ListBox widget, contains the list of - all messages both QMP messages and log messsages to be shown in the TUI. - - The messages are urwid.Text widgets. On every append of a message, the - focus is shifted to the last appended message. - - :param parent: Reference to the TUI object. - """ - def __init__(self, parent: App) -> None: - self.parent = parent - self.history = urwid.SimpleFocusListWalker([]) - super().__init__(self.history) - - def add_to_history(self, - history: Union[str, List[Tuple[str, str]]]) -> None: - """ - Appends a message to the list and set the focus to the last appended - message. - - :param history: - The history item(message/event) to be appended to the list. - """ - self.history.append(urwid.Text(history)) - self.history.set_focus(len(self.history) - 1) - - def mouse_event(self, size: Tuple[int, int], _event: str, button: float, - _x: int, _y: int, focus: bool) -> None: - # Unfortunately there are no urwid constants that represent the mouse - # events. - if button == 4: # Scroll up event - super().keypress(size, 'up') - elif button == 5: # Scroll down event - super().keypress(size, 'down') - - -class HistoryWindow(urwid.Frame): - """ - This window composes the HistoryBox and EditorWidget in a horizontal split. - By default the first focus is given to the history box. - - :param parent: Reference to the TUI object. - """ - def __init__(self, parent: App) -> None: - self.parent = parent - self.editor_widget = EditorWidget(parent) - self.editor = urwid.LineBox(self.editor_widget) - self.history = HistoryBox(parent) - self.body = urwid.Pile([('weight', 80, self.history), - ('weight', 20, self.editor)]) - super().__init__(self.body) - urwid.connect_signal(self.parent, UPDATE_MSG, self.cb_add_to_history) - - def cb_add_to_history(self, msg: str, level: Optional[str] = None) -> None: - """ - Appends a message to the history box - - :param msg: - The message to be appended to the history box. - :param level: - The log level of the message, if it is a log message. - """ - formatted = [] - if level: - msg = f'[{level}]: {msg}' - formatted.append((level, msg)) - else: - lexer = lexers.JsonLexer() # pylint: disable=no-member - for token in lexer.get_tokens(msg): - formatted.append(token) - self.history.add_to_history(formatted) - - -class Window(urwid.Frame): - """ - This window is the top most widget of the TUI and will contain other - windows. Each child of this widget is responsible for displaying a specific - functionality. - - :param parent: Reference to the TUI object. - """ - def __init__(self, parent: App) -> None: - self.parent = parent - footer = StatusBar() - body = HistoryWindow(parent) - super().__init__(body, footer=footer) - - -class TUILogHandler(Handler): - """ - This handler routes all the log messages to the TUI screen. - It is installed to the root logger to so that the log message from all - libraries begin used is routed to the screen. - - :param tui: Reference to the TUI object. - """ - def __init__(self, tui: App) -> None: - super().__init__() - self.tui = tui - - def emit(self, record: LogRecord) -> None: - """ - Emits a record to the TUI screen. - - Appends the log message to the TUI screen - """ - level = record.levelname - msg = record.getMessage() - self.tui.add_to_history(msg, level) - - -def main() -> None: - """ - Driver of the whole script, parses arguments, initialize the TUI and - the logger. - """ - parser = argparse.ArgumentParser(description='AQMP TUI') - parser.add_argument('qmp_server', help='Address of the QMP server. ' - 'Format <UNIX socket path | TCP addr:port>') - parser.add_argument('--num-retries', type=int, default=10, - help='Number of times to reconnect before giving up.') - parser.add_argument('--retry-delay', type=int, - help='Time(s) to wait before next retry. ' - 'Default action is to wait 2s between each retry.') - parser.add_argument('--log-file', help='The Log file name') - parser.add_argument('--log-level', default='WARNING', - help='Log level <CRITICAL|ERROR|WARNING|INFO|DEBUG|>') - parser.add_argument('--asyncio-debug', action='store_true', - help='Enable debug mode for asyncio loop. ' - 'Generates lot of output, makes TUI unusable when ' - 'logs are logged in the TUI. ' - 'Use only when logging to a file.') - args = parser.parse_args() - - try: - address = QEMUMonitorProtocol.parse_address(args.qmp_server) - except QMPBadPortError as err: - parser.error(str(err)) - - app = App(address, args.num_retries, args.retry_delay) - - root_logger = logging.getLogger() - root_logger.setLevel(logging.getLevelName(args.log_level)) - - if args.log_file: - root_logger.addHandler(logging.FileHandler(args.log_file)) - else: - root_logger.addHandler(TUILogHandler(app)) - - app.run(args.asyncio_debug) - - -if __name__ == '__main__': - main() diff --git a/python/qemu/aqmp/error.py b/python/qemu/aqmp/error.py deleted file mode 100644 index 24ba4d5054..0000000000 --- a/python/qemu/aqmp/error.py +++ /dev/null @@ -1,50 +0,0 @@ -""" -QMP Error Classes - -This package seeks to provide semantic error classes that are intended -to be used directly by clients when they would like to handle particular -semantic failures (e.g. "failed to connect") without needing to know the -enumeration of possible reasons for that failure. - -QMPError serves as the ancestor for all exceptions raised by this -package, and is suitable for use in handling semantic errors from this -library. In most cases, individual public methods will attempt to catch -and re-encapsulate various exceptions to provide a semantic -error-handling interface. - -.. admonition:: QMP Exception Hierarchy Reference - - | `Exception` - | +-- `QMPError` - | +-- `ConnectError` - | +-- `StateError` - | +-- `ExecInterruptedError` - | +-- `ExecuteError` - | +-- `ListenerError` - | +-- `ProtocolError` - | +-- `DeserializationError` - | +-- `UnexpectedTypeError` - | +-- `ServerParseError` - | +-- `BadReplyError` - | +-- `GreetingError` - | +-- `NegotiationError` -""" - - -class QMPError(Exception): - """Abstract error class for all errors originating from this package.""" - - -class ProtocolError(QMPError): - """ - Abstract error class for protocol failures. - - Semantically, these errors are generally the fault of either the - protocol server or as a result of a bug in this library. - - :param error_message: Human-readable string describing the error. - """ - def __init__(self, error_message: str): - super().__init__(error_message) - #: Human-readable error message, without any prefix. - self.error_message: str = error_message diff --git a/python/qemu/aqmp/events.py b/python/qemu/aqmp/events.py deleted file mode 100644 index f3d4e2b5e8..0000000000 --- a/python/qemu/aqmp/events.py +++ /dev/null @@ -1,717 +0,0 @@ -""" -AQMP Events and EventListeners - -Asynchronous QMP uses `EventListener` objects to listen for events. An -`EventListener` is a FIFO event queue that can be pre-filtered to listen -for only specific events. Each `EventListener` instance receives its own -copy of events that it hears, so events may be consumed without fear or -worry for depriving other listeners of events they need to hear. - - -EventListener Tutorial ----------------------- - -In all of the following examples, we assume that we have a `QMPClient` -instantiated named ``qmp`` that is already connected. - - -`listener()` context blocks with one name -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -The most basic usage is by using the `listener()` context manager to -construct them: - -.. code:: python - - with qmp.listener('STOP') as listener: - await qmp.execute('stop') - await listener.get() - -The listener is active only for the duration of the ‘with’ block. This -instance listens only for ‘STOP’ events. - - -`listener()` context blocks with two or more names -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Multiple events can be selected for by providing any ``Iterable[str]``: - -.. code:: python - - with qmp.listener(('STOP', 'RESUME')) as listener: - await qmp.execute('stop') - event = await listener.get() - assert event['event'] == 'STOP' - - await qmp.execute('cont') - event = await listener.get() - assert event['event'] == 'RESUME' - - -`listener()` context blocks with no names -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -By omitting names entirely, you can listen to ALL events. - -.. code:: python - - with qmp.listener() as listener: - await qmp.execute('stop') - event = await listener.get() - assert event['event'] == 'STOP' - -This isn’t a very good use case for this feature: In a non-trivial -running system, we may not know what event will arrive next. Grabbing -the top of a FIFO queue returning multiple kinds of events may be prone -to error. - - -Using async iterators to retrieve events -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -If you’d like to simply watch what events happen to arrive, you can use -the listener as an async iterator: - -.. code:: python - - with qmp.listener() as listener: - async for event in listener: - print(f"Event arrived: {event['event']}") - -This is analogous to the following code: - -.. code:: python - - with qmp.listener() as listener: - while True: - event = listener.get() - print(f"Event arrived: {event['event']}") - -This event stream will never end, so these blocks will never terminate. - - -Using asyncio.Task to concurrently retrieve events -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Since a listener’s event stream will never terminate, it is not likely -useful to use that form in a script. For longer-running clients, we can -create event handlers by using `asyncio.Task` to create concurrent -coroutines: - -.. code:: python - - async def print_events(listener): - try: - async for event in listener: - print(f"Event arrived: {event['event']}") - except asyncio.CancelledError: - return - - with qmp.listener() as listener: - task = asyncio.Task(print_events(listener)) - await qmp.execute('stop') - await qmp.execute('cont') - task.cancel() - await task - -However, there is no guarantee that these events will be received by the -time we leave this context block. Once the context block is exited, the -listener will cease to hear any new events, and becomes inert. - -Be mindful of the timing: the above example will *probably*– but does -not *guarantee*– that both STOP/RESUMED events will be printed. The -example below outlines how to use listeners outside of a context block. - - -Using `register_listener()` and `remove_listener()` -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -To create a listener with a longer lifetime, beyond the scope of a -single block, create a listener and then call `register_listener()`: - -.. code:: python - - class MyClient: - def __init__(self, qmp): - self.qmp = qmp - self.listener = EventListener() - - async def print_events(self): - try: - async for event in self.listener: - print(f"Event arrived: {event['event']}") - except asyncio.CancelledError: - return - - async def run(self): - self.task = asyncio.Task(self.print_events) - self.qmp.register_listener(self.listener) - await qmp.execute('stop') - await qmp.execute('cont') - - async def stop(self): - self.task.cancel() - await self.task - self.qmp.remove_listener(self.listener) - -The listener can be deactivated by using `remove_listener()`. When it is -removed, any possible pending events are cleared and it can be -re-registered at a later time. - - -Using the built-in all events listener -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -The `QMPClient` object creates its own default listener named -:py:obj:`~Events.events` that can be used for the same purpose without -having to create your own: - -.. code:: python - - async def print_events(listener): - try: - async for event in listener: - print(f"Event arrived: {event['event']}") - except asyncio.CancelledError: - return - - task = asyncio.Task(print_events(qmp.events)) - - await qmp.execute('stop') - await qmp.execute('cont') - - task.cancel() - await task - - -Using both .get() and async iterators -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -The async iterator and `get()` methods pull events from the same FIFO -queue. If you mix the usage of both, be aware: Events are emitted -precisely once per listener. - -If multiple contexts try to pull events from the same listener instance, -events are still emitted only precisely once. - -This restriction can be lifted by creating additional listeners. - - -Creating multiple listeners -~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Additional `EventListener` objects can be created at-will. Each one -receives its own copy of events, with separate FIFO event queues. - -.. code:: python - - my_listener = EventListener() - qmp.register_listener(my_listener) - - await qmp.execute('stop') - copy1 = await my_listener.get() - copy2 = await qmp.events.get() - - assert copy1 == copy2 - -In this example, we await an event from both a user-created -`EventListener` and the built-in events listener. Both receive the same -event. - - -Clearing listeners -~~~~~~~~~~~~~~~~~~ - -`EventListener` objects can be cleared, clearing all events seen thus far: - -.. code:: python - - await qmp.execute('stop') - qmp.events.clear() - await qmp.execute('cont') - event = await qmp.events.get() - assert event['event'] == 'RESUME' - -`EventListener` objects are FIFO queues. If events are not consumed, -they will remain in the queue until they are witnessed or discarded via -`clear()`. FIFO queues will be drained automatically upon leaving a -context block, or when calling `remove_listener()`. - - -Accessing listener history -~~~~~~~~~~~~~~~~~~~~~~~~~~ - -`EventListener` objects record their history. Even after being cleared, -you can obtain a record of all events seen so far: - -.. code:: python - - await qmp.execute('stop') - await qmp.execute('cont') - qmp.events.clear() - - assert len(qmp.events.history) == 2 - assert qmp.events.history[0]['event'] == 'STOP' - assert qmp.events.history[1]['event'] == 'RESUME' - -The history is updated immediately and does not require the event to be -witnessed first. - - -Using event filters -~~~~~~~~~~~~~~~~~~~ - -`EventListener` objects can be given complex filtering criteria if names -are not sufficient: - -.. code:: python - - def job1_filter(event) -> bool: - event_data = event.get('data', {}) - event_job_id = event_data.get('id') - return event_job_id == "job1" - - with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener: - await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...}) - async for event in listener: - if event['data']['status'] == 'concluded': - break - -These filters might be most useful when parameterized. `EventListener` -objects expect a function that takes only a single argument (the raw -event, as a `Message`) and returns a bool; True if the event should be -accepted into the stream. You can create a function that adapts this -signature to accept configuration parameters: - -.. code:: python - - def job_filter(job_id: str) -> EventFilter: - def filter(event: Message) -> bool: - return event['data']['id'] == job_id - return filter - - with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener: - await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...}) - async for event in listener: - if event['data']['status'] == 'concluded': - break - - -Activating an existing listener with `listen()` -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Listeners with complex, long configurations can also be created manually -and activated temporarily by using `listen()` instead of `listener()`: - -.. code:: python - - listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED', - 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', - 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE')) - - with qmp.listen(listener): - await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...}) - async for event in listener: - print(event) - if event['event'] == 'BLOCK_JOB_COMPLETED': - break - -Any events that are not witnessed by the time the block is left will be -cleared from the queue; entering the block is an implicit -`register_listener()` and leaving the block is an implicit -`remove_listener()`. - - -Activating multiple existing listeners with `listen()` -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -While `listener()` is only capable of creating a single listener, -`listen()` is capable of activating multiple listeners simultaneously: - -.. code:: python - - def job_filter(job_id: str) -> EventFilter: - def filter(event: Message) -> bool: - return event['data']['id'] == job_id - return filter - - jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA')) - jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB')) - - with qmp.listen(jobA, jobB): - qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...}) - qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...}) - - async for event in jobA.get(): - if event['data']['status'] == 'concluded': - break - async for event in jobB.get(): - if event['data']['status'] == 'concluded': - break - - -Extending the `EventListener` class -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -In the case that a more specialized `EventListener` is desired to -provide either more functionality or more compact syntax for specialized -cases, it can be extended. - -One of the key methods to extend or override is -:py:meth:`~EventListener.accept()`. The default implementation checks an -incoming message for: - -1. A qualifying name, if any :py:obj:`~EventListener.names` were - specified at initialization time -2. That :py:obj:`~EventListener.event_filter()` returns True. - -This can be modified however you see fit to change the criteria for -inclusion in the stream. - -For convenience, a ``JobListener`` class could be created that simply -bakes in configuration so it does not need to be repeated: - -.. code:: python - - class JobListener(EventListener): - def __init__(self, job_id: str): - super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED', - 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', - 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE')) - self.job_id = job_id - - def accept(self, event) -> bool: - if not super().accept(event): - return False - if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'): - return event['data']['id'] == job_id - return event['data']['device'] == job_id - -From here on out, you can conjure up a custom-purpose listener that -listens only for job-related events for a specific job-id easily: - -.. code:: python - - listener = JobListener('job4') - with qmp.listener(listener): - await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...}) - async for event in listener: - print(event) - if event['event'] == 'BLOCK_JOB_COMPLETED': - break - - -Experimental Interfaces & Design Issues ---------------------------------------- - -These interfaces are not ones I am sure I will keep or otherwise modify -heavily. - -qmp.listener()’s type signature -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -`listener()` does not return anything, because it was assumed the caller -already had a handle to the listener. However, for -``qmp.listener(EventListener())`` forms, the caller will not have saved -a handle to the listener. - -Because this function can accept *many* listeners, I found it hard to -accurately type in a way where it could be used in both “one” or “many” -forms conveniently and in a statically type-safe manner. - -Ultimately, I removed the return altogether, but perhaps with more time -I can work out a way to re-add it. - - -API Reference -------------- - -""" - -import asyncio -from contextlib import contextmanager -import logging -from typing import ( - AsyncIterator, - Callable, - Iterable, - Iterator, - List, - Optional, - Set, - Tuple, - Union, -) - -from .error import QMPError -from .message import Message - - -EventNames = Union[str, Iterable[str], None] -EventFilter = Callable[[Message], bool] - - -class ListenerError(QMPError): - """ - Generic error class for `EventListener`-related problems. - """ - - -class EventListener: - """ - Selectively listens for events with runtime configurable filtering. - - This class is designed to be directly usable for the most common cases, - but it can be extended to provide more rigorous control. - - :param names: - One or more names of events to listen for. - When not provided, listen for ALL events. - :param event_filter: - An optional event filtering function. - When names are also provided, this acts as a secondary filter. - - When ``names`` and ``event_filter`` are both provided, the names - will be filtered first, and then the filter function will be called - second. The event filter function can assume that the format of the - event is a known format. - """ - def __init__( - self, - names: EventNames = None, - event_filter: Optional[EventFilter] = None, - ): - # Queue of 'heard' events yet to be witnessed by a caller. - self._queue: 'asyncio.Queue[Message]' = asyncio.Queue() - - # Intended as a historical record, NOT a processing queue or backlog. - self._history: List[Message] = [] - - #: Primary event filter, based on one or more event names. - self.names: Set[str] = set() - if isinstance(names, str): - self.names.add(names) - elif names is not None: - self.names.update(names) - - #: Optional, secondary event filter. - self.event_filter: Optional[EventFilter] = event_filter - - @property - def history(self) -> Tuple[Message, ...]: - """ - A read-only history of all events seen so far. - - This represents *every* event, including those not yet witnessed - via `get()` or ``async for``. It persists between `clear()` - calls and is immutable. - """ - return tuple(self._history) - - def accept(self, event: Message) -> bool: - """ - Determine if this listener accepts this event. - - This method determines which events will appear in the stream. - The default implementation simply checks the event against the - list of names and the event_filter to decide if this - `EventListener` accepts a given event. It can be - overridden/extended to provide custom listener behavior. - - User code is not expected to need to invoke this method. - - :param event: The event under consideration. - :return: `True`, if this listener accepts this event. - """ - name_ok = (not self.names) or (event['event'] in self.names) - return name_ok and ( - (not self.event_filter) or self.event_filter(event) - ) - - async def put(self, event: Message) -> None: - """ - Conditionally put a new event into the FIFO queue. - - This method is not designed to be invoked from user code, and it - should not need to be overridden. It is a public interface so - that `QMPClient` has an interface by which it can inform - registered listeners of new events. - - The event will be put into the queue if - :py:meth:`~EventListener.accept()` returns `True`. - - :param event: The new event to put into the FIFO queue. - """ - if not self.accept(event): - return - - self._history.append(event) - await self._queue.put(event) - - async def get(self) -> Message: - """ - Wait for the very next event in this stream. - - If one is already available, return that one. - """ - return await self._queue.get() - - def empty(self) -> bool: - """ - Return `True` if there are no pending events. - """ - return self._queue.empty() - - def clear(self) -> List[Message]: - """ - Clear this listener of all pending events. - - Called when an `EventListener` is being unregistered, this clears the - pending FIFO queue synchronously. It can be also be used to - manually clear any pending events, if desired. - - :return: The cleared events, if any. - - .. warning:: - Take care when discarding events. Cleared events will be - silently tossed on the floor. All events that were ever - accepted by this listener are visible in `history()`. - """ - events = [] - while True: - try: - events.append(self._queue.get_nowait()) - except asyncio.QueueEmpty: - break - - return events - - def __aiter__(self) -> AsyncIterator[Message]: - return self - - async def __anext__(self) -> Message: - """ - Enables the `EventListener` to function as an async iterator. - - It may be used like this: - - .. code:: python - - async for event in listener: - print(event) - - These iterators will never terminate of their own accord; you - must provide break conditions or otherwise prepare to run them - in an `asyncio.Task` that can be cancelled. - """ - return await self.get() - - -class Events: - """ - Events is a mix-in class that adds event functionality to the QMP class. - - It's designed specifically as a mix-in for `QMPClient`, and it - relies upon the class it is being mixed into having a 'logger' - property. - """ - def __init__(self) -> None: - self._listeners: List[EventListener] = [] - - #: Default, all-events `EventListener`. - self.events: EventListener = EventListener() - self.register_listener(self.events) - - # Parent class needs to have a logger - self.logger: logging.Logger - - async def _event_dispatch(self, msg: Message) -> None: - """ - Given a new event, propagate it to all of the active listeners. - - :param msg: The event to propagate. - """ - for listener in self._listeners: - await listener.put(msg) - - def register_listener(self, listener: EventListener) -> None: - """ - Register and activate an `EventListener`. - - :param listener: The listener to activate. - :raise ListenerError: If the given listener is already registered. - """ - if listener in self._listeners: - raise ListenerError("Attempted to re-register existing listener") - self.logger.debug("Registering %s.", str(listener)) - self._listeners.append(listener) - - def remove_listener(self, listener: EventListener) -> None: - """ - Unregister and deactivate an `EventListener`. - - The removed listener will have its pending events cleared via - `clear()`. The listener can be re-registered later when - desired. - - :param listener: The listener to deactivate. - :raise ListenerError: If the given listener is not registered. - """ - if listener == self.events: - raise ListenerError("Cannot remove the default listener.") - self.logger.debug("Removing %s.", str(listener)) - listener.clear() - self._listeners.remove(listener) - - @contextmanager - def listen(self, *listeners: EventListener) -> Iterator[None]: - r""" - Context manager: Temporarily listen with an `EventListener`. - - Accepts one or more `EventListener` objects and registers them, - activating them for the duration of the context block. - - `EventListener` objects will have any pending events in their - FIFO queue cleared upon exiting the context block, when they are - deactivated. - - :param \*listeners: One or more EventListeners to activate. - :raise ListenerError: If the given listener(s) are already active. - """ - _added = [] - - try: - for listener in listeners: - self.register_listener(listener) - _added.append(listener) - - yield - - finally: - for listener in _added: - self.remove_listener(listener) - - @contextmanager - def listener( - self, - names: EventNames = (), - event_filter: Optional[EventFilter] = None - ) -> Iterator[EventListener]: - """ - Context manager: Temporarily listen with a new `EventListener`. - - Creates an `EventListener` object and registers it, activating - it for the duration of the context block. - - :param names: - One or more names of events to listen for. - When not provided, listen for ALL events. - :param event_filter: - An optional event filtering function. - When names are also provided, this acts as a secondary filter. - - :return: The newly created and active `EventListener`. - """ - listener = EventListener(names, event_filter) - with self.listen(listener): - yield listener diff --git a/python/qemu/aqmp/legacy.py b/python/qemu/aqmp/legacy.py deleted file mode 100644 index dfcd20bbd2..0000000000 --- a/python/qemu/aqmp/legacy.py +++ /dev/null @@ -1,317 +0,0 @@ -""" -(Legacy) Sync QMP Wrapper - -This module provides the `QEMUMonitorProtocol` class, which is a -synchronous wrapper around `QMPClient`. - -Its design closely resembles that of the original QEMUMonitorProtocol -class, originally written by Luiz Capitulino. It is provided here for -compatibility with scripts inside the QEMU source tree that expect the -old interface. -""" - -# -# Copyright (C) 2009-2022 Red Hat Inc. -# -# Authors: -# Luiz Capitulino <lcapitulino@redhat.com> -# John Snow <jsnow@redhat.com> -# -# This work is licensed under the terms of the GNU GPL, version 2. See -# the COPYING file in the top-level directory. -# - -import asyncio -from types import TracebackType -from typing import ( - Any, - Awaitable, - Dict, - List, - Optional, - Type, - TypeVar, - Union, -) - -from .error import QMPError -from .protocol import Runstate, SocketAddrT -from .qmp_client import QMPClient - - -#: QMPMessage is an entire QMP message of any kind. -QMPMessage = Dict[str, Any] - -#: QMPReturnValue is the 'return' value of a command. -QMPReturnValue = object - -#: QMPObject is any object in a QMP message. -QMPObject = Dict[str, object] - -# QMPMessage can be outgoing commands or incoming events/returns. -# QMPReturnValue is usually a dict/json object, but due to QAPI's -# 'returns-whitelist', it can actually be anything. -# -# {'return': {}} is a QMPMessage, -# {} is the QMPReturnValue. - - -class QMPBadPortError(QMPError): - """ - Unable to parse socket address: Port was non-numerical. - """ - - -class QEMUMonitorProtocol: - """ - Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) - and then allow to handle commands and events. - - :param address: QEMU address, can be either a unix socket path (string) - or a tuple in the form ( address, port ) for a TCP - connection - :param server: Act as the socket server. (See 'accept') - :param nickname: Optional nickname used for logging. - """ - - def __init__(self, address: SocketAddrT, - server: bool = False, - nickname: Optional[str] = None): - - self._aqmp = QMPClient(nickname) - self._aloop = asyncio.get_event_loop() - self._address = address - self._timeout: Optional[float] = None - - if server: - self._sync(self._aqmp.start_server(self._address)) - - _T = TypeVar('_T') - - def _sync( - self, future: Awaitable[_T], timeout: Optional[float] = None - ) -> _T: - return self._aloop.run_until_complete( - asyncio.wait_for(future, timeout=timeout) - ) - - def _get_greeting(self) -> Optional[QMPMessage]: - if self._aqmp.greeting is not None: - # pylint: disable=protected-access - return self._aqmp.greeting._asdict() - return None - - def __enter__(self: _T) -> _T: - # Implement context manager enter function. - return self - - def __exit__(self, - # pylint: disable=duplicate-code - # see https://github.com/PyCQA/pylint/issues/3619 - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType]) -> None: - # Implement context manager exit function. - self.close() - - @classmethod - def parse_address(cls, address: str) -> SocketAddrT: - """ - Parse a string into a QMP address. - - Figure out if the argument is in the port:host form. - If it's not, it's probably a file path. - """ - components = address.split(':') - if len(components) == 2: - try: - port = int(components[1]) - except ValueError: - msg = f"Bad port: '{components[1]}' in '{address}'." - raise QMPBadPortError(msg) from None - return (components[0], port) - - # Treat as filepath. - return address - - def connect(self, negotiate: bool = True) -> Optional[QMPMessage]: - """ - Connect to the QMP Monitor and perform capabilities negotiation. - - :return: QMP greeting dict, or None if negotiate is false - :raise ConnectError: on connection errors - """ - self._aqmp.await_greeting = negotiate - self._aqmp.negotiate = negotiate - - self._sync( - self._aqmp.connect(self._address) - ) - return self._get_greeting() - - def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage: - """ - Await connection from QMP Monitor and perform capabilities negotiation. - - :param timeout: - timeout in seconds (nonnegative float number, or None). - If None, there is no timeout, and this may block forever. - - :return: QMP greeting dict - :raise ConnectError: on connection errors - """ - self._aqmp.await_greeting = True - self._aqmp.negotiate = True - - self._sync(self._aqmp.accept(), timeout) - - ret = self._get_greeting() - assert ret is not None - return ret - - def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage: - """ - Send a QMP command to the QMP Monitor. - - :param qmp_cmd: QMP command to be sent as a Python dict - :return: QMP response as a Python dict - """ - return dict( - self._sync( - # pylint: disable=protected-access - - # _raw() isn't a public API, because turning off - # automatic ID assignment is discouraged. For - # compatibility with iotests *only*, do it anyway. - self._aqmp._raw(qmp_cmd, assign_id=False), - self._timeout - ) - ) - - def cmd(self, name: str, - args: Optional[Dict[str, object]] = None, - cmd_id: Optional[object] = None) -> QMPMessage: - """ - Build a QMP command and send it to the QMP Monitor. - - :param name: command name (string) - :param args: command arguments (dict) - :param cmd_id: command id (dict, list, string or int) - """ - qmp_cmd: QMPMessage = {'execute': name} - if args: - qmp_cmd['arguments'] = args - if cmd_id: - qmp_cmd['id'] = cmd_id - return self.cmd_obj(qmp_cmd) - - def command(self, cmd: str, **kwds: object) -> QMPReturnValue: - """ - Build and send a QMP command to the monitor, report errors if any - """ - return self._sync( - self._aqmp.execute(cmd, kwds), - self._timeout - ) - - def pull_event(self, - wait: Union[bool, float] = False) -> Optional[QMPMessage]: - """ - Pulls a single event. - - :param wait: - If False or 0, do not wait. Return None if no events ready. - If True, wait forever until the next event. - Otherwise, wait for the specified number of seconds. - - :raise asyncio.TimeoutError: - When a timeout is requested and the timeout period elapses. - - :return: The first available QMP event, or None. - """ - if not wait: - # wait is False/0: "do not wait, do not except." - if self._aqmp.events.empty(): - return None - - # If wait is 'True', wait forever. If wait is False/0, the events - # queue must not be empty; but it still needs some real amount - # of time to complete. - timeout = None - if wait and isinstance(wait, float): - timeout = wait - - return dict( - self._sync( - self._aqmp.events.get(), - timeout - ) - ) - - def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]: - """ - Get a list of QMP events and clear all pending events. - - :param wait: - If False or 0, do not wait. Return None if no events ready. - If True, wait until we have at least one event. - Otherwise, wait for up to the specified number of seconds for at - least one event. - - :raise asyncio.TimeoutError: - When a timeout is requested and the timeout period elapses. - - :return: A list of QMP events. - """ - events = [dict(x) for x in self._aqmp.events.clear()] - if events: - return events - - event = self.pull_event(wait) - return [event] if event is not None else [] - - def clear_events(self) -> None: - """Clear current list of pending events.""" - self._aqmp.events.clear() - - def close(self) -> None: - """Close the connection.""" - self._sync( - self._aqmp.disconnect() - ) - - def settimeout(self, timeout: Optional[float]) -> None: - """ - Set the timeout for QMP RPC execution. - - This timeout affects the `cmd`, `cmd_obj`, and `command` methods. - The `accept`, `pull_event` and `get_event` methods have their - own configurable timeouts. - - :param timeout: - timeout in seconds, or None. - None will wait indefinitely. - """ - self._timeout = timeout - - def send_fd_scm(self, fd: int) -> None: - """ - Send a file descriptor to the remote via SCM_RIGHTS. - """ - self._aqmp.send_fd_scm(fd) - - def __del__(self) -> None: - if self._aqmp.runstate == Runstate.IDLE: - return - - if not self._aloop.is_running(): - self.close() - else: - # Garbage collection ran while the event loop was running. - # Nothing we can do about it now, but if we don't raise our - # own error, the user will be treated to a lot of traceback - # they might not understand. - raise QMPError( - "QEMUMonitorProtocol.close()" - " was not called before object was garbage collected" - ) diff --git a/python/qemu/aqmp/message.py b/python/qemu/aqmp/message.py deleted file mode 100644 index f76ccc9074..0000000000 --- a/python/qemu/aqmp/message.py +++ /dev/null @@ -1,209 +0,0 @@ -""" -QMP Message Format - -This module provides the `Message` class, which represents a single QMP -message sent to or from the server. -""" - -import json -from json import JSONDecodeError -from typing import ( - Dict, - Iterator, - Mapping, - MutableMapping, - Optional, - Union, -) - -from .error import ProtocolError - - -class Message(MutableMapping[str, object]): - """ - Represents a single QMP protocol message. - - QMP uses JSON objects as its basic communicative unit; so this - Python object is a :py:obj:`~collections.abc.MutableMapping`. It may - be instantiated from either another mapping (like a `dict`), or from - raw `bytes` that still need to be deserialized. - - Once instantiated, it may be treated like any other MutableMapping:: - - >>> msg = Message(b'{"hello": "world"}') - >>> assert msg['hello'] == 'world' - >>> msg['id'] = 'foobar' - >>> print(msg) - { - "hello": "world", - "id": "foobar" - } - - It can be converted to `bytes`:: - - >>> msg = Message({"hello": "world"}) - >>> print(bytes(msg)) - b'{"hello":"world","id":"foobar"}' - - Or back into a garden-variety `dict`:: - - >>> dict(msg) - {'hello': 'world'} - - - :param value: Initial value, if any. - :param eager: - When `True`, attempt to serialize or deserialize the initial value - immediately, so that conversion exceptions are raised during - the call to ``__init__()``. - """ - # pylint: disable=too-many-ancestors - - def __init__(self, - value: Union[bytes, Mapping[str, object]] = b'{}', *, - eager: bool = True): - self._data: Optional[bytes] = None - self._obj: Optional[Dict[str, object]] = None - - if isinstance(value, bytes): - self._data = value - if eager: - self._obj = self._deserialize(self._data) - else: - self._obj = dict(value) - if eager: - self._data = self._serialize(self._obj) - - # Methods necessary to implement the MutableMapping interface, see: - # https://docs.python.org/3/library/collections.abc.html#collections.abc.MutableMapping - - # We get pop, popitem, clear, update, setdefault, __contains__, - # keys, items, values, get, __eq__ and __ne__ for free. - - def __getitem__(self, key: str) -> object: - return self._object[key] - - def __setitem__(self, key: str, value: object) -> None: - self._object[key] = value - self._data = None - - def __delitem__(self, key: str) -> None: - del self._object[key] - self._data = None - - def __iter__(self) -> Iterator[str]: - return iter(self._object) - - def __len__(self) -> int: - return len(self._object) - - # Dunder methods not related to MutableMapping: - - def __repr__(self) -> str: - if self._obj is not None: - return f"Message({self._object!r})" - return f"Message({bytes(self)!r})" - - def __str__(self) -> str: - """Pretty-printed representation of this QMP message.""" - return json.dumps(self._object, indent=2) - - def __bytes__(self) -> bytes: - """bytes representing this QMP message.""" - if self._data is None: - self._data = self._serialize(self._obj or {}) - return self._data - - # Conversion Methods - - @property - def _object(self) -> Dict[str, object]: - """ - A `dict` representing this QMP message. - - Generated on-demand, if required. This property is private - because it returns an object that could be used to invalidate - the internal state of the `Message` object. - """ - if self._obj is None: - self._obj = self._deserialize(self._data or b'{}') - return self._obj - - @classmethod - def _serialize(cls, value: object) -> bytes: - """ - Serialize a JSON object as `bytes`. - - :raise ValueError: When the object cannot be serialized. - :raise TypeError: When the object cannot be serialized. - - :return: `bytes` ready to be sent over the wire. - """ - return json.dumps(value, separators=(',', ':')).encode('utf-8') - - @classmethod - def _deserialize(cls, data: bytes) -> Dict[str, object]: - """ - Deserialize JSON `bytes` into a native Python `dict`. - - :raise DeserializationError: - If JSON deserialization fails for any reason. - :raise UnexpectedTypeError: - If the data does not represent a JSON object. - - :return: A `dict` representing this QMP message. - """ - try: - obj = json.loads(data) - except JSONDecodeError as err: - emsg = "Failed to deserialize QMP message." - raise DeserializationError(emsg, data) from err - if not isinstance(obj, dict): - raise UnexpectedTypeError( - "QMP message is not a JSON object.", - obj - ) - return obj - - -class DeserializationError(ProtocolError): - """ - A QMP message was not understood as JSON. - - When this Exception is raised, ``__cause__`` will be set to the - `json.JSONDecodeError` Exception, which can be interrogated for - further details. - - :param error_message: Human-readable string describing the error. - :param raw: The raw `bytes` that prompted the failure. - """ - def __init__(self, error_message: str, raw: bytes): - super().__init__(error_message) - #: The raw `bytes` that were not understood as JSON. - self.raw: bytes = raw - - def __str__(self) -> str: - return "\n".join([ - super().__str__(), - f" raw bytes were: {str(self.raw)}", - ]) - - -class UnexpectedTypeError(ProtocolError): - """ - A QMP message was JSON, but not a JSON object. - - :param error_message: Human-readable string describing the error. - :param value: The deserialized JSON value that wasn't an object. - """ - def __init__(self, error_message: str, value: object): - super().__init__(error_message) - #: The JSON value that was expected to be an object. - self.value: object = value - - def __str__(self) -> str: - strval = json.dumps(self.value, indent=2) - return "\n".join([ - super().__str__(), - f" json value was: {strval}", - ]) diff --git a/python/qemu/aqmp/models.py b/python/qemu/aqmp/models.py deleted file mode 100644 index de87f87804..0000000000 --- a/python/qemu/aqmp/models.py +++ /dev/null @@ -1,146 +0,0 @@ -""" -QMP Data Models - -This module provides simplistic data classes that represent the few -structures that the QMP spec mandates; they are used to verify incoming -data to make sure it conforms to spec. -""" -# pylint: disable=too-few-public-methods - -from collections import abc -import copy -from typing import ( - Any, - Dict, - Mapping, - Optional, - Sequence, -) - - -class Model: - """ - Abstract data model, representing some QMP object of some kind. - - :param raw: The raw object to be validated. - :raise KeyError: If any required fields are absent. - :raise TypeError: If any required fields have the wrong type. - """ - def __init__(self, raw: Mapping[str, Any]): - self._raw = raw - - def _check_key(self, key: str) -> None: - if key not in self._raw: - raise KeyError(f"'{self._name}' object requires '{key}' member") - - def _check_value(self, key: str, type_: type, typestr: str) -> None: - assert key in self._raw - if not isinstance(self._raw[key], type_): - raise TypeError( - f"'{self._name}' member '{key}' must be a {typestr}" - ) - - def _check_member(self, key: str, type_: type, typestr: str) -> None: - self._check_key(key) - self._check_value(key, type_, typestr) - - @property - def _name(self) -> str: - return type(self).__name__ - - def __repr__(self) -> str: - return f"{self._name}({self._raw!r})" - - -class Greeting(Model): - """ - Defined in qmp-spec.txt, section 2.2, "Server Greeting". - - :param raw: The raw Greeting object. - :raise KeyError: If any required fields are absent. - :raise TypeError: If any required fields have the wrong type. - """ - def __init__(self, raw: Mapping[str, Any]): - super().__init__(raw) - #: 'QMP' member - self.QMP: QMPGreeting # pylint: disable=invalid-name - - self._check_member('QMP', abc.Mapping, "JSON object") - self.QMP = QMPGreeting(self._raw['QMP']) - - def _asdict(self) -> Dict[str, object]: - """ - For compatibility with the iotests sync QMP wrapper. - - The legacy QMP interface needs Greetings as a garden-variety Dict. - - This interface is private in the hopes that it will be able to - be dropped again in the near-future. Caller beware! - """ - return dict(copy.deepcopy(self._raw)) - - -class QMPGreeting(Model): - """ - Defined in qmp-spec.txt, section 2.2, "Server Greeting". - - :param raw: The raw QMPGreeting object. - :raise KeyError: If any required fields are absent. - :raise TypeError: If any required fields have the wrong type. - """ - def __init__(self, raw: Mapping[str, Any]): - super().__init__(raw) - #: 'version' member - self.version: Mapping[str, object] - #: 'capabilities' member - self.capabilities: Sequence[object] - - self._check_member('version', abc.Mapping, "JSON object") - self.version = self._raw['version'] - - self._check_member('capabilities', abc.Sequence, "JSON array") - self.capabilities = self._raw['capabilities'] - - -class ErrorResponse(Model): - """ - Defined in qmp-spec.txt, section 2.4.2, "error". - - :param raw: The raw ErrorResponse object. - :raise KeyError: If any required fields are absent. - :raise TypeError: If any required fields have the wrong type. - """ - def __init__(self, raw: Mapping[str, Any]): - super().__init__(raw) - #: 'error' member - self.error: ErrorInfo - #: 'id' member - self.id: Optional[object] = None # pylint: disable=invalid-name - - self._check_member('error', abc.Mapping, "JSON object") - self.error = ErrorInfo(self._raw['error']) - - if 'id' in raw: - self.id = raw['id'] - - -class ErrorInfo(Model): - """ - Defined in qmp-spec.txt, section 2.4.2, "error". - - :param raw: The raw ErrorInfo object. - :raise KeyError: If any required fields are absent. - :raise TypeError: If any required fields have the wrong type. - """ - def __init__(self, raw: Mapping[str, Any]): - super().__init__(raw) - #: 'class' member, with an underscore to avoid conflicts in Python. - self.class_: str - #: 'desc' member - self.desc: str - - self._check_member('class', str, "string") - self.class_ = self._raw['class'] - - self._check_member('desc', str, "string") - self.desc = self._raw['desc'] diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py deleted file mode 100644 index 36fae57f27..0000000000 --- a/python/qemu/aqmp/protocol.py +++ /dev/null @@ -1,1048 +0,0 @@ -""" -Generic Asynchronous Message-based Protocol Support - -This module provides a generic framework for sending and receiving -messages over an asyncio stream. `AsyncProtocol` is an abstract class -that implements the core mechanisms of a simple send/receive protocol, -and is designed to be extended. - -In this package, it is used as the implementation for the `QMPClient` -class. -""" - -# It's all the docstrings ... ! It's long for a good reason ^_^; -# pylint: disable=too-many-lines - -import asyncio -from asyncio import StreamReader, StreamWriter -from enum import Enum -from functools import wraps -import logging -from ssl import SSLContext -from typing import ( - Any, - Awaitable, - Callable, - Generic, - List, - Optional, - Tuple, - TypeVar, - Union, - cast, -) - -from .error import QMPError -from .util import ( - bottom_half, - create_task, - exception_summary, - flush, - is_closing, - pretty_traceback, - upper_half, - wait_closed, -) - - -T = TypeVar('T') -_U = TypeVar('_U') -_TaskFN = Callable[[], Awaitable[None]] # aka ``async def func() -> None`` - -InternetAddrT = Tuple[str, int] -UnixAddrT = str -SocketAddrT = Union[UnixAddrT, InternetAddrT] - - -class Runstate(Enum): - """Protocol session runstate.""" - - #: Fully quiesced and disconnected. - IDLE = 0 - #: In the process of connecting or establishing a session. - CONNECTING = 1 - #: Fully connected and active session. - RUNNING = 2 - #: In the process of disconnecting. - #: Runstate may be returned to `IDLE` by calling `disconnect()`. - DISCONNECTING = 3 - - -class ConnectError(QMPError): - """ - Raised when the initial connection process has failed. - - This Exception always wraps a "root cause" exception that can be - interrogated for additional information. - - :param error_message: Human-readable string describing the error. - :param exc: The root-cause exception. - """ - def __init__(self, error_message: str, exc: Exception): - super().__init__(error_message) - #: Human-readable error string - self.error_message: str = error_message - #: Wrapped root cause exception - self.exc: Exception = exc - - def __str__(self) -> str: - cause = str(self.exc) - if not cause: - # If there's no error string, use the exception name. - cause = exception_summary(self.exc) - return f"{self.error_message}: {cause}" - - -class StateError(QMPError): - """ - An API command (connect, execute, etc) was issued at an inappropriate time. - - This error is raised when a command like - :py:meth:`~AsyncProtocol.connect()` is issued at an inappropriate - time. - - :param error_message: Human-readable string describing the state violation. - :param state: The actual `Runstate` seen at the time of the violation. - :param required: The `Runstate` required to process this command. - """ - def __init__(self, error_message: str, - state: Runstate, required: Runstate): - super().__init__(error_message) - self.error_message = error_message - self.state = state - self.required = required - - -F = TypeVar('F', bound=Callable[..., Any]) # pylint: disable=invalid-name - - -# Don't Panic. -def require(required_state: Runstate) -> Callable[[F], F]: - """ - Decorator: protect a method so it can only be run in a certain `Runstate`. - - :param required_state: The `Runstate` required to invoke this method. - :raise StateError: When the required `Runstate` is not met. - """ - def _decorator(func: F) -> F: - # _decorator is the decorator that is built by calling the - # require() decorator factory; e.g.: - # - # @require(Runstate.IDLE) def foo(): ... - # will replace 'foo' with the result of '_decorator(foo)'. - - @wraps(func) - def _wrapper(proto: 'AsyncProtocol[Any]', - *args: Any, **kwargs: Any) -> Any: - # _wrapper is the function that gets executed prior to the - # decorated method. - - name = type(proto).__name__ - - if proto.runstate != required_state: - if proto.runstate == Runstate.CONNECTING: - emsg = f"{name} is currently connecting." - elif proto.runstate == Runstate.DISCONNECTING: - emsg = (f"{name} is disconnecting." - " Call disconnect() to return to IDLE state.") - elif proto.runstate == Runstate.RUNNING: - emsg = f"{name} is already connected and running." - elif proto.runstate == Runstate.IDLE: - emsg = f"{name} is disconnected and idle." - else: - assert False - raise StateError(emsg, proto.runstate, required_state) - # No StateError, so call the wrapped method. - return func(proto, *args, **kwargs) - - # Return the decorated method; - # Transforming Func to Decorated[Func]. - return cast(F, _wrapper) - - # Return the decorator instance from the decorator factory. Phew! - return _decorator - - -class AsyncProtocol(Generic[T]): - """ - AsyncProtocol implements a generic async message-based protocol. - - This protocol assumes the basic unit of information transfer between - client and server is a "message", the details of which are left up - to the implementation. It assumes the sending and receiving of these - messages is full-duplex and not necessarily correlated; i.e. it - supports asynchronous inbound messages. - - It is designed to be extended by a specific protocol which provides - the implementations for how to read and send messages. These must be - defined in `_do_recv()` and `_do_send()`, respectively. - - Other callbacks have a default implementation, but are intended to be - either extended or overridden: - - - `_establish_session`: - The base implementation starts the reader/writer tasks. - A protocol implementation can override this call, inserting - actions to be taken prior to starting the reader/writer tasks - before the super() call; actions needing to occur afterwards - can be written after the super() call. - - `_on_message`: - Actions to be performed when a message is received. - - `_cb_outbound`: - Logging/Filtering hook for all outbound messages. - - `_cb_inbound`: - Logging/Filtering hook for all inbound messages. - This hook runs *before* `_on_message()`. - - :param name: - Name used for logging messages, if any. By default, messages - will log to 'qemu.aqmp.protocol', but each individual connection - can be given its own logger by giving it a name; messages will - then log to 'qemu.aqmp.protocol.${name}'. - """ - # pylint: disable=too-many-instance-attributes - - #: Logger object for debugging messages from this connection. - logger = logging.getLogger(__name__) - - # Maximum allowable size of read buffer - _limit = (64 * 1024) - - # ------------------------- - # Section: Public interface - # ------------------------- - - def __init__(self, name: Optional[str] = None) -> None: - #: The nickname for this connection, if any. - self.name: Optional[str] = name - if self.name is not None: - self.logger = self.logger.getChild(self.name) - - # stream I/O - self._reader: Optional[StreamReader] = None - self._writer: Optional[StreamWriter] = None - - # Outbound Message queue - self._outgoing: asyncio.Queue[T] - - # Special, long-running tasks: - self._reader_task: Optional[asyncio.Future[None]] = None - self._writer_task: Optional[asyncio.Future[None]] = None - - # Aggregate of the above two tasks, used for Exception management. - self._bh_tasks: Optional[asyncio.Future[Tuple[None, None]]] = None - - #: Disconnect task. The disconnect implementation runs in a task - #: so that asynchronous disconnects (initiated by the - #: reader/writer) are allowed to wait for the reader/writers to - #: exit. - self._dc_task: Optional[asyncio.Future[None]] = None - - self._runstate = Runstate.IDLE - self._runstate_changed: Optional[asyncio.Event] = None - - # Server state for start_server() and _incoming() - self._server: Optional[asyncio.AbstractServer] = None - self._accepted: Optional[asyncio.Event] = None - - def __repr__(self) -> str: - cls_name = type(self).__name__ - tokens = [] - if self.name is not None: - tokens.append(f"name={self.name!r}") - tokens.append(f"runstate={self.runstate.name}") - return f"<{cls_name} {' '.join(tokens)}>" - - @property # @upper_half - def runstate(self) -> Runstate: - """The current `Runstate` of the connection.""" - return self._runstate - - @upper_half - async def runstate_changed(self) -> Runstate: - """ - Wait for the `runstate` to change, then return that runstate. - """ - await self._runstate_event.wait() - return self.runstate - - @upper_half - @require(Runstate.IDLE) - async def start_server_and_accept( - self, address: SocketAddrT, - ssl: Optional[SSLContext] = None - ) -> None: - """ - Accept a connection and begin processing message queues. - - If this call fails, `runstate` is guaranteed to be set back to `IDLE`. - This method is precisely equivalent to calling `start_server()` - followed by `accept()`. - - :param address: - Address to listen on; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: - When a connection or session cannot be established. - - This exception will wrap a more concrete one. In most cases, - the wrapped exception will be `OSError` or `EOFError`. If a - protocol-level failure occurs while establishing a new - session, the wrapped error may also be an `QMPError`. - """ - await self.start_server(address, ssl) - await self.accept() - assert self.runstate == Runstate.RUNNING - - @upper_half - @require(Runstate.IDLE) - async def start_server(self, address: SocketAddrT, - ssl: Optional[SSLContext] = None) -> None: - """ - Start listening for an incoming connection, but do not wait for a peer. - - This method starts listening for an incoming connection, but - does not block waiting for a peer. This call will return - immediately after binding and listening on a socket. A later - call to `accept()` must be made in order to finalize the - incoming connection. - - :param address: - Address to listen on; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: - When the server could not start listening on this address. - - This exception will wrap a more concrete one. In most cases, - the wrapped exception will be `OSError`. - """ - await self._session_guard( - self._do_start_server(address, ssl), - 'Failed to establish connection') - assert self.runstate == Runstate.CONNECTING - - @upper_half - @require(Runstate.CONNECTING) - async def accept(self) -> None: - """ - Accept an incoming connection and begin processing message queues. - - If this call fails, `runstate` is guaranteed to be set back to `IDLE`. - - :raise StateError: When the `Runstate` is not `CONNECTING`. - :raise QMPError: When `start_server()` was not called yet. - :raise ConnectError: - When a connection or session cannot be established. - - This exception will wrap a more concrete one. In most cases, - the wrapped exception will be `OSError` or `EOFError`. If a - protocol-level failure occurs while establishing a new - session, the wrapped error may also be an `QMPError`. - """ - if self._accepted is None: - raise QMPError("Cannot call accept() before start_server().") - await self._session_guard( - self._do_accept(), - 'Failed to establish connection') - await self._session_guard( - self._establish_session(), - 'Failed to establish session') - assert self.runstate == Runstate.RUNNING - - @upper_half - @require(Runstate.IDLE) - async def connect(self, address: SocketAddrT, - ssl: Optional[SSLContext] = None) -> None: - """ - Connect to the server and begin processing message queues. - - If this call fails, `runstate` is guaranteed to be set back to `IDLE`. - - :param address: - Address to connect to; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: - When a connection or session cannot be established. - - This exception will wrap a more concrete one. In most cases, - the wrapped exception will be `OSError` or `EOFError`. If a - protocol-level failure occurs while establishing a new - session, the wrapped error may also be an `QMPError`. - """ - await self._session_guard( - self._do_connect(address, ssl), - 'Failed to establish connection') - await self._session_guard( - self._establish_session(), - 'Failed to establish session') - assert self.runstate == Runstate.RUNNING - - @upper_half - async def disconnect(self) -> None: - """ - Disconnect and wait for all tasks to fully stop. - - If there was an exception that caused the reader/writers to - terminate prematurely, it will be raised here. - - :raise Exception: When the reader or writer terminate unexpectedly. - """ - self.logger.debug("disconnect() called.") - self._schedule_disconnect() - await self._wait_disconnect() - - # -------------------------- - # Section: Session machinery - # -------------------------- - - async def _session_guard(self, coro: Awaitable[None], emsg: str) -> None: - """ - Async guard function used to roll back to `IDLE` on any error. - - On any Exception, the state machine will be reset back to - `IDLE`. Most Exceptions will be wrapped with `ConnectError`, but - `BaseException` events will be left alone (This includes - asyncio.CancelledError, even prior to Python 3.8). - - :param error_message: - Human-readable string describing what connection phase failed. - - :raise BaseException: - When `BaseException` occurs in the guarded block. - :raise ConnectError: - When any other error is encountered in the guarded block. - """ - # Note: After Python 3.6 support is removed, this should be an - # @asynccontextmanager instead of accepting a callback. - try: - await coro - except BaseException as err: - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - try: - # Reset the runstate back to IDLE. - await self.disconnect() - except: - # We don't expect any Exceptions from the disconnect function - # here, because we failed to connect in the first place. - # The disconnect() function is intended to perform - # only cannot-fail cleanup here, but you never know. - emsg = ( - "Unexpected bottom half exception. " - "This is a bug in the QMP library. " - "Please report it to <qemu-devel@nongnu.org> and " - "CC: John Snow <jsnow@redhat.com>." - ) - self.logger.critical("%s:\n%s\n", emsg, pretty_traceback()) - raise - - # CancelledError is an Exception with special semantic meaning; - # We do NOT want to wrap it up under ConnectError. - # NB: CancelledError is not a BaseException before Python 3.8 - if isinstance(err, asyncio.CancelledError): - raise - - # Any other kind of error can be treated as some kind of connection - # failure broadly. Inspect the 'exc' field to explore the root - # cause in greater detail. - if isinstance(err, Exception): - raise ConnectError(emsg, err) from err - - # Raise BaseExceptions un-wrapped, they're more important. - raise - - @property - def _runstate_event(self) -> asyncio.Event: - # asyncio.Event() objects should not be created prior to entrance into - # an event loop, so we can ensure we create it in the correct context. - # Create it on-demand *only* at the behest of an 'async def' method. - if not self._runstate_changed: - self._runstate_changed = asyncio.Event() - return self._runstate_changed - - @upper_half - @bottom_half - def _set_state(self, state: Runstate) -> None: - """ - Change the `Runstate` of the protocol connection. - - Signals the `runstate_changed` event. - """ - if state == self._runstate: - return - - self.logger.debug("Transitioning from '%s' to '%s'.", - str(self._runstate), str(state)) - self._runstate = state - self._runstate_event.set() - self._runstate_event.clear() - - @bottom_half - async def _stop_server(self) -> None: - """ - Stop listening for / accepting new incoming connections. - """ - if self._server is None: - return - - try: - self.logger.debug("Stopping server.") - self._server.close() - await self._server.wait_closed() - self.logger.debug("Server stopped.") - finally: - self._server = None - - @bottom_half # However, it does not run from the R/W tasks. - async def _incoming(self, - reader: asyncio.StreamReader, - writer: asyncio.StreamWriter) -> None: - """ - Accept an incoming connection and signal the upper_half. - - This method does the minimum necessary to accept a single - incoming connection. It signals back to the upper_half ASAP so - that any errors during session initialization can occur - naturally in the caller's stack. - - :param reader: Incoming `asyncio.StreamReader` - :param writer: Incoming `asyncio.StreamWriter` - """ - peer = writer.get_extra_info('peername', 'Unknown peer') - self.logger.debug("Incoming connection from %s", peer) - - if self._reader or self._writer: - # Sadly, we can have more than one pending connection - # because of https://bugs.python.org/issue46715 - # Close any extra connections we don't actually want. - self.logger.warning("Extraneous connection inadvertently accepted") - writer.close() - return - - # A connection has been accepted; stop listening for new ones. - assert self._accepted is not None - await self._stop_server() - self._reader, self._writer = (reader, writer) - self._accepted.set() - - @upper_half - async def _do_start_server(self, address: SocketAddrT, - ssl: Optional[SSLContext] = None) -> None: - """ - Start listening for an incoming connection, but do not wait for a peer. - - This method starts listening for an incoming connection, but does not - block waiting for a peer. This call will return immediately after - binding and listening to a socket. A later call to accept() must be - made in order to finalize the incoming connection. - - :param address: - Address to listen on; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise OSError: For stream-related errors. - """ - assert self.runstate == Runstate.IDLE - self._set_state(Runstate.CONNECTING) - - self.logger.debug("Awaiting connection on %s ...", address) - self._accepted = asyncio.Event() - - if isinstance(address, tuple): - coro = asyncio.start_server( - self._incoming, - host=address[0], - port=address[1], - ssl=ssl, - backlog=1, - limit=self._limit, - ) - else: - coro = asyncio.start_unix_server( - self._incoming, - path=address, - ssl=ssl, - backlog=1, - limit=self._limit, - ) - - # Allow runstate watchers to witness 'CONNECTING' state; some - # failures in the streaming layer are synchronous and will not - # otherwise yield. - await asyncio.sleep(0) - - # This will start the server (bind(2), listen(2)). It will also - # call accept(2) if we yield, but we don't block on that here. - self._server = await coro - self.logger.debug("Server listening on %s", address) - - @upper_half - async def _do_accept(self) -> None: - """ - Wait for and accept an incoming connection. - - Requires that we have not yet accepted an incoming connection - from the upper_half, but it's OK if the server is no longer - running because the bottom_half has already accepted the - connection. - """ - assert self._accepted is not None - await self._accepted.wait() - assert self._server is None - self._accepted = None - - self.logger.debug("Connection accepted.") - - @upper_half - async def _do_connect(self, address: SocketAddrT, - ssl: Optional[SSLContext] = None) -> None: - """ - Acting as the transport client, initiate a connection to a server. - - :param address: - Address to connect to; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise OSError: For stream-related errors. - """ - assert self.runstate == Runstate.IDLE - self._set_state(Runstate.CONNECTING) - - # Allow runstate watchers to witness 'CONNECTING' state; some - # failures in the streaming layer are synchronous and will not - # otherwise yield. - await asyncio.sleep(0) - - self.logger.debug("Connecting to %s ...", address) - - if isinstance(address, tuple): - connect = asyncio.open_connection( - address[0], - address[1], - ssl=ssl, - limit=self._limit, - ) - else: - connect = asyncio.open_unix_connection( - path=address, - ssl=ssl, - limit=self._limit, - ) - self._reader, self._writer = await connect - - self.logger.debug("Connected.") - - @upper_half - async def _establish_session(self) -> None: - """ - Establish a new session. - - Starts the readers/writer tasks; subclasses may perform their - own negotiations here. The Runstate will be RUNNING upon - successful conclusion. - """ - assert self.runstate == Runstate.CONNECTING - - self._outgoing = asyncio.Queue() - - reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader') - writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer') - - self._reader_task = create_task(reader_coro) - self._writer_task = create_task(writer_coro) - - self._bh_tasks = asyncio.gather( - self._reader_task, - self._writer_task, - ) - - self._set_state(Runstate.RUNNING) - await asyncio.sleep(0) # Allow runstate_event to process - - @upper_half - @bottom_half - def _schedule_disconnect(self) -> None: - """ - Initiate a disconnect; idempotent. - - This method is used both in the upper-half as a direct - consequence of `disconnect()`, and in the bottom-half in the - case of unhandled exceptions in the reader/writer tasks. - - It can be invoked no matter what the `runstate` is. - """ - if not self._dc_task: - self._set_state(Runstate.DISCONNECTING) - self.logger.debug("Scheduling disconnect.") - self._dc_task = create_task(self._bh_disconnect()) - - @upper_half - async def _wait_disconnect(self) -> None: - """ - Waits for a previously scheduled disconnect to finish. - - This method will gather any bottom half exceptions and re-raise - the one that occurred first; presuming it to be the root cause - of any subsequent Exceptions. It is intended to be used in the - upper half of the call chain. - - :raise Exception: - Arbitrary exception re-raised on behalf of the reader/writer. - """ - assert self.runstate == Runstate.DISCONNECTING - assert self._dc_task - - aws: List[Awaitable[object]] = [self._dc_task] - if self._bh_tasks: - aws.insert(0, self._bh_tasks) - all_defined_tasks = asyncio.gather(*aws) - - # Ensure disconnect is done; Exception (if any) is not raised here: - await asyncio.wait((self._dc_task,)) - - try: - await all_defined_tasks # Raise Exceptions from the bottom half. - finally: - self._cleanup() - self._set_state(Runstate.IDLE) - - @upper_half - def _cleanup(self) -> None: - """ - Fully reset this object to a clean state and return to `IDLE`. - """ - def _paranoid_task_erase(task: Optional['asyncio.Future[_U]'] - ) -> Optional['asyncio.Future[_U]']: - # Help to erase a task, ENSURING it is fully quiesced first. - assert (task is None) or task.done() - return None if (task and task.done()) else task - - assert self.runstate == Runstate.DISCONNECTING - self._dc_task = _paranoid_task_erase(self._dc_task) - self._reader_task = _paranoid_task_erase(self._reader_task) - self._writer_task = _paranoid_task_erase(self._writer_task) - self._bh_tasks = _paranoid_task_erase(self._bh_tasks) - - self._reader = None - self._writer = None - self._accepted = None - - # NB: _runstate_changed cannot be cleared because we still need it to - # send the final runstate changed event ...! - - # ---------------------------- - # Section: Bottom Half methods - # ---------------------------- - - @bottom_half - async def _bh_disconnect(self) -> None: - """ - Disconnect and cancel all outstanding tasks. - - It is designed to be called from its task context, - :py:obj:`~AsyncProtocol._dc_task`. By running in its own task, - it is free to wait on any pending actions that may still need to - occur in either the reader or writer tasks. - """ - assert self.runstate == Runstate.DISCONNECTING - - def _done(task: Optional['asyncio.Future[Any]']) -> bool: - return task is not None and task.done() - - # If the server is running, stop it. - await self._stop_server() - - # Are we already in an error pathway? If either of the tasks are - # already done, or if we have no tasks but a reader/writer; we - # must be. - # - # NB: We can't use _bh_tasks to check for premature task - # completion, because it may not yet have had a chance to run - # and gather itself. - tasks = tuple(filter(None, (self._writer_task, self._reader_task))) - error_pathway = _done(self._reader_task) or _done(self._writer_task) - if not tasks: - error_pathway |= bool(self._reader) or bool(self._writer) - - try: - # Try to flush the writer, if possible. - # This *may* cause an error and force us over into the error path. - if not error_pathway: - await self._bh_flush_writer() - except BaseException as err: - error_pathway = True - emsg = "Failed to flush the writer" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - raise - finally: - # Cancel any still-running tasks (Won't raise): - if self._writer_task is not None and not self._writer_task.done(): - self.logger.debug("Cancelling writer task.") - self._writer_task.cancel() - if self._reader_task is not None and not self._reader_task.done(): - self.logger.debug("Cancelling reader task.") - self._reader_task.cancel() - - # Close out the tasks entirely (Won't raise): - if tasks: - self.logger.debug("Waiting for tasks to complete ...") - await asyncio.wait(tasks) - - # Lastly, close the stream itself. (*May raise*!): - await self._bh_close_stream(error_pathway) - self.logger.debug("Disconnected.") - - @bottom_half - async def _bh_flush_writer(self) -> None: - if not self._writer_task: - return - - self.logger.debug("Draining the outbound queue ...") - await self._outgoing.join() - if self._writer is not None: - self.logger.debug("Flushing the StreamWriter ...") - await flush(self._writer) - - @bottom_half - async def _bh_close_stream(self, error_pathway: bool = False) -> None: - # NB: Closing the writer also implcitly closes the reader. - if not self._writer: - return - - if not is_closing(self._writer): - self.logger.debug("Closing StreamWriter.") - self._writer.close() - - self.logger.debug("Waiting for StreamWriter to close ...") - try: - await wait_closed(self._writer) - except Exception: # pylint: disable=broad-except - # It's hard to tell if the Stream is already closed or - # not. Even if one of the tasks has failed, it may have - # failed for a higher-layered protocol reason. The - # stream could still be open and perfectly fine. - # I don't know how to discern its health here. - - if error_pathway: - # We already know that *something* went wrong. Let's - # just trust that the Exception we already have is the - # better one to present to the user, even if we don't - # genuinely *know* the relationship between the two. - self.logger.debug( - "Discarding Exception from wait_closed:\n%s\n", - pretty_traceback(), - ) - else: - # Oops, this is a brand-new error! - raise - finally: - self.logger.debug("StreamWriter closed.") - - @bottom_half - async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None: - """ - Run one of the bottom-half methods in a loop forever. - - If the bottom half ever raises any exception, schedule a - disconnect that will terminate the entire loop. - - :param async_fn: The bottom-half method to run in a loop. - :param name: The name of this task, used for logging. - """ - try: - while True: - await async_fn() - except asyncio.CancelledError: - # We have been cancelled by _bh_disconnect, exit gracefully. - self.logger.debug("Task.%s: cancelled.", name) - return - except BaseException as err: - self.logger.log( - logging.INFO if isinstance(err, EOFError) else logging.ERROR, - "Task.%s: %s", - name, exception_summary(err) - ) - self.logger.debug("Task.%s: failure:\n%s\n", - name, pretty_traceback()) - self._schedule_disconnect() - raise - finally: - self.logger.debug("Task.%s: exiting.", name) - - @bottom_half - async def _bh_send_message(self) -> None: - """ - Wait for an outgoing message, then send it. - - Designed to be run in `_bh_loop_forever()`. - """ - msg = await self._outgoing.get() - try: - await self._send(msg) - finally: - self._outgoing.task_done() - - @bottom_half - async def _bh_recv_message(self) -> None: - """ - Wait for an incoming message and call `_on_message` to route it. - - Designed to be run in `_bh_loop_forever()`. - """ - msg = await self._recv() - await self._on_message(msg) - - # -------------------- - # Section: Message I/O - # -------------------- - - @upper_half - @bottom_half - def _cb_outbound(self, msg: T) -> T: - """ - Callback: outbound message hook. - - This is intended for subclasses to be able to add arbitrary - hooks to filter or manipulate outgoing messages. The base - implementation does nothing but log the message without any - manipulation of the message. - - :param msg: raw outbound message - :return: final outbound message - """ - self.logger.debug("--> %s", str(msg)) - return msg - - @upper_half - @bottom_half - def _cb_inbound(self, msg: T) -> T: - """ - Callback: inbound message hook. - - This is intended for subclasses to be able to add arbitrary - hooks to filter or manipulate incoming messages. The base - implementation does nothing but log the message without any - manipulation of the message. - - This method does not "handle" incoming messages; it is a filter. - The actual "endpoint" for incoming messages is `_on_message()`. - - :param msg: raw inbound message - :return: processed inbound message - """ - self.logger.debug("<-- %s", str(msg)) - return msg - - @upper_half - @bottom_half - async def _readline(self) -> bytes: - """ - Wait for a newline from the incoming reader. - - This method is provided as a convenience for upper-layer - protocols, as many are line-based. - - This method *may* return a sequence of bytes without a trailing - newline if EOF occurs, but *some* bytes were received. In this - case, the next call will raise `EOFError`. It is assumed that - the layer 5 protocol will decide if there is anything meaningful - to be done with a partial message. - - :raise OSError: For stream-related errors. - :raise EOFError: - If the reader stream is at EOF and there are no bytes to return. - :return: bytes, including the newline. - """ - assert self._reader is not None - msg_bytes = await self._reader.readline() - - if not msg_bytes: - if self._reader.at_eof(): - raise EOFError - - return msg_bytes - - @upper_half - @bottom_half - async def _do_recv(self) -> T: - """ - Abstract: Read from the stream and return a message. - - Very low-level; intended to only be called by `_recv()`. - """ - raise NotImplementedError - - @upper_half - @bottom_half - async def _recv(self) -> T: - """ - Read an arbitrary protocol message. - - .. warning:: - This method is intended primarily for `_bh_recv_message()` - to use in an asynchronous task loop. Using it outside of - this loop will "steal" messages from the normal routing - mechanism. It is safe to use prior to `_establish_session()`, - but should not be used otherwise. - - This method uses `_do_recv()` to retrieve the raw message, and - then transforms it using `_cb_inbound()`. - - :return: A single (filtered, processed) protocol message. - """ - message = await self._do_recv() - return self._cb_inbound(message) - - @upper_half - @bottom_half - def _do_send(self, msg: T) -> None: - """ - Abstract: Write a message to the stream. - - Very low-level; intended to only be called by `_send()`. - """ - raise NotImplementedError - - @upper_half - @bottom_half - async def _send(self, msg: T) -> None: - """ - Send an arbitrary protocol message. - - This method will transform any outgoing messages according to - `_cb_outbound()`. - - .. warning:: - Like `_recv()`, this method is intended to be called by - the writer task loop that processes outgoing - messages. Calling it directly may circumvent logic - implemented by the caller meant to correlate outgoing and - incoming messages. - - :raise OSError: For problems with the underlying stream. - """ - msg = self._cb_outbound(msg) - self._do_send(msg) - - @bottom_half - async def _on_message(self, msg: T) -> None: - """ - Called to handle the receipt of a new message. - - .. caution:: - This is executed from within the reader loop, so be advised - that waiting on either the reader or writer task will lead - to deadlock. Additionally, any unhandled exceptions will - directly cause the loop to halt, so logic may be best-kept - to a minimum if at all possible. - - :param msg: The incoming message, already logged/filtered. - """ - # Nothing to do in the abstract case. diff --git a/python/qemu/aqmp/py.typed b/python/qemu/aqmp/py.typed deleted file mode 100644 index e69de29bb2..0000000000 --- a/python/qemu/aqmp/py.typed +++ /dev/null diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py deleted file mode 100644 index 90a8737f03..0000000000 --- a/python/qemu/aqmp/qmp_client.py +++ /dev/null @@ -1,655 +0,0 @@ -""" -QMP Protocol Implementation - -This module provides the `QMPClient` class, which can be used to connect -and send commands to a QMP server such as QEMU. The QMP class can be -used to either connect to a listening server, or used to listen and -accept an incoming connection from that server. -""" - -import asyncio -import logging -import socket -import struct -from typing import ( - Dict, - List, - Mapping, - Optional, - Union, - cast, -) - -from .error import ProtocolError, QMPError -from .events import Events -from .message import Message -from .models import ErrorResponse, Greeting -from .protocol import AsyncProtocol, Runstate, require -from .util import ( - bottom_half, - exception_summary, - pretty_traceback, - upper_half, -) - - -class _WrappedProtocolError(ProtocolError): - """ - Abstract exception class for Protocol errors that wrap an Exception. - - :param error_message: Human-readable string describing the error. - :param exc: The root-cause exception. - """ - def __init__(self, error_message: str, exc: Exception): - super().__init__(error_message) - self.exc = exc - - def __str__(self) -> str: - return f"{self.error_message}: {self.exc!s}" - - -class GreetingError(_WrappedProtocolError): - """ - An exception occurred during the Greeting phase. - - :param error_message: Human-readable string describing the error. - :param exc: The root-cause exception. - """ - - -class NegotiationError(_WrappedProtocolError): - """ - An exception occurred during the Negotiation phase. - - :param error_message: Human-readable string describing the error. - :param exc: The root-cause exception. - """ - - -class ExecuteError(QMPError): - """ - Exception raised by `QMPClient.execute()` on RPC failure. - - :param error_response: The RPC error response object. - :param sent: The sent RPC message that caused the failure. - :param received: The raw RPC error reply received. - """ - def __init__(self, error_response: ErrorResponse, - sent: Message, received: Message): - super().__init__(error_response.error.desc) - #: The sent `Message` that caused the failure - self.sent: Message = sent - #: The received `Message` that indicated failure - self.received: Message = received - #: The parsed error response - self.error: ErrorResponse = error_response - #: The QMP error class - self.error_class: str = error_response.error.class_ - - -class ExecInterruptedError(QMPError): - """ - Exception raised by `execute()` (et al) when an RPC is interrupted. - - This error is raised when an `execute()` statement could not be - completed. This can occur because the connection itself was - terminated before a reply was received. - - The true cause of the interruption will be available via `disconnect()`. - """ - - -class _MsgProtocolError(ProtocolError): - """ - Abstract error class for protocol errors that have a `Message` object. - - This Exception class is used for protocol errors where the `Message` - was mechanically understood, but was found to be inappropriate or - malformed. - - :param error_message: Human-readable string describing the error. - :param msg: The QMP `Message` that caused the error. - """ - def __init__(self, error_message: str, msg: Message): - super().__init__(error_message) - #: The received `Message` that caused the error. - self.msg: Message = msg - - def __str__(self) -> str: - return "\n".join([ - super().__str__(), - f" Message was: {str(self.msg)}\n", - ]) - - -class ServerParseError(_MsgProtocolError): - """ - The Server sent a `Message` indicating parsing failure. - - i.e. A reply has arrived from the server, but it is missing the "ID" - field, indicating a parsing error. - - :param error_message: Human-readable string describing the error. - :param msg: The QMP `Message` that caused the error. - """ - - -class BadReplyError(_MsgProtocolError): - """ - An execution reply was successfully routed, but not understood. - - If a QMP message is received with an 'id' field to allow it to be - routed, but is otherwise malformed, this exception will be raised. - - A reply message is malformed if it is missing either the 'return' or - 'error' keys, or if the 'error' value has missing keys or members of - the wrong type. - - :param error_message: Human-readable string describing the error. - :param msg: The malformed reply that was received. - :param sent: The message that was sent that prompted the error. - """ - def __init__(self, error_message: str, msg: Message, sent: Message): - super().__init__(error_message, msg) - #: The sent `Message` that caused the failure - self.sent = sent - - -class QMPClient(AsyncProtocol[Message], Events): - """ - Implements a QMP client connection. - - QMP can be used to establish a connection as either the transport - client or server, though this class always acts as the QMP client. - - :param name: Optional nickname for the connection, used for logging. - - Basic script-style usage looks like this:: - - qmp = QMPClient('my_virtual_machine_name') - await qmp.connect(('127.0.0.1', 1234)) - ... - res = await qmp.execute('block-query') - ... - await qmp.disconnect() - - Basic async client-style usage looks like this:: - - class Client: - def __init__(self, name: str): - self.qmp = QMPClient(name) - - async def watch_events(self): - try: - async for event in self.qmp.events: - print(f"Event: {event['event']}") - except asyncio.CancelledError: - return - - async def run(self, address='/tmp/qemu.socket'): - await self.qmp.connect(address) - asyncio.create_task(self.watch_events()) - await self.qmp.runstate_changed.wait() - await self.disconnect() - - See `aqmp.events` for more detail on event handling patterns. - """ - #: Logger object used for debugging messages. - logger = logging.getLogger(__name__) - - # Read buffer limit; large enough to accept query-qmp-schema - _limit = (256 * 1024) - - # Type alias for pending execute() result items - _PendingT = Union[Message, ExecInterruptedError] - - def __init__(self, name: Optional[str] = None) -> None: - super().__init__(name) - Events.__init__(self) - - #: Whether or not to await a greeting after establishing a connection. - self.await_greeting: bool = True - - #: Whether or not to perform capabilities negotiation upon connection. - #: Implies `await_greeting`. - self.negotiate: bool = True - - # Cached Greeting, if one was awaited. - self._greeting: Optional[Greeting] = None - - # Command ID counter - self._execute_id = 0 - - # Incoming RPC reply messages. - self._pending: Dict[ - Union[str, None], - 'asyncio.Queue[QMPClient._PendingT]' - ] = {} - - @property - def greeting(self) -> Optional[Greeting]: - """The `Greeting` from the QMP server, if any.""" - return self._greeting - - @upper_half - async def _establish_session(self) -> None: - """ - Initiate the QMP session. - - Wait for the QMP greeting and perform capabilities negotiation. - - :raise GreetingError: When the greeting is not understood. - :raise NegotiationError: If the negotiation fails. - :raise EOFError: When the server unexpectedly hangs up. - :raise OSError: For underlying stream errors. - """ - self._greeting = None - self._pending = {} - - if self.await_greeting or self.negotiate: - self._greeting = await self._get_greeting() - - if self.negotiate: - await self._negotiate() - - # This will start the reader/writers: - await super()._establish_session() - - @upper_half - async def _get_greeting(self) -> Greeting: - """ - :raise GreetingError: When the greeting is not understood. - :raise EOFError: When the server unexpectedly hangs up. - :raise OSError: For underlying stream errors. - - :return: the Greeting object given by the server. - """ - self.logger.debug("Awaiting greeting ...") - - try: - msg = await self._recv() - return Greeting(msg) - except (ProtocolError, KeyError, TypeError) as err: - emsg = "Did not understand Greeting" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - raise GreetingError(emsg, err) from err - except BaseException as err: - # EOFError, OSError, or something unexpected. - emsg = "Failed to receive Greeting" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - raise - - @upper_half - async def _negotiate(self) -> None: - """ - Perform QMP capabilities negotiation. - - :raise NegotiationError: When negotiation fails. - :raise EOFError: When the server unexpectedly hangs up. - :raise OSError: For underlying stream errors. - """ - self.logger.debug("Negotiating capabilities ...") - - arguments: Dict[str, List[str]] = {} - if self._greeting and 'oob' in self._greeting.QMP.capabilities: - arguments.setdefault('enable', []).append('oob') - msg = self.make_execute_msg('qmp_capabilities', arguments=arguments) - - # It's not safe to use execute() here, because the reader/writers - # aren't running. AsyncProtocol *requires* that a new session - # does not fail after the reader/writers are running! - try: - await self._send(msg) - reply = await self._recv() - assert 'return' in reply - assert 'error' not in reply - except (ProtocolError, AssertionError) as err: - emsg = "Negotiation failed" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - raise NegotiationError(emsg, err) from err - except BaseException as err: - # EOFError, OSError, or something unexpected. - emsg = "Negotiation failed" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - raise - - @bottom_half - async def _bh_disconnect(self) -> None: - try: - await super()._bh_disconnect() - finally: - if self._pending: - self.logger.debug("Cancelling pending executions") - keys = self._pending.keys() - for key in keys: - self.logger.debug("Cancelling execution '%s'", key) - self._pending[key].put_nowait( - ExecInterruptedError("Disconnected") - ) - - self.logger.debug("QMP Disconnected.") - - @upper_half - def _cleanup(self) -> None: - super()._cleanup() - assert not self._pending - - @bottom_half - async def _on_message(self, msg: Message) -> None: - """ - Add an incoming message to the appropriate queue/handler. - - :raise ServerParseError: When Message indicates server parse failure. - """ - # Incoming messages are not fully parsed/validated here; - # do only light peeking to know how to route the messages. - - if 'event' in msg: - await self._event_dispatch(msg) - return - - # Below, we assume everything left is an execute/exec-oob response. - - exec_id = cast(Optional[str], msg.get('id')) - - if exec_id in self._pending: - await self._pending[exec_id].put(msg) - return - - # We have a message we can't route back to a caller. - - is_error = 'error' in msg - has_id = 'id' in msg - - if is_error and not has_id: - # This is very likely a server parsing error. - # It doesn't inherently belong to any pending execution. - # Instead of performing clever recovery, just terminate. - # See "NOTE" in qmp-spec.txt, section 2.4.2 - raise ServerParseError( - ("Server sent an error response without an ID, " - "but there are no ID-less executions pending. " - "Assuming this is a server parser failure."), - msg - ) - - # qmp-spec.txt, section 2.4: - # 'Clients should drop all the responses - # that have an unknown "id" field.' - self.logger.log( - logging.ERROR if is_error else logging.WARNING, - "Unknown ID '%s', message dropped.", - exec_id, - ) - self.logger.debug("Unroutable message: %s", str(msg)) - - @upper_half - @bottom_half - async def _do_recv(self) -> Message: - """ - :raise OSError: When a stream error is encountered. - :raise EOFError: When the stream is at EOF. - :raise ProtocolError: - When the Message is not understood. - See also `Message._deserialize`. - - :return: A single QMP `Message`. - """ - msg_bytes = await self._readline() - msg = Message(msg_bytes, eager=True) - return msg - - @upper_half - @bottom_half - def _do_send(self, msg: Message) -> None: - """ - :raise ValueError: JSON serialization failure - :raise TypeError: JSON serialization failure - :raise OSError: When a stream error is encountered. - """ - assert self._writer is not None - self._writer.write(bytes(msg)) - - @upper_half - def _get_exec_id(self) -> str: - exec_id = f"__aqmp#{self._execute_id:05d}" - self._execute_id += 1 - return exec_id - - @upper_half - async def _issue(self, msg: Message) -> Union[None, str]: - """ - Issue a QMP `Message` and do not wait for a reply. - - :param msg: The QMP `Message` to send to the server. - - :return: The ID of the `Message` sent. - """ - msg_id: Optional[str] = None - if 'id' in msg: - assert isinstance(msg['id'], str) - msg_id = msg['id'] - - self._pending[msg_id] = asyncio.Queue(maxsize=1) - try: - await self._outgoing.put(msg) - except: - del self._pending[msg_id] - raise - - return msg_id - - @upper_half - async def _reply(self, msg_id: Union[str, None]) -> Message: - """ - Await a reply to a previously issued QMP message. - - :param msg_id: The ID of the previously issued message. - - :return: The reply from the server. - :raise ExecInterruptedError: - When the reply could not be retrieved because the connection - was lost, or some other problem. - """ - queue = self._pending[msg_id] - - try: - result = await queue.get() - if isinstance(result, ExecInterruptedError): - raise result - return result - finally: - del self._pending[msg_id] - - @upper_half - async def _execute(self, msg: Message, assign_id: bool = True) -> Message: - """ - Send a QMP `Message` to the server and await a reply. - - This method *assumes* you are sending some kind of an execute - statement that *will* receive a reply. - - An execution ID will be assigned if assign_id is `True`. It can be - disabled, but this requires that an ID is manually assigned - instead. For manually assigned IDs, you must not use the string - '__aqmp#' anywhere in the ID. - - :param msg: The QMP `Message` to execute. - :param assign_id: If True, assign a new execution ID. - - :return: Execution reply from the server. - :raise ExecInterruptedError: - When the reply could not be retrieved because the connection - was lost, or some other problem. - """ - if assign_id: - msg['id'] = self._get_exec_id() - elif 'id' in msg: - assert isinstance(msg['id'], str) - assert '__aqmp#' not in msg['id'] - - exec_id = await self._issue(msg) - return await self._reply(exec_id) - - @upper_half - @require(Runstate.RUNNING) - async def _raw( - self, - msg: Union[Message, Mapping[str, object], bytes], - assign_id: bool = True, - ) -> Message: - """ - Issue a raw `Message` to the QMP server and await a reply. - - :param msg: - A Message to send to the server. It may be a `Message`, any - Mapping (including Dict), or raw bytes. - :param assign_id: - Assign an arbitrary execution ID to this message. If - `False`, the existing id must either be absent (and no other - such pending execution may omit an ID) or a string. If it is - a string, it must not start with '__aqmp#' and no other such - pending execution may currently be using that ID. - - :return: Execution reply from the server. - - :raise ExecInterruptedError: - When the reply could not be retrieved because the connection - was lost, or some other problem. - :raise TypeError: - When assign_id is `False`, an ID is given, and it is not a string. - :raise ValueError: - When assign_id is `False`, but the ID is not usable; - Either because it starts with '__aqmp#' or it is already in-use. - """ - # 1. convert generic Mapping or bytes to a QMP Message - # 2. copy Message objects so that we assign an ID only to the copy. - msg = Message(msg) - - exec_id = msg.get('id') - if not assign_id and 'id' in msg: - if not isinstance(exec_id, str): - raise TypeError(f"ID ('{exec_id}') must be a string.") - if exec_id.startswith('__aqmp#'): - raise ValueError( - f"ID ('{exec_id}') must not start with '__aqmp#'." - ) - - if not assign_id and exec_id in self._pending: - raise ValueError( - f"ID '{exec_id}' is in-use and cannot be used." - ) - - return await self._execute(msg, assign_id=assign_id) - - @upper_half - @require(Runstate.RUNNING) - async def execute_msg(self, msg: Message) -> object: - """ - Execute a QMP command and return its value. - - :param msg: The QMP `Message` to execute. - - :return: - The command execution return value from the server. The type of - object returned depends on the command that was issued, - though most in QEMU return a `dict`. - :raise ValueError: - If the QMP `Message` does not have either the 'execute' or - 'exec-oob' fields set. - :raise ExecuteError: When the server returns an error response. - :raise ExecInterruptedError: if the connection was terminated early. - """ - if not ('execute' in msg or 'exec-oob' in msg): - raise ValueError("Requires 'execute' or 'exec-oob' message") - - # Copy the Message so that the ID assigned by _execute() is - # local to this method; allowing the ID to be seen in raised - # Exceptions but without modifying the caller's held copy. - msg = Message(msg) - reply = await self._execute(msg) - - if 'error' in reply: - try: - error_response = ErrorResponse(reply) - except (KeyError, TypeError) as err: - # Error response was malformed. - raise BadReplyError( - "QMP error reply is malformed", reply, msg, - ) from err - - raise ExecuteError(error_response, msg, reply) - - if 'return' not in reply: - raise BadReplyError( - "QMP reply is missing a 'error' or 'return' member", - reply, msg, - ) - - return reply['return'] - - @classmethod - def make_execute_msg(cls, cmd: str, - arguments: Optional[Mapping[str, object]] = None, - oob: bool = False) -> Message: - """ - Create an executable message to be sent by `execute_msg` later. - - :param cmd: QMP command name. - :param arguments: Arguments (if any). Must be JSON-serializable. - :param oob: If `True`, execute "out of band". - - :return: An executable QMP `Message`. - """ - msg = Message({'exec-oob' if oob else 'execute': cmd}) - if arguments is not None: - msg['arguments'] = arguments - return msg - - @upper_half - async def execute(self, cmd: str, - arguments: Optional[Mapping[str, object]] = None, - oob: bool = False) -> object: - """ - Execute a QMP command and return its value. - - :param cmd: QMP command name. - :param arguments: Arguments (if any). Must be JSON-serializable. - :param oob: If `True`, execute "out of band". - - :return: - The command execution return value from the server. The type of - object returned depends on the command that was issued, - though most in QEMU return a `dict`. - :raise ExecuteError: When the server returns an error response. - :raise ExecInterruptedError: if the connection was terminated early. - """ - msg = self.make_execute_msg(cmd, arguments, oob=oob) - return await self.execute_msg(msg) - - @upper_half - @require(Runstate.RUNNING) - def send_fd_scm(self, fd: int) -> None: - """ - Send a file descriptor to the remote via SCM_RIGHTS. - """ - assert self._writer is not None - sock = self._writer.transport.get_extra_info('socket') - - if sock.family != socket.AF_UNIX: - raise QMPError("Sending file descriptors requires a UNIX socket.") - - if not hasattr(sock, 'sendmsg'): - # We need to void the warranty sticker. - # Access to sendmsg is scheduled for removal in Python 3.11. - # Find the real backing socket to use it anyway. - sock = sock._sock # pylint: disable=protected-access - - sock.sendmsg( - [b' '], - [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))] - ) diff --git a/python/qemu/aqmp/qmp_shell.py b/python/qemu/aqmp/qmp_shell.py deleted file mode 100644 index c23f1b1928..0000000000 --- a/python/qemu/aqmp/qmp_shell.py +++ /dev/null @@ -1,610 +0,0 @@ -# -# Copyright (C) 2009-2022 Red Hat Inc. -# -# Authors: -# Luiz Capitulino <lcapitulino@redhat.com> -# John Snow <jsnow@redhat.com> -# -# This work is licensed under the terms of the GNU LGPL, version 2 or -# later. See the COPYING file in the top-level directory. -# - -""" -Low-level QEMU shell on top of QMP. - -usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server - -positional arguments: - qmp_server < UNIX socket path | TCP address:port > - -optional arguments: - -h, --help show this help message and exit - -H, --hmp Use HMP interface - -N, --skip-negotiation - Skip negotiate (for qemu-ga) - -v, --verbose Verbose (echo commands sent and received) - -p, --pretty Pretty-print JSON - - -Start QEMU with: - -# qemu [...] -qmp unix:./qmp-sock,server - -Run the shell: - -$ qmp-shell ./qmp-sock - -Commands have the following format: - - < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] - -For example: - -(QEMU) device_add driver=e1000 id=net1 -{'return': {}} -(QEMU) - -key=value pairs also support Python or JSON object literal subset notations, -without spaces. Dictionaries/objects {} are supported as are arrays []. - - example-command arg-name1={'key':'value','obj'={'prop':"value"}} - -Both JSON and Python formatting should work, including both styles of -string literal quotes. Both paradigms of literal values should work, -including null/true/false for JSON and None/True/False for Python. - - -Transactions have the following multi-line format: - - transaction( - action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ] - ... - action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ] - ) - -One line transactions are also supported: - - transaction( action-name1 ... ) - -For example: - - (QEMU) transaction( - TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1 - TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0 - TRANS> ) - {"return": {}} - (QEMU) - -Use the -v and -p options to activate the verbose and pretty-print options, -which will echo back the properly formatted JSON-compliant QMP that is being -sent to QEMU, which is useful for debugging and documentation generation. -""" - -import argparse -import ast -import json -import logging -import os -import re -import readline -from subprocess import Popen -import sys -from typing import ( - IO, - Iterator, - List, - NoReturn, - Optional, - Sequence, -) - -from qemu.aqmp import ConnectError, QMPError, SocketAddrT -from qemu.aqmp.legacy import ( - QEMUMonitorProtocol, - QMPBadPortError, - QMPMessage, - QMPObject, -) - - -LOG = logging.getLogger(__name__) - - -class QMPCompleter: - """ - QMPCompleter provides a readline library tab-complete behavior. - """ - # NB: Python 3.9+ will probably allow us to subclass list[str] directly, - # but pylint as of today does not know that List[str] is simply 'list'. - def __init__(self) -> None: - self._matches: List[str] = [] - - def append(self, value: str) -> None: - """Append a new valid completion to the list of possibilities.""" - return self._matches.append(value) - - def complete(self, text: str, state: int) -> Optional[str]: - """readline.set_completer() callback implementation.""" - for cmd in self._matches: - if cmd.startswith(text): - if state == 0: - return cmd - state -= 1 - return None - - -class QMPShellError(QMPError): - """ - QMP Shell Base error class. - """ - - -class FuzzyJSON(ast.NodeTransformer): - """ - This extension of ast.NodeTransformer filters literal "true/false/null" - values in a Python AST and replaces them by proper "True/False/None" values - that Python can properly evaluate. - """ - - @classmethod - def visit_Name(cls, # pylint: disable=invalid-name - node: ast.Name) -> ast.AST: - """ - Transform Name nodes with certain values into Constant (keyword) nodes. - """ - if node.id == 'true': - return ast.Constant(value=True) - if node.id == 'false': - return ast.Constant(value=False) - if node.id == 'null': - return ast.Constant(value=None) - return node - - -class QMPShell(QEMUMonitorProtocol): - """ - QMPShell provides a basic readline-based QMP shell. - - :param address: Address of the QMP server. - :param pretty: Pretty-print QMP messages. - :param verbose: Echo outgoing QMP messages to console. - """ - def __init__(self, address: SocketAddrT, - pretty: bool = False, - verbose: bool = False, - server: bool = False, - logfile: Optional[str] = None): - super().__init__(address, server=server) - self._greeting: Optional[QMPMessage] = None - self._completer = QMPCompleter() - self._transmode = False - self._actions: List[QMPMessage] = [] - self._histfile = os.path.join(os.path.expanduser('~'), - '.qmp-shell_history') - self.pretty = pretty - self.verbose = verbose - self.logfile = None - - if logfile is not None: - self.logfile = open(logfile, "w", encoding='utf-8') - - def close(self) -> None: - # Hook into context manager of parent to save shell history. - self._save_history() - super().close() - - def _fill_completion(self) -> None: - cmds = self.cmd('query-commands') - if 'error' in cmds: - return - for cmd in cmds['return']: - self._completer.append(cmd['name']) - - def _completer_setup(self) -> None: - self._completer = QMPCompleter() - self._fill_completion() - readline.set_history_length(1024) - readline.set_completer(self._completer.complete) - readline.parse_and_bind("tab: complete") - # NB: default delimiters conflict with some command names - # (eg. query-), clearing everything as it doesn't seem to matter - readline.set_completer_delims('') - try: - readline.read_history_file(self._histfile) - except FileNotFoundError: - pass - except IOError as err: - msg = f"Failed to read history '{self._histfile}': {err!s}" - LOG.warning(msg) - - def _save_history(self) -> None: - try: - readline.write_history_file(self._histfile) - except IOError as err: - msg = f"Failed to save history file '{self._histfile}': {err!s}" - LOG.warning(msg) - - @classmethod - def _parse_value(cls, val: str) -> object: - try: - return int(val) - except ValueError: - pass - - if val.lower() == 'true': - return True - if val.lower() == 'false': - return False - if val.startswith(('{', '[')): - # Try first as pure JSON: - try: - return json.loads(val) - except ValueError: - pass - # Try once again as FuzzyJSON: - try: - tree = ast.parse(val, mode='eval') - transformed = FuzzyJSON().visit(tree) - return ast.literal_eval(transformed) - except (SyntaxError, ValueError): - pass - return val - - def _cli_expr(self, - tokens: Sequence[str], - parent: QMPObject) -> None: - for arg in tokens: - (key, sep, val) = arg.partition('=') - if sep != '=': - raise QMPShellError( - f"Expected a key=value pair, got '{arg!s}'" - ) - - value = self._parse_value(val) - optpath = key.split('.') - curpath = [] - for path in optpath[:-1]: - curpath.append(path) - obj = parent.get(path, {}) - if not isinstance(obj, dict): - msg = 'Cannot use "{:s}" as both leaf and non-leaf key' - raise QMPShellError(msg.format('.'.join(curpath))) - parent[path] = obj - parent = obj - if optpath[-1] in parent: - if isinstance(parent[optpath[-1]], dict): - msg = 'Cannot use "{:s}" as both leaf and non-leaf key' - raise QMPShellError(msg.format('.'.join(curpath))) - raise QMPShellError(f'Cannot set "{key}" multiple times') - parent[optpath[-1]] = value - - def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]: - """ - Build a QMP input object from a user provided command-line in the - following format: - - < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] - """ - argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+''' - cmdargs = re.findall(argument_regex, cmdline) - qmpcmd: QMPMessage - - # Transactional CLI entry: - if cmdargs and cmdargs[0] == 'transaction(': - self._transmode = True - self._actions = [] - cmdargs.pop(0) - - # Transactional CLI exit: - if cmdargs and cmdargs[0] == ')' and self._transmode: - self._transmode = False - if len(cmdargs) > 1: - msg = 'Unexpected input after close of Transaction sub-shell' - raise QMPShellError(msg) - qmpcmd = { - 'execute': 'transaction', - 'arguments': {'actions': self._actions} - } - return qmpcmd - - # No args, or no args remaining - if not cmdargs: - return None - - if self._transmode: - # Parse and cache this Transactional Action - finalize = False - action = {'type': cmdargs[0], 'data': {}} - if cmdargs[-1] == ')': - cmdargs.pop(-1) - finalize = True - self._cli_expr(cmdargs[1:], action['data']) - self._actions.append(action) - return self._build_cmd(')') if finalize else None - - # Standard command: parse and return it to be executed. - qmpcmd = {'execute': cmdargs[0], 'arguments': {}} - self._cli_expr(cmdargs[1:], qmpcmd['arguments']) - return qmpcmd - - def _print(self, qmp_message: object, fh: IO[str] = sys.stdout) -> None: - jsobj = json.dumps(qmp_message, - indent=4 if self.pretty else None, - sort_keys=self.pretty) - print(str(jsobj), file=fh) - - def _execute_cmd(self, cmdline: str) -> bool: - try: - qmpcmd = self._build_cmd(cmdline) - except QMPShellError as err: - print( - f"Error while parsing command line: {err!s}\n" - "command format: <command-name> " - "[arg-name1=arg1] ... [arg-nameN=argN", - file=sys.stderr - ) - return True - # For transaction mode, we may have just cached the action: - if qmpcmd is None: - return True - if self.verbose: - self._print(qmpcmd) - resp = self.cmd_obj(qmpcmd) - if resp is None: - print('Disconnected') - return False - self._print(resp) - if self.logfile is not None: - cmd = {**qmpcmd, **resp} - self._print(cmd, fh=self.logfile) - return True - - def connect(self, negotiate: bool = True) -> None: - self._greeting = super().connect(negotiate) - self._completer_setup() - - def show_banner(self, - msg: str = 'Welcome to the QMP low-level shell!') -> None: - """ - Print to stdio a greeting, and the QEMU version if available. - """ - print(msg) - if not self._greeting: - print('Connected') - return - version = self._greeting['QMP']['version']['qemu'] - print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version)) - - @property - def prompt(self) -> str: - """ - Return the current shell prompt, including a trailing space. - """ - if self._transmode: - return 'TRANS> ' - return '(QEMU) ' - - def read_exec_command(self) -> bool: - """ - Read and execute a command. - - @return True if execution was ok, return False if disconnected. - """ - try: - cmdline = input(self.prompt) - except EOFError: - print() - return False - - if cmdline == '': - for event in self.get_events(): - print(event) - return True - - return self._execute_cmd(cmdline) - - def repl(self) -> Iterator[None]: - """ - Return an iterator that implements the REPL. - """ - self.show_banner() - while self.read_exec_command(): - yield - self.close() - - -class HMPShell(QMPShell): - """ - HMPShell provides a basic readline-based HMP shell, tunnelled via QMP. - - :param address: Address of the QMP server. - :param pretty: Pretty-print QMP messages. - :param verbose: Echo outgoing QMP messages to console. - """ - def __init__(self, address: SocketAddrT, - pretty: bool = False, - verbose: bool = False, - server: bool = False, - logfile: Optional[str] = None): - super().__init__(address, pretty, verbose, server, logfile) - self._cpu_index = 0 - - def _cmd_completion(self) -> None: - for cmd in self._cmd_passthrough('help')['return'].split('\r\n'): - if cmd and cmd[0] != '[' and cmd[0] != '\t': - name = cmd.split()[0] # drop help text - if name == 'info': - continue - if name.find('|') != -1: - # Command in the form 'foobar|f' or 'f|foobar', take the - # full name - opt = name.split('|') - if len(opt[0]) == 1: - name = opt[1] - else: - name = opt[0] - self._completer.append(name) - self._completer.append('help ' + name) # help completion - - def _info_completion(self) -> None: - for cmd in self._cmd_passthrough('info')['return'].split('\r\n'): - if cmd: - self._completer.append('info ' + cmd.split()[1]) - - def _other_completion(self) -> None: - # special cases - self._completer.append('help info') - - def _fill_completion(self) -> None: - self._cmd_completion() - self._info_completion() - self._other_completion() - - def _cmd_passthrough(self, cmdline: str, - cpu_index: int = 0) -> QMPMessage: - return self.cmd_obj({ - 'execute': 'human-monitor-command', - 'arguments': { - 'command-line': cmdline, - 'cpu-index': cpu_index - } - }) - - def _execute_cmd(self, cmdline: str) -> bool: - if cmdline.split()[0] == "cpu": - # trap the cpu command, it requires special setting - try: - idx = int(cmdline.split()[1]) - if 'return' not in self._cmd_passthrough('info version', idx): - print('bad CPU index') - return True - self._cpu_index = idx - except ValueError: - print('cpu command takes an integer argument') - return True - resp = self._cmd_passthrough(cmdline, self._cpu_index) - if resp is None: - print('Disconnected') - return False - assert 'return' in resp or 'error' in resp - if 'return' in resp: - # Success - if len(resp['return']) > 0: - print(resp['return'], end=' ') - else: - # Error - print('%s: %s' % (resp['error']['class'], resp['error']['desc'])) - return True - - def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None: - QMPShell.show_banner(self, msg) - - -def die(msg: str) -> NoReturn: - """Write an error to stderr, then exit with a return code of 1.""" - sys.stderr.write('ERROR: %s\n' % msg) - sys.exit(1) - - -def main() -> None: - """ - qmp-shell entry point: parse command line arguments and start the REPL. - """ - parser = argparse.ArgumentParser() - parser.add_argument('-H', '--hmp', action='store_true', - help='Use HMP interface') - parser.add_argument('-N', '--skip-negotiation', action='store_true', - help='Skip negotiate (for qemu-ga)') - parser.add_argument('-v', '--verbose', action='store_true', - help='Verbose (echo commands sent and received)') - parser.add_argument('-p', '--pretty', action='store_true', - help='Pretty-print JSON') - parser.add_argument('-l', '--logfile', - help='Save log of all QMP messages to PATH') - - default_server = os.environ.get('QMP_SOCKET') - parser.add_argument('qmp_server', action='store', - default=default_server, - help='< UNIX socket path | TCP address:port >') - - args = parser.parse_args() - if args.qmp_server is None: - parser.error("QMP socket or TCP address must be specified") - - shell_class = HMPShell if args.hmp else QMPShell - - try: - address = shell_class.parse_address(args.qmp_server) - except QMPBadPortError: - parser.error(f"Bad port number: {args.qmp_server}") - return # pycharm doesn't know error() is noreturn - - with shell_class(address, args.pretty, args.verbose, args.logfile) as qemu: - try: - qemu.connect(negotiate=not args.skip_negotiation) - except ConnectError as err: - if isinstance(err.exc, OSError): - die(f"Couldn't connect to {args.qmp_server}: {err!s}") - die(str(err)) - - for _ in qemu.repl(): - pass - - -def main_wrap() -> None: - """ - qmp-shell-wrap entry point: parse command line arguments and - start the REPL. - """ - parser = argparse.ArgumentParser() - parser.add_argument('-H', '--hmp', action='store_true', - help='Use HMP interface') - parser.add_argument('-v', '--verbose', action='store_true', - help='Verbose (echo commands sent and received)') - parser.add_argument('-p', '--pretty', action='store_true', - help='Pretty-print JSON') - parser.add_argument('-l', '--logfile', - help='Save log of all QMP messages to PATH') - - parser.add_argument('command', nargs=argparse.REMAINDER, - help='QEMU command line to invoke') - - args = parser.parse_args() - - cmd = args.command - if len(cmd) != 0 and cmd[0] == '--': - cmd = cmd[1:] - if len(cmd) == 0: - cmd = ["qemu-system-x86_64"] - - sockpath = "qmp-shell-wrap-%d" % os.getpid() - cmd += ["-qmp", "unix:%s" % sockpath] - - shell_class = HMPShell if args.hmp else QMPShell - - try: - address = shell_class.parse_address(sockpath) - except QMPBadPortError: - parser.error(f"Bad port number: {sockpath}") - return # pycharm doesn't know error() is noreturn - - try: - with shell_class(address, args.pretty, args.verbose, - True, args.logfile) as qemu: - with Popen(cmd): - - try: - qemu.accept() - except ConnectError as err: - if isinstance(err.exc, OSError): - die(f"Couldn't connect to {args.qmp_server}: {err!s}") - die(str(err)) - - for _ in qemu.repl(): - pass - finally: - os.unlink(sockpath) - - -if __name__ == '__main__': - main() diff --git a/python/qemu/aqmp/util.py b/python/qemu/aqmp/util.py deleted file mode 100644 index eaa5fc7d5f..0000000000 --- a/python/qemu/aqmp/util.py +++ /dev/null @@ -1,217 +0,0 @@ -""" -Miscellaneous Utilities - -This module provides asyncio utilities and compatibility wrappers for -Python 3.6 to provide some features that otherwise become available in -Python 3.7+. - -Various logging and debugging utilities are also provided, such as -`exception_summary()` and `pretty_traceback()`, used primarily for -adding information into the logging stream. -""" - -import asyncio -import sys -import traceback -from typing import ( - Any, - Coroutine, - Optional, - TypeVar, - cast, -) - - -T = TypeVar('T') - - -# -------------------------- -# Section: Utility Functions -# -------------------------- - - -async def flush(writer: asyncio.StreamWriter) -> None: - """ - Utility function to ensure a StreamWriter is *fully* drained. - - `asyncio.StreamWriter.drain` only promises we will return to below - the "high-water mark". This function ensures we flush the entire - buffer -- by setting the high water mark to 0 and then calling - drain. The flow control limits are restored after the call is - completed. - """ - transport = cast(asyncio.WriteTransport, writer.transport) - - # https://github.com/python/typeshed/issues/5779 - low, high = transport.get_write_buffer_limits() # type: ignore - transport.set_write_buffer_limits(0, 0) - try: - await writer.drain() - finally: - transport.set_write_buffer_limits(high, low) - - -def upper_half(func: T) -> T: - """ - Do-nothing decorator that annotates a method as an "upper-half" method. - - These methods must not call bottom-half functions directly, but can - schedule them to run. - """ - return func - - -def bottom_half(func: T) -> T: - """ - Do-nothing decorator that annotates a method as a "bottom-half" method. - - These methods must take great care to handle their own exceptions whenever - possible. If they go unhandled, they will cause termination of the loop. - - These methods do not, in general, have the ability to directly - report information to a caller’s context and will usually be - collected as a Task result instead. - - They must not call upper-half functions directly. - """ - return func - - -# ------------------------------- -# Section: Compatibility Wrappers -# ------------------------------- - - -def create_task(coro: Coroutine[Any, Any, T], - loop: Optional[asyncio.AbstractEventLoop] = None - ) -> 'asyncio.Future[T]': - """ - Python 3.6-compatible `asyncio.create_task` wrapper. - - :param coro: The coroutine to execute in a task. - :param loop: Optionally, the loop to create the task in. - - :return: An `asyncio.Future` object. - """ - if sys.version_info >= (3, 7): - if loop is not None: - return loop.create_task(coro) - return asyncio.create_task(coro) # pylint: disable=no-member - - # Python 3.6: - return asyncio.ensure_future(coro, loop=loop) - - -def is_closing(writer: asyncio.StreamWriter) -> bool: - """ - Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper. - - :param writer: The `asyncio.StreamWriter` object. - :return: `True` if the writer is closing, or closed. - """ - if sys.version_info >= (3, 7): - return writer.is_closing() - - # Python 3.6: - transport = writer.transport - assert isinstance(transport, asyncio.WriteTransport) - return transport.is_closing() - - -async def wait_closed(writer: asyncio.StreamWriter) -> None: - """ - Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper. - - :param writer: The `asyncio.StreamWriter` to wait on. - """ - if sys.version_info >= (3, 7): - await writer.wait_closed() - return - - # Python 3.6 - transport = writer.transport - assert isinstance(transport, asyncio.WriteTransport) - - while not transport.is_closing(): - await asyncio.sleep(0) - - # This is an ugly workaround, but it's the best I can come up with. - sock = transport.get_extra_info('socket') - - if sock is None: - # Our transport doesn't have a socket? ... - # Nothing we can reasonably do. - return - - while sock.fileno() != -1: - await asyncio.sleep(0) - - -def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T: - """ - Python 3.6-compatible `asyncio.run` wrapper. - - :param coro: A coroutine to execute now. - :return: The return value from the coroutine. - """ - if sys.version_info >= (3, 7): - return asyncio.run(coro, debug=debug) - - # Python 3.6 - loop = asyncio.get_event_loop() - loop.set_debug(debug) - ret = loop.run_until_complete(coro) - loop.close() - - return ret - - -# ---------------------------- -# Section: Logging & Debugging -# ---------------------------- - - -def exception_summary(exc: BaseException) -> str: - """ - Return a summary string of an arbitrary exception. - - It will be of the form "ExceptionType: Error Message", if the error - string is non-empty, and just "ExceptionType" otherwise. - """ - name = type(exc).__qualname__ - smod = type(exc).__module__ - if smod not in ("__main__", "builtins"): - name = smod + '.' + name - - error = str(exc) - if error: - return f"{name}: {error}" - return name - - -def pretty_traceback(prefix: str = " | ") -> str: - """ - Formats the current traceback, indented to provide visual distinction. - - This is useful for printing a traceback within a traceback for - debugging purposes when encapsulating errors to deliver them up the - stack; when those errors are printed, this helps provide a nice - visual grouping to quickly identify the parts of the error that - belong to the inner exception. - - :param prefix: The prefix to append to each line of the traceback. - :return: A string, formatted something like the following:: - - | Traceback (most recent call last): - | File "foobar.py", line 42, in arbitrary_example - | foo.baz() - | ArbitraryError: [Errno 42] Something bad happened! - """ - output = "".join(traceback.format_exception(*sys.exc_info())) - - exc_lines = [] - for line in output.split('\n'): - exc_lines.append(prefix + line) - - # The last line is always empty, omit it - return "\n".join(exc_lines[:-1]) |