diff options
Diffstat (limited to 'src/server/job.c')
-rw-r--r-- | src/server/job.c | 111 |
1 files changed, 104 insertions, 7 deletions
diff --git a/src/server/job.c b/src/server/job.c
index 4a30cd1..cf45681 100644
--- a/src/server/job.c
+++ b/src/server/job.c
@@ -1,6 +1,8 @@
 #include "job.h"
 #include "saveload.h"
+#include "helper.h"
 #include "memlog.h"
+#include "ipc.h"
 
 #include <stdint.h>
 #include <stdio.h>
@@ -17,6 +19,7 @@
 #include <libxml/parser.h>
 #include <libxml/xpath.h>
 #include "xmlutil.h"
+#include "../config.h"
 
 #define DEV_STRLEN 12 // INCLUDING NULLCHAR (increase to 13 if you need more than 100 (0-99) devices)
 #define MAX_NUM_DEVICES_TO_CHECK 100
@@ -43,6 +46,7 @@ static char keep_running = TRUE;
 // Private functions
 static char *get_free_device();
 static void query_servers();
+static void dnbd3_remove_alt_server(dnbd3_trusted_server_t *server);
 static void update_image_atimes(time_t now);
 
 //
@@ -130,7 +134,7 @@ static void query_servers()
 	struct timeval client_timeout, connect_timeout;
 	client_timeout.tv_sec = 0;
 	client_timeout.tv_usec = 500 * 1000;
-	connect_timeout.tv_sec = 2;
+	connect_timeout.tv_sec = 1;
 	connect_timeout.tv_usec = 0;
 	int client_sock, num;
 	dnbd3_trusted_server_t *server;
@@ -138,6 +142,7 @@ static void query_servers()
 	struct sockaddr_in addr4;
 	for (num = 0;; ++num)
 	{
+		char *xmlbuffer = NULL;
 		// "Iterate" this way to prevent holding the lock for a long time, although it is possible to skip a server this way...
 		pthread_spin_lock(&_spinlock);
 		server = g_slist_nth_data(_trusted_servers, num);
@@ -164,25 +169,117 @@ static void query_servers()
 		memset(&addr4, 0, sizeof(addr4));
 		addr4.sin_family = AF_INET;
 		memcpy(&addr4.sin_addr.s_addr, host.addr, 4);
-		addr4.sin_port = host.port;
+		addr4.sin_port = htons(ntohs(host.port) + 1);
 		// Connect to server
 		setsockopt(client_sock, SOL_SOCKET, SO_RCVTIMEO, &connect_timeout, sizeof(connect_timeout));
 		setsockopt(client_sock, SOL_SOCKET, SO_SNDTIMEO, &connect_timeout, sizeof(connect_timeout));
 		if (connect(client_sock, (struct sockaddr *)&addr4, sizeof(addr4)) < 0)
 		{
 			printf("[DEBUG] Could not connect to other server...\n");
-			close(client_sock);
 			// TODO: Remove from alt server list if failed too often
-			continue;
+			goto communication_error;
 		}
 		// Apply read/write timeout
 		setsockopt(client_sock, SOL_SOCKET, SO_RCVTIMEO, &client_timeout, sizeof(client_timeout));
 		setsockopt(client_sock, SOL_SOCKET, SO_SNDTIMEO, &client_timeout, sizeof(client_timeout));
 		//
-		// TODO: Send and receive info from server
-		//
+		// Send and receive info from server
+		// Send message
+		dnbd3_ipc_t header;
+		header.cmd = htonl(IPC_INFO);
+		header.size = 0;
+		header.error = 0;
+		send(client_sock, (char *)&header, sizeof(header), 0);
+		if (!recv_data(client_sock, &header, sizeof(header)))
+		{
+			printf("[DEBUG] Could not get status from other server...\n");
+			goto communication_error;
+		}
+		header.cmd = ntohl(header.cmd);
+		header.size = ntohl(header.size);
+		header.error = ntohl(header.error);
+		if (header.cmd != IPC_INFO || header.error != 0)
+		{
+			printf("[DEBUG] Error. Reply from other server was cmd:%d, error:%d\n", (int)header.cmd, (int)header.error);
+			goto communication_error;
+		}
+		if (header.size > MAX_IPC_PAYLOAD)
+		{
+			memlogf("[WARNING] XML payload from other server exceeds MAX_IPC_PAYLOAD (%d > %d)", (int)header.size, (int)MAX_IPC_PAYLOAD);
+			goto communication_error;
+		}
+		xmlbuffer = malloc(header.size);
+		if (!recv_data(client_sock, xmlbuffer, header.size))
+		{
+			printf("[DEBUG] Error reading XML payload from other server.\n");
+			goto communication_error;
+		}
 		close(client_sock);
 		//
-		// TODO: Process data, update server info, add/remove this server as alt server for images, replicate images, etc.
+		// Process data, update server info, add/remove this server as alt server for images, replicate images, etc.
+		xmlDocPtr doc = xmlReadMemory(xmlbuffer, header.size, "noname.xml", NULL, 0);
+		free(xmlbuffer);
+		xmlbuffer = NULL;
+		if (doc == NULL)
+		{
+			memlogf("[WARNING] Could not parse XML data received by other server.");
+			goto communication_error;
+		}
+		// Data seems ok
+		// ...
+		xmlFreeDoc(doc);
+		//
+		continue;
+communication_error:
+		close(client_sock);
+		free(xmlbuffer);
+		pthread_spin_lock(&_spinlock);
+		if (g_slist_find(_trusted_servers, server))
+		{
+			if (server->unreachable < 10 && ++server->unreachable == 5)
+				dnbd3_remove_alt_server(server);
+		}
+		pthread_spin_unlock(&_spinlock);
+	}
+}
+
+/**
+ * !! Call this while holding the lock !!
+ */
+static void dnbd3_remove_alt_server(dnbd3_trusted_server_t *server)
+{
+	GSList *iti, *itc;
+	int i;
+	dnbd3_reply_t header;
+	header.cmd = CMD_GET_SERVERS;
+	header.magic = dnbd3_packet_magic;
+	header.size = sizeof(dnbd3_server_entry_t);
+	fixup_reply(header);
+	// Iterate over all images
+	for (iti = _dnbd3_images; iti; iti = iti->next)
+	{
+		dnbd3_image_t *const image = iti->data;
+		// Check if any alt_server for that image is the server to be removed
+		for (i = 0; i < NUMBER_SERVERS; ++i)
+		{
+			if (is_same_server(&server->host, &image->servers[i].host))
+			{
+				// Remove server from that image and tell every connected client about it
+				image->servers[i].failures = 1;
+				for (itc = _dnbd3_clients; itc; itc = itc->next)
+				{
+					dnbd3_client_t *const client = itc->data;
+					if (client->image == image)
+					{
+						// Don't send message directly as the lock is being held; instead, enqueue it
+						NEW_BINSTRING(message, sizeof(header) + sizeof(image->servers[i]));
+						memcpy(message->data, &header, sizeof(header));
+						memcpy(message->data + sizeof(header), &image->servers[i], sizeof(image->servers[i]));
+						client->sendqueue = g_slist_append(client->sendqueue, message);
+					}
+				}
+				image->servers[i].host.type = 0;
+			}
+		}
 	}
 }