path: root/src/kernel/net.c
author		Simon Rettberg	2022-02-12 23:56:35 +0100
committer	Simon Rettberg	2022-02-18 21:34:55 +0100
commit		eb2876f6542af2bfa47c7a6905ecc4f81f1d2ad3 (patch)
tree		17ebb5fd2d4770a4dd67f857f2488221cd46874c /src/kernel/net.c
parent		[KERNEL] Add missing include to fix compile on 4.14.x (diff)
download	dnbd3-eb2876f6542af2bfa47c7a6905ecc4f81f1d2ad3.tar.gz
		dnbd3-eb2876f6542af2bfa47c7a6905ecc4f81f1d2ad3.tar.xz
		dnbd3-eb2876f6542af2bfa47c7a6905ecc4f81f1d2ad3.zip
[KERNEL] Refactor to use workqueues and blk-mq only
Using workqueues frees us from having to manage the lifecycle of three dedicated threads. Discovery (alt server checks) and sending keepalive packets are now done as work items on the power-efficient system queue. Sending and receiving happen via dedicated work queues with higher priority.

blk-mq has also been around for quite a while in the kernel, so switching to it doesn't hurt backwards compatibility. As the code is now refactored to work the way blk-mq is designed, backwards compatibility has even improved, while at the same time freeing us from an arsenal of macros that were required to make the blk-mq port look and feel like the old implementation. For example, the code now compiles on CentOS 7 with kernel 3.10 without requiring special macros to detect the heavily modified RedHat kernel with all its backported features.

A few other design limitations have been rectified along the way. For example, switching to another server no longer internally disconnects from the current one first, which theoretically could lead to a non-working setup if the new server isn't reachable and then - because of some transient network error - switching back also fails. And since the disconnect call also tore down the discover thread, the connection would never repair itself afterwards. We now establish the new connection in parallel to the old one, and only if that succeeds do we replace the old one with it, similar to how the automatic alt-server switch already does it.
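As an illustration of the scheduling pattern described above, here is a minimal sketch of a self-rearming delayed work item on the power-efficient system queue. This is illustrative only: struct my_dev, my_keepalive_workfn and MY_INTERVAL are made-up names; the real driver uses dnbd3_device_t and its keepalive_work/discover_work members (see the diff below).

#include <linux/kernel.h>
#include <linux/workqueue.h>
#include <linux/spinlock.h>
#include <linux/jiffies.h>
#include <linux/types.h>

/* Illustrative stand-in for dnbd3_device_t. */
struct my_dev {
	struct delayed_work keepalive_work;
	spinlock_t lock;
	bool active;
};

#define MY_INTERVAL 8 /* seconds; made-up value */

static void my_keepalive_workfn(struct work_struct *work)
{
	struct my_dev *dev = container_of(work, struct my_dev, keepalive_work.work);
	unsigned long flags;

	/* ... send the keepalive packet here ... */

	/* Re-arm only while the device is still in use. */
	spin_lock_irqsave(&dev->lock, flags);
	if (dev->active)
		mod_delayed_work(system_power_efficient_wq,
				 &dev->keepalive_work, MY_INTERVAL * HZ);
	spin_unlock_irqrestore(&dev->lock, flags);
}

/* One-time setup: bind the work function, then queue the first run. */
static void my_dev_start(struct my_dev *dev)
{
	INIT_DELAYED_WORK(&dev->keepalive_work, my_keepalive_workfn);
	queue_delayed_work(system_power_efficient_wq,
			   &dev->keepalive_work, MY_INTERVAL * HZ);
}

Because the work item re-arms itself only under the device lock and only while active, tearing the cycle down needs nothing more than clearing the active state and a single cancel_delayed_work_sync() call, which is what dnbd3_net_disconnect() does below.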
Diffstat (limited to 'src/kernel/net.c')
-rw-r--r--	src/kernel/net.c	1384
1 file changed, 646 insertions(+), 738 deletions(-)
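For reference, the parallel server-switch flow from the commit message reduces to roughly the following. This is a simplified sketch based on dnbd3_new_connection() in the diff below: switch_server_sketch is a made-up name, error paths and locking are trimmed, and the driver's internal declarations (dnbd3_connect(), dnbd3_execute_handshake(), dnbd3_set_primary_connection()) are assumed.

static int switch_server_sketch(dnbd3_device_t *dev, struct sockaddr_storage *addr)
{
	struct socket *sock = NULL;
	uint16_t proto_version;
	int ret;

	/* 1. Connect to the candidate server; the old link keeps serving requests. */
	ret = dnbd3_connect(dev, addr, &sock);
	if (ret != 0)
		return ret;

	/* 2. Handshake: validate image name, revision and size against current state. */
	if (!dnbd3_execute_handshake(dev, sock, addr, &proto_version, false)) {
		sock_release(sock);
		return -EINVAL;
	}

	/* 3. Only now swap the verified socket in as the primary connection. */
	ret = dnbd3_set_primary_connection(dev, sock, addr, proto_version);
	if (ret != 0)
		sock_release(sock);
	return ret;
}

The old socket is torn down only inside dnbd3_set_primary_connection() (via replace_main_socket()), after the replacement has passed the handshake, so a failed switch leaves the existing connection untouched.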
diff --git a/src/kernel/net.c b/src/kernel/net.c
index 5919832..f5806de 100644
--- a/src/kernel/net.c
+++ b/src/kernel/net.c
@@ -22,7 +22,6 @@
#include <dnbd3/config/client.h>
#include "net.h"
#include "blk.h"
-#include "utils.h"
#include "dnbd3_main.h"
#include <dnbd3/shared/serialize.h>
@@ -30,7 +29,6 @@
#include <linux/time.h>
#include <linux/ktime.h>
#include <linux/tcp.h>
-#include <linux/sched/task.h>
#ifndef MIN
#define MIN(a, b) ((a) < (b) ? (a) : (b))
@@ -40,7 +38,7 @@
#define ktime_to_s(kt) ktime_divns(kt, NSEC_PER_SEC)
#endif
-#ifdef CONFIG_DEBUG_DRIVER
+#ifdef DEBUG
#define ASSERT(x) \
do { \
if (!(x)) { \
@@ -54,15 +52,6 @@
} while (0)
#endif
-#define init_msghdr(h) \
- do { \
- h.msg_name = NULL; \
- h.msg_namelen = 0; \
- h.msg_control = NULL; \
- h.msg_controllen = 0; \
- h.msg_flags = MSG_WAITALL | MSG_NOSIGNAL; \
- } while (0)
-
#define dnbd3_dev_dbg_host(dev, host, fmt, ...) \
dev_dbg(dnbd3_device_to_dev(dev), "(%pISpc): " fmt, (host), ##__VA_ARGS__)
#define dnbd3_dev_err_host(dev, host, fmt, ...) \
@@ -73,219 +62,267 @@
#define dnbd3_dev_err_host_cur(dev, fmt, ...) \
dnbd3_dev_err_host(dev, &(dev)->cur_server.host, fmt, ##__VA_ARGS__)
-static struct socket *dnbd3_connect(dnbd3_device_t *dev, struct sockaddr_storage *addr);
+static bool dnbd3_drain_socket(dnbd3_device_t *dev, struct socket *sock, int bytes);
+static int dnbd3_recv_bytes(struct socket *sock, void *buffer, size_t count);
+static int dnbd3_recv_reply(struct socket *sock, dnbd3_reply_t *reply_hdr);
+static bool dnbd3_send_request(struct socket *sock, u16 cmd, u64 handle, u64 offset, u32 size);
+
+static int dnbd3_set_primary_connection(dnbd3_device_t *dev, struct socket *sock,
+ struct sockaddr_storage *addr, u16 protocol_version);
+
+static int dnbd3_connect(dnbd3_device_t *dev, struct sockaddr_storage *addr,
+ struct socket **sock_out);
+
+static bool dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
+ struct sockaddr_storage *addr, uint16_t *remote_version, bool copy_image_info);
+
+static bool dnbd3_request_test_block(dnbd3_device_t *dev, struct sockaddr_storage *addr,
+ struct socket *sock);
+
+static bool dnbd3_send_empty_request(dnbd3_device_t *dev, u16 cmd);
+
+static void dnbd3_start_discover(dnbd3_device_t *dev, bool panic);
+
+static void dnbd3_discover(dnbd3_device_t *dev);
+
+static void dnbd3_internal_discover(dnbd3_device_t *dev);
-static int dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
- struct sockaddr_storage *addr, uint16_t *remote_version);
+static void set_socket_timeout(struct socket *sock, bool set_send, int timeout_ms);
-static int dnbd3_request_test_block(dnbd3_device_t *dev, struct sockaddr_storage *addr, struct socket *sock);
+// Use as write-only dump, don't care about race conditions etc.
+static u8 __garbage_mem[PAGE_SIZE];
-static void dnbd3_net_heartbeat(struct timer_list *arg)
+/**
+ * Delayed work triggering sending of keepalive packet.
+ */
+static void dnbd3_keepalive_workfn(struct work_struct *work)
{
- dnbd3_device_t *dev = (dnbd3_device_t *)container_of(arg, dnbd3_device_t, hb_timer);
-
- // Because different events need different intervals, the timer is called once a second.
- // Other intervals can be derived using dev->heartbeat_count.
-#define timeout_seconds(x) (dev->heartbeat_count % (x) == 0)
-
- if (!dev->panic) {
- if (timeout_seconds(TIMER_INTERVAL_KEEPALIVE_PACKET)) {
- struct request *req = kmalloc(sizeof(struct request), GFP_ATOMIC);
- // send keepalive
- if (req) {
- unsigned long irqflags;
-
- dnbd3_cmd_to_priv(req, CMD_KEEPALIVE);
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- list_add_tail(&req->queuelist, &dev->request_queue_send);
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- wake_up(&dev->process_queue_send);
- } else {
- dev_err(dnbd3_device_to_dev(dev), "couldn't create keepalive request\n");
- }
- }
- if ((dev->heartbeat_count > STARTUP_MODE_DURATION && timeout_seconds(TIMER_INTERVAL_PROBE_NORMAL)) ||
- (dev->heartbeat_count <= STARTUP_MODE_DURATION && timeout_seconds(TIMER_INTERVAL_PROBE_STARTUP))) {
- // Normal discovery
- dev->discover = 1;
- wake_up(&dev->process_queue_discover);
- }
- } else if (timeout_seconds(TIMER_INTERVAL_PROBE_PANIC)) {
- // Panic discovery
- dev->discover = 1;
- wake_up(&dev->process_queue_discover);
+ unsigned long irqflags;
+ dnbd3_device_t *dev = container_of(work, dnbd3_device_t, keepalive_work.work);
+
+ dnbd3_send_empty_request(dev, CMD_KEEPALIVE);
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
+ if (device_active(dev)) {
+ mod_delayed_work(system_freezable_power_efficient_wq,
+ &dev->keepalive_work, KEEPALIVE_INTERVAL * HZ);
}
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+}
+
+/**
+ * Delayed work triggering discovery (alt server check)
+ */
+static void dnbd3_discover_workfn(struct work_struct *work)
+{
+ dnbd3_device_t *dev = container_of(work, dnbd3_device_t, discover_work.work);
- dev->hb_timer.expires = jiffies + HZ;
+ dnbd3_discover(dev);
+}
- ++dev->heartbeat_count;
- add_timer(&dev->hb_timer);
+/**
+ * For manually triggering an immediate discovery
+ */
+static void dnbd3_start_discover(dnbd3_device_t *dev, bool panic)
+{
+ unsigned long irqflags;
-#undef timeout_seconds
+ if (!device_active(dev))
+ return;
+ if (panic && dnbd3_flag_get(dev->connection_lock)) {
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
+ if (!dev->panic) {
+ // Panic freshly turned on
+ dev->panic = true;
+ dev->discover_interval = TIMER_INTERVAL_PROBE_PANIC;
+ }
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+ dnbd3_flag_reset(dev->connection_lock);
+ }
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
+ mod_delayed_work(system_freezable_power_efficient_wq,
+ &dev->discover_work, 1);
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
}
-static int dnbd3_net_discover(void *data)
+/**
+ * Wrapper for the actual discover function below. Check run conditions
+ * and re-schedule the delayed task here.
+ */
+static void dnbd3_discover(dnbd3_device_t *dev)
+{
+ unsigned long irqflags;
+
+ if (!device_active(dev) || dnbd3_flag_taken(dev->connection_lock))
+ return; // device not active anymore, or just about to switch
+ if (!dnbd3_flag_get(dev->discover_running))
+ return; // Already busy
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
+ cancel_delayed_work(&dev->discover_work);
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+ dnbd3_internal_discover(dev);
+ dev->discover_count++;
+ // Re-queueing logic
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
+ if (device_active(dev)) {
+ mod_delayed_work(system_freezable_power_efficient_wq,
+ &dev->discover_work, dev->discover_interval * HZ);
+ if (dev->discover_interval < TIMER_INTERVAL_PROBE_MAX
+ && dev->discover_count > DISCOVER_STARTUP_PHASE_COUNT) {
+ dev->discover_interval += 2;
+ }
+ }
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+ dnbd3_flag_reset(dev->discover_running);
+}
+
+/**
+ * Discovery. Probe all (or some) known alt servers,
+ * and initiate connection switch if appropriate
+ */
+static void dnbd3_internal_discover(dnbd3_device_t *dev)
{
- dnbd3_device_t *dev = data;
struct socket *sock, *best_sock = NULL;
dnbd3_alt_server_t *alt;
struct sockaddr_storage host_compare, best_server;
uint16_t remote_version;
- ktime_t start = ktime_set(0, 0), end = ktime_set(0, 0);
+ ktime_t start, end;
unsigned long rtt = 0, best_rtt = 0;
- unsigned long irqflags;
- int i, j, isize, fails, rtt_threshold;
- int turn = 0;
- int ready = 0, do_change = 0;
- char check_order[NUMBER_SERVERS];
-
- struct request *last_request = (struct request *)123, *cur_request = (struct request *)456;
+ int i, j, k, isize, fails, rtt_threshold;
+ int do_change = 0;
+ u8 check_order[NUMBER_SERVERS];
+ const bool ready = dev->discover_count > DISCOVER_STARTUP_PHASE_COUNT;
+ const u32 turn = dev->discover_count % DISCOVER_HISTORY_SIZE;
+ // Shuffle alt_servers
for (i = 0; i < NUMBER_SERVERS; ++i)
check_order[i] = i;
- while (!kthread_should_stop()) {
- wait_event_interruptible(dev->process_queue_discover,
- kthread_should_stop() || dev->discover || dev->thread_discover == NULL);
+ for (i = 0; i < NUMBER_SERVERS; ++i) {
+ j = prandom_u32() % NUMBER_SERVERS;
+ if (j != i) {
+ int tmp = check_order[i];
- if (kthread_should_stop() || dev->imgname == NULL || dev->thread_discover == NULL)
- break;
+ check_order[i] = check_order[j];
+ check_order[j] = tmp;
+ }
+ }
- if (!dev->discover)
- continue;
- dev->discover = 0;
+ best_server.ss_family = 0;
+ best_rtt = RTT_UNREACHABLE;
- if (dev->reported_size < 4096)
- continue;
+ if (!ready || dev->panic)
+ isize = NUMBER_SERVERS;
+ else
+ isize = 3;
- best_server.ss_family = 0;
- best_rtt = 0xFFFFFFFul;
+ for (j = 0; j < NUMBER_SERVERS; ++j) {
+ if (!device_active(dev))
+ break;
+ i = check_order[j];
+ mutex_lock(&dev->alt_servers_lock);
+ host_compare = dev->alt_servers[i].host;
+ fails = dev->alt_servers[i].failures;
+ mutex_unlock(&dev->alt_servers_lock);
+ if (host_compare.ss_family == 0)
+ continue; // Empty slot
+ // Reduced probability for hosts that have been unreachable
+ if (!dev->panic && fails > 50 && (prandom_u32() % 4) != 0)
+ continue; // If not in panic mode, skip server if it failed too many times
+ if (isize-- <= 0 && !is_same_server(&dev->cur_server.host, &host_compare))
+ continue; // Only test isize servers plus current server
+
+ // Initialize socket and connect
+ sock = NULL;
+ if (dnbd3_connect(dev, &host_compare, &sock) != 0)
+ goto error;
- if (dev->heartbeat_count < STARTUP_MODE_DURATION || dev->panic)
- isize = NUMBER_SERVERS;
- else
- isize = 3;
+ remote_version = 0;
+ if (!dnbd3_execute_handshake(dev, sock, &host_compare, &remote_version, false))
+ goto error;
- if (NUMBER_SERVERS > isize) {
- for (i = 0; i < isize; ++i) {
- j = ((ktime_to_s(start) >> i) ^ (ktime_to_us(start) >> j)) % NUMBER_SERVERS;
- if (j != i) {
- int tmp = check_order[i];
- check_order[i] = check_order[j];
- check_order[j] = tmp;
- }
+ // panic mode, take first responding server
+ if (dev->panic) {
+ dnbd3_dev_dbg_host(dev, &host_compare, "panic mode, changing to new server\n");
+ if (!dnbd3_flag_get(dev->connection_lock)) {
+ dnbd3_dev_dbg_host(dev, &host_compare, "...raced, ignoring\n");
+ } else {
+ // Check global flag, a connect might have been in progress
+ if (best_sock != NULL)
+ sock_release(best_sock);
+ set_socket_timeout(sock, false, SOCKET_TIMEOUT_RECV * 1000 + 1000);
+ if (dnbd3_set_primary_connection(dev, sock, &host_compare, remote_version) != 0)
+ sock_release(sock);
+ dnbd3_flag_reset(dev->connection_lock);
+ return;
}
}
- for (j = 0; j < NUMBER_SERVERS; ++j) {
- i = check_order[j];
- mutex_lock(&dev->alt_servers_lock);
- host_compare = dev->alt_servers[i].host;
- fails = dev->alt_servers[i].failures;
- mutex_unlock(&dev->alt_servers_lock);
- if (host_compare.ss_family == 0)
- continue; // Empty slot
- if (!dev->panic && fails > 50
- && (ktime_to_us(start) & 7) != 0)
- continue; // If not in panic mode, skip server if it failed too many times
- if (isize-- <= 0 && !is_same_server(&dev->cur_server.host, &host_compare))
- continue; // Only test isize servers plus current server
-
- // Initialize socket and connect
- sock = dnbd3_connect(dev, &host_compare);
- if (sock == NULL)
- goto error;
-
- if (!dnbd3_execute_handshake(dev, sock, &host_compare, &remote_version))
- goto error;
-
-
- // panic mode, take first responding server
- if (dev->panic) {
- dnbd3_dev_dbg_host(dev, &host_compare, "panic mode, changing to new server\n");
- while (atomic_cmpxchg(&dev->connection_lock, 0, 1) != 0)
- schedule();
-
- if (dev->panic) {
- // Re-check, a connect might have been in progress
- dev->panic = 0;
- if (best_sock != NULL)
- sock_release(best_sock);
-
- dev->better_sock = sock; // Pass over socket to take a shortcut in *_connect();
- put_task_struct(dev->thread_discover);
- dev->thread_discover = NULL;
- dnbd3_net_disconnect(dev);
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- dev->cur_server.host = host_compare;
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- dnbd3_net_connect(dev);
- atomic_set(&dev->connection_lock, 0);
- return 0;
- }
- atomic_set(&dev->connection_lock, 0);
- }
-
- // start rtt measurement
- start = ktime_get_real();
-
- if (!dnbd3_request_test_block(dev, &host_compare, sock))
- goto error;
-
- end = ktime_get_real(); // end rtt measurement
+ // actual RTT measurement is just the first block request and its reply
+ start = ktime_get_real();
+ if (!dnbd3_request_test_block(dev, &host_compare, sock))
+ goto error;
+ end = ktime_get_real();
- mutex_lock(&dev->alt_servers_lock);
- if (is_same_server(&dev->alt_servers[i].host, &host_compare)) {
- dev->alt_servers[i].protocol_version = remote_version;
- dev->alt_servers[i].rtts[turn] = (unsigned long)ktime_us_delta(end, start);
-
- rtt = (dev->alt_servers[i].rtts[0] + dev->alt_servers[i].rtts[1]
- + dev->alt_servers[i].rtts[2] + dev->alt_servers[i].rtts[3])
- / 4;
- dev->alt_servers[i].failures = 0;
- if (dev->alt_servers[i].best_count > 1)
- dev->alt_servers[i].best_count -= 2;
+ mutex_lock(&dev->alt_servers_lock);
+ if (is_same_server(&dev->alt_servers[i].host, &host_compare)) {
+ dev->alt_servers[i].protocol_version = remote_version;
+ dev->alt_servers[i].rtts[turn] =
+ (unsigned long)ktime_us_delta(end, start);
+
+ rtt = 0;
+ for (k = 0; k < DISCOVER_HISTORY_SIZE; ++k) {
+ rtt += dev->alt_servers[i].rtts[k];
}
- mutex_unlock(&dev->alt_servers_lock);
+ rtt /= DISCOVER_HISTORY_SIZE;
+ dev->alt_servers[i].failures = 0;
+ if (dev->alt_servers[i].best_count > 1)
+ dev->alt_servers[i].best_count -= 2;
+ }
+ mutex_unlock(&dev->alt_servers_lock);
- if (best_rtt > rtt) {
- // This one is better, keep socket open in case we switch
- best_rtt = rtt;
- best_server = host_compare;
- if (best_sock != NULL)
- sock_release(best_sock);
- best_sock = sock;
- sock = NULL;
- } else {
- // Not better, discard connection
- sock_release(sock);
- sock = NULL;
- }
+ if (best_rtt > rtt) {
+ // This one is better, keep socket open in case we switch
+ best_rtt = rtt;
+ best_server = host_compare;
+ if (best_sock != NULL)
+ sock_release(best_sock);
+ best_sock = sock;
+ sock = NULL;
+ } else {
+ // Not better, discard connection
+ sock_release(sock);
+ sock = NULL;
+ }
- // update cur servers rtt
- if (is_same_server(&dev->cur_server.host, &host_compare))
- dev->cur_server.rtt = rtt;
+ // update cur servers rtt
+ if (is_same_server(&dev->cur_server.host, &host_compare))
+ dev->cur_server.rtt = rtt;
- continue;
+ continue;
error:
- if (sock != NULL) {
- sock_release(sock);
- sock = NULL;
- }
- mutex_lock(&dev->alt_servers_lock);
- if (is_same_server(&dev->alt_servers[i].host, &host_compare)) {
- ++dev->alt_servers[i].failures;
- dev->alt_servers[i].rtts[turn] = RTT_UNREACHABLE;
- if (dev->alt_servers[i].best_count > 2)
- dev->alt_servers[i].best_count -= 3;
- }
- mutex_unlock(&dev->alt_servers_lock);
- if (is_same_server(&dev->cur_server.host, &host_compare))
- dev->cur_server.rtt = RTT_UNREACHABLE;
- } // for loop over alt_servers
+ if (sock != NULL) {
+ sock_release(sock);
+ sock = NULL;
+ }
+ mutex_lock(&dev->alt_servers_lock);
+ if (is_same_server(&dev->alt_servers[i].host, &host_compare)) {
+ if (remote_version)
+ dev->alt_servers[i].protocol_version = remote_version;
+ ++dev->alt_servers[i].failures;
+ dev->alt_servers[i].rtts[turn] = RTT_UNREACHABLE;
+ if (dev->alt_servers[i].best_count > 2)
+ dev->alt_servers[i].best_count -= 3;
+ }
+ mutex_unlock(&dev->alt_servers_lock);
+ if (is_same_server(&dev->cur_server.host, &host_compare))
+ dev->cur_server.rtt = RTT_UNREACHABLE;
+ } // END - for loop over alt_servers
+ if (best_server.ss_family == 0) {
+ // No alt server could be reached
+ ASSERT(!best_sock);
if (dev->panic) {
if (dev->panic_count < 255)
dev->panic_count++;
@@ -293,295 +330,166 @@ error:
if (PROBE_COUNT_TIMEOUT > 0 && dev->panic_count == PROBE_COUNT_TIMEOUT + 1)
dnbd3_blk_fail_all_requests(dev);
}
+ return;
+ }
- if (best_server.ss_family == 0 || kthread_should_stop() || dev->thread_discover == NULL) {
- // No alt server could be reached at all or thread should stop
- if (best_sock != NULL) {
- // Should never happen actually
- sock_release(best_sock);
- best_sock = NULL;
- }
- continue;
- }
-
- // If best server was repeatedly measured best, lower the switching threshold more
- mutex_lock(&dev->alt_servers_lock);
- alt = get_existing_alt_from_addr(&best_server, dev);
- if (alt != NULL) {
- if (alt->best_count < 148)
- alt->best_count += 3;
- rtt_threshold = 1500 - (alt->best_count * 10);
- } else {
- rtt_threshold = 1500;
- }
- mutex_unlock(&dev->alt_servers_lock);
-
- do_change = ready && !is_same_server(&best_server, &dev->cur_server.host)
- && (ktime_to_us(start) & 3) != 0
- && RTT_THRESHOLD_FACTOR(dev->cur_server.rtt) > best_rtt + rtt_threshold;
-
- if (ready && !do_change && best_sock != NULL) {
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- if (!list_empty(&dev->request_queue_send)) {
- cur_request = list_entry(dev->request_queue_send.next, struct request, queuelist);
- do_change = (cur_request == last_request);
- if (do_change)
- dev_warn(dnbd3_device_to_dev(dev), "hung request, triggering change\n");
- } else {
- cur_request = (struct request *)123;
- }
- last_request = cur_request;
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- }
-
- // take server with lowest rtt
- // if a (dis)connect is already in progress, we do nothing, this is not panic mode
- if (do_change && atomic_cmpxchg(&dev->connection_lock, 0, 1) == 0) {
- dev_info(dnbd3_device_to_dev(dev), "server %pISpc is faster (%lluµs vs. %lluµs)\n",
- &best_server,
- (unsigned long long)best_rtt, (unsigned long long)dev->cur_server.rtt);
- dev->better_sock = best_sock; // Take shortcut by continuing to use open connection
- put_task_struct(dev->thread_discover);
- dev->thread_discover = NULL;
- dnbd3_net_disconnect(dev);
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- dev->cur_server.host = best_server;
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- dev->cur_server.rtt = best_rtt;
- dnbd3_net_connect(dev);
- atomic_set(&dev->connection_lock, 0);
- return 0;
- }
-
- // Clean up connection that was held open for quicker server switch
- if (best_sock != NULL) {
+ // If best server was repeatedly measured best, lower the switching threshold more
+ mutex_lock(&dev->alt_servers_lock);
+ alt = get_existing_alt_from_addr(&best_server, dev);
+ if (alt != NULL) {
+ if (alt->best_count < 178)
+ alt->best_count += 3;
+ rtt_threshold = 1800 - (alt->best_count * 10);
+ remote_version = alt->protocol_version;
+ } else {
+ rtt_threshold = 1800;
+ remote_version = 0;
+ }
+ mutex_unlock(&dev->alt_servers_lock);
+
+ do_change = ready && !is_same_server(&best_server, &dev->cur_server.host)
+ && RTT_THRESHOLD_FACTOR(dev->cur_server.rtt) > best_rtt + rtt_threshold;
+
+ // take server with lowest rtt
+ // if a (dis)connect is already in progress, we do nothing, this is not panic mode
+ if (do_change && device_active(dev) && dnbd3_flag_get(dev->connection_lock)) {
+ dev_info(dnbd3_device_to_dev(dev), "server %pISpc is faster (%lluµs vs. %lluµs)\n",
+ &best_server,
+ (unsigned long long)best_rtt, (unsigned long long)dev->cur_server.rtt);
+ set_socket_timeout(sock, false, // recv
+ MAX(best_rtt / 1000, SOCKET_TIMEOUT_RECV * 1000) + 500);
+ set_socket_timeout(sock, true, // send
+ MAX(best_rtt / 1000, SOCKET_TIMEOUT_SEND * 1000) + 500);
+ if (dnbd3_set_primary_connection(dev, best_sock, &best_server, remote_version) != 0)
sock_release(best_sock);
- best_sock = NULL;
- }
-
- // Increase rtt array index pointer, low probability that it doesn't advance
- if (!ready || (ktime_to_us(start) & 15) != 0)
- turn = (turn + 1) % 4;
- if (turn == 2) // Set ready when we only have 2 of 4 measurements for quicker load balancing
- ready = 1;
+ dnbd3_flag_reset(dev->connection_lock);
+ return;
}
- if (kthread_should_stop())
- dev_dbg(dnbd3_device_to_dev(dev), "kthread %s terminated normally\n", __func__);
- else
- dev_dbg(dnbd3_device_to_dev(dev), "kthread %s exited unexpectedly\n", __func__);
-
- return 0;
+ // Clean up connection that was held open for quicker server switch
+ if (best_sock != NULL)
+ sock_release(best_sock);
}
-static int dnbd3_net_send(void *data)
+/**
+ * Worker for sending pending requests. This will be triggered whenever
+ * we get a new request from the block layer. The worker will then
+ * work through all the requests in the send queue, request them from
+ * the server, and return again.
+ */
+static void dnbd3_send_workfn(struct work_struct *work)
{
- dnbd3_device_t *dev = data;
- struct request *blk_request, *tmp_request;
-
- dnbd3_request_t dnbd3_request;
- struct msghdr msg;
- struct kvec iov;
-
+ dnbd3_device_t *dev = container_of(work, dnbd3_device_t, send_work);
+ struct request *blk_request;
+ struct dnbd3_cmd *cmd;
unsigned long irqflags;
- int ret = 0;
-
- init_msghdr(msg);
-
- dnbd3_request.magic = dnbd3_packet_magic;
-
- set_user_nice(current, -20);
-
- // move already sent requests to request_queue_send again
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- if (!list_empty(&dev->request_queue_receive)) {
- dev_dbg(dnbd3_device_to_dev(dev), "request queue was not empty");
- list_for_each_entry_safe(blk_request, tmp_request, &dev->request_queue_receive, queuelist) {
- list_del_init(&blk_request->queuelist);
- list_add(&blk_request->queuelist, &dev->request_queue_send);
- }
- }
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
-
- while (!kthread_should_stop()) {
- wait_event_interruptible(dev->process_queue_send,
- kthread_should_stop() || !list_empty(&dev->request_queue_send));
-
- if (kthread_should_stop())
- break;
-
- // extract block request
- /* lock since we aquire a blk request from the request_queue_send */
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- if (list_empty(&dev->request_queue_send)) {
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- continue;
- }
- blk_request = list_entry(dev->request_queue_send.next, struct request, queuelist);
- // what to do?
- switch (dnbd3_req_op(blk_request)) {
- case DNBD3_DEV_READ:
- dnbd3_request.cmd = CMD_GET_BLOCK;
- dnbd3_request.offset = blk_rq_pos(blk_request) << 9; // *512
- dnbd3_request.size = blk_rq_bytes(blk_request); // bytes left to complete entire request
- // enqueue request to request_queue_receive
- list_del_init(&blk_request->queuelist);
- list_add_tail(&blk_request->queuelist, &dev->request_queue_receive);
- break;
- case DNBD3_REQ_OP_SPECIAL:
- dnbd3_request.cmd = dnbd3_priv_to_cmd(blk_request);
- dnbd3_request.size = 0;
- list_del_init(&blk_request->queuelist);
+ mutex_lock(&dev->send_mutex);
+ while (dev->sock && device_active(dev)) {
+ // extract next block request
+ spin_lock_irqsave(&dev->send_queue_lock, irqflags);
+ if (list_empty(&dev->send_queue)) {
+ spin_unlock_irqrestore(&dev->send_queue_lock, irqflags);
break;
-
- default:
- if (!atomic_read(&dev->connection_lock))
- dev_err(dnbd3_device_to_dev(dev), "unknown command (send %u %u)\n",
- (int)blk_request->cmd_flags, (int)dnbd3_req_op(blk_request));
- list_del_init(&blk_request->queuelist);
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- continue;
}
- // send net request
- dnbd3_request.handle = (uint64_t)(uintptr_t)blk_request; // Double cast to prevent warning on 32bit
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- fixup_request(dnbd3_request);
- iov.iov_base = &dnbd3_request;
- iov.iov_len = sizeof(dnbd3_request);
- if (kernel_sendmsg(dev->sock, &msg, &iov, 1, sizeof(dnbd3_request)) != sizeof(dnbd3_request)) {
- if (!atomic_read(&dev->connection_lock))
+ blk_request = list_entry(dev->send_queue.next, struct request, queuelist);
+ list_del_init(&blk_request->queuelist);
+ spin_unlock_irqrestore(&dev->send_queue_lock, irqflags);
+ // append to receive queue
+ spin_lock_irqsave(&dev->recv_queue_lock, irqflags);
+ list_add_tail(&blk_request->queuelist, &dev->recv_queue);
+ spin_unlock_irqrestore(&dev->recv_queue_lock, irqflags);
+
+ cmd = blk_mq_rq_to_pdu(blk_request);
+ if (!dnbd3_send_request(dev->sock, CMD_GET_BLOCK, cmd->handle,
+ blk_rq_pos(blk_request) << 9 /* sectors */, blk_rq_bytes(blk_request))) {
+ if (!dnbd3_flag_taken(dev->connection_lock)) {
dnbd3_dev_err_host_cur(dev, "connection to server lost (send)\n");
- ret = -ESHUTDOWN;
- goto cleanup;
+ dnbd3_start_discover(dev, true);
+ }
+ break;
}
}
-
- dev_dbg(dnbd3_device_to_dev(dev), "kthread %s terminated normally\n", __func__);
- return 0;
-
-cleanup:
- if (!atomic_read(&dev->connection_lock) && !kthread_should_stop()) {
- dev_dbg(dnbd3_device_to_dev(dev), "send thread: Triggering panic mode...\n");
- if (dev->sock)
- kernel_sock_shutdown(dev->sock, SHUT_RDWR);
- dev->panic = 1;
- dev->discover = 1;
- wake_up(&dev->process_queue_discover);
- }
-
- if (kthread_should_stop() || ret == 0 || atomic_read(&dev->connection_lock))
- dev_dbg(dnbd3_device_to_dev(dev), "kthread %s terminated normally (cleanup)\n", __func__);
- else
- dev_err(dnbd3_device_to_dev(dev), "kthread %s terminated abnormally (%d)\n", __func__, ret);
-
- return 0;
+ mutex_unlock(&dev->send_mutex);
}
-static int dnbd3_net_receive(void *data)
+/**
+ * The receive workfn stays active for as long as the connection to a server
+ * lasts, i.e. it only gets restarted when we switch to a new server.
+ */
+static void dnbd3_recv_workfn(struct work_struct *work)
{
- dnbd3_device_t *dev = data;
- struct request *blk_request, *tmp_request, *received_request;
-
- dnbd3_reply_t dnbd3_reply;
- struct msghdr msg;
- struct kvec iov;
+ dnbd3_device_t *dev = container_of(work, dnbd3_device_t, recv_work);
+ struct request *blk_request;
+ struct request *rq_iter;
+ struct dnbd3_cmd *cmd;
+ dnbd3_reply_t reply_hdr;
struct req_iterator iter;
struct bio_vec bvec_inst;
struct bio_vec *bvec = &bvec_inst;
+ struct msghdr msg = { .msg_flags = MSG_NOSIGNAL | MSG_WAITALL };
+ struct kvec iov;
void *kaddr;
unsigned long irqflags;
uint16_t rid;
- unsigned long recv_timeout = jiffies;
-
- int count, remaining, ret = 0;
-
- init_msghdr(msg);
- set_user_nice(current, -20);
+ int remaining;
+ int ret;
- while (!kthread_should_stop()) {
+ mutex_lock(&dev->recv_mutex);
+ while (dev->sock) {
// receive net reply
- iov.iov_base = &dnbd3_reply;
- iov.iov_len = sizeof(dnbd3_reply);
- ret = kernel_recvmsg(dev->sock, &msg, &iov, 1, sizeof(dnbd3_reply), msg.msg_flags);
-
- /* end thread after socket timeout or reception of data */
- if (kthread_should_stop())
- break;
-
- /* check return value of kernel_recvmsg() */
+ ret = dnbd3_recv_reply(dev->sock, &reply_hdr);
if (ret == 0) {
/* have not received any data, but remote peer is shutdown properly */
dnbd3_dev_dbg_host_cur(dev, "remote peer has performed an orderly shutdown\n");
- goto cleanup;
+ goto out_unlock;
} else if (ret < 0) {
if (ret == -EAGAIN) {
- if (jiffies < recv_timeout)
- recv_timeout = jiffies; // Handle overflow
- if ((jiffies - recv_timeout) / HZ > SOCKET_KEEPALIVE_TIMEOUT) {
- if (!atomic_read(&dev->connection_lock))
- dnbd3_dev_err_host_cur(dev, "receive timeout reached (%d of %d secs)\n",
- (int)((jiffies - recv_timeout) / HZ),
- (int)SOCKET_KEEPALIVE_TIMEOUT);
- ret = -ETIMEDOUT;
- goto cleanup;
- }
- continue;
+ if (!dnbd3_flag_taken(dev->connection_lock))
+ dnbd3_dev_err_host_cur(dev, "receive timeout reached\n");
} else {
- /* for all errors other than -EAGAIN, print message and abort thread */
- if (!atomic_read(&dev->connection_lock))
- dnbd3_dev_err_host_cur(dev, "connection to server lost (receive)\n");
- goto cleanup;
+ /* for all errors other than -EAGAIN, print errno */
+ if (!dnbd3_flag_taken(dev->connection_lock))
+ dnbd3_dev_err_host_cur(dev, "connection to server lost (receive, errno=%d)\n", ret);
}
+ goto out_unlock;
}
/* check if arrived data is valid */
- if (ret != sizeof(dnbd3_reply)) {
- if (!atomic_read(&dev->connection_lock))
- dnbd3_dev_err_host_cur(dev, "recv partial msg header (%d bytes)\n", ret);
- ret = -EINVAL;
- goto cleanup;
+ if (ret != sizeof(reply_hdr)) {
+ if (!dnbd3_flag_taken(dev->connection_lock))
+ dnbd3_dev_err_host_cur(dev, "recv partial msg header (%d/%d bytes)\n",
+ ret, (int)sizeof(reply_hdr));
+ goto out_unlock;
}
- fixup_reply(dnbd3_reply);
// check error
- if (dnbd3_reply.magic != dnbd3_packet_magic) {
+ if (reply_hdr.magic != dnbd3_packet_magic) {
dnbd3_dev_err_host_cur(dev, "wrong packet magic (receive)\n");
- ret = -EINVAL;
- goto cleanup;
- }
- if (dnbd3_reply.cmd == 0) {
- dnbd3_dev_err_host_cur(dev, "command was 0 (Receive)\n");
- ret = -EINVAL;
- goto cleanup;
+ goto out_unlock;
}
- // Update timeout
- recv_timeout = jiffies;
-
// what to do?
- switch (dnbd3_reply.cmd) {
+ switch (reply_hdr.cmd) {
case CMD_GET_BLOCK:
// search for replied request in queue
blk_request = NULL;
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- list_for_each_entry_safe(received_request, tmp_request, &dev->request_queue_receive,
- queuelist) {
- if ((uint64_t)(uintptr_t)received_request == dnbd3_reply.handle) {
- // Double cast to prevent warning on 32bit
- blk_request = received_request;
+ spin_lock_irqsave(&dev->recv_queue_lock, irqflags);
+ list_for_each_entry(rq_iter, &dev->recv_queue, queuelist) {
+ cmd = blk_mq_rq_to_pdu(rq_iter);
+ if (cmd->handle == reply_hdr.handle) {
+ blk_request = rq_iter;
list_del_init(&blk_request->queuelist);
break;
}
}
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+ spin_unlock_irqrestore(&dev->recv_queue_lock, irqflags);
if (blk_request == NULL) {
- dnbd3_dev_err_host_cur(dev, "received block data for unrequested handle (%llu: %llu)\n",
- (unsigned long long)dnbd3_reply.handle,
- (unsigned long long)dnbd3_reply.size);
- ret = -EINVAL;
- goto cleanup;
+ dnbd3_dev_err_host_cur(dev, "received block data for unrequested handle (%llx: len=%llu)\n",
+ reply_hdr.handle,
+ (u64)reply_hdr.size);
+ goto out_unlock;
}
// receive data and answer to block layer
#if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 14, 0)
@@ -599,45 +507,36 @@ static int dnbd3_net_receive(void *data)
/* have not received any data, but remote peer is shutdown properly */
dnbd3_dev_dbg_host_cur(
dev, "remote peer has performed an orderly shutdown\n");
- ret = 0;
} else if (ret < 0) {
- if (!atomic_read(&dev->connection_lock))
+ if (!dnbd3_flag_taken(dev->connection_lock))
dnbd3_dev_err_host_cur(dev,
"disconnect: receiving from net to block layer\n");
} else {
- if (!atomic_read(&dev->connection_lock))
+ if (!dnbd3_flag_taken(dev->connection_lock))
dnbd3_dev_err_host_cur(dev,
"receiving from net to block layer (%d bytes)\n", ret);
- ret = -EINVAL;
}
// Requeue request
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- list_add(&blk_request->queuelist, &dev->request_queue_send);
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- goto cleanup;
+ spin_lock_irqsave(&dev->send_queue_lock, irqflags);
+ list_add(&blk_request->queuelist, &dev->send_queue);
+ spin_unlock_irqrestore(&dev->send_queue_lock, irqflags);
+ goto out_unlock;
}
}
-#ifdef DNBD3_BLK_MQ
blk_mq_end_request(blk_request, BLK_STS_OK);
-#else
- blk_end_request_all(blk_request, 0);
-#endif
- continue;
+ break;
case CMD_GET_SERVERS:
- remaining = dnbd3_reply.size;
+ remaining = reply_hdr.size;
if (dev->use_server_provided_alts) {
dnbd3_server_entry_t new_server;
while (remaining >= sizeof(dnbd3_server_entry_t)) {
- iov.iov_base = &new_server;
- iov.iov_len = sizeof(dnbd3_server_entry_t);
- if (kernel_recvmsg(dev->sock, &msg, &iov, 1, iov.iov_len,
- msg.msg_flags) != sizeof(dnbd3_server_entry_t)) {
- if (!atomic_read(&dev->connection_lock))
+ if (dnbd3_recv_bytes(dev->sock, &new_server, sizeof(new_server))
+ != sizeof(new_server)) {
+ if (!dnbd3_flag_taken(dev->connection_lock))
dnbd3_dev_err_host_cur(dev, "recv CMD_GET_SERVERS payload\n");
- ret = -EINVAL;
- goto cleanup;
+ goto out_unlock;
}
// TODO: Log
if (new_server.failures == 0) { // ADD
@@ -645,36 +544,20 @@ static int dnbd3_net_receive(void *data)
} else { // REM
dnbd3_rem_server(dev, &new_server.host);
}
- remaining -= sizeof(dnbd3_server_entry_t);
- }
- }
- // Drain any payload still on the wire
- while (remaining > 0) {
- count = MIN(sizeof(dnbd3_reply),
- remaining); // Abuse the reply struct as the receive buffer
- iov.iov_base = &dnbd3_reply;
- iov.iov_len = count;
- ret = kernel_recvmsg(dev->sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags);
- if (ret <= 0) {
- if (!atomic_read(&dev->connection_lock))
- dnbd3_dev_err_host_cur(
- dev, "recv additional payload from CMD_GET_SERVERS\n");
- ret = -EINVAL;
- goto cleanup;
+ remaining -= sizeof(new_server);
}
- remaining -= ret;
}
- continue;
+ if (!dnbd3_drain_socket(dev, dev->sock, remaining))
+ goto out_unlock;
+ break;
case CMD_LATEST_RID:
- if (dnbd3_reply.size != 2) {
- dev_err(dnbd3_device_to_dev(dev), "CMD_LATEST_RID.size != 2\n");
+ if (reply_hdr.size < 2) {
+ dev_err(dnbd3_device_to_dev(dev), "CMD_LATEST_RID.size < 2\n");
continue;
}
- iov.iov_base = &rid;
- iov.iov_len = sizeof(rid);
- if (kernel_recvmsg(dev->sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags) <= 0) {
- if (!atomic_read(&dev->connection_lock))
+ if (dnbd3_recv_bytes(dev->sock, &rid, 2) != 2) {
+ if (!dnbd3_flag_taken(dev->connection_lock))
dev_err(dnbd3_device_to_dev(dev), "could not receive CMD_LATEST_RID payload\n");
} else {
rid = net_order_16(rid);
@@ -682,70 +565,52 @@ static int dnbd3_net_receive(void *data)
dev->imgname, (int)rid, (int)dev->rid);
dev->update_available = (rid > dev->rid ? 1 : 0);
}
+ if (reply_hdr.size > 2)
+ dnbd3_drain_socket(dev, dev->sock, reply_hdr.size - 2);
continue;
case CMD_KEEPALIVE:
- if (dnbd3_reply.size != 0)
- dev_err(dnbd3_device_to_dev(dev), "keep alive packet with payload\n");
+ if (reply_hdr.size != 0) {
+ dev_dbg(dnbd3_device_to_dev(dev), "keep alive packet with payload\n");
+ dnbd3_drain_socket(dev, dev->sock, reply_hdr.size);
+ }
continue;
default:
- dev_err(dnbd3_device_to_dev(dev), "unknown command (receive)\n");
- continue;
+ dev_err(dnbd3_device_to_dev(dev), "unknown command: %d (receive), aborting connection\n", (int)reply_hdr.cmd);
+ goto out_unlock;
}
}
-
- dev_dbg(dnbd3_device_to_dev(dev), "kthread thread_receive terminated normally\n");
- return 0;
-
-cleanup:
- if (!atomic_read(&dev->connection_lock) && !kthread_should_stop()) {
- dev_dbg(dnbd3_device_to_dev(dev), "recv thread: Triggering panic mode...\n");
- if (dev->sock)
- kernel_sock_shutdown(dev->sock, SHUT_RDWR);
- dev->panic = 1;
- dev->discover = 1;
- wake_up(&dev->process_queue_discover);
- }
-
- if (kthread_should_stop() || ret == 0 || atomic_read(&dev->connection_lock))
- dev_dbg(dnbd3_device_to_dev(dev), "kthread %s terminated normally (cleanup)\n", __func__);
- else
- dev_err(dnbd3_device_to_dev(dev), "kthread %s terminated abnormally (%d)\n", __func__, ret);
-
- return 0;
+out_unlock:
+ // This will check if we actually still need a new connection
+ dnbd3_start_discover(dev, true);
+ mutex_unlock(&dev->recv_mutex);
}
-static void set_socket_timeouts(struct socket *sock, int timeout_ms)
+/**
+ * Set send or receive timeout of given socket
+ */
+static void set_socket_timeout(struct socket *sock, bool set_send, int timeout_ms)
{
#if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 1, 0)
+ int opt = set_send ? SO_SNDTIMEO_NEW : SO_RCVTIMEO_NEW;
struct __kernel_sock_timeval timeout;
#else
+ int opt = set_send ? SO_SNDTIMEO : SO_RCVTIMEO;
struct timeval timeout;
#endif
#if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 9, 0)
- sockptr_t timeout_ptr;
-
- timeout_ptr = KERNEL_SOCKPTR(&timeout);
+ sockptr_t timeout_ptr = KERNEL_SOCKPTR(&timeout);
#else
- char *timeout_ptr;
-
- timeout_ptr = (char *)&timeout;
+ char *timeout_ptr = (char *)&timeout;
#endif
timeout.tv_sec = timeout_ms / 1000;
timeout.tv_usec = (timeout_ms % 1000) * 1000;
-
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 1, 0)
- sock_setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO_NEW, timeout_ptr, sizeof(timeout));
- sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO_NEW, timeout_ptr, sizeof(timeout));
-#else
- sock_setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, timeout_ptr, sizeof(timeout));
- sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, timeout_ptr, sizeof(timeout));
-#endif
+ sock_setsockopt(sock, SOL_SOCKET, opt, timeout_ptr, sizeof(timeout));
}
-static struct socket *dnbd3_connect(dnbd3_device_t *dev, struct sockaddr_storage *addr)
+static int dnbd3_connect(dnbd3_device_t *dev, struct sockaddr_storage *addr, struct socket **sock_out)
{
ktime_t start;
int ret, connect_time_ms;
@@ -763,7 +628,7 @@ static struct socket *dnbd3_connect(dnbd3_device_t *dev, struct sockaddr_storage
#endif
if (ret < 0) {
dev_err(dnbd3_device_to_dev(dev), "couldn't create socket: %d\n", ret);
- return NULL;
+ return ret;
}
/* Only one retry, TCP no delay */
@@ -790,36 +655,40 @@ static struct socket *dnbd3_connect(dnbd3_device_t *dev, struct sockaddr_storage
connect_time_ms = dev->cur_server.rtt * 2 / 1000;
}
/* but obey a minimal configurable value, and maximum sanity check */
- if (connect_time_ms < SOCKET_TIMEOUT_CLIENT_DATA * 1000)
- connect_time_ms = SOCKET_TIMEOUT_CLIENT_DATA * 1000;
+ if (connect_time_ms < SOCKET_TIMEOUT_SEND * 1000)
+ connect_time_ms = SOCKET_TIMEOUT_SEND * 1000;
else if (connect_time_ms > 60000)
connect_time_ms = 60000;
- set_socket_timeouts(sock, connect_time_ms);
+ set_socket_timeout(sock, false, connect_time_ms); // recv
+ set_socket_timeout(sock, true, connect_time_ms); // send
start = ktime_get_real();
while (--retries > 0) {
ret = kernel_connect(sock, (struct sockaddr *)addr, addrlen, 0);
connect_time_ms = (int)ktime_ms_delta(ktime_get_real(), start);
- if (connect_time_ms > 2 * SOCKET_TIMEOUT_CLIENT_DATA * 1000) {
+ if (connect_time_ms > 2 * SOCKET_TIMEOUT_SEND * 1000) {
/* Either I'm losing my mind or there was a specific build of kernel
* 5.x where SO_RCVTIMEO didn't affect the connect call above, so
* this function would hang for over a minute for unreachable hosts.
* Leave in this debug check for twice the configured timeout
*/
- dev_dbg(dnbd3_device_to_dev(dev), "%pISpc connect call took %dms\n",
- addr, connect_time_ms);
+ dnbd3_dev_dbg_host(dev, addr, "connect: call took %dms\n",
+ connect_time_ms);
}
if (ret != 0) {
if (ret == -EINTR)
- continue;
- dev_dbg(dnbd3_device_to_dev(dev), "%pISpc connect failed (%d, blocked %dms)\n",
- addr, ret, connect_time_ms);
+ dnbd3_dev_dbg_host(dev, addr, "connect: interrupted system call (blocked %dms)\n",
+ connect_time_ms);
+ else
+ dnbd3_dev_dbg_host(dev, addr, "connect: failed (%d, blocked %dms)\n",
+ ret, connect_time_ms);
goto error;
}
- return sock;
+ *sock_out = sock;
+ return 0;
}
error:
sock_release(sock);
- return NULL;
+ return ret < 0 ? ret : -EIO;
}
#define dnbd3_err_dbg_host(...) do { \
@@ -837,37 +706,39 @@ error:
* server, so we validate the filesize, rid, name against what we expect.
* The server's protocol version is returned in 'remote_version'
*/
-static int dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
- struct sockaddr_storage *addr, uint16_t *remote_version)
+static bool dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
+ struct sockaddr_storage *addr, uint16_t *remote_version, bool copy_data)
{
+ unsigned long irqflags;
const char *name;
uint64_t filesize;
int mlen;
- uint16_t rid, initial_connect;
- struct msghdr msg;
+ uint16_t rid;
+ struct msghdr msg = { .msg_flags = MSG_NOSIGNAL | MSG_WAITALL };
struct kvec iov[2];
serialized_buffer_t *payload;
- dnbd3_reply_t dnbd3_reply;
- dnbd3_request_t dnbd3_request = { .magic = dnbd3_packet_magic };
+ dnbd3_reply_t reply_hdr;
+ dnbd3_request_t request_hdr = { .magic = dnbd3_packet_magic };
payload = kmalloc(sizeof(*payload), GFP_KERNEL);
if (payload == NULL)
goto error;
- initial_connect = (dev->reported_size == 0);
- init_msghdr(msg);
+ if (copy_data && device_active(dev)) {
+ dev_warn(dnbd3_device_to_dev(dev), "Called handshake function with copy_data enabled when reported_size is not zero\n");
+ }
// Request filesize
- dnbd3_request.cmd = CMD_SELECT_IMAGE;
- iov[0].iov_base = &dnbd3_request;
- iov[0].iov_len = sizeof(dnbd3_request);
+ request_hdr.cmd = CMD_SELECT_IMAGE;
+ iov[0].iov_base = &request_hdr;
+ iov[0].iov_len = sizeof(request_hdr);
serializer_reset_write(payload);
serializer_put_uint16(payload, PROTOCOL_VERSION); // DNBD3 protocol version
serializer_put_string(payload, dev->imgname); // image name
serializer_put_uint16(payload, dev->rid); // revision id
serializer_put_uint8(payload, 0); // are we a server? (no!)
iov[1].iov_base = payload;
- dnbd3_request.size = iov[1].iov_len = serializer_get_written_length(payload);
- fixup_request(dnbd3_request);
+ request_hdr.size = iov[1].iov_len = serializer_get_written_length(payload);
+ fixup_request(request_hdr);
mlen = iov[0].iov_len + iov[1].iov_len;
if (kernel_sendmsg(sock, &msg, iov, 2, mlen) != mlen) {
dnbd3_err_dbg_host(dev, addr, "requesting image size failed\n");
@@ -875,28 +746,28 @@ static int dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
}
// receive net reply
- iov[0].iov_base = &dnbd3_reply;
- iov[0].iov_len = sizeof(dnbd3_reply);
- if (kernel_recvmsg(sock, &msg, iov, 1, sizeof(dnbd3_reply), msg.msg_flags) != sizeof(dnbd3_reply)) {
+ if (dnbd3_recv_reply(sock, &reply_hdr) != sizeof(reply_hdr)) {
dnbd3_err_dbg_host(dev, addr, "receiving image size packet (header) failed\n");
goto error;
}
- fixup_reply(dnbd3_reply);
- if (dnbd3_reply.magic != dnbd3_packet_magic || dnbd3_reply.cmd != CMD_SELECT_IMAGE || dnbd3_reply.size < 4) {
+ if (reply_hdr.magic != dnbd3_packet_magic
+ || reply_hdr.cmd != CMD_SELECT_IMAGE || reply_hdr.size < 4
+ || reply_hdr.size > sizeof(*payload)) {
dnbd3_err_dbg_host(dev, addr,
- "corrupted CMD_SELECT_IMAGE reply\n");
+ "corrupt CMD_SELECT_IMAGE reply\n");
goto error;
}
// receive data
iov[0].iov_base = payload;
- iov[0].iov_len = dnbd3_reply.size;
- if (kernel_recvmsg(sock, &msg, iov, 1, dnbd3_reply.size, msg.msg_flags) != dnbd3_reply.size) {
+ iov[0].iov_len = reply_hdr.size;
+ if (kernel_recvmsg(sock, &msg, iov, 1, reply_hdr.size, msg.msg_flags)
+ != reply_hdr.size) {
dnbd3_err_dbg_host(dev, addr,
"receiving payload of CMD_SELECT_IMAGE reply failed\n");
goto error;
}
- serializer_reset_read(payload, dnbd3_reply.size);
+ serializer_reset_read(payload, reply_hdr.size);
*remote_version = serializer_get_uint16(payload);
name = serializer_get_string(payload);
@@ -910,7 +781,6 @@ static int dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
(int)MIN_SUPPORTED_SERVER);
goto error;
}
-
if (name == NULL) {
dnbd3_err_dbg_host(dev, addr, "server did not supply an image name\n");
goto error;
@@ -920,20 +790,16 @@ static int dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
goto error;
}
- /* only check image name if this isn't the initial connect */
- if (initial_connect && dev->rid != 0 && strcmp(name, dev->imgname) != 0) {
- dnbd3_err_dbg_host(dev, addr, "server offers image '%s', requested '%s'\n", name, dev->imgname);
- goto error;
- }
-
- if (initial_connect) {
+ if (copy_data) {
if (filesize < DNBD3_BLOCK_SIZE) {
dnbd3_err_dbg_host(dev, addr, "reported size by server is < 4096\n");
goto error;
}
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
if (strlen(dev->imgname) < strlen(name)) {
dev->imgname = krealloc(dev->imgname, strlen(name) + 1, GFP_KERNEL);
if (dev->imgname == NULL) {
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
dnbd3_err_dbg_host(dev, addr, "reallocating buffer for new image name failed\n");
goto error;
}
@@ -942,9 +808,10 @@ static int dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
dev->rid = rid;
// store image information
dev->reported_size = filesize;
+ dev->update_available = 0;
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
set_capacity(dev->disk, dev->reported_size >> 9); /* 512 Byte blocks */
dnbd3_dev_dbg_host(dev, addr, "image size: %llu\n", dev->reported_size);
- dev->update_available = 0;
} else {
/* switching connection, sanity checks */
if (rid != dev->rid) {
@@ -954,6 +821,11 @@ static int dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
goto error;
}
+ if (strcmp(name, dev->imgname) != 0) {
+ dnbd3_err_dbg_host(dev, addr, "server offers image '%s', requested '%s'\n", name, dev->imgname);
+ goto error;
+ }
+
if (filesize != dev->reported_size) {
dnbd3_err_dbg_host(dev, addr,
"reported image size of %llu does not match expected value %llu\n",
@@ -962,251 +834,287 @@ static int dnbd3_execute_handshake(dnbd3_device_t *dev, struct socket *sock,
}
}
kfree(payload);
- return 1;
+ return true;
error:
kfree(payload);
- return 0;
+ return false;
+}
+
+static bool dnbd3_send_request(struct socket *sock, u16 cmd, u64 handle, u64 offset, u32 size)
+{
+ struct msghdr msg = { .msg_flags = MSG_NOSIGNAL };
+ dnbd3_request_t request_hdr = {
+ .magic = dnbd3_packet_magic,
+ .cmd = cmd,
+ .size = size,
+ .offset = offset,
+ .handle = handle,
+ };
+ struct kvec iov = { .iov_base = &request_hdr, .iov_len = sizeof(request_hdr) };
+
+ fixup_request(request_hdr);
+ return kernel_sendmsg(sock, &msg, &iov, 1, sizeof(request_hdr)) == sizeof(request_hdr);
+}
+
+/**
+ * Send a request with given cmd type and empty payload.
+ */
+static bool dnbd3_send_empty_request(dnbd3_device_t *dev, u16 cmd)
+{
+ int ret;
+
+ mutex_lock(&dev->send_mutex);
+ ret = dev->sock
+ && dnbd3_send_request(dev->sock, cmd, 0, 0, 0);
+ mutex_unlock(&dev->send_mutex);
+ return ret;
+}
+
+static int dnbd3_recv_bytes(struct socket *sock, void *buffer, size_t count)
+{
+ struct msghdr msg = { .msg_flags = MSG_NOSIGNAL | MSG_WAITALL };
+ struct kvec iov = { .iov_base = buffer, .iov_len = count };
+
+ return kernel_recvmsg(sock, &msg, &iov, 1, count, msg.msg_flags);
+}
+
+static int dnbd3_recv_reply(struct socket *sock, dnbd3_reply_t *reply_hdr)
+{
+ int ret = dnbd3_recv_bytes(sock, reply_hdr, sizeof(*reply_hdr));
+
+ fixup_reply(*reply_hdr);
+ return ret;
}
-static int dnbd3_request_test_block(dnbd3_device_t *dev, struct sockaddr_storage *addr, struct socket *sock)
+static bool dnbd3_drain_socket(dnbd3_device_t *dev, struct socket *sock, int bytes)
{
- dnbd3_request_t dnbd3_request = { .magic = dnbd3_packet_magic };
- dnbd3_reply_t dnbd3_reply;
+ int ret;
struct kvec iov;
- struct msghdr msg;
- char *buf = NULL;
- char smallbuf[256];
- int remaining, buffer_size, ret, func_return;
+ struct msghdr msg = { .msg_flags = MSG_NOSIGNAL };
+
+ while (bytes > 0) {
+ iov.iov_base = __garbage_mem;
+ iov.iov_len = sizeof(__garbage_mem);
+ ret = kernel_recvmsg(sock, &msg, &iov, 1, MIN(bytes, iov.iov_len), msg.msg_flags);
+ if (ret <= 0) {
+ dnbd3_dev_err_host_cur(dev, "draining payload failed (ret=%d)\n", ret);
+ return false;
+ }
+ bytes -= ret;
+ }
+ return true;
+}
- init_msghdr(msg);
+static bool dnbd3_request_test_block(dnbd3_device_t *dev, struct sockaddr_storage *addr, struct socket *sock)
+{
+ dnbd3_reply_t reply_hdr;
- func_return = 0;
// Request block
- dnbd3_request.cmd = CMD_GET_BLOCK;
- // Do *NOT* pick a random block as it has proven to cause severe
- // cache thrashing on the server
- dnbd3_request.offset = 0;
- dnbd3_request.size = RTT_BLOCK_SIZE;
- fixup_request(dnbd3_request);
- iov.iov_base = &dnbd3_request;
- iov.iov_len = sizeof(dnbd3_request);
-
- if (kernel_sendmsg(sock, &msg, &iov, 1, sizeof(dnbd3_request)) <= 0) {
+ if (!dnbd3_send_request(sock, CMD_GET_BLOCK, 0, 0, RTT_BLOCK_SIZE)) {
dnbd3_err_dbg_host(dev, addr, "requesting test block failed\n");
- goto error;
+ return false;
}
// receive net reply
- iov.iov_base = &dnbd3_reply;
- iov.iov_len = sizeof(dnbd3_reply);
- if (kernel_recvmsg(sock, &msg, &iov, 1, sizeof(dnbd3_reply), msg.msg_flags)
- != sizeof(dnbd3_reply)) {
+ if (dnbd3_recv_reply(sock, &reply_hdr) != sizeof(reply_hdr)) {
dnbd3_err_dbg_host(dev, addr, "receiving test block header packet failed\n");
- goto error;
+ return false;
}
- fixup_reply(dnbd3_reply);
- if (dnbd3_reply.magic != dnbd3_packet_magic || dnbd3_reply.cmd != CMD_GET_BLOCK
- || dnbd3_reply.size != RTT_BLOCK_SIZE) {
+ if (reply_hdr.magic != dnbd3_packet_magic || reply_hdr.cmd != CMD_GET_BLOCK
+ || reply_hdr.size != RTT_BLOCK_SIZE || reply_hdr.handle != 0) {
dnbd3_err_dbg_host(dev, addr,
- "unexpected reply to block request: cmd=%d, size=%d (discover)\n",
- (int)dnbd3_reply.cmd, (int)dnbd3_reply.size);
- goto error;
+ "unexpected reply to block request: cmd=%d, size=%d, handle=%llu (discover)\n",
+ (int)reply_hdr.cmd, (int)reply_hdr.size, reply_hdr.handle);
+ return false;
}
// receive data
- buf = kmalloc(DNBD3_BLOCK_SIZE, GFP_NOWAIT);
- if (buf == NULL) {
- /* fallback to stack if we're really memory constrained */
- buf = smallbuf;
- buffer_size = sizeof(smallbuf);
- } else {
- buffer_size = DNBD3_BLOCK_SIZE;
- }
- remaining = RTT_BLOCK_SIZE;
- /* TODO in either case we could build a large iovec that points to the same buffer over and over again */
- while (remaining > 0) {
- iov.iov_base = buf;
- iov.iov_len = buffer_size;
- ret = kernel_recvmsg(sock, &msg, &iov, 1, MIN(remaining, buffer_size), msg.msg_flags);
- if (ret <= 0) {
- dnbd3_err_dbg_host(dev, addr, "receiving test block payload failed (ret=%d)\n", ret);
- goto error;
- }
- remaining -= ret;
- }
- func_return = 1;
- // Fallthrough!
-error:
- if (buf != smallbuf)
- kfree(buf);
- return func_return;
+ return dnbd3_drain_socket(dev, sock, RTT_BLOCK_SIZE);
}
#undef dnbd3_err_dbg_host
-static int spawn_worker_thread(dnbd3_device_t *dev, struct task_struct **task, const char *name,
- int (*threadfn)(void *data))
+static void replace_main_socket(dnbd3_device_t *dev, struct socket *sock, struct sockaddr_storage *addr, u16 protocol_version)
{
- ASSERT(*task == NULL);
- *task = kthread_create(threadfn, dev, "%s-%s", dev->disk->disk_name, name);
- if (!IS_ERR(*task)) {
- get_task_struct(*task);
- wake_up_process(*task);
- return 1;
+ unsigned long irqflags;
+
+ mutex_lock(&dev->send_mutex);
+ // First, shutdown connection, so receive worker will leave its mainloop
+ if (dev->sock)
+ kernel_sock_shutdown(dev->sock, SHUT_RDWR);
+ mutex_lock(&dev->recv_mutex);
+ // Receive worker is done, get rid of socket and replace
+ if (dev->sock)
+ sock_release(dev->sock);
+ dev->sock = sock;
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
+ if (addr == NULL) {
+ memset(&dev->cur_server, 0, sizeof(dev->cur_server));
+ } else {
+ dev->cur_server.host = *addr;
+ dev->cur_server.rtt = 0;
+ dev->cur_server.protocol_version = protocol_version;
}
- dev_err(dnbd3_device_to_dev(dev), "failed to create %s thread (%ld)\n",
- name, PTR_ERR(*task));
- /* reset possible non-NULL error value */
- *task = NULL;
- return 0;
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+ mutex_unlock(&dev->recv_mutex);
+ mutex_unlock(&dev->send_mutex);
}
-static void stop_worker_thread(dnbd3_device_t *dev, struct task_struct **task, const char *name, int quiet)
+static void dnbd3_release_resources(dnbd3_device_t *dev)
{
- int ret;
-
- if (*task == NULL)
- return;
- if (!quiet)
- dnbd3_dev_dbg_host_cur(dev, "stop %s thread\n", name);
- ret = kthread_stop(*task);
- put_task_struct(*task);
- if (ret == -EINTR) {
- /* thread has never been scheduled and run */
- if (!quiet)
- dev_dbg(dnbd3_device_to_dev(dev), "%s thread has never run\n", name);
- } else {
- /* thread has run, check if it has terminated successfully */
- if (ret < 0 && !quiet)
- dev_err(dnbd3_device_to_dev(dev), "%s thread was not terminated correctly\n", name);
- }
- *task = NULL;
+ if (dev->send_wq)
+ destroy_workqueue(dev->send_wq);
+ dev->send_wq = NULL;
+ if (dev->recv_wq)
+ destroy_workqueue(dev->recv_wq);
+ dev->recv_wq = NULL;
+ mutex_destroy(&dev->send_mutex);
+ mutex_destroy(&dev->recv_mutex);
}
-int dnbd3_net_connect(dnbd3_device_t *dev)
+/**
+ * Establish new connection on a dnbd3 device.
+ * Return 0 on success, errno otherwise
+ */
+int dnbd3_new_connection(dnbd3_device_t *dev, struct sockaddr_storage *addr, bool init)
{
- struct request *req_alt_servers = NULL;
unsigned long irqflags;
+ struct socket *sock = NULL;
+ uint16_t proto_version;
+ int ret;
- ASSERT(atomic_read(&dev->connection_lock));
-
- if (dev->use_server_provided_alts) {
- req_alt_servers = kmalloc(sizeof(*req_alt_servers), GFP_KERNEL);
- if (req_alt_servers == NULL)
- dnbd3_dev_err_host_cur(dev, "Cannot allocate memory to request list of alt servers\n");
+ ASSERT(dnbd3_flag_taken(dev->connection_lock));
+ if (init && device_active(dev)) {
+ dnbd3_dev_err_host_cur(dev, "device already configured/connected\n");
+ return -EBUSY;
+ }
+ if (!init && !device_active(dev)) {
+ dev_warn(dnbd3_device_to_dev(dev), "connection switch called on unconfigured device\n");
+ return -ENOTCONN;
}
- if (dev->cur_server.host.ss_family == 0 || dev->imgname == NULL) {
- dnbd3_dev_err_host_cur(dev, "connect: host or image name not set\n");
+ dnbd3_dev_dbg_host(dev, addr, "connecting...\n");
+ ret = dnbd3_connect(dev, addr, &sock);
+ if (ret != 0 || sock == NULL)
goto error;
- }
- if (dev->sock) {
- dnbd3_dev_err_host_cur(dev, "socket already connected\n");
+ /* execute the "select image" handshake */
+ // if init is true, reported_size will be set
+ if (!dnbd3_execute_handshake(dev, sock, addr, &proto_version, init)) {
+ ret = -EINVAL;
goto error;
}
- ASSERT(dev->thread_send == NULL);
- ASSERT(dev->thread_receive == NULL);
- ASSERT(dev->thread_discover == NULL);
-
- if (dev->better_sock != NULL) {
- // Switching server, connection is already established and size request was executed
- dnbd3_dev_dbg_host_cur(dev, "on-the-fly server change\n");
- dev->sock = dev->better_sock;
- dev->better_sock = NULL;
- } else {
- // no established connection yet from discovery thread, start new one
- uint16_t proto_version;
-
- dnbd3_dev_dbg_host_cur(dev, "connecting\n");
- dev->sock = dnbd3_connect(dev, &dev->cur_server.host);
- if (dev->sock == NULL) {
- dnbd3_dev_err_host_cur(dev, "%s: Failed\n", __func__);
- goto error;
+ if (init) {
+ // We're setting up the device for use - allocate resources
+ // Do not goto error before this
+ ASSERT(!dev->send_wq);
+ ASSERT(!dev->recv_wq);
+ mutex_init(&dev->send_mutex);
+ mutex_init(&dev->recv_mutex);
+ // a dedicated queue for sending that allows only one active task
+ dev->send_wq = alloc_workqueue("dnbd%d-send",
+ WQ_UNBOUND | WQ_FREEZABLE | WQ_MEM_RECLAIM | WQ_HIGHPRI,
+ 1, dev->index);
+ dev->recv_wq = alloc_workqueue("dnbd%d-recv",
+ WQ_UNBOUND | WQ_FREEZABLE | WQ_MEM_RECLAIM | WQ_HIGHPRI | WQ_CPU_INTENSIVE,
+ 1, dev->index);
+ if (!dev->send_wq || !dev->recv_wq) {
+ ret = -ENOMEM;
+ goto error_dealloc;
}
- /* execute the "select image" handshake */
- if (!dnbd3_execute_handshake(dev, dev->sock, &dev->cur_server.host, &proto_version))
- goto error;
-
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- dev->cur_server.protocol_version = proto_version;
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
}
- /* create required threads */
- if (!spawn_worker_thread(dev, &dev->thread_send, "send", dnbd3_net_send))
- goto error;
- if (!spawn_worker_thread(dev, &dev->thread_receive, "receive", dnbd3_net_receive))
- goto error;
- if (!spawn_worker_thread(dev, &dev->thread_discover, "discover", dnbd3_net_discover))
- goto error;
+ set_socket_timeout(sock, false, SOCKET_TIMEOUT_RECV * 1000); // recv
+ dnbd3_set_primary_connection(dev, sock, addr, proto_version);
+ sock = NULL; // In case we ever goto error* after this point
- dnbd3_dev_dbg_host_cur(dev, "connection established\n");
- dev->panic = 0;
- dev->panic_count = 0;
+ spin_lock_irqsave(&dev->blk_lock, irqflags);
+ if (init) {
+ dev->discover_count = 0;
+ dev->discover_interval = TIMER_INTERVAL_PROBE_STARTUP;
+ // discovery and keepalive are not critical, use the power efficient queue
+ queue_delayed_work(system_power_efficient_wq, &dev->discover_work,
+ dev->discover_interval * HZ);
+ queue_delayed_work(system_power_efficient_wq, &dev->keepalive_work,
+ KEEPALIVE_INTERVAL * HZ);
+ // but the receiver is performance critical AND runs indefinitely, use the
+ // cpu intensive queue, as jobs submitted there will not count towards
+ // the concurrency limit of per-cpu worker threads. It still feels a little
+ // dirty to avoid managing our own thread, but nbd does it too.
+ }
+ spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+ return 0;
- if (req_alt_servers != NULL) {
- // Enqueue request to request_queue_send for a fresh list of alt servers
- dnbd3_cmd_to_priv(req_alt_servers, CMD_GET_SERVERS);
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- list_add(&req_alt_servers->queuelist, &dev->request_queue_send);
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- wake_up(&dev->process_queue_send);
+error_dealloc:
+ if (init) {
+ // If anything fails during initialization, free resources again
+ dnbd3_release_resources(dev);
}
+error:
+ if (init)
+ dev->reported_size = 0;
+ if (sock)
+ sock_release(sock);
+ return ret < 0 ? ret : -EIO;
+}
- // add heartbeat timer
- // Do not goto error after creating the timer - we require that the timer exists
- // if dev->sock != NULL -- see dnbd3_net_disconnect
- dev->heartbeat_count = 0;
- timer_setup(&dev->hb_timer, dnbd3_net_heartbeat, 0);
- dev->hb_timer.expires = jiffies + HZ;
- add_timer(&dev->hb_timer);
+void dnbd3_net_work_init(dnbd3_device_t *dev)
+{
+ INIT_WORK(&dev->send_work, dnbd3_send_workfn);
+ INIT_WORK(&dev->recv_work, dnbd3_recv_workfn);
+ INIT_DELAYED_WORK(&dev->discover_work, dnbd3_discover_workfn);
+ INIT_DELAYED_WORK(&dev->keepalive_work, dnbd3_keepalive_workfn);
+}
- return 0;
+static int dnbd3_set_primary_connection(dnbd3_device_t *dev, struct socket *sock, struct sockaddr_storage *addr, u16 protocol_version)
+{
+ unsigned long irqflags;
-error:
- stop_worker_thread(dev, &dev->thread_send, "send", 1);
- stop_worker_thread(dev, &dev->thread_receive, "receive", 1);
- stop_worker_thread(dev, &dev->thread_discover, "discover", 1);
- if (dev->sock) {
- sock_release(dev->sock);
- dev->sock = NULL;
+ ASSERT(dnbd3_flag_taken(dev->connection_lock));
+ if (addr->ss_family == 0 || dev->imgname == NULL || sock == NULL) {
+ dnbd3_dev_err_host_cur(dev, "connect: host, image name or sock not set\n");
+ return -EINVAL;
}
+
+ replace_main_socket(dev, sock, addr, protocol_version);
spin_lock_irqsave(&dev->blk_lock, irqflags);
- dev->cur_server.host.ss_family = 0;
+ dev->panic = false;
+ dev->panic_count = 0;
+ dev->discover_interval = TIMER_INTERVAL_PROBE_SWITCH;
+ queue_work(dev->recv_wq, &dev->recv_work);
spin_unlock_irqrestore(&dev->blk_lock, irqflags);
- kfree(req_alt_servers);
+ if (dev->use_server_provided_alts) {
+ dnbd3_send_empty_request(dev, CMD_GET_SERVERS);
+ }
- return -1;
+ dnbd3_dev_dbg_host_cur(dev, "connection switched\n");
+ dnbd3_blk_requeue_all_requests(dev);
+ return 0;
}
+/**
+ * Disconnect the device, shutting it down.
+ */
int dnbd3_net_disconnect(dnbd3_device_t *dev)
{
- unsigned long irqflags;
-
+ ASSERT(dnbd3_flag_taken(dev->connection_lock));
+ if (!device_active(dev))
+ return -ENOTCONN;
dev_dbg(dnbd3_device_to_dev(dev), "disconnecting device ...\n");
- ASSERT(atomic_read(&dev->connection_lock));
- dev->discover = 0;
-
- if (dev->sock) {
- kernel_sock_shutdown(dev->sock, SHUT_RDWR);
- // clear heartbeat timer
- del_timer(&dev->hb_timer);
- }
+ dev->reported_size = 0;
+ /* quickly fail all requests */
+ dnbd3_blk_fail_all_requests(dev);
+ replace_main_socket(dev, NULL, NULL, 0);
- // kill sending and receiving threads
- stop_worker_thread(dev, &dev->thread_send, "send", 0);
- stop_worker_thread(dev, &dev->thread_receive, "receive", 0);
- stop_worker_thread(dev, &dev->thread_discover, "discover", 0);
- if (dev->sock) {
- sock_release(dev->sock);
- dev->sock = NULL;
- }
- spin_lock_irqsave(&dev->blk_lock, irqflags);
- dev->cur_server.host.ss_family = 0;
- spin_unlock_irqrestore(&dev->blk_lock, irqflags);
+ cancel_delayed_work_sync(&dev->keepalive_work);
+ cancel_delayed_work_sync(&dev->discover_work);
+ cancel_work_sync(&dev->send_work);
+ cancel_work_sync(&dev->recv_work);
+ dnbd3_blk_fail_all_requests(dev);
+ dnbd3_release_resources(dev);
+ dev_dbg(dnbd3_device_to_dev(dev), "all workers shut down\n");
return 0;
}