diff options
author | Simon Rettberg | 2013-07-31 21:45:00 +0200 |
---|---|---|
committer | Simon Rettberg | 2013-07-31 21:45:00 +0200 |
commit | e7b3781c3e5c105fcd80a75546b9ab75eae8a2c9 (patch) | |
tree | 90e523671bb2ea349639b744b5bc3ee81f512fb7 | |
parent | [SERVER] Still working on the uplink... Almost there (diff) | |
download | dnbd3-e7b3781c3e5c105fcd80a75546b9ab75eae8a2c9.tar.gz dnbd3-e7b3781c3e5c105fcd80a75546b9ab75eae8a2c9.tar.xz dnbd3-e7b3781c3e5c105fcd80a75546b9ab75eae8a2c9.zip |
[SERVER] Uplink handing complete (untested, as alt servers can't be defined yet, so prepare for lots of fixes ;))
-rw-r--r-- | CMakeLists.txt | 3 | ||||
-rw-r--r-- | src/config.h | 13 | ||||
-rw-r--r-- | src/kernel/net.c | 1 | ||||
-rw-r--r-- | src/server/altservers.c | 390 | ||||
-rw-r--r-- | src/server/altservers.h | 14 | ||||
-rw-r--r-- | src/server/globals.h | 3 | ||||
-rw-r--r-- | src/server/helper.h | 67 | ||||
-rw-r--r-- | src/server/locks.c | 2 | ||||
-rw-r--r-- | src/server/net.c | 3 | ||||
-rw-r--r-- | src/server/server.c | 7 | ||||
-rw-r--r-- | src/server/uplink.c | 100 | ||||
-rw-r--r-- | src/server/uplink.h | 7 |
12 files changed, 486 insertions, 124 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 0b4b43d..596fa43 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -47,6 +47,9 @@ TARGET_LINK_LIBRARIES(dnbd3-client) FILE(GLOB_RECURSE SERVER_SRCS src/server/*.c) ADD_EXECUTABLE(dnbd3-server ${SERVER_SRCS}) TARGET_LINK_LIBRARIES(dnbd3-server ${CMAKE_THREAD_LIBS_INIT} ${ZLIB_LIBRARIES}) +if(UNIX AND NOT APPLE) + target_link_libraries(dnbd3-server rt) +endif() diff --git a/src/config.h b/src/config.h index 3c2f166..6ad99dd 100644 --- a/src/config.h +++ b/src/config.h @@ -29,10 +29,11 @@ #define SERVER_MAX_IMAGES 5000 #define SERVER_MAX_ALTS 1000 #define SERVER_MAX_UPLINK_QUEUE 1000 +#define SERVER_MAX_PENDING_ALT_CHECKS 100 // +++++ Other magic constants -#define SERVER_RTT_PROBES 4 -#define SERVER_RTT_DELAY_INIT 8 +#define SERVER_RTT_PROBES 5 +#define SERVER_RTT_DELAY_INIT 5 #define SERVER_RTT_DELAY_MAX 15 // +++++ Network +++++ @@ -40,6 +41,9 @@ #define PORT 5003 #define RPC_PORT (PORT+1) +// No serialized payload allowed exceeding this many bytes (so actual data from client->server is not affected by this limit!) +#define MAX_PAYLOAD 1000 + // Protocol version should be increased whenever new features/messages are added, // so either the client or server can run in compatibility mode, or they can // cancel the connection right away if the protocol has changed too much @@ -51,9 +55,6 @@ // Length of comment fields (for alt server etc.) #define COMMENT_LENGTH 120 -// No payload allowed exceeding this many bytes (actual data from client->server is not affected by this limit!) -#define MAX_PAYLOAD 1000 - // in seconds if not stated otherwise (MS = milliseconds) #define SOCKET_TIMEOUT_SERVER_MS 30000 #define SOCKET_TIMEOUT_CLIENT_DATA 2 @@ -79,7 +80,7 @@ // +++++ Block Device +++++ #define KERNEL_SECTOR_SIZE 512 -#define DNBD3_BLOCK_SIZE 4096 // NEVER CHANGE THIS OR THE WORLD WILL END! +#define DNBD3_BLOCK_SIZE ((uint64_t)4096) // NEVER CHANGE THIS OR THE WORLD WILL END! #define NUMBER_DEVICES 8 #define DEFAULT_READ_AHEAD_KB 512 diff --git a/src/kernel/net.c b/src/kernel/net.c index 36abf45..e088943 100644 --- a/src/kernel/net.c +++ b/src/kernel/net.c @@ -21,6 +21,7 @@ #include "net.h" #include "blk.h" #include "utils.h" + #include "serialize.h" #include <linux/time.h> diff --git a/src/server/altservers.c b/src/server/altservers.c new file mode 100644 index 0000000..6301c32 --- /dev/null +++ b/src/server/altservers.c @@ -0,0 +1,390 @@ +#include "altservers.h" +#include "uplink.h" +#include "locks.h" +#include "sockhelper.h" +#include "memlog.h" +#include "helper.h" +#include <stdlib.h> +#include <unistd.h> +#include <sys/epoll.h> +#include <sys/errno.h> +#include <math.h> +#include <assert.h> +#include <inttypes.h> +#include <time.h> +#include "../serialize.h" + +static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS]; +static pthread_spinlock_t pendingLock; +static int signalPipe = -1; + +static dnbd3_alt_server_t _alt_servers[SERVER_MAX_ALTS]; +static int _num_alts = 0; +static pthread_spinlock_t _alts_lock; + +static pthread_t altThread; + +static void *altserver_main(void *data); +static unsigned int altservers_update_rtt(const dnbd3_host_t * const host, const unsigned int rtt); + +void altserver_init() +{ + spin_init( &_alts_lock, PTHREAD_PROCESS_PRIVATE ); + if ( 0 != pthread_create( &altThread, NULL, &altserver_main, (void *)NULL ) ) { + memlogf( "[ERROR] Could not start altservers connector thread" ); + exit( EXIT_FAILURE ); + } +} + +/** + * ONLY called from the passed uplink's main thread + */ +void altserver_find_uplink(dnbd3_connection_t *uplink) +{ + if ( uplink->rttTestResult == RTT_INPROGRESS ) return; + spin_lock( &pendingLock ); + for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { + if ( pending[i] != NULL ) continue; + pending[i] = uplink; + uplink->rttTestResult = RTT_INPROGRESS; + spin_unlock( &pendingLock ); + write( signalPipe, "", 1 ); + return; + } + // End of loop - no free slot + spin_unlock( &pendingLock ); + memlogf( "[WARNING] No more free RTT measurement slots, ignoring a request..." ); +} + +/** + * Get <size> known (working) alt servers, ordered by network closeness + * (by finding the smallest possible subnet) + */ +int altservers_get_matching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size) +{ + if ( host == NULL || host->type == 0 || _num_alts == 0 || output == NULL || size <= 0 ) return 0; + int i, j; + int count = 0; + int distance[size]; + spin_lock( &_alts_lock ); + for (i = 0; i < _num_alts; ++i) { + if ( host->type != _alt_servers[i].host.type ) continue; // Wrong address family + // TODO: Prefer same AF here, but if in the end we got less servers than requested, add + // servers of other AF too (after this loop) + if ( count == 0 ) { + // Trivial - this is the first entry + output[0].host = _alt_servers[i].host; + output[0].failures = 0; + distance[0] = altservers_net_closeness( host, &output[0].host ); + count++; + } else { + // Other entries already exist, insert in proper position + const int dist = altservers_net_closeness( host, &_alt_servers[i].host ); + for (j = 0; j < size; ++j) { + if ( j < count && dist <= distance[j] ) continue; + if ( j > count ) break; // Should never happen but just in case... + if ( j < count ) { + // Check if we're in the middle and need to move other entries... + if ( j + 1 < size ) { + memmove( &output[j + 1], &output[j], sizeof(dnbd3_server_entry_t) * (size - j - 1) ); + memmove( &distance[j + 1], &distance[j], sizeof(int) * (size - j - 1) ); + } + } else { + count++; + } + output[j].host = _alt_servers[i].host; + output[j].failures = 0; + distance[j] = dist; + break; + } + } + } + // "if count < size then add servers of other address families" + spin_unlock( &_alts_lock ); + return count; +} + +/** + * Get <size> alt servers. If there are more alt servers than + * requested, random servers will be picked + */ +int altservers_get(dnbd3_host_t *output, int size) +{ + int count = 0, i, j, num; + spin_lock( &_alts_lock ); + if ( size <= _num_alts ) { + for (i = 0; i < size; ++i) { + if ( _alt_servers[i].host.type == 0 ) continue; + output[count++] = _alt_servers[i].host; + } + } else { + int which[_num_alts]; // Generate random order over _num_alts + for (i = 0; i < _num_alts; ++i) { + again: ; + num = rand() % _num_alts; + for (j = 0; j < i; ++j) { + if ( which[j] == num ) goto again; + } + which[i] = num; + } // Now pick <size> working alt servers in that generated order + for (i = 0; i < size; ++i) { + if ( _alt_servers[which[i]].host.type == 0 ) continue; + output[count++] = _alt_servers[which[i]].host; + if ( count >= size ) break; + } + } + spin_unlock( &_alts_lock ); + return count; +} + +/** + * Update rtt history of given server - returns the new average for that server + */ +static unsigned int altservers_update_rtt(const dnbd3_host_t * const host, const unsigned int rtt) +{ + unsigned int avg = rtt; + int i; + spin_lock( &_alts_lock ); + for (i = 0; i < _num_alts; ++i) { + if ( !is_same_server( host, &_alt_servers[i].host ) ) continue; + _alt_servers[i].rtt[++_alt_servers[i].rttIndex % SERVER_RTT_PROBES] = rtt; +#if SERVER_RTT_PROBES == 5 + avg = (_alt_servers[i].rtt[0] + _alt_servers[i].rtt[1] + _alt_servers[i].rtt[2] + _alt_servers[i].rtt[3] + _alt_servers[i].rtt[4]) + / SERVER_RTT_PROBES; +#else +#warning You might want to change the code in altservers_update_rtt if you changed SERVER_RTT_PROBES + avg = 0; + for (int j = 0; j < SERVER_RTT_PROBES; ++j) { + avg += _alt_servers[i].rtt[j]; + } + avg /= SERVER_RTT_PROBES; +#endif + break; + } + spin_unlock( &_alts_lock ); + return avg; +} + +/** + * Determine how close two addresses are to each other by comparing the number of + * matching bits from the left of the address. Does not count individual bits but + * groups of 4 for speed. + */ +int altservers_net_closeness(dnbd3_host_t *host1, dnbd3_host_t *host2) +{ + if ( host1 == NULL || host2 == NULL || host1->type != host2->type ) return -1; + int retval = 0; + const int max = host1->type == AF_INET ? 4 : 16; + for (int i = 0; i < max; ++i) { + if ( (host1->addr[i] & 0xf0) != (host2->addr[i] & 0xf0) ) return retval; + ++retval; + if ( (host1->addr[i] & 0x0f) != (host2->addr[i] & 0x0f) ) return retval; + ++retval; + } + return retval; +} + +static void *altserver_main(void *data) +{ + const int MAXEVENTS = 3; + const int ALTS = 4; + struct epoll_event ev, events[MAXEVENTS]; + int readPipe = -1, fdEpoll = -1; + int numSocks, ret, itLink, itAlt, numAlts; + int found, len; + char buffer[DNBD3_BLOCK_SIZE ]; + dnbd3_host_t servers[ALTS + 1]; + dnbd3_request_t request; + dnbd3_reply_t reply; + serialized_buffer_t serialized; + struct iovec iov[2]; + struct timespec start, end; + + // Make valgrind happy + memset( &reply, 0, sizeof(reply) ); + memset( &request, 0, sizeof(request) ); + request.magic = dnbd3_packet_magic; + // Init spinlock + spin_init( &pendingLock, PTHREAD_PROCESS_PRIVATE ); + // Init waiting links queue + for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) + pending[i] = NULL; + // Init signal-pipe + fdEpoll = epoll_create( 2 ); + if ( fdEpoll == -1 ) { + memlogf( "[WARNING] epoll_create failed. Uplink unavailable." ); + goto cleanup; + } + { + int pipes[2]; + if ( pipe( pipes ) < 0 ) { + memlogf( "[WARNING] error creating pipe. Uplink unavailable." ); + goto cleanup; + } + sock_set_nonblock( pipes[0] ); + sock_set_nonblock( pipes[1] ); + readPipe = pipes[0]; + signalPipe = pipes[1]; + memset( &ev, 0, sizeof(ev) ); + ev.events = EPOLLIN; + ev.data.fd = readPipe; + if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, readPipe, &ev ) < 0 ) { + memlogf( "[WARNING] adding read-signal-pipe to epoll set failed" ); + goto cleanup; + } + } + // LOOP + while ( !_shutdown ) { + // Wait 5 seconds max. + numSocks = epoll_wait( fdEpoll, events, MAXEVENTS, 5000 ); + if ( numSocks < 0 ) { + memlogf( "[WARNING] epoll_wait() error in uplink_connector" ); + usleep( 100000 ); + } + // Empty pipe + do { + ret = read( readPipe, buffer, sizeof buffer ); + } while ( ret > 0 ); // Throw data away, this is just used for waking this thread up + if ( ret == 0 ) { + memlogf( "[WARNING] Signal pipe of uplink_connector for %s closed! Things will break!" ); + } + ret = errno; + if ( ret != EAGAIN && ret != EWOULDBLOCK && ret != EBUSY && ret != EINTR ) { + memlogf( "[WARNING] Errno %d on pipe-read on uplink_connector for %s! Things will break!", ret ); + } + // Work your way through the queue + spin_lock( &pendingLock ); + for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) { + if ( pending[itLink] == NULL ) continue; + spin_unlock( &pendingLock ); + dnbd3_connection_t * const uplink = pending[itLink]; + assert( uplink->rttTestResult == RTT_INPROGRESS ); + // Now get 4 alt servers + numAlts = altservers_get( servers, ALTS ); + if ( uplink->fd != -1 ) { + // Add current server if not already in list + found = FALSE; + for (itAlt = 0; itAlt < numAlts; ++itAlt) { + if ( !is_same_server( &uplink->currentServer, &servers[itAlt] ) ) continue; + found = TRUE; + break; + } + if ( !found ) servers[numAlts++] = uplink->currentServer; + } + // Test them all + int bestSock = -1; + int bestIndex = -1; + unsigned int bestRtt = 0xfffffff; + unsigned int currentRtt = 0xfffffff; + for (itAlt = 0; itAlt < numAlts; ++itAlt) { + usleep( 1000 ); + // Connect + clock_gettime( CLOCK_MONOTONIC_RAW, &start ); + int sock = sock_connect( &servers[itAlt], 750, 1250 ); + if ( sock < 0 ) continue; + // Select image ++++++++++++++++++++++++++++++ + serializer_reset_write( &serialized ); + serializer_put_uint16( &serialized, PROTOCOL_VERSION ); + serializer_put_string( &serialized, uplink->image->lower_name ); + serializer_put_uint16( &serialized, uplink->image->rid ); + serializer_put_uint8( &serialized, 1 ); // isServer = TRUE + len = serializer_get_written_length( &serialized ); + request.cmd = CMD_SELECT_IMAGE; + request.size = len; + fixup_request( request ); + iov[0].iov_base = &request; + iov[0].iov_len = sizeof(request); + iov[1].iov_base = &serialized; + iov[1].iov_len = len; + if ( writev( sock, iov, 2 ) != len + sizeof(request) ) goto server_failed; + // See if selecting the image succeeded ++++++++++++++++++++++++++++++ + if ( recv( sock, &reply, sizeof(reply), MSG_WAITALL ) != sizeof(reply) ) { + ERROR_GOTO_VA( server_failed, "[ERROR] Received corrupted reply header after CMD_SELECT_IMAGE (%s)", + uplink->image->lower_name ); + } + // check reply header + fixup_reply( reply ); + if ( reply.cmd != CMD_SELECT_IMAGE || reply.size < 3 || reply.size > MAX_PAYLOAD || reply.magic != dnbd3_packet_magic ) goto server_failed; + // Not found + // receive reply payload + if ( recv( sock, &serialized, reply.size, MSG_WAITALL ) != reply.size ) { + ERROR_GOTO_VA( server_failed, "[ERROR] Cold not read CMD_SELECT_IMAGE payload (%s)", uplink->image->lower_name ); + } + // handle/check reply payload + serializer_reset_read( &serialized, reply.size ); + const uint16_t protocol_version = serializer_get_uint16( &serialized ); + if ( protocol_version < MIN_SUPPORTED_SERVER ) goto server_failed; + const char *name = serializer_get_string( &serialized ); + if ( strcmp( name, uplink->image->lower_name ) != 0 ) { + ERROR_GOTO_VA( server_failed, "[ERROR] Server offers image '%s', requested '%s'", name, uplink->image->lower_name ); + } + const uint16_t rid = serializer_get_uint16( &serialized ); + if ( rid != uplink->image->rid ) ERROR_GOTO_VA( server_failed, "[ERROR] Server provides rid %d, requested was %d (%s)", + (int)rid, (int)uplink->image->rid, uplink->image->lower_name ); + const uint64_t image_size = serializer_get_uint64( &serialized ); + if ( image_size != uplink->image->filesize ) ERROR_GOTO_VA( server_failed, + "[ERROR] Remote size: %" PRIu64 ", expected: %" PRIu64 " (%s)", + image_size, uplink->image->filesize, uplink->image->lower_name ); + // Request random block ++++++++++++++++++++++++++++++ + request.cmd = CMD_GET_BLOCK; + request.offset = (uplink->image->filesize - 1) & ~(DNBD3_BLOCK_SIZE - 1); + request.size = DNBD3_BLOCK_SIZE; + fixup_request( request ); + if ( send( sock, &request, sizeof(request), 0 ) != sizeof(request) ) ERROR_GOTO_VA( server_failed, + "[ERROR] Could not request random block for %s", uplink->image->lower_name ); + // See if requesting the block succeeded ++++++++++++++++++++++ + if ( recv( sock, &reply, sizeof(reply), MSG_WAITALL ) != sizeof(reply) ) { + ERROR_GOTO_VA( server_failed, "[ERROR] Received corrupted reply header after CMD_GET_BLOCK (%s)", + uplink->image->lower_name ); + } + // check reply header + fixup_reply( reply ); + if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) ERROR_GOTO_VA( server_failed, + "[ERROR] Reply to random block request is %d bytes for %s", reply.size, uplink->image->lower_name ); + if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, 0 ) != DNBD3_BLOCK_SIZE ) ERROR_GOTO_VA( server_failed, + "[ERROR] Could not read random block from socket for %s", uplink->image->lower_name ); + clock_gettime( CLOCK_MONOTONIC_RAW, &end ); + // Measurement done - everything fine so far + const unsigned int rtt = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_nsec - start.tv_nsec) / 1000; // µs + const unsigned int avg = altservers_update_rtt( &servers[itAlt], rtt ); + if ( is_same_server( &servers[itAlt], &uplink->currentServer ) ) { + currentRtt = avg; + close( sock ); + } else if ( avg < bestRtt ) { + if ( bestSock != -1 ) close( bestSock ); + bestSock = sock; + bestRtt = avg; + bestIndex = itAlt; + } else { + close( sock ); + } + continue; + // Jump here if anything went wrong + server_failed: ; + close( sock ); + } + // Done testing all servers. See if we should switch + if ( bestSock != -1 && (uplink->fd == -1 || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) { + // yep + uplink->betterFd = bestSock; + uplink->betterServer = servers[bestIndex]; + uplink->rttTestResult = RTT_DOCHANGE; + } else { + // nope + if ( bestSock != -1 ) close( bestSock ); + uplink->rttTestResult = RTT_DONTCHANGE; + } + // end of loop over all pending uplinks + pending[itLink] = NULL; + spin_lock( &pendingLock ); + } + spin_unlock( &pendingLock ); + } + cleanup: ; + spin_destroy( &pendingLock ); + if ( fdEpoll != -1 ) close( fdEpoll ); + if ( readPipe != -1 ) close( readPipe ); + if ( signalPipe != -1 ) close( signalPipe ); + signalPipe = -1; + return NULL ; +} diff --git a/src/server/altservers.h b/src/server/altservers.h new file mode 100644 index 0000000..6aec195 --- /dev/null +++ b/src/server/altservers.h @@ -0,0 +1,14 @@ +#ifndef _ALTSERVERS_H_ +#define _ALTSERVERS_H_ + +#include "globals.h" + +void altserver_init(); + +void altserver_find_uplink(dnbd3_connection_t *uplink); + +int altservers_get_matching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size); + +int altservers_net_closeness(dnbd3_host_t *host1, dnbd3_host_t *host2); + +#endif /* UPLINK_CONNECTOR_H_ */ diff --git a/src/server/globals.h b/src/server/globals.h index 7f47288..d3424b6 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -71,7 +71,8 @@ typedef struct char comment[COMMENT_LENGTH]; time_t last_told; dnbd3_host_t host; - int rtt[]; + int rtt[SERVER_RTT_PROBES]; + int rttIndex; } dnbd3_alt_server_t; typedef struct diff --git a/src/server/helper.h b/src/server/helper.h index daa6695..e068abd 100644 --- a/src/server/helper.h +++ b/src/server/helper.h @@ -8,6 +8,9 @@ #include <unistd.h> #include "../types.h" +#define ERROR_GOTO(jumplabel, errormsg) do { memlogf(errormsg); goto jumplabel; } while (0); +#define ERROR_GOTO_VA(jumplabel, errormsg, ...) do { memlogf(errormsg, __VA_ARGS__); goto jumplabel; } while (0); + char parse_address(char *string, dnbd3_host_t *host); char host_to_string(const dnbd3_host_t *host, char *target, size_t targetlen); char is_valid_namespace(char *namespace); @@ -17,11 +20,9 @@ void remove_trailing_slash(char *string); int file_exists(char *file); int file_writable(char *file); -static inline int is_same_server(const dnbd3_host_t *const a, const dnbd3_host_t *const b) +static inline int is_same_server(const dnbd3_host_t * const a, const dnbd3_host_t * const b) { - return (a->type == b->type) - && (a->port == b->port) - && (0 == memcmp(a->addr, b->addr, (a->type == AF_INET ? 4 : 16))); + return (a->type == b->type) && (a->port == b->port) && (0 == memcmp( a->addr, b->addr, (a->type == AF_INET ? 4 : 16) )); } /** @@ -29,25 +30,24 @@ static inline int is_same_server(const dnbd3_host_t *const a, const dnbd3_host_t */ static inline int send_data(int client_sock, void *data_in, int len) { - if (len <= 0) // Nothing to send - return 1; + if ( len <= 0 ) // Nothing to send + return 1; char *data = data_in; // Needed for pointer arithmetic int ret, i; for (i = 0; i < 3; ++i) // Retry at most 3 times, each try takes at most 0.5 seconds (socket timeout) - { - ret = send(client_sock, data, len, 0); - if (ret == 0) // Connection closed + { + ret = send( client_sock, data, len, 0 ); + if ( ret == 0 ) // Connection closed + return 0; + if ( ret < 0 ) { + if ( errno != EAGAIN ) // Some unexpected error return 0; - if (ret < 0) - { - if (errno != EAGAIN) // Some unexpected error - return 0; - usleep(1000); // 1ms + usleep( 1000 ); // 1ms continue; } len -= ret; - if (len <= 0) // Sent everything - return 1; + if ( len <= 0 ) // Sent everything + return 1; data += ret; // move target buffer pointer } return 0; @@ -58,25 +58,24 @@ static inline int send_data(int client_sock, void *data_in, int len) */ static inline int recv_data(int client_sock, void *buffer_out, int len) { - if (len <= 0) // Nothing to receive - return 1; + if ( len <= 0 ) // Nothing to receive + return 1; char *data = buffer_out; // Needed for pointer arithmetic int ret, i; for (i = 0; i < 3; ++i) // Retry at most 3 times, each try takes at most 0.5 seconds (socket timeout) - { - ret = recv(client_sock, data, len, MSG_WAITALL); - if (ret == 0) // Connection closed + { + ret = recv( client_sock, data, len, MSG_WAITALL ); + if ( ret == 0 ) // Connection closed + return 0; + if ( ret < 0 ) { + if ( errno != EAGAIN ) // Some unexpected error return 0; - if (ret < 0) - { - if (errno != EAGAIN) // Some unexpected error - return 0; - usleep(1000); // 1ms + usleep( 1000 ); // 1ms continue; } len -= ret; - if (len <= 0) // Received everything - return 1; + if ( len <= 0 ) // Received everything + return 1; data += ret; // move target buffer pointer } return 0; @@ -84,12 +83,12 @@ static inline int recv_data(int client_sock, void *buffer_out, int len) static inline int strend(char *string, char *suffix) { - if (string == NULL) return FALSE; - if (suffix == NULL || *suffix == '\0') return TRUE; - const size_t len1 = strlen(string); - const size_t len2 = strlen(suffix); - if (len2 > len1) return FALSE; - return strcmp(string + len1 - len2, suffix) == 0; + if ( string == NULL ) return FALSE; + if ( suffix == NULL || *suffix == '\0' ) return TRUE; + const size_t len1 = strlen( string ); + const size_t len2 = strlen( suffix ); + if ( len2 > len1 ) return FALSE; + return strcmp( string + len1 - len2, suffix ) == 0; } #endif diff --git a/src/server/locks.c b/src/server/locks.c index d9eed35..4293c2f 100644 --- a/src/server/locks.c +++ b/src/server/locks.c @@ -221,7 +221,7 @@ static void *debug_thread_watchdog(void *something) } pthread_spin_unlock( &initdestory ); } - sleep( 10 ); + sleep( 5 ); } return NULL ; } diff --git a/src/server/net.c b/src/server/net.c index 1383454..a7e110b 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -34,6 +34,7 @@ #include "server.h" #include "image.h" #include "uplink.h" +#include "altservers.h" #include "memlog.h" #include "../serialize.h" #include "../config.h" @@ -292,7 +293,7 @@ void *net_client_handler(void *dnbd3_client) case CMD_GET_SERVERS: client->is_server = FALSE; // Only clients request list of servers // Build list of known working alt servers - num = uplink_get_matching_alt_servers( &client->host, server_list, NUMBER_SERVERS ); + num = altservers_get_matching( &client->host, server_list, NUMBER_SERVERS ); reply.cmd = CMD_GET_SERVERS; reply.size = num * sizeof(dnbd3_server_entry_t); send_reply( client->sock, &reply, server_list ); diff --git a/src/server/server.c b/src/server/server.c index cc7a76a..dc374a0 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -39,6 +39,7 @@ #include "image.h" #include "uplink.h" #include "net.h" +#include "altservers.h" #include "memlog.h" #define MAX_SERVER_SOCKETS 50 // Assume there will be no more than 50 sockets the server will listen on @@ -189,9 +190,9 @@ int main(int argc, char *argv[]) case 'crc4': return image_generate_crc_file( optarg ) ? 0 : EXIT_FAILURE; case 'asrt': - printf("Testing a failing assertion:\n"); + printf( "Testing a failing assertion:\n" ); assert( 4 == 5 ); - printf("Assertion 4 == 5 seems to hold. ;-)\n"); + printf( "Assertion 4 == 5 seems to hold. ;-)\n" ); return EXIT_SUCCESS; } opt = getopt_long( argc, argv, optString, longOpts, &longIndex ); @@ -204,7 +205,7 @@ int main(int argc, char *argv[]) spin_init( &_clients_lock, PTHREAD_PROCESS_PRIVATE ); spin_init( &_images_lock, PTHREAD_PROCESS_PRIVATE ); - spin_init( &_alts_lock, PTHREAD_PROCESS_PRIVATE ); + altserver_init(); #ifdef _DEBUG debug_locks_start_watchdog(); diff --git a/src/server/uplink.c b/src/server/uplink.c index ab23b70..9896e49 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -3,6 +3,7 @@ #include "memlog.h" #include "sockhelper.h" #include "image.h" +#include "altservers.h" #include <pthread.h> #include <sys/socket.h> #include <string.h> @@ -13,78 +14,10 @@ #include <stdlib.h> #include <stdio.h> -dnbd3_alt_server_t *_alt_servers[SERVER_MAX_ALTS]; -int _num_alts = 0; -pthread_spinlock_t _alts_lock; - static void* uplink_mainloop(void *data); static void uplink_send_requests(dnbd3_connection_t *link, int newOnly); static void uplink_handle_receive(dnbd3_connection_t *link); -/** - * Get <size> known (working) alt servers, ordered by network closeness - * (by finding the smallest possible subnet) - */ -int uplink_get_matching_alt_servers(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size) -{ - if ( host == NULL || host->type == 0 || _num_alts == 0 ) return 0; - int i, j; - int count = 0; - int distance[size]; - spin_lock( &_alts_lock ); - for (i = 0; i < _num_alts; ++i) { - if ( host->type != _alt_servers[i]->host.type ) continue; // Wrong address family - if ( count == 0 ) { - // Trivial - this is the first entry - memcpy( &output[0].host, &_alt_servers[i]->host, sizeof(dnbd3_host_t) ); - output[0].failures = 0; - distance[0] = uplink_net_closeness( host, &output[0].host ); - count++; - } else { - // Other entries already exist, insert in proper position - const int dist = uplink_net_closeness( host, &_alt_servers[i]->host ); - for (j = 0; j < size; ++j) { - if ( j < count && dist <= distance[j] ) continue; - if ( j > count ) break; // Should never happen but just in case... - if ( j < count ) { - // Check if we're in the middle and need to move other entries... - if ( j + 1 < size ) { - memmove( &output[j + 1], &output[j], sizeof(dnbd3_server_entry_t) * (size - j - 1) ); - memmove( &distance[j + 1], &distance[j], sizeof(int) * (size - j - 1) ); - } - } else { - count++; - } - memcpy( &output[j].host, &_alt_servers[i]->host, sizeof(dnbd3_host_t) ); - output[j].failures = 0; - distance[j] = dist; - break; - } - } - } - spin_unlock( &_alts_lock ); - return count; -} - -/** - * Determine how close two addresses are to each other by comparing the number of - * matching bits from the left of the address. Does not count individual bits but - * groups of 4 for speed. - */ -int uplink_net_closeness(dnbd3_host_t *host1, dnbd3_host_t *host2) -{ - if ( host1 == NULL || host2 == NULL || host1->type != host2->type ) return -1; - int retval = 0; - const int max = host1->type == AF_INET ? 4 : 16; - for (int i = 0; i < max; ++i) { - if ( (host1->addr[i] & 0xf0) != (host2->addr[i] & 0xf0) ) return retval; - ++retval; - if ( (host1->addr[i] & 0x0f) != (host2->addr[i] & 0x0f) ) return retval; - ++retval; - } - return retval; -} - // ############ uplink connection handling /** @@ -232,7 +165,7 @@ static void* uplink_mainloop(void *data) if ( waitTime < 1500 ) waitTime = 1500; } numSocks = epoll_wait( fdEpoll, events, MAXEVENTS, waitTime ); - if ( _shutdown || link->shutdown ) break; + if ( _shutdown || link->shutdown ) goto cleanup; if ( numSocks < 0 ) { // Error? memlogf( "[DEBUG] epoll_wait() error %d", (int)errno); usleep( 10000 ); @@ -257,14 +190,24 @@ static void* uplink_mainloop(void *data) } // No error, handle normally if ( events[i].data.fd == fdPipe ) { - while ( read( fdPipe, buffer, sizeof buffer ) > 0 ) { - } // Throw data away, this is just used for waking this thread up + int ret; + do { + ret = read( fdPipe, buffer, sizeof buffer ); + } while ( ret > 0 ); // Throw data away, this is just used for waking this thread up + if ( ret == 0 ) { + memlogf( "[WARNING] Signal pipe of uplink for %s closed! Things will break!", link->image->lower_name ); + } + ret = errno; + if ( ret != EAGAIN && ret != EWOULDBLOCK && ret != EBUSY && ret != EINTR ) { + memlogf( "[WARNING] Errno %d on pipe-read on uplink for %s! Things will break!", ret, link->image->lower_name ); + } if ( link->fd != -1 ) { uplink_send_requests( link, TRUE ); } } else if ( events[i].data.fd == link->fd ) { uplink_handle_receive( link ); if ( link->fd == -1 ) nextAltCheck = 0; + if ( _shutdown || link->shutdown ) goto cleanup; } else { printf( "[DEBUG] Sanity check: unknown FD ready on epoll! Closing...\n" ); close( events[i].data.fd ); @@ -294,6 +237,17 @@ static void* uplink_mainloop(void *data) // The rtt worker already did the handshake for our image, so there's nothing // more to do here } + // See if we should trigger a RTT measurement + if ( link->rttTestResult == RTT_IDLE ) { + const time_t now = time( NULL ); + if ( nextAltCheck - now > SERVER_RTT_DELAY_MAX ) { + nextAltCheck = now + SERVER_RTT_DELAY_MAX; + } else if ( now >= nextAltCheck ) { + altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX); + nextAltCheck = now + altCheckInterval; + altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous) + } + } } cleanup: ; const int fd = link->fd; @@ -304,6 +258,10 @@ static void* uplink_mainloop(void *data) if ( signal != -1 ) close( signal ); if ( fdPipe != -1 ) close( fdPipe ); if ( fdEpoll != -1 ) close( fdEpoll ); + // Wait for the RTT check to finish/fail if it's in progress + while ( link->rttTestResult == RTT_INPROGRESS ) + usleep( 10000 ); + if ( link->betterFd != -1 ) close( link->betterFd ); return NULL ; } diff --git a/src/server/uplink.h b/src/server/uplink.h index f6917be..cb1d4e7 100644 --- a/src/server/uplink.h +++ b/src/server/uplink.h @@ -4,13 +4,6 @@ #include "../types.h" #include "globals.h" -extern dnbd3_alt_server_t *_alt_servers[SERVER_MAX_ALTS]; -extern int _num_alts; -extern pthread_spinlock_t _alts_lock; - -int uplink_get_matching_alt_servers(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size); - -int uplink_net_closeness(dnbd3_host_t *host1, dnbd3_host_t *host2); int uplink_init(dnbd3_image_t *image); |