From cbf9d922fdbe19e65e8710dd72d6832e35144fc5 Mon Sep 17 00:00:00 2001 From: sr Date: Sat, 25 Aug 2012 18:59:12 +0200 Subject: [KERNEL] Make it possible to receive push messages by the server --- src/client/client.c | 8 +++++--- src/kernel/blk.c | 42 ++++++++++++++++++++++++++++----------- src/kernel/net.c | 57 +++++++++++++++++++++++++++++++++++------------------ src/serialize.c | 2 +- src/server/net.c | 11 ++++++----- 5 files changed, 80 insertions(+), 40 deletions(-) diff --git a/src/client/client.c b/src/client/client.c index 86b7cb9..c3318f4 100644 --- a/src/client/client.c +++ b/src/client/client.c @@ -95,6 +95,7 @@ int main(int argc, char *argv[]) msg.read_ahead_kb = DEFAULT_READ_AHEAD_KB; msg.port = htons(PORT); msg.addrtype = 0; + msg.imgname = NULL; int opt = 0; int longIndex = 0; @@ -166,7 +167,7 @@ int main(int argc, char *argv[]) fd = open(dev, O_WRONLY); printf("INFO: Closing device %s\n", dev); - const int ret = ioctl(fd, IOCTL_OPEN, &msg); + const int ret = ioctl(fd, IOCTL_CLOSE, &msg); if (ret < 0) { printf("ERROR: ioctl not successful (close, errcode: %d)\n", ret); @@ -183,9 +184,10 @@ int main(int argc, char *argv[]) fd = open(dev, O_WRONLY); printf("INFO: Switching device %s to %s\n", dev, ""); - if (ioctl(fd, IOCTL_SWITCH, &msg) < 0) + const int ret = ioctl(fd, IOCTL_SWITCH, &msg); + if (ret < 0) { - printf("ERROR: ioctl not successful (switch)\n"); + printf("ERROR: ioctl not successful (switch, errcode: %d)\n", ret); exit(EXIT_FAILURE); } diff --git a/src/kernel/blk.c b/src/kernel/blk.c index b0b0912..f8ef4fd 100644 --- a/src/kernel/blk.c +++ b/src/kernel/blk.c @@ -159,9 +159,10 @@ int dnbd3_blk_ioctl(struct block_device *bdev, fmode_t mode, unsigned int cmd, u break; case IOCTL_CLOSE: - set_capacity(dev->disk, 0); + dnbd3_blk_fail_all_requests(dev); result = dnbd3_net_disconnect(dev); dnbd3_blk_fail_all_requests(dev); + set_capacity(dev->disk, 0); if (dev->imgname) { kfree(dev->imgname); @@ -240,24 +241,41 @@ void dnbd3_blk_fail_all_requests(dnbd3_device_t *dev) int dup; INIT_LIST_HEAD(&local_copy); spin_lock_irq(&dev->blk_lock); - list_for_each_entry_safe(blk_request, tmp_request, &dev->request_queue_receive, queuelist) + while (!list_empty(&dev->request_queue_receive)) { - list_del_init(&blk_request->queuelist); - list_add(&blk_request->queuelist, &local_copy); + list_for_each_entry_safe(blk_request, tmp_request, &dev->request_queue_receive, queuelist) + { + list_del_init(&blk_request->queuelist); + dup = 0; + list_for_each_entry_safe(blk_request2, tmp_request2, &local_copy, queuelist) + { + if (blk_request == blk_request2) + { + printk("WARNING: Request is in both lists!\n"); + dup = 1; + break; + } + } + if (!dup) list_add(&blk_request->queuelist, &local_copy); + } } - list_for_each_entry_safe(blk_request, tmp_request, &dev->request_queue_send, queuelist) + while (!list_empty(&dev->request_queue_send)) { - list_del_init(&blk_request->queuelist); - dup = 0; - list_for_each_entry_safe(blk_request2, tmp_request2, &local_copy, queuelist) + list_for_each_entry_safe(blk_request, tmp_request, &dev->request_queue_send, queuelist) { - if (blk_request == blk_request2) + list_del_init(&blk_request->queuelist); + dup = 0; + list_for_each_entry_safe(blk_request2, tmp_request2, &local_copy, queuelist) { - printk("WARNING: Request is in both lists!\n"); - dup = 1; + if (blk_request == blk_request2) + { + printk("WARNING: Request is in both lists!\n"); + dup = 1; + break; + } } + if (!dup) list_add(&blk_request->queuelist, &local_copy); } - if (!dup) list_add(&blk_request->queuelist, &local_copy); } spin_unlock_irq(&dev->blk_lock); list_for_each_entry_safe(blk_request, tmp_request, &local_copy, queuelist) diff --git a/src/kernel/net.c b/src/kernel/net.c index 41f3e5f..9ea9169 100644 --- a/src/kernel/net.c +++ b/src/kernel/net.c @@ -26,7 +26,7 @@ #include #ifndef MIN -#define MIN(a,b) (a < b ? a : b) +#define MIN(a,b) ((a) < (b) ? (a) : (b)) #endif int dnbd3_net_connect(dnbd3_device_t *dev) @@ -695,7 +695,10 @@ int dnbd3_net_send(void *data) iov.iov_base = &dnbd3_request; iov.iov_len = sizeof(dnbd3_request); if (kernel_sendmsg(dev->sock, &msg, &iov, 1, sizeof(dnbd3_request)) != sizeof(dnbd3_request)) + { + printk("Couldn't properly send a request header.\n"); goto error; + } wake_up(&dev->process_queue_receive); } @@ -730,27 +733,34 @@ int dnbd3_net_receive(void *data) unsigned long flags; sigset_t blocked, oldset; - int count, remaining; + int count, remaining, ret; init_msghdr(msg); set_user_nice(current, -20); - for (;;) + while (!kthread_should_stop()) { - wait_event_interruptible(dev->process_queue_receive, - kthread_should_stop() || !list_empty(&dev->request_queue_receive)); - - if (kthread_should_stop()) - break; - - if (list_empty(&dev->request_queue_receive)) - continue; - // receive net reply iov.iov_base = &dnbd3_reply; iov.iov_len = sizeof(dnbd3_reply); - if (kernel_recvmsg(dev->sock, &msg, &iov, 1, sizeof(dnbd3_reply), msg.msg_flags) != sizeof(dnbd3_reply)) + ret = kernel_recvmsg(dev->sock, &msg, &iov, 1, sizeof(dnbd3_reply), msg.msg_flags); + if (ret == -EAGAIN) + { + msleep_interruptible(2000); // Sleep at most 2 seconds, then check if we can receive something + // If a request for a block was sent, the thread is waken up immediately, so that we don't wait 2 seconds for the reply + // This change was made to allow unrequested information from the server to be received (push) + continue; + } + if (ret <= 0) + { + printk("Connection closed (%d).\n", ret); + goto error; + } + if (ret != sizeof(dnbd3_reply)) + { + printk("ERROR: Recv msg header\n"); goto error; + } fixup_reply(dnbd3_reply); // check error @@ -797,8 +807,9 @@ int dnbd3_net_receive(void *data) kaddr = kmap(bvec->bv_page) + bvec->bv_offset; iov.iov_base = kaddr; iov.iov_len = bvec->bv_len; - if (kernel_recvmsg(dev->sock, &msg, &iov, 1, bvec->bv_len, msg.msg_flags) <= 0) + if (kernel_recvmsg(dev->sock, &msg, &iov, 1, bvec->bv_len, msg.msg_flags) != bvec->bv_len) { + printk("ERROR: Receiving from net to block layer\n"); kunmap(bvec->bv_page); goto error; } @@ -822,23 +833,30 @@ int dnbd3_net_receive(void *data) { iov.iov_base = dev->new_servers; iov.iov_len = count * sizeof(dnbd3_server_entry_t); - if (kernel_recvmsg(dev->sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags) != iov.iov_len) + if (kernel_recvmsg(dev->sock, &msg, &iov, 1, (count * sizeof(dnbd3_server_entry_t)), msg.msg_flags) != (count * sizeof(dnbd3_server_entry_t))) + { + printk("ERROR: Recv CMD_GET_SERVERS payload.\n"); goto error; + } spin_lock_irq(&dev->blk_lock); dev->new_servers_num = count; spin_unlock_irq(&dev->blk_lock); // TODO: Re-Add update check } // If there were more servers than accepted, remove the remaining data from the socket buffer - remaining = dnbd3_reply.size - count * sizeof(dnbd3_server_entry_t); + remaining = dnbd3_reply.size - (count * sizeof(dnbd3_server_entry_t)); while (remaining > 0) { count = MIN(sizeof(dnbd3_reply), remaining); iov.iov_base = &dnbd3_reply; iov.iov_len = count; - if (kernel_recvmsg(dev->sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags) <= 0) + ret = kernel_recvmsg(dev->sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags); + if (ret <= 0) + { + printk("ERROR: Recv additional payload from CMD_GET_SERVERS.\n"); goto error; - remaining -= count; + } + remaining -= ret; } continue; @@ -849,12 +867,13 @@ int dnbd3_net_receive(void *data) } } + printk("dnbd3_net_receive terminated normally.\n"); return 0; error: printk("ERROR: Connection to server %pI4 : %d lost (receive)\n", dev->cur_server.hostaddr, (int)ntohs(dev->cur_server.port)); // move already send requests to request_queue_send again - if (!list_empty(&dev->request_queue_receive)) + while (!list_empty(&dev->request_queue_receive)) { printk("WARN: Request queue was not empty on %s\n", dev->disk->disk_name); spin_lock_irq(&dev->blk_lock); diff --git a/src/serialize.c b/src/serialize.c index fa1e878..ce27c8d 100644 --- a/src/serialize.c +++ b/src/serialize.c @@ -2,7 +2,7 @@ #include "types.h" #ifndef MIN -#define MIN(a,b) (a < b ? a : b) +#define MIN(a,b) ((a) < (b) ? (a) : (b)) #endif void serializer_reset_read(serialized_buffer_t *buffer, size_t data_len) diff --git a/src/server/net.c b/src/server/net.c index bf680a3..a60b739 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -41,10 +41,11 @@ static char recv_request_header(int sock, dnbd3_request_t *request) { - // Read request heade from socket - if (recv(sock, request, sizeof(*request), MSG_WAITALL) != sizeof(*request)) + int ret; + // Read request header from socket + if ((ret = recv(sock, request, sizeof(*request), MSG_WAITALL)) != sizeof(*request)) { - printf("[DEBUG] Error receiving request: Could not read message header\n"); + printf("[DEBUG] Error receiving request: Could not read message header (%d)\n", ret); return 0; } // Make sure all bytes are in the right order (endianness) @@ -55,7 +56,7 @@ static char recv_request_header(int sock, dnbd3_request_t *request) return 0; } // Payload sanity check - if (request->size > MAX_PAYLOAD) + if (request->cmd != CMD_GET_BLOCK && request->size > MAX_PAYLOAD) { memlogf("[WARNING] Client tries to send a packet of type %d with %d bytes payload. Dropping client.", (int)request->cmd, (int)request->size); return 0; @@ -152,7 +153,6 @@ void *dnbd3_handle_query(void *dnbd3_client) } else { - printf("Payload len: %d\n", (int)request.size); if (recv_request_payload(client->sock, request.size, &payload)) { client_version = serializer_get_uint16(&payload); @@ -179,6 +179,7 @@ void *dnbd3_handle_query(void *dnbd3_client) } else { + serializer_reset_write(&payload); serializer_put_uint16(&payload, PROTOCOL_VERSION); serializer_put_string(&payload, image->low_name); serializer_put_uint16(&payload, image->rid); -- cgit v1.2.3-55-g7522