summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFrederic Robra2019-07-11 12:57:26 +0200
committerFrederic Robra2019-07-11 12:57:26 +0200
commit3cc08a2ab0bec07a69cf0a745154301178333474 (patch)
tree2118deb3d955c2a1e8a4a4a44e53bec41f5c3b25
parentadded receive worker (diff)
downloaddnbd3-ng-3cc08a2ab0bec07a69cf0a745154301178333474.tar.gz
dnbd3-ng-3cc08a2ab0bec07a69cf0a745154301178333474.tar.xz
dnbd3-ng-3cc08a2ab0bec07a69cf0a745154301178333474.zip
receiver worker
-rw-r--r--src/kernel/core.c9
-rw-r--r--src/kernel/dnbd3.h22
-rw-r--r--src/kernel/net.c211
3 files changed, 100 insertions, 142 deletions
diff --git a/src/kernel/core.c b/src/kernel/core.c
index f4b7204..7e985f7 100644
--- a/src/kernel/core.c
+++ b/src/kernel/core.c
@@ -192,17 +192,12 @@ static void dnbd3_blk_fail_all_requests(dnbd3_device *dev)
static int dnbd3_ioctl(struct block_device *bdev, fmode_t mode, unsigned int cmd, unsigned long arg)
{
- int result = -100;
+ int result = -EIO;
dnbd3_device *dev = bdev->bd_disk->private_data;
char *imgname = NULL;
dnbd3_ioctl_t *msg = NULL;
printk(KERN_DEBUG "dnbd3: ioctl device %i, cmd %i, arg %lu\n", dev->minor, cmd, arg);
- //unsigned long irqflags;
-
-// while (dev->disconnecting) {
-// // do nothing
-// }
if (arg != 0) {
msg = kmalloc(sizeof(*msg), GFP_KERNEL);
@@ -271,7 +266,7 @@ static int dnbd3_ioctl(struct block_device *bdev, fmode_t mode, unsigned int cmd
case IOCTL_CLOSE:
printk(KERN_DEBUG "dnbd3: ioctl close\n");
dnbd3_blk_fail_all_requests(dev);
- dnbd3_net_disconnect(dev);
+ result = dnbd3_net_disconnect(dev);
dnbd3_blk_fail_all_requests(dev);
set_capacity(dev->disk, 0);
if (dev->imgname) {
diff --git a/src/kernel/dnbd3.h b/src/kernel/dnbd3.h
index 4e124e0..33f707d 100644
--- a/src/kernel/dnbd3.h
+++ b/src/kernel/dnbd3.h
@@ -72,41 +72,19 @@ typedef struct dnbd3_device {
struct mutex device_lock;
// network
- uint8_t socks_active;
dnbd3_sock socks[NUMBER_CONNECTIONS];
char *imgname;
-// struct socket *sock;
-// struct mutex socket_lock;
-// struct request *pending;
dnbd3_server initial_server;
-// dnbd3_server_t cur_server, initial_server;
-// uint64_t cur_rtt;
-// serialized_buffer_t payload_buffer;
dnbd3_server alt_servers[NUMBER_SERVERS]; // array of alt servers
int new_servers_num; // number of new alt servers that are waiting to be copied to above array
dnbd3_server_entry_t new_servers[NUMBER_SERVERS]; // pending new alt servers
-// uint8_t discover, panic, disconnecting, update_available, panic_count;
uint8_t update_available;
uint8_t use_server_provided_alts;
uint16_t rid;
-// uint32_t heartbeat_count;
uint64_t reported_size;
- // server switch
-// struct socket *better_sock;
struct work_struct discovery; // if in irq and need to send request
struct timer_list discovery_timer;
- // process
-// struct task_struct * thread_send;
-// struct task_struct * thread_receive;
-// struct task_struct *thread_discover;
-// struct timer_list hb_timer;
-// wait_queue_head_t process_queue_send;
-// wait_queue_head_t process_queue_receive;
-// wait_queue_head_t process_queue_discover;
-// struct list_head request_queue_send;
-// struct list_head request_queue_receive;
-
} dnbd3_device;
diff --git a/src/kernel/net.c b/src/kernel/net.c
index 0721803..5d3e344 100644
--- a/src/kernel/net.c
+++ b/src/kernel/net.c
@@ -38,6 +38,7 @@
#define KEEPALIVE_TIMER (jiffies + (HZ * TIMER_INTERVAL_KEEPALIVE_PACKET))
#define DISCOVERY_TIMER (jiffies + (HZ * TIMER_INTERVAL_PROBE_NORMAL))
+#define REQUEST_TIMEOUT (jiffies + (HZ * SOCKET_TIMEOUT_CLIENT_DATA))
#define init_msghdr(h) do { \
h.msg_name = NULL; \
@@ -53,7 +54,7 @@ static uint64_t send_wq_handle;
static int dnbd3_socket_connect(dnbd3_device *dev, dnbd3_server *server);
static int dnbd3_socket_disconnect(dnbd3_device *dev, dnbd3_server *server, dnbd3_sock *sock);
-static void print_host(struct dnbd3_host_t *host, char *msg)
+static void dnbd3_print_host(struct dnbd3_host_t *host, char *msg)
{
if (host->type == HOST_IP4) {
printk(KERN_INFO "dnbd3: %s %pI4:%d\n", msg, host->addr, host->port);
@@ -62,26 +63,26 @@ static void print_host(struct dnbd3_host_t *host, char *msg)
}
}
-static void print_server_list(struct dnbd3_device *dev)
+static void dnbd3_print_server_list(struct dnbd3_device *dev)
{
int i;
- print_host(&dev->initial_server.host, "initial server is");
+ dnbd3_print_host(&dev->initial_server.host, "initial server is");
for (i = 0; i < NUMBER_SERVERS; i++) {
if (dev->alt_servers[i].host.addr[0] != 0) {
- print_host(&dev->alt_servers[i].host, "alternative server is");
+ dnbd3_print_host(&dev->alt_servers[i].host, "alternative server is");
}
}
}
-static uint64_t to_handle(uint32_t arg0, uint32_t arg1) {
+static uint64_t dnbd3_to_handle(uint32_t arg0, uint32_t arg1) {
return ((uint64_t) arg0 << 32) | arg1;
}
-static uint32_t arg0_from_handle(uint64_t handle) {
+static uint32_t dnbd3_arg0_from_handle(uint64_t handle) {
return (uint32_t)(handle >> 32);
}
-static uint32_t arg1_from_handle(uint64_t handle) {
+static uint32_t dnbd3_arg1_from_handle(uint64_t handle) {
return (uint32_t) handle;
}
@@ -132,14 +133,12 @@ int dnbd3_send_request(struct dnbd3_sock *sock, struct request *req, struct dnbd
if (cmd != NULL) {
cmd->cookie = sock->cookie;
tag = blk_mq_unique_tag(req);
- handle = ((uint64_t) tag << 32) | sock->cookie;
+ handle = dnbd3_to_handle(tag, sock->cookie);// ((uint64_t) tag << 32) | sock->cookie;
} else {
handle = sock->cookie;
}
memcpy(&dnbd3_request.handle, &handle, sizeof(handle));
- printk(KERN_DEBUG "dnbd3: request handle is %llu\n", dnbd3_request.handle);
-// dnbd3_request.handle = (uint64_t)(uintptr_t)req; // Double cast to prevent warning on 32bit
fixup_request(dnbd3_request);
iov[0].iov_base = &dnbd3_request;
iov[0].iov_len = sizeof(dnbd3_request);
@@ -188,12 +187,16 @@ int dnbd3_send_request_blocking(struct dnbd3_sock *sock, int dnbd3_cmd)
goto error;
}
send_wq_handle = 0;
- handle = to_handle(sock->device->minor, dnbd3_cmd);
+ handle = dnbd3_to_handle(sock->device->minor, dnbd3_cmd);
mutex_unlock(&sock->lock);
printk(KERN_DEBUG "dnbd3: blocking request going to sleep wait for handle %llu\n", handle);
- wait_event_interruptible(send_wq, handle == send_wq_handle);
+ if (wait_event_interruptible_timeout(send_wq, handle == send_wq_handle, REQUEST_TIMEOUT) <= 0) { // timeout or interrupt
+ printk(KERN_WARNING "dndbd3: request timed out\n");
+ result = -EIO;
+ goto error;
+ }
printk(KERN_DEBUG "dnbd3: blocking request woke up with handle %llu\n", handle);
@@ -204,7 +207,7 @@ error:
return result;
}
-static void dnbd3_receive_work(struct work_struct *work)
+static void dnbd3_receive_worker(struct work_struct *work)
{
struct dnbd3_sock *sock = container_of(work, struct dnbd3_sock, receive);
struct dnbd3_device *dev = sock->device;
@@ -219,21 +222,21 @@ static void dnbd3_receive_work(struct work_struct *work)
sigset_t blocked, oldset;
void *kaddr;
uint32_t tag, cookie;
- uint16_t hwq;
+ uint16_t hwq, rid;
int result, count, remaining;
- uint16_t rid;
uint64_t reported_size, handle;
char *name;
serialized_buffer_t payload_buffer;
init_msghdr(msg);
- while(sock->sock) {
+ while(sock->sock && sock->server) {
iov.iov_base = &dnbd3_reply;
iov.iov_len = sizeof(dnbd3_reply);
result = kernel_recvmsg(sock->sock, &msg, &iov, 1, sizeof(dnbd3_reply), msg.msg_flags);
- if (!result) {
- printk(KERN_ERR "dnbd3: connection to server lost\n");
- result = -EIO;
+ if (result == -EAGAIN) {
+ continue;
+ } else if (result <= 0) {
+ printk(KERN_ERR "dnbd3: connection to server lost %d\n", result);
goto error;
}
@@ -258,8 +261,8 @@ static void dnbd3_receive_work(struct work_struct *work)
case CMD_GET_BLOCK:
printk(KERN_DEBUG "dnbd3: handle is %llu\n", dnbd3_reply.handle);
memcpy(&handle, &dnbd3_reply.handle, sizeof(handle));
- cookie = (uint32_t) handle;
- tag = (uint32_t)(handle >> 32);
+ cookie = dnbd3_arg1_from_handle(handle);
+ tag = dnbd3_arg0_from_handle(handle);
hwq = blk_mq_unique_tag_to_hwq(tag);
if (hwq < dev->tag_set.nr_hw_queues)
@@ -405,22 +408,21 @@ consume_payload:
break;
}
error:
- handle = to_handle(dev->minor, dnbd3_reply.cmd);
+ handle = dnbd3_to_handle(dev->minor, dnbd3_reply.cmd);
printk(KERN_DEBUG "dnbd3: try to wake up queue with handle %llu\n", handle);
send_wq_handle = handle;
wake_up_interruptible(&send_wq);
if (result) {
- printk(KERN_DEBUG "dnbd3: receive error happened %d\n", result);
- break; //TODO for now need to handle errors
+ sock->server->failures++; // discovery takes care of to many failures
+ printk(KERN_WARNING "dnbd3: receive error happened %d, total failures %d\n", result, sock->server->failures);
}
printk(KERN_DEBUG "dnbd3: receive completed, waiting for next receive\n");
}
printk(KERN_DEBUG "dnbd3: receive work queue is stopped\n");
- dnbd3_socket_disconnect(dev, sock->server, sock);
}
-void dnbd3_keepalive(struct timer_list *arg)
+void dnbd3_keepalive_timer(struct timer_list *arg)
{
struct dnbd3_sock *sock = container_of(arg, struct dnbd3_sock, keepalive_timer);
queue_work(dnbd3_wq, &sock->keepalive);
@@ -428,25 +430,15 @@ void dnbd3_keepalive(struct timer_list *arg)
add_timer(&sock->keepalive_timer);
}
-static void keepalive(struct work_struct *work)
+static void dnbd3_keepalive_worker(struct work_struct *work)
{
struct dnbd3_sock *sock = container_of(work, struct dnbd3_sock, keepalive);
-// struct request *req;
printk(KERN_DEBUG "dnbd3: starting keepalive worker\n");
-// mutex_lock(&sock->lock);
-// req = kmalloc(sizeof(struct request), GFP_ATOMIC );
- // send keepalive
-// if (req) {
- dnbd3_send_request_blocking(sock, CMD_KEEPALIVE);
-//// kfree(req);
-// } else {
-// printk(KERN_WARNING "dnbd3: could not create keepalive request\n");
-// }
- ++sock->heartbeat_count;
-// mutex_unlock(&sock->lock);
+ dnbd3_send_request_blocking(sock, CMD_KEEPALIVE);
+ sock->heartbeat_count++;
}
-void dnbd3_discovery(struct timer_list *arg)
+static void dnbd3_discovery_timer(struct timer_list *arg)
{
struct dnbd3_device *dev = container_of(arg, struct dnbd3_device, discovery_timer);
queue_work(dnbd3_wq, &dev->discovery);
@@ -454,28 +446,16 @@ void dnbd3_discovery(struct timer_list *arg)
add_timer(&dev->discovery_timer);
}
-static void discovery(struct work_struct *work)
+static void dnbd3_discovery_worker(struct work_struct *work)
{
struct dnbd3_device *dev = container_of(work, struct dnbd3_device, discovery);
dnbd3_sock *sock = &dev->socks[0]; // we use the first sock for discovery
-// struct request *req;
int i, j;
struct dnbd3_server *existing_server, *free_server, *failed_server;
dnbd3_server_entry_t *new_server;
printk(KERN_DEBUG "dnbd3: starting discovery worker\n");
-// mutex_lock(&sock->lock);
-// req = kmalloc(sizeof(struct request), GFP_ATOMIC );
-// // send keepalive
-// if (req) {
-// dnbd3_cmd_to_priv(req, CMD_GET_SERVERS);
- dnbd3_send_request_blocking(sock, CMD_GET_SERVERS);
-// kfree(req);
-// } else {
-// printk(KERN_WARNING "dnbd3: could not create get servers request\n");
-// }
-// mutex_unlock(&sock->lock);
-
- //TODO wait until something is received
+
+ dnbd3_send_request_blocking(sock, CMD_GET_SERVERS);
printk(KERN_DEBUG "dnbd3: new server num is %d\n", dev->new_servers_num);
if (dev->new_servers_num) {
@@ -506,7 +486,7 @@ static void discovery(struct work_struct *work)
if (existing_server) {
if (new_server->failures == 1) { // remove is requested
- print_host(&existing_server->host, "remove server");
+ dnbd3_print_host(&existing_server->host, "remove server");
dnbd3_socket_disconnect(dev, existing_server, NULL); // TODO what to do when only one connection?
existing_server->host.type = 0;
}
@@ -522,7 +502,7 @@ static void discovery(struct work_struct *work)
//no server found to replace
continue;
}
- print_host(&free_server->host, "got new alt server");
+ dnbd3_print_host(&free_server->host, "got new alt server");
free_server->failures = 0;
free_server->protocol_version = 0;
free_server->rtts[0] = free_server->rtts[1] = free_server->rtts[2] = free_server->rtts[3] = RTT_UNREACHABLE;
@@ -538,11 +518,10 @@ static void discovery(struct work_struct *work)
}
}
-static int __dnbd3_socket_connect(dnbd3_server * server, dnbd3_sock *sock)
+static int __dnbd3_socket_connect(struct dnbd3_server * server, struct dnbd3_sock *sock)
{
+ int result = 0;
struct timeval timeout;
- mutex_init(&sock->lock);
- mutex_lock(&sock->lock);
if (server->host.port == 0 || server->host.type == 0) {
printk(KERN_ERR "dnbd3: host or port not set\n");
@@ -553,10 +532,10 @@ static int __dnbd3_socket_connect(dnbd3_server * server, dnbd3_sock *sock)
goto error;
}
- timeout.tv_sec = SOCKET_TIMEOUT_CLIENT_DATA;
+ timeout.tv_sec = SOCKET_TIMEOUT_CLIENT_DATA + 2;
timeout.tv_usec = 0;
- if (dnbd3_sock_create(server->host.type, SOCK_STREAM, IPPROTO_TCP, &sock->sock) < 0) {
+ if ((result = dnbd3_sock_create(server->host.type, SOCK_STREAM, IPPROTO_TCP, &sock->sock)) < 0) {
printk(KERN_ERR "dnbd3: could not create socket\n");
goto error;
}
@@ -570,7 +549,7 @@ static int __dnbd3_socket_connect(dnbd3_server * server, dnbd3_sock *sock)
sin.sin_family = AF_INET;
memcpy(&(sin.sin_addr), server->host.addr, 4);
sin.sin_port = server->host.port;
- if (kernel_connect(sock->sock, (struct sockaddr *)&sin, sizeof(sin), 0) != 0) {
+ if ((result = kernel_connect(sock->sock, (struct sockaddr *)&sin, sizeof(sin), 0)) != 0) {
printk(KERN_ERR "dnbd3: connection to host failed (ipv4)\n");
goto error;
}
@@ -580,12 +559,11 @@ static int __dnbd3_socket_connect(dnbd3_server * server, dnbd3_sock *sock)
sin.sin6_family = AF_INET6;
memcpy(&(sin.sin6_addr), server->host.addr, 16);
sin.sin6_port = server->host.port;
- if (kernel_connect(sock->sock, (struct sockaddr *)&sin, sizeof(sin), 0) != 0){
+ if ((result = kernel_connect(sock->sock, (struct sockaddr *)&sin, sizeof(sin), 0)) != 0){
printk(KERN_ERR "dnbd3: connection to host failed (ipv6)\n");
goto error;
}
}
- mutex_unlock(&sock->lock);
return 0;
error:
@@ -593,9 +571,7 @@ error:
sock_release(sock->sock);
sock->sock = NULL;
}
- mutex_unlock(&sock->lock);
- mutex_destroy(&sock->lock);
- return -EIO;
+ return result;
}
static int dnbd3_socket_connect(dnbd3_device *dev, dnbd3_server *server)
@@ -603,7 +579,6 @@ static int dnbd3_socket_connect(dnbd3_device *dev, dnbd3_server *server)
int i;
int result = -EIO;
struct dnbd3_sock *sock = NULL;
- struct request *req = NULL;
for (i = 0; i < NUMBER_CONNECTIONS; i++) {
if (!dev->socks[i].sock) {
sock = &dev->socks[i];
@@ -618,40 +593,37 @@ static int dnbd3_socket_connect(dnbd3_device *dev, dnbd3_server *server)
printk(KERN_DEBUG "dnbd3: socket connect device %i\n", dev->minor);
- __dnbd3_socket_connect(server, sock);
-// mutex_lock(&sock->lock);
-// req = kmalloc(sizeof(*req), GFP_ATOMIC );
-// if (!req) {
-// printk(KERN_ERR "dnbd3: kmalloc failed\n");
-// goto error;
-// }
+ mutex_init(&sock->lock);
+ mutex_lock(&sock->lock);
+ __dnbd3_socket_connect(server, sock);
+ mutex_unlock(&sock->lock);
+ if (!sock->sock) {
+ printk(KERN_DEBUG "dnbd3: socket is not connected\n");
+ result = -EIO;
+ goto error;
+ }
// start the receiver
- INIT_WORK(&sock->receive, dnbd3_receive_work);
+ INIT_WORK(&sock->receive, dnbd3_receive_worker);
queue_work(dnbd3_wq, &sock->receive);
-// dnbd3_connect(req);
result = dnbd3_send_request_blocking(sock, CMD_SELECT_IMAGE);
if (result) {
printk(KERN_ERR "dnbd3: connection to image %s failed\n", dev->imgname);
goto error;
}
-// mutex_unlock(&sock->lock);
-
- //TODO wait until connected
printk(KERN_DEBUG "dnbd3: connected to image %s, filesize %llu\n", dev->imgname, dev->reported_size);
// add heartbeat timer and scheduler for the command
- INIT_WORK(&sock->keepalive, keepalive);
+ INIT_WORK(&sock->keepalive, dnbd3_keepalive_worker);
sock->heartbeat_count = 0;
- timer_setup(&sock->keepalive_timer, dnbd3_keepalive, 0);
+ timer_setup(&sock->keepalive_timer, dnbd3_keepalive_timer, 0);
sock->keepalive_timer.expires = KEEPALIVE_TIMER;
add_timer(&sock->keepalive_timer);
- mutex_unlock(&sock->lock);
// kfree(req);
return 0;
@@ -660,56 +632,69 @@ error:
sock_release(sock->sock);
sock->sock = NULL;
}
- if (req) {
- kfree(req);
- }
- mutex_unlock(&sock->lock);
return result;
}
-static int dnbd3_socket_disconnect(dnbd3_device *dev, dnbd3_server *server, dnbd3_sock *sock)
+static int dnbd3_socket_disconnect(struct dnbd3_device *dev, struct dnbd3_server *server, struct dnbd3_sock *sock)
{
int i;
- if (sock == NULL) {
- for (i = 0; i < NUMBER_CONNECTIONS; i++) {
- if (dev->socks[i].server == server) {
- sock = &dev->socks[i];
- break;
- }
+ int sock_alive = 0;
+ for (i = 0; i < NUMBER_CONNECTIONS; i++) {
+ if (sock == NULL && dev->socks[i].server == server) {
+ sock = &dev->socks[i];
}
- if (!sock) {
- printk(KERN_WARNING "dnbd3: could not find socket to disconnect\n");
- return -EIO;
+ if (dev->socks[i].sock) {
+ sock_alive++;
}
}
- printk(KERN_DEBUG "dnbd3: socket disconnect device %i\n", dev->minor);
- mutex_lock(&sock->lock);
+ if (!sock || !sock->sock) {
+ printk(KERN_WARNING "dnbd3: could not find socket to disconnect\n");
+ return -EIO;
+ }
+ if (sock_alive == 1) {
+ printk(KERN_INFO "dnbd3: shutting down last socket and stopping discovery\n");
+ del_timer_sync(&dev->discovery_timer);
+ cancel_work_sync(&dev->discovery);
- // clear heartbeat timer
+ }
del_timer_sync(&sock->keepalive_timer);
+ cancel_work_sync(&sock->keepalive);
+
+ printk(KERN_DEBUG "dnbd3: socket disconnect device %i\n", dev->minor);
+ mutex_lock(&sock->lock);
+ /*
+ * Important sequence to shut down socket
+ * 1. kernel_sock_shutdown
+ * socket shutdown, receiver which hangs in kernel_recvmsg returns 0
+ * 2. cancel_work_sync(receiver)
+ * wait for the receiver to finish, so the socket is not usesd anymore
+ * 3. sock_release
+ * release the socket and set to NULL
+ */
if (sock->sock) {
kernel_sock_shutdown(sock->sock, SHUT_RDWR);
}
+ mutex_unlock(&sock->lock);
+ mutex_destroy(&sock->lock);
+ sock->server = NULL;
+
+ printk(KERN_DEBUG "dnbd3: cancel receiver work device %i\n", dev->minor);
+ cancel_work_sync(&sock->receive);
- // clear socket
if (sock->sock) {
sock_release(sock->sock);
sock->sock = NULL;
}
-
- mutex_unlock(&sock->lock);
- mutex_destroy(&sock->lock);
- sock->server = NULL;
return 0;
}
int dnbd3_net_disconnect(struct dnbd3_device *dev)
{
int i;
- int result;
- del_timer_sync(&dev->discovery_timer);
+ int result = 0;
+
for (i = 0; i < NUMBER_CONNECTIONS; i++) {
if (dev->socks[i].sock) {
if (dnbd3_socket_disconnect(dev, NULL, &dev->socks[i])) {
@@ -721,15 +706,15 @@ int dnbd3_net_disconnect(struct dnbd3_device *dev)
}
-int dnbd3_net_connect(struct dnbd3_device *dev) {
+int dnbd3_net_connect(struct dnbd3_device *dev)
+{
// TODO decide which socket to connect
int result;
- dev->socks_active = 0;
if (dnbd3_socket_connect(dev, &dev->alt_servers[0]) == 0) {
- print_server_list(dev);
+ dnbd3_print_server_list(dev);
- INIT_WORK(&dev->discovery, discovery);
- timer_setup(&dev->discovery_timer, dnbd3_discovery, 0);
+ INIT_WORK(&dev->discovery, dnbd3_discovery_worker);
+ timer_setup(&dev->discovery_timer, dnbd3_discovery_timer, 0);
dev->discovery_timer.expires = DISCOVERY_TIMER;
add_timer(&dev->discovery_timer);