From 69ebaaf47baa159c4d82e8c512c03d004e125175 Mon Sep 17 00:00:00 2001 From: Frederic Robra Date: Tue, 16 Jul 2019 16:53:17 +0200 Subject: discovery now connects to new slots --- src/kernel/core.c | 5 ++- src/kernel/dnbd3.h | 5 ++- src/kernel/net.c | 123 ++++++++++++++++++++++++++++++++++++++++++++--------- src/kernel/sysfs.c | 2 +- 4 files changed, 111 insertions(+), 24 deletions(-) diff --git a/src/kernel/core.c b/src/kernel/core.c index 2664090..9d930cb 100644 --- a/src/kernel/core.c +++ b/src/kernel/core.c @@ -82,7 +82,7 @@ static int dnbd3_handle_cmd(struct dnbd3_cmd *cmd, int index) int i; int sock_alive = 0; - debug_dev(dev, "handle request at position %lu and size %d", blk_rq_pos(req), blk_rq_bytes(req)); + debug_dev(dev, "handle request at position %lu, size %d, index %d", blk_rq_pos(req), blk_rq_bytes(req), index); // if (index >= 1) { // TODO use next server with good rtt for this request // printk(KERN_INFO "dnbd3: index is %d", index); @@ -94,7 +94,7 @@ static int dnbd3_handle_cmd(struct dnbd3_cmd *cmd, int index) for (i = 0; i < NUMBER_CONNECTIONS; i++) { if (dnbd3_is_sock_alive(dev->socks[i])) { if (index == sock_alive) { - sock = &dev->socks[index]; + sock = &dev->socks[i]; } sock_alive++; } @@ -260,6 +260,7 @@ static int dnbd3_ioctl(struct block_device *bdev, fmode_t mode, unsigned int cmd } memcpy(&dev->initial_server.host, &msg->host, sizeof(msg->host)); dev->initial_server.failures = 0; + dev->initial_server.rtts[0] = dev->initial_server.rtts[1] = dev->initial_server.rtts[2] = dev->initial_server.rtts[3] = RTT_UNREACHABLE; // memcpy(&dev->initial_server, &dev->cur_server, sizeof(dev->initial_server)); dev->imgname = imgname; dev->rid = msg->rid; diff --git a/src/kernel/dnbd3.h b/src/kernel/dnbd3.h index 0b225c9..43b923f 100644 --- a/src/kernel/dnbd3.h +++ b/src/kernel/dnbd3.h @@ -31,7 +31,7 @@ #include "serialize.h" -#define NUMBER_CONNECTIONS 4 +#define NUMBER_CONNECTIONS 4 // power of 2 #define DEBUG @@ -86,8 +86,9 @@ struct dnbd3_device { struct work_struct panic_worker; struct work_struct discovery_worker; // if in irq and need to send request + uint8_t discovery_count; struct timer_list timer; - uint32_t timer_count; + uint8_t timer_count; }; diff --git a/src/kernel/net.c b/src/kernel/net.c index 04dc5ba..6d8538d 100644 --- a/src/kernel/net.c +++ b/src/kernel/net.c @@ -191,7 +191,7 @@ int dnbd3_send_request_blocking(struct dnbd3_sock *sock, int dnbd3_cmd) mutex_unlock(&sock->lock); if (wait_event_interruptible_timeout(send_wq, atomic64_read(&send_wq_signal) == handle, REQUEST_TIMEOUT) <= 0) { // timeout or interrupt - warn_sock(sock, "request timed out"); + warn_sock(sock, "request timed out, cmd %d", dnbd3_cmd); result = -EIO; goto error; } @@ -434,7 +434,7 @@ static void dnbd3_receive_worker(struct work_struct *work) uint64_t handle; int result; - while(dnbd3_is_sock_alive(*sock)) { + while(1) { // loop until socket returns 0 result = dnbd3_receive_cmd(sock, &dnbd3_reply); if (result == -EAGAIN) { continue; @@ -499,7 +499,7 @@ error: debug_sock(sock, "receive completed, waiting for next receive"); } - debug_dev(dev, "receive work queue is stopped"); + debug_sock(sock, "receive work queue is stopped"); } @@ -539,25 +539,58 @@ static struct dnbd3_server *dnbd3_find_best_alt_server(struct dnbd3_device *dev) int i, j; uint64_t rtt = 0; uint64_t best_rtt = RTT_UNREACHABLE; + uint64_t current_best_rtt = RTT_UNREACHABLE; struct dnbd3_server *best_alt_server = NULL; + struct dnbd3_server *better_alt_server = NULL; + + for (i = 0; i < NUMBER_CONNECTIONS; i++) { + if (dnbd3_is_sock_alive(dev->socks[i])) { + rtt = (dev->socks[i].server->rtts[0] + dev->socks[i].server->rtts[1] + dev->socks[i].server->rtts[2] + dev->socks[i].server->rtts[3]) / 4; + if (rtt <= current_best_rtt) { + current_best_rtt = rtt; + } + } + } + + best_rtt = current_best_rtt * 10; // TODO add DEFINE to control this + debug_dev(dev, "best connected rtt is %llu, searching for rtt better than %llu", current_best_rtt, best_rtt); + for (i = 0; i < NUMBER_SERVERS; i++) { if (dev->alt_servers[i].host.type != 0) { - rtt = (dev->alt_servers[i].rtts[0] + dev->alt_servers[i].rtts[1] - + dev->alt_servers[i].rtts[2] + dev->alt_servers[i].rtts[3]) / 4; + rtt = (dev->alt_servers[i].rtts[0] + dev->alt_servers[i].rtts[1] + dev->alt_servers[i].rtts[2] + dev->alt_servers[i].rtts[3]) / 4; if (rtt <= best_rtt) { - best_alt_server = &dev->alt_servers[i]; - for (j = 0; j < NUMBER_CONNECTIONS; j++) { - if (best_alt_server == dev->socks[j].server) { - best_alt_server = NULL; // found already connected server + better_alt_server = &dev->alt_servers[i]; + for (j = 0; j < NUMBER_CONNECTIONS; j++) { // check if already connected + if (better_alt_server == dev->socks[j].server) { + better_alt_server = NULL; // found already connected server break; } } + if (better_alt_server) { + best_alt_server = better_alt_server; + best_rtt = rtt; + } } } } + if (best_alt_server) { + print_server(KERN_INFO, dev, best_alt_server, "found best alt server with rtt %llu", rtt); + } else { + debug_dev(dev, "did not find any alternative server"); + } return best_alt_server; } +static bool dnbd3_better_rtt(struct dnbd3_server *new_server, struct dnbd3_server *existing_server) { + uint64_t new_rtt = (new_server->rtts[0] + new_server->rtts[1] + new_server->rtts[2] + new_server->rtts[3]) / 4; + uint64_t existing_rtt = (existing_server->rtts[0] + existing_server->rtts[1] + existing_server->rtts[2] + existing_server->rtts[3]) / 4; + + if (((new_rtt * 2)/3) < existing_rtt) { + return true; + } + return false; +} + static void dnbd3_panic_worker(struct work_struct *work) { struct dnbd3_device *dev = container_of(work, struct dnbd3_device, panic_worker); @@ -575,16 +608,17 @@ static void dnbd3_panic_worker(struct work_struct *work) if (panicked_sock) { warn_sock(panicked_sock, "panicked, connections still alive %d", sock_alive); panicked_server = panicked_sock->server; + new_server = dnbd3_find_best_alt_server(dev); + dnbd3_socket_disconnect(dev, panicked_server, panicked_sock); - new_server = dnbd3_find_best_alt_server(dev); if (new_server != NULL && new_server != panicked_server) { print_server(KERN_INFO, dev, new_server, "found replacement"); dnbd3_socket_connect(dev, new_server); } else if (sock_alive > 0) { - info_sock(panicked_sock, "found no replacement server but still connected to %d servers", sock_alive); + info_dev(dev, "found no replacement server but still connected to %d servers", sock_alive); } else { - error_sock(panicked_sock, "could not reconnect to server"); + error_dev(dev, "could not reconnect to server"); } } else if (sock_alive == 0) { new_server = dnbd3_find_best_alt_server(dev); @@ -592,7 +626,7 @@ static void dnbd3_panic_worker(struct work_struct *work) print_server(KERN_INFO, dev, new_server, "reconnect to server"); dnbd3_socket_connect(dev, new_server); } else { - error_sock(panicked_sock, "could not reconnect to server"); + error_dev(dev, "could not reconnect to server"); } } } @@ -601,7 +635,7 @@ static void dnbd3_panic_worker(struct work_struct *work) static void dnbd3_discovery_worker(struct work_struct *work) { struct dnbd3_device *dev = container_of(work, struct dnbd3_device, discovery_worker); - struct dnbd3_sock *sock = &dev->socks[0]; + struct dnbd3_sock *sock = &dev->socks[dev->discovery_count % NUMBER_CONNECTIONS]; // just take the next int i, j; struct dnbd3_server *existing_server, *free_server, *failed_server; dnbd3_server_entry_t *new_server; @@ -614,6 +648,20 @@ static void dnbd3_discovery_worker(struct work_struct *work) struct request *req = NULL; uint64_t rtt; serialized_buffer_t *payload; + + if (!dnbd3_is_sock_alive(*sock)) { + sock = NULL; + for (i = 0; i < NUMBER_CONNECTIONS; i++) { + if (dnbd3_is_sock_alive(dev->socks[i])) { + sock = &dev->socks[i]; + } + } + if (!sock) { + error_dev(dev, "discovery failed, no socket available"); + } + } + + debug_sock(sock, "starting discovery worker"); dnbd3_send_request_blocking(sock, CMD_GET_SERVERS); @@ -687,8 +735,6 @@ static void dnbd3_discovery_worker(struct work_struct *work) error_dev(dev, "kmalloc failed"); goto error; } - mutex_init(&sock->lock); - mutex_lock(&sock->lock); sock->sock_nr = NUMBER_CONNECTIONS; // measure rtt for all alt servers for (i = 0; i < NUMBER_SERVERS; i++) { @@ -723,6 +769,7 @@ static void dnbd3_discovery_worker(struct work_struct *work) } // Request block + dnbd3_request.magic = dnbd3_packet_magic; dnbd3_request.cmd = CMD_GET_BLOCK; // Do *NOT* pick a random block as it has proven to cause severe // cache thrashing on the server @@ -766,6 +813,7 @@ static void dnbd3_discovery_worker(struct work_struct *work) rtt = (uint64_t)((end.tv_sec - start.tv_sec) * 1000000ull + (end.tv_usec - start.tv_usec)); debug_sock(sock, "new rrt is %llu", rtt); + existing_server->rtts[dev->discovery_count % 4] = rtt; rtt_error: if (sock->sock) { @@ -779,8 +827,6 @@ rtt_error: } } } - mutex_unlock(&sock->lock); - mutex_destroy(&sock->lock); error: if (buf) { kfree(buf); @@ -794,6 +840,43 @@ error: kfree(sock); sock = NULL; } + + // connect empty sockets + j = 0; + for (i = 0; i < NUMBER_CONNECTIONS; i++) { + if (!dnbd3_is_sock_alive(dev->socks[i])) { + free_server = dnbd3_find_best_alt_server(dev); + if (free_server) { + if (dnbd3_socket_connect(dev, free_server) == 0) { + j++; + } else { + print_server(KERN_WARNING, dev, free_server, "failed to connect"); + } + } + } else { + j++; + } + } + + // replace socket with better server + if (j == NUMBER_CONNECTIONS) { + for (i = 0; i < NUMBER_CONNECTIONS; i++) { + if (dnbd3_is_sock_alive(dev->socks[i])) { + free_server = dnbd3_find_best_alt_server(dev); + if (free_server && dnbd3_better_rtt(free_server, dev->socks[i].server)) { + dnbd3_socket_disconnect(dev, NULL, &dev->socks[i]); + + if (dnbd3_socket_connect(dev, free_server) != 0) { + print_server(KERN_WARNING, dev, free_server, "failed to connect"); + } + } + } + } + } + + debug_dev(dev, "connected to %d / %d sockets", j, NUMBER_CONNECTIONS); + + dev->discovery_count++; } static int __dnbd3_socket_connect(struct dnbd3_server *server, struct dnbd3_sock *sock) @@ -944,6 +1027,7 @@ static int dnbd3_socket_disconnect(struct dnbd3_device *dev, struct dnbd3_server info_sock(sock, "shutting down last socket and stopping discovery"); del_timer_sync(&dev->timer); dev->timer_count = 0; + dev->discovery_count = 0; cancel_work_sync(&dev->discovery_worker); // cancel_work_sync(&dev->panic_worker); // do not wait for panic_worker, probably we are called from panic_worker @@ -1000,7 +1084,8 @@ int dnbd3_net_connect(struct dnbd3_device *dev) int result; debug_dev(dev, "connecting to server"); - if (dnbd3_socket_connect(dev, &dev->initial_server) == 0) { + // alt_server[0] is the initial server + if (dnbd3_socket_connect(dev, &dev->alt_servers[0]) == 0) { dnbd3_print_server_list(dev); diff --git a/src/kernel/sysfs.c b/src/kernel/sysfs.c index f5fb99a..1588dfb 100644 --- a/src/kernel/sysfs.c +++ b/src/kernel/sysfs.c @@ -239,7 +239,7 @@ void dnbd3_sysfs_init(struct dnbd3_device *dev) error = kobject_init_and_add(kobj, ktype, parent, "%s", "net"); if (error) - printk("Error initializing dnbd3 device!\n"); + error_dev(dev, "kobject initializing failed"); } void dnbd3_sysfs_exit(struct dnbd3_device *dev) -- cgit v1.2.3-55-g7522