summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2013-07-31 21:45:00 +0200
committerSimon Rettberg2013-07-31 21:45:00 +0200
commite7b3781c3e5c105fcd80a75546b9ab75eae8a2c9 (patch)
tree90e523671bb2ea349639b744b5bc3ee81f512fb7
parent[SERVER] Still working on the uplink... Almost there (diff)
downloaddnbd3-e7b3781c3e5c105fcd80a75546b9ab75eae8a2c9.tar.gz
dnbd3-e7b3781c3e5c105fcd80a75546b9ab75eae8a2c9.tar.xz
dnbd3-e7b3781c3e5c105fcd80a75546b9ab75eae8a2c9.zip
[SERVER] Uplink handing complete (untested, as alt servers can't be defined yet, so prepare for lots of fixes ;))
-rw-r--r--CMakeLists.txt3
-rw-r--r--src/config.h13
-rw-r--r--src/kernel/net.c1
-rw-r--r--src/server/altservers.c390
-rw-r--r--src/server/altservers.h14
-rw-r--r--src/server/globals.h3
-rw-r--r--src/server/helper.h67
-rw-r--r--src/server/locks.c2
-rw-r--r--src/server/net.c3
-rw-r--r--src/server/server.c7
-rw-r--r--src/server/uplink.c100
-rw-r--r--src/server/uplink.h7
12 files changed, 486 insertions, 124 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0b4b43d..596fa43 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -47,6 +47,9 @@ TARGET_LINK_LIBRARIES(dnbd3-client)
FILE(GLOB_RECURSE SERVER_SRCS src/server/*.c)
ADD_EXECUTABLE(dnbd3-server ${SERVER_SRCS})
TARGET_LINK_LIBRARIES(dnbd3-server ${CMAKE_THREAD_LIBS_INIT} ${ZLIB_LIBRARIES})
+if(UNIX AND NOT APPLE)
+ target_link_libraries(dnbd3-server rt)
+endif()
diff --git a/src/config.h b/src/config.h
index 3c2f166..6ad99dd 100644
--- a/src/config.h
+++ b/src/config.h
@@ -29,10 +29,11 @@
#define SERVER_MAX_IMAGES 5000
#define SERVER_MAX_ALTS 1000
#define SERVER_MAX_UPLINK_QUEUE 1000
+#define SERVER_MAX_PENDING_ALT_CHECKS 100
// +++++ Other magic constants
-#define SERVER_RTT_PROBES 4
-#define SERVER_RTT_DELAY_INIT 8
+#define SERVER_RTT_PROBES 5
+#define SERVER_RTT_DELAY_INIT 5
#define SERVER_RTT_DELAY_MAX 15
// +++++ Network +++++
@@ -40,6 +41,9 @@
#define PORT 5003
#define RPC_PORT (PORT+1)
+// No serialized payload allowed exceeding this many bytes (so actual data from client->server is not affected by this limit!)
+#define MAX_PAYLOAD 1000
+
// Protocol version should be increased whenever new features/messages are added,
// so either the client or server can run in compatibility mode, or they can
// cancel the connection right away if the protocol has changed too much
@@ -51,9 +55,6 @@
// Length of comment fields (for alt server etc.)
#define COMMENT_LENGTH 120
-// No payload allowed exceeding this many bytes (actual data from client->server is not affected by this limit!)
-#define MAX_PAYLOAD 1000
-
// in seconds if not stated otherwise (MS = milliseconds)
#define SOCKET_TIMEOUT_SERVER_MS 30000
#define SOCKET_TIMEOUT_CLIENT_DATA 2
@@ -79,7 +80,7 @@
// +++++ Block Device +++++
#define KERNEL_SECTOR_SIZE 512
-#define DNBD3_BLOCK_SIZE 4096 // NEVER CHANGE THIS OR THE WORLD WILL END!
+#define DNBD3_BLOCK_SIZE ((uint64_t)4096) // NEVER CHANGE THIS OR THE WORLD WILL END!
#define NUMBER_DEVICES 8
#define DEFAULT_READ_AHEAD_KB 512
diff --git a/src/kernel/net.c b/src/kernel/net.c
index 36abf45..e088943 100644
--- a/src/kernel/net.c
+++ b/src/kernel/net.c
@@ -21,6 +21,7 @@
#include "net.h"
#include "blk.h"
#include "utils.h"
+
#include "serialize.h"
#include <linux/time.h>
diff --git a/src/server/altservers.c b/src/server/altservers.c
new file mode 100644
index 0000000..6301c32
--- /dev/null
+++ b/src/server/altservers.c
@@ -0,0 +1,390 @@
+#include "altservers.h"
+#include "uplink.h"
+#include "locks.h"
+#include "sockhelper.h"
+#include "memlog.h"
+#include "helper.h"
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/epoll.h>
+#include <sys/errno.h>
+#include <math.h>
+#include <assert.h>
+#include <inttypes.h>
+#include <time.h>
+#include "../serialize.h"
+
+static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS];
+static pthread_spinlock_t pendingLock;
+static int signalPipe = -1;
+
+static dnbd3_alt_server_t _alt_servers[SERVER_MAX_ALTS];
+static int _num_alts = 0;
+static pthread_spinlock_t _alts_lock;
+
+static pthread_t altThread;
+
+static void *altserver_main(void *data);
+static unsigned int altservers_update_rtt(const dnbd3_host_t * const host, const unsigned int rtt);
+
+void altserver_init()
+{
+ spin_init( &_alts_lock, PTHREAD_PROCESS_PRIVATE );
+ if ( 0 != pthread_create( &altThread, NULL, &altserver_main, (void *)NULL ) ) {
+ memlogf( "[ERROR] Could not start altservers connector thread" );
+ exit( EXIT_FAILURE );
+ }
+}
+
+/**
+ * ONLY called from the passed uplink's main thread
+ */
+void altserver_find_uplink(dnbd3_connection_t *uplink)
+{
+ if ( uplink->rttTestResult == RTT_INPROGRESS ) return;
+ spin_lock( &pendingLock );
+ for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
+ if ( pending[i] != NULL ) continue;
+ pending[i] = uplink;
+ uplink->rttTestResult = RTT_INPROGRESS;
+ spin_unlock( &pendingLock );
+ write( signalPipe, "", 1 );
+ return;
+ }
+ // End of loop - no free slot
+ spin_unlock( &pendingLock );
+ memlogf( "[WARNING] No more free RTT measurement slots, ignoring a request..." );
+}
+
+/**
+ * Get <size> known (working) alt servers, ordered by network closeness
+ * (by finding the smallest possible subnet)
+ */
+int altservers_get_matching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size)
+{
+ if ( host == NULL || host->type == 0 || _num_alts == 0 || output == NULL || size <= 0 ) return 0;
+ int i, j;
+ int count = 0;
+ int distance[size];
+ spin_lock( &_alts_lock );
+ for (i = 0; i < _num_alts; ++i) {
+ if ( host->type != _alt_servers[i].host.type ) continue; // Wrong address family
+ // TODO: Prefer same AF here, but if in the end we got less servers than requested, add
+ // servers of other AF too (after this loop)
+ if ( count == 0 ) {
+ // Trivial - this is the first entry
+ output[0].host = _alt_servers[i].host;
+ output[0].failures = 0;
+ distance[0] = altservers_net_closeness( host, &output[0].host );
+ count++;
+ } else {
+ // Other entries already exist, insert in proper position
+ const int dist = altservers_net_closeness( host, &_alt_servers[i].host );
+ for (j = 0; j < size; ++j) {
+ if ( j < count && dist <= distance[j] ) continue;
+ if ( j > count ) break; // Should never happen but just in case...
+ if ( j < count ) {
+ // Check if we're in the middle and need to move other entries...
+ if ( j + 1 < size ) {
+ memmove( &output[j + 1], &output[j], sizeof(dnbd3_server_entry_t) * (size - j - 1) );
+ memmove( &distance[j + 1], &distance[j], sizeof(int) * (size - j - 1) );
+ }
+ } else {
+ count++;
+ }
+ output[j].host = _alt_servers[i].host;
+ output[j].failures = 0;
+ distance[j] = dist;
+ break;
+ }
+ }
+ }
+ // "if count < size then add servers of other address families"
+ spin_unlock( &_alts_lock );
+ return count;
+}
+
+/**
+ * Get <size> alt servers. If there are more alt servers than
+ * requested, random servers will be picked
+ */
+int altservers_get(dnbd3_host_t *output, int size)
+{
+ int count = 0, i, j, num;
+ spin_lock( &_alts_lock );
+ if ( size <= _num_alts ) {
+ for (i = 0; i < size; ++i) {
+ if ( _alt_servers[i].host.type == 0 ) continue;
+ output[count++] = _alt_servers[i].host;
+ }
+ } else {
+ int which[_num_alts]; // Generate random order over _num_alts
+ for (i = 0; i < _num_alts; ++i) {
+ again: ;
+ num = rand() % _num_alts;
+ for (j = 0; j < i; ++j) {
+ if ( which[j] == num ) goto again;
+ }
+ which[i] = num;
+ } // Now pick <size> working alt servers in that generated order
+ for (i = 0; i < size; ++i) {
+ if ( _alt_servers[which[i]].host.type == 0 ) continue;
+ output[count++] = _alt_servers[which[i]].host;
+ if ( count >= size ) break;
+ }
+ }
+ spin_unlock( &_alts_lock );
+ return count;
+}
+
+/**
+ * Update rtt history of given server - returns the new average for that server
+ */
+static unsigned int altservers_update_rtt(const dnbd3_host_t * const host, const unsigned int rtt)
+{
+ unsigned int avg = rtt;
+ int i;
+ spin_lock( &_alts_lock );
+ for (i = 0; i < _num_alts; ++i) {
+ if ( !is_same_server( host, &_alt_servers[i].host ) ) continue;
+ _alt_servers[i].rtt[++_alt_servers[i].rttIndex % SERVER_RTT_PROBES] = rtt;
+#if SERVER_RTT_PROBES == 5
+ avg = (_alt_servers[i].rtt[0] + _alt_servers[i].rtt[1] + _alt_servers[i].rtt[2] + _alt_servers[i].rtt[3] + _alt_servers[i].rtt[4])
+ / SERVER_RTT_PROBES;
+#else
+#warning You might want to change the code in altservers_update_rtt if you changed SERVER_RTT_PROBES
+ avg = 0;
+ for (int j = 0; j < SERVER_RTT_PROBES; ++j) {
+ avg += _alt_servers[i].rtt[j];
+ }
+ avg /= SERVER_RTT_PROBES;
+#endif
+ break;
+ }
+ spin_unlock( &_alts_lock );
+ return avg;
+}
+
+/**
+ * Determine how close two addresses are to each other by comparing the number of
+ * matching bits from the left of the address. Does not count individual bits but
+ * groups of 4 for speed.
+ */
+int altservers_net_closeness(dnbd3_host_t *host1, dnbd3_host_t *host2)
+{
+ if ( host1 == NULL || host2 == NULL || host1->type != host2->type ) return -1;
+ int retval = 0;
+ const int max = host1->type == AF_INET ? 4 : 16;
+ for (int i = 0; i < max; ++i) {
+ if ( (host1->addr[i] & 0xf0) != (host2->addr[i] & 0xf0) ) return retval;
+ ++retval;
+ if ( (host1->addr[i] & 0x0f) != (host2->addr[i] & 0x0f) ) return retval;
+ ++retval;
+ }
+ return retval;
+}
+
+static void *altserver_main(void *data)
+{
+ const int MAXEVENTS = 3;
+ const int ALTS = 4;
+ struct epoll_event ev, events[MAXEVENTS];
+ int readPipe = -1, fdEpoll = -1;
+ int numSocks, ret, itLink, itAlt, numAlts;
+ int found, len;
+ char buffer[DNBD3_BLOCK_SIZE ];
+ dnbd3_host_t servers[ALTS + 1];
+ dnbd3_request_t request;
+ dnbd3_reply_t reply;
+ serialized_buffer_t serialized;
+ struct iovec iov[2];
+ struct timespec start, end;
+
+ // Make valgrind happy
+ memset( &reply, 0, sizeof(reply) );
+ memset( &request, 0, sizeof(request) );
+ request.magic = dnbd3_packet_magic;
+ // Init spinlock
+ spin_init( &pendingLock, PTHREAD_PROCESS_PRIVATE );
+ // Init waiting links queue
+ for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i)
+ pending[i] = NULL;
+ // Init signal-pipe
+ fdEpoll = epoll_create( 2 );
+ if ( fdEpoll == -1 ) {
+ memlogf( "[WARNING] epoll_create failed. Uplink unavailable." );
+ goto cleanup;
+ }
+ {
+ int pipes[2];
+ if ( pipe( pipes ) < 0 ) {
+ memlogf( "[WARNING] error creating pipe. Uplink unavailable." );
+ goto cleanup;
+ }
+ sock_set_nonblock( pipes[0] );
+ sock_set_nonblock( pipes[1] );
+ readPipe = pipes[0];
+ signalPipe = pipes[1];
+ memset( &ev, 0, sizeof(ev) );
+ ev.events = EPOLLIN;
+ ev.data.fd = readPipe;
+ if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, readPipe, &ev ) < 0 ) {
+ memlogf( "[WARNING] adding read-signal-pipe to epoll set failed" );
+ goto cleanup;
+ }
+ }
+ // LOOP
+ while ( !_shutdown ) {
+ // Wait 5 seconds max.
+ numSocks = epoll_wait( fdEpoll, events, MAXEVENTS, 5000 );
+ if ( numSocks < 0 ) {
+ memlogf( "[WARNING] epoll_wait() error in uplink_connector" );
+ usleep( 100000 );
+ }
+ // Empty pipe
+ do {
+ ret = read( readPipe, buffer, sizeof buffer );
+ } while ( ret > 0 ); // Throw data away, this is just used for waking this thread up
+ if ( ret == 0 ) {
+ memlogf( "[WARNING] Signal pipe of uplink_connector for %s closed! Things will break!" );
+ }
+ ret = errno;
+ if ( ret != EAGAIN && ret != EWOULDBLOCK && ret != EBUSY && ret != EINTR ) {
+ memlogf( "[WARNING] Errno %d on pipe-read on uplink_connector for %s! Things will break!", ret );
+ }
+ // Work your way through the queue
+ spin_lock( &pendingLock );
+ for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) {
+ if ( pending[itLink] == NULL ) continue;
+ spin_unlock( &pendingLock );
+ dnbd3_connection_t * const uplink = pending[itLink];
+ assert( uplink->rttTestResult == RTT_INPROGRESS );
+ // Now get 4 alt servers
+ numAlts = altservers_get( servers, ALTS );
+ if ( uplink->fd != -1 ) {
+ // Add current server if not already in list
+ found = FALSE;
+ for (itAlt = 0; itAlt < numAlts; ++itAlt) {
+ if ( !is_same_server( &uplink->currentServer, &servers[itAlt] ) ) continue;
+ found = TRUE;
+ break;
+ }
+ if ( !found ) servers[numAlts++] = uplink->currentServer;
+ }
+ // Test them all
+ int bestSock = -1;
+ int bestIndex = -1;
+ unsigned int bestRtt = 0xfffffff;
+ unsigned int currentRtt = 0xfffffff;
+ for (itAlt = 0; itAlt < numAlts; ++itAlt) {
+ usleep( 1000 );
+ // Connect
+ clock_gettime( CLOCK_MONOTONIC_RAW, &start );
+ int sock = sock_connect( &servers[itAlt], 750, 1250 );
+ if ( sock < 0 ) continue;
+ // Select image ++++++++++++++++++++++++++++++
+ serializer_reset_write( &serialized );
+ serializer_put_uint16( &serialized, PROTOCOL_VERSION );
+ serializer_put_string( &serialized, uplink->image->lower_name );
+ serializer_put_uint16( &serialized, uplink->image->rid );
+ serializer_put_uint8( &serialized, 1 ); // isServer = TRUE
+ len = serializer_get_written_length( &serialized );
+ request.cmd = CMD_SELECT_IMAGE;
+ request.size = len;
+ fixup_request( request );
+ iov[0].iov_base = &request;
+ iov[0].iov_len = sizeof(request);
+ iov[1].iov_base = &serialized;
+ iov[1].iov_len = len;
+ if ( writev( sock, iov, 2 ) != len + sizeof(request) ) goto server_failed;
+ // See if selecting the image succeeded ++++++++++++++++++++++++++++++
+ if ( recv( sock, &reply, sizeof(reply), MSG_WAITALL ) != sizeof(reply) ) {
+ ERROR_GOTO_VA( server_failed, "[ERROR] Received corrupted reply header after CMD_SELECT_IMAGE (%s)",
+ uplink->image->lower_name );
+ }
+ // check reply header
+ fixup_reply( reply );
+ if ( reply.cmd != CMD_SELECT_IMAGE || reply.size < 3 || reply.size > MAX_PAYLOAD || reply.magic != dnbd3_packet_magic ) goto server_failed;
+ // Not found
+ // receive reply payload
+ if ( recv( sock, &serialized, reply.size, MSG_WAITALL ) != reply.size ) {
+ ERROR_GOTO_VA( server_failed, "[ERROR] Cold not read CMD_SELECT_IMAGE payload (%s)", uplink->image->lower_name );
+ }
+ // handle/check reply payload
+ serializer_reset_read( &serialized, reply.size );
+ const uint16_t protocol_version = serializer_get_uint16( &serialized );
+ if ( protocol_version < MIN_SUPPORTED_SERVER ) goto server_failed;
+ const char *name = serializer_get_string( &serialized );
+ if ( strcmp( name, uplink->image->lower_name ) != 0 ) {
+ ERROR_GOTO_VA( server_failed, "[ERROR] Server offers image '%s', requested '%s'", name, uplink->image->lower_name );
+ }
+ const uint16_t rid = serializer_get_uint16( &serialized );
+ if ( rid != uplink->image->rid ) ERROR_GOTO_VA( server_failed, "[ERROR] Server provides rid %d, requested was %d (%s)",
+ (int)rid, (int)uplink->image->rid, uplink->image->lower_name );
+ const uint64_t image_size = serializer_get_uint64( &serialized );
+ if ( image_size != uplink->image->filesize ) ERROR_GOTO_VA( server_failed,
+ "[ERROR] Remote size: %" PRIu64 ", expected: %" PRIu64 " (%s)",
+ image_size, uplink->image->filesize, uplink->image->lower_name );
+ // Request random block ++++++++++++++++++++++++++++++
+ request.cmd = CMD_GET_BLOCK;
+ request.offset = (uplink->image->filesize - 1) & ~(DNBD3_BLOCK_SIZE - 1);
+ request.size = DNBD3_BLOCK_SIZE;
+ fixup_request( request );
+ if ( send( sock, &request, sizeof(request), 0 ) != sizeof(request) ) ERROR_GOTO_VA( server_failed,
+ "[ERROR] Could not request random block for %s", uplink->image->lower_name );
+ // See if requesting the block succeeded ++++++++++++++++++++++
+ if ( recv( sock, &reply, sizeof(reply), MSG_WAITALL ) != sizeof(reply) ) {
+ ERROR_GOTO_VA( server_failed, "[ERROR] Received corrupted reply header after CMD_GET_BLOCK (%s)",
+ uplink->image->lower_name );
+ }
+ // check reply header
+ fixup_reply( reply );
+ if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) ERROR_GOTO_VA( server_failed,
+ "[ERROR] Reply to random block request is %d bytes for %s", reply.size, uplink->image->lower_name );
+ if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, 0 ) != DNBD3_BLOCK_SIZE ) ERROR_GOTO_VA( server_failed,
+ "[ERROR] Could not read random block from socket for %s", uplink->image->lower_name );
+ clock_gettime( CLOCK_MONOTONIC_RAW, &end );
+ // Measurement done - everything fine so far
+ const unsigned int rtt = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_nsec - start.tv_nsec) / 1000; // µs
+ const unsigned int avg = altservers_update_rtt( &servers[itAlt], rtt );
+ if ( is_same_server( &servers[itAlt], &uplink->currentServer ) ) {
+ currentRtt = avg;
+ close( sock );
+ } else if ( avg < bestRtt ) {
+ if ( bestSock != -1 ) close( bestSock );
+ bestSock = sock;
+ bestRtt = avg;
+ bestIndex = itAlt;
+ } else {
+ close( sock );
+ }
+ continue;
+ // Jump here if anything went wrong
+ server_failed: ;
+ close( sock );
+ }
+ // Done testing all servers. See if we should switch
+ if ( bestSock != -1 && (uplink->fd == -1 || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) {
+ // yep
+ uplink->betterFd = bestSock;
+ uplink->betterServer = servers[bestIndex];
+ uplink->rttTestResult = RTT_DOCHANGE;
+ } else {
+ // nope
+ if ( bestSock != -1 ) close( bestSock );
+ uplink->rttTestResult = RTT_DONTCHANGE;
+ }
+ // end of loop over all pending uplinks
+ pending[itLink] = NULL;
+ spin_lock( &pendingLock );
+ }
+ spin_unlock( &pendingLock );
+ }
+ cleanup: ;
+ spin_destroy( &pendingLock );
+ if ( fdEpoll != -1 ) close( fdEpoll );
+ if ( readPipe != -1 ) close( readPipe );
+ if ( signalPipe != -1 ) close( signalPipe );
+ signalPipe = -1;
+ return NULL ;
+}
diff --git a/src/server/altservers.h b/src/server/altservers.h
new file mode 100644
index 0000000..6aec195
--- /dev/null
+++ b/src/server/altservers.h
@@ -0,0 +1,14 @@
+#ifndef _ALTSERVERS_H_
+#define _ALTSERVERS_H_
+
+#include "globals.h"
+
+void altserver_init();
+
+void altserver_find_uplink(dnbd3_connection_t *uplink);
+
+int altservers_get_matching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size);
+
+int altservers_net_closeness(dnbd3_host_t *host1, dnbd3_host_t *host2);
+
+#endif /* UPLINK_CONNECTOR_H_ */
diff --git a/src/server/globals.h b/src/server/globals.h
index 7f47288..d3424b6 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -71,7 +71,8 @@ typedef struct
char comment[COMMENT_LENGTH];
time_t last_told;
dnbd3_host_t host;
- int rtt[];
+ int rtt[SERVER_RTT_PROBES];
+ int rttIndex;
} dnbd3_alt_server_t;
typedef struct
diff --git a/src/server/helper.h b/src/server/helper.h
index daa6695..e068abd 100644
--- a/src/server/helper.h
+++ b/src/server/helper.h
@@ -8,6 +8,9 @@
#include <unistd.h>
#include "../types.h"
+#define ERROR_GOTO(jumplabel, errormsg) do { memlogf(errormsg); goto jumplabel; } while (0);
+#define ERROR_GOTO_VA(jumplabel, errormsg, ...) do { memlogf(errormsg, __VA_ARGS__); goto jumplabel; } while (0);
+
char parse_address(char *string, dnbd3_host_t *host);
char host_to_string(const dnbd3_host_t *host, char *target, size_t targetlen);
char is_valid_namespace(char *namespace);
@@ -17,11 +20,9 @@ void remove_trailing_slash(char *string);
int file_exists(char *file);
int file_writable(char *file);
-static inline int is_same_server(const dnbd3_host_t *const a, const dnbd3_host_t *const b)
+static inline int is_same_server(const dnbd3_host_t * const a, const dnbd3_host_t * const b)
{
- return (a->type == b->type)
- && (a->port == b->port)
- && (0 == memcmp(a->addr, b->addr, (a->type == AF_INET ? 4 : 16)));
+ return (a->type == b->type) && (a->port == b->port) && (0 == memcmp( a->addr, b->addr, (a->type == AF_INET ? 4 : 16) ));
}
/**
@@ -29,25 +30,24 @@ static inline int is_same_server(const dnbd3_host_t *const a, const dnbd3_host_t
*/
static inline int send_data(int client_sock, void *data_in, int len)
{
- if (len <= 0) // Nothing to send
- return 1;
+ if ( len <= 0 ) // Nothing to send
+ return 1;
char *data = data_in; // Needed for pointer arithmetic
int ret, i;
for (i = 0; i < 3; ++i) // Retry at most 3 times, each try takes at most 0.5 seconds (socket timeout)
- {
- ret = send(client_sock, data, len, 0);
- if (ret == 0) // Connection closed
+ {
+ ret = send( client_sock, data, len, 0 );
+ if ( ret == 0 ) // Connection closed
+ return 0;
+ if ( ret < 0 ) {
+ if ( errno != EAGAIN ) // Some unexpected error
return 0;
- if (ret < 0)
- {
- if (errno != EAGAIN) // Some unexpected error
- return 0;
- usleep(1000); // 1ms
+ usleep( 1000 ); // 1ms
continue;
}
len -= ret;
- if (len <= 0) // Sent everything
- return 1;
+ if ( len <= 0 ) // Sent everything
+ return 1;
data += ret; // move target buffer pointer
}
return 0;
@@ -58,25 +58,24 @@ static inline int send_data(int client_sock, void *data_in, int len)
*/
static inline int recv_data(int client_sock, void *buffer_out, int len)
{
- if (len <= 0) // Nothing to receive
- return 1;
+ if ( len <= 0 ) // Nothing to receive
+ return 1;
char *data = buffer_out; // Needed for pointer arithmetic
int ret, i;
for (i = 0; i < 3; ++i) // Retry at most 3 times, each try takes at most 0.5 seconds (socket timeout)
- {
- ret = recv(client_sock, data, len, MSG_WAITALL);
- if (ret == 0) // Connection closed
+ {
+ ret = recv( client_sock, data, len, MSG_WAITALL );
+ if ( ret == 0 ) // Connection closed
+ return 0;
+ if ( ret < 0 ) {
+ if ( errno != EAGAIN ) // Some unexpected error
return 0;
- if (ret < 0)
- {
- if (errno != EAGAIN) // Some unexpected error
- return 0;
- usleep(1000); // 1ms
+ usleep( 1000 ); // 1ms
continue;
}
len -= ret;
- if (len <= 0) // Received everything
- return 1;
+ if ( len <= 0 ) // Received everything
+ return 1;
data += ret; // move target buffer pointer
}
return 0;
@@ -84,12 +83,12 @@ static inline int recv_data(int client_sock, void *buffer_out, int len)
static inline int strend(char *string, char *suffix)
{
- if (string == NULL) return FALSE;
- if (suffix == NULL || *suffix == '\0') return TRUE;
- const size_t len1 = strlen(string);
- const size_t len2 = strlen(suffix);
- if (len2 > len1) return FALSE;
- return strcmp(string + len1 - len2, suffix) == 0;
+ if ( string == NULL ) return FALSE;
+ if ( suffix == NULL || *suffix == '\0' ) return TRUE;
+ const size_t len1 = strlen( string );
+ const size_t len2 = strlen( suffix );
+ if ( len2 > len1 ) return FALSE;
+ return strcmp( string + len1 - len2, suffix ) == 0;
}
#endif
diff --git a/src/server/locks.c b/src/server/locks.c
index d9eed35..4293c2f 100644
--- a/src/server/locks.c
+++ b/src/server/locks.c
@@ -221,7 +221,7 @@ static void *debug_thread_watchdog(void *something)
}
pthread_spin_unlock( &initdestory );
}
- sleep( 10 );
+ sleep( 5 );
}
return NULL ;
}
diff --git a/src/server/net.c b/src/server/net.c
index 1383454..a7e110b 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -34,6 +34,7 @@
#include "server.h"
#include "image.h"
#include "uplink.h"
+#include "altservers.h"
#include "memlog.h"
#include "../serialize.h"
#include "../config.h"
@@ -292,7 +293,7 @@ void *net_client_handler(void *dnbd3_client)
case CMD_GET_SERVERS:
client->is_server = FALSE; // Only clients request list of servers
// Build list of known working alt servers
- num = uplink_get_matching_alt_servers( &client->host, server_list, NUMBER_SERVERS );
+ num = altservers_get_matching( &client->host, server_list, NUMBER_SERVERS );
reply.cmd = CMD_GET_SERVERS;
reply.size = num * sizeof(dnbd3_server_entry_t);
send_reply( client->sock, &reply, server_list );
diff --git a/src/server/server.c b/src/server/server.c
index cc7a76a..dc374a0 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -39,6 +39,7 @@
#include "image.h"
#include "uplink.h"
#include "net.h"
+#include "altservers.h"
#include "memlog.h"
#define MAX_SERVER_SOCKETS 50 // Assume there will be no more than 50 sockets the server will listen on
@@ -189,9 +190,9 @@ int main(int argc, char *argv[])
case 'crc4':
return image_generate_crc_file( optarg ) ? 0 : EXIT_FAILURE;
case 'asrt':
- printf("Testing a failing assertion:\n");
+ printf( "Testing a failing assertion:\n" );
assert( 4 == 5 );
- printf("Assertion 4 == 5 seems to hold. ;-)\n");
+ printf( "Assertion 4 == 5 seems to hold. ;-)\n" );
return EXIT_SUCCESS;
}
opt = getopt_long( argc, argv, optString, longOpts, &longIndex );
@@ -204,7 +205,7 @@ int main(int argc, char *argv[])
spin_init( &_clients_lock, PTHREAD_PROCESS_PRIVATE );
spin_init( &_images_lock, PTHREAD_PROCESS_PRIVATE );
- spin_init( &_alts_lock, PTHREAD_PROCESS_PRIVATE );
+ altserver_init();
#ifdef _DEBUG
debug_locks_start_watchdog();
diff --git a/src/server/uplink.c b/src/server/uplink.c
index ab23b70..9896e49 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -3,6 +3,7 @@
#include "memlog.h"
#include "sockhelper.h"
#include "image.h"
+#include "altservers.h"
#include <pthread.h>
#include <sys/socket.h>
#include <string.h>
@@ -13,78 +14,10 @@
#include <stdlib.h>
#include <stdio.h>
-dnbd3_alt_server_t *_alt_servers[SERVER_MAX_ALTS];
-int _num_alts = 0;
-pthread_spinlock_t _alts_lock;
-
static void* uplink_mainloop(void *data);
static void uplink_send_requests(dnbd3_connection_t *link, int newOnly);
static void uplink_handle_receive(dnbd3_connection_t *link);
-/**
- * Get <size> known (working) alt servers, ordered by network closeness
- * (by finding the smallest possible subnet)
- */
-int uplink_get_matching_alt_servers(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size)
-{
- if ( host == NULL || host->type == 0 || _num_alts == 0 ) return 0;
- int i, j;
- int count = 0;
- int distance[size];
- spin_lock( &_alts_lock );
- for (i = 0; i < _num_alts; ++i) {
- if ( host->type != _alt_servers[i]->host.type ) continue; // Wrong address family
- if ( count == 0 ) {
- // Trivial - this is the first entry
- memcpy( &output[0].host, &_alt_servers[i]->host, sizeof(dnbd3_host_t) );
- output[0].failures = 0;
- distance[0] = uplink_net_closeness( host, &output[0].host );
- count++;
- } else {
- // Other entries already exist, insert in proper position
- const int dist = uplink_net_closeness( host, &_alt_servers[i]->host );
- for (j = 0; j < size; ++j) {
- if ( j < count && dist <= distance[j] ) continue;
- if ( j > count ) break; // Should never happen but just in case...
- if ( j < count ) {
- // Check if we're in the middle and need to move other entries...
- if ( j + 1 < size ) {
- memmove( &output[j + 1], &output[j], sizeof(dnbd3_server_entry_t) * (size - j - 1) );
- memmove( &distance[j + 1], &distance[j], sizeof(int) * (size - j - 1) );
- }
- } else {
- count++;
- }
- memcpy( &output[j].host, &_alt_servers[i]->host, sizeof(dnbd3_host_t) );
- output[j].failures = 0;
- distance[j] = dist;
- break;
- }
- }
- }
- spin_unlock( &_alts_lock );
- return count;
-}
-
-/**
- * Determine how close two addresses are to each other by comparing the number of
- * matching bits from the left of the address. Does not count individual bits but
- * groups of 4 for speed.
- */
-int uplink_net_closeness(dnbd3_host_t *host1, dnbd3_host_t *host2)
-{
- if ( host1 == NULL || host2 == NULL || host1->type != host2->type ) return -1;
- int retval = 0;
- const int max = host1->type == AF_INET ? 4 : 16;
- for (int i = 0; i < max; ++i) {
- if ( (host1->addr[i] & 0xf0) != (host2->addr[i] & 0xf0) ) return retval;
- ++retval;
- if ( (host1->addr[i] & 0x0f) != (host2->addr[i] & 0x0f) ) return retval;
- ++retval;
- }
- return retval;
-}
-
// ############ uplink connection handling
/**
@@ -232,7 +165,7 @@ static void* uplink_mainloop(void *data)
if ( waitTime < 1500 ) waitTime = 1500;
}
numSocks = epoll_wait( fdEpoll, events, MAXEVENTS, waitTime );
- if ( _shutdown || link->shutdown ) break;
+ if ( _shutdown || link->shutdown ) goto cleanup;
if ( numSocks < 0 ) { // Error?
memlogf( "[DEBUG] epoll_wait() error %d", (int)errno);
usleep( 10000 );
@@ -257,14 +190,24 @@ static void* uplink_mainloop(void *data)
}
// No error, handle normally
if ( events[i].data.fd == fdPipe ) {
- while ( read( fdPipe, buffer, sizeof buffer ) > 0 ) {
- } // Throw data away, this is just used for waking this thread up
+ int ret;
+ do {
+ ret = read( fdPipe, buffer, sizeof buffer );
+ } while ( ret > 0 ); // Throw data away, this is just used for waking this thread up
+ if ( ret == 0 ) {
+ memlogf( "[WARNING] Signal pipe of uplink for %s closed! Things will break!", link->image->lower_name );
+ }
+ ret = errno;
+ if ( ret != EAGAIN && ret != EWOULDBLOCK && ret != EBUSY && ret != EINTR ) {
+ memlogf( "[WARNING] Errno %d on pipe-read on uplink for %s! Things will break!", ret, link->image->lower_name );
+ }
if ( link->fd != -1 ) {
uplink_send_requests( link, TRUE );
}
} else if ( events[i].data.fd == link->fd ) {
uplink_handle_receive( link );
if ( link->fd == -1 ) nextAltCheck = 0;
+ if ( _shutdown || link->shutdown ) goto cleanup;
} else {
printf( "[DEBUG] Sanity check: unknown FD ready on epoll! Closing...\n" );
close( events[i].data.fd );
@@ -294,6 +237,17 @@ static void* uplink_mainloop(void *data)
// The rtt worker already did the handshake for our image, so there's nothing
// more to do here
}
+ // See if we should trigger a RTT measurement
+ if ( link->rttTestResult == RTT_IDLE ) {
+ const time_t now = time( NULL );
+ if ( nextAltCheck - now > SERVER_RTT_DELAY_MAX ) {
+ nextAltCheck = now + SERVER_RTT_DELAY_MAX;
+ } else if ( now >= nextAltCheck ) {
+ altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX);
+ nextAltCheck = now + altCheckInterval;
+ altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous)
+ }
+ }
}
cleanup: ;
const int fd = link->fd;
@@ -304,6 +258,10 @@ static void* uplink_mainloop(void *data)
if ( signal != -1 ) close( signal );
if ( fdPipe != -1 ) close( fdPipe );
if ( fdEpoll != -1 ) close( fdEpoll );
+ // Wait for the RTT check to finish/fail if it's in progress
+ while ( link->rttTestResult == RTT_INPROGRESS )
+ usleep( 10000 );
+ if ( link->betterFd != -1 ) close( link->betterFd );
return NULL ;
}
diff --git a/src/server/uplink.h b/src/server/uplink.h
index f6917be..cb1d4e7 100644
--- a/src/server/uplink.h
+++ b/src/server/uplink.h
@@ -4,13 +4,6 @@
#include "../types.h"
#include "globals.h"
-extern dnbd3_alt_server_t *_alt_servers[SERVER_MAX_ALTS];
-extern int _num_alts;
-extern pthread_spinlock_t _alts_lock;
-
-int uplink_get_matching_alt_servers(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size);
-
-int uplink_net_closeness(dnbd3_host_t *host1, dnbd3_host_t *host2);
int uplink_init(dnbd3_image_t *image);