summaryrefslogtreecommitdiffstats
path: root/python/qemu/aqmp/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qemu/aqmp/events.py')
-rw-r--r--python/qemu/aqmp/events.py717
1 files changed, 0 insertions, 717 deletions
diff --git a/python/qemu/aqmp/events.py b/python/qemu/aqmp/events.py
deleted file mode 100644
index f3d4e2b5e8..0000000000
--- a/python/qemu/aqmp/events.py
+++ /dev/null
@@ -1,717 +0,0 @@
-"""
-AQMP Events and EventListeners
-
-Asynchronous QMP uses `EventListener` objects to listen for events. An
-`EventListener` is a FIFO event queue that can be pre-filtered to listen
-for only specific events. Each `EventListener` instance receives its own
-copy of events that it hears, so events may be consumed without fear or
-worry for depriving other listeners of events they need to hear.
-
-
-EventListener Tutorial
-----------------------
-
-In all of the following examples, we assume that we have a `QMPClient`
-instantiated named ``qmp`` that is already connected.
-
-
-`listener()` context blocks with one name
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-The most basic usage is by using the `listener()` context manager to
-construct them:
-
-.. code:: python
-
- with qmp.listener('STOP') as listener:
- await qmp.execute('stop')
- await listener.get()
-
-The listener is active only for the duration of the ‘with’ block. This
-instance listens only for ‘STOP’ events.
-
-
-`listener()` context blocks with two or more names
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Multiple events can be selected for by providing any ``Iterable[str]``:
-
-.. code:: python
-
- with qmp.listener(('STOP', 'RESUME')) as listener:
- await qmp.execute('stop')
- event = await listener.get()
- assert event['event'] == 'STOP'
-
- await qmp.execute('cont')
- event = await listener.get()
- assert event['event'] == 'RESUME'
-
-
-`listener()` context blocks with no names
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-By omitting names entirely, you can listen to ALL events.
-
-.. code:: python
-
- with qmp.listener() as listener:
- await qmp.execute('stop')
- event = await listener.get()
- assert event['event'] == 'STOP'
-
-This isn’t a very good use case for this feature: In a non-trivial
-running system, we may not know what event will arrive next. Grabbing
-the top of a FIFO queue returning multiple kinds of events may be prone
-to error.
-
-
-Using async iterators to retrieve events
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-If you’d like to simply watch what events happen to arrive, you can use
-the listener as an async iterator:
-
-.. code:: python
-
- with qmp.listener() as listener:
- async for event in listener:
- print(f"Event arrived: {event['event']}")
-
-This is analogous to the following code:
-
-.. code:: python
-
- with qmp.listener() as listener:
- while True:
- event = listener.get()
- print(f"Event arrived: {event['event']}")
-
-This event stream will never end, so these blocks will never terminate.
-
-
-Using asyncio.Task to concurrently retrieve events
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Since a listener’s event stream will never terminate, it is not likely
-useful to use that form in a script. For longer-running clients, we can
-create event handlers by using `asyncio.Task` to create concurrent
-coroutines:
-
-.. code:: python
-
- async def print_events(listener):
- try:
- async for event in listener:
- print(f"Event arrived: {event['event']}")
- except asyncio.CancelledError:
- return
-
- with qmp.listener() as listener:
- task = asyncio.Task(print_events(listener))
- await qmp.execute('stop')
- await qmp.execute('cont')
- task.cancel()
- await task
-
-However, there is no guarantee that these events will be received by the
-time we leave this context block. Once the context block is exited, the
-listener will cease to hear any new events, and becomes inert.
-
-Be mindful of the timing: the above example will *probably*– but does
-not *guarantee*– that both STOP/RESUMED events will be printed. The
-example below outlines how to use listeners outside of a context block.
-
-
-Using `register_listener()` and `remove_listener()`
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-To create a listener with a longer lifetime, beyond the scope of a
-single block, create a listener and then call `register_listener()`:
-
-.. code:: python
-
- class MyClient:
- def __init__(self, qmp):
- self.qmp = qmp
- self.listener = EventListener()
-
- async def print_events(self):
- try:
- async for event in self.listener:
- print(f"Event arrived: {event['event']}")
- except asyncio.CancelledError:
- return
-
- async def run(self):
- self.task = asyncio.Task(self.print_events)
- self.qmp.register_listener(self.listener)
- await qmp.execute('stop')
- await qmp.execute('cont')
-
- async def stop(self):
- self.task.cancel()
- await self.task
- self.qmp.remove_listener(self.listener)
-
-The listener can be deactivated by using `remove_listener()`. When it is
-removed, any possible pending events are cleared and it can be
-re-registered at a later time.
-
-
-Using the built-in all events listener
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-The `QMPClient` object creates its own default listener named
-:py:obj:`~Events.events` that can be used for the same purpose without
-having to create your own:
-
-.. code:: python
-
- async def print_events(listener):
- try:
- async for event in listener:
- print(f"Event arrived: {event['event']}")
- except asyncio.CancelledError:
- return
-
- task = asyncio.Task(print_events(qmp.events))
-
- await qmp.execute('stop')
- await qmp.execute('cont')
-
- task.cancel()
- await task
-
-
-Using both .get() and async iterators
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-The async iterator and `get()` methods pull events from the same FIFO
-queue. If you mix the usage of both, be aware: Events are emitted
-precisely once per listener.
-
-If multiple contexts try to pull events from the same listener instance,
-events are still emitted only precisely once.
-
-This restriction can be lifted by creating additional listeners.
-
-
-Creating multiple listeners
-~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Additional `EventListener` objects can be created at-will. Each one
-receives its own copy of events, with separate FIFO event queues.
-
-.. code:: python
-
- my_listener = EventListener()
- qmp.register_listener(my_listener)
-
- await qmp.execute('stop')
- copy1 = await my_listener.get()
- copy2 = await qmp.events.get()
-
- assert copy1 == copy2
-
-In this example, we await an event from both a user-created
-`EventListener` and the built-in events listener. Both receive the same
-event.
-
-
-Clearing listeners
-~~~~~~~~~~~~~~~~~~
-
-`EventListener` objects can be cleared, clearing all events seen thus far:
-
-.. code:: python
-
- await qmp.execute('stop')
- qmp.events.clear()
- await qmp.execute('cont')
- event = await qmp.events.get()
- assert event['event'] == 'RESUME'
-
-`EventListener` objects are FIFO queues. If events are not consumed,
-they will remain in the queue until they are witnessed or discarded via
-`clear()`. FIFO queues will be drained automatically upon leaving a
-context block, or when calling `remove_listener()`.
-
-
-Accessing listener history
-~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-`EventListener` objects record their history. Even after being cleared,
-you can obtain a record of all events seen so far:
-
-.. code:: python
-
- await qmp.execute('stop')
- await qmp.execute('cont')
- qmp.events.clear()
-
- assert len(qmp.events.history) == 2
- assert qmp.events.history[0]['event'] == 'STOP'
- assert qmp.events.history[1]['event'] == 'RESUME'
-
-The history is updated immediately and does not require the event to be
-witnessed first.
-
-
-Using event filters
-~~~~~~~~~~~~~~~~~~~
-
-`EventListener` objects can be given complex filtering criteria if names
-are not sufficient:
-
-.. code:: python
-
- def job1_filter(event) -> bool:
- event_data = event.get('data', {})
- event_job_id = event_data.get('id')
- return event_job_id == "job1"
-
- with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener:
- await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...})
- async for event in listener:
- if event['data']['status'] == 'concluded':
- break
-
-These filters might be most useful when parameterized. `EventListener`
-objects expect a function that takes only a single argument (the raw
-event, as a `Message`) and returns a bool; True if the event should be
-accepted into the stream. You can create a function that adapts this
-signature to accept configuration parameters:
-
-.. code:: python
-
- def job_filter(job_id: str) -> EventFilter:
- def filter(event: Message) -> bool:
- return event['data']['id'] == job_id
- return filter
-
- with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener:
- await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...})
- async for event in listener:
- if event['data']['status'] == 'concluded':
- break
-
-
-Activating an existing listener with `listen()`
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Listeners with complex, long configurations can also be created manually
-and activated temporarily by using `listen()` instead of `listener()`:
-
-.. code:: python
-
- listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
- 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
- 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
-
- with qmp.listen(listener):
- await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...})
- async for event in listener:
- print(event)
- if event['event'] == 'BLOCK_JOB_COMPLETED':
- break
-
-Any events that are not witnessed by the time the block is left will be
-cleared from the queue; entering the block is an implicit
-`register_listener()` and leaving the block is an implicit
-`remove_listener()`.
-
-
-Activating multiple existing listeners with `listen()`
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-While `listener()` is only capable of creating a single listener,
-`listen()` is capable of activating multiple listeners simultaneously:
-
-.. code:: python
-
- def job_filter(job_id: str) -> EventFilter:
- def filter(event: Message) -> bool:
- return event['data']['id'] == job_id
- return filter
-
- jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA'))
- jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB'))
-
- with qmp.listen(jobA, jobB):
- qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...})
- qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...})
-
- async for event in jobA.get():
- if event['data']['status'] == 'concluded':
- break
- async for event in jobB.get():
- if event['data']['status'] == 'concluded':
- break
-
-
-Extending the `EventListener` class
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-In the case that a more specialized `EventListener` is desired to
-provide either more functionality or more compact syntax for specialized
-cases, it can be extended.
-
-One of the key methods to extend or override is
-:py:meth:`~EventListener.accept()`. The default implementation checks an
-incoming message for:
-
-1. A qualifying name, if any :py:obj:`~EventListener.names` were
- specified at initialization time
-2. That :py:obj:`~EventListener.event_filter()` returns True.
-
-This can be modified however you see fit to change the criteria for
-inclusion in the stream.
-
-For convenience, a ``JobListener`` class could be created that simply
-bakes in configuration so it does not need to be repeated:
-
-.. code:: python
-
- class JobListener(EventListener):
- def __init__(self, job_id: str):
- super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
- 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
- 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
- self.job_id = job_id
-
- def accept(self, event) -> bool:
- if not super().accept(event):
- return False
- if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'):
- return event['data']['id'] == job_id
- return event['data']['device'] == job_id
-
-From here on out, you can conjure up a custom-purpose listener that
-listens only for job-related events for a specific job-id easily:
-
-.. code:: python
-
- listener = JobListener('job4')
- with qmp.listener(listener):
- await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...})
- async for event in listener:
- print(event)
- if event['event'] == 'BLOCK_JOB_COMPLETED':
- break
-
-
-Experimental Interfaces & Design Issues
----------------------------------------
-
-These interfaces are not ones I am sure I will keep or otherwise modify
-heavily.
-
-qmp.listener()’s type signature
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-`listener()` does not return anything, because it was assumed the caller
-already had a handle to the listener. However, for
-``qmp.listener(EventListener())`` forms, the caller will not have saved
-a handle to the listener.
-
-Because this function can accept *many* listeners, I found it hard to
-accurately type in a way where it could be used in both “one” or “many”
-forms conveniently and in a statically type-safe manner.
-
-Ultimately, I removed the return altogether, but perhaps with more time
-I can work out a way to re-add it.
-
-
-API Reference
--------------
-
-"""
-
-import asyncio
-from contextlib import contextmanager
-import logging
-from typing import (
- AsyncIterator,
- Callable,
- Iterable,
- Iterator,
- List,
- Optional,
- Set,
- Tuple,
- Union,
-)
-
-from .error import QMPError
-from .message import Message
-
-
-EventNames = Union[str, Iterable[str], None]
-EventFilter = Callable[[Message], bool]
-
-
-class ListenerError(QMPError):
- """
- Generic error class for `EventListener`-related problems.
- """
-
-
-class EventListener:
- """
- Selectively listens for events with runtime configurable filtering.
-
- This class is designed to be directly usable for the most common cases,
- but it can be extended to provide more rigorous control.
-
- :param names:
- One or more names of events to listen for.
- When not provided, listen for ALL events.
- :param event_filter:
- An optional event filtering function.
- When names are also provided, this acts as a secondary filter.
-
- When ``names`` and ``event_filter`` are both provided, the names
- will be filtered first, and then the filter function will be called
- second. The event filter function can assume that the format of the
- event is a known format.
- """
- def __init__(
- self,
- names: EventNames = None,
- event_filter: Optional[EventFilter] = None,
- ):
- # Queue of 'heard' events yet to be witnessed by a caller.
- self._queue: 'asyncio.Queue[Message]' = asyncio.Queue()
-
- # Intended as a historical record, NOT a processing queue or backlog.
- self._history: List[Message] = []
-
- #: Primary event filter, based on one or more event names.
- self.names: Set[str] = set()
- if isinstance(names, str):
- self.names.add(names)
- elif names is not None:
- self.names.update(names)
-
- #: Optional, secondary event filter.
- self.event_filter: Optional[EventFilter] = event_filter
-
- @property
- def history(self) -> Tuple[Message, ...]:
- """
- A read-only history of all events seen so far.
-
- This represents *every* event, including those not yet witnessed
- via `get()` or ``async for``. It persists between `clear()`
- calls and is immutable.
- """
- return tuple(self._history)
-
- def accept(self, event: Message) -> bool:
- """
- Determine if this listener accepts this event.
-
- This method determines which events will appear in the stream.
- The default implementation simply checks the event against the
- list of names and the event_filter to decide if this
- `EventListener` accepts a given event. It can be
- overridden/extended to provide custom listener behavior.
-
- User code is not expected to need to invoke this method.
-
- :param event: The event under consideration.
- :return: `True`, if this listener accepts this event.
- """
- name_ok = (not self.names) or (event['event'] in self.names)
- return name_ok and (
- (not self.event_filter) or self.event_filter(event)
- )
-
- async def put(self, event: Message) -> None:
- """
- Conditionally put a new event into the FIFO queue.
-
- This method is not designed to be invoked from user code, and it
- should not need to be overridden. It is a public interface so
- that `QMPClient` has an interface by which it can inform
- registered listeners of new events.
-
- The event will be put into the queue if
- :py:meth:`~EventListener.accept()` returns `True`.
-
- :param event: The new event to put into the FIFO queue.
- """
- if not self.accept(event):
- return
-
- self._history.append(event)
- await self._queue.put(event)
-
- async def get(self) -> Message:
- """
- Wait for the very next event in this stream.
-
- If one is already available, return that one.
- """
- return await self._queue.get()
-
- def empty(self) -> bool:
- """
- Return `True` if there are no pending events.
- """
- return self._queue.empty()
-
- def clear(self) -> List[Message]:
- """
- Clear this listener of all pending events.
-
- Called when an `EventListener` is being unregistered, this clears the
- pending FIFO queue synchronously. It can be also be used to
- manually clear any pending events, if desired.
-
- :return: The cleared events, if any.
-
- .. warning::
- Take care when discarding events. Cleared events will be
- silently tossed on the floor. All events that were ever
- accepted by this listener are visible in `history()`.
- """
- events = []
- while True:
- try:
- events.append(self._queue.get_nowait())
- except asyncio.QueueEmpty:
- break
-
- return events
-
- def __aiter__(self) -> AsyncIterator[Message]:
- return self
-
- async def __anext__(self) -> Message:
- """
- Enables the `EventListener` to function as an async iterator.
-
- It may be used like this:
-
- .. code:: python
-
- async for event in listener:
- print(event)
-
- These iterators will never terminate of their own accord; you
- must provide break conditions or otherwise prepare to run them
- in an `asyncio.Task` that can be cancelled.
- """
- return await self.get()
-
-
-class Events:
- """
- Events is a mix-in class that adds event functionality to the QMP class.
-
- It's designed specifically as a mix-in for `QMPClient`, and it
- relies upon the class it is being mixed into having a 'logger'
- property.
- """
- def __init__(self) -> None:
- self._listeners: List[EventListener] = []
-
- #: Default, all-events `EventListener`.
- self.events: EventListener = EventListener()
- self.register_listener(self.events)
-
- # Parent class needs to have a logger
- self.logger: logging.Logger
-
- async def _event_dispatch(self, msg: Message) -> None:
- """
- Given a new event, propagate it to all of the active listeners.
-
- :param msg: The event to propagate.
- """
- for listener in self._listeners:
- await listener.put(msg)
-
- def register_listener(self, listener: EventListener) -> None:
- """
- Register and activate an `EventListener`.
-
- :param listener: The listener to activate.
- :raise ListenerError: If the given listener is already registered.
- """
- if listener in self._listeners:
- raise ListenerError("Attempted to re-register existing listener")
- self.logger.debug("Registering %s.", str(listener))
- self._listeners.append(listener)
-
- def remove_listener(self, listener: EventListener) -> None:
- """
- Unregister and deactivate an `EventListener`.
-
- The removed listener will have its pending events cleared via
- `clear()`. The listener can be re-registered later when
- desired.
-
- :param listener: The listener to deactivate.
- :raise ListenerError: If the given listener is not registered.
- """
- if listener == self.events:
- raise ListenerError("Cannot remove the default listener.")
- self.logger.debug("Removing %s.", str(listener))
- listener.clear()
- self._listeners.remove(listener)
-
- @contextmanager
- def listen(self, *listeners: EventListener) -> Iterator[None]:
- r"""
- Context manager: Temporarily listen with an `EventListener`.
-
- Accepts one or more `EventListener` objects and registers them,
- activating them for the duration of the context block.
-
- `EventListener` objects will have any pending events in their
- FIFO queue cleared upon exiting the context block, when they are
- deactivated.
-
- :param \*listeners: One or more EventListeners to activate.
- :raise ListenerError: If the given listener(s) are already active.
- """
- _added = []
-
- try:
- for listener in listeners:
- self.register_listener(listener)
- _added.append(listener)
-
- yield
-
- finally:
- for listener in _added:
- self.remove_listener(listener)
-
- @contextmanager
- def listener(
- self,
- names: EventNames = (),
- event_filter: Optional[EventFilter] = None
- ) -> Iterator[EventListener]:
- """
- Context manager: Temporarily listen with a new `EventListener`.
-
- Creates an `EventListener` object and registers it, activating
- it for the duration of the context block.
-
- :param names:
- One or more names of events to listen for.
- When not provided, listen for ALL events.
- :param event_filter:
- An optional event filtering function.
- When names are also provided, this acts as a secondary filter.
-
- :return: The newly created and active `EventListener`.
- """
- listener = EventListener(names, event_filter)
- with self.listen(listener):
- yield listener