From 559607ea173a0003efda7f884bec73b242f923fb Mon Sep 17 00:00:00 2001 From: Daniel P. Berrange Date: Fri, 27 Feb 2015 16:19:33 +0000 Subject: io: add QIOChannelSocket class Implement a QIOChannel subclass that supports sockets I/O. The implementation is able to manage a single socket file descriptor, whether a TCP/UNIX listener, TCP/UNIX connection, or a UDP datagram. It provides APIs which can listen and connect either asynchronously or synchronously. Since there is no asynchronous DNS lookup API available, it uses the QIOTask helper for spawning a background thread to ensure non-blocking operation. Signed-off-by: Daniel P. Berrange --- tests/.gitignore | 1 + tests/Makefile | 3 + tests/io-channel-helpers.c | 246 +++++++++++++++++++++++++ tests/io-channel-helpers.h | 42 +++++ tests/test-io-channel-socket.c | 399 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 691 insertions(+) create mode 100644 tests/io-channel-helpers.c create mode 100644 tests/io-channel-helpers.h create mode 100644 tests/test-io-channel-socket.c (limited to 'tests') diff --git a/tests/.gitignore b/tests/.gitignore index eec12cc2db..6164cfa2c5 100644 --- a/tests/.gitignore +++ b/tests/.gitignore @@ -24,6 +24,7 @@ test-cutils test-hbitmap test-int128 test-iov +test-io-channel-socket test-io-task test-mul64 test-opts-visitor diff --git a/tests/Makefile b/tests/Makefile index 515a7c7c30..b2e987cf56 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -85,6 +85,7 @@ check-unit-$(CONFIG_GNUTLS) += tests/test-crypto-tlssession$(EXESUF) check-unit-$(CONFIG_LINUX) += tests/test-qga$(EXESUF) check-unit-y += tests/test-timed-average$(EXESUF) check-unit-y += tests/test-io-task$(EXESUF) +check-unit-y += tests/test-io-channel-socket$(EXESUF) check-block-$(CONFIG_POSIX) += tests/qemu-iotests-quick.sh @@ -472,6 +473,8 @@ tests/test-crypto-tlssession.o-cflags := $(TASN1_CFLAGS) tests/test-crypto-tlssession$(EXESUF): tests/test-crypto-tlssession.o \ tests/crypto-tls-x509-helpers.o tests/pkix_asn1_tab.o $(test-crypto-obj-y) tests/test-io-task$(EXESUF): tests/test-io-task.o $(test-io-obj-y) +tests/test-io-channel-socket$(EXESUF): tests/test-io-channel-socket.o \ + tests/io-channel-helpers.o $(test-io-obj-y) libqos-obj-y = tests/libqos/pci.o tests/libqos/fw_cfg.o tests/libqos/malloc.o libqos-obj-y += tests/libqos/i2c.o tests/libqos/libqos.o diff --git a/tests/io-channel-helpers.c b/tests/io-channel-helpers.c new file mode 100644 index 0000000000..78d36dd703 --- /dev/null +++ b/tests/io-channel-helpers.c @@ -0,0 +1,246 @@ +/* + * QEMU I/O channel test helpers + * + * Copyright (c) 2015 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see . + * + */ + +#include "io-channel-helpers.h" + +struct QIOChannelTest { + QIOChannel *src; + QIOChannel *dst; + bool blocking; + size_t len; + size_t niov; + char *input; + struct iovec *inputv; + char *output; + struct iovec *outputv; + Error *writeerr; + Error *readerr; +}; + + +static void test_skip_iovec(struct iovec **iov, + size_t *niov, + size_t skip, + struct iovec *old) +{ + size_t offset = 0; + size_t i; + + for (i = 0; i < *niov; i++) { + if (skip < (*iov)[i].iov_len) { + old->iov_len = (*iov)[i].iov_len; + old->iov_base = (*iov)[i].iov_base; + + (*iov)[i].iov_len -= skip; + (*iov)[i].iov_base += skip; + break; + } else { + skip -= (*iov)[i].iov_len; + + if (i == 0 && old->iov_base) { + (*iov)[i].iov_len = old->iov_len; + (*iov)[i].iov_base = old->iov_base; + old->iov_len = 0; + old->iov_base = NULL; + } + + offset++; + } + } + + *iov = *iov + offset; + *niov -= offset; +} + + +/* This thread sends all data using iovecs */ +static gpointer test_io_thread_writer(gpointer opaque) +{ + QIOChannelTest *data = opaque; + struct iovec *iov = data->inputv; + size_t niov = data->niov; + struct iovec old = { 0 }; + + qio_channel_set_blocking(data->src, data->blocking, NULL); + + while (niov) { + ssize_t ret; + ret = qio_channel_writev(data->src, + iov, + niov, + &data->writeerr); + if (ret == QIO_CHANNEL_ERR_BLOCK) { + if (data->blocking) { + error_setg(&data->writeerr, + "Unexpected I/O blocking"); + break; + } else { + qio_channel_wait(data->src, + G_IO_OUT); + continue; + } + } else if (ret < 0) { + break; + } else if (ret == 0) { + error_setg(&data->writeerr, + "Unexpected zero length write"); + break; + } + + test_skip_iovec(&iov, &niov, ret, &old); + } + + return NULL; +} + + +/* This thread receives all data using iovecs */ +static gpointer test_io_thread_reader(gpointer opaque) +{ + QIOChannelTest *data = opaque; + struct iovec *iov = data->outputv; + size_t niov = data->niov; + struct iovec old = { 0 }; + + qio_channel_set_blocking(data->dst, data->blocking, NULL); + + while (niov) { + ssize_t ret; + + ret = qio_channel_readv(data->dst, + iov, + niov, + &data->readerr); + + if (ret == QIO_CHANNEL_ERR_BLOCK) { + if (data->blocking) { + error_setg(&data->writeerr, + "Unexpected I/O blocking"); + break; + } else { + qio_channel_wait(data->dst, + G_IO_IN); + continue; + } + } else if (ret < 0) { + break; + } else if (ret == 0) { + break; + } + + test_skip_iovec(&iov, &niov, ret, &old); + } + + return NULL; +} + + +QIOChannelTest *qio_channel_test_new(void) +{ + QIOChannelTest *data = g_new0(QIOChannelTest, 1); + size_t i; + size_t offset; + + + /* We'll send 1 MB of data */ +#define CHUNK_COUNT 250 +#define CHUNK_LEN 4194 + + data->len = CHUNK_COUNT * CHUNK_LEN; + data->input = g_new0(char, data->len); + data->output = g_new0(gchar, data->len); + + /* Fill input with a pattern */ + for (i = 0; i < data->len; i += CHUNK_LEN) { + memset(data->input + i, (i / CHUNK_LEN), CHUNK_LEN); + } + + /* We'll split the data across a bunch of IO vecs */ + data->niov = CHUNK_COUNT; + data->inputv = g_new0(struct iovec, data->niov); + data->outputv = g_new0(struct iovec, data->niov); + + for (i = 0, offset = 0; i < data->niov; i++, offset += CHUNK_LEN) { + data->inputv[i].iov_base = data->input + offset; + data->outputv[i].iov_base = data->output + offset; + data->inputv[i].iov_len = CHUNK_LEN; + data->outputv[i].iov_len = CHUNK_LEN; + } + + return data; +} + +void qio_channel_test_run_threads(QIOChannelTest *test, + bool blocking, + QIOChannel *src, + QIOChannel *dst) +{ + GThread *reader, *writer; + + test->src = src; + test->dst = dst; + test->blocking = blocking; + + reader = g_thread_new("reader", + test_io_thread_reader, + test); + writer = g_thread_new("writer", + test_io_thread_writer, + test); + + g_thread_join(reader); + g_thread_join(writer); + + test->dst = test->src = NULL; +} + + +void qio_channel_test_run_writer(QIOChannelTest *test, + QIOChannel *src) +{ + test->src = src; + test_io_thread_writer(test); + test->src = NULL; +} + + +void qio_channel_test_run_reader(QIOChannelTest *test, + QIOChannel *dst) +{ + test->dst = dst; + test_io_thread_reader(test); + test->dst = NULL; +} + + +void qio_channel_test_validate(QIOChannelTest *test) +{ + g_assert_cmpint(memcmp(test->input, + test->output, + test->len), ==, 0); + g_assert(test->readerr == NULL); + g_assert(test->writeerr == NULL); + + g_free(test->inputv); + g_free(test->outputv); + g_free(test->input); + g_free(test->output); + g_free(test); +} diff --git a/tests/io-channel-helpers.h b/tests/io-channel-helpers.h new file mode 100644 index 0000000000..fedc64fd5a --- /dev/null +++ b/tests/io-channel-helpers.h @@ -0,0 +1,42 @@ +/* + * QEMU I/O channel test helpers + * + * Copyright (c) 2015 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see . + * + */ + +#include "io/channel.h" + +#ifndef TEST_IO_CHANNEL_HELPERS +#define TEST_IO_CHANNEL_HELPERS + +typedef struct QIOChannelTest QIOChannelTest; + +QIOChannelTest *qio_channel_test_new(void); + +void qio_channel_test_run_threads(QIOChannelTest *test, + bool blocking, + QIOChannel *src, + QIOChannel *dst); + +void qio_channel_test_run_writer(QIOChannelTest *test, + QIOChannel *src); +void qio_channel_test_run_reader(QIOChannelTest *test, + QIOChannel *dst); + +void qio_channel_test_validate(QIOChannelTest *test); + +#endif /* TEST_IO_CHANNEL_HELPERS */ diff --git a/tests/test-io-channel-socket.c b/tests/test-io-channel-socket.c new file mode 100644 index 0000000000..194d043878 --- /dev/null +++ b/tests/test-io-channel-socket.c @@ -0,0 +1,399 @@ +/* + * QEMU I/O channel sockets test + * + * Copyright (c) 2015 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see . + * + */ + +#include "io/channel-socket.h" +#include "io-channel-helpers.h" +#ifdef HAVE_IFADDRS_H +#include +#endif + +static int check_protocol_support(bool *has_ipv4, bool *has_ipv6) +{ +#ifdef HAVE_IFADDRS_H + struct ifaddrs *ifaddr = NULL, *ifa; + struct addrinfo hints = { 0 }; + struct addrinfo *ai = NULL; + int gaierr; + + *has_ipv4 = *has_ipv6 = false; + + if (getifaddrs(&ifaddr) < 0) { + g_printerr("Failed to lookup interface addresses: %s\n", + strerror(errno)); + return -1; + } + + for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) { + if (!ifa->ifa_addr) { + continue; + } + + if (ifa->ifa_addr->sa_family == AF_INET) { + *has_ipv4 = true; + } + if (ifa->ifa_addr->sa_family == AF_INET6) { + *has_ipv6 = true; + } + } + + freeifaddrs(ifaddr); + + hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; + hints.ai_family = AF_INET6; + hints.ai_socktype = SOCK_STREAM; + + gaierr = getaddrinfo("::1", NULL, &hints, &ai); + if (gaierr != 0) { + if (gaierr == EAI_ADDRFAMILY || + gaierr == EAI_FAMILY || + gaierr == EAI_NONAME) { + *has_ipv6 = false; + } else { + g_printerr("Failed to resolve ::1 address: %s\n", + gai_strerror(gaierr)); + return -1; + } + } + + freeaddrinfo(ai); + + return 0; +#else + *has_ipv4 = *has_ipv6 = false; + + return -1; +#endif +} + + +static void test_io_channel_set_socket_bufs(QIOChannel *src, + QIOChannel *dst) +{ + int buflen = 64 * 1024; + + /* + * Make the socket buffers small so that we see + * the effects of partial reads/writes + */ + setsockopt(((QIOChannelSocket *)src)->fd, + SOL_SOCKET, SO_SNDBUF, + (char *)&buflen, + sizeof(buflen)); + + setsockopt(((QIOChannelSocket *)dst)->fd, + SOL_SOCKET, SO_SNDBUF, + (char *)&buflen, + sizeof(buflen)); +} + + +static void test_io_channel_setup_sync(SocketAddress *listen_addr, + SocketAddress *connect_addr, + QIOChannel **src, + QIOChannel **dst) +{ + QIOChannelSocket *lioc; + + lioc = qio_channel_socket_new(); + qio_channel_socket_listen_sync(lioc, listen_addr, &error_abort); + + if (listen_addr->type == SOCKET_ADDRESS_KIND_INET) { + SocketAddress *laddr = qio_channel_socket_get_local_address( + lioc, &error_abort); + + g_free(connect_addr->u.inet->port); + connect_addr->u.inet->port = g_strdup(laddr->u.inet->port); + + qapi_free_SocketAddress(laddr); + } + + *src = QIO_CHANNEL(qio_channel_socket_new()); + qio_channel_socket_connect_sync( + QIO_CHANNEL_SOCKET(*src), connect_addr, &error_abort); + qio_channel_set_delay(*src, false); + + *dst = QIO_CHANNEL(qio_channel_socket_accept(lioc, &error_abort)); + g_assert(*dst); + + test_io_channel_set_socket_bufs(*src, *dst); + + object_unref(OBJECT(lioc)); +} + + +struct TestIOChannelData { + bool err; + GMainLoop *loop; +}; + + +static void test_io_channel_complete(Object *src, + Error *err, + gpointer opaque) +{ + struct TestIOChannelData *data = opaque; + data->err = err != NULL; + g_main_loop_quit(data->loop); +} + + +static void test_io_channel_setup_async(SocketAddress *listen_addr, + SocketAddress *connect_addr, + QIOChannel **src, + QIOChannel **dst) +{ + QIOChannelSocket *lioc; + struct TestIOChannelData data; + + data.loop = g_main_loop_new(g_main_context_default(), + TRUE); + + lioc = qio_channel_socket_new(); + qio_channel_socket_listen_async( + lioc, listen_addr, + test_io_channel_complete, &data, NULL); + + g_main_loop_run(data.loop); + g_main_context_iteration(g_main_context_default(), FALSE); + + g_assert(!data.err); + + if (listen_addr->type == SOCKET_ADDRESS_KIND_INET) { + SocketAddress *laddr = qio_channel_socket_get_local_address( + lioc, &error_abort); + + g_free(connect_addr->u.inet->port); + connect_addr->u.inet->port = g_strdup(laddr->u.inet->port); + + qapi_free_SocketAddress(laddr); + } + + *src = QIO_CHANNEL(qio_channel_socket_new()); + + qio_channel_socket_connect_async( + QIO_CHANNEL_SOCKET(*src), connect_addr, + test_io_channel_complete, &data, NULL); + + g_main_loop_run(data.loop); + g_main_context_iteration(g_main_context_default(), FALSE); + + g_assert(!data.err); + + *dst = QIO_CHANNEL(qio_channel_socket_accept(lioc, &error_abort)); + g_assert(*dst); + + qio_channel_set_delay(*src, false); + test_io_channel_set_socket_bufs(*src, *dst); + + object_unref(OBJECT(lioc)); + + g_main_loop_unref(data.loop); +} + + +static void test_io_channel(bool async, + SocketAddress *listen_addr, + SocketAddress *connect_addr) +{ + QIOChannel *src, *dst; + QIOChannelTest *test; + if (async) { + test_io_channel_setup_async(listen_addr, connect_addr, &src, &dst); + + test = qio_channel_test_new(); + qio_channel_test_run_threads(test, true, src, dst); + qio_channel_test_validate(test); + + object_unref(OBJECT(src)); + object_unref(OBJECT(dst)); + + test_io_channel_setup_async(listen_addr, connect_addr, &src, &dst); + + test = qio_channel_test_new(); + qio_channel_test_run_threads(test, false, src, dst); + qio_channel_test_validate(test); + + object_unref(OBJECT(src)); + object_unref(OBJECT(dst)); + } else { + test_io_channel_setup_sync(listen_addr, connect_addr, &src, &dst); + + test = qio_channel_test_new(); + qio_channel_test_run_threads(test, true, src, dst); + qio_channel_test_validate(test); + + object_unref(OBJECT(src)); + object_unref(OBJECT(dst)); + + test_io_channel_setup_sync(listen_addr, connect_addr, &src, &dst); + + test = qio_channel_test_new(); + qio_channel_test_run_threads(test, false, src, dst); + qio_channel_test_validate(test); + + object_unref(OBJECT(src)); + object_unref(OBJECT(dst)); + } +} + + +static void test_io_channel_ipv4(bool async) +{ + SocketAddress *listen_addr = g_new0(SocketAddress, 1); + SocketAddress *connect_addr = g_new0(SocketAddress, 1); + + listen_addr->type = SOCKET_ADDRESS_KIND_INET; + listen_addr->u.inet = g_new0(InetSocketAddress, 1); + listen_addr->u.inet->host = g_strdup("0.0.0.0"); + listen_addr->u.inet->port = NULL; /* Auto-select */ + + connect_addr->type = SOCKET_ADDRESS_KIND_INET; + connect_addr->u.inet = g_new0(InetSocketAddress, 1); + connect_addr->u.inet->host = g_strdup("127.0.0.1"); + connect_addr->u.inet->port = NULL; /* Filled in later */ + + test_io_channel(async, listen_addr, connect_addr); + + qapi_free_SocketAddress(listen_addr); + qapi_free_SocketAddress(connect_addr); +} + + +static void test_io_channel_ipv4_sync(void) +{ + return test_io_channel_ipv4(false); +} + + +static void test_io_channel_ipv4_async(void) +{ + return test_io_channel_ipv4(true); +} + + +static void test_io_channel_ipv6(bool async) +{ + SocketAddress *listen_addr = g_new0(SocketAddress, 1); + SocketAddress *connect_addr = g_new0(SocketAddress, 1); + + listen_addr->type = SOCKET_ADDRESS_KIND_INET; + listen_addr->u.inet = g_new0(InetSocketAddress, 1); + listen_addr->u.inet->host = g_strdup("::"); + listen_addr->u.inet->port = NULL; /* Auto-select */ + + connect_addr->type = SOCKET_ADDRESS_KIND_INET; + connect_addr->u.inet = g_new0(InetSocketAddress, 1); + connect_addr->u.inet->host = g_strdup("::1"); + connect_addr->u.inet->port = NULL; /* Filled in later */ + + test_io_channel(async, listen_addr, connect_addr); + + qapi_free_SocketAddress(listen_addr); + qapi_free_SocketAddress(connect_addr); +} + + +static void test_io_channel_ipv6_sync(void) +{ + return test_io_channel_ipv6(false); +} + + +static void test_io_channel_ipv6_async(void) +{ + return test_io_channel_ipv6(true); +} + + +#ifndef _WIN32 +static void test_io_channel_unix(bool async) +{ + SocketAddress *listen_addr = g_new0(SocketAddress, 1); + SocketAddress *connect_addr = g_new0(SocketAddress, 1); + +#define TEST_SOCKET "test-io-channel-socket.sock" + listen_addr->type = SOCKET_ADDRESS_KIND_UNIX; + listen_addr->u.q_unix = g_new0(UnixSocketAddress, 1); + listen_addr->u.q_unix->path = g_strdup(TEST_SOCKET); + + connect_addr->type = SOCKET_ADDRESS_KIND_UNIX; + connect_addr->u.q_unix = g_new0(UnixSocketAddress, 1); + connect_addr->u.q_unix->path = g_strdup(TEST_SOCKET); + + test_io_channel(async, listen_addr, connect_addr); + + qapi_free_SocketAddress(listen_addr); + qapi_free_SocketAddress(connect_addr); + unlink(TEST_SOCKET); +} + + +static void test_io_channel_unix_sync(void) +{ + return test_io_channel_unix(false); +} + + +static void test_io_channel_unix_async(void) +{ + return test_io_channel_unix(true); +} +#endif /* _WIN32 */ + + +int main(int argc, char **argv) +{ + bool has_ipv4, has_ipv6; + + module_call_init(MODULE_INIT_QOM); + + g_test_init(&argc, &argv, NULL); + + /* We're creating actual IPv4/6 sockets, so we should + * check if the host running tests actually supports + * each protocol to avoid breaking tests on machines + * with either IPv4 or IPv6 disabled. + */ + if (check_protocol_support(&has_ipv4, &has_ipv6) < 0) { + return 1; + } + + if (has_ipv4) { + g_test_add_func("/io/channel/socket/ipv4-sync", + test_io_channel_ipv4_sync); + g_test_add_func("/io/channel/socket/ipv4-async", + test_io_channel_ipv4_async); + } + if (has_ipv6) { + g_test_add_func("/io/channel/socket/ipv6-sync", + test_io_channel_ipv6_sync); + g_test_add_func("/io/channel/socket/ipv6-async", + test_io_channel_ipv6_async); + } + +#ifndef _WIN32 + g_test_add_func("/io/channel/socket/unix-sync", + test_io_channel_unix_sync); + g_test_add_func("/io/channel/socket/unix-async", + test_io_channel_unix_async); +#endif /* _WIN32 */ + + return g_test_run(); +} -- cgit v1.2.3-55-g7522