From 6151710bbd5226f8e4f081f912c532fce70624c6 Mon Sep 17 00:00:00 2001 From: Frederic Robra Date: Thu, 11 Jul 2019 17:45:49 +0200 Subject: added first draft of rtt measurement --- src/kernel/core.c | 1 + src/kernel/dnbd3.h | 1 + src/kernel/net.c | 232 ++++++++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 205 insertions(+), 29 deletions(-) diff --git a/src/kernel/core.c b/src/kernel/core.c index c2d14d2..1347bcd 100644 --- a/src/kernel/core.c +++ b/src/kernel/core.c @@ -341,6 +341,7 @@ int dnbd3_add_device(struct dnbd3_device *dev, int minor) for (i = 0; i < NUMBER_CONNECTIONS; i++) { dev->socks[i].device = dev; + dev->socks[i].sock_nr = i; } disk = alloc_disk(1); diff --git a/src/kernel/dnbd3.h b/src/kernel/dnbd3.h index cde94a2..f3f848a 100644 --- a/src/kernel/dnbd3.h +++ b/src/kernel/dnbd3.h @@ -45,6 +45,7 @@ struct dnbd3_server { struct dnbd3_sock { + uint16_t sock_nr; struct socket *sock; struct mutex lock; struct request *pending; diff --git a/src/kernel/net.c b/src/kernel/net.c index 7df8369..0f875c5 100644 --- a/src/kernel/net.c +++ b/src/kernel/net.c @@ -49,10 +49,12 @@ } while (0) static DECLARE_WAIT_QUEUE_HEAD(send_wq); -static uint64_t send_wq_handle; +static volatile uint64_t send_wq_signal; static int dnbd3_socket_connect(struct dnbd3_device *dev, struct dnbd3_server *server); -static int dnbd3_socket_disconnect(struct dnbd3_device *dev, struct dnbd3_server *server, struct dnbd3_sock *sock); +static int __dnbd3_socket_connect(struct dnbd3_server * server, struct dnbd3_sock *sock); +static int dnbd3_socket_disconnect(struct dnbd3_device *dev, struct dnbd3_server *server, struct dnbd3_sock *sock, bool wait_for_receiver); + static void dnbd3_print_host(struct dnbd3_host_t *host, char *msg) { @@ -74,6 +76,12 @@ static void dnbd3_print_server_list(struct dnbd3_device *dev) } } + +static inline uint64_t dnbd3_to_wq_signal(int minor, uint16_t dnbd3_cmd, uint16_t sock_nr) { + return ((uint64_t) minor << 32) | ((uint32_t) dnbd3_cmd << 16) | sock_nr; +} + + static uint64_t dnbd3_to_handle(uint32_t arg0, uint32_t arg1) { return ((uint64_t) arg0 << 32) | arg1; } @@ -159,7 +167,7 @@ int dnbd3_send_request_blocking(struct dnbd3_sock *sock, int dnbd3_cmd) { int result = 0; uint64_t handle; - struct request *req = kmalloc(sizeof(struct request), GFP_ATOMIC ); + struct request *req = kmalloc(sizeof(struct request), GFP_KERNEL); printk(KERN_DEBUG "dnbd3: starting blocking request\n"); if (!req) { printk(KERN_ERR "dnbd3: kmalloc failed\n"); @@ -186,13 +194,13 @@ int dnbd3_send_request_blocking(struct dnbd3_sock *sock, int dnbd3_cmd) mutex_unlock(&sock->lock); goto error; } - send_wq_handle = 0; - handle = dnbd3_to_handle(sock->device->minor, dnbd3_cmd); + send_wq_signal = 0; + handle = dnbd3_to_wq_signal(sock->device->minor, dnbd3_cmd, sock->sock_nr); mutex_unlock(&sock->lock); printk(KERN_DEBUG "dnbd3: blocking request going to sleep wait for handle %llu\n", handle); - if (wait_event_interruptible_timeout(send_wq, handle == send_wq_handle, REQUEST_TIMEOUT) <= 0) { // timeout or interrupt + if (wait_event_interruptible_timeout(send_wq, handle == send_wq_signal, 10 /* REQUEST_TIMEOUT*/) <= 0) { // timeout or interrupt printk(KERN_WARNING "dndbd3: request timed out\n"); result = -EIO; goto error; @@ -240,7 +248,6 @@ static void dnbd3_receive_worker(struct work_struct *work) goto error; } - result = 0; fixup_reply(dnbd3_reply); // check error @@ -289,7 +296,8 @@ static void dnbd3_receive_worker(struct work_struct *work) kaddr = kmap(bvec->bv_page) + bvec->bv_offset; iov.iov_base = kaddr; iov.iov_len = bvec->bv_len; - if (kernel_recvmsg(sock->sock, &msg, &iov, 1, bvec->bv_len, msg.msg_flags) != bvec->bv_len) { + result = kernel_recvmsg(sock->sock, &msg, &iov, 1, bvec->bv_len, msg.msg_flags); + if (result != bvec->bv_len) { kunmap(bvec->bv_page); sigprocmask(SIG_SETMASK, &oldset, NULL ); printk(KERN_ERR "dnbd3: could not receive form net to block layer\n"); @@ -316,7 +324,8 @@ static void dnbd3_receive_worker(struct work_struct *work) if (count != 0) { iov.iov_base = dev->new_servers; iov.iov_len = count * sizeof(dnbd3_server_entry_t); - if (kernel_recvmsg(sock->sock, &msg, &iov, 1, (count * sizeof(dnbd3_server_entry_t)), msg.msg_flags) != (count * sizeof(dnbd3_server_entry_t))) { + result = kernel_recvmsg(sock->sock, &msg, &iov, 1, (count * sizeof(dnbd3_server_entry_t)), msg.msg_flags); + if (result != (count * sizeof(dnbd3_server_entry_t))) { printk(KERN_ERR "dnbd3: failed to get servers\n"); mutex_unlock(&dev->device_lock); goto error; @@ -336,7 +345,6 @@ consume_payload: mutex_unlock(&dev->device_lock); goto error; } - result = 0; } mutex_unlock(&dev->device_lock); break; @@ -348,7 +356,8 @@ consume_payload: printk(KERN_DEBUG "dnbd3: latest rid received\n"); iov.iov_base = &rid; iov.iov_len = sizeof(rid); - if (kernel_recvmsg(sock->sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags) <= 0) { + result = kernel_recvmsg(sock->sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags); + if (result <= 0) { printk(KERN_ERR "dnbd3: failed to get latest rid\n"); goto error; } @@ -368,12 +377,12 @@ consume_payload: // receive reply payload iov.iov_base = &payload_buffer; iov.iov_len = dnbd3_reply.size; - if ((result = kernel_recvmsg(sock->sock, &msg, &iov, 1, dnbd3_reply.size, msg.msg_flags)) != dnbd3_reply.size) { + result = kernel_recvmsg(sock->sock, &msg, &iov, 1, dnbd3_reply.size, msg.msg_flags); + if (result != dnbd3_reply.size) { printk(KERN_ERR "dnbd3: could not read CMD_SELECT_IMAGE payload on handshake, size is %d and should be%d\n", result, dnbd3_reply.size); goto error; } - result = 0; // handle/check reply payload serializer_reset_read(&payload_buffer, dnbd3_reply.size); @@ -408,11 +417,15 @@ consume_payload: break; } error: - handle = dnbd3_to_handle(dev->minor, dnbd3_reply.cmd); + handle = dnbd3_to_wq_signal(dev->minor, dnbd3_reply.cmd, sock->sock_nr); printk(KERN_DEBUG "dnbd3: try to wake up queue with handle %llu\n", handle); - send_wq_handle = handle; + send_wq_signal = handle; wake_up_interruptible(&send_wq); - if (result) { + if (result == 0) { + printk(KERN_INFO "dnbd3: result is 0, socket seems to be down\n"); +// dnbd3_socket_disconnect(dev, NULL, sock, false);//TODO use panic or something or start worker to reconnect? + break; //the socket seems to be down + } else if (result < 0) { sock->server->failures++; // discovery takes care of to many failures printk(KERN_WARNING "dnbd3: receive error happened %d, total failures %d\n", result, sock->server->failures); } @@ -449,13 +462,24 @@ static void dnbd3_discovery_timer(struct timer_list *arg) static void dnbd3_discovery_worker(struct work_struct *work) { struct dnbd3_device *dev = container_of(work, struct dnbd3_device, discovery); - struct dnbd3_sock *sock = &dev->socks[0]; // we use the first sock for discovery + struct dnbd3_sock *sock = NULL; int i, j; struct dnbd3_server *existing_server, *free_server, *failed_server; dnbd3_server_entry_t *new_server; + struct kvec iov; + struct timeval start, end; + dnbd3_request_t dnbd3_request; + dnbd3_reply_t dnbd3_reply; + struct msghdr msg; + char *buf, *name; + struct request *req = NULL; + uint64_t rtt; + serialized_buffer_t *payload; + uint64_t filesize; + uint16_t rid; printk(KERN_DEBUG "dnbd3: starting discovery worker\n"); - dnbd3_send_request_blocking(sock, CMD_GET_SERVERS); + dnbd3_send_request_blocking(&dev->socks[0], CMD_GET_SERVERS); printk(KERN_DEBUG "dnbd3: new server num is %d\n", dev->new_servers_num); if (dev->new_servers_num) { @@ -487,7 +511,7 @@ static void dnbd3_discovery_worker(struct work_struct *work) if (existing_server) { if (new_server->failures == 1) { // remove is requested dnbd3_print_host(&existing_server->host, "remove server"); - dnbd3_socket_disconnect(dev, existing_server, NULL); // TODO what to do when only one connection? + dnbd3_socket_disconnect(dev, existing_server, NULL, true); // TODO what to do when only one connection? existing_server->host.type = 0; } // ADD, so just reset fail counter @@ -511,28 +535,175 @@ static void dnbd3_discovery_worker(struct work_struct *work) dev->new_servers_num = 0; mutex_unlock(&dev->device_lock); } - + buf = kmalloc(RTT_BLOCK_SIZE, GFP_KERNEL); + if (!buf) { + printk(KERN_ERR "dnbd3: kmalloc failed\n"); + goto error; + } + payload = (serialized_buffer_t *)buf; + req = kmalloc(sizeof(struct request), GFP_KERNEL); + if (!req) { + printk(KERN_ERR "dnbd3: kmalloc failed\n"); + goto error; + } + sock = kmalloc(sizeof(struct dnbd3_sock), GFP_KERNEL); + if (!sock) { + printk(KERN_ERR "dnbd3: kmalloc failed\n"); + goto error; + } // measure rtt for all alt servers for (i = 0; i < NUMBER_SERVERS; i++) { + existing_server = &dev->alt_servers[i]; + if (existing_server->host.type) { + sock->sock = NULL; + sock->device = dev; + init_msghdr(msg); + if (__dnbd3_socket_connect(existing_server, sock)) { + printk(KERN_ERR "dnbd3: socket connect failed in rtt measurement\n"); + goto rtt_error; + } + dnbd3_connect(req); + if (dnbd3_send_request(sock, req, NULL)) { + printk(KERN_ERR "dnbd3: request select image failed in rtt measurement\n"); + goto rtt_error; + } + iov.iov_base = &dnbd3_reply; + iov.iov_len = sizeof(dnbd3_reply); + if (kernel_recvmsg(sock->sock, &msg, &iov, 1, sizeof(dnbd3_reply), msg.msg_flags) != sizeof(dnbd3_reply)) { + printk(KERN_ERR "dnbd3: receive select image failed in rtt measurement %d\n", j); + goto rtt_error; + } + fixup_reply(dnbd3_reply); + if (dnbd3_reply.magic != dnbd3_packet_magic || dnbd3_reply.cmd != CMD_SELECT_IMAGE || dnbd3_reply.size < 4) { + printk(KERN_ERR "dnbd3: receive select image wrong header in rtt measurement\n"); + goto rtt_error; + } + // receive data + iov.iov_base = payload; + iov.iov_len = dnbd3_reply.size; + if (kernel_recvmsg(sock->sock, &msg, &iov, 1, dnbd3_reply.size, msg.msg_flags) != dnbd3_reply.size) { + printk(KERN_ERR "dnbd3: receive data select image failed in rtt measurement\n"); + goto rtt_error; + } + serializer_reset_read(payload, dnbd3_reply.size); + + existing_server->protocol_version = serializer_get_uint16(payload); + if (existing_server->protocol_version < MIN_SUPPORTED_SERVER) { + printk(KERN_ERR "dnbd3: server version to old in rtt measurement\n"); + goto rtt_error; + } + + name = serializer_get_string(payload); + if (name == NULL ) { + printk(KERN_ERR "dnbd3: server did not supply an image name in rtt measurement\n"); + goto rtt_error; + } + + if (strcmp(name, dev->imgname) != 0) { + printk(KERN_ERR "dnbd3: image name %s does not match requested %s in rtt measurement\n", name, dev->imgname); + goto rtt_error; + } +// TODO rid is 1 but dev->rid is 0 + rid = serializer_get_uint16(payload); +// if (rid != dev->rid) { +// printk(KERN_ERR "dnbd3: rid %d does not match requested %d in rtt measurement\n", rid, dev->rid); +// goto rtt_error; +// } + + filesize = serializer_get_uint64(payload); + if (filesize != dev->reported_size) { + printk(KERN_ERR "dnbd3: image size %llu does not match requested %llu in rtt measurement\n", filesize, dev->reported_size); + goto rtt_error; + } + // Request block + dnbd3_request.cmd = CMD_GET_BLOCK; + // Do *NOT* pick a random block as it has proven to cause severe + // cache thrashing on the server + dnbd3_request.offset = 0; + dnbd3_request.size = RTT_BLOCK_SIZE; + fixup_request(dnbd3_request); + iov.iov_base = &dnbd3_request; + iov.iov_len = sizeof(dnbd3_request); + + init_msghdr(msg); + // start rtt measurement + do_gettimeofday(&start); + + if (kernel_sendmsg(sock->sock, &msg, &iov, 1, sizeof(dnbd3_request)) <= 0) { + printk(KERN_ERR "dnbd3: request test block failed in rtt measurement\n"); + goto rtt_error; + } + // receive net reply + iov.iov_base = &dnbd3_reply; + iov.iov_len = sizeof(dnbd3_reply); + if ((j = kernel_recvmsg(sock->sock, &msg, &iov, 1, sizeof(dnbd3_reply), msg.msg_flags)) != sizeof(dnbd3_reply)) { + printk(KERN_ERR "dnbd3: receive header test block failed in rtt measurement %d %ld\n", j, sizeof(dnbd3_reply)); + goto rtt_error; + } + fixup_reply(dnbd3_reply); + if (dnbd3_reply.magic != dnbd3_packet_magic|| dnbd3_reply.cmd != CMD_GET_BLOCK || dnbd3_reply.size != RTT_BLOCK_SIZE) { + printk(KERN_ERR "dnbd3: receive header cmd test block failed in rtt measurement\n"); + goto rtt_error; + } + + // receive data + iov.iov_base = buf; + iov.iov_len = RTT_BLOCK_SIZE; + if (kernel_recvmsg(sock->sock, &msg, &iov, 1, dnbd3_reply.size, msg.msg_flags) != RTT_BLOCK_SIZE) { + printk(KERN_ERR "dnbd3: receive test block failed in rtt measurement\n"); + goto rtt_error; + } + + do_gettimeofday(&end); // end rtt measurement + + rtt = (uint64_t)((end.tv_sec - start.tv_sec) * 1000000ull + (end.tv_usec - start.tv_usec)); + + printk(KERN_DEBUG "dnbd3: new rrt for %pI4 is %llu\n", existing_server->host.addr, rtt); + +rtt_error: + if (sock->sock) { + kernel_sock_shutdown(sock->sock, SHUT_RDWR); + sock->server = NULL; + } + + if (sock->sock) { + sock_release(sock->sock); + sock->sock = NULL; + } + } + } + +error: + if (buf) { + kfree(buf); + buf = NULL; + } + if (req) { + kfree(req); + req = NULL; + } + if (sock) { + kfree(sock); + sock = NULL; } } -static int __dnbd3_socket_connect(struct dnbd3_server * server, struct dnbd3_sock *sock) +static int __dnbd3_socket_connect(struct dnbd3_server *server, struct dnbd3_sock *sock) { int result = 0; struct timeval timeout; if (server->host.port == 0 || server->host.type == 0) { printk(KERN_ERR "dnbd3: host or port not set\n"); - goto error; + return -EIO; } if (sock->sock) { printk(KERN_WARNING "dnbd3: socket already connected\n"); - goto error; + return -EIO; } - timeout.tv_sec = SOCKET_TIMEOUT_CLIENT_DATA + 2; + timeout.tv_sec = SOCKET_TIMEOUT_CLIENT_DATA; timeout.tv_usec = 0; if ((result = dnbd3_sock_create(server->host.type, SOCK_STREAM, IPPROTO_TCP, &sock->sock)) < 0) { @@ -636,7 +807,7 @@ error: } -static int dnbd3_socket_disconnect(struct dnbd3_device *dev, struct dnbd3_server *server, struct dnbd3_sock *sock) +static int dnbd3_socket_disconnect(struct dnbd3_device *dev, struct dnbd3_server *server, struct dnbd3_sock *sock, bool wait_for_receiver) { int i; int sock_alive = 0; @@ -652,12 +823,13 @@ static int dnbd3_socket_disconnect(struct dnbd3_device *dev, struct dnbd3_server printk(KERN_WARNING "dnbd3: could not find socket to disconnect\n"); return -EIO; } - if (sock_alive == 1) { + if (sock_alive <= 1) { printk(KERN_INFO "dnbd3: shutting down last socket and stopping discovery\n"); del_timer_sync(&dev->discovery_timer); cancel_work_sync(&dev->discovery); } + printk(KERN_DEBUG "dnbd3: stopping keepalive\n"); del_timer_sync(&sock->keepalive_timer); cancel_work_sync(&sock->keepalive); @@ -680,8 +852,10 @@ static int dnbd3_socket_disconnect(struct dnbd3_device *dev, struct dnbd3_server mutex_unlock(&sock->lock); mutex_destroy(&sock->lock); - printk(KERN_DEBUG "dnbd3: cancel receiver work device %i\n", dev->minor); - cancel_work_sync(&sock->receive); + if (wait_for_receiver) { + printk(KERN_DEBUG "dnbd3: cancel receiver work device %i\n", dev->minor); + cancel_work_sync(&sock->receive); + } if (sock->sock) { sock_release(sock->sock); @@ -697,7 +871,7 @@ int dnbd3_net_disconnect(struct dnbd3_device *dev) for (i = 0; i < NUMBER_CONNECTIONS; i++) { if (dev->socks[i].sock) { - if (dnbd3_socket_disconnect(dev, NULL, &dev->socks[i])) { + if (dnbd3_socket_disconnect(dev, NULL, &dev->socks[i], true)) { result = -EIO; } } -- cgit v1.2.3-55-g7522