From 3cc08a2ab0bec07a69cf0a745154301178333474 Mon Sep 17 00:00:00 2001 From: Frederic Robra Date: Thu, 11 Jul 2019 12:57:26 +0200 Subject: receiver worker --- src/kernel/core.c | 9 +-- src/kernel/dnbd3.h | 22 ------ src/kernel/net.c | 211 +++++++++++++++++++++++++---------------------------- 3 files changed, 100 insertions(+), 142 deletions(-) diff --git a/src/kernel/core.c b/src/kernel/core.c index f4b7204..7e985f7 100644 --- a/src/kernel/core.c +++ b/src/kernel/core.c @@ -192,17 +192,12 @@ static void dnbd3_blk_fail_all_requests(dnbd3_device *dev) static int dnbd3_ioctl(struct block_device *bdev, fmode_t mode, unsigned int cmd, unsigned long arg) { - int result = -100; + int result = -EIO; dnbd3_device *dev = bdev->bd_disk->private_data; char *imgname = NULL; dnbd3_ioctl_t *msg = NULL; printk(KERN_DEBUG "dnbd3: ioctl device %i, cmd %i, arg %lu\n", dev->minor, cmd, arg); - //unsigned long irqflags; - -// while (dev->disconnecting) { -// // do nothing -// } if (arg != 0) { msg = kmalloc(sizeof(*msg), GFP_KERNEL); @@ -271,7 +266,7 @@ static int dnbd3_ioctl(struct block_device *bdev, fmode_t mode, unsigned int cmd case IOCTL_CLOSE: printk(KERN_DEBUG "dnbd3: ioctl close\n"); dnbd3_blk_fail_all_requests(dev); - dnbd3_net_disconnect(dev); + result = dnbd3_net_disconnect(dev); dnbd3_blk_fail_all_requests(dev); set_capacity(dev->disk, 0); if (dev->imgname) { diff --git a/src/kernel/dnbd3.h b/src/kernel/dnbd3.h index 4e124e0..33f707d 100644 --- a/src/kernel/dnbd3.h +++ b/src/kernel/dnbd3.h @@ -72,41 +72,19 @@ typedef struct dnbd3_device { struct mutex device_lock; // network - uint8_t socks_active; dnbd3_sock socks[NUMBER_CONNECTIONS]; char *imgname; -// struct socket *sock; -// struct mutex socket_lock; -// struct request *pending; dnbd3_server initial_server; -// dnbd3_server_t cur_server, initial_server; -// uint64_t cur_rtt; -// serialized_buffer_t payload_buffer; dnbd3_server alt_servers[NUMBER_SERVERS]; // array of alt servers int new_servers_num; // number of new alt servers that are waiting to be copied to above array dnbd3_server_entry_t new_servers[NUMBER_SERVERS]; // pending new alt servers -// uint8_t discover, panic, disconnecting, update_available, panic_count; uint8_t update_available; uint8_t use_server_provided_alts; uint16_t rid; -// uint32_t heartbeat_count; uint64_t reported_size; - // server switch -// struct socket *better_sock; struct work_struct discovery; // if in irq and need to send request struct timer_list discovery_timer; - // process -// struct task_struct * thread_send; -// struct task_struct * thread_receive; -// struct task_struct *thread_discover; -// struct timer_list hb_timer; -// wait_queue_head_t process_queue_send; -// wait_queue_head_t process_queue_receive; -// wait_queue_head_t process_queue_discover; -// struct list_head request_queue_send; -// struct list_head request_queue_receive; - } dnbd3_device; diff --git a/src/kernel/net.c b/src/kernel/net.c index 0721803..5d3e344 100644 --- a/src/kernel/net.c +++ b/src/kernel/net.c @@ -38,6 +38,7 @@ #define KEEPALIVE_TIMER (jiffies + (HZ * TIMER_INTERVAL_KEEPALIVE_PACKET)) #define DISCOVERY_TIMER (jiffies + (HZ * TIMER_INTERVAL_PROBE_NORMAL)) +#define REQUEST_TIMEOUT (jiffies + (HZ * SOCKET_TIMEOUT_CLIENT_DATA)) #define init_msghdr(h) do { \ h.msg_name = NULL; \ @@ -53,7 +54,7 @@ static uint64_t send_wq_handle; static int dnbd3_socket_connect(dnbd3_device *dev, dnbd3_server *server); static int dnbd3_socket_disconnect(dnbd3_device *dev, dnbd3_server *server, dnbd3_sock *sock); -static void print_host(struct dnbd3_host_t *host, char *msg) +static void dnbd3_print_host(struct dnbd3_host_t *host, char *msg) { if (host->type == HOST_IP4) { printk(KERN_INFO "dnbd3: %s %pI4:%d\n", msg, host->addr, host->port); @@ -62,26 +63,26 @@ static void print_host(struct dnbd3_host_t *host, char *msg) } } -static void print_server_list(struct dnbd3_device *dev) +static void dnbd3_print_server_list(struct dnbd3_device *dev) { int i; - print_host(&dev->initial_server.host, "initial server is"); + dnbd3_print_host(&dev->initial_server.host, "initial server is"); for (i = 0; i < NUMBER_SERVERS; i++) { if (dev->alt_servers[i].host.addr[0] != 0) { - print_host(&dev->alt_servers[i].host, "alternative server is"); + dnbd3_print_host(&dev->alt_servers[i].host, "alternative server is"); } } } -static uint64_t to_handle(uint32_t arg0, uint32_t arg1) { +static uint64_t dnbd3_to_handle(uint32_t arg0, uint32_t arg1) { return ((uint64_t) arg0 << 32) | arg1; } -static uint32_t arg0_from_handle(uint64_t handle) { +static uint32_t dnbd3_arg0_from_handle(uint64_t handle) { return (uint32_t)(handle >> 32); } -static uint32_t arg1_from_handle(uint64_t handle) { +static uint32_t dnbd3_arg1_from_handle(uint64_t handle) { return (uint32_t) handle; } @@ -132,14 +133,12 @@ int dnbd3_send_request(struct dnbd3_sock *sock, struct request *req, struct dnbd if (cmd != NULL) { cmd->cookie = sock->cookie; tag = blk_mq_unique_tag(req); - handle = ((uint64_t) tag << 32) | sock->cookie; + handle = dnbd3_to_handle(tag, sock->cookie);// ((uint64_t) tag << 32) | sock->cookie; } else { handle = sock->cookie; } memcpy(&dnbd3_request.handle, &handle, sizeof(handle)); - printk(KERN_DEBUG "dnbd3: request handle is %llu\n", dnbd3_request.handle); -// dnbd3_request.handle = (uint64_t)(uintptr_t)req; // Double cast to prevent warning on 32bit fixup_request(dnbd3_request); iov[0].iov_base = &dnbd3_request; iov[0].iov_len = sizeof(dnbd3_request); @@ -188,12 +187,16 @@ int dnbd3_send_request_blocking(struct dnbd3_sock *sock, int dnbd3_cmd) goto error; } send_wq_handle = 0; - handle = to_handle(sock->device->minor, dnbd3_cmd); + handle = dnbd3_to_handle(sock->device->minor, dnbd3_cmd); mutex_unlock(&sock->lock); printk(KERN_DEBUG "dnbd3: blocking request going to sleep wait for handle %llu\n", handle); - wait_event_interruptible(send_wq, handle == send_wq_handle); + if (wait_event_interruptible_timeout(send_wq, handle == send_wq_handle, REQUEST_TIMEOUT) <= 0) { // timeout or interrupt + printk(KERN_WARNING "dndbd3: request timed out\n"); + result = -EIO; + goto error; + } printk(KERN_DEBUG "dnbd3: blocking request woke up with handle %llu\n", handle); @@ -204,7 +207,7 @@ error: return result; } -static void dnbd3_receive_work(struct work_struct *work) +static void dnbd3_receive_worker(struct work_struct *work) { struct dnbd3_sock *sock = container_of(work, struct dnbd3_sock, receive); struct dnbd3_device *dev = sock->device; @@ -219,21 +222,21 @@ static void dnbd3_receive_work(struct work_struct *work) sigset_t blocked, oldset; void *kaddr; uint32_t tag, cookie; - uint16_t hwq; + uint16_t hwq, rid; int result, count, remaining; - uint16_t rid; uint64_t reported_size, handle; char *name; serialized_buffer_t payload_buffer; init_msghdr(msg); - while(sock->sock) { + while(sock->sock && sock->server) { iov.iov_base = &dnbd3_reply; iov.iov_len = sizeof(dnbd3_reply); result = kernel_recvmsg(sock->sock, &msg, &iov, 1, sizeof(dnbd3_reply), msg.msg_flags); - if (!result) { - printk(KERN_ERR "dnbd3: connection to server lost\n"); - result = -EIO; + if (result == -EAGAIN) { + continue; + } else if (result <= 0) { + printk(KERN_ERR "dnbd3: connection to server lost %d\n", result); goto error; } @@ -258,8 +261,8 @@ static void dnbd3_receive_work(struct work_struct *work) case CMD_GET_BLOCK: printk(KERN_DEBUG "dnbd3: handle is %llu\n", dnbd3_reply.handle); memcpy(&handle, &dnbd3_reply.handle, sizeof(handle)); - cookie = (uint32_t) handle; - tag = (uint32_t)(handle >> 32); + cookie = dnbd3_arg1_from_handle(handle); + tag = dnbd3_arg0_from_handle(handle); hwq = blk_mq_unique_tag_to_hwq(tag); if (hwq < dev->tag_set.nr_hw_queues) @@ -405,22 +408,21 @@ consume_payload: break; } error: - handle = to_handle(dev->minor, dnbd3_reply.cmd); + handle = dnbd3_to_handle(dev->minor, dnbd3_reply.cmd); printk(KERN_DEBUG "dnbd3: try to wake up queue with handle %llu\n", handle); send_wq_handle = handle; wake_up_interruptible(&send_wq); if (result) { - printk(KERN_DEBUG "dnbd3: receive error happened %d\n", result); - break; //TODO for now need to handle errors + sock->server->failures++; // discovery takes care of to many failures + printk(KERN_WARNING "dnbd3: receive error happened %d, total failures %d\n", result, sock->server->failures); } printk(KERN_DEBUG "dnbd3: receive completed, waiting for next receive\n"); } printk(KERN_DEBUG "dnbd3: receive work queue is stopped\n"); - dnbd3_socket_disconnect(dev, sock->server, sock); } -void dnbd3_keepalive(struct timer_list *arg) +void dnbd3_keepalive_timer(struct timer_list *arg) { struct dnbd3_sock *sock = container_of(arg, struct dnbd3_sock, keepalive_timer); queue_work(dnbd3_wq, &sock->keepalive); @@ -428,25 +430,15 @@ void dnbd3_keepalive(struct timer_list *arg) add_timer(&sock->keepalive_timer); } -static void keepalive(struct work_struct *work) +static void dnbd3_keepalive_worker(struct work_struct *work) { struct dnbd3_sock *sock = container_of(work, struct dnbd3_sock, keepalive); -// struct request *req; printk(KERN_DEBUG "dnbd3: starting keepalive worker\n"); -// mutex_lock(&sock->lock); -// req = kmalloc(sizeof(struct request), GFP_ATOMIC ); - // send keepalive -// if (req) { - dnbd3_send_request_blocking(sock, CMD_KEEPALIVE); -//// kfree(req); -// } else { -// printk(KERN_WARNING "dnbd3: could not create keepalive request\n"); -// } - ++sock->heartbeat_count; -// mutex_unlock(&sock->lock); + dnbd3_send_request_blocking(sock, CMD_KEEPALIVE); + sock->heartbeat_count++; } -void dnbd3_discovery(struct timer_list *arg) +static void dnbd3_discovery_timer(struct timer_list *arg) { struct dnbd3_device *dev = container_of(arg, struct dnbd3_device, discovery_timer); queue_work(dnbd3_wq, &dev->discovery); @@ -454,28 +446,16 @@ void dnbd3_discovery(struct timer_list *arg) add_timer(&dev->discovery_timer); } -static void discovery(struct work_struct *work) +static void dnbd3_discovery_worker(struct work_struct *work) { struct dnbd3_device *dev = container_of(work, struct dnbd3_device, discovery); dnbd3_sock *sock = &dev->socks[0]; // we use the first sock for discovery -// struct request *req; int i, j; struct dnbd3_server *existing_server, *free_server, *failed_server; dnbd3_server_entry_t *new_server; printk(KERN_DEBUG "dnbd3: starting discovery worker\n"); -// mutex_lock(&sock->lock); -// req = kmalloc(sizeof(struct request), GFP_ATOMIC ); -// // send keepalive -// if (req) { -// dnbd3_cmd_to_priv(req, CMD_GET_SERVERS); - dnbd3_send_request_blocking(sock, CMD_GET_SERVERS); -// kfree(req); -// } else { -// printk(KERN_WARNING "dnbd3: could not create get servers request\n"); -// } -// mutex_unlock(&sock->lock); - - //TODO wait until something is received + + dnbd3_send_request_blocking(sock, CMD_GET_SERVERS); printk(KERN_DEBUG "dnbd3: new server num is %d\n", dev->new_servers_num); if (dev->new_servers_num) { @@ -506,7 +486,7 @@ static void discovery(struct work_struct *work) if (existing_server) { if (new_server->failures == 1) { // remove is requested - print_host(&existing_server->host, "remove server"); + dnbd3_print_host(&existing_server->host, "remove server"); dnbd3_socket_disconnect(dev, existing_server, NULL); // TODO what to do when only one connection? existing_server->host.type = 0; } @@ -522,7 +502,7 @@ static void discovery(struct work_struct *work) //no server found to replace continue; } - print_host(&free_server->host, "got new alt server"); + dnbd3_print_host(&free_server->host, "got new alt server"); free_server->failures = 0; free_server->protocol_version = 0; free_server->rtts[0] = free_server->rtts[1] = free_server->rtts[2] = free_server->rtts[3] = RTT_UNREACHABLE; @@ -538,11 +518,10 @@ static void discovery(struct work_struct *work) } } -static int __dnbd3_socket_connect(dnbd3_server * server, dnbd3_sock *sock) +static int __dnbd3_socket_connect(struct dnbd3_server * server, struct dnbd3_sock *sock) { + int result = 0; struct timeval timeout; - mutex_init(&sock->lock); - mutex_lock(&sock->lock); if (server->host.port == 0 || server->host.type == 0) { printk(KERN_ERR "dnbd3: host or port not set\n"); @@ -553,10 +532,10 @@ static int __dnbd3_socket_connect(dnbd3_server * server, dnbd3_sock *sock) goto error; } - timeout.tv_sec = SOCKET_TIMEOUT_CLIENT_DATA; + timeout.tv_sec = SOCKET_TIMEOUT_CLIENT_DATA + 2; timeout.tv_usec = 0; - if (dnbd3_sock_create(server->host.type, SOCK_STREAM, IPPROTO_TCP, &sock->sock) < 0) { + if ((result = dnbd3_sock_create(server->host.type, SOCK_STREAM, IPPROTO_TCP, &sock->sock)) < 0) { printk(KERN_ERR "dnbd3: could not create socket\n"); goto error; } @@ -570,7 +549,7 @@ static int __dnbd3_socket_connect(dnbd3_server * server, dnbd3_sock *sock) sin.sin_family = AF_INET; memcpy(&(sin.sin_addr), server->host.addr, 4); sin.sin_port = server->host.port; - if (kernel_connect(sock->sock, (struct sockaddr *)&sin, sizeof(sin), 0) != 0) { + if ((result = kernel_connect(sock->sock, (struct sockaddr *)&sin, sizeof(sin), 0)) != 0) { printk(KERN_ERR "dnbd3: connection to host failed (ipv4)\n"); goto error; } @@ -580,12 +559,11 @@ static int __dnbd3_socket_connect(dnbd3_server * server, dnbd3_sock *sock) sin.sin6_family = AF_INET6; memcpy(&(sin.sin6_addr), server->host.addr, 16); sin.sin6_port = server->host.port; - if (kernel_connect(sock->sock, (struct sockaddr *)&sin, sizeof(sin), 0) != 0){ + if ((result = kernel_connect(sock->sock, (struct sockaddr *)&sin, sizeof(sin), 0)) != 0){ printk(KERN_ERR "dnbd3: connection to host failed (ipv6)\n"); goto error; } } - mutex_unlock(&sock->lock); return 0; error: @@ -593,9 +571,7 @@ error: sock_release(sock->sock); sock->sock = NULL; } - mutex_unlock(&sock->lock); - mutex_destroy(&sock->lock); - return -EIO; + return result; } static int dnbd3_socket_connect(dnbd3_device *dev, dnbd3_server *server) @@ -603,7 +579,6 @@ static int dnbd3_socket_connect(dnbd3_device *dev, dnbd3_server *server) int i; int result = -EIO; struct dnbd3_sock *sock = NULL; - struct request *req = NULL; for (i = 0; i < NUMBER_CONNECTIONS; i++) { if (!dev->socks[i].sock) { sock = &dev->socks[i]; @@ -618,40 +593,37 @@ static int dnbd3_socket_connect(dnbd3_device *dev, dnbd3_server *server) printk(KERN_DEBUG "dnbd3: socket connect device %i\n", dev->minor); - __dnbd3_socket_connect(server, sock); -// mutex_lock(&sock->lock); -// req = kmalloc(sizeof(*req), GFP_ATOMIC ); -// if (!req) { -// printk(KERN_ERR "dnbd3: kmalloc failed\n"); -// goto error; -// } + mutex_init(&sock->lock); + mutex_lock(&sock->lock); + __dnbd3_socket_connect(server, sock); + mutex_unlock(&sock->lock); + if (!sock->sock) { + printk(KERN_DEBUG "dnbd3: socket is not connected\n"); + result = -EIO; + goto error; + } // start the receiver - INIT_WORK(&sock->receive, dnbd3_receive_work); + INIT_WORK(&sock->receive, dnbd3_receive_worker); queue_work(dnbd3_wq, &sock->receive); -// dnbd3_connect(req); result = dnbd3_send_request_blocking(sock, CMD_SELECT_IMAGE); if (result) { printk(KERN_ERR "dnbd3: connection to image %s failed\n", dev->imgname); goto error; } -// mutex_unlock(&sock->lock); - - //TODO wait until connected printk(KERN_DEBUG "dnbd3: connected to image %s, filesize %llu\n", dev->imgname, dev->reported_size); // add heartbeat timer and scheduler for the command - INIT_WORK(&sock->keepalive, keepalive); + INIT_WORK(&sock->keepalive, dnbd3_keepalive_worker); sock->heartbeat_count = 0; - timer_setup(&sock->keepalive_timer, dnbd3_keepalive, 0); + timer_setup(&sock->keepalive_timer, dnbd3_keepalive_timer, 0); sock->keepalive_timer.expires = KEEPALIVE_TIMER; add_timer(&sock->keepalive_timer); - mutex_unlock(&sock->lock); // kfree(req); return 0; @@ -660,56 +632,69 @@ error: sock_release(sock->sock); sock->sock = NULL; } - if (req) { - kfree(req); - } - mutex_unlock(&sock->lock); return result; } -static int dnbd3_socket_disconnect(dnbd3_device *dev, dnbd3_server *server, dnbd3_sock *sock) +static int dnbd3_socket_disconnect(struct dnbd3_device *dev, struct dnbd3_server *server, struct dnbd3_sock *sock) { int i; - if (sock == NULL) { - for (i = 0; i < NUMBER_CONNECTIONS; i++) { - if (dev->socks[i].server == server) { - sock = &dev->socks[i]; - break; - } + int sock_alive = 0; + for (i = 0; i < NUMBER_CONNECTIONS; i++) { + if (sock == NULL && dev->socks[i].server == server) { + sock = &dev->socks[i]; } - if (!sock) { - printk(KERN_WARNING "dnbd3: could not find socket to disconnect\n"); - return -EIO; + if (dev->socks[i].sock) { + sock_alive++; } } - printk(KERN_DEBUG "dnbd3: socket disconnect device %i\n", dev->minor); - mutex_lock(&sock->lock); + if (!sock || !sock->sock) { + printk(KERN_WARNING "dnbd3: could not find socket to disconnect\n"); + return -EIO; + } + if (sock_alive == 1) { + printk(KERN_INFO "dnbd3: shutting down last socket and stopping discovery\n"); + del_timer_sync(&dev->discovery_timer); + cancel_work_sync(&dev->discovery); - // clear heartbeat timer + } del_timer_sync(&sock->keepalive_timer); + cancel_work_sync(&sock->keepalive); + + printk(KERN_DEBUG "dnbd3: socket disconnect device %i\n", dev->minor); + mutex_lock(&sock->lock); + /* + * Important sequence to shut down socket + * 1. kernel_sock_shutdown + * socket shutdown, receiver which hangs in kernel_recvmsg returns 0 + * 2. cancel_work_sync(receiver) + * wait for the receiver to finish, so the socket is not usesd anymore + * 3. sock_release + * release the socket and set to NULL + */ if (sock->sock) { kernel_sock_shutdown(sock->sock, SHUT_RDWR); } + mutex_unlock(&sock->lock); + mutex_destroy(&sock->lock); + sock->server = NULL; + + printk(KERN_DEBUG "dnbd3: cancel receiver work device %i\n", dev->minor); + cancel_work_sync(&sock->receive); - // clear socket if (sock->sock) { sock_release(sock->sock); sock->sock = NULL; } - - mutex_unlock(&sock->lock); - mutex_destroy(&sock->lock); - sock->server = NULL; return 0; } int dnbd3_net_disconnect(struct dnbd3_device *dev) { int i; - int result; - del_timer_sync(&dev->discovery_timer); + int result = 0; + for (i = 0; i < NUMBER_CONNECTIONS; i++) { if (dev->socks[i].sock) { if (dnbd3_socket_disconnect(dev, NULL, &dev->socks[i])) { @@ -721,15 +706,15 @@ int dnbd3_net_disconnect(struct dnbd3_device *dev) } -int dnbd3_net_connect(struct dnbd3_device *dev) { +int dnbd3_net_connect(struct dnbd3_device *dev) +{ // TODO decide which socket to connect int result; - dev->socks_active = 0; if (dnbd3_socket_connect(dev, &dev->alt_servers[0]) == 0) { - print_server_list(dev); + dnbd3_print_server_list(dev); - INIT_WORK(&dev->discovery, discovery); - timer_setup(&dev->discovery_timer, dnbd3_discovery, 0); + INIT_WORK(&dev->discovery, dnbd3_discovery_worker); + timer_setup(&dev->discovery_timer, dnbd3_discovery_timer, 0); dev->discovery_timer.expires = DISCOVERY_TIMER; add_timer(&dev->discovery_timer); -- cgit v1.2.3-55-g7522