-rw-r--r--  inc/dnbd3/config/client.h  |   36
-rw-r--r--  src/kernel/CMakeLists.txt  |    6
-rw-r--r--  src/kernel/Kbuild          |    2
-rw-r--r--  src/kernel/blk.c           |  332
-rw-r--r--  src/kernel/blk.h           |   97
-rw-r--r--  src/kernel/dnbd3_main.h    |  125
-rw-r--r--  src/kernel/net.c           | 1384
-rw-r--r--  src/kernel/net.h           |    6
-rw-r--r--  src/kernel/sysfs.c         |   24
-rw-r--r--  src/kernel/utils.c         |   48
-rw-r--r--  src/kernel/utils.h         |   30
11 files changed, 918 insertions, 1172 deletions
diff --git a/inc/dnbd3/config/client.h b/inc/dnbd3/config/client.h
index 49d4676..55cf8b3 100644
--- a/inc/dnbd3/config/client.h
+++ b/inc/dnbd3/config/client.h
@@ -4,9 +4,21 @@
// Which is the minimum protocol version the client expects from the server
#define MIN_SUPPORTED_SERVER 2
-// in seconds if not stated otherwise (MS = milliseconds)
-#define SOCKET_TIMEOUT_CLIENT_DATA 2
-#define SOCKET_TIMEOUT_CLIENT_DISCOVERY 1
+// Send keepalive every X seconds
+#define KEEPALIVE_INTERVAL 10
+
+// in seconds if not stated otherwise
+#define SOCKET_TIMEOUT_SEND 2
+
+// Socket receive timeout. Must be higher than the keepalive interval,
+// otherwise the connection might be aborted when idle
+#define SOCKET_TIMEOUT_RECV 13
+
+// During discovery, we use very short minimum timeouts (unless in panic mode)
+#define SOCKET_TIMEOUT_DISCOVERY 1
+
+// IO timeout for block layer
+#define BLOCK_LAYER_TIMEOUT 10
#define RTT_THRESHOLD_FACTOR(us) (((us) * 3) / 4) // 3/4 = current to best must be 25% worse
#define RTT_ABSOLUTE_THRESHOLD (80000) // Or 80ms worse
@@ -14,15 +26,19 @@
// This must be a power of two:
#define RTT_BLOCK_SIZE 4096
-#define STARTUP_MODE_DURATION 30
// Interval of several repeating tasks (in seconds)
-#define TIMER_INTERVAL_PROBE_STARTUP 4
-#define TIMER_INTERVAL_PROBE_NORMAL 22
+#define TIMER_INTERVAL_PROBE_STARTUP 2
+#define TIMER_INTERVAL_PROBE_SWITCH 10
#define TIMER_INTERVAL_PROBE_PANIC 2
-#define TIMER_INTERVAL_KEEPALIVE_PACKET 6
-
-// Expect a keepalive response every X seconds
-#define SOCKET_KEEPALIVE_TIMEOUT 8
+#define TIMER_INTERVAL_PROBE_MAX 45
+// How many discover runs after setting up a device count as the startup phase.
+// During that phase, check all servers before we start probing selectively,
+// and don't increase the discover interval.
+#define DISCOVER_STARTUP_PHASE_COUNT 6
+// Maximum number of servers to probe per discover run after the startup phase
+#define DISCOVER_REDUCED_SERVER_COUNT 3
+// Number of RTT probes to keep in history and average the value over
+#define DISCOVER_HISTORY_SIZE 4
// Number of unsuccessful alt_server probes before read errors are reported to the block layer
// (ALL servers will be probed this many times)
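The new constants interlock: SOCKET_TIMEOUT_RECV (13s) deliberately exceeds KEEPALIVE_INTERVAL (10s), so an idle but healthy connection never trips the receive timeout, and discovery backs off from TIMER_INTERVAL_PROBE_STARTUP towards TIMER_INTERVAL_PROBE_MAX once the startup phase is over. A minimal userspace sketch of that back-off, mirroring the re-queueing logic of dnbd3_discover() in net.c below (assuming the interval starts at the startup value):

#include <stdio.h>

#define TIMER_INTERVAL_PROBE_STARTUP 2
#define TIMER_INTERVAL_PROBE_MAX 45
#define DISCOVER_STARTUP_PHASE_COUNT 6

int main(void)
{
	unsigned int interval = TIMER_INTERVAL_PROBE_STARTUP;
	unsigned int run;

	for (run = 1; run <= 28; ++run) {
		printf("discover run %2u, next one in %2us\n", run, interval);
		/* back off only after the startup phase, capped near the max */
		if (interval < TIMER_INTERVAL_PROBE_MAX && run > DISCOVER_STARTUP_PHASE_COUNT)
			interval += 2;
	}
	return 0;
}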
diff --git a/src/kernel/CMakeLists.txt b/src/kernel/CMakeLists.txt
index bc02a7b..6bc61ff 100644
--- a/src/kernel/CMakeLists.txt
+++ b/src/kernel/CMakeLists.txt
@@ -30,13 +30,11 @@ set(KERNEL_MODULE_DNBD3_SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/blk.c
${CMAKE_CURRENT_SOURCE_DIR}/dnbd3_main.c
${CMAKE_CURRENT_SOURCE_DIR}/net.c
${CMAKE_CURRENT_SOURCE_DIR}/serialize.c
- ${CMAKE_CURRENT_SOURCE_DIR}/sysfs.c
- ${CMAKE_CURRENT_SOURCE_DIR}/utils.c)
+ ${CMAKE_CURRENT_SOURCE_DIR}/sysfs.c)
set(KERNEL_MODULE_DNBD3_HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/blk.h
${CMAKE_CURRENT_SOURCE_DIR}/dnbd3_main.h
${CMAKE_CURRENT_SOURCE_DIR}/net.h
- ${CMAKE_CURRENT_SOURCE_DIR}/sysfs.h
- ${CMAKE_CURRENT_SOURCE_DIR}/utils.h)
+ ${CMAKE_CURRENT_SOURCE_DIR}/sysfs.h)
add_kernel_module(dnbd3 "${KERNEL_BUILD_DIR}"
"${KERNEL_INSTALL_DIR}"
diff --git a/src/kernel/Kbuild b/src/kernel/Kbuild
index 385a5ff..26afa98 100644
--- a/src/kernel/Kbuild
+++ b/src/kernel/Kbuild
@@ -2,4 +2,4 @@
# Linux kernel module dnbd3
obj-$(CONFIG_BLK_DEV_DNBD3) := dnbd3.o
-dnbd3-y += dnbd3_main.o blk.o net.o serialize.o sysfs.o utils.o
+dnbd3-y += dnbd3_main.o blk.o net.o serialize.o sysfs.o
diff --git a/src/kernel/blk.c b/src/kernel/blk.c
index ccaa6c1..5795c03 100644
--- a/src/kernel/blk.c
+++ b/src/kernel/blk.c
@@ -34,25 +34,16 @@ static int dnbd3_close_device(dnbd3_device_t *dev)
if (dev->imgname)
dev_info(dnbd3_device_to_dev(dev), "closing down device.\n");
- /* quickly fail all requests */
- dnbd3_blk_fail_all_requests(dev);
- dev->panic = 0;
- dev->discover = 0;
+ dev->panic = false;
result = dnbd3_net_disconnect(dev);
kfree(dev->imgname);
dev->imgname = NULL;
/* new requests might have been queued up, */
/* but now that imgname is NULL no new ones can show up */
- dnbd3_blk_fail_all_requests(dev);
-#ifdef DNBD3_BLK_MQ
blk_mq_freeze_queue(dev->queue);
-#endif
set_capacity(dev->disk, 0);
-#ifdef DNBD3_BLK_MQ
blk_mq_unfreeze_queue(dev->queue);
-#endif
- dev->reported_size = 0;
return result;
}
@@ -65,8 +56,7 @@ static int dnbd3_blk_ioctl(struct block_device *bdev, fmode_t mode, unsigned int
#endif
char *imgname = NULL;
dnbd3_ioctl_t *msg = NULL;
- unsigned long irqflags;
- int i = 0;
+ int i = 0, j;
u8 locked = 0;
if (arg != 0) {
@@ -97,7 +87,7 @@ static int dnbd3_blk_ioctl(struct block_device *bdev, fmode_t mode, unsigned int
switch (cmd) {
case IOCTL_OPEN:
- if (atomic_cmpxchg(&dev->connection_lock, 0, 1) != 0) {
+ if (!dnbd3_flag_get(dev->connection_lock)) {
result = -EBUSY;
break;
}
@@ -121,7 +111,7 @@ static int dnbd3_blk_ioctl(struct block_device *bdev, fmode_t mode, unsigned int
dev_info(dnbd3_device_to_dev(dev), "opening device.\n");
#if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 14, 0)
- // set optimal request size for the queue
+ // set optimal request size for the queue to half the read-ahead
blk_queue_io_opt(dev->queue, (msg->read_ahead_kb * 512));
#if LINUX_VERSION_CODE < KERNEL_VERSION(5, 15, 0)
// set readahead from optimal request size of the queue
@@ -158,8 +148,7 @@ static int dnbd3_blk_ioctl(struct block_device *bdev, fmode_t mode, unsigned int
if (dev->alt_servers[i].host.ss_family == 0)
continue; // Empty slot
- dev->cur_server.host = dev->alt_servers[i].host;
- result = dnbd3_net_connect(dev);
+ result = dnbd3_new_connection(dev, &dev->alt_servers[i].host, true);
if (result == 0) {
/* connection established, store index of server and exit loop */
result = i;
@@ -168,7 +157,7 @@ static int dnbd3_blk_ioctl(struct block_device *bdev, fmode_t mode, unsigned int
}
if (result >= 0) {
- /* probing was successful */
+ /* connection was successful */
dev_dbg(dnbd3_device_to_dev(dev), "server %pISpc is initial server\n",
&dev->cur_server.host);
imgname = NULL; // Prevent kfree at the end
@@ -180,7 +169,7 @@ static int dnbd3_blk_ioctl(struct block_device *bdev, fmode_t mode, unsigned int
break;
case IOCTL_CLOSE:
- if (atomic_cmpxchg(&dev->connection_lock, 0, 1) != 0) {
+ if (!dnbd3_flag_get(dev->connection_lock)) {
result = -EBUSY;
break;
}
@@ -189,7 +178,7 @@ static int dnbd3_blk_ioctl(struct block_device *bdev, fmode_t mode, unsigned int
break;
case IOCTL_SWITCH:
- if (atomic_cmpxchg(&dev->connection_lock, 0, 1) != 0) {
+ if (!dnbd3_flag_get(dev->connection_lock)) {
result = -EBUSY;
break;
}
@@ -216,40 +205,12 @@ static int dnbd3_blk_ioctl(struct block_device *bdev, fmode_t mode, unsigned int
/* specified server is current server, so do not switch */
result = 0;
} else {
- struct sockaddr_storage old_server;
-
dev_info(dnbd3_device_to_dev(dev), "manual server switch to %pISpc\n",
&new_addr);
- /* save current working server */
- /* lock device to get consistent copy of current working server */
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- old_server = dev->cur_server.host;
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
-
- /* disconnect old server */
- dnbd3_net_disconnect(dev);
-
- /* connect to new specified server (switching) */
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- dev->cur_server.host = new_addr;
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- result = dnbd3_net_connect(dev);
+ result = dnbd3_new_connection(dev, &new_addr, false);
if (result != 0) {
- /* reconnect with old server if switching has failed */
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- dev->cur_server.host = old_server;
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- if (dnbd3_net_connect(dev) != 0) {
- /* we couldn't reconnect to the old server */
- /* device is dangling now and needs another SWITCH call */
- dev_warn(
- dnbd3_device_to_dev(dev),
- "switching failed and could not switch back to old server - dangling device\n");
- result = -ECONNABORTED;
- } else {
- /* switching didn't work but we are back to the old server */
- result = -EAGAIN;
- }
+ /* switching didn't work */
+ result = -EAGAIN;
} else {
/* switch succeeded */
/* fake RTT so we don't switch away again soon */
@@ -257,12 +218,13 @@ static int dnbd3_blk_ioctl(struct block_device *bdev, fmode_t mode, unsigned int
for (i = 0; i < NUMBER_SERVERS; ++i) {
alt_server = &dev->alt_servers[i];
if (is_same_server(&alt_server->host, &new_addr)) {
- alt_server->rtts[0] = alt_server->rtts[1]
- = alt_server->rtts[2] = alt_server->rtts[3] = 4;
+ for (j = 0; j < DISCOVER_HISTORY_SIZE; ++j)
+ alt_server->rtts[j] = 1;
alt_server->best_count = 100;
} else {
- alt_server->rtts[0] <<= 2;
- alt_server->rtts[2] <<= 2;
+ for (j = 0; j < DISCOVER_HISTORY_SIZE; ++j)
+ if (alt_server->rtts[j] < 5000)
+ alt_server->rtts[j] = 5000;
alt_server->best_count = 0;
}
}
@@ -318,12 +280,11 @@ static int dnbd3_blk_ioctl(struct block_device *bdev, fmode_t mode, unsigned int
break;
}
- if (locked)
- atomic_set(&dev->connection_lock, 0);
-
cleanup_return:
kfree(msg);
kfree(imgname);
+ if (locked)
+ dnbd3_flag_reset(dev->connection_lock);
return result;
}
@@ -332,7 +293,18 @@ static const struct block_device_operations dnbd3_blk_ops = {
.ioctl = dnbd3_blk_ioctl,
};
-#ifdef DNBD3_BLK_MQ
+static void dnbd3_add_queue(dnbd3_device_t *dev, struct request *rq)
+{
+ unsigned long irqflags;
+
+ spin_lock_irqsave(&dev->send_queue_lock, irqflags);
+ list_add_tail(&rq->queuelist, &dev->send_queue);
+ spin_unlock_irqrestore(&dev->send_queue_lock, irqflags);
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
+ queue_work(dev->send_wq, &dev->send_work);
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+}
+
/*
* Linux kernel blk-mq driver function (entry point) to handle block IO requests
*/
@@ -340,110 +312,108 @@ static blk_status_t dnbd3_queue_rq(struct blk_mq_hw_ctx *hctx, const struct blk_
{
struct request *rq = bd->rq;
dnbd3_device_t *dev = rq->q->queuedata;
- unsigned long irqflags;
+ struct dnbd3_cmd *cmd;
- if (dev->imgname == NULL)
+ if (dev->imgname == NULL || !device_active(dev))
return BLK_STS_IOERR;
- if (!(dnbd3_req_fs(rq)))
+ if (req_op(rq) != REQ_OP_READ)
return BLK_STS_IOERR;
if (PROBE_COUNT_TIMEOUT > 0 && dev->panic_count >= PROBE_COUNT_TIMEOUT)
return BLK_STS_TIMEOUT;
- if (!(dnbd3_req_read(rq)))
+ if (rq_data_dir(rq) != READ)
return BLK_STS_NOTSUPP;
+ cmd = blk_mq_rq_to_pdu(rq);
+ cmd->handle = (u64)blk_mq_unique_tag(rq) | (((u64)jiffies) << 32);
blk_mq_start_request(rq);
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- list_add_tail(&rq->queuelist, &dev->request_queue_send);
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- wake_up(&dev->process_queue_send);
+ dnbd3_add_queue(dev, rq);
return BLK_STS_OK;
}
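The handle assigned above packs the unique blk-mq tag into the low 32 bits and the submission-time jiffies into the high 32 bits: replies can be matched to in-flight requests, and a later reuse of the same tag yields a different handle, so stale replies are not mistaken for current ones. A sketch of the inverse mapping (helper names are illustrative, not part of the module):

/* Illustrative helpers, not part of the patch */
static inline u32 dnbd3_handle_to_tag(u64 handle)
{
	return (u32)handle; /* low half: blk_mq_unique_tag() at submit */
}

static inline u32 dnbd3_handle_to_jiffies(u64 handle)
{
	return (u32)(handle >> 32); /* high half: jiffies at submit */
}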
-static const struct blk_mq_ops dnbd3_mq_ops = {
- .queue_rq = dnbd3_queue_rq,
-};
-
-#else /* DNBD3_BLK_MQ */
-/*
- * Linux kernel blk driver function (entry point) to handle block IO requests
- */
-static void dnbd3_blk_request(struct request_queue *q)
+static enum blk_eh_timer_return dnbd3_rq_timeout(struct request *req, bool reserved)
{
- struct request *rq;
- dnbd3_device_t *dev;
-
- while ((rq = blk_fetch_request(q)) != NULL) {
- dev = rq->rq_disk->private_data;
-
- if (dev->imgname == NULL) {
- __blk_end_request_all(rq, -EIO);
- continue;
- }
-
- if (!(dnbd3_req_fs(rq))) {
- __blk_end_request_all(rq, 0);
- continue;
- }
-
- if (PROBE_COUNT_TIMEOUT > 0 && dev->panic_count >= PROBE_COUNT_TIMEOUT) {
- __blk_end_request_all(rq, -EIO);
- continue;
+ unsigned long irqflags;
+ struct request *rq_iter;
+ bool found = false;
+ dnbd3_device_t *dev = req->q->queuedata;
+
+ spin_lock_irqsave(&dev->send_queue_lock, irqflags);
+ list_for_each_entry(rq_iter, &dev->send_queue, queuelist) {
+ if (rq_iter == req) {
+ found = true;
+ break;
}
-
- if (!(dnbd3_req_read(rq))) {
- __blk_end_request_all(rq, -EACCES);
- continue;
+ }
+ spin_unlock_irqrestore(&dev->send_queue_lock, irqflags);
+ // If still in send queue, do nothing
+ if (found)
+ return BLK_EH_RESET_TIMER;
+
+ spin_lock_irqsave(&dev->recv_queue_lock, irqflags);
+ list_for_each_entry(rq_iter, &dev->recv_queue, queuelist) {
+ if (rq_iter == req) {
+ found = true;
+ list_del_init(&req->queuelist);
+ break;
}
-
- list_add_tail(&rq->queuelist, &dev->request_queue_send);
- spin_unlock_irq(q->queue_lock);
- wake_up(&dev->process_queue_send);
- spin_lock_irq(q->queue_lock);
}
+ spin_unlock_irqrestore(&dev->recv_queue_lock, irqflags);
+ if (!found) {
+		dev_err(dnbd3_device_to_dev(dev), "timed-out request found in neither send nor recv queue, ignoring\n");
+		// Assume it was finished concurrently
+ return BLK_EH_DONE;
+ }
+ // Add to send queue again and trigger work, reset timeout
+ dnbd3_add_queue(dev, req);
+ return BLK_EH_RESET_TIMER;
}
-#endif /* DNBD3_BLK_MQ */
+
+static
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 11, 0)
+const
+#endif
+struct blk_mq_ops dnbd3_mq_ops = {
+ .queue_rq = dnbd3_queue_rq,
+ .timeout = dnbd3_rq_timeout,
+};
int dnbd3_blk_add_device(dnbd3_device_t *dev, int minor)
{
int ret;
- init_waitqueue_head(&dev->process_queue_send);
- init_waitqueue_head(&dev->process_queue_discover);
- INIT_LIST_HEAD(&dev->request_queue_send);
- INIT_LIST_HEAD(&dev->request_queue_receive);
-
- memset(&dev->cur_server, 0, sizeof(dev->cur_server));
- dev->better_sock = NULL;
+ memset(dev, 0, sizeof(*dev));
+ dev->index = minor;
+ // lock for imgname, cur_server etc.
+ spin_lock_init(&dev->blk_lock);
+ spin_lock_init(&dev->send_queue_lock);
+ spin_lock_init(&dev->recv_queue_lock);
+ INIT_LIST_HEAD(&dev->send_queue);
+ INIT_LIST_HEAD(&dev->recv_queue);
+ dnbd3_flag_reset(dev->connection_lock);
+ dnbd3_flag_reset(dev->discover_running);
+ mutex_init(&dev->alt_servers_lock);
+ dnbd3_net_work_init(dev);
+ // memset has done this already but I like initial values to be explicit
dev->imgname = NULL;
dev->rid = 0;
- dev->update_available = 0;
- mutex_init(&dev->alt_servers_lock);
- memset(dev->alt_servers, 0, sizeof(dev->alt_servers[0]) * NUMBER_SERVERS);
- dev->thread_send = NULL;
- dev->thread_receive = NULL;
- dev->thread_discover = NULL;
- dev->discover = 0;
- atomic_set(&dev->connection_lock, 0);
- dev->panic = 0;
+ dev->update_available = false;
+ dev->panic = false;
dev->panic_count = 0;
dev->reported_size = 0;
- // set up spin lock for request queues for send and receive
- spin_lock_init(&dev->blk_lock);
-
-#ifdef DNBD3_BLK_MQ
// set up tag_set for blk-mq
dev->tag_set.ops = &dnbd3_mq_ops;
dev->tag_set.nr_hw_queues = 1;
dev->tag_set.queue_depth = 128;
dev->tag_set.numa_node = NUMA_NO_NODE;
- dev->tag_set.cmd_size = 0;
+ dev->tag_set.cmd_size = sizeof(struct dnbd3_cmd);
dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
dev->tag_set.driver_data = dev;
+ dev->tag_set.timeout = BLOCK_LAYER_TIMEOUT * HZ;
ret = blk_mq_alloc_tag_set(&dev->tag_set);
if (ret) {
@@ -470,16 +440,6 @@ int dnbd3_blk_add_device(dnbd3_device_t *dev, int minor)
}
dev->queue->queuedata = dev;
#endif
-#else
- // set up blk
- dev->queue = blk_init_queue(&dnbd3_blk_request, &dev->blk_lock);
- if (!dev->queue) {
- ret = -ENOMEM;
- dev_err(dnbd3_device_to_dev(dev), "blk_init_queue failed\n");
- goto out;
- }
- dev->queue->queuedata = dev;
-#endif /* DNBD3_BLK_MQ */
blk_queue_logical_block_size(dev->queue, DNBD3_BLOCK_SIZE);
blk_queue_physical_block_size(dev->queue, DNBD3_BLOCK_SIZE);
@@ -527,90 +487,88 @@ int dnbd3_blk_add_device(dnbd3_device_t *dev, int minor)
out_cleanup_queue:
blk_cleanup_queue(dev->queue);
#endif
-#ifdef DNBD3_BLK_MQ
out_cleanup_tags:
blk_mq_free_tag_set(&dev->tag_set);
-#endif
out:
+ mutex_destroy(&dev->alt_servers_lock);
return ret;
}
int dnbd3_blk_del_device(dnbd3_device_t *dev)
{
- while (atomic_cmpxchg(&dev->connection_lock, 0, 1) != 0)
+ while (!dnbd3_flag_get(dev->connection_lock))
schedule();
dnbd3_close_device(dev);
dnbd3_sysfs_exit(dev);
del_gendisk(dev->disk);
#if LINUX_VERSION_CODE < KERNEL_VERSION(5, 14, 0)
blk_cleanup_queue(dev->queue);
-#endif
-#ifdef DNBD3_BLK_MQ
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 14, 0)
+#else
blk_cleanup_disk(dev->disk);
#endif
blk_mq_free_tag_set(&dev->tag_set);
-#endif
mutex_destroy(&dev->alt_servers_lock);
#if LINUX_VERSION_CODE < KERNEL_VERSION(5, 14, 0)
put_disk(dev->disk);
#endif
return 0;
}
+void dnbd3_blk_requeue_all_requests(dnbd3_device_t *dev)
+{
+ struct request *blk_request;
+ unsigned long flags;
+ struct list_head local_copy;
+ int count = 0;
+
+ INIT_LIST_HEAD(&local_copy);
+ spin_lock_irqsave(&dev->recv_queue_lock, flags);
+ while (!list_empty(&dev->recv_queue)) {
+ blk_request = list_entry(dev->recv_queue.next, struct request, queuelist);
+ list_del_init(&blk_request->queuelist);
+ list_add(&blk_request->queuelist, &local_copy);
+ count++;
+ }
+ spin_unlock_irqrestore(&dev->recv_queue_lock, flags);
+ if (count)
+ dev_info(dnbd3_device_to_dev(dev), "re-queueing %d requests\n", count);
+ while (!list_empty(&local_copy)) {
+ blk_request = list_entry(local_copy.next, struct request, queuelist);
+ list_del_init(&blk_request->queuelist);
+ dnbd3_add_queue(dev, blk_request);
+ }
+}
+
void dnbd3_blk_fail_all_requests(dnbd3_device_t *dev)
{
- struct request *blk_request, *tmp_request;
- struct request *blk_request2, *tmp_request2;
+ struct request *blk_request;
unsigned long flags;
struct list_head local_copy;
- int dup;
+ int count = 0;
INIT_LIST_HEAD(&local_copy);
- spin_lock_irqsave(&dev->blk_lock, flags);
- while (!list_empty(&dev->request_queue_receive)) {
- list_for_each_entry_safe(blk_request, tmp_request, &dev->request_queue_receive, queuelist) {
- list_del_init(&blk_request->queuelist);
- dup = 0;
- list_for_each_entry_safe(blk_request2, tmp_request2, &local_copy, queuelist) {
- if (blk_request == blk_request2) {
- dev_warn(dnbd3_device_to_dev(dev),
- "same request is in request_queue_receive multiple times\n");
- BUG();
- dup = 1;
- break;
- }
- }
- if (!dup)
- list_add(&blk_request->queuelist, &local_copy);
- }
+ spin_lock_irqsave(&dev->recv_queue_lock, flags);
+ while (!list_empty(&dev->recv_queue)) {
+ blk_request = list_entry(dev->recv_queue.next, struct request, queuelist);
+ list_del_init(&blk_request->queuelist);
+ list_add(&blk_request->queuelist, &local_copy);
+ count++;
}
- while (!list_empty(&dev->request_queue_send)) {
- list_for_each_entry_safe(blk_request, tmp_request, &dev->request_queue_send, queuelist) {
- list_del_init(&blk_request->queuelist);
- dup = 0;
- list_for_each_entry_safe(blk_request2, tmp_request2, &local_copy, queuelist) {
- if (blk_request == blk_request2) {
- dev_warn(dnbd3_device_to_dev(dev), "request is in both lists\n");
- BUG();
- dup = 1;
- break;
- }
- }
- if (!dup)
- list_add(&blk_request->queuelist, &local_copy);
- }
+ spin_unlock_irqrestore(&dev->recv_queue_lock, flags);
+ spin_lock_irqsave(&dev->send_queue_lock, flags);
+ while (!list_empty(&dev->send_queue)) {
+ blk_request = list_entry(dev->send_queue.next, struct request, queuelist);
+ list_del_init(&blk_request->queuelist);
+ list_add(&blk_request->queuelist, &local_copy);
+ count++;
}
- spin_unlock_irqrestore(&dev->blk_lock, flags);
- list_for_each_entry_safe(blk_request, tmp_request, &local_copy, queuelist) {
+ spin_unlock_irqrestore(&dev->send_queue_lock, flags);
+ if (count)
+ dev_info(dnbd3_device_to_dev(dev), "failing %d requests\n", count);
+ while (!list_empty(&local_copy)) {
+ blk_request = list_entry(local_copy.next, struct request, queuelist);
list_del_init(&blk_request->queuelist);
- if (dnbd3_req_fs(blk_request))
-#ifdef DNBD3_BLK_MQ
- blk_mq_end_request(blk_request, BLK_STS_IOERR);
-#else
- blk_end_request_all(blk_request, -EIO);
-#endif
- else if (dnbd3_req_special(blk_request))
- kfree(blk_request);
+ blk_mq_end_request(blk_request, BLK_STS_IOERR);
}
}
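Both queue-draining helpers follow the same pattern: splice the shared queue into a local list under its spinlock, then complete or re-queue the requests with no lock held. A condensed sketch of the pattern, using list_splice_init() where the patch spells the move out manually:

static void drain_and_fail_sketch(spinlock_t *lock, struct list_head *shared)
{
	struct request *rq;
	unsigned long flags;
	LIST_HEAD(local);

	spin_lock_irqsave(lock, flags);
	list_splice_init(shared, &local); /* take everything in one go */
	spin_unlock_irqrestore(lock, flags);

	while (!list_empty(&local)) {
		rq = list_first_entry(&local, struct request, queuelist);
		list_del_init(&rq->queuelist);
		blk_mq_end_request(rq, BLK_STS_IOERR); /* no spinlock held here */
	}
}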
diff --git a/src/kernel/blk.h b/src/kernel/blk.h
index cbab6f5..c6dcb8d 100644
--- a/src/kernel/blk.h
+++ b/src/kernel/blk.h
@@ -24,104 +24,15 @@
#include "dnbd3_main.h"
-/* define blkdev file system operation type */
-#define DNBD3_REQ_OP_FS REQ_TYPE_FS
-
-/* define blkdev special operation type */
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 11, 0)
-#define DNBD3_REQ_OP_SPECIAL REQ_OP_DRV_IN
-#elif LINUX_VERSION_CODE >= KERNEL_VERSION(4, 2, 0) || \
- RHEL_CHECK_VERSION(RHEL_RELEASE_CODE >= RHEL_RELEASE_VERSION(7, 3))
-#define DNBD3_REQ_OP_SPECIAL REQ_TYPE_DRV_PRIV
-#else
-#define DNBD3_REQ_OP_SPECIAL REQ_TYPE_SPECIAL
-#endif
-
-/* define blkdev read operation type */
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 11, 0)
-#define DNBD3_DEV_READ REQ_OP_READ
-#else
-#define DNBD3_DEV_READ DNBD3_REQ_OP_FS
-#endif
-
-/* define blkdev write operation type */
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 11, 0)
-#define DNBD3_DEV_WRITE REQ_OP_WRITE
-#else
-#define DNBD3_DEV_WRITE DNBD3_REQ_OP_FS
-#endif
-
-/* define command and blkdev operation access macros */
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 11, 0)
-#define DNBD3_REQ_FLAG_BITS REQ_FLAG_BITS
-/* cmd_flags and cmd_type are merged into cmd_flags now */
-/* sanity check to avoid overriding of request bits */
-#if DNBD3_REQ_FLAG_BITS > 24
-#error "Fix CMD bitshift"
-#endif
-/* pack command into cmd_flags field by shifting CMD_* into unused bits of cmd_flags */
-#define dnbd3_cmd_to_priv(req, cmd) \
- ((req)->cmd_flags = DNBD3_REQ_OP_SPECIAL | ((cmd) << DNBD3_REQ_FLAG_BITS))
-#define dnbd3_priv_to_cmd(req) \
- ((req)->cmd_flags >> DNBD3_REQ_FLAG_BITS)
-#define dnbd3_req_op(req) \
- req_op(req)
-#else
-/* pack command into cmd_type and cmd_flags field separated */
-#define dnbd3_cmd_to_priv(req, cmd) \
- do { \
- (req)->cmd_type = DNBD3_REQ_OP_SPECIAL; \
- (req)->cmd_flags = (cmd); \
- } while (0)
-#define dnbd3_priv_to_cmd(req) \
- ((req)->cmd_flags)
-#define dnbd3_req_op(req) \
- ((req)->cmd_type)
-#endif
-
-/* define dnbd3_req_read(req) boolean expression */
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 11, 0)
-#define dnbd3_req_read(req) \
- (req_op(req) == DNBD3_DEV_READ)
-#else
-#define dnbd3_req_read(req) \
- (rq_data_dir(req) == READ)
-#endif
-
-/* define dnbd3_req_write(req) boolean expression */
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 11, 0)
-#define dnbd3_req_write(req) \
- (req_op(req) == DNBD3_DEV_WRITE)
-#else
-#define dnbd3_req_write(req) \
- (rq_data_dir(req) == WRITE)
-#endif
-
-/* define dnbd3_req_fs(req) boolean expression */
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 11, 0)
-#define dnbd3_req_fs(req) \
- (dnbd3_req_read(req) || dnbd3_req_write(req))
-#else
-#define dnbd3_req_fs(req) \
- (dnbd3_req_op(req) == DNBD3_REQ_OP_FS)
-#endif
-
-/* define dnbd3_req_special(req) boolean expression */
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 14, 0)
-#define dnbd3_req_special(req) \
- (dnbd3_req_op(req) == DNBD3_REQ_OP_SPECIAL)
-#elif LINUX_VERSION_CODE >= KERNEL_VERSION(4, 11, 0)
-#define dnbd3_req_special(req) \
- blk_rq_is_private(req)
-#else
-#define dnbd3_req_special(req) \
- (dnbd3_req_op(req) == DNBD3_REQ_OP_SPECIAL)
-#endif
+// The device has been set up via IOCTL_OPEN and hasn't been closed yet
+#define device_active(dev) ((dev)->reported_size != 0)
int dnbd3_blk_add_device(dnbd3_device_t *dev, int minor);
int dnbd3_blk_del_device(dnbd3_device_t *dev);
+void dnbd3_blk_requeue_all_requests(dnbd3_device_t *dev);
+
void dnbd3_blk_fail_all_requests(dnbd3_device_t *dev);
#endif /* BLK_H_ */
diff --git a/src/kernel/dnbd3_main.h b/src/kernel/dnbd3_main.h
index efe4a76..6d91666 100644
--- a/src/kernel/dnbd3_main.h
+++ b/src/kernel/dnbd3_main.h
@@ -22,6 +22,8 @@
#ifndef DNBD_H_
#define DNBD_H_
+#include <dnbd3/config/client.h>
+
#include <linux/version.h>
#include <linux/kthread.h>
#include <linux/module.h>
@@ -33,30 +35,12 @@
#include <dnbd3/types.h>
#include <dnbd3/shared/serialize.h>
-/* define RHEL_CHECK_VERSION macro to check CentOS version */
-#if defined(RHEL_RELEASE_CODE) && defined(RHEL_RELEASE_VERSION)
-#define RHEL_CHECK_VERSION(CONDITION) (CONDITION)
-#else
-#define RHEL_CHECK_VERSION(CONDITION) (0)
-#endif
-
-/* version check to enable/disable blk-mq support */
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 18, 0)
-/* enable blk-mq support for Linux kernel 4.18 and later */
-#define DNBD3_BLK_MQ
-#else
-/* disable blk-mq support for Linux kernel prior to 4.18 */
-#undef DNBD3_BLK_MQ
-#endif
-
-#ifdef DNBD3_BLK_MQ
#include <linux/blk-mq.h>
-#endif
extern int major;
typedef struct {
- unsigned long rtts[4]; // Last four round trip time measurements in µs
+ unsigned long rtts[DISCOVER_HISTORY_SIZE]; // Last X round trip time measurements in µs
uint16_t protocol_version; // dnbd3 protocol version of this server
uint8_t failures; // How many times the server was unreachable
uint8_t best_count; // Number of times server measured best
@@ -65,48 +49,60 @@ typedef struct {
typedef struct {
// block
- struct gendisk *disk;
-#ifdef DNBD3_BLK_MQ
- struct blk_mq_tag_set tag_set;
-#endif
- struct request_queue *queue;
- spinlock_t blk_lock;
+ int index;
+ struct gendisk *disk;
+ struct blk_mq_tag_set tag_set;
+ struct request_queue *queue;
+ spinlock_t blk_lock;
// sysfs
- struct kobject kobj;
-
- // network
- struct mutex alt_servers_lock;
- char *imgname;
- struct socket *sock;
- struct {
- unsigned long rtt;
+ struct kobject kobj;
+
+ char *imgname;
+ uint16_t rid;
+ struct socket *sock;
+ struct { // use blk_lock
+ unsigned long rtt;
struct sockaddr_storage host;
- uint16_t protocol_version;
- } cur_server;
- serialized_buffer_t payload_buffer;
- dnbd3_alt_server_t alt_servers[NUMBER_SERVERS]; // array of alt servers, protected by alt_servers_lock
- uint8_t discover, panic, update_available, panic_count;
- atomic_t connection_lock;
- uint8_t use_server_provided_alts;
- uint16_t rid;
- uint32_t heartbeat_count;
- uint64_t reported_size;
- // server switch
- struct socket *better_sock;
-
- // process
- struct task_struct *thread_send;
- struct task_struct *thread_receive;
- struct task_struct *thread_discover;
- struct timer_list hb_timer;
- wait_queue_head_t process_queue_send;
- wait_queue_head_t process_queue_discover;
- struct list_head request_queue_send;
- struct list_head request_queue_receive;
+ uint16_t protocol_version;
+ } cur_server;
+ serialized_buffer_t payload_buffer;
+ struct mutex alt_servers_lock;
+ dnbd3_alt_server_t alt_servers[NUMBER_SERVERS];
+ bool use_server_provided_alts;
+ bool panic;
+ u8 panic_count;
+ bool update_available;
+ atomic_t connection_lock;
+	// Size of image/device - this is 0 if the device is not in use,
+	// otherwise this is also the value we expect from alt servers.
+ uint64_t reported_size;
+ struct delayed_work keepalive_work;
+
+ // sending
+ struct workqueue_struct *send_wq;
+ spinlock_t send_queue_lock;
+ struct list_head send_queue;
+ struct mutex send_mutex;
+ struct work_struct send_work;
+ // receiving
+ struct workqueue_struct *recv_wq;
+ spinlock_t recv_queue_lock;
+ struct list_head recv_queue;
+ struct mutex recv_mutex;
+ struct work_struct recv_work;
+ // discover
+ atomic_t discover_running;
+ struct delayed_work discover_work;
+ u32 discover_interval;
+ u32 discover_count;
} dnbd3_device_t;
+struct dnbd3_cmd {
+ u64 handle;
+};
+
extern inline struct device *dnbd3_device_to_dev(dnbd3_device_t *dev);
extern inline int is_same_server(const struct sockaddr_storage *const x, const struct sockaddr_storage *const y);
@@ -122,4 +118,23 @@ extern int dnbd3_add_server(dnbd3_device_t *dev, dnbd3_host_t *host);
extern int dnbd3_rem_server(dnbd3_device_t *dev, dnbd3_host_t *host);
+#define dnbd3_flag_get(x) (atomic_cmpxchg(&(x), 0, 1) == 0)
+#define dnbd3_flag_reset(x) atomic_set(&(x), 0)
+#define dnbd3_flag_taken(x) (atomic_read(&(x)) != 0)
+
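The flag macros above give one-shot test-and-set semantics on top of atomic_cmpxchg(): dnbd3_flag_get() returns true only for the caller that actually acquired the flag, which must release it again with dnbd3_flag_reset(). A usage sketch of the pattern as applied to connection_lock throughout this patch:

if (dnbd3_flag_get(dev->connection_lock)) {
	/* exclusive: connect, disconnect or switch servers here */
	dnbd3_flag_reset(dev->connection_lock);
} else {
	/* somebody else is already changing the connection */
}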
+/* shims for making older kernels look like the current one, if possible, to avoid too
+ * much inline #ifdef which makes code harder to read. */
+
+#if LINUX_VERSION_CODE < KERNEL_VERSION(4, 18, 0)
+#define BLK_EH_DONE BLK_EH_NOT_HANDLED
+#endif
+
+#if LINUX_VERSION_CODE < KERNEL_VERSION(4, 13, 0)
+#define blk_status_t int
+#define BLK_STS_OK 0
+#define BLK_STS_IOERR (-EIO)
+#define BLK_STS_TIMEOUT (-ETIME)
+#define BLK_STS_NOTSUPP (-ENOTSUPP)
+#endif
+
#endif /* DNBD_H_ */
diff --git a/src/kernel/net.c b/src/kernel/net.c
index 5919832..f5806de 100644
--- a/src/kernel/net.c
+++ b/src/kernel/net.c
@@ -22,7 +22,6 @@
#include <dnbd3/config/client.h>
#include "net.h"
#include "blk.h"
-#include "utils.h"
#include "dnbd3_main.h"
#include <dnbd3/shared/serialize.h>
@@ -30,7 +29,6 @@
#include <linux/time.h>
#include <linux/ktime.h>
#include <linux/tcp.h>
-#include <linux/sched/task.h>
#ifndef MIN
#define MIN(a, b) ((a) < (b) ? (a) : (b))
@@ -40,7 +38,7 @@
#define ktime_to_s(kt) ktime_divns(kt, NSEC_PER_SEC)
#endif
-#ifdef CONFIG_DEBUG_DRIVER
+#ifdef DEBUG
#define ASSERT(x) \
do { \
if (!(x)) { \
@@ -54,15 +52,6 @@
} while (0)
#endif
-#define init_msghdr(h) \
- do { \
- h.msg_name = NULL; \
- h.msg_namelen = 0; \
- h.msg_control = NULL; \
- h.msg_controllen = 0; \
- h.msg_flags = MSG_WAITALL | MSG_NOSIGNAL; \
- } while (0)
-
#define dnbd3_dev_dbg_host(dev, host, fmt, ...) \
dev_dbg(dnbd3_device_to_dev(dev), "(%pISpc): " fmt, (host), ##__VA_ARGS__)
#define dnbd3_dev_err_host(dev, host, fmt, ...) \
@@ -73,219 +62,267 @@
#define dnbd3_dev_err_host_cur(dev, fmt, ...) \
dnbd3_dev_err_host(dev, &(dev)->cur_server.host, fmt, ##__VA_ARGS__)
-static struct socket *dnbd3_connect(dnbd3_device_t *dev, struct sockaddr_storage *addr);
+static bool dnbd3_drain_socket(dnbd3_device_t *dev, struct socket *sock, int bytes);
+static int dnbd3_recv_bytes(struct socket *sock, void *buffer, size_t count);
+static int dnbd3_recv_reply(struct socket *sock, dnbd3_reply_t *reply_hdr);
+static bool dnbd3_send_request(struct socket *sock, u16 cmd, u64 handle, u64 offset, u32 size);
+
+static int dnbd3_set_primary_connection(dnbd3_device_t *dev, struct socket *sock,
+ struct sockaddr_storage *addr, u16 protocol_version);
+
+static int dnbd3_connect(dnbd3_device_t *dev, struct sockaddr_storage *addr,
+ struct socket **sock_out);
+
+static bool dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
+ struct sockaddr_storage *addr, uint16_t *remote_version, bool copy_image_info);
+
+static bool dnbd3_request_test_block(dnbd3_device_t *dev, struct sockaddr_storage *addr,
+ struct socket *sock);
+
+static bool dnbd3_send_empty_request(dnbd3_device_t *dev, u16 cmd);
+
+static void dnbd3_start_discover(dnbd3_device_t *dev, bool panic);
+
+static void dnbd3_discover(dnbd3_device_t *dev);
+
+static void dnbd3_internal_discover(dnbd3_device_t *dev);
-static int dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
- struct sockaddr_storage *addr, uint16_t *remote_version);
+static void set_socket_timeout(struct socket *sock, bool set_send, int timeout_ms);
-static int dnbd3_request_test_block(dnbd3_device_t *dev, struct sockaddr_storage *addr, struct socket *sock);
+// Use as write-only dump, don't care about race conditions etc.
+static u8 __garbage_mem[PAGE_SIZE];
-static void dnbd3_net_heartbeat(struct timer_list *arg)
+/**
+ * Delayed work that periodically sends a keepalive packet.
+ */
+static void dnbd3_keepalive_workfn(struct work_struct *work)
{
- dnbd3_device_t *dev = (dnbd3_device_t *)container_of(arg, dnbd3_device_t, hb_timer);
-
- // Because different events need different intervals, the timer is called once a second.
- // Other intervals can be derived using dev->heartbeat_count.
-#define timeout_seconds(x) (dev->heartbeat_count % (x) == 0)
-
- if (!dev->panic) {
- if (timeout_seconds(TIMER_INTERVAL_KEEPALIVE_PACKET)) {
- struct request *req = kmalloc(sizeof(struct request), GFP_ATOMIC);
- // send keepalive
- if (req) {
- unsigned long irqflags;
-
- dnbd3_cmd_to_priv(req, CMD_KEEPALIVE);
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- list_add_tail(&req->queuelist, &dev->request_queue_send);
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- wake_up(&dev->process_queue_send);
- } else {
- dev_err(dnbd3_device_to_dev(dev), "couldn't create keepalive request\n");
- }
- }
- if ((dev->heartbeat_count > STARTUP_MODE_DURATION && timeout_seconds(TIMER_INTERVAL_PROBE_NORMAL)) ||
- (dev->heartbeat_count <= STARTUP_MODE_DURATION && timeout_seconds(TIMER_INTERVAL_PROBE_STARTUP))) {
- // Normal discovery
- dev->discover = 1;
- wake_up(&dev->process_queue_discover);
- }
- } else if (timeout_seconds(TIMER_INTERVAL_PROBE_PANIC)) {
- // Panic discovery
- dev->discover = 1;
- wake_up(&dev->process_queue_discover);
+ unsigned long irqflags;
+ dnbd3_device_t *dev = container_of(work, dnbd3_device_t, keepalive_work.work);
+
+ dnbd3_send_empty_request(dev, CMD_KEEPALIVE);
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
+ if (device_active(dev)) {
+ mod_delayed_work(system_freezable_power_efficient_wq,
+ &dev->keepalive_work, KEEPALIVE_INTERVAL * HZ);
}
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+}
+
+/**
+ * Delayed work triggering discovery (alt server check)
+ */
+static void dnbd3_discover_workfn(struct work_struct *work)
+{
+ dnbd3_device_t *dev = container_of(work, dnbd3_device_t, discover_work.work);
- dev->hb_timer.expires = jiffies + HZ;
+ dnbd3_discover(dev);
+}
- ++dev->heartbeat_count;
- add_timer(&dev->hb_timer);
+/**
+ * For manually triggering an immediate discovery
+ */
+static void dnbd3_start_discover(dnbd3_device_t *dev, bool panic)
+{
+ unsigned long irqflags;
-#undef timeout_seconds
+ if (!device_active(dev))
+ return;
+ if (panic && dnbd3_flag_get(dev->connection_lock)) {
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
+ if (!dev->panic) {
+ // Panic freshly turned on
+ dev->panic = true;
+ dev->discover_interval = TIMER_INTERVAL_PROBE_PANIC;
+ }
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+ dnbd3_flag_reset(dev->connection_lock);
+ }
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
+ mod_delayed_work(system_freezable_power_efficient_wq,
+ &dev->discover_work, 1);
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
}
-static int dnbd3_net_discover(void *data)
+/**
+ * Wrapper for the actual discover function below. Checks run conditions
+ * and re-schedules the delayed work.
+ */
+static void dnbd3_discover(dnbd3_device_t *dev)
+{
+ unsigned long irqflags;
+
+ if (!device_active(dev) || dnbd3_flag_taken(dev->connection_lock))
+ return; // device not active anymore, or just about to switch
+ if (!dnbd3_flag_get(dev->discover_running))
+ return; // Already busy
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
+ cancel_delayed_work(&dev->discover_work);
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+ dnbd3_internal_discover(dev);
+ dev->discover_count++;
+ // Re-queueing logic
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
+ if (device_active(dev)) {
+ mod_delayed_work(system_freezable_power_efficient_wq,
+ &dev->discover_work, dev->discover_interval * HZ);
+ if (dev->discover_interval < TIMER_INTERVAL_PROBE_MAX
+ && dev->discover_count > DISCOVER_STARTUP_PHASE_COUNT) {
+ dev->discover_interval += 2;
+ }
+ }
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+ dnbd3_flag_reset(dev->discover_running);
+}
+
+/**
+ * Discovery. Probe all (or some) known alt servers,
+ * and initiate connection switch if appropriate
+ */
+static void dnbd3_internal_discover(dnbd3_device_t *dev)
{
- dnbd3_device_t *dev = data;
struct socket *sock, *best_sock = NULL;
dnbd3_alt_server_t *alt;
struct sockaddr_storage host_compare, best_server;
uint16_t remote_version;
- ktime_t start = ktime_set(0, 0), end = ktime_set(0, 0);
+ ktime_t start, end;
unsigned long rtt = 0, best_rtt = 0;
- unsigned long irqflags;
- int i, j, isize, fails, rtt_threshold;
- int turn = 0;
- int ready = 0, do_change = 0;
- char check_order[NUMBER_SERVERS];
-
- struct request *last_request = (struct request *)123, *cur_request = (struct request *)456;
+ int i, j, k, isize, fails, rtt_threshold;
+ int do_change = 0;
+ u8 check_order[NUMBER_SERVERS];
+ const bool ready = dev->discover_count > DISCOVER_STARTUP_PHASE_COUNT;
+ const u32 turn = dev->discover_count % DISCOVER_HISTORY_SIZE;
+ // Shuffle alt_servers
for (i = 0; i < NUMBER_SERVERS; ++i)
check_order[i] = i;
- while (!kthread_should_stop()) {
- wait_event_interruptible(dev->process_queue_discover,
- kthread_should_stop() || dev->discover || dev->thread_discover == NULL);
+ for (i = 0; i < NUMBER_SERVERS; ++i) {
+ j = prandom_u32() % NUMBER_SERVERS;
+ if (j != i) {
+ int tmp = check_order[i];
- if (kthread_should_stop() || dev->imgname == NULL || dev->thread_discover == NULL)
- break;
+ check_order[i] = check_order[j];
+ check_order[j] = tmp;
+ }
+ }
- if (!dev->discover)
- continue;
- dev->discover = 0;
+ best_server.ss_family = 0;
+ best_rtt = RTT_UNREACHABLE;
- if (dev->reported_size < 4096)
- continue;
+ if (!ready || dev->panic)
+ isize = NUMBER_SERVERS;
+ else
+ isize = 3;
- best_server.ss_family = 0;
- best_rtt = 0xFFFFFFFul;
+ for (j = 0; j < NUMBER_SERVERS; ++j) {
+ if (!device_active(dev))
+ break;
+ i = check_order[j];
+ mutex_lock(&dev->alt_servers_lock);
+ host_compare = dev->alt_servers[i].host;
+ fails = dev->alt_servers[i].failures;
+ mutex_unlock(&dev->alt_servers_lock);
+ if (host_compare.ss_family == 0)
+ continue; // Empty slot
+ // Reduced probability for hosts that have been unreachable
+ if (!dev->panic && fails > 50 && (prandom_u32() % 4) != 0)
+ continue; // If not in panic mode, skip server if it failed too many times
+ if (isize-- <= 0 && !is_same_server(&dev->cur_server.host, &host_compare))
+ continue; // Only test isize servers plus current server
+
+ // Initialize socket and connect
+ sock = NULL;
+ if (dnbd3_connect(dev, &host_compare, &sock) != 0)
+ goto error;
- if (dev->heartbeat_count < STARTUP_MODE_DURATION || dev->panic)
- isize = NUMBER_SERVERS;
- else
- isize = 3;
+ remote_version = 0;
+ if (!dnbd3_execute_handshake(dev, sock, &host_compare, &remote_version, false))
+ goto error;
- if (NUMBER_SERVERS > isize) {
- for (i = 0; i < isize; ++i) {
- j = ((ktime_to_s(start) >> i) ^ (ktime_to_us(start) >> j)) % NUMBER_SERVERS;
- if (j != i) {
- int tmp = check_order[i];
- check_order[i] = check_order[j];
- check_order[j] = tmp;
- }
+ // panic mode, take first responding server
+ if (dev->panic) {
+ dnbd3_dev_dbg_host(dev, &host_compare, "panic mode, changing to new server\n");
+ if (!dnbd3_flag_get(dev->connection_lock)) {
+ dnbd3_dev_dbg_host(dev, &host_compare, "...raced, ignoring\n");
+ } else {
+ // Check global flag, a connect might have been in progress
+ if (best_sock != NULL)
+ sock_release(best_sock);
+ set_socket_timeout(sock, false, SOCKET_TIMEOUT_RECV * 1000 + 1000);
+ if (dnbd3_set_primary_connection(dev, sock, &host_compare, remote_version) != 0)
+ sock_release(sock);
+ dnbd3_flag_reset(dev->connection_lock);
+ return;
}
}
- for (j = 0; j < NUMBER_SERVERS; ++j) {
- i = check_order[j];
- mutex_lock(&dev->alt_servers_lock);
- host_compare = dev->alt_servers[i].host;
- fails = dev->alt_servers[i].failures;
- mutex_unlock(&dev->alt_servers_lock);
- if (host_compare.ss_family == 0)
- continue; // Empty slot
- if (!dev->panic && fails > 50
- && (ktime_to_us(start) & 7) != 0)
- continue; // If not in panic mode, skip server if it failed too many times
- if (isize-- <= 0 && !is_same_server(&dev->cur_server.host, &host_compare))
- continue; // Only test isize servers plus current server
-
- // Initialize socket and connect
- sock = dnbd3_connect(dev, &host_compare);
- if (sock == NULL)
- goto error;
-
- if (!dnbd3_execute_handshake(dev, sock, &host_compare, &remote_version))
- goto error;
-
-
- // panic mode, take first responding server
- if (dev->panic) {
- dnbd3_dev_dbg_host(dev, &host_compare, "panic mode, changing to new server\n");
- while (atomic_cmpxchg(&dev->connection_lock, 0, 1) != 0)
- schedule();
-
- if (dev->panic) {
- // Re-check, a connect might have been in progress
- dev->panic = 0;
- if (best_sock != NULL)
- sock_release(best_sock);
-
- dev->better_sock = sock; // Pass over socket to take a shortcut in *_connect();
- put_task_struct(dev->thread_discover);
- dev->thread_discover = NULL;
- dnbd3_net_disconnect(dev);
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- dev->cur_server.host = host_compare;
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- dnbd3_net_connect(dev);
- atomic_set(&dev->connection_lock, 0);
- return 0;
- }
- atomic_set(&dev->connection_lock, 0);
- }
-
- // start rtt measurement
- start = ktime_get_real();
-
- if (!dnbd3_request_test_block(dev, &host_compare, sock))
- goto error;
-
- end = ktime_get_real(); // end rtt measurement
+	// actual rtt measurement is just the first block request and reply
+ start = ktime_get_real();
+ if (!dnbd3_request_test_block(dev, &host_compare, sock))
+ goto error;
+ end = ktime_get_real();
- mutex_lock(&dev->alt_servers_lock);
- if (is_same_server(&dev->alt_servers[i].host, &host_compare)) {
- dev->alt_servers[i].protocol_version = remote_version;
- dev->alt_servers[i].rtts[turn] = (unsigned long)ktime_us_delta(end, start);
-
- rtt = (dev->alt_servers[i].rtts[0] + dev->alt_servers[i].rtts[1]
- + dev->alt_servers[i].rtts[2] + dev->alt_servers[i].rtts[3])
- / 4;
- dev->alt_servers[i].failures = 0;
- if (dev->alt_servers[i].best_count > 1)
- dev->alt_servers[i].best_count -= 2;
+ mutex_lock(&dev->alt_servers_lock);
+ if (is_same_server(&dev->alt_servers[i].host, &host_compare)) {
+ dev->alt_servers[i].protocol_version = remote_version;
+ dev->alt_servers[i].rtts[turn] =
+ (unsigned long)ktime_us_delta(end, start);
+
+ rtt = 0;
+ for (k = 0; k < DISCOVER_HISTORY_SIZE; ++k) {
+ rtt += dev->alt_servers[i].rtts[k];
}
- mutex_unlock(&dev->alt_servers_lock);
+ rtt /= DISCOVER_HISTORY_SIZE;
+ dev->alt_servers[i].failures = 0;
+ if (dev->alt_servers[i].best_count > 1)
+ dev->alt_servers[i].best_count -= 2;
+ }
+ mutex_unlock(&dev->alt_servers_lock);
- if (best_rtt > rtt) {
- // This one is better, keep socket open in case we switch
- best_rtt = rtt;
- best_server = host_compare;
- if (best_sock != NULL)
- sock_release(best_sock);
- best_sock = sock;
- sock = NULL;
- } else {
- // Not better, discard connection
- sock_release(sock);
- sock = NULL;
- }
+ if (best_rtt > rtt) {
+ // This one is better, keep socket open in case we switch
+ best_rtt = rtt;
+ best_server = host_compare;
+ if (best_sock != NULL)
+ sock_release(best_sock);
+ best_sock = sock;
+ sock = NULL;
+ } else {
+ // Not better, discard connection
+ sock_release(sock);
+ sock = NULL;
+ }
- // update cur servers rtt
- if (is_same_server(&dev->cur_server.host, &host_compare))
- dev->cur_server.rtt = rtt;
+ // update cur servers rtt
+ if (is_same_server(&dev->cur_server.host, &host_compare))
+ dev->cur_server.rtt = rtt;
- continue;
+ continue;
error:
- if (sock != NULL) {
- sock_release(sock);
- sock = NULL;
- }
- mutex_lock(&dev->alt_servers_lock);
- if (is_same_server(&dev->alt_servers[i].host, &host_compare)) {
- ++dev->alt_servers[i].failures;
- dev->alt_servers[i].rtts[turn] = RTT_UNREACHABLE;
- if (dev->alt_servers[i].best_count > 2)
- dev->alt_servers[i].best_count -= 3;
- }
- mutex_unlock(&dev->alt_servers_lock);
- if (is_same_server(&dev->cur_server.host, &host_compare))
- dev->cur_server.rtt = RTT_UNREACHABLE;
- } // for loop over alt_servers
+ if (sock != NULL) {
+ sock_release(sock);
+ sock = NULL;
+ }
+ mutex_lock(&dev->alt_servers_lock);
+ if (is_same_server(&dev->alt_servers[i].host, &host_compare)) {
+ if (remote_version)
+ dev->alt_servers[i].protocol_version = remote_version;
+ ++dev->alt_servers[i].failures;
+ dev->alt_servers[i].rtts[turn] = RTT_UNREACHABLE;
+ if (dev->alt_servers[i].best_count > 2)
+ dev->alt_servers[i].best_count -= 3;
+ }
+ mutex_unlock(&dev->alt_servers_lock);
+ if (is_same_server(&dev->cur_server.host, &host_compare))
+ dev->cur_server.rtt = RTT_UNREACHABLE;
+ } // END - for loop over alt_servers
+ if (best_server.ss_family == 0) {
+ // No alt server could be reached
+ ASSERT(!best_sock);
if (dev->panic) {
if (dev->panic_count < 255)
dev->panic_count++;
@@ -293,295 +330,166 @@ error:
if (PROBE_COUNT_TIMEOUT > 0 && dev->panic_count == PROBE_COUNT_TIMEOUT + 1)
dnbd3_blk_fail_all_requests(dev);
}
+ return;
+ }
- if (best_server.ss_family == 0 || kthread_should_stop() || dev->thread_discover == NULL) {
- // No alt server could be reached at all or thread should stop
- if (best_sock != NULL) {
- // Should never happen actually
- sock_release(best_sock);
- best_sock = NULL;
- }
- continue;
- }
-
- // If best server was repeatedly measured best, lower the switching threshold more
- mutex_lock(&dev->alt_servers_lock);
- alt = get_existing_alt_from_addr(&best_server, dev);
- if (alt != NULL) {
- if (alt->best_count < 148)
- alt->best_count += 3;
- rtt_threshold = 1500 - (alt->best_count * 10);
- } else {
- rtt_threshold = 1500;
- }
- mutex_unlock(&dev->alt_servers_lock);
-
- do_change = ready && !is_same_server(&best_server, &dev->cur_server.host)
- && (ktime_to_us(start) & 3) != 0
- && RTT_THRESHOLD_FACTOR(dev->cur_server.rtt) > best_rtt + rtt_threshold;
-
- if (ready && !do_change && best_sock != NULL) {
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- if (!list_empty(&dev->request_queue_send)) {
- cur_request = list_entry(dev->request_queue_send.next, struct request, queuelist);
- do_change = (cur_request == last_request);
- if (do_change)
- dev_warn(dnbd3_device_to_dev(dev), "hung request, triggering change\n");
- } else {
- cur_request = (struct request *)123;
- }
- last_request = cur_request;
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- }
-
- // take server with lowest rtt
- // if a (dis)connect is already in progress, we do nothing, this is not panic mode
- if (do_change && atomic_cmpxchg(&dev->connection_lock, 0, 1) == 0) {
- dev_info(dnbd3_device_to_dev(dev), "server %pISpc is faster (%lluµs vs. %lluµs)\n",
- &best_server,
- (unsigned long long)best_rtt, (unsigned long long)dev->cur_server.rtt);
- dev->better_sock = best_sock; // Take shortcut by continuing to use open connection
- put_task_struct(dev->thread_discover);
- dev->thread_discover = NULL;
- dnbd3_net_disconnect(dev);
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- dev->cur_server.host = best_server;
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- dev->cur_server.rtt = best_rtt;
- dnbd3_net_connect(dev);
- atomic_set(&dev->connection_lock, 0);
- return 0;
- }
-
- // Clean up connection that was held open for quicker server switch
- if (best_sock != NULL) {
+ // If best server was repeatedly measured best, lower the switching threshold more
+ mutex_lock(&dev->alt_servers_lock);
+ alt = get_existing_alt_from_addr(&best_server, dev);
+ if (alt != NULL) {
+ if (alt->best_count < 178)
+ alt->best_count += 3;
+ rtt_threshold = 1800 - (alt->best_count * 10);
+ remote_version = alt->protocol_version;
+ } else {
+ rtt_threshold = 1800;
+ remote_version = 0;
+ }
+ mutex_unlock(&dev->alt_servers_lock);
+
+ do_change = ready && !is_same_server(&best_server, &dev->cur_server.host)
+ && RTT_THRESHOLD_FACTOR(dev->cur_server.rtt) > best_rtt + rtt_threshold;
+
+ // take server with lowest rtt
+ // if a (dis)connect is already in progress, we do nothing, this is not panic mode
+ if (do_change && device_active(dev) && dnbd3_flag_get(dev->connection_lock)) {
+ dev_info(dnbd3_device_to_dev(dev), "server %pISpc is faster (%lluµs vs. %lluµs)\n",
+ &best_server,
+ (unsigned long long)best_rtt, (unsigned long long)dev->cur_server.rtt);
+ set_socket_timeout(sock, false, // recv
+ MAX(best_rtt / 1000, SOCKET_TIMEOUT_RECV * 1000) + 500);
+ set_socket_timeout(sock, true, // send
+ MAX(best_rtt / 1000, SOCKET_TIMEOUT_SEND * 1000) + 500);
+ if (dnbd3_set_primary_connection(dev, best_sock, &best_server, remote_version) != 0)
sock_release(best_sock);
- best_sock = NULL;
- }
-
- // Increase rtt array index pointer, low probability that it doesn't advance
- if (!ready || (ktime_to_us(start) & 15) != 0)
- turn = (turn + 1) % 4;
- if (turn == 2) // Set ready when we only have 2 of 4 measurements for quicker load balancing
- ready = 1;
+ dnbd3_flag_reset(dev->connection_lock);
+ return;
}
- if (kthread_should_stop())
- dev_dbg(dnbd3_device_to_dev(dev), "kthread %s terminated normally\n", __func__);
- else
- dev_dbg(dnbd3_device_to_dev(dev), "kthread %s exited unexpectedly\n", __func__);
-
- return 0;
+ // Clean up connection that was held open for quicker server switch
+ if (best_sock != NULL)
+ sock_release(best_sock);
}
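To make the switching threshold concrete, take illustrative numbers: the current server averages 4000µs, the best alternative 1000µs, and that alternative has been measured best many times (best_count == 100):

/*
 * Illustrative numbers, not measured values:
 *
 *   rtt_threshold              = 1800 - 100 * 10  = 800
 *   RTT_THRESHOLD_FACTOR(4000) = (4000 * 3) / 4   = 3000
 *   best_rtt + rtt_threshold   = 1000 + 800       = 1800
 *
 * 3000 > 1800, so do_change is true and the device switches. A server
 * that never measured best (best_count == 0) would have to beat the
 * full 1800µs margin instead.
 */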
-static int dnbd3_net_send(void *data)
+/**
+ * Worker for sending pending requests. This will be triggered whenever
+ * we get a new request from the block layer. The worker will then
+ * work through all the requests in the send queue, request them from
+ * the server, and then return.
+ */
+static void dnbd3_send_workfn(struct work_struct *work)
{
- dnbd3_device_t *dev = data;
- struct request *blk_request, *tmp_request;
-
- dnbd3_request_t dnbd3_request;
- struct msghdr msg;
- struct kvec iov;
-
+ dnbd3_device_t *dev = container_of(work, dnbd3_device_t, send_work);
+ struct request *blk_request;
+ struct dnbd3_cmd *cmd;
unsigned long irqflags;
- int ret = 0;
-
- init_msghdr(msg);
-
- dnbd3_request.magic = dnbd3_packet_magic;
-
- set_user_nice(current, -20);
-
- // move already sent requests to request_queue_send again
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- if (!list_empty(&dev->request_queue_receive)) {
- dev_dbg(dnbd3_device_to_dev(dev), "request queue was not empty");
- list_for_each_entry_safe(blk_request, tmp_request, &dev->request_queue_receive, queuelist) {
- list_del_init(&blk_request->queuelist);
- list_add(&blk_request->queuelist, &dev->request_queue_send);
- }
- }
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
-
- while (!kthread_should_stop()) {
- wait_event_interruptible(dev->process_queue_send,
- kthread_should_stop() || !list_empty(&dev->request_queue_send));
-
- if (kthread_should_stop())
- break;
-
- // extract block request
- /* lock since we aquire a blk request from the request_queue_send */
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- if (list_empty(&dev->request_queue_send)) {
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- continue;
- }
- blk_request = list_entry(dev->request_queue_send.next, struct request, queuelist);
- // what to do?
- switch (dnbd3_req_op(blk_request)) {
- case DNBD3_DEV_READ:
- dnbd3_request.cmd = CMD_GET_BLOCK;
- dnbd3_request.offset = blk_rq_pos(blk_request) << 9; // *512
- dnbd3_request.size = blk_rq_bytes(blk_request); // bytes left to complete entire request
- // enqueue request to request_queue_receive
- list_del_init(&blk_request->queuelist);
- list_add_tail(&blk_request->queuelist, &dev->request_queue_receive);
- break;
- case DNBD3_REQ_OP_SPECIAL:
- dnbd3_request.cmd = dnbd3_priv_to_cmd(blk_request);
- dnbd3_request.size = 0;
- list_del_init(&blk_request->queuelist);
+ mutex_lock(&dev->send_mutex);
+ while (dev->sock && device_active(dev)) {
+ // extract next block request
+ spin_lock_irqsave(&dev->send_queue_lock, irqflags);
+ if (list_empty(&dev->send_queue)) {
+ spin_unlock_irqrestore(&dev->send_queue_lock, irqflags);
break;
-
- default:
- if (!atomic_read(&dev->connection_lock))
- dev_err(dnbd3_device_to_dev(dev), "unknown command (send %u %u)\n",
- (int)blk_request->cmd_flags, (int)dnbd3_req_op(blk_request));
- list_del_init(&blk_request->queuelist);
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- continue;
}
- // send net request
- dnbd3_request.handle = (uint64_t)(uintptr_t)blk_request; // Double cast to prevent warning on 32bit
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- fixup_request(dnbd3_request);
- iov.iov_base = &dnbd3_request;
- iov.iov_len = sizeof(dnbd3_request);
- if (kernel_sendmsg(dev->sock, &msg, &iov, 1, sizeof(dnbd3_request)) != sizeof(dnbd3_request)) {
- if (!atomic_read(&dev->connection_lock))
+ blk_request = list_entry(dev->send_queue.next, struct request, queuelist);
+ list_del_init(&blk_request->queuelist);
+ spin_unlock_irqrestore(&dev->send_queue_lock, irqflags);
+ // append to receive queue
+ spin_lock_irqsave(&dev->recv_queue_lock, irqflags);
+ list_add_tail(&blk_request->queuelist, &dev->recv_queue);
+ spin_unlock_irqrestore(&dev->recv_queue_lock, irqflags);
+
+ cmd = blk_mq_rq_to_pdu(blk_request);
+ if (!dnbd3_send_request(dev->sock, CMD_GET_BLOCK, cmd->handle,
+ blk_rq_pos(blk_request) << 9 /* sectors */, blk_rq_bytes(blk_request))) {
+ if (!dnbd3_flag_taken(dev->connection_lock)) {
dnbd3_dev_err_host_cur(dev, "connection to server lost (send)\n");
- ret = -ESHUTDOWN;
- goto cleanup;
+ dnbd3_start_discover(dev, true);
+ }
+ break;
}
}
-
- dev_dbg(dnbd3_device_to_dev(dev), "kthread %s terminated normally\n", __func__);
- return 0;
-
-cleanup:
- if (!atomic_read(&dev->connection_lock) && !kthread_should_stop()) {
- dev_dbg(dnbd3_device_to_dev(dev), "send thread: Triggering panic mode...\n");
- if (dev->sock)
- kernel_sock_shutdown(dev->sock, SHUT_RDWR);
- dev->panic = 1;
- dev->discover = 1;
- wake_up(&dev->process_queue_discover);
- }
-
- if (kthread_should_stop() || ret == 0 || atomic_read(&dev->connection_lock))
- dev_dbg(dnbd3_device_to_dev(dev), "kthread %s terminated normally (cleanup)\n", __func__);
- else
- dev_err(dnbd3_device_to_dev(dev), "kthread %s terminated abnormally (%d)\n", __func__, ret);
-
- return 0;
+ mutex_unlock(&dev->send_mutex);
}
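dnbd3_send_request() itself is only declared in this excerpt; judging from the inline send code it replaces (dnbd3_request_t, fixup_request(), kernel_sendmsg()), its body presumably looks roughly like this sketch:

/* Sketch only - the real implementation lies outside this excerpt */
static bool dnbd3_send_request_sketch(struct socket *sock, u16 cmd, u64 handle,
		u64 offset, u32 size)
{
	struct msghdr msg = { .msg_flags = MSG_WAITALL | MSG_NOSIGNAL };
	dnbd3_request_t request = {
		.magic = dnbd3_packet_magic,
		.cmd = cmd,
		.handle = handle,
		.offset = offset,
		.size = size,
	};
	struct kvec iov = { .iov_base = &request, .iov_len = sizeof(request) };

	fixup_request(request); /* convert fields to network byte order */
	return kernel_sendmsg(sock, &msg, &iov, 1, sizeof(request)) == sizeof(request);
}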
-static int dnbd3_net_receive(void *data)
+/**
+ * The receive workfn stays active for as long as the connection to a server
+ * lasts, i.e. it only gets restarted when we switch to a new server.
+ */
+static void dnbd3_recv_workfn(struct work_struct *work)
{
- dnbd3_device_t *dev = data;
- struct request *blk_request, *tmp_request, *received_request;
-
- dnbd3_reply_t dnbd3_reply;
- struct msghdr msg;
- struct kvec iov;
+ dnbd3_device_t *dev = container_of(work, dnbd3_device_t, recv_work);
+ struct request *blk_request;
+ struct request *rq_iter;
+ struct dnbd3_cmd *cmd;
+ dnbd3_reply_t reply_hdr;
struct req_iterator iter;
struct bio_vec bvec_inst;
struct bio_vec *bvec = &bvec_inst;
+ struct msghdr msg = { .msg_flags = MSG_NOSIGNAL | MSG_WAITALL };
+ struct kvec iov;
void *kaddr;
unsigned long irqflags;
uint16_t rid;
- unsigned long recv_timeout = jiffies;
-
- int count, remaining, ret = 0;
-
- init_msghdr(msg);
- set_user_nice(current, -20);
+ int remaining;
+ int ret;
- while (!kthread_should_stop()) {
+ mutex_lock(&dev->recv_mutex);
+ while (dev->sock) {
// receive net reply
- iov.iov_base = &dnbd3_reply;
- iov.iov_len = sizeof(dnbd3_reply);
- ret = kernel_recvmsg(dev->sock, &msg, &iov, 1, sizeof(dnbd3_reply), msg.msg_flags);
-
- /* end thread after socket timeout or reception of data */
- if (kthread_should_stop())
- break;
-
- /* check return value of kernel_recvmsg() */
+ ret = dnbd3_recv_reply(dev->sock, &reply_hdr);
if (ret == 0) {
/* have not received any data, but remote peer is shutdown properly */
dnbd3_dev_dbg_host_cur(dev, "remote peer has performed an orderly shutdown\n");
- goto cleanup;
+ goto out_unlock;
} else if (ret < 0) {
if (ret == -EAGAIN) {
- if (jiffies < recv_timeout)
- recv_timeout = jiffies; // Handle overflow
- if ((jiffies - recv_timeout) / HZ > SOCKET_KEEPALIVE_TIMEOUT) {
- if (!atomic_read(&dev->connection_lock))
- dnbd3_dev_err_host_cur(dev, "receive timeout reached (%d of %d secs)\n",
- (int)((jiffies - recv_timeout) / HZ),
- (int)SOCKET_KEEPALIVE_TIMEOUT);
- ret = -ETIMEDOUT;
- goto cleanup;
- }
- continue;
+ if (!dnbd3_flag_taken(dev->connection_lock))
+ dnbd3_dev_err_host_cur(dev, "receive timeout reached\n");
} else {
- /* for all errors other than -EAGAIN, print message and abort thread */
- if (!atomic_read(&dev->connection_lock))
- dnbd3_dev_err_host_cur(dev, "connection to server lost (receive)\n");
- goto cleanup;
+ /* for all errors other than -EAGAIN, print errno */
+ if (!dnbd3_flag_taken(dev->connection_lock))
+ dnbd3_dev_err_host_cur(dev, "connection to server lost (receive, errno=%d)\n", ret);
}
+ goto out_unlock;
}
/* check if arrived data is valid */
- if (ret != sizeof(dnbd3_reply)) {
- if (!atomic_read(&dev->connection_lock))
- dnbd3_dev_err_host_cur(dev, "recv partial msg header (%d bytes)\n", ret);
- ret = -EINVAL;
- goto cleanup;
+ if (ret != sizeof(reply_hdr)) {
+ if (!dnbd3_flag_taken(dev->connection_lock))
+ dnbd3_dev_err_host_cur(dev, "recv partial msg header (%d/%d bytes)\n",
+ ret, (int)sizeof(reply_hdr));
+ goto out_unlock;
}
- fixup_reply(dnbd3_reply);
// check error
- if (dnbd3_reply.magic != dnbd3_packet_magic) {
+ if (reply_hdr.magic != dnbd3_packet_magic) {
dnbd3_dev_err_host_cur(dev, "wrong packet magic (receive)\n");
- ret = -EINVAL;
- goto cleanup;
- }
- if (dnbd3_reply.cmd == 0) {
- dnbd3_dev_err_host_cur(dev, "command was 0 (Receive)\n");
- ret = -EINVAL;
- goto cleanup;
+ goto out_unlock;
}
- // Update timeout
- recv_timeout = jiffies;
-
// what to do?
- switch (dnbd3_reply.cmd) {
+ switch (reply_hdr.cmd) {
case CMD_GET_BLOCK:
// search for replied request in queue
blk_request = NULL;
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- list_for_each_entry_safe(received_request, tmp_request, &dev->request_queue_receive,
- queuelist) {
- if ((uint64_t)(uintptr_t)received_request == dnbd3_reply.handle) {
- // Double cast to prevent warning on 32bit
- blk_request = received_request;
+ spin_lock_irqsave(&dev->recv_queue_lock, irqflags);
+ list_for_each_entry(rq_iter, &dev->recv_queue, queuelist) {
+ cmd = blk_mq_rq_to_pdu(rq_iter);
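+ // the send path stored the handle in the request's pdu; match it here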
+ if (cmd->handle == reply_hdr.handle) {
+ blk_request = rq_iter;
list_del_init(&blk_request->queuelist);
break;
}
}
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+ spin_unlock_irqrestore(&dev->recv_queue_lock, irqflags);
if (blk_request == NULL) {
- dnbd3_dev_err_host_cur(dev, "received block data for unrequested handle (%llu: %llu)\n",
- (unsigned long long)dnbd3_reply.handle,
- (unsigned long long)dnbd3_reply.size);
- ret = -EINVAL;
- goto cleanup;
+ dnbd3_dev_err_host_cur(dev, "received block data for unrequested handle (%llx: len=%llu)\n",
+ reply_hdr.handle,
+ (u64)reply_hdr.size);
+ goto out_unlock;
}
// receive data and answer to block layer
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 14, 0)
@@ -599,45 +507,36 @@ static int dnbd3_net_receive(void *data)
/* have not received any data, but remote peer is shutdown properly */
dnbd3_dev_dbg_host_cur(
dev, "remote peer has performed an orderly shutdown\n");
- ret = 0;
} else if (ret < 0) {
- if (!atomic_read(&dev->connection_lock))
+ if (!dnbd3_flag_taken(dev->connection_lock))
dnbd3_dev_err_host_cur(dev,
"disconnect: receiving from net to block layer\n");
} else {
- if (!atomic_read(&dev->connection_lock))
+ if (!dnbd3_flag_taken(dev->connection_lock))
dnbd3_dev_err_host_cur(dev,
"receiving from net to block layer (%d bytes)\n", ret);
- ret = -EINVAL;
}
// Requeue request
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- list_add(&blk_request->queuelist, &dev->request_queue_send);
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- goto cleanup;
+ spin_lock_irqsave(&dev->send_queue_lock, irqflags);
+ list_add(&blk_request->queuelist, &dev->send_queue);
+ spin_unlock_irqrestore(&dev->send_queue_lock, irqflags);
+ goto out_unlock;
}
}
-#ifdef DNBD3_BLK_MQ
blk_mq_end_request(blk_request, BLK_STS_OK);
-#else
- blk_end_request_all(blk_request, 0);
-#endif
- continue;
+ break;
case CMD_GET_SERVERS:
- remaining = dnbd3_reply.size;
+ remaining = reply_hdr.size;
if (dev->use_server_provided_alts) {
dnbd3_server_entry_t new_server;
while (remaining >= sizeof(dnbd3_server_entry_t)) {
- iov.iov_base = &new_server;
- iov.iov_len = sizeof(dnbd3_server_entry_t);
- if (kernel_recvmsg(dev->sock, &msg, &iov, 1, iov.iov_len,
- msg.msg_flags) != sizeof(dnbd3_server_entry_t)) {
- if (!atomic_read(&dev->connection_lock))
+ if (dnbd3_recv_bytes(dev->sock, &new_server, sizeof(new_server))
+ != sizeof(new_server)) {
+ if (!dnbd3_flag_taken(dev->connection_lock))
dnbd3_dev_err_host_cur(dev, "recv CMD_GET_SERVERS payload\n");
- ret = -EINVAL;
- goto cleanup;
+ goto out_unlock;
}
// TODO: Log
if (new_server.failures == 0) { // ADD
@@ -645,36 +544,20 @@ static int dnbd3_net_receive(void *data)
} else { // REM
dnbd3_rem_server(dev, &new_server.host);
}
- remaining -= sizeof(dnbd3_server_entry_t);
- }
- }
- // Drain any payload still on the wire
- while (remaining > 0) {
- count = MIN(sizeof(dnbd3_reply),
- remaining); // Abuse the reply struct as the receive buffer
- iov.iov_base = &dnbd3_reply;
- iov.iov_len = count;
- ret = kernel_recvmsg(dev->sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags);
- if (ret <= 0) {
- if (!atomic_read(&dev->connection_lock))
- dnbd3_dev_err_host_cur(
- dev, "recv additional payload from CMD_GET_SERVERS\n");
- ret = -EINVAL;
- goto cleanup;
+ remaining -= sizeof(new_server);
}
- remaining -= ret;
}
- continue;
+ if (!dnbd3_drain_socket(dev, dev->sock, remaining))
+ goto out_unlock;
+ break;
case CMD_LATEST_RID:
- if (dnbd3_reply.size != 2) {
- dev_err(dnbd3_device_to_dev(dev), "CMD_LATEST_RID.size != 2\n");
+ if (reply_hdr.size < 2) {
+ dev_err(dnbd3_device_to_dev(dev), "CMD_LATEST_RID.size < 2\n");
continue;
}
- iov.iov_base = &rid;
- iov.iov_len = sizeof(rid);
- if (kernel_recvmsg(dev->sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags) <= 0) {
- if (!atomic_read(&dev->connection_lock))
+ if (dnbd3_recv_bytes(dev->sock, &rid, 2) != 2) {
+ if (!dnbd3_flag_taken(dev->connection_lock))
dev_err(dnbd3_device_to_dev(dev), "could not receive CMD_LATEST_RID payload\n");
} else {
rid = net_order_16(rid);
@@ -682,70 +565,52 @@ static int dnbd3_net_receive(void *data)
dev->imgname, (int)rid, (int)dev->rid);
dev->update_available = (rid > dev->rid ? 1 : 0);
}
+ if (reply_hdr.size > 2)
+ dnbd3_drain_socket(dev, dev->sock, reply_hdr.size - 2);
continue;
case CMD_KEEPALIVE:
- if (dnbd3_reply.size != 0)
- dev_err(dnbd3_device_to_dev(dev), "keep alive packet with payload\n");
+ if (reply_hdr.size != 0) {
+ dev_dbg(dnbd3_device_to_dev(dev), "keep alive packet with payload\n");
+ dnbd3_drain_socket(dev, dev->sock, reply_hdr.size);
+ }
continue;
default:
- dev_err(dnbd3_device_to_dev(dev), "unknown command (receive)\n");
- continue;
+ dev_err(dnbd3_device_to_dev(dev), "unknown command: %d (receive), aborting connection\n", (int)reply_hdr.cmd);
+ goto out_unlock;
}
}
-
- dev_dbg(dnbd3_device_to_dev(dev), "kthread thread_receive terminated normally\n");
- return 0;
-
-cleanup:
- if (!atomic_read(&dev->connection_lock) && !kthread_should_stop()) {
- dev_dbg(dnbd3_device_to_dev(dev), "recv thread: Triggering panic mode...\n");
- if (dev->sock)
- kernel_sock_shutdown(dev->sock, SHUT_RDWR);
- dev->panic = 1;
- dev->discover = 1;
- wake_up(&dev->process_queue_discover);
- }
-
- if (kthread_should_stop() || ret == 0 || atomic_read(&dev->connection_lock))
- dev_dbg(dnbd3_device_to_dev(dev), "kthread %s terminated normally (cleanup)\n", __func__);
- else
- dev_err(dnbd3_device_to_dev(dev), "kthread %s terminated abnormally (%d)\n", __func__, ret);
-
- return 0;
+out_unlock:
+ // This will check if we actually still need a new connection
+ dnbd3_start_discover(dev, true);
+ mutex_unlock(&dev->recv_mutex);
}
-static void set_socket_timeouts(struct socket *sock, int timeout_ms)
+/**
+ * Set send or receive timeout of given socket
+ */
+static void set_socket_timeout(struct socket *sock, bool set_send, int timeout_ms)
{
#if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 1, 0)
+ int opt = set_send ? SO_SNDTIMEO_NEW : SO_RCVTIMEO_NEW;
struct __kernel_sock_timeval timeout;
#else
+ int opt = set_send ? SO_SNDTIMEO : SO_RCVTIMEO;
struct timeval timeout;
#endif
#if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 9, 0)
- sockptr_t timeout_ptr;
-
- timeout_ptr = KERNEL_SOCKPTR(&timeout);
+ sockptr_t timeout_ptr = KERNEL_SOCKPTR(&timeout);
#else
- char *timeout_ptr;
-
- timeout_ptr = (char *)&timeout;
+ char *timeout_ptr = (char *)&timeout;
#endif
timeout.tv_sec = timeout_ms / 1000;
timeout.tv_usec = (timeout_ms % 1000) * 1000;
-
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 1, 0)
- sock_setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO_NEW, timeout_ptr, sizeof(timeout));
- sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO_NEW, timeout_ptr, sizeof(timeout));
-#else
- sock_setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, timeout_ptr, sizeof(timeout));
- sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, timeout_ptr, sizeof(timeout));
-#endif
+ sock_setsockopt(sock, SOL_SOCKET, opt, timeout_ptr, sizeof(timeout));
}
-static struct socket *dnbd3_connect(dnbd3_device_t *dev, struct sockaddr_storage *addr)
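+/**
+ * Create a TCP socket and connect it to addr. The connect timeout is
+ * derived from the current server's RTT where known, clamped between
+ * SOCKET_TIMEOUT_SEND and 60 seconds. On success, the socket is stored
+ * in *sock_out and 0 is returned, otherwise a negative errno.
+ */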
+static int dnbd3_connect(dnbd3_device_t *dev, struct sockaddr_storage *addr, struct socket **sock_out)
{
ktime_t start;
int ret, connect_time_ms;
@@ -763,7 +628,7 @@ static struct socket *dnbd3_connect(dnbd3_device_t *dev, struct sockaddr_storage
#endif
if (ret < 0) {
dev_err(dnbd3_device_to_dev(dev), "couldn't create socket: %d\n", ret);
- return NULL;
+ return ret;
}
/* Only one retry, TCP no delay */
@@ -790,36 +655,40 @@ static struct socket *dnbd3_connect(dnbd3_device_t *dev, struct sockaddr_storage
connect_time_ms = dev->cur_server.rtt * 2 / 1000;
}
/* but obey a minimal configurable value, and maximum sanity check */
- if (connect_time_ms < SOCKET_TIMEOUT_CLIENT_DATA * 1000)
- connect_time_ms = SOCKET_TIMEOUT_CLIENT_DATA * 1000;
+ if (connect_time_ms < SOCKET_TIMEOUT_SEND * 1000)
+ connect_time_ms = SOCKET_TIMEOUT_SEND * 1000;
else if (connect_time_ms > 60000)
connect_time_ms = 60000;
- set_socket_timeouts(sock, connect_time_ms);
+ set_socket_timeout(sock, false, connect_time_ms); // recv
+ set_socket_timeout(sock, true, connect_time_ms); // send
start = ktime_get_real();
while (--retries > 0) {
ret = kernel_connect(sock, (struct sockaddr *)addr, addrlen, 0);
connect_time_ms = (int)ktime_ms_delta(ktime_get_real(), start);
- if (connect_time_ms > 2 * SOCKET_TIMEOUT_CLIENT_DATA * 1000) {
+ if (connect_time_ms > 2 * SOCKET_TIMEOUT_SEND * 1000) {
/* Either I'm losing my mind or there was a specific build of kernel
* 5.x where SO_RCVTIMEO didn't affect the connect call above, so
* this function would hang for over a minute for unreachable hosts.
* Leave in this debug check for twice the configured timeout
*/
- dev_dbg(dnbd3_device_to_dev(dev), "%pISpc connect call took %dms\n",
- addr, connect_time_ms);
+ dnbd3_dev_dbg_host(dev, addr, "connect: call took %dms\n",
+ connect_time_ms);
}
if (ret != 0) {
if (ret == -EINTR)
- continue;
- dev_dbg(dnbd3_device_to_dev(dev), "%pISpc connect failed (%d, blocked %dms)\n",
- addr, ret, connect_time_ms);
+ dnbd3_dev_dbg_host(dev, addr, "connect: interrupted system call (blocked %dms)\n",
+ connect_time_ms);
+ else
+ dnbd3_dev_dbg_host(dev, addr, "connect: failed (%d, blocked %dms)\n",
+ ret, connect_time_ms);
goto error;
}
- return sock;
+ *sock_out = sock;
+ return 0;
}
error:
sock_release(sock);
- return NULL;
+ return ret < 0 ? ret : -EIO;
}
#define dnbd3_err_dbg_host(...) do { \
@@ -837,37 +706,39 @@ error:
* server, so we validate the filesize, rid, name against what we expect.
* The server's protocol version is returned in 'remote_version'
*/
-static int dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
- struct sockaddr_storage *addr, uint16_t *remote_version)
+static bool dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
+ struct sockaddr_storage *addr, uint16_t *remote_version, bool copy_data)
{
+ unsigned long irqflags;
const char *name;
uint64_t filesize;
int mlen;
- uint16_t rid, initial_connect;
- struct msghdr msg;
+ uint16_t rid;
+ struct msghdr msg = { .msg_flags = MSG_NOSIGNAL | MSG_WAITALL };
struct kvec iov[2];
serialized_buffer_t *payload;
- dnbd3_reply_t dnbd3_reply;
- dnbd3_request_t dnbd3_request = { .magic = dnbd3_packet_magic };
+ dnbd3_reply_t reply_hdr;
+ dnbd3_request_t request_hdr = { .magic = dnbd3_packet_magic };
payload = kmalloc(sizeof(*payload), GFP_KERNEL);
if (payload == NULL)
goto error;
- initial_connect = (dev->reported_size == 0);
- init_msghdr(msg);
+ if (copy_data && device_active(dev)) {
+ dev_warn(dnbd3_device_to_dev(dev), "handshake called with copy_data although device is already configured\n");
+ }
// Request filesize
- dnbd3_request.cmd = CMD_SELECT_IMAGE;
- iov[0].iov_base = &dnbd3_request;
- iov[0].iov_len = sizeof(dnbd3_request);
+ request_hdr.cmd = CMD_SELECT_IMAGE;
+ iov[0].iov_base = &request_hdr;
+ iov[0].iov_len = sizeof(request_hdr);
serializer_reset_write(payload);
serializer_put_uint16(payload, PROTOCOL_VERSION); // DNBD3 protocol version
serializer_put_string(payload, dev->imgname); // image name
serializer_put_uint16(payload, dev->rid); // revision id
serializer_put_uint8(payload, 0); // are we a server? (no!)
iov[1].iov_base = payload;
- dnbd3_request.size = iov[1].iov_len = serializer_get_written_length(payload);
- fixup_request(dnbd3_request);
+ request_hdr.size = iov[1].iov_len = serializer_get_written_length(payload);
+ fixup_request(request_hdr);
mlen = iov[0].iov_len + iov[1].iov_len;
if (kernel_sendmsg(sock, &msg, iov, 2, mlen) != mlen) {
dnbd3_err_dbg_host(dev, addr, "requesting image size failed\n");
@@ -875,28 +746,28 @@ static int dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
}
// receive net reply
- iov[0].iov_base = &dnbd3_reply;
- iov[0].iov_len = sizeof(dnbd3_reply);
- if (kernel_recvmsg(sock, &msg, iov, 1, sizeof(dnbd3_reply), msg.msg_flags) != sizeof(dnbd3_reply)) {
+ if (dnbd3_recv_reply(sock, &reply_hdr) != sizeof(reply_hdr)) {
dnbd3_err_dbg_host(dev, addr, "receiving image size packet (header) failed\n");
goto error;
}
- fixup_reply(dnbd3_reply);
- if (dnbd3_reply.magic != dnbd3_packet_magic || dnbd3_reply.cmd != CMD_SELECT_IMAGE || dnbd3_reply.size < 4) {
+ if (reply_hdr.magic != dnbd3_packet_magic
+ || reply_hdr.cmd != CMD_SELECT_IMAGE || reply_hdr.size < 4
+ || reply_hdr.size > sizeof(*payload)) {
dnbd3_err_dbg_host(dev, addr,
- "corrupted CMD_SELECT_IMAGE reply\n");
+ "corrupt CMD_SELECT_IMAGE reply\n");
goto error;
}
// receive data
iov[0].iov_base = payload;
- iov[0].iov_len = dnbd3_reply.size;
- if (kernel_recvmsg(sock, &msg, iov, 1, dnbd3_reply.size, msg.msg_flags) != dnbd3_reply.size) {
+ iov[0].iov_len = reply_hdr.size;
+ if (kernel_recvmsg(sock, &msg, iov, 1, reply_hdr.size, msg.msg_flags)
+ != reply_hdr.size) {
dnbd3_err_dbg_host(dev, addr,
"receiving payload of CMD_SELECT_IMAGE reply failed\n");
goto error;
}
- serializer_reset_read(payload, dnbd3_reply.size);
+ serializer_reset_read(payload, reply_hdr.size);
*remote_version = serializer_get_uint16(payload);
name = serializer_get_string(payload);
@@ -910,7 +781,6 @@ static int dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
(int)MIN_SUPPORTED_SERVER);
goto error;
}
-
if (name == NULL) {
dnbd3_err_dbg_host(dev, addr, "server did not supply an image name\n");
goto error;
@@ -920,20 +790,16 @@ static int dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
goto error;
}
- /* only check image name if this isn't the initial connect */
- if (initial_connect && dev->rid != 0 && strcmp(name, dev->imgname) != 0) {
- dnbd3_err_dbg_host(dev, addr, "server offers image '%s', requested '%s'\n", name, dev->imgname);
- goto error;
- }
-
- if (initial_connect) {
+ if (copy_data) {
if (filesize < DNBD3_BLOCK_SIZE) {
dnbd3_err_dbg_host(dev, addr, "reported size by server is < 4096\n");
goto error;
}
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
if (strlen(dev->imgname) < strlen(name)) {
dev->imgname = krealloc(dev->imgname, strlen(name) + 1, GFP_KERNEL);
if (dev->imgname == NULL) {
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
dnbd3_err_dbg_host(dev, addr, "reallocating buffer for new image name failed\n");
goto error;
}
@@ -942,9 +808,10 @@ static int dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
dev->rid = rid;
// store image information
dev->reported_size = filesize;
+ dev->update_available = 0;
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
set_capacity(dev->disk, dev->reported_size >> 9); /* 512 Byte blocks */
dnbd3_dev_dbg_host(dev, addr, "image size: %llu\n", dev->reported_size);
- dev->update_available = 0;
} else {
/* switching connection, sanity checks */
if (rid != dev->rid) {
@@ -954,6 +821,11 @@ static int dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
goto error;
}
+ if (strcmp(name, dev->imgname) != 0) {
+ dnbd3_err_dbg_host(dev, addr, "server offers image '%s', requested '%s'\n", name, dev->imgname);
+ goto error;
+ }
+
if (filesize != dev->reported_size) {
dnbd3_err_dbg_host(dev, addr,
"reported image size of %llu does not match expected value %llu\n",
@@ -962,251 +834,287 @@ static int dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
}
}
kfree(payload);
- return 1;
+ return true;
error:
kfree(payload);
- return 0;
+ return false;
+}
+
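+/**
+ * Fill in a dnbd3 request header, apply fixup_request() for the wire
+ * format, and send it out in a single message.
+ */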
+static bool dnbd3_send_request(struct socket *sock, u16 cmd, u64 handle, u64 offset, u32 size)
+{
+ struct msghdr msg = { .msg_flags = MSG_NOSIGNAL };
+ dnbd3_request_t request_hdr = {
+ .magic = dnbd3_packet_magic,
+ .cmd = cmd,
+ .size = size,
+ .offset = offset,
+ .handle = handle,
+ };
+ struct kvec iov = { .iov_base = &request_hdr, .iov_len = sizeof(request_hdr) };
+
+ fixup_request(request_hdr);
+ return kernel_sendmsg(sock, &msg, &iov, 1, sizeof(request_hdr)) == sizeof(request_hdr);
+}
+
+/**
+ * Send a request with given cmd type and empty payload.
+ */
+static bool dnbd3_send_empty_request(dnbd3_device_t *dev, u16 cmd)
+{
+ bool ret;
+
+ mutex_lock(&dev->send_mutex);
+ ret = dev->sock
+ && dnbd3_send_request(dev->sock, cmd, 0, 0, 0);
+ mutex_unlock(&dev->send_mutex);
+ return ret;
+}
+
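+/**
+ * Receive count bytes into buffer; MSG_WAITALL keeps the call blocking
+ * until all data arrived, the receive timeout expired, or an error
+ * occurred. Returns the number of bytes received, or a negative errno.
+ */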
+static int dnbd3_recv_bytes(struct socket *sock, void *buffer, size_t count)
+{
+ struct msghdr msg = { .msg_flags = MSG_NOSIGNAL | MSG_WAITALL };
+ struct kvec iov = { .iov_base = buffer, .iov_len = count };
+
+ return kernel_recvmsg(sock, &msg, &iov, 1, count, msg.msg_flags);
+}
+
+static int dnbd3_recv_reply(struct socket *sock, dnbd3_reply_t *reply_hdr)
+{
+ int ret = dnbd3_recv_bytes(sock, reply_hdr, sizeof(*reply_hdr));
+
+ fixup_reply(*reply_hdr);
+ return ret;
}
-static int dnbd3_request_test_block(dnbd3_device_t *dev, struct sockaddr_storage *addr, struct socket *sock)
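+/**
+ * Read and discard the given number of bytes from the socket, using a
+ * shared scratch buffer, e.g. to skip payload we are not interested in.
+ */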
+static bool dnbd3_drain_socket(dnbd3_device_t *dev, struct socket *sock, int bytes)
{
- dnbd3_request_t dnbd3_request = { .magic = dnbd3_packet_magic };
- dnbd3_reply_t dnbd3_reply;
+ int ret;
struct kvec iov;
- struct msghdr msg;
- char *buf = NULL;
- char smallbuf[256];
- int remaining, buffer_size, ret, func_return;
+ struct msghdr msg = { .msg_flags = MSG_NOSIGNAL };
+
+ while (bytes > 0) {
+ iov.iov_base = __garbage_mem;
+ iov.iov_len = sizeof(__garbage_mem);
+ ret = kernel_recvmsg(sock, &msg, &iov, 1, MIN(bytes, iov.iov_len), msg.msg_flags);
+ if (ret <= 0) {
+ dnbd3_dev_err_host_cur(dev, "draining payload failed (ret=%d)\n", ret);
+ return false;
+ }
+ bytes -= ret;
+ }
+ return true;
+}
- init_msghdr(msg);
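+/**
+ * RTT probe: request the first RTT_BLOCK_SIZE bytes of the image and
+ * discard the payload; only the round-trip timing is of interest.
+ */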
+static bool dnbd3_request_test_block(dnbd3_device_t *dev, struct sockaddr_storage *addr, struct socket *sock)
+{
+ dnbd3_reply_t reply_hdr;
- func_return = 0;
// Request block
- dnbd3_request.cmd = CMD_GET_BLOCK;
- // Do *NOT* pick a random block as it has proven to cause severe
- // cache thrashing on the server
- dnbd3_request.offset = 0;
- dnbd3_request.size = RTT_BLOCK_SIZE;
- fixup_request(dnbd3_request);
- iov.iov_base = &dnbd3_request;
- iov.iov_len = sizeof(dnbd3_request);
-
- if (kernel_sendmsg(sock, &msg, &iov, 1, sizeof(dnbd3_request)) <= 0) {
+ if (!dnbd3_send_request(sock, CMD_GET_BLOCK, 0, 0, RTT_BLOCK_SIZE)) {
dnbd3_err_dbg_host(dev, addr, "requesting test block failed\n");
- goto error;
+ return false;
}
// receive net reply
- iov.iov_base = &dnbd3_reply;
- iov.iov_len = sizeof(dnbd3_reply);
- if (kernel_recvmsg(sock, &msg, &iov, 1, sizeof(dnbd3_reply), msg.msg_flags)
- != sizeof(dnbd3_reply)) {
+ if (dnbd3_recv_reply(sock, &reply_hdr) != sizeof(reply_hdr)) {
dnbd3_err_dbg_host(dev, addr, "receiving test block header packet failed\n");
- goto error;
+ return false;
}
- fixup_reply(dnbd3_reply);
- if (dnbd3_reply.magic != dnbd3_packet_magic || dnbd3_reply.cmd != CMD_GET_BLOCK
- || dnbd3_reply.size != RTT_BLOCK_SIZE) {
+ if (reply_hdr.magic != dnbd3_packet_magic || reply_hdr.cmd != CMD_GET_BLOCK
+ || reply_hdr.size != RTT_BLOCK_SIZE || reply_hdr.handle != 0) {
dnbd3_err_dbg_host(dev, addr,
- "unexpected reply to block request: cmd=%d, size=%d (discover)\n",
- (int)dnbd3_reply.cmd, (int)dnbd3_reply.size);
- goto error;
+ "unexpected reply to block request: cmd=%d, size=%d, handle=%llu (discover)\n",
+ (int)reply_hdr.cmd, (int)reply_hdr.size, reply_hdr.handle);
+ return false;
}
// receive data
- buf = kmalloc(DNBD3_BLOCK_SIZE, GFP_NOWAIT);
- if (buf == NULL) {
- /* fallback to stack if we're really memory constrained */
- buf = smallbuf;
- buffer_size = sizeof(smallbuf);
- } else {
- buffer_size = DNBD3_BLOCK_SIZE;
- }
- remaining = RTT_BLOCK_SIZE;
- /* TODO in either case we could build a large iovec that points to the same buffer over and over again */
- while (remaining > 0) {
- iov.iov_base = buf;
- iov.iov_len = buffer_size;
- ret = kernel_recvmsg(sock, &msg, &iov, 1, MIN(remaining, buffer_size), msg.msg_flags);
- if (ret <= 0) {
- dnbd3_err_dbg_host(dev, addr, "receiving test block payload failed (ret=%d)\n", ret);
- goto error;
- }
- remaining -= ret;
- }
- func_return = 1;
- // Fallthrough!
-error:
- if (buf != smallbuf)
- kfree(buf);
- return func_return;
+ return dnbd3_drain_socket(dev, sock, RTT_BLOCK_SIZE);
}
#undef dnbd3_err_dbg_host
-static int spawn_worker_thread(dnbd3_device_t *dev, struct task_struct **task, const char *name,
- int (*threadfn)(void *data))
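+/**
+ * Replace the device's main socket under send_mutex and recv_mutex:
+ * shut down the old socket so the receive worker exits its loop, release
+ * it, then install the new socket and server address (or clear them if
+ * sock and addr are NULL).
+ */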
+static void replace_main_socket(dnbd3_device_t *dev, struct socket *sock, struct sockaddr_storage *addr, u16 protocol_version)
{
- ASSERT(*task == NULL);
- *task = kthread_create(threadfn, dev, "%s-%s", dev->disk->disk_name, name);
- if (!IS_ERR(*task)) {
- get_task_struct(*task);
- wake_up_process(*task);
- return 1;
+ unsigned long irqflags;
+
+ mutex_lock(&dev->send_mutex);
+ // First, shut down the connection so the receive worker leaves its main loop
+ if (dev->sock)
+ kernel_sock_shutdown(dev->sock, SHUT_RDWR);
+ mutex_lock(&dev->recv_mutex);
+ // Receive worker is done; release the old socket and install the replacement
+ if (dev->sock)
+ sock_release(dev->sock);
+ dev->sock = sock;
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
+ if (addr == NULL) {
+ memset(&dev->cur_server, 0, sizeof(dev->cur_server));
+ } else {
+ dev->cur_server.host = *addr;
+ dev->cur_server.rtt = 0;
+ dev->cur_server.protocol_version = protocol_version;
}
- dev_err(dnbd3_device_to_dev(dev), "failed to create %s thread (%ld)\n",
- name, PTR_ERR(*task));
- /* reset possible non-NULL error value */
- *task = NULL;
- return 0;
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+ mutex_unlock(&dev->recv_mutex);
+ mutex_unlock(&dev->send_mutex);
}
-static void stop_worker_thread(dnbd3_device_t *dev, struct task_struct **task, const char *name, int quiet)
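+/**
+ * Free the workqueues and mutexes allocated when the device was set up.
+ */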
+static void dnbd3_release_resources(dnbd3_device_t *dev)
{
- int ret;
-
- if (*task == NULL)
- return;
- if (!quiet)
- dnbd3_dev_dbg_host_cur(dev, "stop %s thread\n", name);
- ret = kthread_stop(*task);
- put_task_struct(*task);
- if (ret == -EINTR) {
- /* thread has never been scheduled and run */
- if (!quiet)
- dev_dbg(dnbd3_device_to_dev(dev), "%s thread has never run\n", name);
- } else {
- /* thread has run, check if it has terminated successfully */
- if (ret < 0 && !quiet)
- dev_err(dnbd3_device_to_dev(dev), "%s thread was not terminated correctly\n", name);
- }
- *task = NULL;
+ if (dev->send_wq)
+ destroy_workqueue(dev->send_wq);
+ dev->send_wq = NULL;
+ if (dev->recv_wq)
+ destroy_workqueue(dev->recv_wq);
+ dev->recv_wq = NULL;
+ mutex_destroy(&dev->send_mutex);
+ mutex_destroy(&dev->recv_mutex);
}
-int dnbd3_net_connect(dnbd3_device_t *dev)
+/**
+ * Establish new connection on a dnbd3 device.
+ * Return 0 on success, errno otherwise
+ */
+int dnbd3_new_connection(dnbd3_device_t *dev, struct sockaddr_storage *addr, bool init)
{
- struct request *req_alt_servers = NULL;
unsigned long irqflags;
+ struct socket *sock = NULL;
+ uint16_t proto_version;
+ int ret;
- ASSERT(atomic_read(&dev->connection_lock));
-
- if (dev->use_server_provided_alts) {
- req_alt_servers = kmalloc(sizeof(*req_alt_servers), GFP_KERNEL);
- if (req_alt_servers == NULL)
- dnbd3_dev_err_host_cur(dev, "Cannot allocate memory to request list of alt servers\n");
+ ASSERT(dnbd3_flag_taken(dev->connection_lock));
+ if (init && device_active(dev)) {
+ dnbd3_dev_err_host_cur(dev, "device already configured/connected\n");
+ return -EBUSY;
+ }
+ if (!init && !device_active(dev)) {
+ dev_warn(dnbd3_device_to_dev(dev), "connection switch called on unconfigured device\n");
+ return -ENOTCONN;
}
- if (dev->cur_server.host.ss_family == 0 || dev->imgname == NULL) {
- dnbd3_dev_err_host_cur(dev, "connect: host or image name not set\n");
+ dnbd3_dev_dbg_host(dev, addr, "connecting...\n");
+ ret = dnbd3_connect(dev, addr, &sock);
+ if (ret != 0 || sock == NULL)
goto error;
- }
- if (dev->sock) {
- dnbd3_dev_err_host_cur(dev, "socket already connected\n");
+ /* execute the "select image" handshake */
+ // if init is true, reported_size will be set
+ if (!dnbd3_execute_handshake(dev, sock, addr, &proto_version, init)) {
+ ret = -EINVAL;
goto error;
}
- ASSERT(dev->thread_send == NULL);
- ASSERT(dev->thread_receive == NULL);
- ASSERT(dev->thread_discover == NULL);
-
- if (dev->better_sock != NULL) {
- // Switching server, connection is already established and size request was executed
- dnbd3_dev_dbg_host_cur(dev, "on-the-fly server change\n");
- dev->sock = dev->better_sock;
- dev->better_sock = NULL;
- } else {
- // no established connection yet from discovery thread, start new one
- uint16_t proto_version;
-
- dnbd3_dev_dbg_host_cur(dev, "connecting\n");
- dev->sock = dnbd3_connect(dev, &dev->cur_server.host);
- if (dev->sock == NULL) {
- dnbd3_dev_err_host_cur(dev, "%s: Failed\n", __func__);
- goto error;
+ if (init) {
+ // We're setting up the device for use - allocate resources
+ // Do not goto error before this
+ ASSERT(!dev->send_wq);
+ ASSERT(!dev->recv_wq);
+ mutex_init(&dev->send_mutex);
+ mutex_init(&dev->recv_mutex);
+ // a dedicated queue for sending that allows only one active task
+ dev->send_wq = alloc_workqueue("dnbd%d-send",
+ WQ_UNBOUND | WQ_FREEZABLE | WQ_MEM_RECLAIM | WQ_HIGHPRI,
+ 1, dev->index);
+ dev->recv_wq = alloc_workqueue("dnbd%d-recv",
+ WQ_UNBOUND | WQ_FREEZABLE | WQ_MEM_RECLAIM | WQ_HIGHPRI | WQ_CPU_INTENSIVE,
+ 1, dev->index);
+ if (!dev->send_wq || !dev->recv_wq) {
+ ret = -ENOMEM;
+ goto error_dealloc;
}
- /* execute the "select image" handshake */
- if (!dnbd3_execute_handshake(dev, dev->sock, &dev->cur_server.host, &proto_version))
- goto error;
-
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- dev->cur_server.protocol_version = proto_version;
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
}
- /* create required threads */
- if (!spawn_worker_thread(dev, &dev->thread_send, "send", dnbd3_net_send))
- goto error;
- if (!spawn_worker_thread(dev, &dev->thread_receive, "receive", dnbd3_net_receive))
- goto error;
- if (!spawn_worker_thread(dev, &dev->thread_discover, "discover", dnbd3_net_discover))
- goto error;
+ set_socket_timeout(sock, false, SOCKET_TIMEOUT_RECV * 1000); // recv
+ dnbd3_set_primary_connection(dev, sock, addr, proto_version);
+ sock = NULL; // In case we ever goto error* after this point
- dnbd3_dev_dbg_host_cur(dev, "connection established\n");
- dev->panic = 0;
- dev->panic_count = 0;
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
+ if (init) {
+ dev->discover_count = 0;
+ dev->discover_interval = TIMER_INTERVAL_PROBE_STARTUP;
+ // discovery and keepalive are not critical, so use the power-efficient queue
+ queue_delayed_work(system_power_efficient_wq, &dev->discover_work,
+ dev->discover_interval * HZ);
+ queue_delayed_work(system_power_efficient_wq, &dev->keepalive_work,
+ KEEPALIVE_INTERVAL * HZ);
+ // but the receiver is performance critical AND runs indefinitely, so use
+ // the CPU-intensive queue, as jobs submitted there will not count towards
+ // the concurrency limit of per-cpu worker threads. It still feels a little
+ // dirty to avoid managing our own thread, but nbd does it too.
+ }
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+ return 0;
- if (req_alt_servers != NULL) {
- // Enqueue request to request_queue_send for a fresh list of alt servers
- dnbd3_cmd_to_priv(req_alt_servers, CMD_GET_SERVERS);
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- list_add(&req_alt_servers->queuelist, &dev->request_queue_send);
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- wake_up(&dev->process_queue_send);
+error_dealloc:
+ if (init) {
+ // If anything fails during initialization, free resources again
+ dnbd3_release_resources(dev);
}
+error:
+ if (init)
+ dev->reported_size = 0;
+ if (sock)
+ sock_release(sock);
+ return ret < 0 ? ret : -EIO;
+}
- // add heartbeat timer
- // Do not goto error after creating the timer - we require that the timer exists
- // if dev->sock != NULL -- see dnbd3_net_disconnect
- dev->heartbeat_count = 0;
- timer_setup(&dev->hb_timer, dnbd3_net_heartbeat, 0);
- dev->hb_timer.expires = jiffies + HZ;
- add_timer(&dev->hb_timer);
+void dnbd3_net_work_init(dnbd3_device_t *dev)
+{
+ INIT_WORK(&dev->send_work, dnbd3_send_workfn);
+ INIT_WORK(&dev->recv_work, dnbd3_recv_workfn);
+ INIT_DELAYED_WORK(&dev->discover_work, dnbd3_discover_workfn);
+ INIT_DELAYED_WORK(&dev->keepalive_work, dnbd3_keepalive_workfn);
+}
- return 0;
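+/**
+ * Make the given socket the active connection: install it via
+ * replace_main_socket(), leave panic mode, restart the receive worker,
+ * and requeue all pending requests so they go to the new server.
+ */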
+static int dnbd3_set_primary_connection(dnbd3_device_t *dev, struct socket *sock, struct sockaddr_storage *addr, u16 protocol_version)
+{
+ unsigned long irqflags;
-error:
- stop_worker_thread(dev, &dev->thread_send, "send", 1);
- stop_worker_thread(dev, &dev->thread_receive, "receive", 1);
- stop_worker_thread(dev, &dev->thread_discover, "discover", 1);
- if (dev->sock) {
- sock_release(dev->sock);
- dev->sock = NULL;
+ ASSERT(dnbd3_flag_taken(dev->connection_lock));
+ if (addr->ss_family == 0 || dev->imgname == NULL || sock == NULL) {
+ dnbd3_dev_err_host_cur(dev, "connect: host, image name or sock not set\n");
+ return -EINVAL;
}
+
+ replace_main_socket(dev, sock, addr, protocol_version);
spin_lock_irqsave(&dev->blk_lock, irqflags);
- dev->cur_server.host.ss_family = 0;
+ dev->panic = false;
+ dev->panic_count = 0;
+ dev->discover_interval = TIMER_INTERVAL_PROBE_SWITCH;
+ queue_work(dev->recv_wq, &dev->recv_work);
spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- kfree(req_alt_servers);
+ if (dev->use_server_provided_alts) {
+ dnbd3_send_empty_request(dev, CMD_GET_SERVERS);
+ }
- return -1;
+ dnbd3_dev_dbg_host_cur(dev, "connection switched\n");
+ dnbd3_blk_requeue_all_requests(dev);
+ return 0;
}
+/**
+ * Disconnect the device, shutting it down.
+ */
int dnbd3_net_disconnect(dnbd3_device_t *dev)
{
- unsigned long irqflags;
-
+ ASSERT(dnbd3_flag_taken(dev->connection_lock));
+ if (!device_active(dev))
+ return -ENOTCONN;
dev_dbg(dnbd3_device_to_dev(dev), "disconnecting device ...\n");
- ASSERT(atomic_read(&dev->connection_lock));
- dev->discover = 0;
-
- if (dev->sock) {
- kernel_sock_shutdown(dev->sock, SHUT_RDWR);
- // clear heartbeat timer
- del_timer(&dev->hb_timer);
- }
+ dev->reported_size = 0;
+ /* quickly fail all requests */
+ dnbd3_blk_fail_all_requests(dev);
+ replace_main_socket(dev, NULL, NULL, 0);
- // kill sending and receiving threads
- stop_worker_thread(dev, &dev->thread_send, "send", 0);
- stop_worker_thread(dev, &dev->thread_receive, "receive", 0);
- stop_worker_thread(dev, &dev->thread_discover, "discover", 0);
- if (dev->sock) {
- sock_release(dev->sock);
- dev->sock = NULL;
- }
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- dev->cur_server.host.ss_family = 0;
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+ cancel_delayed_work_sync(&dev->keepalive_work);
+ cancel_delayed_work_sync(&dev->discover_work);
+ cancel_work_sync(&dev->send_work);
+ cancel_work_sync(&dev->recv_work);
+ dnbd3_blk_fail_all_requests(dev);
+ dnbd3_release_resources(dev);
+ dev_dbg(dnbd3_device_to_dev(dev), "all workers shut down\n");
return 0;
}
diff --git a/src/kernel/net.h b/src/kernel/net.h
index 4d658ab..69fa523 100644
--- a/src/kernel/net.h
+++ b/src/kernel/net.h
@@ -24,8 +24,10 @@
#include "dnbd3_main.h"
-int dnbd3_net_connect(dnbd3_device_t *lo);
+void dnbd3_net_work_init(dnbd3_device_t *dev);
-int dnbd3_net_disconnect(dnbd3_device_t *lo);
+int dnbd3_new_connection(dnbd3_device_t *dev, struct sockaddr_storage *addr, bool init);
+
+int dnbd3_net_disconnect(dnbd3_device_t *dev);
#endif /* NET_H_ */
diff --git a/src/kernel/sysfs.c b/src/kernel/sysfs.c
index 79bc21e..3e5febd 100644
--- a/src/kernel/sysfs.c
+++ b/src/kernel/sysfs.c
@@ -22,7 +22,6 @@
#include <linux/kobject.h>
#include "sysfs.h"
-#include "utils.h"
#ifndef MIN
#define MIN(a, b) ((a) < (b) ? (a) : (b))
@@ -33,7 +32,12 @@
*/
ssize_t show_cur_server_addr(char *buf, dnbd3_device_t *dev)
{
- return MIN(snprintf(buf, PAGE_SIZE, "%pISpc\n", &dev->cur_server.host), PAGE_SIZE);
+ ssize_t ret;
+
+ spin_lock(&dev->blk_lock);
+ ret = MIN(snprintf(buf, PAGE_SIZE, "%pISpc\n", &dev->cur_server.host), PAGE_SIZE);
+ spin_unlock(&dev->blk_lock);
+ return ret;
}
/**
@@ -42,7 +46,11 @@ ssize_t show_cur_server_addr(char *buf, dnbd3_device_t *dev)
*/
ssize_t show_alt_servers(char *buf, dnbd3_device_t *dev)
{
- int i, size = PAGE_SIZE, ret;
+ int i, size = PAGE_SIZE;
+ ssize_t ret;
+
+ if (mutex_lock_interruptible(&dev->alt_servers_lock) != 0)
+ return 0;
for (i = 0; i < NUMBER_SERVERS; ++i) {
if (dev->alt_servers[i].host.ss_family == 0)
@@ -63,6 +71,7 @@ ssize_t show_alt_servers(char *buf, dnbd3_device_t *dev)
break;
}
}
+ mutex_unlock(&dev->alt_servers_lock);
return PAGE_SIZE - size;
}
@@ -71,7 +80,12 @@ ssize_t show_alt_servers(char *buf, dnbd3_device_t *dev)
*/
ssize_t show_image_name(char *buf, dnbd3_device_t *dev)
{
- return MIN(snprintf(buf, PAGE_SIZE, "%s\n", dev->imgname), PAGE_SIZE);
+ ssize_t ret;
+
+ spin_lock(&dev->blk_lock);
+ ret = MIN(snprintf(buf, PAGE_SIZE, "%s\n", dev->imgname), PAGE_SIZE);
+ spin_unlock(&dev->blk_lock);
+ return ret;
}
/**
@@ -79,11 +93,13 @@ ssize_t show_image_name(char *buf, dnbd3_device_t *dev)
*/
ssize_t show_rid(char *buf, dnbd3_device_t *dev)
{
+ // No locking here, primitive type, no pointer to allocated memory
return MIN(snprintf(buf, PAGE_SIZE, "%d\n", dev->rid), PAGE_SIZE);
}
ssize_t show_update_available(char *buf, dnbd3_device_t *dev)
{
+ // Same story
return MIN(snprintf(buf, PAGE_SIZE, "%d\n", dev->update_available), PAGE_SIZE);
}
diff --git a/src/kernel/utils.c b/src/kernel/utils.c
deleted file mode 100644
index f2b7264..0000000
--- a/src/kernel/utils.c
+++ /dev/null
@@ -1,48 +0,0 @@
-// SPDX-License-Identifier: GPL-2.0
-/*
- * This file is part of the Distributed Network Block Device 3
- *
- * Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de>
- *
- * This file may be licensed under the terms of the
- * GNU General Public License Version 2 (the ``GPL'').
- *
- * Software distributed under the License is distributed
- * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
- * express or implied. See the GPL for the specific language
- * governing rights and limitations.
- *
- * You should have received a copy of the GPL along with this
- * program. If not, go to http://www.gnu.org/licenses/gpl.html
- * or write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- *
- */
-
-#include <linux/kernel.h>
-
-#include "utils.h"
-
-unsigned int inet_addr(char *str)
-{
- int a, b, c, d;
- char arr[4];
- int ret;
-
- ret = sscanf(str, "%d.%d.%d.%d", &a, &b, &c, &d);
- if (ret > 0) {
- arr[0] = a;
- arr[1] = b;
- arr[2] = c;
- arr[3] = d;
- }
-
- return *(unsigned int *)arr;
-}
-
-void inet_ntoa(struct in_addr addr, char *str)
-{
- unsigned char *ptr = (unsigned char *)&addr;
-
- sprintf(str, "%d.%d.%d.%d", ptr[0] & 0xff, ptr[1] & 0xff, ptr[2] & 0xff, ptr[3] & 0xff);
-}
diff --git a/src/kernel/utils.h b/src/kernel/utils.h
deleted file mode 100644
index e0efb97..0000000
--- a/src/kernel/utils.h
+++ /dev/null
@@ -1,30 +0,0 @@
-/* SPDX-License-Identifier: GPL-2.0 */
-/*
- * This file is part of the Distributed Network Block Device 3
- *
- * Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de>
- *
- * This file may be licensed under the terms of the
- * GNU General Public License Version 2 (the ``GPL'').
- *
- * Software distributed under the License is distributed
- * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
- * express or implied. See the GPL for the specific language
- * governing rights and limitations.
- *
- * You should have received a copy of the GPL along with this
- * program. If not, go to http://www.gnu.org/licenses/gpl.html
- * or write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- *
- */
-
-#ifndef UTILS_H_
-#define UTILS_H_
-
-#include <linux/in.h>
-
-unsigned int inet_addr(char *str);
-void inet_ntoa(struct in_addr addr, char *str);
-
-#endif /* UTILS_H_ */