summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJohann Latocha2012-02-15 13:11:04 +0100
committerJohann Latocha2012-02-15 13:11:04 +0100
commitcfa3289843abdf211e9f46ab5d14fd66b6856ba0 (patch)
treeb4c67600f62324c8b2509f9c31af6f3a719dc294 /src
parent[SERVER] Spinlocks to freeze threads while reloading config (diff)
downloaddnbd3-cfa3289843abdf211e9f46ab5d14fd66b6856ba0.tar.gz
dnbd3-cfa3289843abdf211e9f46ab5d14fd66b6856ba0.tar.xz
dnbd3-cfa3289843abdf211e9f46ab5d14fd66b6856ba0.zip
[SERVER] Memleak fixed
[KERNEL] Socket swap fixed
Diffstat (limited to 'src')
-rw-r--r--src/kernel/blk.c2
-rw-r--r--src/kernel/dnbd3.h2
-rw-r--r--src/kernel/net.c204
-rw-r--r--src/kernel/net.h2
-rw-r--r--src/server/server.c1
5 files changed, 178 insertions, 33 deletions
diff --git a/src/kernel/blk.c b/src/kernel/blk.c
index 4d5b4b5..3729d5f 100644
--- a/src/kernel/blk.c
+++ b/src/kernel/blk.c
@@ -28,6 +28,7 @@ int dnbd3_blk_add_device(dnbd3_device_t *dev, int minor)
init_waitqueue_head(&dev->process_queue_send);
init_waitqueue_head(&dev->process_queue_receive);
+ init_waitqueue_head(&dev->process_queue_discover);
INIT_LIST_HEAD(&dev->request_queue_send);
INIT_LIST_HEAD(&dev->request_queue_receive);
@@ -37,6 +38,7 @@ int dnbd3_blk_add_device(dnbd3_device_t *dev, int minor)
dev->num_servers = 0;
dev->thread_send = NULL;
dev->thread_receive = NULL;
+ dev->thread_discover = NULL;
if (!(disk = alloc_disk(1)))
{
diff --git a/src/kernel/dnbd3.h b/src/kernel/dnbd3.h
index b21dba3..45b1cac 100644
--- a/src/kernel/dnbd3.h
+++ b/src/kernel/dnbd3.h
@@ -50,8 +50,10 @@ typedef struct
// process
struct task_struct *thread_send;
struct task_struct *thread_receive;
+ struct task_struct *thread_discover;
wait_queue_head_t process_queue_send;
wait_queue_head_t process_queue_receive;
+ wait_queue_head_t process_queue_discover;
struct list_head request_queue_send;
struct list_head request_queue_receive;
diff --git a/src/kernel/net.c b/src/kernel/net.c
index 5bd8871..f045aa2 100644
--- a/src/kernel/net.c
+++ b/src/kernel/net.c
@@ -22,6 +22,8 @@
#include "blk.h"
#include "utils.h"
+#include <linux/time.h>
+
void dnbd3_net_connect(dnbd3_device_t *dev)
{
struct sockaddr_in sin;
@@ -64,6 +66,11 @@ void dnbd3_net_connect(dnbd3_device_t *dev)
return;
}
+ // enqueue request to request_queue_send (ask file size)
+ req->cmd_type = REQ_TYPE_SPECIAL;
+ req->cmd_flags = REQ_GET_FILESIZE;
+ list_add(&req->queuelist, &dev->request_queue_send);
+
// start sending thread
dev->thread_send = kthread_create(dnbd3_net_send, dev, dev->disk->disk_name);
wake_up_process(dev->thread_send);
@@ -72,6 +79,12 @@ void dnbd3_net_connect(dnbd3_device_t *dev)
dev->thread_receive = kthread_create(dnbd3_net_receive, dev, dev->disk->disk_name);
wake_up_process(dev->thread_receive);
+ // start discover thread
+ dev->thread_discover = kthread_create(dnbd3_net_discover, dev, dev->disk->disk_name);
+ wake_up_process(dev->thread_discover);
+
+ wake_up(&dev->process_queue_send);
+
// add heartbeat timer
init_timer(&dev->hb_timer);
dev->hb_timer.data = (unsigned long) dev;
@@ -79,23 +92,22 @@ void dnbd3_net_connect(dnbd3_device_t *dev)
dev->hb_timer.expires = jiffies + HB_INTERVAL;
add_timer(&dev->hb_timer);
- // enqueue request to request_queue_send (ask file size)
- req->cmd_type = REQ_TYPE_SPECIAL;
- req->cmd_flags = REQ_GET_FILESIZE;
- list_add(&req->queuelist, &dev->request_queue_send);
- wake_up(&dev->process_queue_send);
}
void dnbd3_net_disconnect(dnbd3_device_t *dev)
{
- struct request *blk_request, *tmp_request;
printk("INFO: Disconnecting device %s\n", dev->disk->disk_name);
+ // clear heartbeat timer
+ if (&dev->hb_timer)
+ del_timer(&dev->hb_timer);
+
// kill sending and receiving threads
- if (dev->thread_send && dev->thread_receive)
+ if (dev->thread_send && dev->thread_receive && dev->thread_discover)
{
kthread_stop(dev->thread_send);
kthread_stop(dev->thread_receive);
+ kthread_stop(dev->thread_discover);
}
// clear socket
@@ -104,22 +116,157 @@ void dnbd3_net_disconnect(dnbd3_device_t *dev)
sock_release(dev->sock);
dev->sock = NULL;
}
- // clear heartbeat timer
- if (&dev->hb_timer)
- del_timer(&dev->hb_timer);
+}
+
+void dnbd3_net_heartbeat(unsigned long arg)
+{
+ dnbd3_device_t *dev = (dnbd3_device_t *) arg;
+ struct request *req = kmalloc(sizeof(struct request), GFP_ATOMIC);
- // move already send requests to request_queue_send again
- if (!list_empty(&dev->request_queue_receive))
+ if (req)
{
- printk("WARN: Request queue was not empty on %s\n", dev->disk->disk_name);
- spin_lock_irq(&dev->blk_lock);
- list_for_each_entry_safe(blk_request, tmp_request, &dev->request_queue_receive, queuelist)
+ req->cmd_type = REQ_TYPE_SPECIAL;
+ req->cmd_flags = REQ_GET_SERVERS;
+ list_add_tail(&req->queuelist, &dev->request_queue_send);
+ wake_up(&dev->process_queue_send);
+ }
+
+ dev->hb_timer.expires = jiffies + HB_INTERVAL;
+ add_timer(&dev->hb_timer);
+}
+
+int dnbd3_net_discover(void *data)
+{
+ dnbd3_device_t *dev = data;
+ struct sockaddr_in sin;
+ struct socket *sock;
+
+ dnbd3_request_t dnbd3_request;
+ dnbd3_reply_t dnbd3_reply;
+ struct msghdr msg;
+ struct kvec iov;
+
+ uint64_t filesize;
+ char *buf;
+ char ip[16];
+
+ struct timeval start, end;
+ uint64_t t1, t2 = 0;
+ int a, i, num = 0;
+
+ struct timeval timeout;
+ timeout.tv_sec = 1;
+ timeout.tv_usec = 0;
+
+ init_msghdr(msg);
+
+ buf = kmalloc(4096, GFP_KERNEL);
+ if (!buf)
+ {
+ printk("ERROR: kmalloc failed");
+ return -1;
+ }
+
+ set_user_nice(current, -20);
+
+ while (!kthread_should_stop())
+ {
+ wait_event_interruptible(dev->process_queue_discover, kthread_should_stop() || dev->num_servers);
+
+ num = dev->num_servers;
+ dev->num_servers = 0;
+
+ if (!num)
+ continue;
+
+ for (i=0; i < num && i < MAX_NUMBER_SERVERS; i++)
{
- list_del_init(&blk_request->queuelist);
- list_add_tail(&blk_request->queuelist, &dev->request_queue_send);
+ // initialize socket
+ if (sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock) < 0)
+ {
+ printk("ERROR: Couldn't create socket.\n");
+ sock = NULL;
+ continue;
+ }
+
+ kernel_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &timeout, sizeof(timeout));
+
+ inet_ntoa(dev->servers[i], ip);
+ sock->sk->sk_allocation = GFP_NOIO;
+ sin.sin_family = AF_INET;
+ sin.sin_addr.s_addr = inet_addr(ip);
+ sin.sin_port = htons(simple_strtol(dev->port, NULL, 10));
+ if (kernel_connect(sock, (struct sockaddr *) &sin, sizeof(sin), 0) < 0)
+ {
+ printk("ERROR: Couldn't connect to host %s:%s\n", ip, dev->port);
+ sock = NULL;
+ continue;
+ }
+
+ // Request filesize
+ dnbd3_request.cmd = CMD_GET_SIZE;
+ dnbd3_request.vid = dev->vid;
+ dnbd3_request.rid = dev->rid;
+ // send net request
+ iov.iov_base = &dnbd3_request;
+ iov.iov_len = sizeof(dnbd3_request);
+ if (kernel_sendmsg(sock, &msg, &iov, 1, sizeof(dnbd3_request)) <= 0)
+ {
+ printk("ERROR: kernel_sendmsg (discover)\n");
+ sock_release(sock);
+ sock = NULL;
+ continue;
+ }
+ // receive net replay
+ iov.iov_base = &dnbd3_reply;
+ iov.iov_len = sizeof(dnbd3_reply);
+ kernel_recvmsg(sock, &msg, &iov, 1, sizeof(dnbd3_reply), msg.msg_flags);
+ // receive data
+ iov.iov_base = &filesize;
+ iov.iov_len = sizeof(uint64_t);
+ kernel_recvmsg(sock, &msg, &iov, 1, dnbd3_reply.size, msg.msg_flags);
+
+ do_gettimeofday(&start);
+
+ // Request block
+ dnbd3_request.cmd = CMD_GET_BLOCK;
+ dnbd3_request.offset = 0;
+ dnbd3_request.size = 4096;
+ // send net request
+ iov.iov_base = &dnbd3_request;
+ iov.iov_len = sizeof(dnbd3_request);
+ if (kernel_sendmsg(sock, &msg, &iov, 1, sizeof(dnbd3_request)) <= 0)
+ {
+ printk("ERROR: kernel_sendmsg (discover)\n");
+ sock_release(sock);
+ sock = NULL;
+ continue;
+ }
+ // receive net replay
+ iov.iov_base = &dnbd3_reply;
+ iov.iov_len = sizeof(dnbd3_reply);
+ kernel_recvmsg(sock, &msg, &iov, 1, sizeof(dnbd3_reply), msg.msg_flags);
+ // receive data
+ iov.iov_base = buf;
+ iov.iov_len = 4096;
+ a = kernel_recvmsg(sock, &msg, &iov, 1, dnbd3_reply.size, msg.msg_flags);
+
+ do_gettimeofday(&end);
+
+ // clear socket
+ if (sock)
+ {
+ sock_release(sock);
+ sock = NULL;
+ }
+
+ t1 = (start.tv_sec*1000000ull) + start.tv_usec;
+ t2 = (end.tv_sec*1000000ull) + end.tv_usec;
+ printk("DEBUG: Server: %s RTT: %llums received bytes: %i\n", ip,t2 - t1, a);
}
- spin_unlock_irq(&dev->blk_lock);
}
+ kfree(buf);
+ return 0;
}
int dnbd3_net_send(void *data)
@@ -134,6 +281,8 @@ int dnbd3_net_send(void *data)
init_msghdr(msg);
set_user_nice(current, -20);
+ // TODO: check if socket error
+
while (!kthread_should_stop() || !list_empty(&dev->request_queue_send))
{
wait_event_interruptible(dev->process_queue_send,
@@ -212,6 +361,8 @@ int dnbd3_net_receive(void *data)
init_msghdr(msg);
set_user_nice(current, -20);
+ // TODO: check if socket error
+
while (!kthread_should_stop() || !list_empty(&dev->request_queue_receive))
{
wait_event_interruptible(dev->process_queue_receive,
@@ -302,6 +453,8 @@ int dnbd3_net_receive(void *data)
kernel_recvmsg(dev->sock, &msg, &iov, 1, size, msg.msg_flags);
}
kfree(blk_request);
+// if (dev->num_servers > 1)
+// wake_up(&dev->process_queue_discover);
continue;
default:
@@ -312,18 +465,3 @@ int dnbd3_net_receive(void *data)
}
return 0;
}
-
-void dnbd3_net_heartbeat(unsigned long arg)
-{
- dnbd3_device_t *dev = (dnbd3_device_t *) arg;
- struct request *req = kmalloc(sizeof(struct request), GFP_ATOMIC);
- if (req)
- {
- req->cmd_type = REQ_TYPE_SPECIAL;
- req->cmd_flags = REQ_GET_SERVERS;
- list_add(&req->queuelist, &dev->request_queue_send);
- wake_up(&dev->process_queue_send);
- }
- dev->hb_timer.expires = jiffies + HB_INTERVAL;
- add_timer(&dev->hb_timer);
-}
diff --git a/src/kernel/net.h b/src/kernel/net.h
index ac5d8a4..5516b8f 100644
--- a/src/kernel/net.h
+++ b/src/kernel/net.h
@@ -40,4 +40,6 @@ int dnbd3_net_receive(void *data);
void dnbd3_net_heartbeat(unsigned long arg);
+int dnbd3_net_discover(void *data);
+
#endif /* NET_H_ */
diff --git a/src/server/server.c b/src/server/server.c
index 6fdedf0..bc37019 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -185,6 +185,7 @@ int main(int argc, char* argv[])
_dnbd3_clients = g_slist_append(_dnbd3_clients, dnbd3_client);
pthread_create(&(thread), NULL, dnbd3_handle_query, (void *) (uintptr_t) dnbd3_client);
+ pthread_detach(thread);
}
dnbd3_cleanup();