/*
 * kernel/core.c - The principal component of the driver.
 *                 See core.h for comments on each function.
 */


#include "dnbd2.h"
#include "core.h"
#include "devices.h"
#include "misc.h"
#include "fops.h"
#include "servers.h"


int dnbd2_major;
static dnbd2_device_t dev[DNBD2_DEVICES];

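/*
 * Block-layer request function: peek at each queued request,
 * complete invalid ones with -EIO, attach a free req_info slot
 * from the device's pool and hand the request to the sender
 * thread via the send-queue.
 */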
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,25)
void dnbd2_request(struct request_queue *q)
#else
void dnbd2_request(request_queue_t *q)
#endif
{
	int i;
	struct request *req;
	struct req_info *info;
	dnbd2_device_t *dev;

	while ((req = blk_peek_request(q)) != NULL) {
		/* Discard invalid block requests. */
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,36)
		if (req->cmd_type != REQ_TYPE_FS) {
#else
		if (!blk_fs_request(req)) {
#endif
			/* Dequeue the request before completing it. */
			blk_start_request(req);
			__blk_end_request_cur(req, -EIO);
			continue;
		}
		if (rq_data_dir(req) != READ) {
			blk_start_request(req);
			__blk_end_request_cur(req, -EIO);
			continue;
		}

		/* Prepare request. */
		dev = req->rq_disk->private_data;
		for (i=0 ; i<POOL_SIZE ; i++) {
			if (dev->info_pool[i].cnt == -1)
				break;
		}
		if (i == POOL_SIZE)
			break;	/* Pool exhausted; leave the request queued. */
		dev->pending_reqs++;
		info = &dev->info_pool[i];
		info->time = jiffies;
		info->cnt  = 0;
		info->cmd  = CMD_GET_BLOCK;
		info->dst  = dev->active_server;
		req->special = info;
		req = blk_fetch_request(q);

		/* Enqueue the request for sending. */
		spin_lock_bh(&dev->send_queue_lock);
		list_add_tail(&req->queuelist, &dev->send_queue);
		spin_unlock_bh(&dev->send_queue_lock);

		/* Wakeup sender function. */
		wake_up_interruptible(&dev->sender_wq);
	}
}


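/*
 * Sender thread, one per device: takes requests off the
 * send-queue, re-checks that they are still worth sending,
 * moves them to the pending-queue and transmits them.
 */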
int dnbd2_tx_loop(void *data)
{
	struct request *req;
	dnbd2_device_t *dev = data;
	struct req_info *info;

	daemonize("dnbd2_tx_loop");
	allow_signal(SIGKILL);
	complete(&dev->tx_start);

	while (1) {
		wait_event_interruptible(dev->sender_wq,
					 !list_empty(&dev->send_queue));

		/*
		 * If wait_event_interruptible is interrupted while
		 * the queue is not empty it returns 0, not
		 * -ERESTARTSYS. Therefore we need another flag.
		 */
		if (dev->tx_signal)
			break;

		/* Dequeue request from the send-queue. */
		spin_lock_bh(&dev->send_queue_lock);
		req = blkdev_entry_to_request(dev->send_queue.next);
		list_del_init(&req->queuelist);
		spin_unlock_bh(&dev->send_queue_lock);

		/*
		 * This is a good place to do some sanity checks
		 * because the request is neither in the send- nor in
		 * the pending-queue.
		 */
		down(&dev->servers_mutex);
		info = req->special;
		switch (info->cmd) {
		case CMD_GET_BLOCK:
			/*
			 * If dnbd2_request is called w/o first
			 * opening the device or with the device's
			 * capacity set to zero we should land here.
			 */
			if (!dev->running) {
				dnbd2_end_request(req, 0);
				up(&dev->servers_mutex);
				continue;
			}
			/*
			 * Send block requests to dev->active_server,
			 * which is always set while running.
			 */
			info->last_dst = info->dst;
			info->dst = dev->active_server;
			break;

		case CMD_GET_SERVERS:
		case CMD_GET_SIZE:
			/*
			 * Discard the request if its destination is
			 * no longer available.
			 */
			if (!info->dst->sock) {
				dnbd2_end_request(req, 1);
				up(&dev->servers_mutex);
				continue;
			}
			break;
		}

		/* Enqueue request in the pending-queue. */
		spin_lock_bh(&dev->pending_queue_lock);
		list_add_tail(&req->queuelist, &dev->pending_queue);
		spin_unlock_bh(&dev->pending_queue_lock);

		/* Send request. */
		dnbd2_send_request(req, dev);
		up(&dev->servers_mutex);
	}

	complete_and_exit(&dev->tx_stop, 0);
}


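/*
 * Serialise a request into the on-wire (big-endian) format and
 * transmit it. Only the low 16 bits of the timestamp go on the
 * wire; the server echoes them back so that the receiver can
 * estimate round-trip times (see dnbd2_rx_loop).
 */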
void dnbd2_send_request(struct request *req, dnbd2_device_t *dev)
{
	dnbd2_data_request_t dnbd2_req;
	struct req_info *info = req->special;

	dnbd2_req.cmd  = cpu_to_be16(info->cmd);
	dnbd2_req.time = cpu_to_be16(info->time);
	dnbd2_req.vid  = cpu_to_be16(dev->vid);
	dnbd2_req.rid  = cpu_to_be16(dev->rid);
	dnbd2_req.num  = cpu_to_be64(blk_rq_pos(req) * SECTOR_SIZE);

	/*
	 * If sock_xmit fails the request eventually gets requeued.
	 * That's why we don't check its return value.
	 */
	sock_xmit(info->dst->sock, SEND, &dnbd2_req, sizeof(dnbd2_req));
	info->cnt++;
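	/*
	 * Retry accounting: a destination switch restarts the count,
	 * while a repeated transmission to the same server is charged
	 * as a retry against that server.
	 */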
	if (info->dst != info->last_dst) {
		info->cnt = 1;
	} else if (info->cnt > 1) {
		info->dst->retries++;
	}
}


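/*
 * Receiver thread, one per server connection: matches each reply
 * against the pending-queue, copies block payloads into the
 * request's bio, and keeps the server list and RTT estimates up
 * to date.
 */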
int dnbd2_rx_loop(void *data)
{
	struct srv_info *srv_info = data;
	dnbd2_device_t *dev = srv_info->dev;
	dnbd2_data_reply_t dnbd2_rep;
	struct request *req;
	uint64_t num;
	uint16_t cmd, time;
	char *buffer;
	int ret, i;

	daemonize("dnbd2_rx_loop");
	allow_signal(SIGKILL);
	complete(&srv_info->rx_start);

	while (1) {
		ret = sock_xmit(srv_info->sock, RECV,
				&dnbd2_rep, sizeof(dnbd2_rep));
		if (ret == -EINTR)
			break;
		if (ret != sizeof(dnbd2_rep) ||
		    be16_to_cpu(dnbd2_rep.vid) != dev->vid ||
		    be16_to_cpu(dnbd2_rep.rid) != dev->rid)
			continue;

		cmd = be16_to_cpu(dnbd2_rep.cmd);
		num = be64_to_cpu(dnbd2_rep.num);
		time = be16_to_cpu(dnbd2_rep.time);

		/* Find a matching request in the pending-queue. */
		req = dnbd2_find_request(num, cmd, srv_info);
		if (!req)
			continue;
		srv_info->last_reply = jiffies;
		update_rtt(diff(time), srv_info, cmd);

		switch (cmd) {
		case CMD_GET_BLOCK:
			spin_lock(&dev->kmap_lock);
			buffer = __bio_kmap_atomic(req->bio, 0, KM_USER0);
			memcpy(buffer,
			       dnbd2_rep.payload.data,
			       DNBD2_BLOCK_SIZE);
			__bio_kunmap_atomic(req->bio, KM_USER0);
			spin_unlock(&dev->kmap_lock);
			break;

		case CMD_GET_SIZE:
			if (!srv_info->capacity) {
				srv_info->capacity = num / SECTOR_SIZE;
				wake_up_all(&srv_info->wq);
			}
			break;

		case CMD_GET_SERVERS:
			down(&dev->servers_mutex);

			if (dev->emergency)
				stop_emergency(srv_info);

			/* Recreate the emergency list. */
			dev->emerg_list[0].ip = srv_info->ip;
			dev->emerg_list[0].port = srv_info->port;
			for (i=1 ; i<ALT_SERVERS_MAX ; i++) {
				if (num) {
					memcpy(&dev->emerg_list[i],
					       &dnbd2_rep.payload.server[i-1],
					       sizeof(dnbd2_server_t));
					try_add_server(dev->emerg_list[i], dev);
					num--;
				} else {
					dev->emerg_list[i].ip = 0;
					dev->emerg_list[i].port = 0;
				}
			}
			up(&dev->servers_mutex);
			break;
		}
		dnbd2_end_request(req, 1);
	}

	complete_and_exit(&srv_info->rx_stop, 0);
}


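/*
 * Block replies are matched by sector number; control replies
 * (size, server list) carry no sector and are matched by command
 * and originating server instead.
 */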
struct request *dnbd2_find_request(uint64_t num,
				   uint16_t cmd,
				   struct srv_info *dst)
{
	dnbd2_device_t *dev = dst->dev;
	struct list_head *cur, *next;
	struct req_info *info;
	struct request *req;
	sector_t sector;

	switch (cmd) {
	case CMD_GET_BLOCK:
		sector = num / SECTOR_SIZE;
		spin_lock_bh(&dev->pending_queue_lock);
		list_for_each_safe(cur, next, &dev->pending_queue) {
			req = blkdev_entry_to_request(cur);
			info = req->special;

			if (blk_rq_pos(req) == sector && info->cmd == cmd) {
				list_del_init(&req->queuelist);
				spin_unlock_bh(&dev->pending_queue_lock);
				return req;
			}
		}
		break;

	case CMD_GET_SIZE:
	case CMD_GET_SERVERS:
		spin_lock_bh(&dev->pending_queue_lock);
		list_for_each_safe(cur, next, &dev->pending_queue) {
			req = blkdev_entry_to_request(cur);
			info = req->special;

			if (info->cmd == cmd && info->dst == dst) {
				list_del_init(&req->queuelist);
				spin_unlock_bh(&dev->pending_queue_lock);
				return req;
			}
		}
	}

	spin_unlock_bh(&dev->pending_queue_lock);
	return NULL;
}


void dnbd2_end_request(struct request *req, int success)
{
	unsigned long flags;
	struct req_info *info = req->special;
	dnbd2_device_t *dev;

	switch (info->cmd) {
	case CMD_GET_BLOCK:
		dev = req->rq_disk->private_data;
		spin_lock_irqsave(&dev->blk_lock, flags);
		list_del_init(&req->queuelist);
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,25)
		/* __blk_end_request_cur: 0 for success, < 0 for error. */
		__blk_end_request_cur(req, success == 1 ? 0 : -EIO);
#else
		/* end_that_request_*: 1 for success, 0 for I/O error. */
		if (!end_that_request_first(req, success, req->nr_sectors)) {
			end_that_request_last(req, success);
		}
#endif
		dev->pending_reqs--;
		spin_unlock_irqrestore(&dev->blk_lock, flags);
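		/* Mark the pool slot as free again (see dnbd2_request). */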
		info->cnt = -1;
		break;

	case CMD_GET_SIZE:
	case CMD_GET_SERVERS:
		kfree(info);
		kfree(req);
		break;
	}
}


void dnbd2_requeue_timer(unsigned long arg)
{
	dnbd2_device_t *dev = (dnbd2_device_t *)arg;
	struct list_head *cur, *next;
	struct request *req;
	struct req_info *info;
	unsigned long too_long;
	int requeue;

	spin_lock(&dev->pending_queue_lock);
	list_for_each_safe(cur, next, &dev->pending_queue) {
		requeue = 0;
		req = blkdev_entry_to_request(cur);
		info = req->special;
		if (!info->cnt)
			continue;

		/* Each request type has a specific requeue policy. */
		switch (info->cmd) {
		case CMD_GET_BLOCK:
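			/*
			 * Exponential backoff: twice the smoothed RTT,
			 * scaled by 4 << cnt, i.e. 8 * SRTT * 2^cnt
			 * jiffies, capped at one second (HZ).
			 */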
			too_long = 2 * (info->dst->srtt >> SRTT_SHIFT);
			too_long *= 4 << info->cnt;
			if (too_long > HZ)
				too_long = HZ;
			if (diff(info->time) >= too_long)
				requeue = 1;
			break;

		case CMD_GET_SERVERS:
		case CMD_GET_SIZE:
			if (info->cnt == 4) {
				list_del_init(&req->queuelist);
				dnbd2_end_request(req, 0);
				schedule_del_server(info->dst);
				break;
			}
			if (diff(info->time) >= HZ)
				requeue = 1;
			break;
		}

		if (requeue) {
			list_del_init(&req->queuelist);
			spin_lock(&dev->send_queue_lock);
			list_add_tail(&req->queuelist, &dev->send_queue);
			spin_unlock(&dev->send_queue_lock);
		}
	}
	spin_unlock(&dev->pending_queue_lock);

	wake_up_interruptible(&dev->sender_wq);
	dev->requeue_timer.expires = jiffies + REQUEUE_INTERVAL;
	add_timer(&dev->requeue_timer);
}


void dnbd2_hb_timer(unsigned long arg)
{
	dnbd2_device_t *dev = (dnbd2_device_t *)arg;
	int i;

	if (dev->running)
		for_each_server(i)
			enqueue_hb(&dev->servers[i]);

	wake_up_interruptible(&dev->sender_wq);
	dev->hb_timer.expires = jiffies + dev->hb_interval;
	add_timer(&dev->hb_timer);
}


void dnbd2_to_timer(unsigned long arg)
{
	dnbd2_device_t *dev = (dnbd2_device_t *)arg;

	if (dev->running)
		schedule_activate_fastest(dev);

	dev->to_timer.expires = jiffies + TO_INTERVAL;
	add_timer(&dev->to_timer);
}


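/*
 * Emergency mode: entered when no server is reachable any more.
 * The servers last announced via CMD_GET_SERVERS (emerg_list) are
 * re-added and heartbeats are sent at the faster emergency rate
 * until one of them replies again (see stop_emergency).
 */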
void start_emergency(dnbd2_device_t *dev)
{
	int i;

	if (dev->emergency)
		return;

	p("No servers reachable, starting emergency mode!\n");
	dev->emergency = 1;

	del_timer(&dev->requeue_timer);
	del_timer(&dev->to_timer);
	del_timer(&dev->hb_timer);

	/* Activate the emergency list. */
	for_each_server(i)
		try_add_server(dev->emerg_list[i], dev);

	/* Increase frequency of heartbeats. */
	dev->hb_interval = HB_EMERG_INTERVAL;
	dev->hb_timer.expires = jiffies + HB_EMERG_INTERVAL;
	add_timer(&dev->hb_timer);
}


void stop_emergency(struct srv_info *srv_info)
{
	dnbd2_device_t *dev = srv_info->dev;

	if (!dev->emergency)
		return;

	p("Stopping emergency mode.\n");
	del_timer(&dev->hb_timer);

	dev->active_server = srv_info;

	/* Decrease frequency of heartbeats. */
	dev->hb_interval = HB_NORMAL_INTERVAL;
	dev->hb_timer.expires = jiffies + HB_NORMAL_INTERVAL;
	add_timer(&dev->hb_timer);

	dev->requeue_timer.expires = jiffies + REQUEUE_INTERVAL;
	add_timer(&dev->requeue_timer);
	dev->to_timer.expires = jiffies + TO_INTERVAL;
	add_timer(&dev->to_timer);

	dev->emergency = 0;
}


static int __init dnbd2_init(void)
{
	int i;

	/* We are platform dependent. */
	if (DNBD2_BLOCK_SIZE != PAGE_SIZE) {
		printk(LOG "DNBD2_BLOCK_SIZE (%d) != PAGE_SIZE (%d)\n",
		       (int) DNBD2_BLOCK_SIZE, (int) PAGE_SIZE);
		return -EINVAL;
	}

	dnbd2_major = register_blkdev(0, "dnbd2");
	if (dnbd2_major <= 0) {
		p("Could not get major number.\n");
		return -EBUSY;
	}

	for (i=0 ; i<DNBD2_DEVICES ; i++)
		if (add_device(&dev[i], i))
			goto out;

	p("DNBD2 loaded.\n");
	return 0;

 out:
	while (i--)
		del_device(&dev[i]);
	return -ENOMEM;
}


static void __exit dnbd2_exit(void)
{
	int i;
	for (i=0 ; i<DNBD2_DEVICES ; i++)
		del_device(&dev[i]);
	unregister_blkdev(dnbd2_major, "dnbd2");
	p("DNBD2 unloaded.\n");
}


module_init(dnbd2_init);
module_exit(dnbd2_exit);

MODULE_DESCRIPTION("Distributed Network Block Device v2");
MODULE_LICENSE("GPL");