summaryrefslogtreecommitdiffstats
path: root/src/server/job.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/job.c')
-rw-r--r--src/server/job.c111
1 files changed, 104 insertions, 7 deletions
diff --git a/src/server/job.c b/src/server/job.c
index 4a30cd1..cf45681 100644
--- a/src/server/job.c
+++ b/src/server/job.c
@@ -1,6 +1,8 @@
#include "job.h"
#include "saveload.h"
+#include "helper.h"
#include "memlog.h"
+#include "ipc.h"
#include <stdint.h>
#include <stdio.h>
@@ -17,6 +19,7 @@
#include <libxml/parser.h>
#include <libxml/xpath.h>
#include "xmlutil.h"
+#include "../config.h"
#define DEV_STRLEN 12 // INCLUDING NULLCHAR (increase to 13 if you need more than 100 (0-99) devices)
#define MAX_NUM_DEVICES_TO_CHECK 100
@@ -43,6 +46,7 @@ static char keep_running = TRUE;
// Private functions
static char *get_free_device();
static void query_servers();
+static void dnbd3_remove_alt_server(dnbd3_trusted_server_t *server);
static void update_image_atimes(time_t now);
//
@@ -130,7 +134,7 @@ static void query_servers()
struct timeval client_timeout, connect_timeout;
client_timeout.tv_sec = 0;
client_timeout.tv_usec = 500 * 1000;
- connect_timeout.tv_sec = 2;
+ connect_timeout.tv_sec = 1;
connect_timeout.tv_usec = 0;
int client_sock, num;
dnbd3_trusted_server_t *server;
@@ -138,6 +142,7 @@ static void query_servers()
struct sockaddr_in addr4;
for (num = 0;; ++num)
{
+ char *xmlbuffer = NULL;
// "Iterate" this way to prevent holding the lock for a long time, although it is possible to skip a server this way...
pthread_spin_lock(&_spinlock);
server = g_slist_nth_data(_trusted_servers, num);
@@ -164,25 +169,117 @@ static void query_servers()
memset(&addr4, 0, sizeof(addr4));
addr4.sin_family = AF_INET;
memcpy(&addr4.sin_addr.s_addr, host.addr, 4);
- addr4.sin_port = host.port;
+ addr4.sin_port = htons(ntohs(host.port) + 1);
// Connect to server
setsockopt(client_sock, SOL_SOCKET, SO_RCVTIMEO, &connect_timeout, sizeof(connect_timeout));
setsockopt(client_sock, SOL_SOCKET, SO_SNDTIMEO, &connect_timeout, sizeof(connect_timeout));
if (connect(client_sock, (struct sockaddr *)&addr4, sizeof(addr4)) < 0)
{
printf("[DEBUG] Could not connect to other server...\n");
- close(client_sock); // TODO: Remove from alt server list if failed too often
- continue;
+ goto communication_error;
}
// Apply read/write timeout
setsockopt(client_sock, SOL_SOCKET, SO_RCVTIMEO, &client_timeout, sizeof(client_timeout));
setsockopt(client_sock, SOL_SOCKET, SO_SNDTIMEO, &client_timeout, sizeof(client_timeout));
//
- // TODO: Send and receive info from server
- //
+ // Send and receive info from server
+ // Send message
+ dnbd3_ipc_t header;
+ header.cmd = htonl(IPC_INFO);
+ header.size = 0;
+ header.error = 0;
+ send(client_sock, (char *)&header, sizeof(header), 0);
+ if (!recv_data(client_sock, &header, sizeof(header)))
+ {
+ printf("[DEBUG] Could not get status from other server...\n");
+ goto communication_error;
+ }
+ header.cmd = ntohl(header.cmd);
+ header.size = ntohl(header.size);
+ header.error = ntohl(header.error);
+ if (header.cmd != IPC_INFO || header.error != 0)
+ {
+ printf("[DEBUG] Error. Reply from other server was cmd:%d, error:%d\n", (int)header.cmd, (int)header.error);
+ goto communication_error;
+ }
+ if (header.size > MAX_IPC_PAYLOAD)
+ {
+ memlogf("[WARNING] XML payload from other server exceeds MAX_IPC_PAYLOAD (%d > %d)", (int)header.size, (int)MAX_IPC_PAYLOAD);
+ goto communication_error;
+ }
+ xmlbuffer = malloc(header.size);
+ if (!recv_data(client_sock, xmlbuffer, header.size))
+ {
+ printf("[DEBUG] Error reading XML payload from other server.\n");
+ goto communication_error;
+ }
close(client_sock);
//
- // TODO: Process data, update server info, add/remove this server as alt server for images, replicate images, etc.
+ // Process data, update server info, add/remove this server as alt server for images, replicate images, etc.
+ xmlDocPtr doc = xmlReadMemory(xmlbuffer, header.size, "noname.xml", NULL, 0);
+ free(xmlbuffer);
+ xmlbuffer = NULL;
+ if (doc == NULL)
+ {
+ memlogf("[WARNING] Could not parse XML data received by other server.");
+ goto communication_error;
+ }
+ // Data seems ok
+ // ...
+ xmlFreeDoc(doc);
+ //
+ continue;
+communication_error:
+ close(client_sock);
+ free(xmlbuffer);
+ pthread_spin_lock(&_spinlock);
+ if (g_slist_find(_trusted_servers, server))
+ {
+ if (server->unreachable < 10 && ++server->unreachable == 5)
+ dnbd3_remove_alt_server(server);
+ }
+ pthread_spin_unlock(&_spinlock);
+ }
+}
+
+/**
+ * !! Call this while holding the lock !!
+ */
+static void dnbd3_remove_alt_server(dnbd3_trusted_server_t *server)
+{
+ GSList *iti, *itc;
+ int i;
+ dnbd3_reply_t header;
+ header.cmd = CMD_GET_SERVERS;
+ header.magic = dnbd3_packet_magic;
+ header.size = sizeof(dnbd3_server_entry_t);
+ fixup_reply(header);
+ // Iterate over all images
+ for (iti = _dnbd3_images; iti; iti = iti->next)
+ {
+ dnbd3_image_t *const image = iti->data;
+ // Check if any alt_server for that image is the server to be removed
+ for (i = 0; i < NUMBER_SERVERS; ++i)
+ {
+ if (is_same_server(&server->host, &image->servers[i].host))
+ {
+ // Remove server from that image and tell every connected client about it
+ image->servers[i].failures = 1;
+ for (itc = _dnbd3_clients; itc; itc = itc->next)
+ {
+ dnbd3_client_t *const client = itc->data;
+ if (client->image == image)
+ {
+ // Don't send message directly as the lock is being held; instead, enqueue it
+ NEW_BINSTRING(message, sizeof(header) + sizeof(image->servers[i]));
+ memcpy(message->data, &header, sizeof(header));
+ memcpy(message->data + sizeof(header), &image->servers[i], sizeof(image->servers[i]));
+ client->sendqueue = g_slist_append(client->sendqueue, message);
+ }
+ }
+ image->servers[i].host.type = 0;
+ }
+ }
}
}