diff options
author | Stefan Hajnoczi | 2022-10-30 23:30:00 +0100 |
---|---|---|
committer | Stefan Hajnoczi | 2022-10-30 23:30:01 +0100 |
commit | 2281c822ee74072659d247a2eb3abf2b786f73b3 (patch) | |
tree | a0f2b5e8de223a27f8c29f174b23e1345b5dc5d0 /net/stream.c | |
parent | Merge tag 'for-upstream' of https://repo.or.cz/qemu/kevin into staging (diff) | |
parent | net: stream: add QAPI events to report connection state (diff) | |
download | qemu-2281c822ee74072659d247a2eb3abf2b786f73b3.tar.gz qemu-2281c822ee74072659d247a2eb3abf2b786f73b3.tar.xz qemu-2281c822ee74072659d247a2eb3abf2b786f73b3.zip |
Merge tag 'net-pull-request' of https://github.com/jasowang/qemu into staging
# -----BEGIN PGP SIGNATURE-----
# Version: GnuPG v1
#
# iQEcBAABAgAGBQJjW2i1AAoJEO8Ells5jWIR5HMIAIvDEmWQ2eZ1R+CfsefXkD5H
# W3RSZbMrOHR6sb9cbYpqK/vWmH8E/jZkKY4n/q7vQ3QerFMeDPgxu0Qn43iElLXS
# iGHhC51fa5IwJNDomjUGI8oJzyk0sxAbgwOjXZ4qbAkk9KeQYWU+JqYghvOrBYbd
# VaIwEiBlECuBy0DKx2gBTfxgeuw3V3WvtjpKeZVRlR+4vLQtI9Goga78qIPfjpH2
# sN/lFhZGaX1FT8DSft0oCCBxCK8ZaNzmMpmr39a8+e4g/EJZC9xbgNkl3fN0yYa5
# P0nluv/Z6e1TZ4FBDaiVFysTS5WnS0KRjUrodzctiGECE3sQiAvkWbKZ+QdGrlw=
# =0/b/
# -----END PGP SIGNATURE-----
# gpg: Signature made Fri 28 Oct 2022 01:29:25 EDT
# gpg: using RSA key EF04965B398D6211
# gpg: Good signature from "Jason Wang (Jason Wang on RedHat) <jasowang@redhat.com>" [full]
# Primary key fingerprint: 215D 46F4 8246 689E C77F 3562 EF04 965B 398D 6211
* tag 'net-pull-request' of https://github.com/jasowang/qemu: (26 commits)
net: stream: add QAPI events to report connection state
net: stream: move to QIO to enable additional parameters
qemu-sockets: update socket_uri() and socket_parse() to be consistent
qemu-sockets: move and rename SocketAddress_to_str()
net: dgram: add unix socket
net: dgram: move mcast specific code from net_socket_fd_init_dgram()
net: dgram: make dgram_dst generic
net: stream: add unix socket
net: stream: Don't ignore EINVAL on netdev socket connection
net: socket: Don't ignore EINVAL on netdev socket connection
qapi: net: add stream and dgram netdevs
net: introduce qemu_set_info_str() function
qapi: net: introduce a way to bypass qemu_opts_parse_noisily()
net: simplify net_client_parse() error management
net: remove the @errp argument of net_client_inits()
net: introduce convert_host_port()
vhost: Accept event idx flag
vhost: use avail event idx on vhost_svq_kick
vhost: toggle device callbacks using used event idx
vhost: allocate event_idx fields on vring
...
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Diffstat (limited to 'net/stream.c')
-rw-r--r-- | net/stream.c | 386 |
1 files changed, 386 insertions, 0 deletions
diff --git a/net/stream.c b/net/stream.c new file mode 100644 index 0000000000..53b7040cc4 --- /dev/null +++ b/net/stream.c @@ -0,0 +1,386 @@ +/* + * QEMU System Emulator + * + * Copyright (c) 2003-2008 Fabrice Bellard + * Copyright (c) 2022 Red Hat, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "qemu/osdep.h" + +#include "net/net.h" +#include "clients.h" +#include "monitor/monitor.h" +#include "qapi/error.h" +#include "qemu/error-report.h" +#include "qemu/option.h" +#include "qemu/sockets.h" +#include "qemu/iov.h" +#include "qemu/main-loop.h" +#include "qemu/cutils.h" +#include "io/channel.h" +#include "io/channel-socket.h" +#include "io/net-listener.h" +#include "qapi/qapi-events-net.h" + +typedef struct NetStreamState { + NetClientState nc; + QIOChannel *listen_ioc; + QIONetListener *listener; + QIOChannel *ioc; + guint ioc_read_tag; + guint ioc_write_tag; + SocketReadState rs; + unsigned int send_index; /* number of bytes sent*/ +} NetStreamState; + +static void net_stream_listen(QIONetListener *listener, + QIOChannelSocket *cioc, + void *opaque); + +static gboolean net_stream_writable(QIOChannel *ioc, + GIOCondition condition, + gpointer data) +{ + NetStreamState *s = data; + + s->ioc_write_tag = 0; + + qemu_flush_queued_packets(&s->nc); + + return G_SOURCE_REMOVE; +} + +static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf, + size_t size) +{ + NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc); + uint32_t len = htonl(size); + struct iovec iov[] = { + { + .iov_base = &len, + .iov_len = sizeof(len), + }, { + .iov_base = (void *)buf, + .iov_len = size, + }, + }; + struct iovec local_iov[2]; + unsigned int nlocal_iov; + size_t remaining; + ssize_t ret; + + remaining = iov_size(iov, 2) - s->send_index; + nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining); + ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL); + if (ret == QIO_CHANNEL_ERR_BLOCK) { + ret = 0; /* handled further down */ + } + if (ret == -1) { + s->send_index = 0; + return -errno; + } + if (ret < (ssize_t)remaining) { + s->send_index += ret; + s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT, + net_stream_writable, s, NULL); + return 0; + } + s->send_index = 0; + return size; +} + +static gboolean net_stream_send(QIOChannel *ioc, + GIOCondition condition, + gpointer data); + +static void net_stream_send_completed(NetClientState *nc, ssize_t len) +{ + NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc); + + if (!s->ioc_read_tag) { + s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, + net_stream_send, s, NULL); + } +} + +static void net_stream_rs_finalize(SocketReadState *rs) +{ + NetStreamState *s = container_of(rs, NetStreamState, rs); + + if (qemu_send_packet_async(&s->nc, rs->buf, + rs->packet_len, + net_stream_send_completed) == 0) { + if (s->ioc_read_tag) { + g_source_remove(s->ioc_read_tag); + s->ioc_read_tag = 0; + } + } +} + +static gboolean net_stream_send(QIOChannel *ioc, + GIOCondition condition, + gpointer data) +{ + NetStreamState *s = data; + int size; + int ret; + char buf1[NET_BUFSIZE]; + const char *buf; + + size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL); + if (size < 0) { + if (errno != EWOULDBLOCK) { + goto eoc; + } + } else if (size == 0) { + /* end of connection */ + eoc: + s->ioc_read_tag = 0; + if (s->ioc_write_tag) { + g_source_remove(s->ioc_write_tag); + s->ioc_write_tag = 0; + } + if (s->listener) { + qio_net_listener_set_client_func(s->listener, net_stream_listen, + s, NULL); + } + object_unref(OBJECT(s->ioc)); + s->ioc = NULL; + + net_socket_rs_init(&s->rs, net_stream_rs_finalize, false); + s->nc.link_down = true; + qemu_set_info_str(&s->nc, ""); + + qapi_event_send_netdev_stream_disconnected(s->nc.name); + + return G_SOURCE_REMOVE; + } + buf = buf1; + + ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size); + + if (ret == -1) { + goto eoc; + } + + return G_SOURCE_CONTINUE; +} + +static void net_stream_cleanup(NetClientState *nc) +{ + NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc); + if (s->ioc) { + if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) { + if (s->ioc_read_tag) { + g_source_remove(s->ioc_read_tag); + s->ioc_read_tag = 0; + } + if (s->ioc_write_tag) { + g_source_remove(s->ioc_write_tag); + s->ioc_write_tag = 0; + } + } + object_unref(OBJECT(s->ioc)); + s->ioc = NULL; + } + if (s->listen_ioc) { + if (s->listener) { + qio_net_listener_disconnect(s->listener); + object_unref(OBJECT(s->listener)); + s->listener = NULL; + } + object_unref(OBJECT(s->listen_ioc)); + s->listen_ioc = NULL; + } +} + +static NetClientInfo net_stream_info = { + .type = NET_CLIENT_DRIVER_STREAM, + .size = sizeof(NetStreamState), + .receive = net_stream_receive, + .cleanup = net_stream_cleanup, +}; + +static void net_stream_listen(QIONetListener *listener, + QIOChannelSocket *cioc, + void *opaque) +{ + NetStreamState *s = opaque; + SocketAddress *addr; + char *uri; + + object_ref(OBJECT(cioc)); + + qio_net_listener_set_client_func(s->listener, NULL, s, NULL); + + s->ioc = QIO_CHANNEL(cioc); + qio_channel_set_name(s->ioc, "stream-server"); + s->nc.link_down = false; + + s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send, + s, NULL); + + if (cioc->localAddr.ss_family == AF_UNIX) { + addr = qio_channel_socket_get_local_address(cioc, NULL); + } else { + addr = qio_channel_socket_get_remote_address(cioc, NULL); + } + g_assert(addr != NULL); + uri = socket_uri(addr); + qemu_set_info_str(&s->nc, uri); + g_free(uri); + qapi_event_send_netdev_stream_connected(s->nc.name, addr); + qapi_free_SocketAddress(addr); +} + +static void net_stream_server_listening(QIOTask *task, gpointer opaque) +{ + NetStreamState *s = opaque; + QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc); + SocketAddress *addr; + int ret; + + if (listen_sioc->fd < 0) { + qemu_set_info_str(&s->nc, "connection error"); + return; + } + + addr = qio_channel_socket_get_local_address(listen_sioc, NULL); + g_assert(addr != NULL); + ret = qemu_socket_try_set_nonblock(listen_sioc->fd); + if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) { + qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)", + addr->u.fd.str, -ret); + return; + } + g_assert(ret == 0); + qapi_free_SocketAddress(addr); + + s->nc.link_down = true; + s->listener = qio_net_listener_new(); + + net_socket_rs_init(&s->rs, net_stream_rs_finalize, false); + qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL); + qio_net_listener_add(s->listener, listen_sioc); +} + +static int net_stream_server_init(NetClientState *peer, + const char *model, + const char *name, + SocketAddress *addr, + Error **errp) +{ + NetClientState *nc; + NetStreamState *s; + QIOChannelSocket *listen_sioc = qio_channel_socket_new(); + + nc = qemu_new_net_client(&net_stream_info, peer, model, name); + s = DO_UPCAST(NetStreamState, nc, nc); + + s->listen_ioc = QIO_CHANNEL(listen_sioc); + qio_channel_socket_listen_async(listen_sioc, addr, 0, + net_stream_server_listening, s, + NULL, NULL); + + return 0; +} + +static void net_stream_client_connected(QIOTask *task, gpointer opaque) +{ + NetStreamState *s = opaque; + QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc); + SocketAddress *addr; + gchar *uri; + int ret; + + if (sioc->fd < 0) { + qemu_set_info_str(&s->nc, "connection error"); + goto error; + } + + addr = qio_channel_socket_get_remote_address(sioc, NULL); + g_assert(addr != NULL); + uri = socket_uri(addr); + qemu_set_info_str(&s->nc, uri); + g_free(uri); + + ret = qemu_socket_try_set_nonblock(sioc->fd); + if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) { + qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)", + addr->u.fd.str, -ret); + qapi_free_SocketAddress(addr); + goto error; + } + g_assert(ret == 0); + + net_socket_rs_init(&s->rs, net_stream_rs_finalize, false); + + /* Disable Nagle algorithm on TCP sockets to reduce latency */ + qio_channel_set_delay(s->ioc, false); + + s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send, + s, NULL); + s->nc.link_down = false; + qapi_event_send_netdev_stream_connected(s->nc.name, addr); + qapi_free_SocketAddress(addr); + + return; +error: + object_unref(OBJECT(s->ioc)); + s->ioc = NULL; +} + +static int net_stream_client_init(NetClientState *peer, + const char *model, + const char *name, + SocketAddress *addr, + Error **errp) +{ + NetStreamState *s; + NetClientState *nc; + QIOChannelSocket *sioc = qio_channel_socket_new(); + + nc = qemu_new_net_client(&net_stream_info, peer, model, name); + s = DO_UPCAST(NetStreamState, nc, nc); + + s->ioc = QIO_CHANNEL(sioc); + s->nc.link_down = true; + + qio_channel_socket_connect_async(sioc, addr, + net_stream_client_connected, s, + NULL, NULL); + + return 0; +} + +int net_init_stream(const Netdev *netdev, const char *name, + NetClientState *peer, Error **errp) +{ + const NetdevStreamOptions *sock; + + assert(netdev->type == NET_CLIENT_DRIVER_STREAM); + sock = &netdev->u.stream; + + if (!sock->has_server || !sock->server) { + return net_stream_client_init(peer, "stream", name, sock->addr, errp); + } + return net_stream_server_init(peer, "stream", name, sock->addr, errp); +} |