From bc4e381484024237df0b04bb667f742fe4846b35 Mon Sep 17 00:00:00 2001 From: sr Date: Tue, 4 Sep 2012 19:22:57 +0200 Subject: [SERVER] More work towards automatic server discovery and querying --- src/kernel/net.c | 2 +- src/server/helper.h | 68 +++++++++++++++++++++++++++++-- src/server/ipc.c | 79 +++++------------------------------ src/server/job.c | 111 ++++++++++++++++++++++++++++++++++++++++++++++---- src/server/net.c | 19 ++++++++- src/server/saveload.c | 2 +- src/server/server.c | 16 +++++++- src/server/server.h | 15 ++++++- 8 files changed, 228 insertions(+), 84 deletions(-) diff --git a/src/kernel/net.c b/src/kernel/net.c index f437464..323c59c 100644 --- a/src/kernel/net.c +++ b/src/kernel/net.c @@ -451,7 +451,7 @@ int dnbd3_net_discover(void *data) spin_lock_irqsave(&dev->blk_lock, irqflags); for (i = 0; i < dev->new_servers_num; ++i) { - if (dev->new_servers[i].host.type != AF_INET) // Invalid entry.. (Add IPv6) + if (dev->new_servers[i].host.type != AF_INET) // Invalid entry.. (Add IPv6 someday) continue; alt_server = get_existing_server(&dev->new_servers[i], dev); if (alt_server != NULL) // Server already known diff --git a/src/server/helper.h b/src/server/helper.h index e787c42..fdcc3b5 100644 --- a/src/server/helper.h +++ b/src/server/helper.h @@ -4,6 +4,8 @@ #include "server.h" #include #include +#include +#include char parse_address(char *string, dnbd3_host_t *host); char host_to_string(const dnbd3_host_t *host, char *target, size_t targetlen); @@ -11,11 +13,69 @@ char is_valid_namespace(char *namespace); char is_valid_imagename(char *namespace); void strtolower(char *string); -static inline int is_same_server(const dnbd3_trusted_server_t *const a, const dnbd3_trusted_server_t *const b) +static inline int is_same_server(const dnbd3_host_t *const a, const dnbd3_host_t *const b) { - return (a->host.type == b->host.type) - && (a->host.port == b->host.port) - && (0 == memcmp(a->host.addr, b->host.addr, (a->host.type == AF_INET ? 4 : 16))); + return (a->type == b->type) + && (a->port == b->port) + && (0 == memcmp(a->addr, b->addr, (a->type == AF_INET ? 4 : 16))); +} + +/** + * Send message to client, return !=0 on success, 0 on failure + */ +static inline int send_data(int client_sock, void *data_in, int len) +{ + if (len <= 0) // Nothing to send + return 1; + char *data = data_in; // Needed for pointer arithmetic + int ret, i; + for (i = 0; i < 3; ++i) // Retry at most 3 times, each try takes at most 0.5 seconds (socket timeout) + { + ret = send(client_sock, data, len, 0); + if (ret == 0) // Connection closed + return 0; + if (ret < 0) + { + if (errno != EAGAIN) // Some unexpected error + return 0; + usleep(1000); // 1ms + continue; + } + len -= ret; + if (len <= 0) // Sent everything + return 1; + data += ret; // move target buffer pointer + } + return 0; +} + +/** + * Receive data from client, return !=0 on success, 0 on failure + */ +static inline int recv_data(int client_sock, void *buffer_out, int len) +{ + if (len <= 0) // Nothing to receive + return 1; + char *data = buffer_out; // Needed for pointer arithmetic + int ret, i; + for (i = 0; i < 3; ++i) // Retry at most 3 times, each try takes at most 0.5 seconds (socket timeout) + { + ret = recv(client_sock, data, len, MSG_WAITALL); + if (ret == 0) // Connection closed + return 0; + if (ret < 0) + { + if (errno != EAGAIN) // Some unexpected error + return 0; + usleep(1000); // 1ms + continue; + } + len -= ret; + if (len <= 0) // Received everything + return 1; + data += ret; // move target buffer pointer + } + return 0; } // one byte in the map covers 8 4kib blocks, so 32kib per byte diff --git a/src/server/ipc.c b/src/server/ipc.c index b93844a..0dbf46f 100644 --- a/src/server/ipc.c +++ b/src/server/ipc.c @@ -58,8 +58,6 @@ static char *payload = NULL; static int ipc_receive(int client_sock); static int get_highest_fd(GSList *sockets); -static int send_reply(int client_sock, void *data_in, int len); -static int recv_data(int client_sock, void *buffer_out, int len); static int is_password_correct(xmlDocPtr doc); static int get_terminal_width(); @@ -299,64 +297,6 @@ void dnbd3_ipc_shutdown() server_sock = -1; } -/** - * Send message to client, return !=0 on success, 0 on failure - */ -static int send_reply(int client_sock, void *data_in, int len) -{ - if (len <= 0) // Nothing to send - return 1; - char *data = data_in; // Needed for pointer arithmetic - int ret, i; - for (i = 0; i < 3; ++i) // Retry at most 3 times, each try takes at most 0.5 seconds (socket timeout) - { - ret = send(client_sock, data, len, 0); - if (ret == 0) // Connection closed - return 0; - if (ret < 0) - { - if (errno != EAGAIN) // Some unexpected error - return 0; - usleep(1000); // 1ms - continue; - } - len -= ret; - if (len <= 0) // Sent everything - return 1; - data += ret; // move target buffer pointer - } - return 0; -} - -/** - * Receive data from client, return !=0 on success, 0 on failure - */ -static int recv_data(int client_sock, void *buffer_out, int len) -{ - if (len <= 0) // Nothing to receive - return 1; - char *data = buffer_out; // Needed for pointer arithmetic - int ret, i; - for (i = 0; i < 3; ++i) // Retry at most 3 times, each try takes at most 0.5 seconds (socket timeout) - { - ret = recv(client_sock, data, len, MSG_WAITALL); - if (ret == 0) // Connection closed - return 0; - if (ret < 0) - { - if (errno != EAGAIN) // Some unexpected error - return 0; - usleep(1000); // 1ms - continue; - } - len -= ret; - if (len <= 0) // Received everything - return 1; - data += ret; // move target buffer pointer - } - return 0; -} - /** * Returns !=0 if send/recv successful, 0 on any kind of network failure */ @@ -404,7 +344,7 @@ static int ipc_receive(int client_sock) memlogf("[INFO] Server shutdown by IPC request"); header.size = ntohl(0); header.error = ntohl(0); - return_value = send_reply(client_sock, &header, sizeof(header)); + return_value = send_data(client_sock, &header, sizeof(header)); dnbd3_cleanup(); break; @@ -419,6 +359,8 @@ static int ipc_receive(int client_sock) goto get_info_reply_cleanup; xmlDocSetRootElement(docReply, root_node); + xmlNewChild(root_node, NULL, BAD_CAST "namespace", BAD_CAST _local_namespace); + // Images parent_node = xmlNewNode(NULL, BAD_CAST "images"); if (parent_node == NULL) @@ -496,7 +438,7 @@ static int ipc_receive(int client_sock) goto get_info_reply_cleanup; host_to_string(&server->host, strbuffer, STRBUFLEN); xmlNewProp(tmp_node, BAD_CAST "ip", BAD_CAST strbuffer); - sprintf(strbuffer, "%d", (int)server->host.port); + sprintf(strbuffer, "%d", (int)ntohs(server->host.port)); xmlNewProp(tmp_node, BAD_CAST "port", BAD_CAST strbuffer); if (server->comment) xmlNewProp(tmp_node, BAD_CAST "comment", BAD_CAST server->comment); @@ -540,9 +482,9 @@ get_info_reply_cleanup: if (locked) pthread_spin_unlock(&_spinlock); // Send reply - return_value = send_reply(client_sock, &header, sizeof(header)); + return_value = send_data(client_sock, &header, sizeof(header)); if (return_value && xmlbuff) - return_value = send_reply(client_sock, xmlbuff, buffersize); + return_value = send_data(client_sock, xmlbuff, buffersize); // Cleanup xmlFree(xmlbuff); free(log); @@ -554,7 +496,7 @@ get_info_reply_cleanup: { header.size = htonl(0); header.error = htonl(ERROR_MISSING_ARGUMENT); - return_value = send_reply(client_sock, &header, sizeof(header)); + return_value = send_data(client_sock, &header, sizeof(header)); break; } docRequest = xmlReadMemory(payload, header.size, "noname.xml", NULL, 0); @@ -565,7 +507,7 @@ get_info_reply_cleanup: { header.error = htonl(ERROR_WRONG_PASSWORD); header.size = htonl(0); - return_value = send_reply(client_sock, &header, sizeof(header)); + return_value = send_data(client_sock, &header, sizeof(header)); break; } @@ -603,14 +545,14 @@ get_info_reply_cleanup: header.error = htonl(ERROR_INVALID_XML); header.size = htonl(0); - return_value = send_reply(client_sock, &header, sizeof(header)); + return_value = send_data(client_sock, &header, sizeof(header)); break; default: memlogf("[ERROR] Unknown IPC command: %u", (unsigned int)header.cmd); header.size = htonl(0); header.error = htonl(ERROR_UNKNOWN_COMMAND); - return_value = send_reply(client_sock, &header, sizeof(header)); + return_value = send_data(client_sock, &header, sizeof(header)); break; } @@ -690,6 +632,7 @@ void dnbd3_ipc_send(int cmd) { char *buf = malloc(header.size + 1); size = recv(client_sock, buf, header.size, MSG_WAITALL); + printf("\n%s\n\n", buf); xmlDocPtr doc = xmlReadMemory(buf, size, "noname.xml", NULL, 0); buf[header.size] = 0; diff --git a/src/server/job.c b/src/server/job.c index 4a30cd1..cf45681 100644 --- a/src/server/job.c +++ b/src/server/job.c @@ -1,6 +1,8 @@ #include "job.h" #include "saveload.h" +#include "helper.h" #include "memlog.h" +#include "ipc.h" #include #include @@ -17,6 +19,7 @@ #include #include #include "xmlutil.h" +#include "../config.h" #define DEV_STRLEN 12 // INCLUDING NULLCHAR (increase to 13 if you need more than 100 (0-99) devices) #define MAX_NUM_DEVICES_TO_CHECK 100 @@ -43,6 +46,7 @@ static char keep_running = TRUE; // Private functions static char *get_free_device(); static void query_servers(); +static void dnbd3_remove_alt_server(dnbd3_trusted_server_t *server); static void update_image_atimes(time_t now); // @@ -130,7 +134,7 @@ static void query_servers() struct timeval client_timeout, connect_timeout; client_timeout.tv_sec = 0; client_timeout.tv_usec = 500 * 1000; - connect_timeout.tv_sec = 2; + connect_timeout.tv_sec = 1; connect_timeout.tv_usec = 0; int client_sock, num; dnbd3_trusted_server_t *server; @@ -138,6 +142,7 @@ static void query_servers() struct sockaddr_in addr4; for (num = 0;; ++num) { + char *xmlbuffer = NULL; // "Iterate" this way to prevent holding the lock for a long time, although it is possible to skip a server this way... pthread_spin_lock(&_spinlock); server = g_slist_nth_data(_trusted_servers, num); @@ -164,25 +169,117 @@ static void query_servers() memset(&addr4, 0, sizeof(addr4)); addr4.sin_family = AF_INET; memcpy(&addr4.sin_addr.s_addr, host.addr, 4); - addr4.sin_port = host.port; + addr4.sin_port = htons(ntohs(host.port) + 1); // Connect to server setsockopt(client_sock, SOL_SOCKET, SO_RCVTIMEO, &connect_timeout, sizeof(connect_timeout)); setsockopt(client_sock, SOL_SOCKET, SO_SNDTIMEO, &connect_timeout, sizeof(connect_timeout)); if (connect(client_sock, (struct sockaddr *)&addr4, sizeof(addr4)) < 0) { printf("[DEBUG] Could not connect to other server...\n"); - close(client_sock); // TODO: Remove from alt server list if failed too often - continue; + goto communication_error; } // Apply read/write timeout setsockopt(client_sock, SOL_SOCKET, SO_RCVTIMEO, &client_timeout, sizeof(client_timeout)); setsockopt(client_sock, SOL_SOCKET, SO_SNDTIMEO, &client_timeout, sizeof(client_timeout)); // - // TODO: Send and receive info from server - // + // Send and receive info from server + // Send message + dnbd3_ipc_t header; + header.cmd = htonl(IPC_INFO); + header.size = 0; + header.error = 0; + send(client_sock, (char *)&header, sizeof(header), 0); + if (!recv_data(client_sock, &header, sizeof(header))) + { + printf("[DEBUG] Could not get status from other server...\n"); + goto communication_error; + } + header.cmd = ntohl(header.cmd); + header.size = ntohl(header.size); + header.error = ntohl(header.error); + if (header.cmd != IPC_INFO || header.error != 0) + { + printf("[DEBUG] Error. Reply from other server was cmd:%d, error:%d\n", (int)header.cmd, (int)header.error); + goto communication_error; + } + if (header.size > MAX_IPC_PAYLOAD) + { + memlogf("[WARNING] XML payload from other server exceeds MAX_IPC_PAYLOAD (%d > %d)", (int)header.size, (int)MAX_IPC_PAYLOAD); + goto communication_error; + } + xmlbuffer = malloc(header.size); + if (!recv_data(client_sock, xmlbuffer, header.size)) + { + printf("[DEBUG] Error reading XML payload from other server.\n"); + goto communication_error; + } close(client_sock); // - // TODO: Process data, update server info, add/remove this server as alt server for images, replicate images, etc. + // Process data, update server info, add/remove this server as alt server for images, replicate images, etc. + xmlDocPtr doc = xmlReadMemory(xmlbuffer, header.size, "noname.xml", NULL, 0); + free(xmlbuffer); + xmlbuffer = NULL; + if (doc == NULL) + { + memlogf("[WARNING] Could not parse XML data received by other server."); + goto communication_error; + } + // Data seems ok + // ... + xmlFreeDoc(doc); + // + continue; +communication_error: + close(client_sock); + free(xmlbuffer); + pthread_spin_lock(&_spinlock); + if (g_slist_find(_trusted_servers, server)) + { + if (server->unreachable < 10 && ++server->unreachable == 5) + dnbd3_remove_alt_server(server); + } + pthread_spin_unlock(&_spinlock); + } +} + +/** + * !! Call this while holding the lock !! + */ +static void dnbd3_remove_alt_server(dnbd3_trusted_server_t *server) +{ + GSList *iti, *itc; + int i; + dnbd3_reply_t header; + header.cmd = CMD_GET_SERVERS; + header.magic = dnbd3_packet_magic; + header.size = sizeof(dnbd3_server_entry_t); + fixup_reply(header); + // Iterate over all images + for (iti = _dnbd3_images; iti; iti = iti->next) + { + dnbd3_image_t *const image = iti->data; + // Check if any alt_server for that image is the server to be removed + for (i = 0; i < NUMBER_SERVERS; ++i) + { + if (is_same_server(&server->host, &image->servers[i].host)) + { + // Remove server from that image and tell every connected client about it + image->servers[i].failures = 1; + for (itc = _dnbd3_clients; itc; itc = itc->next) + { + dnbd3_client_t *const client = itc->data; + if (client->image == image) + { + // Don't send message directly as the lock is being held; instead, enqueue it + NEW_BINSTRING(message, sizeof(header) + sizeof(image->servers[i])); + memcpy(message->data, &header, sizeof(header)); + memcpy(message->data + sizeof(header), &image->servers[i], sizeof(image->servers[i])); + client->sendqueue = g_slist_append(client->sendqueue, message); + } + } + image->servers[i].host.type = 0; + } + } } } diff --git a/src/server/net.c b/src/server/net.c index 963d3ea..b351ed3 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -32,6 +32,7 @@ #include #include +#include "helper.h" #include "server.h" #include "saveload.h" #include "memlog.h" @@ -223,6 +224,7 @@ void *dnbd3_handle_query(void *dnbd3_client) } } + // client handling mainloop if (image) while (recv_request_header(client->sock, &request)) { switch (request.cmd) @@ -393,6 +395,21 @@ void *dnbd3_handle_query(void *dnbd3_client) } + // Check for messages that have been queued from another thread + while (client->sendqueue != NULL) + { + dnbd3_binstring_t *message = NULL; + pthread_spin_lock(&_spinlock); + if (client->sendqueue != NULL) + { + message = client->sendqueue->data; + client->sendqueue = g_slist_remove(client->sendqueue, message); + } + pthread_spin_unlock(&_spinlock); + send_data(client->sock, message->data, message->len); + free(message); + } + } pthread_spin_lock(&_spinlock); _dnbd3_clients = g_slist_remove(_dnbd3_clients, client); @@ -401,7 +418,7 @@ void *dnbd3_handle_query(void *dnbd3_client) close(client->sock); if (image_file != -1) close(image_file); if (image_cache != -1) close(image_cache); - g_free(client); + dnbd3_free_client(client); pthread_exit((void *) 0); } diff --git a/src/server/saveload.c b/src/server/saveload.c index 6af9b39..b9c0164 100644 --- a/src/server/saveload.c +++ b/src/server/saveload.c @@ -564,7 +564,7 @@ dnbd3_trusted_server_t *dnbd3_get_trusted_server(char *address, char create_if_n for (iterator = _trusted_servers; iterator; iterator = iterator->next) { dnbd3_trusted_server_t *comp = iterator->data; - if (is_same_server(comp, &server)) + if (is_same_server(&comp->host, &server.host)) return comp; } if (!create_if_not_found) diff --git a/src/server/server.c b/src/server/server.c index 4b30009..2b8828d 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -264,7 +264,7 @@ int main(int argc, char *argv[]) pthread_spin_lock(&_spinlock); _dnbd3_clients = g_slist_remove(_dnbd3_clients, dnbd3_client); pthread_spin_unlock(&_spinlock); - g_free(dnbd3_client); + dnbd3_free_client(dnbd3_client); close(fd); continue; } @@ -273,3 +273,17 @@ int main(int argc, char *argv[]) dnbd3_cleanup(); } + +/** + * Free the client struct recursively + */ +void dnbd3_free_client(dnbd3_client_t *client) +{ + GSList *it; // Doesn't lock, so call this function after removing the client from _dnbd3_clients + for (it = client->sendqueue; it; it = it->next) + { + free(it->data); + } + g_slist_free(client->sendqueue); + g_free(client); +} diff --git a/src/server/server.h b/src/server/server.h index 93f219e..c23f80f 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -46,6 +46,16 @@ typedef struct uint8_t relayed; // TRUE if relayed from other server (needs dnbd3 client module loaded) } dnbd3_image_t; +typedef struct +{ + uint16_t len; + uint8_t data[65535]; +} dnbd3_binstring_t; +// Do not always allocate as much memory as required to hold the entire binstring struct, but only as much as is required to hold the actual data +#define NEW_BINSTRING(_name, _len) \ + dnbd3_binstring_t *_name = malloc(sizeof(uint16_t) + _len); \ + _name->len = _len + typedef struct { int sock; @@ -53,13 +63,15 @@ typedef struct uint8_t is_server; // TRUE if a server in proxy mode, FALSE if real client pthread_t thread; dnbd3_image_t *image; + GSList *sendqueue; // list of dnbd3_binstring_t* } dnbd3_client_t; typedef struct { - dnbd3_host_t host; gchar *comment; GSList *namespaces; // List of dnbd3_namespace_t + dnbd3_host_t host; + uint8_t unreachable; } dnbd3_trusted_server_t; typedef struct @@ -81,6 +93,7 @@ extern int _fake_delay; #endif void dnbd3_cleanup(); +void dnbd3_free_client(dnbd3_client_t *client); #if !defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64 #error Please set _FILE_OFFSET_BITS to 64 in your makefile/configuration -- cgit v1.2.3-55-g7522