diff options
Diffstat (limited to 'kernel/servers.c')
-rw-r--r-- | kernel/servers.c | 355 |
1 files changed, 355 insertions, 0 deletions
diff --git a/kernel/servers.c b/kernel/servers.c new file mode 100644 index 0000000..7d5c229 --- /dev/null +++ b/kernel/servers.c @@ -0,0 +1,355 @@ +/* + * kernel/servers.c + */ + + +#include "dnbd2.h" +#include "servers.h" +#include "core.h" +#include "misc.h" + + +/* Only use RTTs smaller than RTT_MAX for statistics. */ +#define RTT_MAX HZ/4 + + +/* + * Find the next configured server which is not active. + */ +struct srv_info *next_server(dnbd2_device_t *dev) +{ + int i; + struct srv_info *srv_info, *next_srv = NULL; + + for_each_server(i) { + srv_info = &dev->servers[i]; + if (srv_info->sock && srv_info != dev->active_server) { + next_srv = srv_info; + break; + } + } + return next_srv; +} + + +/* + * Find the server with smallest SRTT. + */ +struct srv_info *fastest_server(uint16_t *srtt, dnbd2_device_t *dev) +{ + int i; + struct srv_info *srv_info, *alt_srv = NULL; + unsigned long min = RTT_MAX << SRTT_SHIFT; + + for_each_server(i) { + srv_info = &dev->servers[i]; + if (srv_info->sock && + srv_info->srtt < min && + srv_info->min < RTT_MAX) { + min = srv_info->srtt; + alt_srv = srv_info; + } + } + + *srtt = min >> SRTT_SHIFT; + return alt_srv; +} + + +/* + * This function can be enqueued in a workqueue. It removes srv_info + * from the list of servers. + */ +void del_server_work(struct work_struct *work) +{ + struct srv_info *srv_info = container_of(work, struct srv_info, work); + dnbd2_device_t *dev = srv_info->dev; + + down(&dev->servers_mutex); + if (!srv_info->sock || dev->active_server == srv_info) { + up(&dev->servers_mutex); + return; + } + del_server(srv_info); + up(&dev->servers_mutex); + return; +} + + +/* + * This function can be enqueued in a workqueue. Read comment on + * schedule_activate_fastest in servers.h + */ +void activate_fastest_work(struct work_struct *work) +{ + dnbd2_device_t *dev = container_of(work, dnbd2_device_t, work); + struct srv_info *alt_srv, *next_srv; + uint16_t min, srtt; + int newsrv = 0; + long unsigned delta; + + down(&dev->servers_mutex); + if (!dev->active_server) + goto out; + + /* Detect if dev->active_server is stalled. */ + delta = (long)jiffies - (long)dev->active_server->last_reply; + if (dev->pending_reqs && delta > TIMEOUT_STALLED) { + next_srv = next_server(dev); + if (!next_srv) { + start_emergency(dev); + goto out; + } + alt_srv = dev->active_server; + dev->active_server = next_srv; + del_server(alt_srv); + } + + /* Switch to another server if requirements met. */ + srtt = dev->active_server->srtt >> SRTT_SHIFT; + alt_srv = fastest_server(&min, dev); + if (!alt_srv || alt_srv == dev->active_server) + goto out; + if (dev->to_percent) { + if (100 * (srtt - min) < dev->to_percent * srtt) + goto out; + newsrv = 1; + } + if (dev->to_jiffies) { + if (min + dev->to_jiffies > srtt) + goto out; + newsrv = 1; + } + if (newsrv) + dev->active_server = alt_srv; + + out: + up(&dev->servers_mutex); +} + + +int start_rx_loop(struct srv_info *srv_info) +{ + srv_info->rx_id = kernel_thread(dnbd2_rx_loop, srv_info, CLONE_KERNEL); + if (srv_info->rx_id < 0) { + srv_info->rx_id = 0; + return -1; + } + wait_for_completion(&srv_info->rx_start); + return 0; +} + + +void stop_rx_loop(struct srv_info *srv_info) +{ + if (!srv_info->rx_id) + return; + kill_proc(srv_info->rx_id, SIGKILL, 1); + wait_for_completion(&srv_info->rx_stop); + srv_info->rx_id = 0; +} + + +/******************************************************/ +/* For the next functions see servers.h for comments. */ +/******************************************************/ + + +int add_server(dnbd2_server_t server, struct srv_info *srv_info) +{ + struct sockaddr_in addr; + struct socket *sock; + + if (!server.ip || !server.port) + return -1; + + if (sock_create(PF_INET,SOCK_DGRAM, IPPROTO_UDP, &sock) < 0) { + p("Could not create socket.\n"); + return -1; + } + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = server.ip; + addr.sin_port = server.port; + if (sock->ops->connect(sock, + (struct sockaddr *)&addr, + sizeof(addr), 0)) { + p("Could not connect to socket.\n"); + goto out; + } + + srv_info->sock = sock; + srv_info->ip = server.ip; + srv_info->port = server.port; + srv_info->min = RTT_MAX; + + if (start_rx_loop(srv_info)) { + p("Could not start rx_loop\n"); + goto out; + } + + return 0; + out: + del_server(srv_info); + return -1; +} + + +void del_server(struct srv_info *srv_info) +{ + stop_rx_loop(srv_info); + if (srv_info->sock) + sock_release(srv_info->sock); + srv_info->sock = NULL; + srv_info->ip = 0; + srv_info->port = 0; + srv_info->srtt = 0; + srv_info->min = 0; + srv_info->max = 0; + srv_info->srtt = 0; + srv_info->retries = 0; + srv_info->last_reply = 0; +} + + +void try_add_server(dnbd2_server_t server, dnbd2_device_t *dev) +{ + int i; + + for_each_server(i) + if (dev->servers[i].sock && + dev->servers[i].ip == server.ip && + dev->servers[i].port == server.port) + return; + + for_each_server(i) + if (!dev->servers[i].sock) + break; + + if (i == ALT_SERVERS_MAX) + return; + + add_server(server, &dev->servers[i]); +} + + +sector_t srv_get_capacity(struct srv_info *srv_info) +{ + struct request *req; + struct req_info *info; + dnbd2_device_t *dev = srv_info->dev; + sector_t capacity; + int ret; + + info = kmalloc(sizeof(struct req_info), GFP_KERNEL); + if (!info) + return 0; + req = kmalloc(sizeof(struct request), GFP_KERNEL); + if (!req) { + kfree(info); + return 0; + } + info->cmd = CMD_GET_SIZE; + info->cnt = 0; + info->time = jiffies; + info->dst = srv_info; + info->last_dst = srv_info; + req->special = info; + req->sector = 0; + INIT_LIST_HEAD(&req->queuelist); + + /* Enqueue the request for sending. */ + spin_lock_bh(&dev->send_queue_lock); + list_add_tail(&req->queuelist, &dev->send_queue); + spin_unlock_bh(&dev->send_queue_lock); + + /* Wake up sender function. */ + wake_up_interruptible(&dev->sender_wq); + + /* If we don't get an answer in 4 seconds we give up. */ + ret = wait_event_timeout(srv_info->wq, srv_info->capacity, 4*HZ); + capacity = srv_info->capacity; + srv_info->capacity = 0; + + return ret ? capacity : 0; +} + + +void update_rtt(uint16_t rtt, struct srv_info *srv_info, uint16_t cmd) +{ + if (rtt == 0) + rtt = 1; + if (rtt > RTT_MAX) + return; + + if (rtt < srv_info->min) + srv_info->min = rtt; + if (rtt > srv_info->max) + srv_info->max = rtt; + + if (!srv_info->srtt) { + srv_info->srtt = rtt << SRTT_SHIFT; + return; + } + + switch (cmd) { + case CMD_GET_BLOCK: + srv_info->srtt = SRTT_BETA * srv_info->srtt; + srv_info->srtt += (SRTT_BETA_COMP * ((int)rtt)) << SRTT_SHIFT; + srv_info->srtt /= SRTT_BETA_BASE; + break; + + case CMD_GET_SIZE: + case CMD_GET_SERVERS: + srv_info->srtt = rtt << SRTT_SHIFT; + break; + } +} + + +void enqueue_hb(struct srv_info *srv_info) +{ + struct request *req; + struct req_info *info; + dnbd2_device_t *dev = srv_info->dev; + + if (!srv_info->sock) + return; + info = kmalloc(sizeof(struct req_info), GFP_ATOMIC); + if (!info) + return; + req = kmalloc(sizeof(struct request), GFP_ATOMIC); + if (!req) { + kfree(info); + return; + } + info->cmd = CMD_GET_SERVERS; + info->cnt = 0; + info->time = jiffies; + info->dst = srv_info; + info->dst = srv_info; + info->last_dst = srv_info; + req->special = info; + req->sector = 0; + INIT_LIST_HEAD(&req->queuelist); + + /* Enqueue the request for sending. */ + spin_lock_bh(&dev->send_queue_lock); + list_add_tail(&req->queuelist, &dev->send_queue); + spin_unlock_bh(&dev->send_queue_lock); +} + + +void schedule_del_server(struct srv_info *srv_info) +{ + PREPARE_WORK(&srv_info->work, del_server_work); //, srv_info); + /* Change in /<linuxheaders>/include/linux/workqueue.h */ + schedule_work(&srv_info->work); +} + + +void schedule_activate_fastest(dnbd2_device_t *dev) +{ + PREPARE_WORK(&dev->work, activate_fastest_work); //, dev); + /* Change in /<linuxheaders>/include/linux/workqueue.h */ + schedule_work(&dev->work); +} |