diff options
Diffstat (limited to 'python/qemu/aqmp/protocol.py')
-rw-r--r-- | python/qemu/aqmp/protocol.py | 1048 |
1 files changed, 0 insertions, 1048 deletions
diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py deleted file mode 100644 index 36fae57f27..0000000000 --- a/python/qemu/aqmp/protocol.py +++ /dev/null @@ -1,1048 +0,0 @@ -""" -Generic Asynchronous Message-based Protocol Support - -This module provides a generic framework for sending and receiving -messages over an asyncio stream. `AsyncProtocol` is an abstract class -that implements the core mechanisms of a simple send/receive protocol, -and is designed to be extended. - -In this package, it is used as the implementation for the `QMPClient` -class. -""" - -# It's all the docstrings ... ! It's long for a good reason ^_^; -# pylint: disable=too-many-lines - -import asyncio -from asyncio import StreamReader, StreamWriter -from enum import Enum -from functools import wraps -import logging -from ssl import SSLContext -from typing import ( - Any, - Awaitable, - Callable, - Generic, - List, - Optional, - Tuple, - TypeVar, - Union, - cast, -) - -from .error import QMPError -from .util import ( - bottom_half, - create_task, - exception_summary, - flush, - is_closing, - pretty_traceback, - upper_half, - wait_closed, -) - - -T = TypeVar('T') -_U = TypeVar('_U') -_TaskFN = Callable[[], Awaitable[None]] # aka ``async def func() -> None`` - -InternetAddrT = Tuple[str, int] -UnixAddrT = str -SocketAddrT = Union[UnixAddrT, InternetAddrT] - - -class Runstate(Enum): - """Protocol session runstate.""" - - #: Fully quiesced and disconnected. - IDLE = 0 - #: In the process of connecting or establishing a session. - CONNECTING = 1 - #: Fully connected and active session. - RUNNING = 2 - #: In the process of disconnecting. - #: Runstate may be returned to `IDLE` by calling `disconnect()`. - DISCONNECTING = 3 - - -class ConnectError(QMPError): - """ - Raised when the initial connection process has failed. - - This Exception always wraps a "root cause" exception that can be - interrogated for additional information. - - :param error_message: Human-readable string describing the error. - :param exc: The root-cause exception. - """ - def __init__(self, error_message: str, exc: Exception): - super().__init__(error_message) - #: Human-readable error string - self.error_message: str = error_message - #: Wrapped root cause exception - self.exc: Exception = exc - - def __str__(self) -> str: - cause = str(self.exc) - if not cause: - # If there's no error string, use the exception name. - cause = exception_summary(self.exc) - return f"{self.error_message}: {cause}" - - -class StateError(QMPError): - """ - An API command (connect, execute, etc) was issued at an inappropriate time. - - This error is raised when a command like - :py:meth:`~AsyncProtocol.connect()` is issued at an inappropriate - time. - - :param error_message: Human-readable string describing the state violation. - :param state: The actual `Runstate` seen at the time of the violation. - :param required: The `Runstate` required to process this command. - """ - def __init__(self, error_message: str, - state: Runstate, required: Runstate): - super().__init__(error_message) - self.error_message = error_message - self.state = state - self.required = required - - -F = TypeVar('F', bound=Callable[..., Any]) # pylint: disable=invalid-name - - -# Don't Panic. -def require(required_state: Runstate) -> Callable[[F], F]: - """ - Decorator: protect a method so it can only be run in a certain `Runstate`. - - :param required_state: The `Runstate` required to invoke this method. - :raise StateError: When the required `Runstate` is not met. - """ - def _decorator(func: F) -> F: - # _decorator is the decorator that is built by calling the - # require() decorator factory; e.g.: - # - # @require(Runstate.IDLE) def foo(): ... - # will replace 'foo' with the result of '_decorator(foo)'. - - @wraps(func) - def _wrapper(proto: 'AsyncProtocol[Any]', - *args: Any, **kwargs: Any) -> Any: - # _wrapper is the function that gets executed prior to the - # decorated method. - - name = type(proto).__name__ - - if proto.runstate != required_state: - if proto.runstate == Runstate.CONNECTING: - emsg = f"{name} is currently connecting." - elif proto.runstate == Runstate.DISCONNECTING: - emsg = (f"{name} is disconnecting." - " Call disconnect() to return to IDLE state.") - elif proto.runstate == Runstate.RUNNING: - emsg = f"{name} is already connected and running." - elif proto.runstate == Runstate.IDLE: - emsg = f"{name} is disconnected and idle." - else: - assert False - raise StateError(emsg, proto.runstate, required_state) - # No StateError, so call the wrapped method. - return func(proto, *args, **kwargs) - - # Return the decorated method; - # Transforming Func to Decorated[Func]. - return cast(F, _wrapper) - - # Return the decorator instance from the decorator factory. Phew! - return _decorator - - -class AsyncProtocol(Generic[T]): - """ - AsyncProtocol implements a generic async message-based protocol. - - This protocol assumes the basic unit of information transfer between - client and server is a "message", the details of which are left up - to the implementation. It assumes the sending and receiving of these - messages is full-duplex and not necessarily correlated; i.e. it - supports asynchronous inbound messages. - - It is designed to be extended by a specific protocol which provides - the implementations for how to read and send messages. These must be - defined in `_do_recv()` and `_do_send()`, respectively. - - Other callbacks have a default implementation, but are intended to be - either extended or overridden: - - - `_establish_session`: - The base implementation starts the reader/writer tasks. - A protocol implementation can override this call, inserting - actions to be taken prior to starting the reader/writer tasks - before the super() call; actions needing to occur afterwards - can be written after the super() call. - - `_on_message`: - Actions to be performed when a message is received. - - `_cb_outbound`: - Logging/Filtering hook for all outbound messages. - - `_cb_inbound`: - Logging/Filtering hook for all inbound messages. - This hook runs *before* `_on_message()`. - - :param name: - Name used for logging messages, if any. By default, messages - will log to 'qemu.aqmp.protocol', but each individual connection - can be given its own logger by giving it a name; messages will - then log to 'qemu.aqmp.protocol.${name}'. - """ - # pylint: disable=too-many-instance-attributes - - #: Logger object for debugging messages from this connection. - logger = logging.getLogger(__name__) - - # Maximum allowable size of read buffer - _limit = (64 * 1024) - - # ------------------------- - # Section: Public interface - # ------------------------- - - def __init__(self, name: Optional[str] = None) -> None: - #: The nickname for this connection, if any. - self.name: Optional[str] = name - if self.name is not None: - self.logger = self.logger.getChild(self.name) - - # stream I/O - self._reader: Optional[StreamReader] = None - self._writer: Optional[StreamWriter] = None - - # Outbound Message queue - self._outgoing: asyncio.Queue[T] - - # Special, long-running tasks: - self._reader_task: Optional[asyncio.Future[None]] = None - self._writer_task: Optional[asyncio.Future[None]] = None - - # Aggregate of the above two tasks, used for Exception management. - self._bh_tasks: Optional[asyncio.Future[Tuple[None, None]]] = None - - #: Disconnect task. The disconnect implementation runs in a task - #: so that asynchronous disconnects (initiated by the - #: reader/writer) are allowed to wait for the reader/writers to - #: exit. - self._dc_task: Optional[asyncio.Future[None]] = None - - self._runstate = Runstate.IDLE - self._runstate_changed: Optional[asyncio.Event] = None - - # Server state for start_server() and _incoming() - self._server: Optional[asyncio.AbstractServer] = None - self._accepted: Optional[asyncio.Event] = None - - def __repr__(self) -> str: - cls_name = type(self).__name__ - tokens = [] - if self.name is not None: - tokens.append(f"name={self.name!r}") - tokens.append(f"runstate={self.runstate.name}") - return f"<{cls_name} {' '.join(tokens)}>" - - @property # @upper_half - def runstate(self) -> Runstate: - """The current `Runstate` of the connection.""" - return self._runstate - - @upper_half - async def runstate_changed(self) -> Runstate: - """ - Wait for the `runstate` to change, then return that runstate. - """ - await self._runstate_event.wait() - return self.runstate - - @upper_half - @require(Runstate.IDLE) - async def start_server_and_accept( - self, address: SocketAddrT, - ssl: Optional[SSLContext] = None - ) -> None: - """ - Accept a connection and begin processing message queues. - - If this call fails, `runstate` is guaranteed to be set back to `IDLE`. - This method is precisely equivalent to calling `start_server()` - followed by `accept()`. - - :param address: - Address to listen on; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: - When a connection or session cannot be established. - - This exception will wrap a more concrete one. In most cases, - the wrapped exception will be `OSError` or `EOFError`. If a - protocol-level failure occurs while establishing a new - session, the wrapped error may also be an `QMPError`. - """ - await self.start_server(address, ssl) - await self.accept() - assert self.runstate == Runstate.RUNNING - - @upper_half - @require(Runstate.IDLE) - async def start_server(self, address: SocketAddrT, - ssl: Optional[SSLContext] = None) -> None: - """ - Start listening for an incoming connection, but do not wait for a peer. - - This method starts listening for an incoming connection, but - does not block waiting for a peer. This call will return - immediately after binding and listening on a socket. A later - call to `accept()` must be made in order to finalize the - incoming connection. - - :param address: - Address to listen on; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: - When the server could not start listening on this address. - - This exception will wrap a more concrete one. In most cases, - the wrapped exception will be `OSError`. - """ - await self._session_guard( - self._do_start_server(address, ssl), - 'Failed to establish connection') - assert self.runstate == Runstate.CONNECTING - - @upper_half - @require(Runstate.CONNECTING) - async def accept(self) -> None: - """ - Accept an incoming connection and begin processing message queues. - - If this call fails, `runstate` is guaranteed to be set back to `IDLE`. - - :raise StateError: When the `Runstate` is not `CONNECTING`. - :raise QMPError: When `start_server()` was not called yet. - :raise ConnectError: - When a connection or session cannot be established. - - This exception will wrap a more concrete one. In most cases, - the wrapped exception will be `OSError` or `EOFError`. If a - protocol-level failure occurs while establishing a new - session, the wrapped error may also be an `QMPError`. - """ - if self._accepted is None: - raise QMPError("Cannot call accept() before start_server().") - await self._session_guard( - self._do_accept(), - 'Failed to establish connection') - await self._session_guard( - self._establish_session(), - 'Failed to establish session') - assert self.runstate == Runstate.RUNNING - - @upper_half - @require(Runstate.IDLE) - async def connect(self, address: SocketAddrT, - ssl: Optional[SSLContext] = None) -> None: - """ - Connect to the server and begin processing message queues. - - If this call fails, `runstate` is guaranteed to be set back to `IDLE`. - - :param address: - Address to connect to; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: - When a connection or session cannot be established. - - This exception will wrap a more concrete one. In most cases, - the wrapped exception will be `OSError` or `EOFError`. If a - protocol-level failure occurs while establishing a new - session, the wrapped error may also be an `QMPError`. - """ - await self._session_guard( - self._do_connect(address, ssl), - 'Failed to establish connection') - await self._session_guard( - self._establish_session(), - 'Failed to establish session') - assert self.runstate == Runstate.RUNNING - - @upper_half - async def disconnect(self) -> None: - """ - Disconnect and wait for all tasks to fully stop. - - If there was an exception that caused the reader/writers to - terminate prematurely, it will be raised here. - - :raise Exception: When the reader or writer terminate unexpectedly. - """ - self.logger.debug("disconnect() called.") - self._schedule_disconnect() - await self._wait_disconnect() - - # -------------------------- - # Section: Session machinery - # -------------------------- - - async def _session_guard(self, coro: Awaitable[None], emsg: str) -> None: - """ - Async guard function used to roll back to `IDLE` on any error. - - On any Exception, the state machine will be reset back to - `IDLE`. Most Exceptions will be wrapped with `ConnectError`, but - `BaseException` events will be left alone (This includes - asyncio.CancelledError, even prior to Python 3.8). - - :param error_message: - Human-readable string describing what connection phase failed. - - :raise BaseException: - When `BaseException` occurs in the guarded block. - :raise ConnectError: - When any other error is encountered in the guarded block. - """ - # Note: After Python 3.6 support is removed, this should be an - # @asynccontextmanager instead of accepting a callback. - try: - await coro - except BaseException as err: - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - try: - # Reset the runstate back to IDLE. - await self.disconnect() - except: - # We don't expect any Exceptions from the disconnect function - # here, because we failed to connect in the first place. - # The disconnect() function is intended to perform - # only cannot-fail cleanup here, but you never know. - emsg = ( - "Unexpected bottom half exception. " - "This is a bug in the QMP library. " - "Please report it to <qemu-devel@nongnu.org> and " - "CC: John Snow <jsnow@redhat.com>." - ) - self.logger.critical("%s:\n%s\n", emsg, pretty_traceback()) - raise - - # CancelledError is an Exception with special semantic meaning; - # We do NOT want to wrap it up under ConnectError. - # NB: CancelledError is not a BaseException before Python 3.8 - if isinstance(err, asyncio.CancelledError): - raise - - # Any other kind of error can be treated as some kind of connection - # failure broadly. Inspect the 'exc' field to explore the root - # cause in greater detail. - if isinstance(err, Exception): - raise ConnectError(emsg, err) from err - - # Raise BaseExceptions un-wrapped, they're more important. - raise - - @property - def _runstate_event(self) -> asyncio.Event: - # asyncio.Event() objects should not be created prior to entrance into - # an event loop, so we can ensure we create it in the correct context. - # Create it on-demand *only* at the behest of an 'async def' method. - if not self._runstate_changed: - self._runstate_changed = asyncio.Event() - return self._runstate_changed - - @upper_half - @bottom_half - def _set_state(self, state: Runstate) -> None: - """ - Change the `Runstate` of the protocol connection. - - Signals the `runstate_changed` event. - """ - if state == self._runstate: - return - - self.logger.debug("Transitioning from '%s' to '%s'.", - str(self._runstate), str(state)) - self._runstate = state - self._runstate_event.set() - self._runstate_event.clear() - - @bottom_half - async def _stop_server(self) -> None: - """ - Stop listening for / accepting new incoming connections. - """ - if self._server is None: - return - - try: - self.logger.debug("Stopping server.") - self._server.close() - await self._server.wait_closed() - self.logger.debug("Server stopped.") - finally: - self._server = None - - @bottom_half # However, it does not run from the R/W tasks. - async def _incoming(self, - reader: asyncio.StreamReader, - writer: asyncio.StreamWriter) -> None: - """ - Accept an incoming connection and signal the upper_half. - - This method does the minimum necessary to accept a single - incoming connection. It signals back to the upper_half ASAP so - that any errors during session initialization can occur - naturally in the caller's stack. - - :param reader: Incoming `asyncio.StreamReader` - :param writer: Incoming `asyncio.StreamWriter` - """ - peer = writer.get_extra_info('peername', 'Unknown peer') - self.logger.debug("Incoming connection from %s", peer) - - if self._reader or self._writer: - # Sadly, we can have more than one pending connection - # because of https://bugs.python.org/issue46715 - # Close any extra connections we don't actually want. - self.logger.warning("Extraneous connection inadvertently accepted") - writer.close() - return - - # A connection has been accepted; stop listening for new ones. - assert self._accepted is not None - await self._stop_server() - self._reader, self._writer = (reader, writer) - self._accepted.set() - - @upper_half - async def _do_start_server(self, address: SocketAddrT, - ssl: Optional[SSLContext] = None) -> None: - """ - Start listening for an incoming connection, but do not wait for a peer. - - This method starts listening for an incoming connection, but does not - block waiting for a peer. This call will return immediately after - binding and listening to a socket. A later call to accept() must be - made in order to finalize the incoming connection. - - :param address: - Address to listen on; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise OSError: For stream-related errors. - """ - assert self.runstate == Runstate.IDLE - self._set_state(Runstate.CONNECTING) - - self.logger.debug("Awaiting connection on %s ...", address) - self._accepted = asyncio.Event() - - if isinstance(address, tuple): - coro = asyncio.start_server( - self._incoming, - host=address[0], - port=address[1], - ssl=ssl, - backlog=1, - limit=self._limit, - ) - else: - coro = asyncio.start_unix_server( - self._incoming, - path=address, - ssl=ssl, - backlog=1, - limit=self._limit, - ) - - # Allow runstate watchers to witness 'CONNECTING' state; some - # failures in the streaming layer are synchronous and will not - # otherwise yield. - await asyncio.sleep(0) - - # This will start the server (bind(2), listen(2)). It will also - # call accept(2) if we yield, but we don't block on that here. - self._server = await coro - self.logger.debug("Server listening on %s", address) - - @upper_half - async def _do_accept(self) -> None: - """ - Wait for and accept an incoming connection. - - Requires that we have not yet accepted an incoming connection - from the upper_half, but it's OK if the server is no longer - running because the bottom_half has already accepted the - connection. - """ - assert self._accepted is not None - await self._accepted.wait() - assert self._server is None - self._accepted = None - - self.logger.debug("Connection accepted.") - - @upper_half - async def _do_connect(self, address: SocketAddrT, - ssl: Optional[SSLContext] = None) -> None: - """ - Acting as the transport client, initiate a connection to a server. - - :param address: - Address to connect to; UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - - :raise OSError: For stream-related errors. - """ - assert self.runstate == Runstate.IDLE - self._set_state(Runstate.CONNECTING) - - # Allow runstate watchers to witness 'CONNECTING' state; some - # failures in the streaming layer are synchronous and will not - # otherwise yield. - await asyncio.sleep(0) - - self.logger.debug("Connecting to %s ...", address) - - if isinstance(address, tuple): - connect = asyncio.open_connection( - address[0], - address[1], - ssl=ssl, - limit=self._limit, - ) - else: - connect = asyncio.open_unix_connection( - path=address, - ssl=ssl, - limit=self._limit, - ) - self._reader, self._writer = await connect - - self.logger.debug("Connected.") - - @upper_half - async def _establish_session(self) -> None: - """ - Establish a new session. - - Starts the readers/writer tasks; subclasses may perform their - own negotiations here. The Runstate will be RUNNING upon - successful conclusion. - """ - assert self.runstate == Runstate.CONNECTING - - self._outgoing = asyncio.Queue() - - reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader') - writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer') - - self._reader_task = create_task(reader_coro) - self._writer_task = create_task(writer_coro) - - self._bh_tasks = asyncio.gather( - self._reader_task, - self._writer_task, - ) - - self._set_state(Runstate.RUNNING) - await asyncio.sleep(0) # Allow runstate_event to process - - @upper_half - @bottom_half - def _schedule_disconnect(self) -> None: - """ - Initiate a disconnect; idempotent. - - This method is used both in the upper-half as a direct - consequence of `disconnect()`, and in the bottom-half in the - case of unhandled exceptions in the reader/writer tasks. - - It can be invoked no matter what the `runstate` is. - """ - if not self._dc_task: - self._set_state(Runstate.DISCONNECTING) - self.logger.debug("Scheduling disconnect.") - self._dc_task = create_task(self._bh_disconnect()) - - @upper_half - async def _wait_disconnect(self) -> None: - """ - Waits for a previously scheduled disconnect to finish. - - This method will gather any bottom half exceptions and re-raise - the one that occurred first; presuming it to be the root cause - of any subsequent Exceptions. It is intended to be used in the - upper half of the call chain. - - :raise Exception: - Arbitrary exception re-raised on behalf of the reader/writer. - """ - assert self.runstate == Runstate.DISCONNECTING - assert self._dc_task - - aws: List[Awaitable[object]] = [self._dc_task] - if self._bh_tasks: - aws.insert(0, self._bh_tasks) - all_defined_tasks = asyncio.gather(*aws) - - # Ensure disconnect is done; Exception (if any) is not raised here: - await asyncio.wait((self._dc_task,)) - - try: - await all_defined_tasks # Raise Exceptions from the bottom half. - finally: - self._cleanup() - self._set_state(Runstate.IDLE) - - @upper_half - def _cleanup(self) -> None: - """ - Fully reset this object to a clean state and return to `IDLE`. - """ - def _paranoid_task_erase(task: Optional['asyncio.Future[_U]'] - ) -> Optional['asyncio.Future[_U]']: - # Help to erase a task, ENSURING it is fully quiesced first. - assert (task is None) or task.done() - return None if (task and task.done()) else task - - assert self.runstate == Runstate.DISCONNECTING - self._dc_task = _paranoid_task_erase(self._dc_task) - self._reader_task = _paranoid_task_erase(self._reader_task) - self._writer_task = _paranoid_task_erase(self._writer_task) - self._bh_tasks = _paranoid_task_erase(self._bh_tasks) - - self._reader = None - self._writer = None - self._accepted = None - - # NB: _runstate_changed cannot be cleared because we still need it to - # send the final runstate changed event ...! - - # ---------------------------- - # Section: Bottom Half methods - # ---------------------------- - - @bottom_half - async def _bh_disconnect(self) -> None: - """ - Disconnect and cancel all outstanding tasks. - - It is designed to be called from its task context, - :py:obj:`~AsyncProtocol._dc_task`. By running in its own task, - it is free to wait on any pending actions that may still need to - occur in either the reader or writer tasks. - """ - assert self.runstate == Runstate.DISCONNECTING - - def _done(task: Optional['asyncio.Future[Any]']) -> bool: - return task is not None and task.done() - - # If the server is running, stop it. - await self._stop_server() - - # Are we already in an error pathway? If either of the tasks are - # already done, or if we have no tasks but a reader/writer; we - # must be. - # - # NB: We can't use _bh_tasks to check for premature task - # completion, because it may not yet have had a chance to run - # and gather itself. - tasks = tuple(filter(None, (self._writer_task, self._reader_task))) - error_pathway = _done(self._reader_task) or _done(self._writer_task) - if not tasks: - error_pathway |= bool(self._reader) or bool(self._writer) - - try: - # Try to flush the writer, if possible. - # This *may* cause an error and force us over into the error path. - if not error_pathway: - await self._bh_flush_writer() - except BaseException as err: - error_pathway = True - emsg = "Failed to flush the writer" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - raise - finally: - # Cancel any still-running tasks (Won't raise): - if self._writer_task is not None and not self._writer_task.done(): - self.logger.debug("Cancelling writer task.") - self._writer_task.cancel() - if self._reader_task is not None and not self._reader_task.done(): - self.logger.debug("Cancelling reader task.") - self._reader_task.cancel() - - # Close out the tasks entirely (Won't raise): - if tasks: - self.logger.debug("Waiting for tasks to complete ...") - await asyncio.wait(tasks) - - # Lastly, close the stream itself. (*May raise*!): - await self._bh_close_stream(error_pathway) - self.logger.debug("Disconnected.") - - @bottom_half - async def _bh_flush_writer(self) -> None: - if not self._writer_task: - return - - self.logger.debug("Draining the outbound queue ...") - await self._outgoing.join() - if self._writer is not None: - self.logger.debug("Flushing the StreamWriter ...") - await flush(self._writer) - - @bottom_half - async def _bh_close_stream(self, error_pathway: bool = False) -> None: - # NB: Closing the writer also implcitly closes the reader. - if not self._writer: - return - - if not is_closing(self._writer): - self.logger.debug("Closing StreamWriter.") - self._writer.close() - - self.logger.debug("Waiting for StreamWriter to close ...") - try: - await wait_closed(self._writer) - except Exception: # pylint: disable=broad-except - # It's hard to tell if the Stream is already closed or - # not. Even if one of the tasks has failed, it may have - # failed for a higher-layered protocol reason. The - # stream could still be open and perfectly fine. - # I don't know how to discern its health here. - - if error_pathway: - # We already know that *something* went wrong. Let's - # just trust that the Exception we already have is the - # better one to present to the user, even if we don't - # genuinely *know* the relationship between the two. - self.logger.debug( - "Discarding Exception from wait_closed:\n%s\n", - pretty_traceback(), - ) - else: - # Oops, this is a brand-new error! - raise - finally: - self.logger.debug("StreamWriter closed.") - - @bottom_half - async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None: - """ - Run one of the bottom-half methods in a loop forever. - - If the bottom half ever raises any exception, schedule a - disconnect that will terminate the entire loop. - - :param async_fn: The bottom-half method to run in a loop. - :param name: The name of this task, used for logging. - """ - try: - while True: - await async_fn() - except asyncio.CancelledError: - # We have been cancelled by _bh_disconnect, exit gracefully. - self.logger.debug("Task.%s: cancelled.", name) - return - except BaseException as err: - self.logger.log( - logging.INFO if isinstance(err, EOFError) else logging.ERROR, - "Task.%s: %s", - name, exception_summary(err) - ) - self.logger.debug("Task.%s: failure:\n%s\n", - name, pretty_traceback()) - self._schedule_disconnect() - raise - finally: - self.logger.debug("Task.%s: exiting.", name) - - @bottom_half - async def _bh_send_message(self) -> None: - """ - Wait for an outgoing message, then send it. - - Designed to be run in `_bh_loop_forever()`. - """ - msg = await self._outgoing.get() - try: - await self._send(msg) - finally: - self._outgoing.task_done() - - @bottom_half - async def _bh_recv_message(self) -> None: - """ - Wait for an incoming message and call `_on_message` to route it. - - Designed to be run in `_bh_loop_forever()`. - """ - msg = await self._recv() - await self._on_message(msg) - - # -------------------- - # Section: Message I/O - # -------------------- - - @upper_half - @bottom_half - def _cb_outbound(self, msg: T) -> T: - """ - Callback: outbound message hook. - - This is intended for subclasses to be able to add arbitrary - hooks to filter or manipulate outgoing messages. The base - implementation does nothing but log the message without any - manipulation of the message. - - :param msg: raw outbound message - :return: final outbound message - """ - self.logger.debug("--> %s", str(msg)) - return msg - - @upper_half - @bottom_half - def _cb_inbound(self, msg: T) -> T: - """ - Callback: inbound message hook. - - This is intended for subclasses to be able to add arbitrary - hooks to filter or manipulate incoming messages. The base - implementation does nothing but log the message without any - manipulation of the message. - - This method does not "handle" incoming messages; it is a filter. - The actual "endpoint" for incoming messages is `_on_message()`. - - :param msg: raw inbound message - :return: processed inbound message - """ - self.logger.debug("<-- %s", str(msg)) - return msg - - @upper_half - @bottom_half - async def _readline(self) -> bytes: - """ - Wait for a newline from the incoming reader. - - This method is provided as a convenience for upper-layer - protocols, as many are line-based. - - This method *may* return a sequence of bytes without a trailing - newline if EOF occurs, but *some* bytes were received. In this - case, the next call will raise `EOFError`. It is assumed that - the layer 5 protocol will decide if there is anything meaningful - to be done with a partial message. - - :raise OSError: For stream-related errors. - :raise EOFError: - If the reader stream is at EOF and there are no bytes to return. - :return: bytes, including the newline. - """ - assert self._reader is not None - msg_bytes = await self._reader.readline() - - if not msg_bytes: - if self._reader.at_eof(): - raise EOFError - - return msg_bytes - - @upper_half - @bottom_half - async def _do_recv(self) -> T: - """ - Abstract: Read from the stream and return a message. - - Very low-level; intended to only be called by `_recv()`. - """ - raise NotImplementedError - - @upper_half - @bottom_half - async def _recv(self) -> T: - """ - Read an arbitrary protocol message. - - .. warning:: - This method is intended primarily for `_bh_recv_message()` - to use in an asynchronous task loop. Using it outside of - this loop will "steal" messages from the normal routing - mechanism. It is safe to use prior to `_establish_session()`, - but should not be used otherwise. - - This method uses `_do_recv()` to retrieve the raw message, and - then transforms it using `_cb_inbound()`. - - :return: A single (filtered, processed) protocol message. - """ - message = await self._do_recv() - return self._cb_inbound(message) - - @upper_half - @bottom_half - def _do_send(self, msg: T) -> None: - """ - Abstract: Write a message to the stream. - - Very low-level; intended to only be called by `_send()`. - """ - raise NotImplementedError - - @upper_half - @bottom_half - async def _send(self, msg: T) -> None: - """ - Send an arbitrary protocol message. - - This method will transform any outgoing messages according to - `_cb_outbound()`. - - .. warning:: - Like `_recv()`, this method is intended to be called by - the writer task loop that processes outgoing - messages. Calling it directly may circumvent logic - implemented by the caller meant to correlate outgoing and - incoming messages. - - :raise OSError: For problems with the underlying stream. - """ - msg = self._cb_outbound(msg) - self._do_send(msg) - - @bottom_half - async def _on_message(self, msg: T) -> None: - """ - Called to handle the receipt of a new message. - - .. caution:: - This is executed from within the reader loop, so be advised - that waiting on either the reader or writer task will lead - to deadlock. Additionally, any unhandled exceptions will - directly cause the loop to halt, so logic may be best-kept - to a minimum if at all possible. - - :param msg: The incoming message, already logged/filtered. - """ - # Nothing to do in the abstract case. |