From cfa3289843abdf211e9f46ab5d14fd66b6856ba0 Mon Sep 17 00:00:00 2001 From: Johann Latocha Date: Wed, 15 Feb 2012 13:11:04 +0100 Subject: [SERVER] Memleak fixed [KERNEL] Socket swap fixed --- src/kernel/net.c | 204 ++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 171 insertions(+), 33 deletions(-) (limited to 'src/kernel/net.c') diff --git a/src/kernel/net.c b/src/kernel/net.c index 5bd8871..f045aa2 100644 --- a/src/kernel/net.c +++ b/src/kernel/net.c @@ -22,6 +22,8 @@ #include "blk.h" #include "utils.h" +#include + void dnbd3_net_connect(dnbd3_device_t *dev) { struct sockaddr_in sin; @@ -64,6 +66,11 @@ void dnbd3_net_connect(dnbd3_device_t *dev) return; } + // enqueue request to request_queue_send (ask file size) + req->cmd_type = REQ_TYPE_SPECIAL; + req->cmd_flags = REQ_GET_FILESIZE; + list_add(&req->queuelist, &dev->request_queue_send); + // start sending thread dev->thread_send = kthread_create(dnbd3_net_send, dev, dev->disk->disk_name); wake_up_process(dev->thread_send); @@ -72,6 +79,12 @@ void dnbd3_net_connect(dnbd3_device_t *dev) dev->thread_receive = kthread_create(dnbd3_net_receive, dev, dev->disk->disk_name); wake_up_process(dev->thread_receive); + // start discover thread + dev->thread_discover = kthread_create(dnbd3_net_discover, dev, dev->disk->disk_name); + wake_up_process(dev->thread_discover); + + wake_up(&dev->process_queue_send); + // add heartbeat timer init_timer(&dev->hb_timer); dev->hb_timer.data = (unsigned long) dev; @@ -79,23 +92,22 @@ void dnbd3_net_connect(dnbd3_device_t *dev) dev->hb_timer.expires = jiffies + HB_INTERVAL; add_timer(&dev->hb_timer); - // enqueue request to request_queue_send (ask file size) - req->cmd_type = REQ_TYPE_SPECIAL; - req->cmd_flags = REQ_GET_FILESIZE; - list_add(&req->queuelist, &dev->request_queue_send); - wake_up(&dev->process_queue_send); } void dnbd3_net_disconnect(dnbd3_device_t *dev) { - struct request *blk_request, *tmp_request; printk("INFO: Disconnecting device %s\n", dev->disk->disk_name); + // clear heartbeat timer + if (&dev->hb_timer) + del_timer(&dev->hb_timer); + // kill sending and receiving threads - if (dev->thread_send && dev->thread_receive) + if (dev->thread_send && dev->thread_receive && dev->thread_discover) { kthread_stop(dev->thread_send); kthread_stop(dev->thread_receive); + kthread_stop(dev->thread_discover); } // clear socket @@ -104,22 +116,157 @@ void dnbd3_net_disconnect(dnbd3_device_t *dev) sock_release(dev->sock); dev->sock = NULL; } - // clear heartbeat timer - if (&dev->hb_timer) - del_timer(&dev->hb_timer); +} + +void dnbd3_net_heartbeat(unsigned long arg) +{ + dnbd3_device_t *dev = (dnbd3_device_t *) arg; + struct request *req = kmalloc(sizeof(struct request), GFP_ATOMIC); - // move already send requests to request_queue_send again - if (!list_empty(&dev->request_queue_receive)) + if (req) { - printk("WARN: Request queue was not empty on %s\n", dev->disk->disk_name); - spin_lock_irq(&dev->blk_lock); - list_for_each_entry_safe(blk_request, tmp_request, &dev->request_queue_receive, queuelist) + req->cmd_type = REQ_TYPE_SPECIAL; + req->cmd_flags = REQ_GET_SERVERS; + list_add_tail(&req->queuelist, &dev->request_queue_send); + wake_up(&dev->process_queue_send); + } + + dev->hb_timer.expires = jiffies + HB_INTERVAL; + add_timer(&dev->hb_timer); +} + +int dnbd3_net_discover(void *data) +{ + dnbd3_device_t *dev = data; + struct sockaddr_in sin; + struct socket *sock; + + dnbd3_request_t dnbd3_request; + dnbd3_reply_t dnbd3_reply; + struct msghdr msg; + struct kvec iov; + + uint64_t filesize; + char *buf; + char ip[16]; + + struct timeval start, end; + uint64_t t1, t2 = 0; + int a, i, num = 0; + + struct timeval timeout; + timeout.tv_sec = 1; + timeout.tv_usec = 0; + + init_msghdr(msg); + + buf = kmalloc(4096, GFP_KERNEL); + if (!buf) + { + printk("ERROR: kmalloc failed"); + return -1; + } + + set_user_nice(current, -20); + + while (!kthread_should_stop()) + { + wait_event_interruptible(dev->process_queue_discover, kthread_should_stop() || dev->num_servers); + + num = dev->num_servers; + dev->num_servers = 0; + + if (!num) + continue; + + for (i=0; i < num && i < MAX_NUMBER_SERVERS; i++) { - list_del_init(&blk_request->queuelist); - list_add_tail(&blk_request->queuelist, &dev->request_queue_send); + // initialize socket + if (sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock) < 0) + { + printk("ERROR: Couldn't create socket.\n"); + sock = NULL; + continue; + } + + kernel_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &timeout, sizeof(timeout)); + + inet_ntoa(dev->servers[i], ip); + sock->sk->sk_allocation = GFP_NOIO; + sin.sin_family = AF_INET; + sin.sin_addr.s_addr = inet_addr(ip); + sin.sin_port = htons(simple_strtol(dev->port, NULL, 10)); + if (kernel_connect(sock, (struct sockaddr *) &sin, sizeof(sin), 0) < 0) + { + printk("ERROR: Couldn't connect to host %s:%s\n", ip, dev->port); + sock = NULL; + continue; + } + + // Request filesize + dnbd3_request.cmd = CMD_GET_SIZE; + dnbd3_request.vid = dev->vid; + dnbd3_request.rid = dev->rid; + // send net request + iov.iov_base = &dnbd3_request; + iov.iov_len = sizeof(dnbd3_request); + if (kernel_sendmsg(sock, &msg, &iov, 1, sizeof(dnbd3_request)) <= 0) + { + printk("ERROR: kernel_sendmsg (discover)\n"); + sock_release(sock); + sock = NULL; + continue; + } + // receive net replay + iov.iov_base = &dnbd3_reply; + iov.iov_len = sizeof(dnbd3_reply); + kernel_recvmsg(sock, &msg, &iov, 1, sizeof(dnbd3_reply), msg.msg_flags); + // receive data + iov.iov_base = &filesize; + iov.iov_len = sizeof(uint64_t); + kernel_recvmsg(sock, &msg, &iov, 1, dnbd3_reply.size, msg.msg_flags); + + do_gettimeofday(&start); + + // Request block + dnbd3_request.cmd = CMD_GET_BLOCK; + dnbd3_request.offset = 0; + dnbd3_request.size = 4096; + // send net request + iov.iov_base = &dnbd3_request; + iov.iov_len = sizeof(dnbd3_request); + if (kernel_sendmsg(sock, &msg, &iov, 1, sizeof(dnbd3_request)) <= 0) + { + printk("ERROR: kernel_sendmsg (discover)\n"); + sock_release(sock); + sock = NULL; + continue; + } + // receive net replay + iov.iov_base = &dnbd3_reply; + iov.iov_len = sizeof(dnbd3_reply); + kernel_recvmsg(sock, &msg, &iov, 1, sizeof(dnbd3_reply), msg.msg_flags); + // receive data + iov.iov_base = buf; + iov.iov_len = 4096; + a = kernel_recvmsg(sock, &msg, &iov, 1, dnbd3_reply.size, msg.msg_flags); + + do_gettimeofday(&end); + + // clear socket + if (sock) + { + sock_release(sock); + sock = NULL; + } + + t1 = (start.tv_sec*1000000ull) + start.tv_usec; + t2 = (end.tv_sec*1000000ull) + end.tv_usec; + printk("DEBUG: Server: %s RTT: %llums received bytes: %i\n", ip,t2 - t1, a); } - spin_unlock_irq(&dev->blk_lock); } + kfree(buf); + return 0; } int dnbd3_net_send(void *data) @@ -134,6 +281,8 @@ int dnbd3_net_send(void *data) init_msghdr(msg); set_user_nice(current, -20); + // TODO: check if socket error + while (!kthread_should_stop() || !list_empty(&dev->request_queue_send)) { wait_event_interruptible(dev->process_queue_send, @@ -212,6 +361,8 @@ int dnbd3_net_receive(void *data) init_msghdr(msg); set_user_nice(current, -20); + // TODO: check if socket error + while (!kthread_should_stop() || !list_empty(&dev->request_queue_receive)) { wait_event_interruptible(dev->process_queue_receive, @@ -302,6 +453,8 @@ int dnbd3_net_receive(void *data) kernel_recvmsg(dev->sock, &msg, &iov, 1, size, msg.msg_flags); } kfree(blk_request); +// if (dev->num_servers > 1) +// wake_up(&dev->process_queue_discover); continue; default: @@ -312,18 +465,3 @@ int dnbd3_net_receive(void *data) } return 0; } - -void dnbd3_net_heartbeat(unsigned long arg) -{ - dnbd3_device_t *dev = (dnbd3_device_t *) arg; - struct request *req = kmalloc(sizeof(struct request), GFP_ATOMIC); - if (req) - { - req->cmd_type = REQ_TYPE_SPECIAL; - req->cmd_flags = REQ_GET_SERVERS; - list_add(&req->queuelist, &dev->request_queue_send); - wake_up(&dev->process_queue_send); - } - dev->hb_timer.expires = jiffies + HB_INTERVAL; - add_timer(&dev->hb_timer); -} -- cgit v1.2.3-55-g7522