From 53dd7bc20968ddfaa601c517e6feb745bb3a21ab Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Sun, 10 Nov 2013 18:19:11 +0100 Subject: [SERVER] Split "pending" lock for alt-server finding into producer and consumer lock to fix a potential NPA when an uplink dies Also some refactoring of variable names and more comments --- src/server/altservers.c | 180 ++++++++++++++++++++++++++++-------------------- 1 file changed, 105 insertions(+), 75 deletions(-) (limited to 'src') diff --git a/src/server/altservers.c b/src/server/altservers.c index 2cb5061..2eca369 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -17,12 +17,13 @@ #include "protocol.h" static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS]; -static pthread_spinlock_t pendingLock; +static pthread_spinlock_t pendingLockProduce; // Lock for adding something to pending. (NULL -> nonNULL) +static pthread_mutex_t pendingLockConsume = PTHREAD_MUTEX_INITIALIZER; // Lock for removing something (nonNULL -> NULL) static int signalPipe = -1; -static dnbd3_alt_server_t _alt_servers[SERVER_MAX_ALTS]; -static int _num_alts = 0; -static pthread_spinlock_t _alts_lock; +static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS]; +static int numAltServers = 0; +static pthread_spinlock_t altServersLock; static int initDone = FALSE; static pthread_t altThread; @@ -32,13 +33,13 @@ static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const int altservers_getCount() { - return _num_alts; + return numAltServers; } void altservers_init() { - spin_init( &_alts_lock, PTHREAD_PROCESS_PRIVATE ); - memset( _alt_servers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) ); + spin_init( &altServersLock, PTHREAD_PROCESS_PRIVATE ); + memset( altServers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) ); if ( 0 != pthread_create( &altThread, NULL, &altservers_main, (void *)NULL ) ) { memlogf( "[ERROR] Could not start altservers connector thread" ); exit( EXIT_FAILURE ); @@ -49,7 +50,6 @@ void 
altservers_init() void altservers_shutdown() { if ( !initDone ) return; - spin_destroy( &_alts_lock ); pthread_join( altThread, NULL ); } @@ -90,27 +90,27 @@ int altservers_load() int altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate) { int i, freeSlot = -1; - spin_lock( &_alts_lock ); - for (i = 0; i < _num_alts; ++i) { - if ( isSameAddressPort( &_alt_servers[i].host, host ) ) { - spin_unlock( &_alts_lock ); + spin_lock( &altServersLock ); + for (i = 0; i < numAltServers; ++i) { + if ( isSameAddressPort( &altServers[i].host, host ) ) { + spin_unlock( &altServersLock ); return FALSE; - } else if ( freeSlot == -1 && _alt_servers[i].host.type == 0 ) { + } else if ( freeSlot == -1 && altServers[i].host.type == 0 ) { freeSlot = i; } } if ( freeSlot == -1 ) { - if ( _num_alts >= SERVER_MAX_ALTS ) { + if ( numAltServers >= SERVER_MAX_ALTS ) { memlogf( "[WARNING] Cannot add another alt server, maximum of %d already reached.", (int)SERVER_MAX_ALTS ); - spin_unlock( &_alts_lock ); + spin_unlock( &altServersLock ); return FALSE; } - freeSlot = _num_alts++; + freeSlot = numAltServers++; } - _alt_servers[freeSlot].host = *host; - _alt_servers[freeSlot].isPrivate = isPrivate; - if ( comment != NULL ) snprintf( _alt_servers[freeSlot].comment, COMMENT_LENGTH, "%s", comment ); - spin_unlock( &_alts_lock ); + altServers[freeSlot].host = *host; + altServers[freeSlot].isPrivate = isPrivate; + if ( comment != NULL ) snprintf( altServers[freeSlot].comment, COMMENT_LENGTH, "%s", comment ); + spin_unlock( &altServersLock ); return TRUE; } @@ -120,25 +120,33 @@ int altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate) void altservers_findUplink(dnbd3_connection_t *uplink) { int i; + // if betterFd != -1 it means the uplink is supposed to switch to another + // server. As this function here is called by the uplink thread, it can + // never be that the uplink is supposed to switch, but instead calls + // this function. 
assert( uplink->betterFd == -1 ); - spin_lock( &pendingLock ); + spin_lock( &pendingLockProduce ); + // it is however possible that an RTT measurement is currently in progress, + // so check for that case and do nothing if one is in progress if ( uplink->rttTestResult == RTT_INPROGRESS ) { for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { if ( pending[i] != uplink ) continue; - spin_unlock( &pendingLock ); + // Yep, measuring right now + spin_unlock( &pendingLockProduce ); return; } } + // Find free slot for measurement for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { if ( pending[i] != NULL ) continue; pending[i] = uplink; uplink->rttTestResult = RTT_INPROGRESS; - spin_unlock( &pendingLock ); - write( signalPipe, "", 1 ); + spin_unlock( &pendingLockProduce ); + write( signalPipe, "", 1 ); // Wake altservers thread up return; } // End of loop - no free slot - spin_unlock( &pendingLock ); + spin_unlock( &pendingLockProduce ); memlogf( "[WARNING] No more free RTT measurement slots, ignoring a request..." 
); } @@ -147,39 +155,40 @@ void altservers_findUplink(dnbd3_connection_t *uplink) */ void altservers_removeUplink(dnbd3_connection_t *uplink) { - spin_lock( &pendingLock ); + pthread_mutex_lock( &pendingLockConsume ); for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { if ( pending[i] == uplink ) pending[i] = NULL; } - spin_unlock( &pendingLock ); + pthread_mutex_unlock( &pendingLockConsume ); } /** * Get known (working) alt servers, ordered by network closeness * (by finding the smallest possible subnet) - * Private servers are excluded + * Private servers are excluded, so this is what you want to call to + * get a list of servers you can tell a client about */ int altservers_getMatching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size) { - if ( host == NULL || host->type == 0 || _num_alts == 0 || output == NULL || size <= 0 ) return 0; + if ( host == NULL || host->type == 0 || numAltServers == 0 || output == NULL || size <= 0 ) return 0; int i, j; int count = 0; int distance[size]; - spin_lock( &_alts_lock ); - for (i = 0; i < _num_alts; ++i) { - if ( host->type != _alt_servers[i].host.type ) continue; // Wrong address family - if ( _alt_servers[i].isPrivate ) continue; // Do not tell clients about private servers + spin_lock( &altServersLock ); + for (i = 0; i < numAltServers; ++i) { + if ( host->type != altServers[i].host.type ) continue; // Wrong address family + if ( altServers[i].isPrivate ) continue; // Do not tell clients about private servers // TODO: Prefer same AF here, but if in the end we got less servers than requested, add // servers of other AF too (after this loop) if ( count == 0 ) { // Trivial - this is the first entry - output[0].host = _alt_servers[i].host; + output[0].host = altServers[i].host; output[0].failures = 0; distance[0] = altservers_netCloseness( host, &output[0].host ); count++; } else { // Other entries already exist, insert in proper position - const int dist = altservers_netCloseness( host, &_alt_servers[i].host 
); + const int dist = altservers_netCloseness( host, &altServers[i].host ); for (j = 0; j < size; ++j) { if ( j < count && dist <= distance[j] ) continue; if ( j > count ) break; // Should never happen but just in case... @@ -191,7 +200,7 @@ int altservers_getMatching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int if ( count < size ) { count++; } - output[j].host = _alt_servers[i].host; + output[j].host = altServers[i].host; output[j].failures = 0; distance[j] = dist; break; @@ -199,38 +208,42 @@ int altservers_getMatching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int } } // TODO: "if count < size then add servers of other address families" - spin_unlock( &_alts_lock ); + spin_unlock( &altServersLock ); return count; } /** * Get alt servers. If there are more alt servers than - * requested, random servers will be picked + * requested, random servers will be picked. + * This function is suited for finding uplink servers as + * it includes private servers and ignores any "client only" servers */ int altservers_get(dnbd3_host_t *output, int size) { if ( size <= 0 ) return 0; int count = 0, i; const time_t now = time( NULL ); - spin_lock( &_alts_lock ); + spin_lock( &altServersLock ); // Flip first server in list with a random one every time this is called - if ( _num_alts > 1 ) { - const dnbd3_alt_server_t tmp = _alt_servers[0]; + if ( numAltServers > 1 ) { + const dnbd3_alt_server_t tmp = altServers[0]; do { - i = rand() % _num_alts; + i = rand() % numAltServers; } while ( i == 0 ); - _alt_servers[0] = _alt_servers[i]; - _alt_servers[i] = tmp; + altServers[0] = altServers[i]; + altServers[i] = tmp; } - for (i = 0; i < _num_alts; ++i) { - if ( _alt_servers[i].host.type == 0 ) continue; - if ( _proxyPrivateOnly && !_alt_servers[i].isPrivate ) continue; - if ( _alt_servers[i].numFails > SERVER_MAX_UPLINK_FAILS && now - _alt_servers[i].lastFail > SERVER_BAD_UPLINK_IGNORE ) continue; - _alt_servers[i].numFails = 0; - output[count++] = _alt_servers[i].host; 
+ for (i = 0; i < numAltServers; ++i) { + if ( altServers[i].host.type == 0 ) continue; // Slot is empty + if ( _proxyPrivateOnly && !altServers[i].isPrivate ) continue; // Config says to consider private alt-servers only? ignore! + if ( altServers[i].numFails > SERVER_MAX_UPLINK_FAILS // server failed X times in a row + && now - altServers[i].lastFail > SERVER_BAD_UPLINK_IGNORE ) continue; // and last fail was not too long ago? ignore! + // server seems ok, include in output and reset its fail counter + altServers[i].numFails = 0; + output[count++] = altServers[i].host; if ( count >= size ) break; } - spin_unlock( &_alts_lock ); + spin_unlock( &altServersLock ); return count; } @@ -241,24 +254,24 @@ static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const { unsigned int avg = rtt; int i; - spin_lock( &_alts_lock ); - for (i = 0; i < _num_alts; ++i) { - if ( !isSameAddressPort( host, &_alt_servers[i].host ) ) continue; - _alt_servers[i].rtt[++_alt_servers[i].rttIndex % SERVER_RTT_PROBES] = rtt; + spin_lock( &altServersLock ); + for (i = 0; i < numAltServers; ++i) { + if ( !isSameAddressPort( host, &altServers[i].host ) ) continue; + altServers[i].rtt[++altServers[i].rttIndex % SERVER_RTT_PROBES] = rtt; #if SERVER_RTT_PROBES == 5 - avg = (_alt_servers[i].rtt[0] + _alt_servers[i].rtt[1] + _alt_servers[i].rtt[2] + _alt_servers[i].rtt[3] + _alt_servers[i].rtt[4]) + avg = (altServers[i].rtt[0] + altServers[i].rtt[1] + altServers[i].rtt[2] + altServers[i].rtt[3] + altServers[i].rtt[4]) / SERVER_RTT_PROBES; #else #warning You might want to change the code in altservers_update_rtt if you changed SERVER_RTT_PROBES avg = 0; for (int j = 0; j < SERVER_RTT_PROBES; ++j) { - avg += _alt_servers[i].rtt[j]; + avg += altServers[i].rtt[j]; } avg /= SERVER_RTT_PROBES; #endif break; } - spin_unlock( &_alts_lock ); + spin_unlock( &altServersLock ); return avg; } @@ -291,18 +304,29 @@ void altservers_serverFailed(const dnbd3_host_t * const host) { int i; const 
time_t now = time( NULL ); - spin_lock( &_alts_lock ); - for (i = 0; i < _num_alts; ++i) { - if ( !isSameAddressPort( host, &_alt_servers[i].host ) ) continue; - if ( now - _alt_servers[i].lastFail > SERVER_RTT_DELAY_INIT ) { - _alt_servers[i].numFails++; - _alt_servers[i].lastFail = now; + spin_lock( &altServersLock ); + for (i = 0; i < numAltServers; ++i) { + if ( !isSameAddressPort( host, &altServers[i].host ) ) continue; + // Only increase the counter if last fail was not too recent. This is + // to prevent the counter from increasing rapidly if many images use the + // same uplink. If there's a network hiccup, all uplinks will call this + // function and would increase the counter too quickly, disabling the server. + if ( now - altServers[i].lastFail > SERVER_RTT_DELAY_INIT ) { + altServers[i].numFails++; + altServers[i].lastFail = now; } break; } - spin_unlock( &_alts_lock ); + spin_unlock( &altServersLock ); } - +/** + * Mainloop of this module. It will wait for requests by uplinks to find a + * suitable uplink server for them. If found, it will tell the uplink about + * the best server found. Currently the RTT history is kept per server and + * not per uplink, so if many images use the same uplink server, the history + * will update quite quickly. Needs to be improved some time, i.e. by only + * updating the rtt if the last update was at least X seconds ago. 
+ */ static void *altservers_main(void *data) { const int MAXEVENTS = 3; @@ -320,7 +344,7 @@ static void *altservers_main(void *data) setThreadName( "altserver-check" ); blockNoncriticalSignals(); // Init spinlock - spin_init( &pendingLock, PTHREAD_PROCESS_PRIVATE ); + spin_init( &pendingLockProduce, PTHREAD_PROCESS_PRIVATE ); // Init waiting links queue for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) pending[i] = NULL; @@ -370,10 +394,13 @@ static void *altservers_main(void *data) } } // Work your way through the queue - spin_lock( &pendingLock ); for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) { - if ( pending[itLink] == NULL ) continue; - spin_unlock( &pendingLock ); + if ( pending[itLink] == NULL ) continue; // Check once before locking, as a mutex is expensive + pthread_mutex_lock( &pendingLockConsume ); + if ( pending[itLink] == NULL ) { // Check again after locking + continue; + pthread_mutex_unlock( &pendingLockConsume ); + } dnbd3_connection_t * const uplink = pending[itLink]; assert( uplink->rttTestResult == RTT_INPROGRESS ); // Now get 4 alt servers @@ -394,7 +421,7 @@ static void *altservers_main(void *data) unsigned int bestRtt = 0xfffffff; unsigned int currentRtt = 0xfffffff; for (itAlt = 0; itAlt < numAlts; ++itAlt) { - usleep( 1000 ); + usleep( 1000 ); // Wait a very short moment for the network to recover (we might be doing lots of measurements...) 
// Connect clock_gettime( CLOCK_MONOTONIC_RAW, &start ); int sock = sock_connect( &servers[itAlt], 750, 1250 ); @@ -425,7 +452,7 @@ static void *altservers_main(void *data) // Request random block ++++++++++++++++++++++++++++++ fixup_request( request ); if ( !dnbd3_get_block( sock, - (((uint64_t)start.tv_nsec | (uint64_t)rand()) * DNBD3_BLOCK_SIZE )% uplink->image->filesize, + (((uint64_t)start.tv_nsec ^ (uint64_t)rand()) * DNBD3_BLOCK_SIZE )% uplink->image->filesize, DNBD3_BLOCK_SIZE) ) { ERROR_GOTO_VA( server_failed, "[ERROR] Could not request random block for %s", uplink->image->lower_name ); } @@ -449,18 +476,23 @@ static void *altservers_main(void *data) const unsigned int rtt = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_nsec - start.tv_nsec) / 1000; // µs const unsigned int avg = altservers_updateRtt( &servers[itAlt], rtt ); if ( uplink->fd != -1 && isSameAddressPort( &servers[itAlt], &uplink->currentServer ) ) { + // Was measuring current server currentRtt = avg; close( sock ); } else if ( avg < bestRtt ) { + // Was another server, update "best" if ( bestSock != -1 ) close( bestSock ); bestSock = sock; bestRtt = avg; bestIndex = itAlt; } else { + // Was too slow, ignore close( sock ); } + // We're done, call continue continue; // Jump here if anything went wrong + // This will cleanup and continue server_failed: ; altservers_serverFailed( &servers[itAlt] ); server_image_not_available: ; @@ -480,12 +512,10 @@ static void *altservers_main(void *data) } // end of loop over all pending uplinks pending[itLink] = NULL; - spin_lock( &pendingLock ); + pthread_mutex_unlock( &pendingLockConsume ); } - spin_unlock( &pendingLock ); } cleanup: ; - spin_destroy( &pendingLock ); if ( fdEpoll != -1 ) close( fdEpoll ); if ( readPipe != -1 ) close( readPipe ); if ( signalPipe != -1 ) close( signalPipe ); -- cgit v1.2.3-55-g7522