diff options
-rw-r--r-- | Makefile | 16 | ||||
-rw-r--r-- | include/dnbd2.h | 131 | ||||
-rw-r--r-- | init-script/Makefile | 10 | ||||
-rwxr-xr-x | init-script/dnbd2 | 84 | ||||
-rw-r--r-- | kernel/Makefile | 18 | ||||
-rw-r--r-- | kernel/core.c | 510 | ||||
-rw-r--r-- | kernel/core.h | 102 | ||||
-rw-r--r-- | kernel/devices.c | 220 | ||||
-rw-r--r-- | kernel/devices.h | 14 | ||||
-rw-r--r-- | kernel/dnbd2.h | 118 | ||||
-rw-r--r-- | kernel/fops.c | 43 | ||||
-rw-r--r-- | kernel/fops.h | 9 | ||||
-rw-r--r-- | kernel/misc.c | 84 | ||||
-rw-r--r-- | kernel/misc.h | 23 | ||||
-rw-r--r-- | kernel/scp | 1 | ||||
-rw-r--r-- | kernel/servers.c | 355 | ||||
-rw-r--r-- | kernel/servers.h | 84 | ||||
-rw-r--r-- | kernel/sysfs.c | 460 | ||||
-rw-r--r-- | kernel/sysfs.h | 24 | ||||
-rw-r--r-- | server/Makefile | 30 | ||||
-rw-r--r-- | server/config.c | 213 | ||||
-rw-r--r-- | server/config.h | 13 | ||||
-rw-r--r-- | server/file.c | 65 | ||||
-rw-r--r-- | server/file.h | 28 | ||||
-rw-r--r-- | server/main.c | 227 | ||||
-rw-r--r-- | server/query.c | 74 | ||||
-rw-r--r-- | server/query.h | 14 | ||||
-rw-r--r-- | server/tree.c | 67 | ||||
-rw-r--r-- | server/tree.h | 37 | ||||
-rw-r--r-- | test-app/Makefile | 17 | ||||
-rw-r--r-- | test-app/data-client.c | 281 |
31 files changed, 3372 insertions, 0 deletions
diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..b8fa611 --- /dev/null +++ b/Makefile @@ -0,0 +1,16 @@ +# +# Global Makefile +# + +DIRS = server test-app kernel init-script +VPATH = $(DIRS) + +all: + @for dir in $(DIRS) ; do make -C $$dir ; done + +install: + @for dir in $(DIRS) ; do make -C $$dir install ; done + +.PHONY: +clean: + @for dir in $(DIRS) ; do make -C $$dir clean ; done diff --git a/include/dnbd2.h b/include/dnbd2.h new file mode 100644 index 0000000..4035b11 --- /dev/null +++ b/include/dnbd2.h @@ -0,0 +1,131 @@ +/* + * include/dnbd2.h - Stuff that applies to all programs. + */ + + +/* + * The Linux Kernel's minimum block request size is PAGE_SIZE. We use + * the same block size to simplify things. We don't care about other + * sizes because our software is platform dependent. + */ +#define DNBD2_BLOCK_SIZE 4096 + +/* + * Commands the data-server understands. + */ +#define CMD_GET_BLOCK 1 /* Request a block. */ +#define CMD_GET_SIZE 2 /* Request the size of a Dataset. */ +#define CMD_GET_SERVERS 3 /* Request a list of alternative servers. */ + +/* + * Maximum number of alternative data-servers per dataset. + */ +#define ALT_SERVERS_MAX 4 + +/* + * Maximum length of strings including the ending \0. + */ +#define FILE_NAME_MAX 255 +#define LINE_SIZE_MAX FILE_NAME_MAX + +/* + * Network byte order <-> Host byte order (64bits). + */ +#ifndef MODULE +#if __BYTE_ORDER == __BIG_ENDIAN +#define ntohll(x) (x) +#define htonll(x) (x) +#else +#define ntohll(x) bswap_64(x) +#define htonll(x) bswap_64(x) +#endif +#endif + + +/* + * Dataset + */ +typedef struct dataset { + char path[FILE_NAME_MAX]; /* pathname to file or block-device */ + uint16_t vid; /* Volume-ID */ + uint16_t rid; /* Release-ID */ +} dataset_t; + +/* + * Structure to identify servers. 
+ */ +#pragma pack(4) +typedef struct dnbd2_server { + uint32_t ip; /* IP (network byte order) */ + uint16_t port; /* Port (network byte order) */ + uint16_t pad; /* Padding - unused */ +} dnbd2_server_t; +#pragma pack() + +/* + * Structure for requests: + * + * <--------- 64 bits ---------> + * + * +---------------------------+ + * | cmd | time | vid | rid | + * |---------------------------| + * | offset | + * +---------------------------+ + * + * - cmd: Command (defined above) + * - time: Time of request + * - vid: Volume-ID + * - rid: Release-ID + * - num: Offset (for CMD_GET_BLOCK), otherwise undefined + */ +#pragma pack(1) +typedef struct dnbd2_data_request { + uint16_t cmd; + uint16_t time; + uint16_t vid; + uint16_t rid; + uint64_t num; +} dnbd2_data_request_t; +#pragma pack() + +/* + * Structure for replies: + * + * <--------- 64 bits ---------> + * + * +---------------------------+ + * | cmd | time | vid | rid | + * |---------------------------| + * | offset/size/n.servers | + * |---------------------------| + * | | + * | (4KB) | + * | Block/List of servers | + * | | + * +---------------------------+ + * + * - req.cmd: Always echoed + * - req.time: Always echoed + * - req.vid: Always echoed + * - req.rid: Always echoed + * - req.num: Echoed (for CMD_GET_BLOCK) or + * Dataset size (for CMD_GET_SIZE) or + * Number of Servers (for CMD_GET_SERVERS) + * - payload: A block (for CMD_GET_BLOCK) or + * Undefined (for CMD_GET_SIZE) or + * List of Servers (for CMD_GET_SERVERS) + */ +#pragma pack(1) +typedef struct dnbd2_data_reply { + uint16_t cmd; + uint16_t time; + uint16_t vid; + uint16_t rid; + uint64_t num; + union { + uint8_t data[DNBD2_BLOCK_SIZE]; + dnbd2_server_t server[ALT_SERVERS_MAX]; + } payload; +} dnbd2_data_reply_t; +#pragma pack() diff --git a/init-script/Makefile b/init-script/Makefile new file mode 100644 index 0000000..b523ffc --- /dev/null +++ b/init-script/Makefile @@ -0,0 +1,10 @@ +# +# init-script/Makefile +# + +all: + +install: + cp dnbd2 
/etc/init.d/ + +clean: diff --git a/init-script/dnbd2 b/init-script/dnbd2 new file mode 100755 index 0000000..e9a67ab --- /dev/null +++ b/init-script/dnbd2 @@ -0,0 +1,84 @@ +#! /bin/bash +# +### BEGIN INIT INFO +# Provides: dnbd2 +# Required-Start: $local_fs $network +# Required-Stop: $local_fs $network +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Short-Description: starts and stops the DNBD2 server. +# Description: DNDB2 is a Distributed Network Block Device for +# diskless clients in unicast networks. +### END INIT INFO +# +# Author: Vito Di Leo <dileo@informatik.uni-freiburg.de> + +PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin +USER=dnbd2 +SERVER_NAME=dnbd2-server +SERVER_DESC="DNBD2 Server" + +[ -x "`which $SERVER_NAME`" ] || exit 0 + +. /lib/lsb/init-functions + +if ! (id $USER >/dev/null 2>&1) ; then + log_failure_msg "Please create system user $USER." + exit 0 +fi + +if [ ! -x /usr/bin/pkill ] ; then + log_failure_msg "Please install pkill." + exit 0 +fi + +if [ ! -x /usr/bin/sudo ] ; then + log_failure_msg "Please install sudo." + exit 0 +fi + + +case "$1" in + start) + for FILE in `ls /etc/dnbd2/servers/* 2>/dev/null` ; do + log_begin_msg "Starting $SERVER_DESC ($FILE)..." + sudo -u $USER $SERVER_NAME $FILE + log_end_msg $? + done + ;; + + stop) + log_begin_msg "Stoping $SERVER_DESC"s... + pkill -u $USER $SERVER_NAME + log_end_msg 0 + ;; + + reload|force-reload) + log_begin_msg "Reloading $SERVER_DESC"s... + pkill -SIGHUP -u $USER $SERVER_NAME + log_end_msg 0 + ;; + + restart) + $0 stop + sleep 1 + $0 start + ;; + + status) + PIDS=`pgrep -u $USER $SERVER_NAME` + if [ -n "$PIDS" ] ; then + echo $SERVER_NAME running on pids $PIDS + exit 0 + fi + echo $SERVER_NAME not running. 
+ exit 3 + ;; + + *) + echo "Usage: dnbd2 {start|stop|restart|reload|force-reload}" >&2 + exit 3 + ;; +esac + +exit 0 diff --git a/kernel/Makefile b/kernel/Makefile new file mode 100644 index 0000000..8c01ed3 --- /dev/null +++ b/kernel/Makefile @@ -0,0 +1,18 @@ +# +# kernel/Makefile +# + +KDIR := /lib/modules/$(shell uname -r)/build +PWD := $(shell pwd) + +obj-m := dnbd2.o +dnbd2-objs := fops.o sysfs.o servers.o devices.o misc.o core.o + +all: + $(MAKE) -C $(KDIR) M=$(PWD) modules + +install: + $(MAKE) -C $(KDIR) M=$(PWD) modules_install + +clean: + rm -rf *.o *.ko *~ *.symvers dnbd2.mod.c .*o.cmd .tmp_versions diff --git a/kernel/core.c b/kernel/core.c new file mode 100644 index 0000000..6bee380 --- /dev/null +++ b/kernel/core.c @@ -0,0 +1,510 @@ +/* + * kernel/core.c - The principal component of the driver. + * See core.h for comments on each function. + */ + + +#include "dnbd2.h" +#include "core.h" +#include "devices.h" +#include "misc.h" +#include "fops.h" +#include "servers.h" + + +int dnbd2_major; +static dnbd2_device_t dev[DNBD2_DEVICES]; + + +void dnbd2_request(request_queue_t *q) +{ + int i; + struct request *req; + struct req_info *info; + dnbd2_device_t *dev; + + while ((req = elv_next_request(q)) != NULL) { + /* Discard invalid block requests. */ + if (!blk_fs_request(req)) { + end_request(req, 0); + continue; + } + if (rq_data_dir(req) != READ) { + end_request(req, 0); + continue; + } + + /* Prepare request. */ + dev = req->rq_disk->private_data; + for (i=0 ; i<POOL_SIZE ; i++) { + if (dev->info_pool[i].cnt == -1) + break; + } + if (i == POOL_SIZE) + continue; + dev->pending_reqs++; + info = &dev->info_pool[i]; + info->time = jiffies; + info->cnt = 0; + info->cmd = CMD_GET_BLOCK; + info->dst = dev->active_server; + req->special = info; + blkdev_dequeue_request(req); + + /* Enqueue the request for sending. 
*/ + spin_lock_bh(&dev->send_queue_lock); + list_add_tail(&req->queuelist, &dev->send_queue); + spin_unlock_bh(&dev->send_queue_lock); + + /* Wakeup sender function. */ + wake_up_interruptible(&dev->sender_wq); + } +} + + +int dnbd2_tx_loop(void *data) +{ + struct request *req; + dnbd2_device_t *dev = data; + struct req_info *info; + + daemonize("dnbd2_tx_loop"); + allow_signal(SIGKILL); + complete(&dev->tx_start); + + while (1) { + wait_event_interruptible(dev->sender_wq, + !list_empty(&dev->send_queue)); + + /* + * If wait_event_interruptible is interrupted while + * the queue is not empty it returns 0, not + * -ERESTARTSYS. Therefore we need another flag. + */ + if (dev->tx_signal) + break; + + /* Dequeue request from the send-queue. */ + spin_lock_bh(&dev->send_queue_lock); + req = blkdev_entry_to_request(dev->send_queue.next); + list_del_init(&req->queuelist); + spin_unlock_bh(&dev->send_queue_lock); + + /* + * This is a good place to do some sanity checks + * because the request is neither in the send- nor in + * the pending-queue. + */ + down(&dev->servers_mutex); + info = req->special; + switch (info->cmd) { + case CMD_GET_BLOCK: + /* + * If dnbd2_request is called w/o first + * opening the device or with the device's + * capacity set to zero we should land here. + */ + if (!dev->running) { + dnbd2_end_request(req, 0); + up(&dev->servers_mutex); + continue; + } + /* + * Send block requests to dev->active_server, + * which is always set while running. + */ + info->last_dst = info->dst; + info->dst = dev->active_server; + break; + + case CMD_GET_SERVERS: + case CMD_GET_SIZE: + /* + * Discard the request if its destination is + * no longer available. + */ + if (!info->dst->sock) { + dnbd2_end_request(req, 1); + up(&dev->servers_mutex); + continue; + } + break; + } + + /* Enqueue request in the pending-queue. 
*/ + spin_lock_bh(&dev->pending_queue_lock); + list_add_tail(&req->queuelist, &dev->pending_queue); + spin_unlock_bh(&dev->pending_queue_lock); + + /* Send request. */ + dnbd2_send_request(req, dev); + up(&dev->servers_mutex); + } + + complete_and_exit(&dev->tx_stop, 0); +} + + +void dnbd2_send_request(struct request *req, dnbd2_device_t *dev) { + dnbd2_data_request_t dnbd2_req; + struct req_info *info = req->special; + + dnbd2_req.cmd = cpu_to_be16(info->cmd); + dnbd2_req.time = cpu_to_be16(info->time); + dnbd2_req.vid = cpu_to_be16(dev->vid); + dnbd2_req.rid = cpu_to_be16(dev->rid); + dnbd2_req.num = cpu_to_be64(req->sector * SECTOR_SIZE); + + /* + * If sock_xmit fails the request eventually gets requeued. + * That's why we don't check its return value. + */ + sock_xmit(info->dst->sock, SEND, &dnbd2_req, sizeof(dnbd2_req)); + info->cnt++; + if (info->dst != info->last_dst) { + info->cnt = 1; + } else if (info->cnt > 1) { + info->dst->retries++; + } +} + + +int dnbd2_rx_loop(void *data) +{ + struct srv_info *srv_info = data; + dnbd2_device_t *dev = srv_info->dev; + dnbd2_data_reply_t dnbd2_rep; + struct request *req; + uint64_t num; + uint16_t cmd, time; + char *buffer; + int ret, i; + + daemonize("dnbd2_rx_loop"); + allow_signal(SIGKILL); + complete(&srv_info->rx_start); + + while (1) { + ret = sock_xmit(srv_info->sock, RECV, + &dnbd2_rep, sizeof(dnbd2_rep)); + if (ret == -EINTR) + break; + if (ret != sizeof(dnbd2_rep) || + be16_to_cpu(dnbd2_rep.vid) != dev->vid || + be16_to_cpu(dnbd2_rep.rid) != dev->rid) + continue; + + cmd = be16_to_cpu(dnbd2_rep.cmd); + num = be64_to_cpu(dnbd2_rep.num); + time = be16_to_cpu(dnbd2_rep.time); + + /* Find a matching request in the pending-queue. 
*/ + req = dnbd2_find_request(num, cmd, srv_info); + if (!req) + continue; + srv_info->last_reply = jiffies; + update_rtt(diff(time), srv_info, cmd); + + switch (cmd) { + case CMD_GET_BLOCK: + spin_lock(&dev->kmap_lock); + buffer = __bio_kmap_atomic(req->bio, 0, KM_USER0); + memcpy(buffer, + dnbd2_rep.payload.data, + DNBD2_BLOCK_SIZE); + __bio_kunmap_atomic(req->bio, KM_USER0); + spin_unlock(&dev->kmap_lock); + break; + + case CMD_GET_SIZE: + if (!srv_info->capacity) { + srv_info->capacity = num / SECTOR_SIZE; + wake_up_all(&srv_info->wq); + } + break; + + case CMD_GET_SERVERS: + down(&dev->servers_mutex); + + if (dev->emergency) + stop_emergency(srv_info); + + /* Recreate the emergency list. */ + dev->emerg_list[0].ip = srv_info->ip; + dev->emerg_list[0].port = srv_info->port; + for (i=1 ; i<ALT_SERVERS_MAX; i++) { + if (num) { + memcpy(&dev->emerg_list[i], + &dnbd2_rep.payload.server[i-1], + sizeof(dnbd2_server_t)); + try_add_server(dev->emerg_list[i], dev); + num--; + } else { + dev->emerg_list[i].ip = 0; + dev->emerg_list[i].port = 0; + } + } + up(&dev->servers_mutex); + break; + } + dnbd2_end_request(req, 1); + } + + complete_and_exit(&srv_info->rx_stop, 0); +} + + +struct request *dnbd2_find_request(uint64_t num, + uint16_t cmd, + struct srv_info *dst) +{ + dnbd2_device_t *dev = dst->dev; + struct list_head *cur, *next; + struct req_info *info; + struct request *req; + sector_t sector; + + switch (cmd) { + case CMD_GET_BLOCK: + sector = num / SECTOR_SIZE; + spin_lock_bh(&dev->pending_queue_lock); + list_for_each_safe(cur, next, &dev->pending_queue) { + req = blkdev_entry_to_request(cur); + info = req->special; + + if (req->sector == sector && info->cmd == cmd) { + list_del_init(&req->queuelist); + spin_unlock_bh(&dev->pending_queue_lock); + return req; + } + } + break; + + case CMD_GET_SIZE: + case CMD_GET_SERVERS: + spin_lock_bh(&dev->pending_queue_lock); + list_for_each_safe(cur, next, &dev->pending_queue) { + req = blkdev_entry_to_request(cur); + info = 
req->special; + + if (info->cmd == cmd && info->dst == dst) { + list_del_init(&req->queuelist); + spin_unlock_bh(&dev->pending_queue_lock); + return req; + } + } + } + + spin_unlock_bh(&dev->pending_queue_lock); + return NULL; +} + + +void dnbd2_end_request(struct request *req, int success) +{ + unsigned long flags; + struct req_info *info = req->special; + dnbd2_device_t *dev; + + switch (info->cmd) { + case CMD_GET_BLOCK: + dev = req->rq_disk->private_data; + spin_lock_irqsave(&dev->blk_lock, flags); + list_del_init(&req->queuelist); + if (!end_that_request_first(req, success, req->nr_sectors)) { + end_that_request_last(req, success); + } + dev->pending_reqs--; + spin_unlock_irqrestore(&dev->blk_lock, flags); + info->cnt = -1; + break; + + case CMD_GET_SIZE: + case CMD_GET_SERVERS: + kfree(info); + kfree(req); + break; + } +} + + +void dnbd2_requeue_timer(unsigned long arg) +{ + dnbd2_device_t *dev = (dnbd2_device_t *)arg; + struct list_head *cur, *next; + struct request *req; + struct req_info *info; + unsigned long too_long; + int requeue; + + spin_lock(&dev->pending_queue_lock); + list_for_each_safe(cur, next, &dev->pending_queue) { + requeue = 0; + req = blkdev_entry_to_request(cur); + info = req->special; + if (!info->cnt) + continue; + + /* Each request type has a specific requeue policy. 
*/ + switch (info->cmd) { + case CMD_GET_BLOCK: + too_long = 2 * (info->dst->srtt >> SRTT_SHIFT); + too_long *= 4 << info->cnt; + if (too_long > HZ) + too_long = HZ; + if (diff(info->time) >= too_long) + requeue = 1; + break; + + case CMD_GET_SERVERS: + case CMD_GET_SIZE: + if (info->cnt == 4) { + list_del_init(&req->queuelist); + dnbd2_end_request(req, 0); + schedule_del_server(info->dst); + break; + } + if (diff(info->time) >= HZ) + requeue = 1; + break; + } + + if (requeue) { + list_del_init(&req->queuelist); + spin_lock(&dev->send_queue_lock); + list_add_tail(&req->queuelist, &dev->send_queue); + spin_unlock(&dev->send_queue_lock); + } + } + spin_unlock(&dev->pending_queue_lock); + + wake_up_interruptible(&dev->sender_wq); + dev->requeue_timer.expires = jiffies + REQUEUE_INTERVAL; + add_timer(&dev->requeue_timer); +} + + +void dnbd2_hb_timer(unsigned long arg) +{ + dnbd2_device_t *dev = (dnbd2_device_t *)arg; + int i; + + if (dev->running) + for_each_server(i) + enqueue_hb(&dev->servers[i]); + + wake_up_interruptible(&dev->sender_wq); + dev->hb_timer.expires = jiffies + dev->hb_interval; + add_timer(&dev->hb_timer); +} + + +void dnbd2_to_timer(unsigned long arg) +{ + dnbd2_device_t *dev = (dnbd2_device_t *)arg; + + if (dev->running) + schedule_activate_fastest(dev); + + dev->to_timer.expires = jiffies + TO_INTERVAL; + add_timer(&dev->to_timer); +} + + +void start_emergency(dnbd2_device_t *dev) +{ + int i; + + if (dev->emergency) + return; + + p("No servers reachable, starting emergency mode!\n"); + dev->emergency = 1; + + del_timer(&dev->requeue_timer); + del_timer(&dev->to_timer); + del_timer(&dev->hb_timer); + + /* Activate the emergency list. */ + for_each_server(i) + try_add_server(dev->emerg_list[i], dev); + + /* Increase frequency of heartbeats. 
*/ + dev->hb_interval = HB_EMERG_INTERVAL; + dev->hb_timer.expires = jiffies + HB_EMERG_INTERVAL; + add_timer(&dev->hb_timer); +} + + +void stop_emergency(struct srv_info *srv_info) +{ + dnbd2_device_t *dev = srv_info->dev; + + if (!dev->emergency) + return; + + p("Stopping emergency mode.\n"); + del_timer(&dev->hb_timer); + + dev->active_server = srv_info; + + /* Decrease frequency of heartbeats. */ + dev->hb_interval = HB_NORMAL_INTERVAL; + dev->hb_timer.expires = jiffies + HB_NORMAL_INTERVAL; + add_timer(&dev->hb_timer); + + dev->requeue_timer.expires = jiffies + REQUEUE_INTERVAL; + add_timer(&dev->requeue_timer); + dev->to_timer.expires = jiffies + TO_INTERVAL; + add_timer(&dev->to_timer); + + dev->emergency = 0; +} + + +static int __init dnbd2_init(void) +{ + int i; + + /* We are platform dependant. */ + if (DNBD2_BLOCK_SIZE != PAGE_SIZE) { + printk(LOG "DNBD2_BLOCK_SIZE (%d) != PAGE_SIZE (%d)\n", + (int) DNBD2_BLOCK_SIZE, (int) PAGE_SIZE); + return -EINVAL; + } + + dnbd2_major = register_blkdev(0, "dnbd2"); + if (dnbd2_major <= 0) { + p("Could not get major number.\n"); + return -EBUSY; + } + + for (i=0 ; i<DNBD2_DEVICES ; i++) + if (add_device(&dev[i], i)) + goto out; + + p("DNBD2 loaded.\n"); + return 0; + + out: + while (i--) + del_device(&dev[i]); + return -ENOMEM; +} + + +static void __exit dnbd2_exit(void) +{ + int i; + for (i=0 ; i<DNBD2_DEVICES ; i++) + del_device(&dev[i]); + unregister_blkdev(dnbd2_major, "dnbd2"); + p("DNBD2 unloaded.\n"); +} + + +module_init(dnbd2_init); +module_exit(dnbd2_exit); + +MODULE_DESCRIPTION("Distributed Network Block Device v2"); +MODULE_LICENSE("GPL"); diff --git a/kernel/core.h b/kernel/core.h new file mode 100644 index 0000000..3c75ade --- /dev/null +++ b/kernel/core.h @@ -0,0 +1,102 @@ +/* + * kernel/core.h + */ + + +/* interval to send heartbeats */ +#define HB_NORMAL_INTERVAL 30*HZ + +/* interval to send heartbeats in emergency-mode */ +#define HB_EMERG_INTERVAL 5*HZ + +/* interval to check the pending-queue */ 
+#define REQUEUE_INTERVAL HZ/20 + +/* interval to look for faster servers */ +#define TO_INTERVAL HZ + +/* dnbd0, dnbd1, etc */ +#define DNBD2_DEVICES 3 + + +/* Given to us by the kernel at load-time. */ +extern int dnbd2_major; + + +/* + * Called by the block layer to make the driver process some + * requests. For each request it does the following: + * 1. Prepares the request by attaching information to it. + * 2. Enqueues the request into the send-queue. + * 3. Wakes up dnbd2_tx_loop. + */ +void dnbd2_request(request_queue_t *q); + +/* + * This thread sleeps until there are requests in the send-queue. + * Each request is dequeued from the send-queue, enqueued into the + * pending-queue and finally given to dnbd2_send_request. + */ +int dnbd2_tx_loop(void *data); + +/* + * Send a request and update the request's and the destination + * server's counters. + */ +void dnbd2_send_request(struct request *req, dnbd2_device_t *dev); + +/* + * This thread sleeps until a reply arrives and then processes it. + */ +int dnbd2_rx_loop(void *data); + +/* + * When a reply arrives through the network it is caught by + * dnbd2_rx_loop, which calls this function to find a matching request + * in the pending queue. + */ +struct request *dnbd2_find_request(uint64_t num, uint16_t cmd, + struct srv_info *dst); + +/* + * The driver calls this function when it is done processing a request + * and has no further use for it. Resources are freed and the block + * layer informed if necessary. + */ +void dnbd2_end_request(struct request *req, int success); + +/* + * If a request remains too long unanswered, it is moved by this + * function from the pending-queue into the send-queue. This function + * is fired regularly by a timer. + */ +void dnbd2_requeue_timer(unsigned long arg); + +/* + * Periodically enqueue, for each server, a heartbeat request + * (CMD_GET_SERVERS) into the send-queue. 
+ */ +void dnbd2_hb_timer(unsigned long arg); + +/* + * Check every TO_INTERVAL jiffies if there is a faster server and + * switch to it. TO stands for "takeover". + */ +void dnbd2_to_timer(unsigned long arg); + +/* + * If the driver loses contact to all servers it starts the emergency + * mode: First dnbd2_requeue_timer and dnbd2_to_timer are stopped. + * Then the emergency list (the last list of servers acquired with + * CMD_GET_SERVERS) is activated and the rate of heartbeats increased + * to HB_EMERG_INTERVAL. + */ +void start_emergency(dnbd2_device_t *dev); + +/* + * This function is called if a server answers to a heartbeat during + * emergency mode. This server (@srv_info) is made the active server, + * dnbd2_requeue_timer and dnbd2_to_timer started and the rate of + * heartbeats decreased to HB_NORMAL_INTERVAL. + */ +void stop_emergency(struct srv_info *srv_info); diff --git a/kernel/devices.c b/kernel/devices.c new file mode 100644 index 0000000..ccc6db1 --- /dev/null +++ b/kernel/devices.c @@ -0,0 +1,220 @@ +/* + * kernel/devices.c + */ + + +#include "dnbd2.h" +#include "core.h" +#include "fops.h" +#include "sysfs.h" +#include "devices.h" +#include "servers.h" + + +#define TO_PERCENT 50 +#define TO_JIFFIES 10 + + +/* + * Activate the request-processing mechanism for @dev: dnbd2_requeue_timer + * (timer) and dnbd2_tx_loop (thread). dnbd2_rx_loop is started + * afterwards by add_server on a per-server basis. + */ +int start_device(dnbd2_device_t *dev) +{ + /* Start requeue timer. */ + init_timer(&dev->requeue_timer); + dev->requeue_timer.data = (unsigned long)dev; + dev->requeue_timer.function = dnbd2_requeue_timer; + dev->requeue_timer.expires = jiffies + REQUEUE_INTERVAL; + add_timer(&dev->requeue_timer); + + /* Start heartbeat timer. 
*/ + init_timer(&dev->hb_timer); + dev->hb_timer.data = (unsigned long)dev; + dev->hb_timer.function = dnbd2_hb_timer; + dev->hb_timer.expires = jiffies + HB_NORMAL_INTERVAL; + add_timer(&dev->hb_timer); + + /* Start takeover timer. */ + init_timer(&dev->to_timer); + dev->to_timer.data = (unsigned long)dev; + dev->to_timer.function = dnbd2_to_timer; + dev->to_timer.expires = jiffies + TO_INTERVAL; + add_timer(&dev->to_timer); + + /* Start sending thread. */ + dev->tx_signal = 0; + dev->tx_id = kernel_thread(dnbd2_tx_loop, dev, CLONE_KERNEL); + if (dev->tx_id < 0) { + del_timer(&dev->hb_timer); + del_timer(&dev->to_timer); + del_timer(&dev->requeue_timer); + return -1; + } + wait_for_completion(&dev->tx_start); + + return 0; +} + + +/* + * Deactivate the request-processing mechanism for @dev. All + * dnbd2_rx_loop threads must be stopped beforehand by del_server. + */ +void stop_device(dnbd2_device_t *dev) +{ + struct list_head *cur, *next; + struct request *req; + + /* Stop heartbeat timer. */ + del_timer(&dev->hb_timer); + + /* Stop takeover timer. */ + del_timer(&dev->to_timer); + + /* Stop request processing. */ + del_timer(&dev->requeue_timer); + dev->tx_signal = 1; + kill_proc(dev->tx_id, SIGKILL, 1); + wait_for_completion(&dev->tx_stop); + + /* Empty pending-queue. */ + list_for_each_safe(cur, next, &dev->pending_queue) { + req = blkdev_entry_to_request(cur); + list_del_init(&req->queuelist); + dnbd2_end_request(req, 0); + } + + /* Empty send-queue. */ + list_for_each_safe(cur, next, &dev->send_queue) { + req = blkdev_entry_to_request(cur); + list_del_init(&req->queuelist); + dnbd2_end_request(req, 0); + } +} + + +int add_device(dnbd2_device_t *dev, int minor) +{ + struct request_queue *queue; + struct srv_info *srv_info; + struct gendisk *disk; + int i; + + /* + * Prepare dnbd2_device_t. Please use the + * same order as in dnbd2.h. 
+ */ + INIT_WORK(&dev->work, NULL); //, NULL); + /* Change in /<linuxheaders>/include/linux/workqueue.h */ + spin_lock_init(&dev->kmap_lock); + for (i=0 ; i<POOL_SIZE ; i++) + dev->info_pool[i].cnt = -1; + + dev->emergency = 0; + dev->running = 0; + + dev->vid = 0; + dev->rid = 0; + + atomic_set(&dev->refcnt, 0); + init_MUTEX(&dev->config_mutex); + + spin_lock_init(&dev->blk_lock); + + init_waitqueue_head(&dev->sender_wq); + spin_lock_init(&dev->send_queue_lock); + INIT_LIST_HEAD(&dev->send_queue); + + dev->pending_reqs = 0; + spin_lock_init(&dev->pending_queue_lock); + INIT_LIST_HEAD(&dev->pending_queue); + + dev->hb_interval = HB_NORMAL_INTERVAL; + + init_completion(&dev->tx_start); + init_completion(&dev->tx_stop); + + init_MUTEX(&dev->servers_mutex); + for_each_server(i) { + dev->emerg_list[i].ip = 0; + dev->emerg_list[i].port = 0; + srv_info = &dev->servers[i]; + memset(srv_info, 0, sizeof(struct srv_info)); + init_completion(&srv_info->rx_start); + init_completion(&srv_info->rx_stop); + init_waitqueue_head(&srv_info->wq); + INIT_WORK(&srv_info->work, NULL); //, NULL); + /* Change in /<linuxheaders>/include/linux/workqueue.h */ + srv_info->dev = dev; + } + dev->active_server = NULL; + + dev->to_percent = TO_PERCENT; + dev->to_jiffies = TO_JIFFIES; + + /* Prepare struct gendisk. */ + disk = alloc_disk(1); + if (!disk) { + p("Could not alloc gendisk.\n"); + goto out_nodisk; + } + dev->disk = disk; + disk->private_data = dev; + disk->major = dnbd2_major; + disk->first_minor = minor; + disk->fops = &dnbd2_fops; + sprintf(disk->disk_name, "vnbd%d", minor); + set_capacity(disk, 0); + set_disk_ro(disk, 1); + + /* Prepare struct request_queue. */ + queue = blk_init_queue(dnbd2_request, &dev->blk_lock); + if (!queue) { + p("Could not alloc request queue.\n"); + goto out_noqueue; + } + /* + * Tell the block layer to give us only requests consisting of + * one segment of DNBD2_BLOCK_SIZE bytes. 
+ */ + blk_queue_max_sectors(queue, DNBD2_BLOCK_SIZE/SECTOR_SIZE); + blk_queue_max_segment_size(queue, DNBD2_BLOCK_SIZE); + blk_queue_hardsect_size(queue, DNBD2_BLOCK_SIZE); + blk_queue_max_phys_segments(queue, 1); + blk_queue_max_hw_segments(queue, 1); + disk->queue = queue; + add_disk(disk); + + if (start_sysfs(dev)) + goto out_nosysfs; + + if (start_device(dev)) + goto out_nostart; + + return 0; + + out_nostart: + stop_sysfs(dev); + out_nosysfs: + blk_cleanup_queue(queue); + out_noqueue: + del_gendisk(disk); + put_disk(disk); + out_nodisk: + return -ENOMEM; +} + + +void del_device(dnbd2_device_t *dev) +{ + int i; + for_each_server(i) + del_server(&dev->servers[i]); + stop_device(dev); + stop_sysfs(dev); + blk_cleanup_queue(dev->disk->queue); + del_gendisk(dev->disk); + put_disk(dev->disk); +} diff --git a/kernel/devices.h b/kernel/devices.h new file mode 100644 index 0000000..d9cc46f --- /dev/null +++ b/kernel/devices.h @@ -0,0 +1,14 @@ +/* + * kernel/devices.h + */ + + +/* + * Initialize @dev and inform the kernel. + */ +int add_device(dnbd2_device_t *dev, int minor); + +/* + * Destroy @dev and inform the kernel. + */ +void del_device(dnbd2_device_t *dev); diff --git a/kernel/dnbd2.h b/kernel/dnbd2.h new file mode 100644 index 0000000..cf693d9 --- /dev/null +++ b/kernel/dnbd2.h @@ -0,0 +1,118 @@ +/* + * kernel/dnbd2.h + */ + + +#include <linux/version.h> +#include <linux/workqueue.h> +#include <linux/completion.h> +#include <linux/blkdev.h> +#include <linux/types.h> +#include <linux/wait.h> +#include <linux/inet.h> +#include <linux/in.h> +#include <asm/semaphore.h> +#include <net/sock.h> +#include "../include/dnbd2.h" + + +#define LOG KERN_NOTICE "dnbd2: " +#define p(msg) printk(LOG msg); + +#define SECTOR_SIZE 512 + +#define for_each_server(i) for (i=0 ; i<ALT_SERVERS_MAX ; i++) + +/* max number of requests the driver can handle at once. 
*/ +#define POOL_SIZE 128 + +/* precision of sector_t for pretty-printing */ +#ifdef CONFIG_LBD +# define SECT_PRECISION "%llu" +#else +# define SECT_PRECISION "%lu" +#endif + + +typedef struct dnbd2_device dnbd2_device_t; + +/* + * We stick this structure to each request before enqueing it into the + * send-queue. It gives information on how to treat the request. + */ +struct req_info { + uint16_t time; /* enqueue time */ + uint16_t cmd; /* command (CMD_XXX) */ + int cnt; /* send count */ + struct srv_info *dst; /* destination server */ + struct srv_info *last_dst; /* last destination server */ +}; + +/* + * Information about a server with which we communicate. + */ +struct srv_info { + dnbd2_device_t *dev; + struct kobject kobj; + struct work_struct work; + + struct socket *sock; /* NULL iff server not configured */ + uint32_t ip; /* network byte order */ + uint32_t port; /* network byte order */ + uint16_t min, max; /* min and max RTTs */ + unsigned long srtt; /* SRTT (SRTT_SHIFT left-shifted) */ + + sector_t capacity; /* used when a CMD_GET_SIZE reply arrives */ + wait_queue_head_t wq; /* used when a CMD_GET_SIZE reply arrives */ + + long rx_id; /* pid of dnbd2_rx_loop */ + struct completion rx_start; /* dnbd2_rx_loop has started */ + struct completion rx_stop; /* dnbd2_rx_loop has stopped */ + + unsigned long retries; /* number of requests retried */ + unsigned long last_reply; /* time of last reply */ +}; + +struct dnbd2_device { + struct work_struct work; + struct kobject kobj; + spinlock_t kmap_lock; + struct req_info info_pool[POOL_SIZE]; + + int running; /* device is running and capacity != 0 */ + int emergency; /* device has lost contact to all servers */ + + uint16_t vid, rid; /* Volume-ID and Release-ID */ + + atomic_t refcnt; /* for open/release, see fops.c */ + struct semaphore config_mutex; /* for open/release and sysfs-fops */ + + struct gendisk *disk; /* interface to the block layer */ + spinlock_t blk_lock; /* queue-lock, shared with the block 
layer */ + + wait_queue_head_t sender_wq; /* wait-queue to notify dnbd2_tx_loop */ + spinlock_t send_queue_lock; + struct list_head send_queue; + + unsigned long pending_reqs; /* number of block requests pending */ + spinlock_t pending_queue_lock; + struct list_head pending_queue; + + struct timer_list requeue_timer; /* requeue timer */ + struct timer_list to_timer; /* takeover timer */ + struct timer_list hb_timer; /* heartbeat timer */ + unsigned long hb_interval; /* HB_NORMAL_ or HB_EMERG_INTERVAL */ + + int tx_id; /* pid of dnbd2_tx_loop */ + struct completion tx_start; /* dnbd2_tx_loop has started */ + struct completion tx_stop; /* dnbd2_tx_loop has stopped */ + int tx_signal; /* tells dnbd2_tx_loop to stop */ + + struct semaphore servers_mutex; + struct srv_info servers[ALT_SERVERS_MAX]; + dnbd2_server_t emerg_list[ALT_SERVERS_MAX]; /* last servers known */ + struct srv_info *active_server; + + int to_percent; /* percent threshold for takeover (relativ) */ + uint16_t to_jiffies; /* jiffies threshold for takeover (absolute) */ +}; diff --git a/kernel/fops.c b/kernel/fops.c new file mode 100644 index 0000000..c098e13 --- /dev/null +++ b/kernel/fops.c @@ -0,0 +1,43 @@ +/* + * kernel/fops.c + */ + + +#include "dnbd2.h" +#include "fops.h" + + +struct block_device_operations dnbd2_fops = { + .owner = THIS_MODULE, + .open = dnbd2_open, + .release = dnbd2_release, +}; + + +int dnbd2_open(struct inode *inode, struct file *file) +{ + dnbd2_device_t *dev = inode->i_bdev->bd_disk->private_data; + if (down_interruptible(&dev->config_mutex)) + return -EBUSY; + + /* FIXME: How do we put this add/start_device? 
*/ + if (set_blocksize(inode->i_bdev, DNBD2_BLOCK_SIZE)) { + up(&dev->config_mutex); + return -EBUSY; + } + + atomic_inc(&dev->refcnt); + up(&dev->config_mutex); + return 0; +} + + +int dnbd2_release(struct inode *inode, struct file *file) +{ + dnbd2_device_t *dev = inode->i_bdev->bd_disk->private_data; + if (down_interruptible(&dev->config_mutex)) + return -EBUSY; + atomic_dec(&dev->refcnt); + up(&dev->config_mutex); + return 0; +} diff --git a/kernel/fops.h b/kernel/fops.h new file mode 100644 index 0000000..f5222d4 --- /dev/null +++ b/kernel/fops.h @@ -0,0 +1,9 @@ +/* + * kernel/fops.h + */ + + +extern struct block_device_operations dnbd2_fops; + +int dnbd2_open(struct inode *inode, struct file *file); +int dnbd2_release(struct inode *inode, struct file *file); diff --git a/kernel/misc.c b/kernel/misc.c new file mode 100644 index 0000000..2756bef --- /dev/null +++ b/kernel/misc.c @@ -0,0 +1,84 @@ +/* + * kernel/misc.c + */ + + +#include "dnbd2.h" +#include "misc.h" + + +uint16_t diff(uint16_t then) +{ + int diff = jiffies & 0xffff; + diff -= then; + if (diff < 0) + diff += 1 << 16; + return diff; +} + + +int sock_xmit(struct socket *sock, int send, void *buf, int size) +{ + int result; + struct msghdr msg; + struct kvec iov; + unsigned long flags; + sigset_t oldset; + + spin_lock_irqsave(¤t->sighand->siglock, flags); + oldset = current->blocked; + sigfillset(¤t->blocked); + sigdelsetmask(¤t->blocked, sigmask(SIGKILL)); + recalc_sigpending(); + spin_unlock_irqrestore(¤t->sighand->siglock, flags); + + do { + sock->sk->sk_allocation = GFP_NOIO; + iov.iov_base = buf; + iov.iov_len = size; + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = MSG_NOSIGNAL; + + if (send) + result = kernel_sendmsg(sock, &msg, &iov, 1, size); + else + result = kernel_recvmsg(sock, &msg, &iov, 1, size, 0); + + if (signal_pending(current)) { + siginfo_t info; + spin_lock_irqsave(¤t->sighand->siglock, flags); + 
dequeue_signal(current, ¤t->blocked, &info); + spin_unlock_irqrestore(¤t->sighand->siglock, + flags); + result = -EINTR; + break; + } + + if (result <= 0) { + if (result == 0) + result = -EPIPE; + break; + } + size -= result; + buf += result; + } while (size > 0); + + spin_lock_irqsave(¤t->sighand->siglock, flags); + current->blocked = oldset; + recalc_sigpending(); + spin_unlock_irqrestore(¤t->sighand->siglock, flags); + + return result; +} + + +char *inet_ntoa(uint32_t ip) +{ + static char buf[sizeof "aaa.bbb.ccc.ddd"]; + unsigned char *nums = (unsigned char *)&ip; + sprintf(buf, "%u.%u.%u.%u", nums[0], nums[1], nums[2], nums[3]); + return buf; +} diff --git a/kernel/misc.h b/kernel/misc.h new file mode 100644 index 0000000..8f7f511 --- /dev/null +++ b/kernel/misc.h @@ -0,0 +1,23 @@ +/* + * kernel/misc.h - Usefull stuff that doesn't fit anywhere else. + */ + + +#define SEND 1 +#define RECV 0 + + +/* + * Jiffies between now and @then. + */ +uint16_t diff(uint16_t then); + +/* + * Send or receive packet (from nbd.c) + */ +int sock_xmit(struct socket *sock, int send, void *buf, int size); + +/* + * Pretty printing of IPs. + */ +char *inet_ntoa(uint32_t ip); diff --git a/kernel/scp b/kernel/scp new file mode 100644 index 0000000..c14b937 --- /dev/null +++ b/kernel/scp @@ -0,0 +1 @@ +scp /root/fs/ba-vito/tags/abgabe/kernel/dnbd2.ko 132.230.4.71:/var/opt/openslx/stage1/suse-10.2/lib/modules/2.6.18.8-0.3-default/kernel/drivers/block/ diff --git a/kernel/servers.c b/kernel/servers.c new file mode 100644 index 0000000..7d5c229 --- /dev/null +++ b/kernel/servers.c @@ -0,0 +1,355 @@ +/* + * kernel/servers.c + */ + + +#include "dnbd2.h" +#include "servers.h" +#include "core.h" +#include "misc.h" + + +/* Only use RTTs smaller than RTT_MAX for statistics. */ +#define RTT_MAX HZ/4 + + +/* + * Find the next configured server which is not active. 
+ */ +struct srv_info *next_server(dnbd2_device_t *dev) +{ + int i; + struct srv_info *srv_info, *next_srv = NULL; + + for_each_server(i) { + srv_info = &dev->servers[i]; + if (srv_info->sock && srv_info != dev->active_server) { + next_srv = srv_info; + break; + } + } + return next_srv; +} + + +/* + * Find the server with smallest SRTT. + */ +struct srv_info *fastest_server(uint16_t *srtt, dnbd2_device_t *dev) +{ + int i; + struct srv_info *srv_info, *alt_srv = NULL; + unsigned long min = RTT_MAX << SRTT_SHIFT; + + for_each_server(i) { + srv_info = &dev->servers[i]; + if (srv_info->sock && + srv_info->srtt < min && + srv_info->min < RTT_MAX) { + min = srv_info->srtt; + alt_srv = srv_info; + } + } + + *srtt = min >> SRTT_SHIFT; + return alt_srv; +} + + +/* + * This function can be enqueued in a workqueue. It removes srv_info + * from the list of servers. + */ +void del_server_work(struct work_struct *work) +{ + struct srv_info *srv_info = container_of(work, struct srv_info, work); + dnbd2_device_t *dev = srv_info->dev; + + down(&dev->servers_mutex); + if (!srv_info->sock || dev->active_server == srv_info) { + up(&dev->servers_mutex); + return; + } + del_server(srv_info); + up(&dev->servers_mutex); + return; +} + + +/* + * This function can be enqueued in a workqueue. Read comment on + * schedule_activate_fastest in servers.h + */ +void activate_fastest_work(struct work_struct *work) +{ + dnbd2_device_t *dev = container_of(work, dnbd2_device_t, work); + struct srv_info *alt_srv, *next_srv; + uint16_t min, srtt; + int newsrv = 0; + long unsigned delta; + + down(&dev->servers_mutex); + if (!dev->active_server) + goto out; + + /* Detect if dev->active_server is stalled. 
*/ + delta = (long)jiffies - (long)dev->active_server->last_reply; + if (dev->pending_reqs && delta > TIMEOUT_STALLED) { + next_srv = next_server(dev); + if (!next_srv) { + start_emergency(dev); + goto out; + } + alt_srv = dev->active_server; + dev->active_server = next_srv; + del_server(alt_srv); + } + + /* Switch to another server if requirements met. */ + srtt = dev->active_server->srtt >> SRTT_SHIFT; + alt_srv = fastest_server(&min, dev); + if (!alt_srv || alt_srv == dev->active_server) + goto out; + if (dev->to_percent) { + if (100 * (srtt - min) < dev->to_percent * srtt) + goto out; + newsrv = 1; + } + if (dev->to_jiffies) { + if (min + dev->to_jiffies > srtt) + goto out; + newsrv = 1; + } + if (newsrv) + dev->active_server = alt_srv; + + out: + up(&dev->servers_mutex); +} + + +int start_rx_loop(struct srv_info *srv_info) +{ + srv_info->rx_id = kernel_thread(dnbd2_rx_loop, srv_info, CLONE_KERNEL); + if (srv_info->rx_id < 0) { + srv_info->rx_id = 0; + return -1; + } + wait_for_completion(&srv_info->rx_start); + return 0; +} + + +void stop_rx_loop(struct srv_info *srv_info) +{ + if (!srv_info->rx_id) + return; + kill_proc(srv_info->rx_id, SIGKILL, 1); + wait_for_completion(&srv_info->rx_stop); + srv_info->rx_id = 0; +} + + +/******************************************************/ +/* For the next functions see servers.h for comments. 
*/ +/******************************************************/ + + +int add_server(dnbd2_server_t server, struct srv_info *srv_info) +{ + struct sockaddr_in addr; + struct socket *sock; + + if (!server.ip || !server.port) + return -1; + + if (sock_create(PF_INET,SOCK_DGRAM, IPPROTO_UDP, &sock) < 0) { + p("Could not create socket.\n"); + return -1; + } + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = server.ip; + addr.sin_port = server.port; + if (sock->ops->connect(sock, + (struct sockaddr *)&addr, + sizeof(addr), 0)) { + p("Could not connect to socket.\n"); + goto out; + } + + srv_info->sock = sock; + srv_info->ip = server.ip; + srv_info->port = server.port; + srv_info->min = RTT_MAX; + + if (start_rx_loop(srv_info)) { + p("Could not start rx_loop\n"); + goto out; + } + + return 0; + out: + del_server(srv_info); + return -1; +} + + +void del_server(struct srv_info *srv_info) +{ + stop_rx_loop(srv_info); + if (srv_info->sock) + sock_release(srv_info->sock); + srv_info->sock = NULL; + srv_info->ip = 0; + srv_info->port = 0; + srv_info->srtt = 0; + srv_info->min = 0; + srv_info->max = 0; + srv_info->srtt = 0; + srv_info->retries = 0; + srv_info->last_reply = 0; +} + + +void try_add_server(dnbd2_server_t server, dnbd2_device_t *dev) +{ + int i; + + for_each_server(i) + if (dev->servers[i].sock && + dev->servers[i].ip == server.ip && + dev->servers[i].port == server.port) + return; + + for_each_server(i) + if (!dev->servers[i].sock) + break; + + if (i == ALT_SERVERS_MAX) + return; + + add_server(server, &dev->servers[i]); +} + + +sector_t srv_get_capacity(struct srv_info *srv_info) +{ + struct request *req; + struct req_info *info; + dnbd2_device_t *dev = srv_info->dev; + sector_t capacity; + int ret; + + info = kmalloc(sizeof(struct req_info), GFP_KERNEL); + if (!info) + return 0; + req = kmalloc(sizeof(struct request), GFP_KERNEL); + if (!req) { + kfree(info); + return 0; + } + info->cmd = CMD_GET_SIZE; + info->cnt = 0; + info->time = jiffies; + info->dst = 
srv_info; + info->last_dst = srv_info; + req->special = info; + req->sector = 0; + INIT_LIST_HEAD(&req->queuelist); + + /* Enqueue the request for sending. */ + spin_lock_bh(&dev->send_queue_lock); + list_add_tail(&req->queuelist, &dev->send_queue); + spin_unlock_bh(&dev->send_queue_lock); + + /* Wake up sender function. */ + wake_up_interruptible(&dev->sender_wq); + + /* If we don't get an answer in 4 seconds we give up. */ + ret = wait_event_timeout(srv_info->wq, srv_info->capacity, 4*HZ); + capacity = srv_info->capacity; + srv_info->capacity = 0; + + return ret ? capacity : 0; +} + + +void update_rtt(uint16_t rtt, struct srv_info *srv_info, uint16_t cmd) +{ + if (rtt == 0) + rtt = 1; + if (rtt > RTT_MAX) + return; + + if (rtt < srv_info->min) + srv_info->min = rtt; + if (rtt > srv_info->max) + srv_info->max = rtt; + + if (!srv_info->srtt) { + srv_info->srtt = rtt << SRTT_SHIFT; + return; + } + + switch (cmd) { + case CMD_GET_BLOCK: + srv_info->srtt = SRTT_BETA * srv_info->srtt; + srv_info->srtt += (SRTT_BETA_COMP * ((int)rtt)) << SRTT_SHIFT; + srv_info->srtt /= SRTT_BETA_BASE; + break; + + case CMD_GET_SIZE: + case CMD_GET_SERVERS: + srv_info->srtt = rtt << SRTT_SHIFT; + break; + } +} + + +void enqueue_hb(struct srv_info *srv_info) +{ + struct request *req; + struct req_info *info; + dnbd2_device_t *dev = srv_info->dev; + + if (!srv_info->sock) + return; + info = kmalloc(sizeof(struct req_info), GFP_ATOMIC); + if (!info) + return; + req = kmalloc(sizeof(struct request), GFP_ATOMIC); + if (!req) { + kfree(info); + return; + } + info->cmd = CMD_GET_SERVERS; + info->cnt = 0; + info->time = jiffies; + info->dst = srv_info; + info->dst = srv_info; + info->last_dst = srv_info; + req->special = info; + req->sector = 0; + INIT_LIST_HEAD(&req->queuelist); + + /* Enqueue the request for sending. 
*/ + spin_lock_bh(&dev->send_queue_lock); + list_add_tail(&req->queuelist, &dev->send_queue); + spin_unlock_bh(&dev->send_queue_lock); +} + + +void schedule_del_server(struct srv_info *srv_info) +{ + PREPARE_WORK(&srv_info->work, del_server_work); //, srv_info); + /* Change in /<linuxheaders>/include/linux/workqueue.h */ + schedule_work(&srv_info->work); +} + + +void schedule_activate_fastest(dnbd2_device_t *dev) +{ + PREPARE_WORK(&dev->work, activate_fastest_work); //, dev); + /* Change in /<linuxheaders>/include/linux/workqueue.h */ + schedule_work(&dev->work); +} diff --git a/kernel/servers.h b/kernel/servers.h new file mode 100644 index 0000000..83d9b46 --- /dev/null +++ b/kernel/servers.h @@ -0,0 +1,84 @@ +/* + * kernel/servers.h + */ + + +#define SRTT_BETA 990 +#define SRTT_BETA_COMP 10 +#define SRTT_BETA_BASE 1000 + +/* + * Though presented as a 16-bit number in sysfs, the SRTT is stored in + * an unsigned long, SRTT_SHIFT bits left-shifted. This helps + * preserve the precision we need tou update the SRTTs. If we stored + * SRTT in an integer, it would only be changed by RTTS *very* far + * away from it. + */ +#define SRTT_SHIFT 10 + +/* + * After this interval without answering + * requests a server is considered down. + */ +#define TIMEOUT_STALLED 3*HZ + + +/* + * Configure @srv_info based on @server: Create a socket, connect it + * and start a dnbd2_rx_loop for it. + */ +int add_server(dnbd2_server_t server, struct srv_info *srv_info); + +/* + * Reset @srv_info: Stop dnbd2_rx_loop, close the socket and zero all + * variables. + */ +void del_server(struct srv_info *srv_info); + +/* + * Look for an unused server-slot in @dev and configure it according + * to @server (using add_server). Nothing happens if the list of + * servers if full or if there is already a server in the list with + * the same IP and port as @server. 
+ */ +void try_add_server(dnbd2_server_t server, dnbd2_device_t *dev); + +/* + * This function enqueues into the send-queue a request for the device + * capacity (CMD_GET_SIZE). This type of request is retransmitted + * every second. If an answer doesn't arrive within 4 seconds it + * assumes that the server is down. + */ +sector_t srv_get_capacity(struct srv_info *srv_info); + +/* + * Update min, max and srtt in @srv_info. + */ +void update_rtt(uint16_t rtt, struct srv_info *srv_info, uint16_t cmd); + +/* + * Enqueue a heartbeat request (CMD_GET_SERVERS) for @srv_info into + * the send-queue. + */ +void enqueue_hb(struct srv_info *srv_info); + +/* + * Schedule the removal of @srv_info to be processed by the global + * kernel workqueue as soon as possible. This function allows to + * trigger the removal of servers from within timers, in which + * sleeping is not allowed. + */ +void schedule_del_server(struct srv_info *srv_info); + +/* + * Schedule the search for a faster server: + * 1. Detect if the active server is stalled (down). In this case + * switch to another server, if possible, and remove the old one + * from the list. If no servers are available to switch to + * start the emergency mode. + * 2. If the emergency mode was not started go through the list of + * servers and pick the one with the smallest SRTT. If this + * server meets our requirements (to_jiffies and to_percent) + * switch to it. 
+ */ +void schedule_activate_fastest(dnbd2_device_t *dev); diff --git a/kernel/sysfs.c b/kernel/sysfs.c new file mode 100644 index 0000000..802e9d2 --- /dev/null +++ b/kernel/sysfs.c @@ -0,0 +1,460 @@ +/* + * kernel/sysfs.c + */ + + +#include "dnbd2.h" +#include "misc.h" +#include "sysfs.h" +#include "devices.h" +#include "servers.h" + + +#define RW 0644 +#define RO 0444 + +#define kobj_to_dev(kp) container_of(kp, dnbd2_device_t, kobj) +#define kobj_to_srv(kp) container_of(kp, struct srv_info, kobj) + +#define attr_to_srvattr(ap) container_of(ap, struct server_attr, attr) +#define attr_to_devattr(ap) container_of(ap, struct device_attr, attr) + +#define DEV_ATTR_RW(_name) \ +static struct device_attr _name = \ +__ATTR(_name, RW, show_##_name, store_##_name) + +#define SRV_ATTR_RW(_name) \ +static struct server_attr _name = \ +__ATTR(_name, RW, show_##_name, store_##_name) + +#define DEV_ATTR_RO(_name) \ +static struct device_attr _name = \ +__ATTR(_name, RO, show_##_name, NULL) + +#define SRV_ATTR_RO(_name) \ +static struct server_attr _name = \ +__ATTR(_name, RO, show_##_name, NULL) + + +struct device_attr { + struct attribute attr; + ssize_t (*show)(char *, dnbd2_device_t *); + ssize_t (*store)(const char *, size_t, dnbd2_device_t *); +}; + +struct server_attr { + struct attribute attr; + ssize_t (*show)(char *, struct srv_info *); + ssize_t (*store)(const char *, size_t, struct srv_info *); +}; + + +void release(struct kobject *kobj) {} + +ssize_t show_running(char *, dnbd2_device_t *); +ssize_t store_running(const char *, size_t, dnbd2_device_t *); +ssize_t show_to_percent(char *, dnbd2_device_t *); +ssize_t store_to_percent(const char *, size_t, dnbd2_device_t *); +ssize_t show_to_jiffies(char *, dnbd2_device_t *); +ssize_t store_to_jiffies(const char *, size_t, dnbd2_device_t *); +ssize_t show_vid(char *, dnbd2_device_t *); +ssize_t store_vid(const char *, size_t, dnbd2_device_t *); +ssize_t show_rid(char *, dnbd2_device_t *); +ssize_t store_rid(const char *, 
size_t, dnbd2_device_t *); +ssize_t show_pending_reqs(char *, dnbd2_device_t *); +ssize_t show_emergency(char *, dnbd2_device_t *); + +ssize_t show_sock(char *, struct srv_info *); +ssize_t store_sock(const char *, size_t, struct srv_info *); +ssize_t show_active(char *, struct srv_info *); +ssize_t store_active(const char *, size_t, struct srv_info *); +ssize_t show_rtt(char *, struct srv_info *); +ssize_t show_retries(char *, struct srv_info *); +ssize_t show_last_reply(char *, struct srv_info *); + + +/* device attributes */ +DEV_ATTR_RW(running); +DEV_ATTR_RW(to_percent); +DEV_ATTR_RW(to_jiffies); +DEV_ATTR_RW(vid); +DEV_ATTR_RW(rid); +DEV_ATTR_RO(pending_reqs); +DEV_ATTR_RO(emergency); + +static struct attribute *device_attrs[] = { + &running.attr, + &to_percent.attr, + &to_jiffies.attr, + &vid.attr, + &rid.attr, + &pending_reqs.attr, + &emergency.attr, + NULL, +}; + +/* server attributes */ +SRV_ATTR_RW(sock); +SRV_ATTR_RW(active); +SRV_ATTR_RO(rtt); +SRV_ATTR_RO(retries); +SRV_ATTR_RO(last_reply); + +struct attribute *server_attrs[] = { + &sock.attr, + &active.attr, + &rtt.attr, + &retries.attr, + &last_reply.attr, + NULL, +}; + + +/* + * Wrapper functions for show/store, one pair for device attributes + * and one pair for server attributes. Each attribute has its own + * specific function(s). 
+ */ +ssize_t device_show(struct kobject *kobj, struct attribute *attr, + char *buf) +{ + struct device_attr *device_attr = attr_to_devattr(attr); + dnbd2_device_t *dev = kobj_to_dev(kobj); + return device_attr->show(buf, dev); +} + +ssize_t device_store(struct kobject *kobj, struct attribute *attr, + const char *buf, size_t count) +{ + int ret; + struct device_attr *device_attr = attr_to_devattr(attr); + dnbd2_device_t *dev = kobj_to_dev(kobj); + down(&dev->config_mutex); + ret = device_attr->store(buf, count, dev); + up(&dev->config_mutex); + return ret; +} + +ssize_t server_show(struct kobject *kobj, struct attribute *attr, + char *buf) +{ + struct server_attr *server_attr = attr_to_srvattr(attr); + struct srv_info *srv_info = kobj_to_srv(kobj); + return server_attr->show(buf, srv_info); +} + +ssize_t server_store(struct kobject *kobj, struct attribute *attr, + const char *buf, size_t count) +{ + int ret; + struct server_attr *server_attr = attr_to_srvattr(attr); + struct srv_info *srv_info = kobj_to_srv(kobj); + down(&srv_info->dev->config_mutex); + ret = server_attr->store(buf, count, srv_info); + up(&srv_info->dev->config_mutex); + return ret; +} + + +struct sysfs_ops device_ops = { + .show = device_show, + .store = device_store, +}; + +struct sysfs_ops server_ops = { + .show = server_show, + .store = server_store, +}; + +struct kobj_type device_ktype = { + .default_attrs = device_attrs, + .sysfs_ops = &device_ops, + .release = release, +}; + +struct kobj_type server_ktype = { + .default_attrs = server_attrs, + .sysfs_ops = &server_ops, + .release = release, +}; + + +/* + * RW device attribute functions. 
+ */ +ssize_t show_running(char *buf, dnbd2_device_t *dev) +{ + return sprintf(buf, "%d\n", dev->running); +} + +ssize_t store_running(const char *buf, size_t count, dnbd2_device_t *dev) +{ + sector_t capacity = 0; + struct srv_info *srv_info; + int i, running, ret = sscanf(buf, "%d", &running); + + if (ret != 1) + return -EINVAL; + + if (!dev->running) { + if (running != 1 || !dev->vid || !dev->rid) + return -EINVAL; + if (atomic_read(&dev->refcnt) > 0) + return -EBUSY; + + for_each_server(i) { + srv_info = &dev->servers[i]; + if (!srv_info->sock) + continue; + capacity = srv_get_capacity(srv_info); + if (capacity) { + set_capacity(dev->disk, capacity); + dev->active_server = srv_info; + break; + } + } + if (!capacity) { + p("Could not contact any servers.\n"); + return -EHOSTUNREACH; + } + for_each_server(i) { + dev->emerg_list[i].ip = dev->servers[i].ip; + dev->emerg_list[i].port = dev->servers[i].port; + } + printk(LOG "Device capacity = " SECT_PRECISION " KB\n", + capacity * SECTOR_SIZE / 1024); + dev->running = 1; + __module_get(THIS_MODULE); + } else { + if (running != 0) + return -EINVAL; + if (atomic_read(&dev->refcnt) > 0) + return -EBUSY; + + /* Stop device. 
*/ + dev->running = 0; + set_capacity(dev->disk, 0); + dev->active_server = NULL; + module_put(THIS_MODULE); + } + + return count; +} + +ssize_t show_vid(char *buf, dnbd2_device_t *dev) +{ + return sprintf(buf, "%hu\n", dev->vid); +} + +ssize_t store_vid(const char *buf, size_t count, dnbd2_device_t *dev) +{ + uint16_t vid; + + if (dev->running) + return -EBUSY; + if (sscanf(buf, "%hu", &vid) != 1) + return -EINVAL; + + dev->vid = vid; + return count; +} + +ssize_t show_rid(char *buf, dnbd2_device_t *dev) +{ + return sprintf(buf, "%hu\n", dev->rid); +} + +ssize_t store_rid(const char *buf, size_t count, dnbd2_device_t *dev) +{ + uint16_t rid; + + if (dev->running) + return -EBUSY; + if (sscanf(buf, "%hu", &rid) != 1) + return -EINVAL; + + dev->rid = rid; + return count; +} + +ssize_t show_to_percent(char *buf, dnbd2_device_t *dev) +{ + return sprintf(buf, "%d\n", dev->to_percent); +} + +ssize_t store_to_percent(const char *buf, size_t count, dnbd2_device_t *dev) +{ + if (sscanf(buf, "%d", &dev->to_percent) == 1) + return count; + return -EINVAL; +} + +ssize_t show_to_jiffies(char *buf, dnbd2_device_t *dev) +{ + return sprintf(buf, "%hu\n", dev->to_jiffies); +} + +ssize_t store_to_jiffies(const char *buf, size_t count, dnbd2_device_t *dev) +{ + if (sscanf(buf, "%hu", &dev->to_jiffies) == 1) + return count; + return -EINVAL; +} + + +/* + * RO device attribute functions. + */ +ssize_t show_pending_reqs(char *buf, dnbd2_device_t *dev) +{ + return sprintf(buf, "%lu\n", dev->pending_reqs); +} + +ssize_t show_emergency(char *buf, dnbd2_device_t *dev) +{ + return sprintf(buf, "%d\n", dev->emergency); +} + + +/* + * RW server attribute functions. 
+ */ +ssize_t show_sock(char *buf, struct srv_info *srv_info) +{ + return sprintf(buf, "%s %hu\n", + inet_ntoa(srv_info->ip), + ntohs(srv_info->port)); +} + +ssize_t store_sock(const char *buf, size_t count, struct srv_info *srv_info) +{ + char ip[sizeof "aaa.bbb.ccc.ddd 12345"]; + uint16_t port; + dnbd2_server_t server; + dnbd2_device_t *dev = srv_info->dev; + + if (count > sizeof "aaa.bbb.ccc.ddd 12345") + return -EINVAL; + if (sscanf(buf, "%s %hu", ip, &port) != 2) + return -EINVAL; + server.ip = in_aton(ip); + server.port = htons(port); + + down(&dev->servers_mutex); + if (dev->running && dev->active_server == srv_info) { + up(&dev->servers_mutex); + return -EBUSY; + } + if (srv_info->sock) + del_server(srv_info); + if (server.ip && server.port && add_server(server, srv_info)) { + up(&dev->servers_mutex); + return -EINVAL; + } + up(&dev->servers_mutex); + + return count; +} + +ssize_t show_active(char *buf, struct srv_info *srv_info) +{ + if (!srv_info->sock) + return sprintf(buf, "0\n"); + if (srv_info->dev->active_server == srv_info) + return sprintf(buf, "1\n"); + return sprintf(buf, "0\n"); +} + +ssize_t store_active(const char *buf, size_t count, struct srv_info *srv_info) +{ + dnbd2_device_t *dev = srv_info->dev; + int active; + + if (sscanf(buf, "%d", &active) != 1 || active != 1) + return -EINVAL; + + down(&dev->servers_mutex); + if (!dev->running) { + up(&dev->servers_mutex); + return -EINVAL; + } + if (!srv_info->sock || dev->active_server == srv_info) { + up(&dev->servers_mutex); + return -EINVAL; + } + dev->active_server = srv_info; + up(&dev->servers_mutex); + + return count; +} + + +/* + * RO server attribute functions. 
+ */ +ssize_t show_rtt(char *buf, struct srv_info *srv_info) +{ + return sprintf(buf, "%hu %lu %hu\n", + srv_info->min, + srv_info->srtt >> SRTT_SHIFT, + srv_info->max); +} + +ssize_t show_retries(char *buf, struct srv_info *srv_info) +{ + return sprintf(buf, "%lu\n", srv_info->retries); +} + +ssize_t show_last_reply(char *buf, struct srv_info *srv_info) +{ + return sprintf(buf, "%lu\n", srv_info->last_reply); +} + + +/* Helper for start_sysfs. */ +int setup_kobj(struct kobject *kobj, char *name, struct kobject *parent, + struct kobj_type *ktype) +{ + memset(kobj, 0, sizeof(struct kobject)); + kobj->parent = parent; + kobj->ktype = ktype; + if (kobject_set_name(kobj, name)) + return -1; + if (kobject_register(kobj)) + return -1; + return 0; +} + + +/* + * Exported functions - see sysfs.h + */ +int start_sysfs(dnbd2_device_t *dev) +{ + int i; + char name[] = "server99"; + + if (setup_kobj(&dev->kobj, "config", &dev->disk->kobj, &device_ktype)) + return -1; + + for_each_server(i) { + sprintf(name, "server%d", i); + if (setup_kobj(&dev->servers[i].kobj, name, + &dev->disk->kobj, &server_ktype)) + goto out; + } + return 0; + + out: + while (i--) + kobject_unregister(&dev->servers[i].kobj); + return -1; +} + +void stop_sysfs(dnbd2_device_t *dev) +{ + int i; + for_each_server(i) + kobject_unregister(&dev->servers[i].kobj); + kobject_unregister(&dev->kobj); +} diff --git a/kernel/sysfs.h b/kernel/sysfs.h new file mode 100644 index 0000000..a18338e --- /dev/null +++ b/kernel/sysfs.h @@ -0,0 +1,24 @@ +/* + * kernel/sysfs.h + */ + + +/* + * Setup the sysfs-interface for @dev: + * + * M = minor number + * N = ALT_SERVERS_MAX - 1 + * + * /sys/block/vnbdM/config (from @dev->kobj) + * /sys/block/vnbdM/server0 (from @dev->servers[0].kobj) + * . + * . + * /sys/block/vnbdM/serverN (from @dev->servers[N].kobj) + * + */ +int start_sysfs(dnbd2_device_t *dev); + +/* + * Destroy the sysfs-interface for @dev. 
+ */ +void stop_sysfs(dnbd2_device_t *dev); diff --git a/server/Makefile b/server/Makefile new file mode 100644 index 0000000..a41d6da --- /dev/null +++ b/server/Makefile @@ -0,0 +1,30 @@ +# +# server/Makefile +# + +CPPFLAGS = -I../include +CFLAGS = -Wall -O2 -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64 +VPATH = ../include +APP = dnbd2-server + +all: $(APP) + +install: $(APP) + cp $(APP) /usr/local/sbin + +dnbd2-server: main.o config.o query.o tree.o file.o + $(CC) -o $@ $^ + +main.o: config.h query.h tree.h file.h dnbd2.h + +config.o: config.h tree.h file.h dnbd2.h + +query.o: query.h tree.h file.h dnbd2.h + +tree.o: tree.h dnbd2.h + +file.o: file.h + +.PHONY: +clean: + $(RM) *.o *~ $(APP) diff --git a/server/config.c b/server/config.c new file mode 100644 index 0000000..5dec41e --- /dev/null +++ b/server/config.c @@ -0,0 +1,213 @@ +/* + * server/config.c + */ + + +#include <arpa/inet.h> +#include <stdlib.h> +#include <string.h> +#include <syslog.h> +#include <stdio.h> +#include "dnbd2.h" +#include "tree.h" +#include "file.h" +#include "config.h" + + +char *get_line(char *buf, FILE *file); +dataset_t *parse_dataset_file(const char *filename); +int parse_server(dnbd2_server_t *server, const char *str); + + +/* + * The config file looks like this: + * + * 192.168.178.120 + * 5000 + * /etc/dnbd2/datasets/debian-3.1 192.168.178.119:5005 + * /etc/dnbd2/datasets/suse-10.2 192.168.178.119:5005 192.168.178.118:5005 + * ... + * + */ +int parse_config_file(const char *filename, struct sockaddr_in *sockaddr, + void **tree) +{ + char buf[LINE_SIZE_MAX]; + FILE *file; + + bzero(sockaddr, sizeof(sockaddr)); + sockaddr->sin_family = AF_INET; + + /* Open file. */ + file = fopen(filename, "r"); + if (!file) { + syslog(LOG_ERR, "Could not open config file %s", filename); + return -1; + } + syslog(LOG_NOTICE, "Config file: %s", filename); + + /* Read IP from file. 
*/ + if (!get_line(buf, file) || !inet_aton(buf, &(sockaddr->sin_addr))) { + syslog(LOG_ERR, "Could not read IP."); + return -1; + } + + /* Read port number from file. */ + if (!get_line(buf, file) || + sscanf(buf, "%hu", &(sockaddr->sin_port)) != 1) { + syslog(LOG_ERR, "Could not read port number."); + return -1; + } + sockaddr->sin_port = htons(sockaddr->sin_port); + + syslog(LOG_NOTICE, "IP = %s, Port = %hu", + inet_ntoa(sockaddr->sin_addr), ntohs(sockaddr->sin_port)); + + int datasets = 0; + char server[ALT_SERVERS_MAX][LINE_SIZE_MAX]; + char dsfile[LINE_SIZE_MAX]; + while (get_line(buf, file)) { + + int ret = sscanf(buf, "%s %s %s %s %s", + dsfile, + server[0], + server[1], + server[2], + server[3]) - 1; + + /* Parse a dataset file and put it into a tree-node. */ + dataset_t *dataset = parse_dataset_file(dsfile); + if (!dataset) + goto out_nodsfile; + node_t *data = (node_t *)malloc(sizeof(node_t)); + if (!data) { + syslog(LOG_ERR, + "Could not allocate memory for new Dataset."); + goto out_nodataset; + } + int fd = file_open(dataset->path); + if (fd == -1) { + syslog(LOG_ERR, + "Could not open file or block device %s", + dataset->path); + goto out_nodata; + } + strncpy(data->path, dsfile, FILE_NAME_MAX); + data->ds = dataset; + data->fd = fd; + + /* Parse the list of alterntive servers. */ + int i; + int cnt = 0; + for (i=0 ; i<ret ; i++) + cnt += parse_server(&data->server[cnt], server[i]); + data->servers = cnt; + + /* Check if the Volume-ID-Release-ID pair is already in use. */ + node_t *data2 = tree_find(data, tree); + if (data2) { + syslog(LOG_ERR, + "Vol-ID/Rel-ID already used in Dataset file %s", + data2->path); + goto out_nodata; + } + + /* Insert Dataset into tree. 
*/ + if (tree_insert(data, tree) == -1) { + syslog(LOG_ERR, "Could not insert Dataset into tree."); + goto out_nodata; + } + + datasets++; + continue; + + out_nodata: + free(data); + out_nodataset: + free(dataset); + out_nodsfile: + syslog(LOG_ERR, "Problem parsing %s", dsfile); + } + + syslog(LOG_NOTICE, "Loaded %d Dataset(s).", datasets); + + fclose(file); + return datasets; +} + + +/* + * A dataset config file looks like this: + * + * /path/to/file/or/block/device + * Volume-ID + * Release-ID + */ +dataset_t *parse_dataset_file(const char *filename) +{ + char buf[LINE_SIZE_MAX]; + + FILE *file = fopen(filename, "r"); + if (!file) { + syslog(LOG_ERR, "Could not open Dataset file %s", filename); + return NULL; + } + + dataset_t *dataset = (dataset_t *) malloc(sizeof(dataset_t)); + if (!dataset) { + syslog(LOG_ERR, "Could not allocate memory for new Dataset."); + return NULL; + } + + /* Read file- or block device name from file. */ + if (!get_line(dataset->path, file)) { + syslog(LOG_ERR, "Could not read path to file or block device"); + return NULL; + } + + /* Read Volume-ID from file. */ + if (!get_line(buf, file) || sscanf(buf, "%hu", &(dataset->vid)) != 1) { + syslog(LOG_ERR, "Could not read Volume-ID."); + return NULL; + } + + /* Read Release-ID from file. 
*/ + if (!get_line(buf, file) || sscanf(buf, "%hu", &(dataset->rid)) != 1) { + syslog(LOG_ERR, "Could not read Release-ID."); + return NULL; + } + + fclose(file); + return dataset; +} + + +char *get_line(char *buf, FILE *file) +{ + char *ret = fgets(buf, LINE_SIZE_MAX, file); + + /* change \n with \0 */ + if (ret) + buf[strlen(buf)-1] = '\0'; + + return ret; +} + + +int parse_server(dnbd2_server_t *server, const char *str) +{ + char ip[LINE_SIZE_MAX]; + uint16_t port; + struct in_addr tmp; + + if (sscanf(str, "%[^:]:%hu", ip, &port) != 2) + return 0; + + if (!inet_aton(ip, &tmp)) + return 0; + + memcpy(&server->ip, &tmp, sizeof(uint32_t)); + server->port = htons(port); + + return 1; +} diff --git a/server/config.h b/server/config.h new file mode 100644 index 0000000..dc9cff1 --- /dev/null +++ b/server/config.h @@ -0,0 +1,13 @@ +/* + * server/config.h + */ + + +/* + * Parse the server configuration file, create a socket address + * structure and a tree of datasets. + * + * Returns: Number of Datasets loaded or -1 on failure. 
+ */ +int parse_config_file(const char *filename, struct sockaddr_in *sockaddr, + void **tree); diff --git a/server/file.c b/server/file.c new file mode 100644 index 0000000..26c3cd0 --- /dev/null +++ b/server/file.c @@ -0,0 +1,65 @@ +/* + * server/file.c + */ + + +#include <unistd.h> +#include <fcntl.h> +#include <errno.h> +#include "file.h" + + +int file_open(char *filename) +{ + int fd = open(filename, O_RDONLY); + if (fd == -1) + return -1; + + struct stat st; + if (fstat(fd, &st) == -1) + return -1; + + return fd; +} + + +int file_getsize(int fd, off_t *size) +{ + *size = lseek64(fd, 0, SEEK_END); + + if (*size == -1) + return -1; + + return 0; +} + + +int file_read(int fd, void *buf, size_t size, off_t pos) +{ + off_t newpos = lseek(fd, pos, SEEK_SET); + + if (newpos == -1) + return -1; + + size_t nleft = size; + ssize_t nread; + char *ptr = buf; + + while (nleft > 0) { + if ((nread = read(fd, ptr, nleft)) < 0) { + if (errno == EINTR) + continue; + + return -1; + } + + if (nread == 0) { + break; + } + + nleft -= nread; + ptr += nread; + } + + return 0; +} diff --git a/server/file.h b/server/file.h new file mode 100644 index 0000000..720cf9b --- /dev/null +++ b/server/file.h @@ -0,0 +1,28 @@ +/* + * server/file.h - Functions to work with files. + */ + + +/* + * Open a file and test it with stat. + * + * Returns: File descriptor on success or -1 on failure. + */ +int file_open(char *filename); + + +/* + * Store in @size the size in bytes of the file pointed to by @fd. + * + * Returns: 0 on success -1 on failure; + * + */ +int file_getsize(int fd, off_t *size); + + +/* + * Copy @size bytes of @fd, starting at @pos, into @buf. + * + * Returns: 0 on success -1 on failure. 
+ */
+int file_read(int fd, void *buf, size_t size, off_t pos);
diff --git a/server/main.c b/server/main.c
new file mode 100644
index 0000000..5a923a9
--- /dev/null
+++ b/server/main.c
@@ -0,0 +1,274 @@
+/*
+ * server/main.c
+ */
+
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+#include <unistd.h>
+#include <signal.h>
+#include <syslog.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include "dnbd2.h"
+#include "tree.h"
+#include "file.h"
+#include "config.h"
+#include "query.h"
+
+
+char *cfile = NULL;	/* path to config file */
+void *tree = NULL;	/* tree containing datasets */
+int server_fd;		/* file descriptor for UDP communication */
+struct sockaddr_in server_addr;
+
+
+/*
+ * Flags and signal handlers. The flags are set by the handlers and
+ * polled by the main loop, hence volatile sig_atomic_t.
+ */
+volatile sig_atomic_t sig_exit, sig_config;
+void exit_handler(int sig);
+void config_handler(int sig);
+
+
+/*
+ * Function to configure server. We call it with LOAD on
+ * initialization and with RELOAD when SIGHUP is caught.
+ */
+enum config_action { LOAD, RELOAD };
+int configure(enum config_action action);
+
+
+void print_usage(void);
+int daemon_init(void);
+
+
+/*
+ * Load the configuration, go into the background and answer
+ * requests until SIGTERM or SIGINT is caught.
+ */
+int main(int argc, char **argv)
+{
+	int ret;
+	ssize_t n;
+	socklen_t len;
+	dnbd2_data_reply_t reply;
+	dnbd2_data_request_t request;
+	struct sockaddr_in client_addr;
+	struct sigaction exit_act, config_act;
+
+	if (argc != 2) {
+		print_usage();
+		return -1;
+	}
+
+	cfile = argv[1];
+	openlog(argv[0], LOG_PID, LOG_LOCAL2);
+	syslog(LOG_NOTICE, "Starting DNBD2 Server.");
+
+	/* Load datasets and bind to socket. */
+	if (configure(LOAD) == -1)
+		goto out_no_start;
+
+	/* Daemonize. */
+	ret = daemon_init();
+	if (ret == -1) {
+		syslog(LOG_ERR, "Could not fork and background.");
+		goto out_no_start;
+	}
+
+	/*
+	 * Setup signal handlers. SA_RESTART is cleared on purpose: a
+	 * signal has to interrupt recvfrom so that the loop below gets
+	 * to look at the flags.
+	 */
+	sigaction(SIGTERM, NULL, &exit_act);
+	exit_act.sa_flags &= ~SA_RESTART;
+	exit_act.sa_handler = exit_handler;
+
+	sigaction(SIGHUP, NULL, &config_act);
+	config_act.sa_flags &= ~SA_RESTART;
+	config_act.sa_handler = config_handler;
+
+	sigaction(SIGTERM, &exit_act, NULL);
+	sigaction(SIGINT, &exit_act, NULL);
+	sigaction(SIGHUP, &config_act, NULL);
+
+	while (1) {
+		if (sig_exit) {
+			syslog(LOG_NOTICE, "Stopping Server.");
+			exit(0);
+		}
+		if (sig_config) {
+			syslog(LOG_NOTICE, "Reloading configuration.");
+			if (configure(RELOAD) == -1)
+				syslog(LOG_ERR, "Not using new configuration.");
+			sig_config = 0;
+		}
+
+		/* Receive request. */
+		len = sizeof(client_addr);
+		n = recvfrom(server_fd, &request, sizeof(request), 0,
+			     (struct sockaddr *) &client_addr, &len);
+
+		/* Errors (including EINTR, which takes us back to the
+		   flags) and requests of the wrong size are dropped. */
+		if (n == -1 || (size_t) n != sizeof(request))
+			continue;
+
+		/* Never send leftovers of the stack or of a previous
+		   reply: not every command fills the whole reply. */
+		memset(&reply, 0, sizeof(reply));
+
+		/* Process request. */
+		ret = handle_query(&request, &reply, &tree);
+		if (ret == -1)
+			continue;
+
+		/* Send reply. UDP is best effort, so errors are ignored. */
+		sendto(server_fd, &reply, sizeof(reply), 0,
+		       (struct sockaddr *) &client_addr, len);
+	}
+
+	return 0;
+
+ out_no_start:
+	syslog(LOG_ERR, "Server not started.");
+	fprintf(stderr, "Server not started - "
+		"consult your syslog for errors.\n");
+	return -1;
+}
+
+
+/* SIGHUP: ask the main loop to reload the configuration. */
+void config_handler(int sig)
+{
+	(void) sig;
+	sig_config = 1;
+}
+
+
+/* SIGTERM and SIGINT: ask the main loop to stop the server. */
+void exit_handler(int sig)
+{
+	(void) sig;
+	sig_exit = 1;
+}
+
+
+/*
+ * Compare the IP and port of two socket addresses.
+ *
+ * Returns: 0 if both are equal, non-zero otherwise.
+ */
+int cmp_addr(struct sockaddr_in addr1, struct sockaddr_in addr2)
+{
+	int diff = memcmp(&addr1.sin_addr.s_addr,
+			  &addr2.sin_addr.s_addr,
+			  sizeof(in_addr_t));
+
+	if (diff)
+		return diff;
+
+	return memcmp(&addr1.sin_port,
+		      &addr2.sin_port,
+		      sizeof(in_port_t));
+}
+
+
+/*
+ * Parse the config file and put the result into use: the datasets
+ * always, the socket only on LOAD or if the address has changed.
+ * If anything fails the running configuration is left untouched.
+ *
+ * Returns: 0 on success or -1 on failure.
+ */
+int configure(enum config_action action)
+{
+	void *tmp_tree = NULL;
+	int datasets, ret, rebind, tmp_fd = server_fd;
+	struct sockaddr_in tmp_addr;
+
+	datasets = parse_config_file(cfile, &tmp_addr, &tmp_tree);
+	if (datasets <= 0)
+		return -1;
+
+	rebind = (action == LOAD || cmp_addr(tmp_addr, server_addr));
+
+	/* Create a socket and bind to it. */
+	if (rebind) {
+		tmp_fd = socket(PF_INET, SOCK_DGRAM, 0);
+		if (tmp_fd == -1) {
+			syslog(LOG_ERR, "Could not create socket.");
+			goto out_free_tree;
+		}
+
+		ret = bind(tmp_fd, (struct sockaddr *) &tmp_addr,
+			   sizeof(tmp_addr));
+
+		if (ret == -1) {
+			close(tmp_fd);
+			syslog(LOG_ERR, "Could not assign name to socket.");
+			goto out_free_tree;
+		}
+	}
+
+	/* Make new socket available and close the old one if necessary. */
+	if (action == RELOAD && rebind) {
+		if (close(server_fd) == -1 ) {
+			syslog(LOG_ERR, "Could not close socket.");
+		}
+	}
+
+	server_fd = tmp_fd;
+	server_addr = tmp_addr;
+
+	tree_destroy(tree);
+	tree = tmp_tree;
+	return 0;
+
+ out_free_tree:
+	/* The new tree is not going to be used: do not leak it. */
+	tree_destroy(tmp_tree);
+	return -1;
+}
+
+
+void print_usage(void)
+{
+	printf("usage: dnbd2-dserver config-file\n");
+}
+
+
+/*
+ * Fork into the background. The parent exits, the child becomes a
+ * session leader, changes to the root directory and clears its umask.
+ *
+ * Returns: 0 in the child or -1 if fork failed.
+ */
+int daemon_init(void)
+{
+	pid_t pid = fork();
+
+	if (pid == -1)
+		return pid;
+
+	if (pid != 0) {
+		/* We are the parent. */
+		exit(0);
+	}
+
+	/* We are the child. */
+	setsid();
+	if (chdir("/") == -1)
+		syslog(LOG_WARNING, "Could not change to root directory.");
+	umask(0);
+
+	return 0;
+}
diff --git a/server/query.c b/server/query.c
new file mode 100644
index 0000000..16f48b6
--- /dev/null
+++ b/server/query.c
@@ -0,0 +1,98 @@
+/*
+ * server/query.c
+ */
+
+
+#include <arpa/inet.h>
+#include <inttypes.h>
+#include <byteswap.h>
+#include <syslog.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include "dnbd2.h"
+#include "query.h"
+#include "tree.h"
+#include "file.h"
+
+
+/*
+ * Builds a @reply based on @request, using the datasets stored in
+ * @tree. The fields of @request are expected in network byte order
+ * and the fields of @reply are filled in network byte order.
+ *
+ * Returns: 0 on success or -1 if no reply should be sent (unknown
+ * dataset, unknown command or error accessing the dataset).
+ */
+int handle_query(dnbd2_data_request_t *request,
+		 dnbd2_data_reply_t *reply,
+		 void **tree)
+{
+	int fd, i, servers;
+	node_t node1;
+	node_t *node2;
+	dataset_t ds;
+	off_t size, pos;
+	uint16_t cmd;
+
+	/* Fetch the right fd for this vid/rid pair. */
+	ds.vid = ntohs(request->vid);
+	ds.rid = ntohs(request->rid);
+	node1.ds = &ds;
+	node2 = tree_find(&node1, tree);
+	if (!node2)
+		return -1;
+	fd = node2->fd;
+
+	cmd = ntohs(request->cmd);
+	switch (cmd) {
+	case CMD_GET_BLOCK:
+		reply->num = request->num;
+		pos = ntohll(request->num);
+		/* A block that could not be read must not be sent. */
+		if (file_read(fd, reply->payload.data,
+			      DNBD2_BLOCK_SIZE, pos) == -1)
+			return -1;
+		break;
+
+	case CMD_GET_SIZE:
+		if (file_getsize(fd, &size) == -1)
+			return -1;
+		reply->num = htonll(size);
+		break;
+
+	case CMD_GET_SERVERS:
+		/* Fetch a random block to deliver a more realistic RTT.
+		   Only the time it takes matters, so errors are ignored. */
+		pos = 0;
+		if (!file_getsize(fd, &size))
+			pos = (off_t) (size * (rand() / (RAND_MAX + 1.0)));
+		file_read(fd, reply->payload.data, DNBD2_BLOCK_SIZE, pos);
+
+		/* node2->server has room for ALT_SERVERS_MAX entries. */
+		servers = node2->servers;
+		if (servers < 0)
+			servers = 0;
+		if (servers > ALT_SERVERS_MAX)
+			servers = ALT_SERVERS_MAX;
+
+		reply->num = htonll(servers);
+		for (i = 0; i < servers; i++) {
+			memcpy(&reply->payload.server[i],
+			       &node2->server[i],
+			       sizeof(dnbd2_server_t));
+		}
+		break;
+
+	default:
+		/* Unknown command. */
+		return -1;
+	}
+
+	reply->cmd = request->cmd;
+	reply->time = request->time;
+	reply->vid = request->vid;
+	reply->rid = request->rid;
+
+	return 0;
+}
diff --git a/server/query.h b/server/query.h
new file mode 100644
index 0000000..ecf3969
--- /dev/null
+++ b/server/query.h
@@ -0,0 +1,14 @@
+/*
+ * server/query.h
+ */
+
+
+/*
+ * Builds a @reply based on @request, using the datasets stored in
+ * @tree.
+ *
+ * Returns: 0 on success or -1 on failure.
+ */
+int handle_query(dnbd2_data_request_t *request,
+		 dnbd2_data_reply_t *reply,
+		 void **tree);
diff --git a/server/tree.c b/server/tree.c
new file mode 100644
index 0000000..3daf6df
--- /dev/null
+++ b/server/tree.c
@@ -0,0 +1,88 @@
+/*
+ * server/tree.c
+ */
+
+
+#include <search.h>
+#include <stdlib.h>
+#include <inttypes.h>
+#include "dnbd2.h"
+#include "tree.h"
+
+
+/*
+ * Comparison function for tsearch and tfind: order nodes by
+ * Volume-ID first and by Release-ID second.
+ *
+ * Returns: negative, zero or positive if @node1 sorts before, equal
+ * to or after @node2.
+ */
+int compare_node(const void *node1, const void *node2)
+{
+	const dataset_t *a = ((const node_t *) node1)->ds;
+	const dataset_t *b = ((const node_t *) node2)->ds;
+
+	/* Both IDs are 16 bits wide, so the differences fit an int. */
+	if (a->vid != b->vid)
+		return a->vid - b->vid;
+
+	return a->rid - b->rid;
+}
+
+
+/*
+ * Callback for tdestroy: free a node and its dataset.
+ *
+ * NOTE(review): the node's file descriptor is not closed here.
+ * Confirm who owns it; if nobody closes it, descriptors leak every
+ * time a tree is destroyed.
+ */
+void destroy_node(void *node)
+{
+	node_t *victim = node;
+
+	free(victim->ds);
+	free(victim);
+}
+
+
+/*
+ * Insert @data into @tree.
+ *
+ * Returns: 0 on success or -1 if tsearch failed or if the tree
+ * already holds an item with the same key.
+ */
+int tree_insert(node_t *data, void **tree)
+{
+	node_t **slot = tsearch(data, tree, compare_node);
+
+	if (slot == NULL)
+		return -1;
+
+	/* On a duplicate key tsearch hands back the item that is
+	   already in the tree instead of @data. */
+	return (*slot == data) ? 0 : -1;
+}
+
+
+/*
+ * Look up the item of @tree that has the same key as @data.
+ *
+ * Returns: Pointer to item on search-hit or NULL on search-miss.
+ */
+node_t *tree_find(node_t *data, void **tree)
+{
+	node_t **slot = tfind(data, tree, compare_node);
+
+	return slot ? *slot : NULL;
+}
+
+
+/*
+ * Free all nodes of a tree. @tree is handed to tdestroy as it is,
+ * so callers have to pass the value tdestroy expects as its root.
+ */
+void tree_destroy(void **tree)
+{
+	tdestroy(tree, destroy_node);
+}
diff --git a/server/tree.h b/server/tree.h
new file mode 100644
index 0000000..e6eea84
--- /dev/null
+++ b/server/tree.h
@@ -0,0 +1,37 @@
+/*
+ * server/tree.h - Mechanism to store Datasets in binary trees.
+ */
+
+
+/*
+ * This structure represents a Dataset along with its config file and
+ * file descriptor. This type of items will be stored in a binary
+ * tree. The key to this structure is the Dataset's Volume-ID and
+ * Release-ID (they must be unique in each server instance).
+ */
+typedef struct node {
+	dataset_t *ds;
+	char path[FILE_NAME_MAX];
+	int fd;
+	int servers;
+	dnbd2_server_t server[ALT_SERVERS_MAX];
+} node_t;
+
+
+/*
+ * Returns: 0 on success or -1 on failure.
+ */
+int tree_insert(node_t *data, void **tree);
+
+
+/*
+ * Returns: Pointer to item on search-hit or NULL on search-miss.
+ */
+node_t *tree_find(node_t *data, void **tree);
+
+
+/*
+ * Free all resources used by the tree. Useful when reloading
+ * datasets.
+ */
+void tree_destroy(void **tree);
diff --git a/test-app/Makefile b/test-app/Makefile
new file mode 100644
index 0000000..248a285
--- /dev/null
+++ b/test-app/Makefile
@@ -0,0 +1,17 @@
+#
+# test-apps/Makefile
+#
+
+CPPFLAGS = -I../include
+CFLAGS = -Wall -O2 -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64
+VPATH = ../include
+APP = data-client
+
+all: $(APP)
+
+install:
+	cp $(APP) /usr/local/bin
+
+.PHONY: all install clean
+clean:
+	$(RM) *.o *~ $(APP)
diff --git a/test-app/data-client.c b/test-app/data-client.c
new file mode 100644
index 0000000..da85b89
--- /dev/null
+++ b/test-app/data-client.c
@@ -0,0 +1,336 @@
+/*
+ * test-apps/data-client.c
+ */
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <byteswap.h>
+#include <arpa/inet.h>
+#include <inttypes.h>
+#include <strings.h>
+#include <unistd.h>
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+#include "dnbd2.h"
+
+
+void print_usage(void);
+ssize_t writen(int fd, const void *msg, size_t n);
+ssize_t readn(int fd, void *msg, size_t n);
+int dnbd2_data_request(dnbd2_server_t server,
+		       dnbd2_data_request_t request,
+		       dnbd2_data_reply_t *reply);
+
+
+/*
+ * Send the command given on the command line to a data-server and
+ * print the answer to stdout.
+ */
+int main(int argc, char **argv)
+{
+	if (argc != 6) {
+		print_usage();
+		return 0;
+	}
+
+	dnbd2_data_request_t request;
+	dnbd2_data_reply_t reply;
+
+	/* Start from a clean request: not every command sets every
+	   field (e.g. getsize does not set num). */
+	memset(&request, 0, sizeof(request));
+	memset(&reply, 0, sizeof(reply));
+
+	/* The time field is always echoed by the server.
+	   This is an arbitrary value. */
+	request.time = 28481;
+
+	/* Read IP and Port */
+	dnbd2_server_t server;
+	if (!inet_aton(argv[1], (struct in_addr *) &(server.ip))) {
+		fprintf(stderr, "Invalid IP\n");
+		return -1;
+	}
+	if (sscanf(argv[2], "%hu", &(server.port)) != 1) {
+		fprintf(stderr, "Invalid Port\n");
+		return -1;
+	}
+	server.port = htons(server.port);
+
+	/* Read Volume-ID and Release-ID */
+	if (sscanf(argv[3], "%hu", &(request.vid)) != 1) {
+		fprintf(stderr, "Invalid Volume-ID\n");
+		return -1;
+	}
+	if (sscanf(argv[4], "%hu", &(request.rid)) != 1) {
+		fprintf(stderr, "Invalid Release-ID\n");
+		return -1;
+	}
+
+	/* Read command. The field width keeps sscanf inside str. */
+	char str[16];
+	int cmd = 0;
+	if (sscanf(argv[5], "%15s", str) == 1) {
+		if (!strcmp("getdata", str))
+			cmd = CMD_GET_BLOCK;
+
+		if (!strcmp("getsize", str))
+			cmd = CMD_GET_SIZE;
+
+		if (!strcmp("getservers", str))
+			cmd = CMD_GET_SERVERS;
+	}
+	if (!cmd) {
+		fprintf(stderr, "Invalid CMD\n");
+		return -1;
+	}
+
+	/* Complete and issue request(s) */
+	int ret, rest;
+	unsigned int s, servers;
+	off_t i, size, blocks;
+	switch (cmd) {
+	case CMD_GET_BLOCK:
+		/* Get Dataset size */
+		request.cmd = CMD_GET_SIZE;
+		ret = dnbd2_data_request(server, request, &reply);
+		if (ret == -1)
+			goto out_nosize;
+		size = reply.num;
+
+		/* size = blocks * DNBD2_BLOCK_SIZE + rest */
+		blocks = size / DNBD2_BLOCK_SIZE;
+		rest = size % DNBD2_BLOCK_SIZE;
+
+		/* Fetch "blocks * DNBD2_BLOCK_SIZE" bytes. The block
+		   counter is an off_t: an int overflows on big datasets. */
+		request.cmd = CMD_GET_BLOCK;
+		for (i = 0; i < blocks; i++) {
+			request.num = i * DNBD2_BLOCK_SIZE;
+			ret = dnbd2_data_request(server, request, &reply);
+			if (ret == -1)
+				goto out_nodata;
+			if (writen(STDOUT_FILENO, reply.payload.data,
+				   DNBD2_BLOCK_SIZE) == -1)
+				goto out_nowrite;
+		}
+
+		/* Fetch "rest" bytes */
+		if (rest != 0) {
+			request.num = i * DNBD2_BLOCK_SIZE;
+			ret = dnbd2_data_request(server, request, &reply);
+			if (ret == -1)
+				goto out_nodata;
+			if (writen(STDOUT_FILENO, reply.payload.data,
+				   rest) == -1)
+				goto out_nowrite;
+		}
+		break;
+
+	case CMD_GET_SIZE:
+		/* Get Dataset size */
+		request.cmd = CMD_GET_SIZE;
+		ret = dnbd2_data_request(server, request, &reply);
+		if (ret == -1)
+			goto out_nosize;
+		printf("Dataset Size = %" PRIu64 "\n", (uint64_t) reply.num);
+		break;
+
+	case CMD_GET_SERVERS:
+		/* Get list of alternative servers */
+		request.cmd = CMD_GET_SERVERS;
+		ret = dnbd2_data_request(server, request, &reply);
+		if (ret == -1)
+			goto out_noservers;
+		if (reply.num == 0)
+			printf("No alternative servers.\n");
+
+		/* The count comes from the network and a dataset has at
+		   most ALT_SERVERS_MAX alternative servers. */
+		servers = 0;
+		if (reply.num > 0)
+			servers = (reply.num < ALT_SERVERS_MAX) ?
+				reply.num : ALT_SERVERS_MAX;
+
+		for (s = 0; s < servers; s++) {
+			dnbd2_server_t alt;
+			struct in_addr addr;
+			memcpy(&alt,
+			       &reply.payload.server[s],
+			       sizeof(dnbd2_server_t));
+			memcpy(&addr,
+			       &alt.ip,
+			       sizeof(uint32_t));
+			printf("%s:%hu\n", inet_ntoa(addr), ntohs(alt.port));
+		}
+		break;
+	}
+
+
+	return 0;
+
+ out_nodata:
+	fprintf(stderr, "Could not get data.\n");
+	return -1;
+
+ out_nowrite:
+	fprintf(stderr, "Could not write data.\n");
+	return -1;
+
+ out_nosize:
+	fprintf(stderr, "Could not get Dataset size.\n");
+	return -1;
+
+ out_noservers:
+	fprintf(stderr, "Could not get list of alternative servers.\n");
+	return -1;
+}
+
+
+/*
+ * Send @request to @server over UDP and wait for the answer. The
+ * header fields of @request are converted to network byte order
+ * before sending and those of @reply back to host byte order.
+ *
+ * Returns: 0 on success or -1 on failure.
+ */
+int dnbd2_data_request(dnbd2_server_t server,
+		       dnbd2_data_request_t request,
+		       dnbd2_data_reply_t *reply)
+{
+	int sockfd, ret = -1;
+	ssize_t n;
+	struct sockaddr_in server_addr;
+
+	bzero(&server_addr, sizeof(server_addr));
+	server_addr.sin_addr.s_addr = server.ip;
+	server_addr.sin_port = server.port;
+	server_addr.sin_family = AF_INET;
+
+	sockfd = socket(PF_INET, SOCK_DGRAM, 0);
+	if (sockfd == -1) {
+		fprintf(stderr, "Could not create socket.\n");
+		return -1;
+	}
+	if (connect(sockfd, (struct sockaddr *) &server_addr,
+		    sizeof(server_addr)) == -1) {
+		fprintf(stderr, "Could not connect UDP socket.\n");
+		goto out_close;
+	}
+
+	request.cmd = htons(request.cmd);
+	request.time = htons(request.time);
+	request.vid = htons(request.vid);
+	request.rid = htons(request.rid);
+	request.num = htonll(request.num);
+
+	/* Send request. */
+	n = writen(sockfd, &request, sizeof(request));
+	if (n == -1) {
+		fprintf(stderr, "Error sending request.\n");
+		goto out_close;
+	}
+	if ((size_t) n != sizeof(request)) {
+		fprintf(stderr, "Sent wrong request size.\n");
+		goto out_close;
+	}
+
+	/* Receive reply. */
+	n = readn(sockfd, reply, sizeof(*reply));
+	if (n == -1) {
+		fprintf(stderr, "Error receiving reply.\n");
+		goto out_close;
+	}
+	if ((size_t) n != sizeof(*reply)) {
+		fprintf(stderr, "Got wrong reply size.\n");
+		goto out_close;
+	}
+
+	reply->cmd = ntohs(reply->cmd);
+	reply->time = ntohs(reply->time);
+	reply->vid = ntohs(reply->vid);
+	reply->rid = ntohs(reply->rid);
+	reply->num = ntohll(reply->num);
+	ret = 0;
+
+ out_close:
+	/* The socket is closed on every path, not only on success. */
+	close(sockfd);
+	return ret;
+}
+
+
+/*
+ * Write @n bytes of @msg to @fd, retrying after short writes and
+ * interrupted system calls.
+ *
+ * Returns: @n on success or -1 on failure.
+ */
+ssize_t writen(int fd, const void *msg, size_t n) {
+
+	size_t nleft;
+	ssize_t nwritten;
+	const char *ptr;
+
+	ptr = msg;
+	nleft = n;
+
+	while (nleft > 0) {
+		nwritten = write(fd, ptr, nleft);
+		if (nwritten < 0) {
+			/* errno is only meaningful after a failed call. */
+			if (errno != EINTR)
+				return -1;
+			nwritten = 0;
+		} else if (nwritten == 0) {
+			/* No progress and no error: give up. */
+			return -1;
+		}
+		nleft -= nwritten;
+		ptr += nwritten;
+	}
+
+	return (n);
+}
+
+
+/*
+ * Read up to @n bytes from @fd into @msg, retrying after short reads
+ * and interrupted system calls until @n bytes were read or the end
+ * of the input is reached.
+ *
+ * Returns: Number of bytes read or -1 on failure.
+ */
+ssize_t readn(int fd, void *msg, size_t n) {
+
+	size_t nleft;
+	ssize_t nread;
+	char *ptr;
+
+	ptr = msg;
+	nleft = n;
+
+	while (nleft > 0) {
+		if ((nread = read(fd, ptr, nleft)) < 0) {
+			if (errno == EINTR)
+				nread = 0;
+			else
+				return -1;
+		} else if (nread == 0) {
+			break;
+		}
+		nleft -= nread;
+		ptr += nread;
+	}
+
+	return n - nleft;
+}
+
+
+void print_usage(void) {
+	printf("usage: dnbd2-data IP Port Volume-ID Release-ID "
+	       "(getsize|getdata|getservers)\n");
+	printf(" getsize: Print the Dataset's size.\n");
+	printf(" getdata: Write the Dataset's contents to stdout.\n");
+	printf(" getservers: Print the list of alternative Servers.\n");
+}