summaryrefslogtreecommitdiffstats
path: root/python/qemu/qmp.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qemu/qmp.py')
-rw-r--r--python/qemu/qmp.py44
1 files changed, 25 insertions, 19 deletions
diff --git a/python/qemu/qmp.py b/python/qemu/qmp.py
index ddf8347ac1..9223307ed8 100644
--- a/python/qemu/qmp.py
+++ b/python/qemu/qmp.py
@@ -15,6 +15,7 @@ from types import TracebackType
from typing import (
Any,
Dict,
+ List,
Optional,
TextIO,
Tuple,
@@ -90,7 +91,9 @@ class QEMUMonitorProtocol:
#: Logger object for debugging messages
logger = logging.getLogger('QMP')
- def __init__(self, address, server=False, nickname=None):
+ def __init__(self, address: SocketAddrT,
+ server: bool = False,
+ nickname: Optional[str] = None):
"""
Create a QEMUMonitorProtocol class.
@@ -102,7 +105,7 @@ class QEMUMonitorProtocol:
@note No connection is established, this is done by the connect() or
accept() methods
"""
- self.__events = []
+ self.__events: List[QMPMessage] = []
self.__address = address
self.__sock = self.__get_sock()
self.__sockfile: Optional[TextIO] = None
@@ -114,14 +117,14 @@ class QEMUMonitorProtocol:
self.__sock.bind(self.__address)
self.__sock.listen(1)
- def __get_sock(self):
+ def __get_sock(self) -> socket.socket:
if isinstance(self.__address, tuple):
family = socket.AF_INET
else:
family = socket.AF_UNIX
return socket.socket(family, socket.SOCK_STREAM)
- def __negotiate_capabilities(self):
+ def __negotiate_capabilities(self) -> QMPMessage:
greeting = self.__json_read()
if greeting is None or "QMP" not in greeting:
raise QMPConnectError
@@ -131,7 +134,7 @@ class QEMUMonitorProtocol:
return greeting
raise QMPCapabilitiesError
- def __json_read(self, only_event=False):
+ def __json_read(self, only_event: bool = False) -> Optional[QMPMessage]:
assert self.__sockfile is not None
while True:
data = self.__sockfile.readline()
@@ -148,7 +151,7 @@ class QEMUMonitorProtocol:
continue
return resp
- def __get_events(self, wait=False):
+ def __get_events(self, wait: Union[bool, float] = False) -> None:
"""
Check for new events in the stream and cache them in __events.
@@ -186,7 +189,7 @@ class QEMUMonitorProtocol:
raise QMPConnectError("Error while reading from socket")
self.__sock.settimeout(None)
- def __enter__(self):
+ def __enter__(self) -> 'QEMUMonitorProtocol':
# Implement context manager enter function.
return self
@@ -199,7 +202,7 @@ class QEMUMonitorProtocol:
# Implement context manager exit function.
self.close()
- def connect(self, negotiate=True):
+ def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
"""
Connect to the QMP Monitor and perform capabilities negotiation.
@@ -214,7 +217,7 @@ class QEMUMonitorProtocol:
return self.__negotiate_capabilities()
return None
- def accept(self, timeout=15.0):
+ def accept(self, timeout: float = 15.0) -> QMPMessage:
"""
Await connection from QMP Monitor and perform capabilities negotiation.
@@ -250,7 +253,9 @@ class QEMUMonitorProtocol:
self.logger.debug("<<< %s", resp)
return resp
- def cmd(self, name, args=None, cmd_id=None):
+ def cmd(self, name: str,
+ args: Optional[Dict[str, Any]] = None,
+ cmd_id: Optional[Any] = None) -> QMPMessage:
"""
Build a QMP command and send it to the QMP Monitor.
@@ -258,14 +263,14 @@ class QEMUMonitorProtocol:
@param args: command arguments (dict)
@param cmd_id: command id (dict, list, string or int)
"""
- qmp_cmd = {'execute': name}
+ qmp_cmd: QMPMessage = {'execute': name}
if args:
qmp_cmd['arguments'] = args
if cmd_id:
qmp_cmd['id'] = cmd_id
return self.cmd_obj(qmp_cmd)
- def command(self, cmd, **kwds):
+ def command(self, cmd: str, **kwds: Any) -> QMPReturnValue:
"""
Build and send a QMP command to the monitor, report errors if any
"""
@@ -278,7 +283,8 @@ class QEMUMonitorProtocol:
)
return cast(QMPReturnValue, ret['return'])
- def pull_event(self, wait=False):
+ def pull_event(self,
+ wait: Union[bool, float] = False) -> Optional[QMPMessage]:
"""
Pulls a single event.
@@ -298,7 +304,7 @@ class QEMUMonitorProtocol:
return self.__events.pop(0)
return None
- def get_events(self, wait=False):
+ def get_events(self, wait: bool = False) -> List[QMPMessage]:
"""
Get a list of available QMP events.
@@ -315,13 +321,13 @@ class QEMUMonitorProtocol:
self.__get_events(wait)
return self.__events
- def clear_events(self):
+ def clear_events(self) -> None:
"""
Clear current list of pending events.
"""
self.__events = []
- def close(self):
+ def close(self) -> None:
"""
Close the socket and socket file.
"""
@@ -330,7 +336,7 @@ class QEMUMonitorProtocol:
if self.__sockfile:
self.__sockfile.close()
- def settimeout(self, timeout):
+ def settimeout(self, timeout: float) -> None:
"""
Set the socket timeout.
@@ -339,7 +345,7 @@ class QEMUMonitorProtocol:
"""
self.__sock.settimeout(timeout)
- def get_sock_fd(self):
+ def get_sock_fd(self) -> int:
"""
Get the socket file descriptor.
@@ -347,7 +353,7 @@ class QEMUMonitorProtocol:
"""
return self.__sock.fileno()
- def is_scm_available(self):
+ def is_scm_available(self) -> bool:
"""
Check if the socket allows for SCM_RIGHTS.