From cfb0fba59db0937a00ff04b03aaa28ca671fe4d7 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 26 Aug 2013 18:59:22 +0200 Subject: [SERVER] On-the-fly transparent proxying --- src/server/altservers.c | 69 ++++++++++++++++++++++---------------------- src/server/altservers.h | 10 ++++--- src/server/image.c | 60 ++++++++++++++++++++++++++------------ src/server/image.h | 2 ++ src/server/net.c | 5 ++-- src/server/server.c | 11 ++++--- src/server/uplink.c | 76 +++++++++++++++++++++++++++---------------------- src/server/uplink.h | 2 +- 8 files changed, 136 insertions(+), 99 deletions(-) (limited to 'src') diff --git a/src/server/altservers.c b/src/server/altservers.c index 9d5f77c..84eb0db 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -27,14 +27,19 @@ static int initDone = FALSE; static pthread_t altThread; -static void *altserver_main(void *data); -static unsigned int altservers_update_rtt(const dnbd3_host_t * const host, const unsigned int rtt); +static void *altservers_main(void *data); +static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const unsigned int rtt); -void altserver_init() +int altservers_getCount() +{ + return _num_alts; +} + +void altservers_init() { spin_init( &_alts_lock, PTHREAD_PROCESS_PRIVATE ); memset( _alt_servers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) ); - if ( 0 != pthread_create( &altThread, NULL, &altserver_main, (void *)NULL ) ) { + if ( 0 != pthread_create( &altThread, NULL, &altservers_main, (void *)NULL ) ) { memlogf( "[ERROR] Could not start altservers connector thread" ); exit( EXIT_FAILURE ); } @@ -105,7 +110,7 @@ int altservers_add(dnbd3_host_t *host, const char *comment) /** * ONLY called from the passed uplink's main thread */ -void altserver_find_uplink(dnbd3_connection_t *uplink) +void altservers_findUplink(dnbd3_connection_t *uplink) { int i; assert( uplink->betterFd == -1 ); @@ -133,7 +138,7 @@ void altserver_find_uplink(dnbd3_connection_t *uplink) /** * The given uplink is about to disappear, so remove it from any queues */ -void altservers_remove_uplink(dnbd3_connection_t *uplink) +void altservers_removeUplink(dnbd3_connection_t *uplink) { spin_lock( &pendingLock ); for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { @@ -146,7 +151,7 @@ void altservers_remove_uplink(dnbd3_connection_t *uplink) * Get known (working) alt servers, ordered by network closeness * (by finding the smallest possible subnet) */ -int altservers_get_matching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size) +int altservers_getMatching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size) { if ( host == NULL || host->type == 0 || _num_alts == 0 || output == NULL || size <= 0 ) return 0; int i, j; @@ -161,11 +166,11 @@ int altservers_get_matching(dnbd3_host_t *host, dnbd3_server_entry_t *output, in // Trivial - this is the first entry output[0].host = _alt_servers[i].host; output[0].failures = 0; - distance[0] = altservers_net_closeness( host, &output[0].host ); + distance[0] = altservers_netCloseness( host, &output[0].host ); count++; } else { // Other entries already exist, insert in proper position - const int dist = altservers_net_closeness( host, &_alt_servers[i].host ); + const int dist = altservers_netCloseness( host, &_alt_servers[i].host ); for (j = 0; j < size; ++j) { if ( j < count && dist <= distance[j] ) continue; if ( j > count ) break; // Should never happen but just in case... @@ -185,7 +190,7 @@ int altservers_get_matching(dnbd3_host_t *host, dnbd3_server_entry_t *output, in } } } - // "if count < size then add servers of other address families" + // TODO: "if count < size then add servers of other address families" spin_unlock( &_alts_lock ); return count; } @@ -196,28 +201,22 @@ int altservers_get_matching(dnbd3_host_t *host, dnbd3_server_entry_t *output, in */ int altservers_get(dnbd3_host_t *output, int size) { - int count = 0, i, j, num; + if ( size <= 0 ) return 0; + int count = 0, i; spin_lock( &_alts_lock ); - if ( size >= _num_alts ) { - for (i = 0; i < _num_alts; ++i) { - if ( _alt_servers[i].host.type == 0 ) continue; - output[count++] = _alt_servers[i].host; - } - } else { - int which[_num_alts]; // Generate random order over _num_alts - for (i = 0; i < _num_alts; ++i) { - again: ; - num = rand() % _num_alts; - for (j = 0; j < i; ++j) { - if ( which[j] == num ) goto again; - } - which[i] = num; - } // Now pick working alt servers in that generated order - for (i = 0; i < size; ++i) { - if ( _alt_servers[which[i]].host.type == 0 ) continue; - output[count++] = _alt_servers[which[i]].host; - if ( count >= size ) break; - } + // Flip first server in list with a random one every time this is called + if ( _num_alts > 1 ) { + const dnbd3_alt_server_t tmp = _alt_servers[0]; + do { + i = rand() % _num_alts; + } while ( i == 0 ); + _alt_servers[0] = _alt_servers[i]; + _alt_servers[i] = tmp; + } + for (i = 0; i < _num_alts; ++i) { + if ( _alt_servers[i].host.type == 0 ) continue; + output[count++] = _alt_servers[i].host; + if ( count >= size ) break; } spin_unlock( &_alts_lock ); return count; @@ -226,7 +225,7 @@ int altservers_get(dnbd3_host_t *output, int size) /** * Update rtt history of given server - returns the new average for that server */ -static unsigned int altservers_update_rtt(const dnbd3_host_t * const host, const unsigned int rtt) +static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const unsigned int rtt) { unsigned int avg = rtt; int i; @@ -256,7 +255,7 @@ static unsigned int altservers_update_rtt(const dnbd3_host_t * const host, const * matching bits from the left of the address. Does not count individual bits but * groups of 4 for speed. */ -int altservers_net_closeness(dnbd3_host_t *host1, dnbd3_host_t *host2) +int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2) { if ( host1 == NULL || host2 == NULL || host1->type != host2->type ) return -1; int retval = 0; @@ -270,7 +269,7 @@ int altservers_net_closeness(dnbd3_host_t *host1, dnbd3_host_t *host2) return retval; } -static void *altserver_main(void *data) +static void *altservers_main(void *data) { const int MAXEVENTS = 3; const int ALTS = 4; @@ -411,7 +410,7 @@ static void *altserver_main(void *data) clock_gettime( CLOCK_MONOTONIC_RAW, &end ); // Measurement done - everything fine so far const unsigned int rtt = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_nsec - start.tv_nsec) / 1000; // µs - const unsigned int avg = altservers_update_rtt( &servers[itAlt], rtt ); + const unsigned int avg = altservers_updateRtt( &servers[itAlt], rtt ); if ( uplink->fd != -1 && is_same_server( &servers[itAlt], &uplink->currentServer ) ) { currentRtt = avg; close( sock ); diff --git a/src/server/altservers.h b/src/server/altservers.h index e080d13..bb09b73 100644 --- a/src/server/altservers.h +++ b/src/server/altservers.h @@ -3,16 +3,18 @@ #include "globals.h" -void altserver_init(); +void altservers_init(); int altservers_load(); int altservers_add(dnbd3_host_t *host, const char *comment); -void altserver_find_uplink(dnbd3_connection_t *uplink); +void altservers_findUplink(dnbd3_connection_t *uplink); -int altservers_get_matching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size); +int altservers_getMatching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size); -int altservers_net_closeness(dnbd3_host_t *host1, dnbd3_host_t *host2); +int altservers_get(dnbd3_host_t *output, int size); + +int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2); #endif /* UPLINK_CONNECTOR_H_ */ diff --git a/src/server/image.c b/src/server/image.c index b81d15a..10c650c 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -6,6 +6,7 @@ #include "integrity.h" #include "protocol.h" #include "sockhelper.h" +#include "altservers.h" #include #include @@ -41,9 +42,9 @@ static int remoteCloneCacheIndex = -1; static int image_isHashBlockComplete(uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize); static int image_load_all_internal(char *base, char *path); -static int image_try_load(char *base, char *path); +static int image_try_load(char *base, char *path, int withUplink); static int64_t image_pad(const char *path, const int64_t currentSize); -static int image_clone(int sock, dnbd3_host_t *server, char *name, uint16_t revision, uint64_t imageSize); +static int image_clone(int sock, char *name, uint16_t revision, uint64_t imageSize); // ########################################## @@ -442,7 +443,7 @@ static int image_load_all_internal(char *base, char *path) if ( S_ISDIR( st.st_mode )) { image_load_all_internal( base, subpath ); // Recurse } else { - image_try_load( base, subpath ); // Load image if possible + image_try_load( base, subpath, TRUE ); // Load image if possible } } closedir( dir ); @@ -450,7 +451,7 @@ static int image_load_all_internal(char *base, char *path) #undef SUBDIR_LEN } -static int image_try_load(char *base, char *path) +static int image_try_load(char *base, char *path, int withUplink) { int i, revision; struct stat st; @@ -647,7 +648,9 @@ static int image_try_load(char *base, char *path) image = image_free( image ); goto load_error; } - uplink_init( image ); + if ( withUplink ) { + uplink_init( image, -1, NULL ); + } } // Prevent freeing in cleanup cache_map = NULL; @@ -775,11 +778,9 @@ dnbd3_image_t* image_getOrClone(char *name, uint16_t revision) if ( len >= NAMELEN ) cmpname += 1 + len - NAMELEN; pthread_mutex_lock( &remoteCloneLock ); for (i = 0; i < CACHELEN; ++i) { - if ( remoteCloneCache[i].rid != revision || strcmp( cmpname, remoteCloneCache[i].name ) != 0 ) continue; - if ( remoteCloneCache[i].deadline < now ) { - remoteCloneCache[i].name[0] = '\0'; - continue; - } + if ( remoteCloneCache[i].rid != revision + || remoteCloneCache[i].deadline < now + || strcmp( cmpname, remoteCloneCache[i].name ) != 0 ) continue; pthread_mutex_unlock( &remoteCloneLock ); // Was recently checked... return NULL ; } @@ -796,9 +797,13 @@ dnbd3_image_t* image_getOrClone(char *name, uint16_t revision) remoteCloneCache[remoteCloneCacheIndex].deadline = now + SERVER_REMOTE_IMAGE_CHECK_CACHETIME; snprintf( remoteCloneCache[remoteCloneCacheIndex].name, NAMELEN, "%s", cmpname ); remoteCloneCache[remoteCloneCacheIndex].rid = revision; - for (;;) { - dnbd3_host_t server; // TODO: Get server :-) - int sock = sock_connect( &server, 500, 1500 ); + // Get some alt servers and try to get the image from there + dnbd3_host_t servers[4]; + int uplinkSock = -1; + dnbd3_host_t *uplinkServer = NULL; + const int count = altservers_get( servers, 4 ); + for (i = 0; i < count; ++i) { + int sock = sock_connect( &servers[i], 500, 1500 ); if ( sock < 0 ) continue; if ( !dnbd3_select_image( sock, name, revision, FLAGS8_SERVER ) ) goto server_fail; uint16_t remoteVersion, remoteRid; @@ -806,18 +811,37 @@ dnbd3_image_t* image_getOrClone(char *name, uint16_t revision) char *remoteName; if ( !dnbd3_select_image_reply( &serialized, sock, &remoteVersion, &remoteName, &remoteRid, &remoteImageSize ) ) goto server_fail; if ( remoteVersion < MIN_SUPPORTED_SERVER ) goto server_fail; - if ( revision != 0 && remoteVersion != revision ) goto server_fail; + if ( revision != 0 && remoteRid != revision ) goto server_fail; if ( remoteImageSize < DNBD3_BLOCK_SIZE || remoteName == NULL || strcmp( name, remoteName ) != 0 ) goto server_fail; - image_clone( sock, &server, name, remoteRid, remoteImageSize ); + if ( !image_clone( sock, name, remoteRid, remoteImageSize ) ) goto server_fail; + // Cloning worked :-) + uplinkSock = sock; + uplinkServer = &servers[i]; break; server_fail: ; close( sock ); } pthread_mutex_unlock( &remoteCloneLock ); - return image_get( name, revision ); + // If everything worked out, this call should now actually return the image + image = image_get( name, revision ); + if ( image != NULL && uplinkSock != -1 && uplinkServer != NULL ) { + // If so, init the uplink and pass it the socket + uplink_init( image, uplinkSock, uplinkServer ); + image->working = TRUE; + } else if ( uplinkSock >= 0 ) { + close( uplinkSock ); + } + return image; } -static int image_clone(int sock, dnbd3_host_t *server, char *name, uint16_t revision, uint64_t imageSize) +/** + * Prepare a cloned image: + * 1. Allocate empty image file and its cache map + * 2. Use passed socket to request the crc32 list and save it to disk + * 3. Load the image from disk + * Returns: TRUE on success, FALSE otherwise + */ +static int image_clone(int sock, char *name, uint16_t revision, uint64_t imageSize) { // Allocate disk space and create cache map if ( !image_create( name, revision, imageSize ) ) return FALSE; @@ -842,7 +866,7 @@ static int image_clone(int sock, dnbd3_host_t *server, char *name, uint16_t revi } // HACK: Chop of ".crc" to get the image file name crcFile[strlen( crcFile ) - 4] = '\0'; - return image_try_load( _basePath, crcFile ); + return image_try_load( _basePath, crcFile, FALSE ); } /** diff --git a/src/server/image.h b/src/server/image.h index c59ae9e..9ba4436 100644 --- a/src/server/image.h +++ b/src/server/image.h @@ -18,6 +18,8 @@ int image_saveCacheMap(dnbd3_image_t *image); dnbd3_image_t* image_get(char *name, uint16_t revision); +dnbd3_image_t* image_getOrClone(char *name, uint16_t revision); + dnbd3_image_t* image_lock(dnbd3_image_t *image); void image_release(dnbd3_image_t *image); diff --git a/src/server/net.c b/src/server/net.c index c068338..4e25564 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -156,7 +156,7 @@ void *net_client_handler(void *dnbd3_client) printf( "[DEBUG] Incomplete handshake received\n" ); } } else { - image = image_get( image_name, rid ); + client->image = image = image_getOrClone( image_name, rid ); if ( image == NULL ) { //printf( "[DEBUG] Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); } else if ( !image->working ) { @@ -172,7 +172,6 @@ void *net_client_handler(void *dnbd3_client) reply.cmd = CMD_SELECT_IMAGE; reply.size = serializer_get_written_length( &payload ); if ( send_reply( client->sock, &reply, &payload ) ) { - client->image = image; if ( !client->is_server ) image->atime = time( NULL ); bOk = TRUE; } @@ -310,7 +309,7 @@ void *net_client_handler(void *dnbd3_client) case CMD_GET_SERVERS: client->is_server = FALSE; // Only clients request list of servers // Build list of known working alt servers - num = altservers_get_matching( &client->host, server_list, NUMBER_SERVERS ); + num = altservers_getMatching( &client->host, server_list, NUMBER_SERVERS ); reply.cmd = CMD_GET_SERVERS; reply.size = num * sizeof(dnbd3_server_entry_t); send_reply( client->sock, &reply, server_list ); diff --git a/src/server/server.c b/src/server/server.c index 6b1721b..8252054 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -203,7 +203,7 @@ int main(int argc, char *argv[]) _configDir = strdup( optarg ); break; case 'd': -#ifdef _DEBUG + #ifdef _DEBUG _fake_delay = atoi( optarg ); break; #else @@ -226,7 +226,7 @@ int main(int argc, char *argv[]) //dnbd3_rpc_send(RPC_IMG_LIST); return EXIT_SUCCESS; case 'h': - case '?': + case '?': dnbd3_print_help( argv[0] ); break; case 'v': @@ -273,7 +273,7 @@ int main(int argc, char *argv[]) initmemlog(); spin_init( &_clients_lock, PTHREAD_PROCESS_PRIVATE ); spin_init( &_images_lock, PTHREAD_PROCESS_PRIVATE ); - altserver_init(); + altservers_init(); integrity_init(); memlogf( "DNBD3 server starting.... Machine type: " ENDIAN_MODE ); @@ -413,7 +413,8 @@ void dnbd3_remove_client(dnbd3_client_t *client) /** * Free the client struct recursively. * !! Make sure to call this function after removing the client from _dnbd3_clients !! - * Locks on: _clients[].lock, might call function that locks on _images and _image[] + * Locks on: _clients[].lock, _images[].lock + * might call functions that lock on _images, _image[], uplink.queueLock, client.sendMutex */ dnbd3_client_t* dnbd3_free_client(dnbd3_client_t *client) { @@ -421,7 +422,9 @@ dnbd3_client_t* dnbd3_free_client(dnbd3_client_t *client) if ( client->sock >= 0 ) close( client->sock ); client->sock = -1; if ( client->image != NULL ) { + spin_lock( &client->image->lock ); if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client ); + spin_unlock( &client->image->lock ); image_release( client->image ); } client->image = NULL; diff --git a/src/server/uplink.c b/src/server/uplink.c index 38b1415..dc2874e 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -30,7 +30,7 @@ static int uplink_send_keepalive(const int fd); * image. Uplinks run in their own thread. * Locks on: _images[].lock */ -int uplink_init(dnbd3_image_t *image) +int uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) { dnbd3_connection_t *link = NULL; assert( image != NULL ); @@ -45,8 +45,14 @@ int uplink_init(dnbd3_image_t *image) link->queueLen = 0; link->fd = -1; link->signal = -1; - link->betterFd = -1; - link->rttTestResult = RTT_IDLE; + if ( sock >= 0 ) { + link->betterFd = sock; + link->betterServer = *host; + link->rttTestResult = RTT_DOCHANGE; + } else { + link->betterFd = -1; + link->rttTestResult = RTT_IDLE; + } link->recvBufferLen = 0; link->shutdown = FALSE; spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE ); @@ -83,6 +89,7 @@ void uplink_shutdown(dnbd3_image_t *image) /** * Remove given client from uplink request queue + * Locks on: uplink.queueLock, client.sendMutex */ void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) { @@ -201,6 +208,35 @@ static void* uplink_mainloop(void *data) } } while ( !_shutdown && !link->shutdown ) { + // Check if server switch is in order + if ( link->rttTestResult == RTT_DOCHANGE ) { + link->rttTestResult = RTT_IDLE; + // The rttTest worker thread has finished our request. + // And says it's better to switch to another server + const int fd = link->fd; + link->fd = link->betterFd; + if ( fd != -1 ) close( fd ); + // Re-send all pending requests + uplink_send_requests( link, FALSE ); + link->betterFd = -1; + link->currentServer = link->betterServer; + link->image->working = TRUE; + buffer[0] = '@'; + if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) { + printf( "[DEBUG] Now connected to %s\n", buffer + 1 ); + setThreadName( buffer ); + } + memset( &ev, 0, sizeof(ev) ); + ev.events = EPOLLIN; + ev.data.fd = link->fd; + if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) { + memlogf( "[WARNING] adding uplink to epoll set failed" ); + goto cleanup; + } + nextAltCheck = time( NULL ) + altCheckInterval; + // The rtt worker already did the handshake for our image, so there's nothing + // more to do here + } // epoll() if ( link->fd == -1 ) { waitTime = 2000; @@ -216,7 +252,8 @@ static void* uplink_mainloop(void *data) usleep( 10000 ); continue; } - for (i = 0; i < numSocks; ++i) { // Check all events + // Check all events + for (i = 0; i < numSocks; ++i) { if ( (events[i].events & (EPOLLERR | EPOLLHUP)) || !(events[i].events & EPOLLIN) ) { if ( events[i].data.fd == link->signal ) { memlogf( "[WARNING] epoll error on signal-pipe!" ); @@ -259,35 +296,6 @@ static void* uplink_mainloop(void *data) } } // Done handling epoll sockets - // Check if server switch is in order - if ( link->rttTestResult == RTT_DOCHANGE ) { - link->rttTestResult = RTT_IDLE; - // The rttTest worker thread has finished our request. - // And says it's better to switch to another server - const int fd = link->fd; - link->fd = link->betterFd; - if ( fd != -1 ) close( fd ); - // Re-send all pending requests - uplink_send_requests( link, FALSE ); - link->betterFd = -1; - link->currentServer = link->betterServer; - link->image->working = TRUE; - buffer[0] = '@'; - if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) { - printf( "[DEBUG] Now connected to %s\n", buffer + 1 ); - setThreadName( buffer ); - } - memset( &ev, 0, sizeof(ev) ); - ev.events = EPOLLIN; - ev.data.fd = link->fd; - if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) { - memlogf( "[WARNING] adding uplink to epoll set failed" ); - goto cleanup; - } - nextAltCheck = time( NULL ) + altCheckInterval; - // The rtt worker already did the handshake for our image, so there's nothing - // more to do here - } // See if we should trigger a RTT measurement if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) { const time_t now = time( NULL ); @@ -315,7 +323,7 @@ static void* uplink_mainloop(void *data) } } else { // Not complete - do measurement - altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous) + altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous) // Also send a keepalive packet to the currently connected server if ( link->fd != -1 ) { if ( !uplink_send_keepalive( link->fd ) ) { diff --git a/src/server/uplink.h b/src/server/uplink.h index b2d24a6..e9a4760 100644 --- a/src/server/uplink.h +++ b/src/server/uplink.h @@ -4,7 +4,7 @@ #include "../types.h" #include "globals.h" -int uplink_init(dnbd3_image_t *image); +int uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host); void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client); -- cgit v1.2.3-55-g7522