summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsr2012-09-04 19:22:57 +0200
committersr2012-09-04 19:22:57 +0200
commitbc4e381484024237df0b04bb667f742fe4846b35 (patch)
treea28ee79b9999eb9c1436f0f7f6e7042537b413d2
parent[SERVER] Check which dnbd3 devices are idle and ready to use for proxy mode (diff)
downloaddnbd3-bc4e381484024237df0b04bb667f742fe4846b35.tar.gz
dnbd3-bc4e381484024237df0b04bb667f742fe4846b35.tar.xz
dnbd3-bc4e381484024237df0b04bb667f742fe4846b35.zip
[SERVER] More work towards automatic server discovery and querying
-rw-r--r--src/kernel/net.c2
-rw-r--r--src/server/helper.h68
-rw-r--r--src/server/ipc.c79
-rw-r--r--src/server/job.c111
-rw-r--r--src/server/net.c19
-rw-r--r--src/server/saveload.c2
-rw-r--r--src/server/server.c16
-rw-r--r--src/server/server.h15
8 files changed, 228 insertions, 84 deletions
diff --git a/src/kernel/net.c b/src/kernel/net.c
index f437464..323c59c 100644
--- a/src/kernel/net.c
+++ b/src/kernel/net.c
@@ -451,7 +451,7 @@ int dnbd3_net_discover(void *data)
spin_lock_irqsave(&dev->blk_lock, irqflags);
for (i = 0; i < dev->new_servers_num; ++i)
{
- if (dev->new_servers[i].host.type != AF_INET) // Invalid entry.. (Add IPv6)
+ if (dev->new_servers[i].host.type != AF_INET) // Invalid entry.. (Add IPv6 someday)
continue;
alt_server = get_existing_server(&dev->new_servers[i], dev);
if (alt_server != NULL) // Server already known
diff --git a/src/server/helper.h b/src/server/helper.h
index e787c42..fdcc3b5 100644
--- a/src/server/helper.h
+++ b/src/server/helper.h
@@ -4,6 +4,8 @@
#include "server.h"
#include <netinet/in.h>
#include <string.h>
+#include <errno.h>
+#include <unistd.h>
char parse_address(char *string, dnbd3_host_t *host);
char host_to_string(const dnbd3_host_t *host, char *target, size_t targetlen);
@@ -11,11 +13,69 @@ char is_valid_namespace(char *namespace);
char is_valid_imagename(char *namespace);
void strtolower(char *string);
-static inline int is_same_server(const dnbd3_trusted_server_t *const a, const dnbd3_trusted_server_t *const b)
+static inline int is_same_server(const dnbd3_host_t *const a, const dnbd3_host_t *const b)
{
- return (a->host.type == b->host.type)
- && (a->host.port == b->host.port)
- && (0 == memcmp(a->host.addr, b->host.addr, (a->host.type == AF_INET ? 4 : 16)));
+ return (a->type == b->type)
+ && (a->port == b->port)
+ && (0 == memcmp(a->addr, b->addr, (a->type == AF_INET ? 4 : 16)));
+}
+
+/**
+ * Send message to client, return !=0 on success, 0 on failure
+ */
+static inline int send_data(int client_sock, void *data_in, int len)
+{
+ if (len <= 0) // Nothing to send
+ return 1;
+ char *data = data_in; // Needed for pointer arithmetic
+ int ret, i;
+ for (i = 0; i < 3; ++i) // Retry at most 3 times, each try takes at most 0.5 seconds (socket timeout)
+ {
+ ret = send(client_sock, data, len, 0);
+ if (ret == 0) // Connection closed
+ return 0;
+ if (ret < 0)
+ {
+ if (errno != EAGAIN) // Some unexpected error
+ return 0;
+ usleep(1000); // 1ms
+ continue;
+ }
+ len -= ret;
+ if (len <= 0) // Sent everything
+ return 1;
+ data += ret; // move target buffer pointer
+ }
+ return 0;
+}
+
+/**
+ * Receive data from client, return !=0 on success, 0 on failure
+ */
+static inline int recv_data(int client_sock, void *buffer_out, int len)
+{
+ if (len <= 0) // Nothing to receive
+ return 1;
+ char *data = buffer_out; // Needed for pointer arithmetic
+ int ret, i;
+ for (i = 0; i < 3; ++i) // Retry at most 3 times, each try takes at most 0.5 seconds (socket timeout)
+ {
+ ret = recv(client_sock, data, len, MSG_WAITALL);
+ if (ret == 0) // Connection closed
+ return 0;
+ if (ret < 0)
+ {
+ if (errno != EAGAIN) // Some unexpected error
+ return 0;
+ usleep(1000); // 1ms
+ continue;
+ }
+ len -= ret;
+ if (len <= 0) // Received everything
+ return 1;
+ data += ret; // move target buffer pointer
+ }
+ return 0;
}
// one byte in the map covers 8 4kib blocks, so 32kib per byte
diff --git a/src/server/ipc.c b/src/server/ipc.c
index b93844a..0dbf46f 100644
--- a/src/server/ipc.c
+++ b/src/server/ipc.c
@@ -58,8 +58,6 @@ static char *payload = NULL;
static int ipc_receive(int client_sock);
static int get_highest_fd(GSList *sockets);
-static int send_reply(int client_sock, void *data_in, int len);
-static int recv_data(int client_sock, void *buffer_out, int len);
static int is_password_correct(xmlDocPtr doc);
static int get_terminal_width();
@@ -300,64 +298,6 @@ void dnbd3_ipc_shutdown()
}
/**
- * Send message to client, return !=0 on success, 0 on failure
- */
-static int send_reply(int client_sock, void *data_in, int len)
-{
- if (len <= 0) // Nothing to send
- return 1;
- char *data = data_in; // Needed for pointer arithmetic
- int ret, i;
- for (i = 0; i < 3; ++i) // Retry at most 3 times, each try takes at most 0.5 seconds (socket timeout)
- {
- ret = send(client_sock, data, len, 0);
- if (ret == 0) // Connection closed
- return 0;
- if (ret < 0)
- {
- if (errno != EAGAIN) // Some unexpected error
- return 0;
- usleep(1000); // 1ms
- continue;
- }
- len -= ret;
- if (len <= 0) // Sent everything
- return 1;
- data += ret; // move target buffer pointer
- }
- return 0;
-}
-
-/**
- * Receive data from client, return !=0 on success, 0 on failure
- */
-static int recv_data(int client_sock, void *buffer_out, int len)
-{
- if (len <= 0) // Nothing to receive
- return 1;
- char *data = buffer_out; // Needed for pointer arithmetic
- int ret, i;
- for (i = 0; i < 3; ++i) // Retry at most 3 times, each try takes at most 0.5 seconds (socket timeout)
- {
- ret = recv(client_sock, data, len, MSG_WAITALL);
- if (ret == 0) // Connection closed
- return 0;
- if (ret < 0)
- {
- if (errno != EAGAIN) // Some unexpected error
- return 0;
- usleep(1000); // 1ms
- continue;
- }
- len -= ret;
- if (len <= 0) // Received everything
- return 1;
- data += ret; // move target buffer pointer
- }
- return 0;
-}
-
-/**
* Returns !=0 if send/recv successful, 0 on any kind of network failure
*/
static int ipc_receive(int client_sock)
@@ -404,7 +344,7 @@ static int ipc_receive(int client_sock)
memlogf("[INFO] Server shutdown by IPC request");
header.size = ntohl(0);
header.error = ntohl(0);
- return_value = send_reply(client_sock, &header, sizeof(header));
+ return_value = send_data(client_sock, &header, sizeof(header));
dnbd3_cleanup();
break;
@@ -419,6 +359,8 @@ static int ipc_receive(int client_sock)
goto get_info_reply_cleanup;
xmlDocSetRootElement(docReply, root_node);
+ xmlNewChild(root_node, NULL, BAD_CAST "namespace", BAD_CAST _local_namespace);
+
// Images
parent_node = xmlNewNode(NULL, BAD_CAST "images");
if (parent_node == NULL)
@@ -496,7 +438,7 @@ static int ipc_receive(int client_sock)
goto get_info_reply_cleanup;
host_to_string(&server->host, strbuffer, STRBUFLEN);
xmlNewProp(tmp_node, BAD_CAST "ip", BAD_CAST strbuffer);
- sprintf(strbuffer, "%d", (int)server->host.port);
+ sprintf(strbuffer, "%d", (int)ntohs(server->host.port));
xmlNewProp(tmp_node, BAD_CAST "port", BAD_CAST strbuffer);
if (server->comment)
xmlNewProp(tmp_node, BAD_CAST "comment", BAD_CAST server->comment);
@@ -540,9 +482,9 @@ get_info_reply_cleanup:
if (locked)
pthread_spin_unlock(&_spinlock);
// Send reply
- return_value = send_reply(client_sock, &header, sizeof(header));
+ return_value = send_data(client_sock, &header, sizeof(header));
if (return_value && xmlbuff)
- return_value = send_reply(client_sock, xmlbuff, buffersize);
+ return_value = send_data(client_sock, xmlbuff, buffersize);
// Cleanup
xmlFree(xmlbuff);
free(log);
@@ -554,7 +496,7 @@ get_info_reply_cleanup:
{
header.size = htonl(0);
header.error = htonl(ERROR_MISSING_ARGUMENT);
- return_value = send_reply(client_sock, &header, sizeof(header));
+ return_value = send_data(client_sock, &header, sizeof(header));
break;
}
docRequest = xmlReadMemory(payload, header.size, "noname.xml", NULL, 0);
@@ -565,7 +507,7 @@ get_info_reply_cleanup:
{
header.error = htonl(ERROR_WRONG_PASSWORD);
header.size = htonl(0);
- return_value = send_reply(client_sock, &header, sizeof(header));
+ return_value = send_data(client_sock, &header, sizeof(header));
break;
}
@@ -603,14 +545,14 @@ get_info_reply_cleanup:
header.error = htonl(ERROR_INVALID_XML);
header.size = htonl(0);
- return_value = send_reply(client_sock, &header, sizeof(header));
+ return_value = send_data(client_sock, &header, sizeof(header));
break;
default:
memlogf("[ERROR] Unknown IPC command: %u", (unsigned int)header.cmd);
header.size = htonl(0);
header.error = htonl(ERROR_UNKNOWN_COMMAND);
- return_value = send_reply(client_sock, &header, sizeof(header));
+ return_value = send_data(client_sock, &header, sizeof(header));
break;
}
@@ -690,6 +632,7 @@ void dnbd3_ipc_send(int cmd)
{
char *buf = malloc(header.size + 1);
size = recv(client_sock, buf, header.size, MSG_WAITALL);
+ printf("\n%s\n\n", buf);
xmlDocPtr doc = xmlReadMemory(buf, size, "noname.xml", NULL, 0);
buf[header.size] = 0;
diff --git a/src/server/job.c b/src/server/job.c
index 4a30cd1..cf45681 100644
--- a/src/server/job.c
+++ b/src/server/job.c
@@ -1,6 +1,8 @@
#include "job.h"
#include "saveload.h"
+#include "helper.h"
#include "memlog.h"
+#include "ipc.h"
#include <stdint.h>
#include <stdio.h>
@@ -17,6 +19,7 @@
#include <libxml/parser.h>
#include <libxml/xpath.h>
#include "xmlutil.h"
+#include "../config.h"
#define DEV_STRLEN 12 // INCLUDING NULLCHAR (increase to 13 if you need more than 100 (0-99) devices)
#define MAX_NUM_DEVICES_TO_CHECK 100
@@ -43,6 +46,7 @@ static char keep_running = TRUE;
// Private functions
static char *get_free_device();
static void query_servers();
+static void dnbd3_remove_alt_server(dnbd3_trusted_server_t *server);
static void update_image_atimes(time_t now);
//
@@ -130,7 +134,7 @@ static void query_servers()
struct timeval client_timeout, connect_timeout;
client_timeout.tv_sec = 0;
client_timeout.tv_usec = 500 * 1000;
- connect_timeout.tv_sec = 2;
+ connect_timeout.tv_sec = 1;
connect_timeout.tv_usec = 0;
int client_sock, num;
dnbd3_trusted_server_t *server;
@@ -138,6 +142,7 @@ static void query_servers()
struct sockaddr_in addr4;
for (num = 0;; ++num)
{
+ char *xmlbuffer = NULL;
// "Iterate" this way to prevent holding the lock for a long time, although it is possible to skip a server this way...
pthread_spin_lock(&_spinlock);
server = g_slist_nth_data(_trusted_servers, num);
@@ -164,25 +169,117 @@ static void query_servers()
memset(&addr4, 0, sizeof(addr4));
addr4.sin_family = AF_INET;
memcpy(&addr4.sin_addr.s_addr, host.addr, 4);
- addr4.sin_port = host.port;
+ addr4.sin_port = htons(ntohs(host.port) + 1);
// Connect to server
setsockopt(client_sock, SOL_SOCKET, SO_RCVTIMEO, &connect_timeout, sizeof(connect_timeout));
setsockopt(client_sock, SOL_SOCKET, SO_SNDTIMEO, &connect_timeout, sizeof(connect_timeout));
if (connect(client_sock, (struct sockaddr *)&addr4, sizeof(addr4)) < 0)
{
printf("[DEBUG] Could not connect to other server...\n");
- close(client_sock); // TODO: Remove from alt server list if failed too often
- continue;
+ goto communication_error;
}
// Apply read/write timeout
setsockopt(client_sock, SOL_SOCKET, SO_RCVTIMEO, &client_timeout, sizeof(client_timeout));
setsockopt(client_sock, SOL_SOCKET, SO_SNDTIMEO, &client_timeout, sizeof(client_timeout));
//
- // TODO: Send and receive info from server
- //
+ // Send and receive info from server
+ // Send message
+ dnbd3_ipc_t header;
+ header.cmd = htonl(IPC_INFO);
+ header.size = 0;
+ header.error = 0;
+ send(client_sock, (char *)&header, sizeof(header), 0);
+ if (!recv_data(client_sock, &header, sizeof(header)))
+ {
+ printf("[DEBUG] Could not get status from other server...\n");
+ goto communication_error;
+ }
+ header.cmd = ntohl(header.cmd);
+ header.size = ntohl(header.size);
+ header.error = ntohl(header.error);
+ if (header.cmd != IPC_INFO || header.error != 0)
+ {
+ printf("[DEBUG] Error. Reply from other server was cmd:%d, error:%d\n", (int)header.cmd, (int)header.error);
+ goto communication_error;
+ }
+ if (header.size > MAX_IPC_PAYLOAD)
+ {
+ memlogf("[WARNING] XML payload from other server exceeds MAX_IPC_PAYLOAD (%d > %d)", (int)header.size, (int)MAX_IPC_PAYLOAD);
+ goto communication_error;
+ }
+ xmlbuffer = malloc(header.size);
+ if (!recv_data(client_sock, xmlbuffer, header.size))
+ {
+ printf("[DEBUG] Error reading XML payload from other server.\n");
+ goto communication_error;
+ }
close(client_sock);
//
- // TODO: Process data, update server info, add/remove this server as alt server for images, replicate images, etc.
+ // Process data, update server info, add/remove this server as alt server for images, replicate images, etc.
+ xmlDocPtr doc = xmlReadMemory(xmlbuffer, header.size, "noname.xml", NULL, 0);
+ free(xmlbuffer);
+ xmlbuffer = NULL;
+ if (doc == NULL)
+ {
+ memlogf("[WARNING] Could not parse XML data received by other server.");
+ goto communication_error;
+ }
+ // Data seems ok
+ // ...
+ xmlFreeDoc(doc);
+ //
+ continue;
+communication_error:
+ close(client_sock);
+ free(xmlbuffer);
+ pthread_spin_lock(&_spinlock);
+ if (g_slist_find(_trusted_servers, server))
+ {
+ if (server->unreachable < 10 && ++server->unreachable == 5)
+ dnbd3_remove_alt_server(server);
+ }
+ pthread_spin_unlock(&_spinlock);
+ }
+}
+
+/**
+ * !! Call this while holding the lock !!
+ */
+static void dnbd3_remove_alt_server(dnbd3_trusted_server_t *server)
+{
+ GSList *iti, *itc;
+ int i;
+ dnbd3_reply_t header;
+ header.cmd = CMD_GET_SERVERS;
+ header.magic = dnbd3_packet_magic;
+ header.size = sizeof(dnbd3_server_entry_t);
+ fixup_reply(header);
+ // Iterate over all images
+ for (iti = _dnbd3_images; iti; iti = iti->next)
+ {
+ dnbd3_image_t *const image = iti->data;
+ // Check if any alt_server for that image is the server to be removed
+ for (i = 0; i < NUMBER_SERVERS; ++i)
+ {
+ if (is_same_server(&server->host, &image->servers[i].host))
+ {
+ // Remove server from that image and tell every connected client about it
+ image->servers[i].failures = 1;
+ for (itc = _dnbd3_clients; itc; itc = itc->next)
+ {
+ dnbd3_client_t *const client = itc->data;
+ if (client->image == image)
+ {
+ // Don't send message directly as the lock is being held; instead, enqueue it
+ NEW_BINSTRING(message, sizeof(header) + sizeof(image->servers[i]));
+ memcpy(message->data, &header, sizeof(header));
+ memcpy(message->data + sizeof(header), &image->servers[i], sizeof(image->servers[i]));
+ client->sendqueue = g_slist_append(client->sendqueue, message);
+ }
+ }
+ image->servers[i].host.type = 0;
+ }
+ }
}
}
diff --git a/src/server/net.c b/src/server/net.c
index 963d3ea..b351ed3 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -32,6 +32,7 @@
#include <arpa/inet.h>
#include <netinet/tcp.h>
+#include "helper.h"
#include "server.h"
#include "saveload.h"
#include "memlog.h"
@@ -223,6 +224,7 @@ void *dnbd3_handle_query(void *dnbd3_client)
}
}
+ // client handling mainloop
if (image) while (recv_request_header(client->sock, &request))
{
switch (request.cmd)
@@ -393,6 +395,21 @@ void *dnbd3_handle_query(void *dnbd3_client)
}
+ // Check for messages that have been queued from another thread
+ while (client->sendqueue != NULL)
+ {
+ dnbd3_binstring_t *message = NULL;
+ pthread_spin_lock(&_spinlock);
+ if (client->sendqueue != NULL)
+ {
+ message = client->sendqueue->data;
+ client->sendqueue = g_slist_remove(client->sendqueue, message);
+ }
+ pthread_spin_unlock(&_spinlock);
+ send_data(client->sock, message->data, message->len);
+ free(message);
+ }
+
}
pthread_spin_lock(&_spinlock);
_dnbd3_clients = g_slist_remove(_dnbd3_clients, client);
@@ -401,7 +418,7 @@ void *dnbd3_handle_query(void *dnbd3_client)
close(client->sock);
if (image_file != -1) close(image_file);
if (image_cache != -1) close(image_cache);
- g_free(client);
+ dnbd3_free_client(client);
pthread_exit((void *) 0);
}
diff --git a/src/server/saveload.c b/src/server/saveload.c
index 6af9b39..b9c0164 100644
--- a/src/server/saveload.c
+++ b/src/server/saveload.c
@@ -564,7 +564,7 @@ dnbd3_trusted_server_t *dnbd3_get_trusted_server(char *address, char create_if_n
for (iterator = _trusted_servers; iterator; iterator = iterator->next)
{
dnbd3_trusted_server_t *comp = iterator->data;
- if (is_same_server(comp, &server))
+ if (is_same_server(&comp->host, &server.host))
return comp;
}
if (!create_if_not_found)
diff --git a/src/server/server.c b/src/server/server.c
index 4b30009..2b8828d 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -264,7 +264,7 @@ int main(int argc, char *argv[])
pthread_spin_lock(&_spinlock);
_dnbd3_clients = g_slist_remove(_dnbd3_clients, dnbd3_client);
pthread_spin_unlock(&_spinlock);
- g_free(dnbd3_client);
+ dnbd3_free_client(dnbd3_client);
close(fd);
continue;
}
@@ -273,3 +273,17 @@ int main(int argc, char *argv[])
dnbd3_cleanup();
}
+
+/**
+ * Free the client struct recursively
+ */
+void dnbd3_free_client(dnbd3_client_t *client)
+{
+ GSList *it; // Doesn't lock, so call this function after removing the client from _dnbd3_clients
+ for (it = client->sendqueue; it; it = it->next)
+ {
+ free(it->data);
+ }
+ g_slist_free(client->sendqueue);
+ g_free(client);
+}
diff --git a/src/server/server.h b/src/server/server.h
index 93f219e..c23f80f 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -48,18 +48,30 @@ typedef struct
typedef struct
{
+ uint16_t len;
+ uint8_t data[65535];
+} dnbd3_binstring_t;
+// Do not always allocate as much memory as required to hold the entire binstring struct, but only as much as is required to hold the actual data
+#define NEW_BINSTRING(_name, _len) \
+ dnbd3_binstring_t *_name = malloc(sizeof(uint16_t) + _len); \
+ _name->len = _len
+
+typedef struct
+{
int sock;
dnbd3_host_t host;
uint8_t is_server; // TRUE if a server in proxy mode, FALSE if real client
pthread_t thread;
dnbd3_image_t *image;
+ GSList *sendqueue; // list of dnbd3_binstring_t*
} dnbd3_client_t;
typedef struct
{
- dnbd3_host_t host;
gchar *comment;
GSList *namespaces; // List of dnbd3_namespace_t
+ dnbd3_host_t host;
+ uint8_t unreachable;
} dnbd3_trusted_server_t;
typedef struct
@@ -81,6 +93,7 @@ extern int _fake_delay;
#endif
void dnbd3_cleanup();
+void dnbd3_free_client(dnbd3_client_t *client);
#if !defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64
#error Please set _FILE_OFFSET_BITS to 64 in your makefile/configuration