summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
-rw-r--r-- src/kernel/core.c 1
-rw-r--r-- src/kernel/dnbd3.h 1
-rw-r--r-- src/kernel/net.c 232
3 files changed, 205 insertions, 29 deletions
diff --git a/src/kernel/core.c b/src/kernel/core.c
index c2d14d2..1347bcd 100644
--- a/src/kernel/core.c
+++ b/src/kernel/core.c
@@ -341,6 +341,7 @@ int dnbd3_add_device(struct dnbd3_device *dev, int minor)
for (i = 0; i < NUMBER_CONNECTIONS; i++) {
dev->socks[i].device = dev;
+ dev->socks[i].sock_nr = i;
}
disk = alloc_disk(1);
diff --git a/src/kernel/dnbd3.h b/src/kernel/dnbd3.h
index cde94a2..f3f848a 100644
--- a/src/kernel/dnbd3.h
+++ b/src/kernel/dnbd3.h
@@ -45,6 +45,7 @@ struct dnbd3_server {
struct dnbd3_sock {
+ uint16_t sock_nr;
struct socket *sock;
struct mutex lock;
struct request *pending;
diff --git a/src/kernel/net.c b/src/kernel/net.c
index 7df8369..0f875c5 100644
--- a/src/kernel/net.c
+++ b/src/kernel/net.c
@@ -49,10 +49,12 @@
} while (0)
static DECLARE_WAIT_QUEUE_HEAD(send_wq);
-static uint64_t send_wq_handle;
+static volatile uint64_t send_wq_signal;
static int dnbd3_socket_connect(struct dnbd3_device *dev, struct dnbd3_server *server);
-static int dnbd3_socket_disconnect(struct dnbd3_device *dev, struct dnbd3_server *server, struct dnbd3_sock *sock);
+static int __dnbd3_socket_connect(struct dnbd3_server * server, struct dnbd3_sock *sock);
+static int dnbd3_socket_disconnect(struct dnbd3_device *dev, struct dnbd3_server *server, struct dnbd3_sock *sock, bool wait_for_receiver);
+
static void dnbd3_print_host(struct dnbd3_host_t *host, char *msg)
{
@@ -74,6 +76,12 @@ static void dnbd3_print_server_list(struct dnbd3_device *dev)
}
}
+
+static inline uint64_t dnbd3_to_wq_signal(int minor, uint16_t dnbd3_cmd, uint16_t sock_nr) { // combine device minor, command and socket number into one 64-bit value; callers in this diff store it in / compare it against send_wq_signal
+ return ((uint64_t) minor << 32) | ((uint32_t) dnbd3_cmd << 16) | sock_nr; // minor is shifted to bit 32 and up, dnbd3_cmd lands in bits 16..31, sock_nr in bits 0..15
+}
+
+
static uint64_t dnbd3_to_handle(uint32_t arg0, uint32_t arg1) { // pack two 32-bit values into a single 64-bit handle
return ((uint64_t) arg0 << 32) | arg1; // arg0 fills the upper 32 bits, arg1 the lower 32 bits
}
@@ -159,7 +167,7 @@ int dnbd3_send_request_blocking(struct dnbd3_sock *sock, int dnbd3_cmd)
{
int result = 0;
uint64_t handle;
- struct request *req = kmalloc(sizeof(struct request), GFP_ATOMIC );
+ struct request *req = kmalloc(sizeof(struct request), GFP_KERNEL);
printk(KERN_DEBUG "dnbd3: starting blocking request\n");
if (!req) {
printk(KERN_ERR "dnbd3: kmalloc failed\n");
@@ -186,13 +194,13 @@ int dnbd3_send_request_blocking(struct dnbd3_sock *sock, int dnbd3_cmd)
mutex_unlock(&sock->lock);
goto error;
}
- send_wq_handle = 0;
- handle = dnbd3_to_handle(sock->device->minor, dnbd3_cmd);
+ send_wq_signal = 0;
+ handle = dnbd3_to_wq_signal(sock->device->minor, dnbd3_cmd, sock->sock_nr);
mutex_unlock(&sock->lock);
printk(KERN_DEBUG "dnbd3: blocking request going to sleep wait for handle %llu\n", handle);
- if (wait_event_interruptible_timeout(send_wq, handle == send_wq_handle, REQUEST_TIMEOUT) <= 0) { // timeout or interrupt
+ if (wait_event_interruptible_timeout(send_wq, handle == send_wq_signal, 10 /* REQUEST_TIMEOUT*/) <= 0) { // timeout or interrupt
printk(KERN_WARNING "dndbd3: request timed out\n");
result = -EIO;
goto error;
@@ -240,7 +248,6 @@ static void dnbd3_receive_worker(struct work_struct *work)
goto error;
}
- result = 0;
fixup_reply(dnbd3_reply);
// check error
@@ -289,7 +296,8 @@ static void dnbd3_receive_worker(struct work_struct *work)
kaddr = kmap(bvec->bv_page) + bvec->bv_offset;
iov.iov_base = kaddr;
iov.iov_len = bvec->bv_len;
- if (kernel_recvmsg(sock->sock, &msg, &iov, 1, bvec->bv_len, msg.msg_flags) != bvec->bv_len) {
+ result = kernel_recvmsg(sock->sock, &msg, &iov, 1, bvec->bv_len, msg.msg_flags);
+ if (result != bvec->bv_len) {
kunmap(bvec->bv_page);
sigprocmask(SIG_SETMASK, &oldset, NULL );
printk(KERN_ERR "dnbd3: could not receive form net to block layer\n");
@@ -316,7 +324,8 @@ static void dnbd3_receive_worker(struct work_struct *work)
if (count != 0) {
iov.iov_base = dev->new_servers;
iov.iov_len = count * sizeof(dnbd3_server_entry_t);
- if (kernel_recvmsg(sock->sock, &msg, &iov, 1, (count * sizeof(dnbd3_server_entry_t)), msg.msg_flags) != (count * sizeof(dnbd3_server_entry_t))) {
+ result = kernel_recvmsg(sock->sock, &msg, &iov, 1, (count * sizeof(dnbd3_server_entry_t)), msg.msg_flags);
+ if (result != (count * sizeof(dnbd3_server_entry_t))) {
printk(KERN_ERR "dnbd3: failed to get servers\n");
mutex_unlock(&dev->device_lock);
goto error;
@@ -336,7 +345,6 @@ consume_payload:
mutex_unlock(&dev->device_lock);
goto error;
}
- result = 0;
}
mutex_unlock(&dev->device_lock);
break;
@@ -348,7 +356,8 @@ consume_payload:
printk(KERN_DEBUG "dnbd3: latest rid received\n");
iov.iov_base = &rid;
iov.iov_len = sizeof(rid);
- if (kernel_recvmsg(sock->sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags) <= 0) {
+ result = kernel_recvmsg(sock->sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags);
+ if (result <= 0) {
printk(KERN_ERR "dnbd3: failed to get latest rid\n");
goto error;
}
@@ -368,12 +377,12 @@ consume_payload:
// receive reply payload
iov.iov_base = &payload_buffer;
iov.iov_len = dnbd3_reply.size;
- if ((result = kernel_recvmsg(sock->sock, &msg, &iov, 1, dnbd3_reply.size, msg.msg_flags)) != dnbd3_reply.size) {
+ result = kernel_recvmsg(sock->sock, &msg, &iov, 1, dnbd3_reply.size, msg.msg_flags);
+ if (result != dnbd3_reply.size) {
printk(KERN_ERR "dnbd3: could not read CMD_SELECT_IMAGE payload on handshake, size is %d and should be%d\n",
result, dnbd3_reply.size);
goto error;
}
- result = 0;
// handle/check reply payload
serializer_reset_read(&payload_buffer, dnbd3_reply.size);
@@ -408,11 +417,15 @@ consume_payload:
break;
}
error:
- handle = dnbd3_to_handle(dev->minor, dnbd3_reply.cmd);
+ handle = dnbd3_to_wq_signal(dev->minor, dnbd3_reply.cmd, sock->sock_nr);
printk(KERN_DEBUG "dnbd3: try to wake up queue with handle %llu\n", handle);
- send_wq_handle = handle;
+ send_wq_signal = handle;
wake_up_interruptible(&send_wq);
- if (result) {
+ if (result == 0) {
+ printk(KERN_INFO "dnbd3: result is 0, socket seems to be down\n");
+// dnbd3_socket_disconnect(dev, NULL, sock, false);//TODO use panic or something or start worker to reconnect?
+ break; //the socket seems to be down
+ } else if (result < 0) {
sock->server->failures++; // discovery takes care of to many failures
printk(KERN_WARNING "dnbd3: receive error happened %d, total failures %d\n", result, sock->server->failures);
}
@@ -449,13 +462,24 @@ static void dnbd3_discovery_timer(struct timer_list *arg)
static void dnbd3_discovery_worker(struct work_struct *work)
{
struct dnbd3_device *dev = container_of(work, struct dnbd3_device, discovery);
- struct dnbd3_sock *sock = &dev->socks[0]; // we use the first sock for discovery
+ struct dnbd3_sock *sock = NULL;
int i, j;
struct dnbd3_server *existing_server, *free_server, *failed_server;
dnbd3_server_entry_t *new_server;
+ struct kvec iov;
+ struct timeval start, end;
+ dnbd3_request_t dnbd3_request;
+ dnbd3_reply_t dnbd3_reply;
+ struct msghdr msg;
+ char *buf, *name;
+ struct request *req = NULL;
+ uint64_t rtt;
+ serialized_buffer_t *payload;
+ uint64_t filesize;
+ uint16_t rid;
printk(KERN_DEBUG "dnbd3: starting discovery worker\n");
- dnbd3_send_request_blocking(sock, CMD_GET_SERVERS);
+ dnbd3_send_request_blocking(&dev->socks[0], CMD_GET_SERVERS);
printk(KERN_DEBUG "dnbd3: new server num is %d\n", dev->new_servers_num);
if (dev->new_servers_num) {
@@ -487,7 +511,7 @@ static void dnbd3_discovery_worker(struct work_struct *work)
if (existing_server) {
if (new_server->failures == 1) { // remove is requested
dnbd3_print_host(&existing_server->host, "remove server");
- dnbd3_socket_disconnect(dev, existing_server, NULL); // TODO what to do when only one connection?
+ dnbd3_socket_disconnect(dev, existing_server, NULL, true); // TODO what to do when only one connection?
existing_server->host.type = 0;
}
// ADD, so just reset fail counter
@@ -511,28 +535,175 @@ static void dnbd3_discovery_worker(struct work_struct *work)
dev->new_servers_num = 0;
mutex_unlock(&dev->device_lock);
}
-
+ buf = kmalloc(RTT_BLOCK_SIZE, GFP_KERNEL);
+ if (!buf) {
+ printk(KERN_ERR "dnbd3: kmalloc failed\n");
+ goto error;
+ }
+ payload = (serialized_buffer_t *)buf;
+ req = kmalloc(sizeof(struct request), GFP_KERNEL);
+ if (!req) {
+ printk(KERN_ERR "dnbd3: kmalloc failed\n");
+ goto error;
+ }
+ sock = kmalloc(sizeof(struct dnbd3_sock), GFP_KERNEL);
+ if (!sock) {
+ printk(KERN_ERR "dnbd3: kmalloc failed\n");
+ goto error;
+ }
// measure rtt for all alt servers
for (i = 0; i < NUMBER_SERVERS; i++) {
+ existing_server = &dev->alt_servers[i];
+ if (existing_server->host.type) {
+ sock->sock = NULL;
+ sock->device = dev;
+ init_msghdr(msg);
+ if (__dnbd3_socket_connect(existing_server, sock)) {
+ printk(KERN_ERR "dnbd3: socket connect failed in rtt measurement\n");
+ goto rtt_error;
+ }
+ dnbd3_connect(req);
+ if (dnbd3_send_request(sock, req, NULL)) {
+ printk(KERN_ERR "dnbd3: request select image failed in rtt measurement\n");
+ goto rtt_error;
+ }
+ iov.iov_base = &dnbd3_reply;
+ iov.iov_len = sizeof(dnbd3_reply);
+ if (kernel_recvmsg(sock->sock, &msg, &iov, 1, sizeof(dnbd3_reply), msg.msg_flags) != sizeof(dnbd3_reply)) {
+ printk(KERN_ERR "dnbd3: receive select image failed in rtt measurement %d\n", j);
+ goto rtt_error;
+ }
+ fixup_reply(dnbd3_reply);
+ if (dnbd3_reply.magic != dnbd3_packet_magic || dnbd3_reply.cmd != CMD_SELECT_IMAGE || dnbd3_reply.size < 4) {
+ printk(KERN_ERR "dnbd3: receive select image wrong header in rtt measurement\n");
+ goto rtt_error;
+ }
+ // receive data
+ iov.iov_base = payload;
+ iov.iov_len = dnbd3_reply.size;
+ if (kernel_recvmsg(sock->sock, &msg, &iov, 1, dnbd3_reply.size, msg.msg_flags) != dnbd3_reply.size) {
+ printk(KERN_ERR "dnbd3: receive data select image failed in rtt measurement\n");
+ goto rtt_error;
+ }
+ serializer_reset_read(payload, dnbd3_reply.size);
+
+ existing_server->protocol_version = serializer_get_uint16(payload);
+ if (existing_server->protocol_version < MIN_SUPPORTED_SERVER) {
+ printk(KERN_ERR "dnbd3: server version to old in rtt measurement\n");
+ goto rtt_error;
+ }
+
+ name = serializer_get_string(payload);
+ if (name == NULL ) {
+ printk(KERN_ERR "dnbd3: server did not supply an image name in rtt measurement\n");
+ goto rtt_error;
+ }
+
+ if (strcmp(name, dev->imgname) != 0) {
+ printk(KERN_ERR "dnbd3: image name %s does not match requested %s in rtt measurement\n", name, dev->imgname);
+ goto rtt_error;
+ }
+// TODO rid is 1 but dev->rid is 0
+ rid = serializer_get_uint16(payload);
+// if (rid != dev->rid) {
+// printk(KERN_ERR "dnbd3: rid %d does not match requested %d in rtt measurement\n", rid, dev->rid);
+// goto rtt_error;
+// }
+
+ filesize = serializer_get_uint64(payload);
+ if (filesize != dev->reported_size) {
+ printk(KERN_ERR "dnbd3: image size %llu does not match requested %llu in rtt measurement\n", filesize, dev->reported_size);
+ goto rtt_error;
+ }
+ // Request block
+ dnbd3_request.cmd = CMD_GET_BLOCK;
+ // Do *NOT* pick a random block as it has proven to cause severe
+ // cache thrashing on the server
+ dnbd3_request.offset = 0;
+ dnbd3_request.size = RTT_BLOCK_SIZE;
+ fixup_request(dnbd3_request);
+ iov.iov_base = &dnbd3_request;
+ iov.iov_len = sizeof(dnbd3_request);
+
+ init_msghdr(msg);
+ // start rtt measurement
+ do_gettimeofday(&start);
+
+ if (kernel_sendmsg(sock->sock, &msg, &iov, 1, sizeof(dnbd3_request)) <= 0) {
+ printk(KERN_ERR "dnbd3: request test block failed in rtt measurement\n");
+ goto rtt_error;
+ }
+ // receive net reply
+ iov.iov_base = &dnbd3_reply;
+ iov.iov_len = sizeof(dnbd3_reply);
+ if ((j = kernel_recvmsg(sock->sock, &msg, &iov, 1, sizeof(dnbd3_reply), msg.msg_flags)) != sizeof(dnbd3_reply)) {
+ printk(KERN_ERR "dnbd3: receive header test block failed in rtt measurement %d %ld\n", j, sizeof(dnbd3_reply));
+ goto rtt_error;
+ }
+ fixup_reply(dnbd3_reply);
+ if (dnbd3_reply.magic != dnbd3_packet_magic|| dnbd3_reply.cmd != CMD_GET_BLOCK || dnbd3_reply.size != RTT_BLOCK_SIZE) {
+ printk(KERN_ERR "dnbd3: receive header cmd test block failed in rtt measurement\n");
+ goto rtt_error;
+ }
+
+ // receive data
+ iov.iov_base = buf;
+ iov.iov_len = RTT_BLOCK_SIZE;
+ if (kernel_recvmsg(sock->sock, &msg, &iov, 1, dnbd3_reply.size, msg.msg_flags) != RTT_BLOCK_SIZE) {
+ printk(KERN_ERR "dnbd3: receive test block failed in rtt measurement\n");
+ goto rtt_error;
+ }
+
+ do_gettimeofday(&end); // end rtt measurement
+
+ rtt = (uint64_t)((end.tv_sec - start.tv_sec) * 1000000ull + (end.tv_usec - start.tv_usec));
+
+ printk(KERN_DEBUG "dnbd3: new rrt for %pI4 is %llu\n", existing_server->host.addr, rtt);
+
+rtt_error:
+ if (sock->sock) {
+ kernel_sock_shutdown(sock->sock, SHUT_RDWR);
+ sock->server = NULL;
+ }
+
+ if (sock->sock) {
+ sock_release(sock->sock);
+ sock->sock = NULL;
+ }
+ }
+ }
+
+error:
+ if (buf) {
+ kfree(buf);
+ buf = NULL;
+ }
+ if (req) {
+ kfree(req);
+ req = NULL;
+ }
+ if (sock) {
+ kfree(sock);
+ sock = NULL;
}
}
-static int __dnbd3_socket_connect(struct dnbd3_server * server, struct dnbd3_sock *sock)
+static int __dnbd3_socket_connect(struct dnbd3_server *server, struct dnbd3_sock *sock)
{
int result = 0;
struct timeval timeout;
if (server->host.port == 0 || server->host.type == 0) {
printk(KERN_ERR "dnbd3: host or port not set\n");
- goto error;
+ return -EIO;
}
if (sock->sock) {
printk(KERN_WARNING "dnbd3: socket already connected\n");
- goto error;
+ return -EIO;
}
- timeout.tv_sec = SOCKET_TIMEOUT_CLIENT_DATA + 2;
+ timeout.tv_sec = SOCKET_TIMEOUT_CLIENT_DATA;
timeout.tv_usec = 0;
if ((result = dnbd3_sock_create(server->host.type, SOCK_STREAM, IPPROTO_TCP, &sock->sock)) < 0) {
@@ -636,7 +807,7 @@ error:
}
-static int dnbd3_socket_disconnect(struct dnbd3_device *dev, struct dnbd3_server *server, struct dnbd3_sock *sock)
+static int dnbd3_socket_disconnect(struct dnbd3_device *dev, struct dnbd3_server *server, struct dnbd3_sock *sock, bool wait_for_receiver)
{
int i;
int sock_alive = 0;
@@ -652,12 +823,13 @@ static int dnbd3_socket_disconnect(struct dnbd3_device *dev, struct dnbd3_server
printk(KERN_WARNING "dnbd3: could not find socket to disconnect\n");
return -EIO;
}
- if (sock_alive == 1) {
+ if (sock_alive <= 1) {
printk(KERN_INFO "dnbd3: shutting down last socket and stopping discovery\n");
del_timer_sync(&dev->discovery_timer);
cancel_work_sync(&dev->discovery);
}
+ printk(KERN_DEBUG "dnbd3: stopping keepalive\n");
del_timer_sync(&sock->keepalive_timer);
cancel_work_sync(&sock->keepalive);
@@ -680,8 +852,10 @@ static int dnbd3_socket_disconnect(struct dnbd3_device *dev, struct dnbd3_server
mutex_unlock(&sock->lock);
mutex_destroy(&sock->lock);
- printk(KERN_DEBUG "dnbd3: cancel receiver work device %i\n", dev->minor);
- cancel_work_sync(&sock->receive);
+ if (wait_for_receiver) {
+ printk(KERN_DEBUG "dnbd3: cancel receiver work device %i\n", dev->minor);
+ cancel_work_sync(&sock->receive);
+ }
if (sock->sock) {
sock_release(sock->sock);
@@ -697,7 +871,7 @@ int dnbd3_net_disconnect(struct dnbd3_device *dev)
for (i = 0; i < NUMBER_CONNECTIONS; i++) {
if (dev->socks[i].sock) {
- if (dnbd3_socket_disconnect(dev, NULL, &dev->socks[i])) {
+ if (dnbd3_socket_disconnect(dev, NULL, &dev->socks[i], true)) {
result = -EIO;
}
}