summaryrefslogtreecommitdiffstats
path: root/python/qemu/aqmp/legacy.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qemu/aqmp/legacy.py')
-rw-r--r--python/qemu/aqmp/legacy.py177
1 files changed, 0 insertions, 177 deletions
diff --git a/python/qemu/aqmp/legacy.py b/python/qemu/aqmp/legacy.py
deleted file mode 100644
index 46026e9fdc..0000000000
--- a/python/qemu/aqmp/legacy.py
+++ /dev/null
@@ -1,177 +0,0 @@
-"""
-Sync QMP Wrapper
-
-This class pretends to be qemu.qmp.QEMUMonitorProtocol.
-"""
-
-import asyncio
-from typing import (
- Any,
- Awaitable,
- Dict,
- List,
- Optional,
- TypeVar,
- Union,
-)
-
-import qemu.qmp
-
-from .error import QMPError
-from .protocol import Runstate, SocketAddrT
-from .qmp_client import QMPClient
-
-
-# (Temporarily) Re-export QMPBadPortError
-QMPBadPortError = qemu.qmp.QMPBadPortError
-
-#: QMPMessage is an entire QMP message of any kind.
-QMPMessage = Dict[str, Any]
-
-#: QMPReturnValue is the 'return' value of a command.
-QMPReturnValue = object
-
-#: QMPObject is any object in a QMP message.
-QMPObject = Dict[str, object]
-
-# QMPMessage can be outgoing commands or incoming events/returns.
-# QMPReturnValue is usually a dict/json object, but due to QAPI's
-# 'returns-whitelist', it can actually be anything.
-#
-# {'return': {}} is a QMPMessage,
-# {} is the QMPReturnValue.
-
-
-# pylint: disable=missing-docstring
-
-
-class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol):
- def __init__(self, address: SocketAddrT,
- server: bool = False,
- nickname: Optional[str] = None):
-
- # pylint: disable=super-init-not-called
- self._aqmp = QMPClient(nickname)
- self._aloop = asyncio.get_event_loop()
- self._address = address
- self._timeout: Optional[float] = None
-
- if server:
- self._sync(self._aqmp.start_server(self._address))
-
- _T = TypeVar('_T')
-
- def _sync(
- self, future: Awaitable[_T], timeout: Optional[float] = None
- ) -> _T:
- return self._aloop.run_until_complete(
- asyncio.wait_for(future, timeout=timeout)
- )
-
- def _get_greeting(self) -> Optional[QMPMessage]:
- if self._aqmp.greeting is not None:
- # pylint: disable=protected-access
- return self._aqmp.greeting._asdict()
- return None
-
- # __enter__ and __exit__ need no changes
- # parse_address needs no changes
-
- def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
- self._aqmp.await_greeting = negotiate
- self._aqmp.negotiate = negotiate
-
- self._sync(
- self._aqmp.connect(self._address)
- )
- return self._get_greeting()
-
- def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
- self._aqmp.await_greeting = True
- self._aqmp.negotiate = True
-
- self._sync(self._aqmp.accept(), timeout)
-
- ret = self._get_greeting()
- assert ret is not None
- return ret
-
- def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
- return dict(
- self._sync(
- # pylint: disable=protected-access
-
- # _raw() isn't a public API, because turning off
- # automatic ID assignment is discouraged. For
- # compatibility with iotests *only*, do it anyway.
- self._aqmp._raw(qmp_cmd, assign_id=False),
- self._timeout
- )
- )
-
- # Default impl of cmd() delegates to cmd_obj
-
- def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
- return self._sync(
- self._aqmp.execute(cmd, kwds),
- self._timeout
- )
-
- def pull_event(self,
- wait: Union[bool, float] = False) -> Optional[QMPMessage]:
- if not wait:
- # wait is False/0: "do not wait, do not except."
- if self._aqmp.events.empty():
- return None
-
- # If wait is 'True', wait forever. If wait is False/0, the events
- # queue must not be empty; but it still needs some real amount
- # of time to complete.
- timeout = None
- if wait and isinstance(wait, float):
- timeout = wait
-
- return dict(
- self._sync(
- self._aqmp.events.get(),
- timeout
- )
- )
-
- def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
- events = [dict(x) for x in self._aqmp.events.clear()]
- if events:
- return events
-
- event = self.pull_event(wait)
- return [event] if event is not None else []
-
- def clear_events(self) -> None:
- self._aqmp.events.clear()
-
- def close(self) -> None:
- self._sync(
- self._aqmp.disconnect()
- )
-
- def settimeout(self, timeout: Optional[float]) -> None:
- self._timeout = timeout
-
- def send_fd_scm(self, fd: int) -> None:
- self._aqmp.send_fd_scm(fd)
-
- def __del__(self) -> None:
- if self._aqmp.runstate == Runstate.IDLE:
- return
-
- if not self._aloop.is_running():
- self.close()
- else:
- # Garbage collection ran while the event loop was running.
- # Nothing we can do about it now, but if we don't raise our
- # own error, the user will be treated to a lot of traceback
- # they might not understand.
- raise QMPError(
- "QEMUMonitorProtocol.close()"
- " was not called before object was garbage collected"
- )