/* * kernel/servers.c */ #include "dnbd2.h" #include "servers.h" #include "core.h" #include "misc.h" /* Only use RTTs smaller than RTT_MAX for statistics. */ #define RTT_MAX HZ/4 /* * Find the next configured server which is not active. */ struct srv_info *next_server(dnbd2_device_t *dev) { int i; struct srv_info *srv_info, *next_srv = NULL; for_each_server(i) { srv_info = &dev->servers[i]; if (srv_info->sock && srv_info != dev->active_server) { next_srv = srv_info; break; } } return next_srv; } /* * Find the server with smallest SRTT. */ struct srv_info *fastest_server(uint16_t *srtt, dnbd2_device_t *dev) { int i; struct srv_info *srv_info, *alt_srv = NULL; unsigned long min = RTT_MAX << SRTT_SHIFT; for_each_server(i) { srv_info = &dev->servers[i]; if (srv_info->sock && srv_info->srtt < min && srv_info->min < RTT_MAX) { min = srv_info->srtt; alt_srv = srv_info; } } *srtt = min >> SRTT_SHIFT; return alt_srv; } /* * This function can be enqueued in a workqueue. It removes srv_info * from the list of servers. */ #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,20)) void del_server_work(void *data) #else void del_server_work(struct work_struct *work) #endif { #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,20)) struct srv_info *srv_info = data; #else struct srv_info *srv_info = container_of(work, struct srv_info, work); #endif dnbd2_device_t *dev = srv_info->dev; down(&dev->servers_mutex); if (!srv_info->sock || dev->active_server == srv_info) { up(&dev->servers_mutex); return; } del_server(srv_info); up(&dev->servers_mutex); return; } /* * This function can be enqueued in a workqueue. Read comment on * schedule_activate_fastest in servers.h */ #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,20)) void activate_fastest_work(void *data) #else void activate_fastest_work(struct work_struct *work) #endif { #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,20)) dnbd2_device_t *dev = data; #else dnbd2_device_t *dev = container_of(work, dnbd2_device_t, work); #endif struct srv_info *alt_srv, *next_srv; uint16_t min, srtt; int newsrv = 0; long unsigned delta; down(&dev->servers_mutex); if (!dev->active_server) goto out; /* Detect if dev->active_server is stalled. */ delta = (long)jiffies - (long)dev->active_server->last_reply; if (dev->pending_reqs && delta > TIMEOUT_STALLED) { next_srv = next_server(dev); if (!next_srv) { start_emergency(dev); goto out; } alt_srv = dev->active_server; dev->active_server = next_srv; del_server(alt_srv); } /* Switch to another server if requirements met. */ srtt = dev->active_server->srtt >> SRTT_SHIFT; alt_srv = fastest_server(&min, dev); if (!alt_srv || alt_srv == dev->active_server) goto out; if (dev->to_percent) { if (100 * (srtt - min) < dev->to_percent * srtt) goto out; newsrv = 1; } if (dev->to_jiffies) { if (min + dev->to_jiffies > srtt) goto out; newsrv = 1; } if (newsrv) dev->active_server = alt_srv; out: up(&dev->servers_mutex); } int start_rx_loop(struct srv_info *srv_info) { srv_info->dnbd_thread_task = kthread_run(dnbd2_rx_loop, srv_info, "DNBD2"); if (IS_ERR(srv_info->dnbd_thread_task)){ srv_info->rx_id = 0; return -1; } //wait_for_completion(&srv_info->rx_start); return 0; } void stop_rx_loop(struct srv_info *srv_info) { if (!srv_info->rx_id) return; #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,27) //kill_proc_info(SIGKILL, 1, srv_info->rx_id); kthread_stop(srv_info->dnbd_thread_task); #else kill_proc(srv_info->rx_id, SIGKILL, 1); #endif //wait_for_completion(&srv_info->rx_stop); srv_info->rx_id = 0; } /******************************************************/ /* For the next functions see servers.h for comments. */ /******************************************************/ int add_server(dnbd2_server_t server, struct srv_info *srv_info) { struct sockaddr_in addr; struct socket *sock; if (!server.ip || !server.port) return -1; if (sock_create(PF_INET,SOCK_DGRAM, IPPROTO_UDP, &sock) < 0) { p("Could not create socket.\n"); return -1; } addr.sin_family = AF_INET; addr.sin_addr.s_addr = server.ip; addr.sin_port = server.port; if (sock->ops->connect(sock, (struct sockaddr *)&addr, sizeof(addr), 0)) { p("Could not connect to socket.\n"); goto out; } srv_info->sock = sock; srv_info->ip = server.ip; srv_info->port = server.port; srv_info->min = RTT_MAX; if (start_rx_loop(srv_info)) { p("Could not start rx_loop\n"); goto out; } return 0; out: del_server(srv_info); return -1; } void del_server(struct srv_info *srv_info) { stop_rx_loop(srv_info); if (srv_info->sock) sock_release(srv_info->sock); srv_info->sock = NULL; srv_info->ip = 0; srv_info->port = 0; srv_info->srtt = 0; srv_info->min = 0; srv_info->max = 0; srv_info->srtt = 0; srv_info->retries = 0; srv_info->last_reply = 0; } void try_add_server(dnbd2_server_t server, dnbd2_device_t *dev) { int i; for_each_server(i) if (dev->servers[i].sock && dev->servers[i].ip == server.ip && dev->servers[i].port == server.port) return; for_each_server(i) if (!dev->servers[i].sock) break; if (i == ALT_SERVERS_MAX) return; add_server(server, &dev->servers[i]); } sector_t srv_get_capacity(struct srv_info *srv_info) { struct request *req; struct req_info *info; dnbd2_device_t *dev = srv_info->dev; sector_t capacity; int ret; info = kmalloc(sizeof(struct req_info), GFP_KERNEL); if (!info) return 0; req = kmalloc(sizeof(struct request), GFP_KERNEL); if (!req) { kfree(info); return 0; } info->cmd = CMD_GET_SIZE; info->cnt = 0; info->time = jiffies; info->dst = srv_info; info->last_dst = srv_info; req->special = info; req->__sector = 0; INIT_LIST_HEAD(&req->queuelist); /* Enqueue the request for sending. */ spin_lock_bh(&dev->send_queue_lock); list_add_tail(&req->queuelist, &dev->send_queue); spin_unlock_bh(&dev->send_queue_lock); /* Wake up sender function. */ wake_up_interruptible(&dev->sender_wq); /* If we don't get an answer in 4 seconds we give up. */ ret = wait_event_timeout(srv_info->wq, srv_info->capacity, 4*HZ); capacity = srv_info->capacity; srv_info->capacity = 0; return ret ? capacity : 0; } void update_rtt(uint16_t rtt, struct srv_info *srv_info, uint16_t cmd) { if (rtt == 0) rtt = 1; if (rtt > RTT_MAX) return; if (rtt < srv_info->min) srv_info->min = rtt; if (rtt > srv_info->max) srv_info->max = rtt; if (!srv_info->srtt) { srv_info->srtt = rtt << SRTT_SHIFT; return; } switch (cmd) { case CMD_GET_BLOCK: srv_info->srtt = SRTT_BETA * srv_info->srtt; srv_info->srtt += (SRTT_BETA_COMP * ((int)rtt)) << SRTT_SHIFT; srv_info->srtt /= SRTT_BETA_BASE; break; case CMD_GET_SIZE: case CMD_GET_SERVERS: srv_info->srtt = rtt << SRTT_SHIFT; break; } } void enqueue_hb(struct srv_info *srv_info) { struct request *req; struct req_info *info; dnbd2_device_t *dev = srv_info->dev; if (!srv_info->sock) return; info = kmalloc(sizeof(struct req_info), GFP_ATOMIC); if (!info) return; req = kmalloc(sizeof(struct request), GFP_ATOMIC); if (!req) { kfree(info); return; } info->cmd = CMD_GET_SERVERS; info->cnt = 0; info->time = jiffies; info->dst = srv_info; info->dst = srv_info; info->last_dst = srv_info; req->special = info; req->__sector = 0; INIT_LIST_HEAD(&req->queuelist); /* Enqueue the request for sending. */ spin_lock_bh(&dev->send_queue_lock); list_add_tail(&req->queuelist, &dev->send_queue); spin_unlock_bh(&dev->send_queue_lock); } void schedule_del_server(struct srv_info *srv_info) { #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,20)) PREPARE_WORK(&srv_info->work, del_server_work, srv_info); #else PREPARE_WORK(&srv_info->work, del_server_work); #endif schedule_work(&srv_info->work); } void schedule_activate_fastest(dnbd2_device_t *dev) { #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,20)) PREPARE_WORK(&dev->work, activate_fastest_work, dev); #else PREPARE_WORK(&dev->work, activate_fastest_work); #endif schedule_work(&dev->work); }