Diffstat (limited to 'kernel/core.c')
-rw-r--r--  kernel/core.c  510
1 file changed, 510 insertions(+), 0 deletions(-)
diff --git a/kernel/core.c b/kernel/core.c
new file mode 100644
index 0000000..6bee380
--- /dev/null
+++ b/kernel/core.c
@@ -0,0 +1,510 @@
+/*
+ * kernel/core.c - The principal component of the driver.
+ * See core.h for comments on each function.
+ */
+
+
+#include "dnbd2.h"
+#include "core.h"
+#include "devices.h"
+#include "misc.h"
+#include "fops.h"
+#include "servers.h"
+
+
+int dnbd2_major;
+static dnbd2_device_t dev[DNBD2_DEVICES];
+
+
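+/*
+ * Request path overview: the block layer calls dnbd2_request(), which
+ * tags each read request with a free req_info slot and moves it to
+ * the device's send-queue.  dnbd2_tx_loop() transmits queued requests
+ * and parks them in the pending-queue, where dnbd2_rx_loop() matches
+ * incoming replies against them.  Requests that stay pending too long
+ * are moved back to the send-queue by dnbd2_requeue_timer().
+ */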
+void dnbd2_request(request_queue_t *q)
+{
+ int i;
+ struct request *req;
+ struct req_info *info;
+ dnbd2_device_t *dev;
+
+ while ((req = elv_next_request(q)) != NULL) {
+		/* Discard requests that are not filesystem requests. */
+		if (!blk_fs_request(req)) {
+			end_request(req, 0);
+			continue;
+		}
+		/* The device is read-only, so fail write requests. */
+		if (rq_data_dir(req) != READ) {
+ end_request(req, 0);
+ continue;
+ }
+
+ /* Prepare request. */
+ dev = req->rq_disk->private_data;
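+		/*
+		 * Find a free slot in the info pool; cnt == -1 marks a
+		 * slot as free, and dnbd2_end_request() resets cnt to
+		 * -1 when the block request completes.
+		 */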
+ for (i=0 ; i<POOL_SIZE ; i++) {
+ if (dev->info_pool[i].cnt == -1)
+ break;
+ }
+		if (i == POOL_SIZE) {
+			/*
+			 * The pool is exhausted: leave the request on
+			 * the queue for the next invocation instead of
+			 * spinning on it with the queue lock held.
+			 */
+			break;
+		}
+ dev->pending_reqs++;
+ info = &dev->info_pool[i];
+ info->time = jiffies;
+ info->cnt = 0;
+ info->cmd = CMD_GET_BLOCK;
+ info->dst = dev->active_server;
+ req->special = info;
+ blkdev_dequeue_request(req);
+
+ /* Enqueue the request for sending. */
+ spin_lock_bh(&dev->send_queue_lock);
+ list_add_tail(&req->queuelist, &dev->send_queue);
+ spin_unlock_bh(&dev->send_queue_lock);
+
+ /* Wakeup sender function. */
+ wake_up_interruptible(&dev->sender_wq);
+ }
+}
+
+
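+/*
+ * Sender thread, one per device.  It sleeps on sender_wq until the
+ * send-queue is non-empty, classifies each request under
+ * servers_mutex and hands it to dnbd2_send_request().  dev->tx_signal
+ * tells the thread to terminate.
+ */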
+int dnbd2_tx_loop(void *data)
+{
+ struct request *req;
+ dnbd2_device_t *dev = data;
+ struct req_info *info;
+
+ daemonize("dnbd2_tx_loop");
+ allow_signal(SIGKILL);
+ complete(&dev->tx_start);
+
+ while (1) {
+ wait_event_interruptible(dev->sender_wq,
+ !list_empty(&dev->send_queue));
+
+		/*
+		 * If wait_event_interruptible() is interrupted while
+		 * the queue is not empty, it returns 0 rather than
+		 * -ERESTARTSYS, so a separate flag is needed to detect
+		 * the termination signal.
+		 */
+ if (dev->tx_signal)
+ break;
+
+ /* Dequeue request from the send-queue. */
+ spin_lock_bh(&dev->send_queue_lock);
+ req = blkdev_entry_to_request(dev->send_queue.next);
+ list_del_init(&req->queuelist);
+ spin_unlock_bh(&dev->send_queue_lock);
+
+ /*
+ * This is a good place to do some sanity checks
+ * because the request is neither in the send- nor in
+ * the pending-queue.
+ */
+ down(&dev->servers_mutex);
+ info = req->special;
+ switch (info->cmd) {
+ case CMD_GET_BLOCK:
+			/*
+			 * If dnbd2_request() is called without the
+			 * device having been opened first, or with its
+			 * capacity set to zero, we land here.
+			 */
+ if (!dev->running) {
+ dnbd2_end_request(req, 0);
+ up(&dev->servers_mutex);
+ continue;
+ }
+ /*
+ * Send block requests to dev->active_server,
+ * which is always set while running.
+ */
+ info->last_dst = info->dst;
+ info->dst = dev->active_server;
+ break;
+
+ case CMD_GET_SERVERS:
+ case CMD_GET_SIZE:
+			/*
+			 * Discard the request if its destination is
+			 * no longer available.
+			 */
+ if (!info->dst->sock) {
+ dnbd2_end_request(req, 1);
+ up(&dev->servers_mutex);
+ continue;
+ }
+ break;
+ }
+
+ /* Enqueue request in the pending-queue. */
+ spin_lock_bh(&dev->pending_queue_lock);
+ list_add_tail(&req->queuelist, &dev->pending_queue);
+ spin_unlock_bh(&dev->pending_queue_lock);
+
+ /* Send request. */
+ dnbd2_send_request(req, dev);
+ up(&dev->servers_mutex);
+ }
+
+ complete_and_exit(&dev->tx_stop, 0);
+}
+
+
+void dnbd2_send_request(struct request *req, dnbd2_device_t *dev)
+{
+ dnbd2_data_request_t dnbd2_req;
+ struct req_info *info = req->special;
+
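+	/*
+	 * Build the on-wire request header; all fields are sent in
+	 * network byte order.  The 16-bit time field is an echo token,
+	 * not an absolute timestamp: the server returns it unchanged so
+	 * that dnbd2_rx_loop() can feed update_rtt(), which makes the
+	 * truncation of jiffies to 16 bits harmless.
+	 */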
+ dnbd2_req.cmd = cpu_to_be16(info->cmd);
+ dnbd2_req.time = cpu_to_be16(info->time);
+ dnbd2_req.vid = cpu_to_be16(dev->vid);
+ dnbd2_req.rid = cpu_to_be16(dev->rid);
+ dnbd2_req.num = cpu_to_be64(req->sector * SECTOR_SIZE);
+
+	/*
+	 * If sock_xmit() fails, the requeue timer eventually resends
+	 * the request, which is why the return value is not checked.
+	 */
+ sock_xmit(info->dst->sock, SEND, &dnbd2_req, sizeof(dnbd2_req));
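+	/*
+	 * cnt counts transmissions to the current destination: after a
+	 * destination change it restarts at 1, while a repeated send to
+	 * the same server is charged to that server's retry statistics.
+	 */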
+ info->cnt++;
+ if (info->dst != info->last_dst) {
+ info->cnt = 1;
+ } else if (info->cnt > 1) {
+ info->dst->retries++;
+ }
+}
+
+
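+/*
+ * Receiver thread, one per connected server.  It blocks in sock_xmit()
+ * reading replies from that server's socket, drops replies whose
+ * vid/rid does not match the device, and completes the matching
+ * pending request.
+ */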
+int dnbd2_rx_loop(void *data)
+{
+ struct srv_info *srv_info = data;
+ dnbd2_device_t *dev = srv_info->dev;
+ dnbd2_data_reply_t dnbd2_rep;
+ struct request *req;
+ uint64_t num;
+ uint16_t cmd, time;
+ char *buffer;
+ int ret, i;
+
+ daemonize("dnbd2_rx_loop");
+ allow_signal(SIGKILL);
+ complete(&srv_info->rx_start);
+
+ while (1) {
+ ret = sock_xmit(srv_info->sock, RECV,
+ &dnbd2_rep, sizeof(dnbd2_rep));
+ if (ret == -EINTR)
+ break;
+ if (ret != sizeof(dnbd2_rep) ||
+ be16_to_cpu(dnbd2_rep.vid) != dev->vid ||
+ be16_to_cpu(dnbd2_rep.rid) != dev->rid)
+ continue;
+
+ cmd = be16_to_cpu(dnbd2_rep.cmd);
+ num = be64_to_cpu(dnbd2_rep.num);
+ time = be16_to_cpu(dnbd2_rep.time);
+
+ /* Find a matching request in the pending-queue. */
+ req = dnbd2_find_request(num, cmd, srv_info);
+ if (!req)
+ continue;
+ srv_info->last_reply = jiffies;
+ update_rtt(diff(time), srv_info, cmd);
+
+ switch (cmd) {
+ case CMD_GET_BLOCK:
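+			/*
+			 * Copy the payload into the request's bio; the
+			 * init-time check DNBD2_BLOCK_SIZE == PAGE_SIZE
+			 * ensures that one reply payload fills exactly
+			 * the page mapped here.
+			 */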
+ spin_lock(&dev->kmap_lock);
+ buffer = __bio_kmap_atomic(req->bio, 0, KM_USER0);
+ memcpy(buffer,
+ dnbd2_rep.payload.data,
+ DNBD2_BLOCK_SIZE);
+ __bio_kunmap_atomic(req->bio, KM_USER0);
+ spin_unlock(&dev->kmap_lock);
+ break;
+
+ case CMD_GET_SIZE:
+ if (!srv_info->capacity) {
+ srv_info->capacity = num / SECTOR_SIZE;
+ wake_up_all(&srv_info->wq);
+ }
+ break;
+
+ case CMD_GET_SERVERS:
+ down(&dev->servers_mutex);
+
+ if (dev->emergency)
+ stop_emergency(srv_info);
+
+			/*
+			 * Recreate the emergency list: slot 0 holds the
+			 * replying server itself, the reply payload
+			 * fills the next `num' slots, and the remaining
+			 * slots are zeroed.
+			 */
+ dev->emerg_list[0].ip = srv_info->ip;
+ dev->emerg_list[0].port = srv_info->port;
+			for (i=1 ; i<ALT_SERVERS_MAX ; i++) {
+ if (num) {
+ memcpy(&dev->emerg_list[i],
+ &dnbd2_rep.payload.server[i-1],
+ sizeof(dnbd2_server_t));
+ try_add_server(dev->emerg_list[i], dev);
+ num--;
+ } else {
+ dev->emerg_list[i].ip = 0;
+ dev->emerg_list[i].port = 0;
+ }
+ }
+ up(&dev->servers_mutex);
+ break;
+ }
+ dnbd2_end_request(req, 1);
+ }
+
+ complete_and_exit(&srv_info->rx_stop, 0);
+}
+
+
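+/*
+ * Match a reply to a pending request: block replies are matched by
+ * sector number, control replies by command and originating server.
+ * The matched request is removed from the pending-queue before it is
+ * returned.
+ */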
+struct request *dnbd2_find_request(uint64_t num,
+ uint16_t cmd,
+ struct srv_info *dst)
+{
+ dnbd2_device_t *dev = dst->dev;
+ struct list_head *cur, *next;
+ struct req_info *info;
+ struct request *req;
+ sector_t sector;
+
+ switch (cmd) {
+ case CMD_GET_BLOCK:
+ sector = num / SECTOR_SIZE;
+ spin_lock_bh(&dev->pending_queue_lock);
+ list_for_each_safe(cur, next, &dev->pending_queue) {
+ req = blkdev_entry_to_request(cur);
+ info = req->special;
+
+ if (req->sector == sector && info->cmd == cmd) {
+ list_del_init(&req->queuelist);
+ spin_unlock_bh(&dev->pending_queue_lock);
+ return req;
+ }
+ }
+ break;
+
+ case CMD_GET_SIZE:
+ case CMD_GET_SERVERS:
+ spin_lock_bh(&dev->pending_queue_lock);
+ list_for_each_safe(cur, next, &dev->pending_queue) {
+ req = blkdev_entry_to_request(cur);
+ info = req->special;
+
+ if (info->cmd == cmd && info->dst == dst) {
+ list_del_init(&req->queuelist);
+ spin_unlock_bh(&dev->pending_queue_lock);
+ return req;
+ }
+		}
+		break;
+
+	default:
+		/*
+		 * cmd comes straight off the wire; an unknown command
+		 * must not fall through to the unlock below, because
+		 * no lock was taken for it.
+		 */
+		return NULL;
+	}
+
+ spin_unlock_bh(&dev->pending_queue_lock);
+ return NULL;
+}
+
+
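+/*
+ * Complete a request.  Block requests go back to the block layer and
+ * their info-pool slot is released (cnt = -1); control requests were
+ * allocated by the driver itself and are simply freed.
+ */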
+void dnbd2_end_request(struct request *req, int success)
+{
+ unsigned long flags;
+ struct req_info *info = req->special;
+ dnbd2_device_t *dev;
+
+ switch (info->cmd) {
+ case CMD_GET_BLOCK:
+ dev = req->rq_disk->private_data;
+ spin_lock_irqsave(&dev->blk_lock, flags);
+ list_del_init(&req->queuelist);
+ if (!end_that_request_first(req, success, req->nr_sectors)) {
+ end_that_request_last(req, success);
+ }
+ dev->pending_reqs--;
+ spin_unlock_irqrestore(&dev->blk_lock, flags);
+ info->cnt = -1;
+ break;
+
+ case CMD_GET_SIZE:
+ case CMD_GET_SERVERS:
+ kfree(info);
+ kfree(req);
+ break;
+ }
+}
+
+
+void dnbd2_requeue_timer(unsigned long arg)
+{
+ dnbd2_device_t *dev = (dnbd2_device_t *)arg;
+ struct list_head *cur, *next;
+ struct request *req;
+ struct req_info *info;
+ unsigned long too_long;
+ int requeue;
+
+ spin_lock(&dev->pending_queue_lock);
+ list_for_each_safe(cur, next, &dev->pending_queue) {
+ requeue = 0;
+ req = blkdev_entry_to_request(cur);
+ info = req->special;
+ if (!info->cnt)
+ continue;
+
+		/*
+		 * Each request type has its own requeue policy: block
+		 * requests back off exponentially from the server's
+		 * smoothed RTT, capped at one second; control requests
+		 * are retried once per second and given up after four
+		 * attempts, which also schedules the server for
+		 * removal.
+		 */
+ switch (info->cmd) {
+ case CMD_GET_BLOCK:
+ too_long = 2 * (info->dst->srtt >> SRTT_SHIFT);
+ too_long *= 4 << info->cnt;
+ if (too_long > HZ)
+ too_long = HZ;
+ if (diff(info->time) >= too_long)
+ requeue = 1;
+ break;
+
+ case CMD_GET_SERVERS:
+ case CMD_GET_SIZE:
+ if (info->cnt == 4) {
+ list_del_init(&req->queuelist);
+ dnbd2_end_request(req, 0);
+ schedule_del_server(info->dst);
+ break;
+ }
+ if (diff(info->time) >= HZ)
+ requeue = 1;
+ break;
+ }
+
+ if (requeue) {
+ list_del_init(&req->queuelist);
+ spin_lock(&dev->send_queue_lock);
+ list_add_tail(&req->queuelist, &dev->send_queue);
+ spin_unlock(&dev->send_queue_lock);
+ }
+ }
+ spin_unlock(&dev->pending_queue_lock);
+
+ wake_up_interruptible(&dev->sender_wq);
+ dev->requeue_timer.expires = jiffies + REQUEUE_INTERVAL;
+ add_timer(&dev->requeue_timer);
+}
+
+
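+/*
+ * Self-rearming heartbeat timer: queue a heartbeat for every known
+ * server and wake the sender.  The interval, dev->hb_interval, is
+ * switched between HB_NORMAL_INTERVAL and HB_EMERG_INTERVAL by
+ * stop_emergency() and start_emergency().
+ */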
+void dnbd2_hb_timer(unsigned long arg)
+{
+ dnbd2_device_t *dev = (dnbd2_device_t *)arg;
+ int i;
+
+ if (dev->running)
+ for_each_server(i)
+ enqueue_hb(&dev->servers[i]);
+
+ wake_up_interruptible(&dev->sender_wq);
+ dev->hb_timer.expires = jiffies + dev->hb_interval;
+ add_timer(&dev->hb_timer);
+}
+
+
+void dnbd2_to_timer(unsigned long arg)
+{
+ dnbd2_device_t *dev = (dnbd2_device_t *)arg;
+
+ if (dev->running)
+ schedule_activate_fastest(dev);
+
+ dev->to_timer.expires = jiffies + TO_INTERVAL;
+ add_timer(&dev->to_timer);
+}
+
+
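+/*
+ * Emergency mode is entered when no server is reachable any more: the
+ * regular timers are stopped, the servers saved in emerg_list are
+ * re-added, and heartbeats go out at the faster HB_EMERG_INTERVAL
+ * rate.  The first CMD_GET_SERVERS reply to arrive ends the emergency
+ * (see dnbd2_rx_loop()).
+ */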
+void start_emergency(dnbd2_device_t *dev)
+{
+ int i;
+
+ if (dev->emergency)
+ return;
+
+ p("No servers reachable, starting emergency mode!\n");
+ dev->emergency = 1;
+
+ del_timer(&dev->requeue_timer);
+ del_timer(&dev->to_timer);
+ del_timer(&dev->hb_timer);
+
+ /* Activate the emergency list. */
+ for_each_server(i)
+ try_add_server(dev->emerg_list[i], dev);
+
+ /* Increase frequency of heartbeats. */
+ dev->hb_interval = HB_EMERG_INTERVAL;
+ dev->hb_timer.expires = jiffies + HB_EMERG_INTERVAL;
+ add_timer(&dev->hb_timer);
+}
+
+
+void stop_emergency(struct srv_info *srv_info)
+{
+ dnbd2_device_t *dev = srv_info->dev;
+
+ if (!dev->emergency)
+ return;
+
+ p("Stopping emergency mode.\n");
+ del_timer(&dev->hb_timer);
+
+ dev->active_server = srv_info;
+
+ /* Decrease frequency of heartbeats. */
+ dev->hb_interval = HB_NORMAL_INTERVAL;
+ dev->hb_timer.expires = jiffies + HB_NORMAL_INTERVAL;
+ add_timer(&dev->hb_timer);
+
+ dev->requeue_timer.expires = jiffies + REQUEUE_INTERVAL;
+ add_timer(&dev->requeue_timer);
+ dev->to_timer.expires = jiffies + TO_INTERVAL;
+ add_timer(&dev->to_timer);
+
+ dev->emergency = 0;
+}
+
+
+static int __init dnbd2_init(void)
+{
+ int i;
+
+	/* We are platform dependent: a block must be exactly one page. */
+ if (DNBD2_BLOCK_SIZE != PAGE_SIZE) {
+ printk(LOG "DNBD2_BLOCK_SIZE (%d) != PAGE_SIZE (%d)\n",
+ (int) DNBD2_BLOCK_SIZE, (int) PAGE_SIZE);
+ return -EINVAL;
+ }
+
+ dnbd2_major = register_blkdev(0, "dnbd2");
+ if (dnbd2_major <= 0) {
+ p("Could not get major number.\n");
+ return -EBUSY;
+ }
+
+ for (i=0 ; i<DNBD2_DEVICES ; i++)
+ if (add_device(&dev[i], i))
+ goto out;
+
+ p("DNBD2 loaded.\n");
+ return 0;
+
+ out:
+	while (i--)
+		del_device(&dev[i]);
+	unregister_blkdev(dnbd2_major, "dnbd2");
+	return -ENOMEM;
+}
+
+
+static void __exit dnbd2_exit(void)
+{
+ int i;
+ for (i=0 ; i<DNBD2_DEVICES ; i++)
+ del_device(&dev[i]);
+ unregister_blkdev(dnbd2_major, "dnbd2");
+ p("DNBD2 unloaded.\n");
+}
+
+
+module_init(dnbd2_init);
+module_exit(dnbd2_exit);
+
+MODULE_DESCRIPTION("Distributed Network Block Device v2");
+MODULE_LICENSE("GPL");