diff options
Diffstat (limited to 'src/server')
-rw-r--r-- | src/server/altservers.c | 738 | ||||
-rw-r--r-- | src/server/altservers.h | 16 | ||||
-rw-r--r-- | src/server/globals.h | 41 | ||||
-rw-r--r-- | src/server/image.c | 6 | ||||
-rw-r--r-- | src/server/net.c | 16 | ||||
-rw-r--r-- | src/server/server.c | 8 | ||||
-rw-r--r-- | src/server/uplink.c | 117 | ||||
-rw-r--r-- | src/server/uplink.h | 2 |
8 files changed, 463 insertions, 481 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c index fbe10a8..493ed9e 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -1,5 +1,6 @@ #include "altservers.h" #include "locks.h" +#include "threadpool.h" #include "helper.h" #include "image.h" #include "fileutil.h" @@ -14,46 +15,22 @@ #define LOG_GOTO(jumplabel, lvl, ...) do { LOG(lvl, __VA_ARGS__); goto jumplabel; } while (0); #define ERROR_GOTO(jumplabel, ...) LOG_GOTO(jumplabel, LOG_ERROR, __VA_ARGS__) -static dnbd3_uplink_t * _Atomic pending[SERVER_MAX_PENDING_ALT_CHECKS]; -static dnbd3_signal_t * _Atomic runSignal = NULL; - static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS]; static atomic_int numAltServers = 0; static pthread_mutex_t altServersLock; +static ticks nextCloseUnusedFd; // TODO: Move away -static pthread_t altThread; - -static void *altservers_main(void *data); -static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const unsigned int rtt); +static void *altservers_runCheck(void *data); +static int altservers_getListForUplink(dnbd3_uplink_t *uplink, int *servers, int size, int current); +static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink); +static uint32_t altservers_updateRtt(dnbd3_uplink_t *uplink, int index, uint32_t rtt); +static void altservers_imageFailed(dnbd3_uplink_t *uplink, int server); void altservers_init() { srand( (unsigned int)time( NULL ) ); - // Init spinlock + // Init lock mutex_init( &altServersLock, LOCK_ALT_SERVER_LIST ); - // Init signal - runSignal = signal_new(); - if ( runSignal == NULL ) { - logadd( LOG_ERROR, "Error creating signal object. Uplink feature unavailable." ); - exit( EXIT_FAILURE ); - } - memset( altServers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) ); - if ( 0 != thread_create( &altThread, NULL, &altservers_main, (void *)NULL ) ) { - logadd( LOG_ERROR, "Could not start altservers connector thread" ); - exit( EXIT_FAILURE ); - } - // Init waiting links queue -- this is currently a global static array so - // it will already be zero, but in case we refactor later do it explicitly - for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { - pending[i] = NULL; - } -} - -void altservers_shutdown() -{ - if ( runSignal == NULL ) return; - signal_call( runSignal ); // Wake altservers thread up - thread_join( altThread, NULL ); } static void addalt(int argc, char **argv, void *data) @@ -121,7 +98,7 @@ bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate /** * ONLY called from the passed uplink's main thread */ -void altservers_findUplink(dnbd3_uplink_t *uplink) +void altservers_findUplinkAsync(dnbd3_uplink_t *uplink) { if ( uplink->shutdown ) return; @@ -135,67 +112,11 @@ void altservers_findUplink(dnbd3_uplink_t *uplink) assert( uplink->better.fd == -1 ); // it is however possible that an RTT measurement is currently in progress, // so check for that case and do nothing if one is in progress - // XXX As this function is only ever called by the image's uplink thread, - // it cannot happen that the uplink ends up in this list concurrently mutex_lock( &uplink->rttLock ); - if ( uplink->rttTestResult == RTT_INPROGRESS ) { - for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { - if ( pending[i] != uplink ) continue; - // Yep, measuring right now - return; - } - } - // Find free slot for measurement - uplink->rttTestResult = RTT_INPROGRESS; - for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { - if ( pending[i] != NULL ) continue; - dnbd3_uplink_t *null = NULL; - if ( atomic_compare_exchange_strong( &pending[i], &null, uplink ) ) { - mutex_unlock( &uplink->rttLock ); - atomic_thread_fence( memory_order_release ); - signal_call( runSignal ); // Wake altservers thread up - return; - } + if ( uplink->rttTestResult != RTT_INPROGRESS ) { + threadpool_run( &altservers_runCheck, uplink ); } - // End of loop - no free slot - uplink->rttTestResult = RTT_NOT_REACHABLE; mutex_unlock( &uplink->rttLock ); - logadd( LOG_WARNING, "No more free RTT measurement slots, ignoring a request..." ); -} - -/** - * The given uplink is about to disappear, - * wait until any pending RTT check is done. - */ -void altservers_removeUplink(dnbd3_uplink_t *uplink) -{ - assert( uplink != NULL ); - assert( uplink->shutdown ); - int i; - for ( i = 1 ;; ++i ) { - atomic_thread_fence( memory_order_acquire ); - if ( runSignal == NULL ) { - // Thread is already done, remove manually - uplink->rttTestResult = RTT_NOT_REACHABLE; - break; - } - // Thread still running, wait until test is done - bool found = false; - for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { - if ( pending[i] == uplink ) { - found = true; - break; - } - } - if ( !found ) // No more test running - break; - usleep( 10000 ); // 10ms - signal_call( runSignal ); // Wake altservers thread up - if ( i % 500 == 0 ) { - logadd( LOG_INFO, "Still waiting for altserver check for uplink %p...", (void*)uplink ); - } - } - logadd( LOG_DEBUG1, "Waited for %d iterations for altservers check when tearing down uplink", i ); } /** @@ -209,90 +130,124 @@ int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output if ( host == NULL || host->type == 0 || numAltServers == 0 || output == NULL || size <= 0 ) return 0; int i, j; int count = 0; - int scores[size]; - int score; - mutex_lock( &altServersLock ); + uint16_t scores[SERVER_MAX_ALTS] = { 0 }; if ( size > numAltServers ) size = numAltServers; - for (i = 0; i < numAltServers; ++i) { - if ( altServers[i].host.type == 0 ) continue; // Slot is empty - if ( altServers[i].isPrivate ) continue; // Do not tell clients about private servers + mutex_lock( &altServersLock ); + for ( i = 0; i < numAltServers; ++i ) { + if ( altServers[i].host.type == 0 || altServers[i].isPrivate ) + continue; // Slot is empty or uplink is for replication only if ( host->type == altServers[i].host.type ) { - score = altservers_netCloseness( host, &altServers[i].host ) - altServers[i].numFails; + scores[i] = 10 + altservers_netCloseness( host, &altServers[i].host ); } else { - score = -( altServers[i].numFails + 128 ); // Wrong address family + scores[i] = 1; // Wrong address family } - if ( count == 0 ) { - // Trivial - this is the first entry - output[0].host = altServers[i].host; - output[0].failures = 0; - scores[0] = score; - count++; - } else { - // Other entries already exist, insert in proper position - for (j = 0; j < size; ++j) { - if ( j < count && score <= scores[j] ) continue; - if ( j > count ) break; // Should never happen but just in case... - if ( j < count && j + 1 < size ) { - // Check if we're in the middle and need to move other entries... - memmove( &output[j + 1], &output[j], sizeof(dnbd3_server_entry_t) * (size - j - 1) ); - memmove( &scores[j + 1], &scores[j], sizeof(int) * (size - j - 1) ); - } - if ( count < size ) { - count++; - } - output[j].host = altServers[i].host; - output[j].failures = 0; - scores[j] = score; - break; + } + while ( count < size ) { + i = -1; + for ( j = 0; j < numAltServers; ++j ) { + if ( scores[j] == 0 ) + continue; + if ( i == -1 || scores[j] > scores[i] ) { + i = j; } } + if ( i == -1 ) + break; + output[count].host = altServers[i].host; + output[count].failures = 0; + count++; } mutex_unlock( &altServersLock ); return count; } +bool altservers_toString(int server, char *buffer, size_t len) +{ + return host_to_string( &altServers[server].host, buffer, len ); +} + +static bool isUsableForUplink( dnbd3_uplink_t *uplink, int server, ticks *now ) +{ + dnbd3_alt_local_t *local = ( uplink == NULL ? NULL : &uplink->altData[server] ); + dnbd3_alt_server_t *global = &altServers[server]; + if ( global->isClientOnly || ( !global->isPrivate && _proxyPrivateOnly ) ) + return false; + // Blocked locally (image not found on server...) + if ( local != NULL && local->blocked ) { + if ( --local->fails > 0 ) + return false; + local->blocked = false; + } + if ( global->blocked ) { + if ( timing_diff( &global->lastFail, now ) < SERVER_GLOBAL_DUP_TIME ) + return false; + global->lastFail = *now; + if ( --global->fails > 0 ) + return false; + global->blocked = false; + } + // Not blocked, depend on both fail counters + int fails = ( local == NULL ? 0 : local->fails ) + global->fails; + return fails < SERVER_BAD_UPLINK_MIN || ( rand() % fails ) < SERVER_BAD_UPLINK_MIN; +} + +int altservers_getHostListForReplication(dnbd3_host_t *servers, int size) +{ + int idx[size]; + int num = altservers_getListForUplink( NULL, idx, size, -1 ); + for ( int i = 0; i < num; ++i ) { + servers[i] = altServers[i].host; + } + return num; +} + /** * Get <size> alt servers. If there are more alt servers than * requested, random servers will be picked. * This function is suited for finding uplink servers as * it includes private servers and ignores any "client only" servers + * @param current index of server for current connection, or -1 in panic mode */ -int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency) +static int altservers_getListForUplink(dnbd3_uplink_t *uplink, int *servers, int size, int current) { - if ( size <= 0 ) return 0; - int count = 0, i; - ticks now; - timing_get( &now ); + if ( size <= 0 ) + return 0; + int count = 0; + declare_now; mutex_lock( &altServersLock ); - // Flip first server in list with a random one every time this is called - if ( numAltServers > 1 ) { - const dnbd3_alt_server_t tmp = altServers[0]; - do { - i = rand() % numAltServers; - } while ( i == 0 ); - altServers[0] = altServers[i]; - altServers[i] = tmp; - } - // We iterate over the list twice. First run adds servers with 0 failures only, - // second one also considers those that failed (not too many times) - if ( size > numAltServers ) size = numAltServers; - for (i = 0; i < numAltServers * 2; ++i) { - dnbd3_alt_server_t *srv = &altServers[i % numAltServers]; - if ( srv->host.type == 0 ) continue; // Slot is empty - if ( _proxyPrivateOnly && !srv->isPrivate ) continue; // Config says to consider private alt-servers only? ignore! - if ( srv->isClientOnly ) continue; - bool first = ( i < numAltServers ); - if ( first ) { - if ( srv->numFails > 0 ) continue; - } else { - if ( srv->numFails == 0 ) continue; // Already added in first iteration - if ( !emergency && srv->numFails > SERVER_BAD_UPLINK_THRES // server failed X times in a row - && timing_diff( &srv->lastFail, &now ) < SERVER_BAD_UPLINK_IGNORE ) continue; // and last fail was not too long ago? ignore! - if ( !emergency ) srv->numFails--; + // If we don't have enough servers to randomize, take a shortcut + if ( numAltServers <= size ) { + for ( int i = 0; i < numAltServers; ++i ) { + if ( current == -1 || i == current || isUsableForUplink( uplink, i, &now ) ) { + servers[count++] = i; + } + } + } else { + // Plenty of alt servers; randomize + uint8_t state[SERVER_MAX_ALTS] = { 0 }; + if ( current != -1 ) { // Make sure we also test the current server + servers[count++] = current; + state[current] = 2; + } + for ( int tr = size * 10; tr > 0 && count < size; --tr ) { + int idx = rand() % numAltServers; + if ( state[idx] != 0 ) + continue; + if ( isUsableForUplink( uplink, idx, &now ) ) { + servers[count++] = idx; + state[idx] = 2; // Used + } else { + state[idx] = 1; // Potential + } + } + // If panic mode, consider others too + for ( int tr = size * 10; current == -1 && tr > 0 && count < size; --tr ) { + int idx = rand() % numAltServers; + if ( state[idx] == 2 ) + continue; + servers[count++] = idx; + state[idx] = 2; // Used } - // server seems ok, include in output and decrease its fail counter - output[count++] = srv->host; - if ( count >= size ) break; } mutex_unlock( &altServersLock ); return count; @@ -320,7 +275,7 @@ json_t* altservers_toJson() "rtt", rtts, "isPrivate", (int)src[i].isPrivate, "isClientOnly", (int)src[i].isClientOnly, - "numFails", src[i].numFails + "numFails", src[i].fails ); json_array_append_new( list, server ); } @@ -329,32 +284,27 @@ json_t* altservers_toJson() /** * Update rtt history of given server - returns the new average for that server. - * XXX HOLD altServersLock WHEN CALLING THIS! */ -static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const unsigned int rtt) +static uint32_t altservers_updateRtt(dnbd3_uplink_t *uplink, int index, uint32_t rtt) { - unsigned int avg = rtt; - int i; - for (i = 0; i < numAltServers; ++i) { - if ( !isSameAddressPort( host, &altServers[i].host ) ) continue; - altServers[i].rtt[++altServers[i].rttIndex % SERVER_RTT_PROBES] = rtt; -#if SERVER_RTT_PROBES == 5 - avg = (altServers[i].rtt[0] + altServers[i].rtt[1] + altServers[i].rtt[2] - + altServers[i].rtt[3] + altServers[i].rtt[4]) / SERVER_RTT_PROBES; -#else -#warning You might want to change the code in altservers_update_rtt if you changed SERVER_RTT_PROBES - avg = 0; - for (int j = 0; j < SERVER_RTT_PROBES; ++j) { - avg += altServers[i].rtt[j]; + uint32_t avg = 0, j; + dnbd3_alt_local_t *local = &uplink->altData[index]; + mutex_lock( &altServersLock ); + if ( likely( local->initDone ) ) { + local->rtt[++local->rttIndex % SERVER_RTT_PROBES] = rtt; + for ( j = 0; j < SERVER_RTT_PROBES; ++j ) { + avg += local->rtt[j]; } avg /= SERVER_RTT_PROBES; -#endif - // If we got a new rtt value, server must be working - if ( altServers[i].numFails > 0 ) { - altServers[i].numFails--; + } else { // First rtt measurement -- copy to every slot + for ( j = 0; j < SERVER_RTT_PROBES; ++j ) { + local->rtt[j] = rtt; } - break; + avg = rtt; + local->initDone = true; } + altServers[index].rtt[++altServers[index].rttIndex % SERVER_RTT_PROBES] = avg; + mutex_unlock( &altServersLock ); return avg; } @@ -383,40 +333,33 @@ int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2) * track of how often servers fail, and consider them disabled for some time if they * fail too many times. */ -void altservers_serverFailed(const dnbd3_host_t * const host) +void altservers_serverFailed(int server) { - int i; - int foundIndex = -1, lastOk = -1; - ticks now; - timing_get( &now ); + declare_now; mutex_lock( &altServersLock ); - for (i = 0; i < numAltServers; ++i) { - if ( foundIndex == -1 ) { - // Looking for the failed server in list - if ( isSameAddressPort( host, &altServers[i].host ) ) { - foundIndex = i; - } - } else if ( altServers[i].host.type != 0 && altServers[i].numFails == 0 ) { - lastOk = i; + if ( timing_diff( &altServers[server].lastFail, &now ) > SERVER_GLOBAL_DUP_TIME ) { + altServers[server].lastFail = now; + if ( altServers[server].fails++ >= SERVER_BAD_UPLINK_MAX ) { + altServers[server].blocked = true; } } - // Do only increase counter if last fail was not too recent. This is - // to prevent the counter from increasing rapidly if many images use the - // same uplink. If there's a network hickup, all uplinks will call this - // function and would increase the counter too quickly, disabling the server. - if ( foundIndex != -1 && timing_diff( &altServers[foundIndex].lastFail, &now ) > SERVER_RTT_INTERVAL_INIT ) { - altServers[foundIndex].numFails += SERVER_UPLINK_FAIL_INCREASE; - altServers[foundIndex].lastFail = now; - if ( lastOk != -1 ) { - // Make sure non-working servers are put at the end of the list, so they're less likely - // to get picked when testing servers for uplink connections. - const dnbd3_alt_server_t tmp = altServers[foundIndex]; - altServers[foundIndex] = altServers[lastOk]; - altServers[lastOk] = tmp; - } + mutex_unlock( &altServersLock ); +} + +/** + * Called from RTT checker if connecting to a server succeeded but + * subsequently selecting the given image failed. Handle this within + * the uplink and don't increase the global fail counter. + */ +static void altservers_imageFailed(dnbd3_uplink_t *uplink, int server) +{ + mutex_lock( &altServersLock ); + if ( uplink->altData[server].fails++ >= SERVER_BAD_UPLINK_MAX ) { + uplink->altData[server].blocked = true; } mutex_unlock( &altServersLock ); } + /** * Mainloop of this module. It will wait for requests by uplinks to find a * suitable uplink server for them. If found, it will tell the uplink about @@ -425,206 +368,213 @@ void altservers_serverFailed(const dnbd3_host_t * const host) * will update quite quickly. Needs to be improved some time, ie. by only * updating the rtt if the last update was at least X seconds ago. */ -static void *altservers_main(void *data UNUSED) +static void *altservers_runCheck(void *data) +{ + dnbd3_uplink_t * const uplink = (dnbd3_uplink_t*)data; + + assert( uplink != NULL ); + setThreadName( "altserver-check" ); + altservers_findUplinkInternal( uplink ); + // Save cache maps of all images if applicable + // TODO: Has nothing to do with alt servers really, maybe move somewhere else? + declare_now; + if ( _closeUnusedFd && timing_reached( &nextCloseUnusedFd, &now ) ) { + timing_gets( &nextCloseUnusedFd, 900 ); + image_closeUnusedFd(); + } + return NULL; +} + +void altservers_findUplink(dnbd3_uplink_t *uplink) +{ + altservers_findUplinkInternal( uplink ); + while ( uplink->rttTestResult == RTT_INPROGRESS ) { + usleep( 5000 ); + } +} + +int altservers_hostToIndex(dnbd3_host_t *host) +{ + for ( int i = 0; i < numAltServers; ++i ) { + if ( isSameAddressPort( host, &altServers[i].host ) ) + return i; + } + return -1; +} + +const dnbd3_host_t* altservers_indexToHost(int server) +{ + return &altServers[server].host; +} + +// XXX Sync call above must block until async worker has finished XXX +static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink) { const int ALTS = 4; - int ret, itLink, itAlt, numAlts; - bool found; - char buffer[DNBD3_BLOCK_SIZE ]; - dnbd3_reply_t reply; - dnbd3_host_t servers[ALTS + 1]; - serialized_buffer_t serialized; + int ret, itAlt, numAlts, current; + bool panic; + int servers[ALTS + 1]; struct timespec start, end; - ticks nextCloseUnusedFd; - setThreadName( "altserver-check" ); - blockNoncriticalSignals(); - timing_gets( &nextCloseUnusedFd, 900 ); - // LOOP - while ( !_shutdown ) { - // Wait 5 seconds max. - ret = signal_wait( runSignal, 5000 ); - if ( _shutdown ) goto cleanup; - if ( ret == SIGNAL_ERROR ) { - if ( errno == EAGAIN || errno == EINTR ) continue; - logadd( LOG_WARNING, "Error %d on signal_clear on alservers_main! Things will break!", errno ); - usleep( 100000 ); + if ( _shutdown ) + return; + mutex_lock( &uplink->rttLock ); + // Maybe we already have a result, or check is currently running + if ( uplink->better.fd != -1 || uplink->rttTestResult == RTT_INPROGRESS ) { + mutex_unlock( &uplink->rttLock ); + return; + } + assert( uplink->rttTestResult != RTT_DOCHANGE ); + uplink->rttTestResult = RTT_INPROGRESS; + panic = ( uplink->current.fd == -1 ); + current = uplink->current.index; // Current server index (or last one in panic mode) + mutex_unlock( &uplink->rttLock ); + // First, get 4 alt servers + numAlts = altservers_getListForUplink( uplink, servers, ALTS, panic ? -1 : current ); + // If we're already connected and only got one server anyways, there isn't much to do + if ( numAlts == 0 || ( numAlts == 1 && !panic ) ) { + uplink->rttTestResult = RTT_DONTCHANGE; + return; + } + dnbd3_image_t * const image = image_lock( uplink->image ); + if ( image == NULL ) { // Check again after locking + uplink->rttTestResult = RTT_NOT_REACHABLE; + logadd( LOG_WARNING, "Image has gone away that was queued for RTT measurement" ); + return; + } + LOG( LOG_DEBUG2, "Running alt check for %s:%d", image->name, (int)image->rid ); + assert( uplink->rttTestResult == RTT_INPROGRESS ); + // Test them all + dnbd3_server_connection_t best = { .fd = -1 }; + unsigned long bestRtt = RTT_UNREACHABLE; + unsigned long currentRtt = RTT_UNREACHABLE; + for (itAlt = 0; itAlt < numAlts; ++itAlt) { + int server = servers[itAlt]; + // Connect + clock_gettime( BEST_CLOCK_SOURCE, &start ); + int sock = sock_connect( &altServers[server].host, 750, 1000 ); + if ( sock == -1 ) { // Connection failed means global error + altservers_serverFailed( server ); + continue; } - // Work your way through the queue - atomic_thread_fence( memory_order_acquire ); - for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) { - dnbd3_uplink_t * const uplink = pending[itLink]; - if ( uplink == NULL ) - continue; - // First, get 4 alt servers - numAlts = altservers_getListForUplink( servers, ALTS, uplink->current.fd == -1 ); - // If we're already connected and only got one server anyways, there isn't much to do - if ( numAlts <= 1 && uplink->current.fd != -1 ) { - uplink->rttTestResult = RTT_DONTCHANGE; - continue; - } - dnbd3_image_t * const image = image_lock( uplink->image ); - if ( image == NULL ) { // Check again after locking - mutex_lock( &uplink->rttLock ); - uplink->rttTestResult = RTT_NOT_REACHABLE; - assert( pending[itLink] == uplink ); - pending[itLink] = NULL; - mutex_unlock( &uplink->rttLock ); - logadd( LOG_WARNING, "Image has gone away that was queued for RTT measurement" ); - continue; - } - LOG( LOG_DEBUG2, "[%d] Running alt check", itLink ); - assert( uplink->rttTestResult == RTT_INPROGRESS ); - if ( uplink->current.fd != -1 ) { - // Add current server if not already in list - found = false; - for (itAlt = 0; itAlt < numAlts; ++itAlt) { - if ( !isSameAddressPort( &uplink->current.host, &servers[itAlt] ) ) continue; - found = true; - break; - } - if ( !found ) servers[numAlts++] = uplink->current.host; - } - // Test them all - int bestSock = -1; - int bestIndex = -1; - int bestProtocolVersion = -1; - unsigned long bestRtt = RTT_UNREACHABLE; - unsigned long currentRtt = RTT_UNREACHABLE; - for (itAlt = 0; itAlt < numAlts; ++itAlt) { - usleep( 1000 ); // Wait a very short moment for the network to recover (we might be doing lots of measurements...) - // Connect - clock_gettime( BEST_CLOCK_SOURCE, &start ); - int sock = sock_connect( &servers[itAlt], 750, 1000 ); - if ( sock < 0 ) continue; - // Select image ++++++++++++++++++++++++++++++ - if ( !dnbd3_select_image( sock, image->name, image->rid, SI_SERVER_FLAGS ) ) { - goto server_failed; - } - // See if selecting the image succeeded ++++++++++++++++++++++++++++++ - uint16_t protocolVersion, rid; - uint64_t imageSize; - char *name; - if ( !dnbd3_select_image_reply( &serialized, sock, &protocolVersion, &name, &rid, &imageSize ) ) { - goto server_image_not_available; - } - if ( protocolVersion < MIN_SUPPORTED_SERVER ) goto server_failed; - if ( name == NULL || strcmp( name, image->name ) != 0 ) { - ERROR_GOTO( server_failed, "[RTT] Server offers image '%s'", name ); - } - if ( rid != image->rid ) { - ERROR_GOTO( server_failed, "[RTT] Server provides rid %d", (int)rid ); - } - if ( imageSize != image->virtualFilesize ) { - ERROR_GOTO( server_failed, "[RTT] Remote size: %" PRIu64 ", expected: %" PRIu64, imageSize, image->virtualFilesize ); - } - // Request first block (NOT random!) ++++++++++++++++++++++++++++++ - if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) { - LOG_GOTO( server_failed, LOG_DEBUG1, "[RTT%d] Could not request first block", itLink ); - } - // See if requesting the block succeeded ++++++++++++++++++++++ - if ( !dnbd3_get_reply( sock, &reply ) ) { - LOG_GOTO( server_failed, LOG_DEBUG1, "[RTT%d] Received corrupted reply header after CMD_GET_BLOCK", itLink ); - } - // check reply header - if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) { - ERROR_GOTO( server_failed, "[RTT] Reply to first block request is %" PRIu32 " bytes", reply.size ); - } - if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) { - ERROR_GOTO( server_failed, "[RTT%d] Could not read first block payload", itLink ); - } - clock_gettime( BEST_CLOCK_SOURCE, &end ); - // Measurement done - everything fine so far - mutex_lock( &altServersLock ); - mutex_lock( &uplink->rttLock ); - const bool isCurrent = isSameAddressPort( &servers[itAlt], &uplink->current.host ); - // Penaltize rtt if this was a cycle; this will treat this server with lower priority - // in the near future too, so we prevent alternating between two servers that are both - // part of a cycle and have the lowest latency. - const unsigned int rtt = (unsigned int)((end.tv_sec - start.tv_sec) * 1000000 - + (end.tv_nsec - start.tv_nsec) / 1000 - + ( (isCurrent && uplink->cycleDetected) ? 1000000 : 0 )); // µs - unsigned int avg = altservers_updateRtt( &servers[itAlt], rtt ); - mutex_unlock( &altServersLock ); - // If a cycle was detected, or we lost connection to the current (last) server, penaltize it one time - if ( ( uplink->cycleDetected || uplink->current.fd == -1 ) && isCurrent ) avg = (avg * 2) + 50000; - mutex_unlock( &uplink->rttLock ); - if ( uplink->current.fd != -1 && isCurrent ) { - // Was measuring current server - currentRtt = avg; - close( sock ); - } else if ( avg < bestRtt ) { - // Was another server, update "best" - if ( bestSock != -1 ) close( bestSock ); - bestSock = sock; - bestRtt = avg; - bestIndex = itAlt; - bestProtocolVersion = protocolVersion; - } else { - // Was too slow, ignore - close( sock ); - } - // We're done, call continue - continue; - // Jump here if anything went wrong - // This will cleanup and continue - server_failed: ; - altservers_serverFailed( &servers[itAlt] ); - server_image_not_available: ; - close( sock ); - } - // Done testing all servers. See if we should switch - if ( bestSock != -1 && (uplink->current.fd == -1 || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) { - // yep - if ( currentRtt > 10000000 || uplink->current.fd == -1 ) { - LOG( LOG_DEBUG1, "Change - best: %luµs, current: -", bestRtt ); - } else { - LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt ); - } - sock_setTimeout( bestSock, _uplinkTimeout ); - mutex_lock( &uplink->rttLock ); - uplink->better.fd = bestSock; - uplink->better.host = servers[bestIndex]; - uplink->better.version = bestProtocolVersion; - uplink->rttTestResult = RTT_DOCHANGE; - mutex_unlock( &uplink->rttLock ); - signal_call( uplink->signal ); - } else if ( bestSock == -1 && currentRtt == RTT_UNREACHABLE ) { - // No server was reachable - mutex_lock( &uplink->rttLock ); - uplink->rttTestResult = RTT_NOT_REACHABLE; - mutex_unlock( &uplink->rttLock ); - } else { - // nope - if ( bestSock != -1 ) close( bestSock ); - mutex_lock( &uplink->rttLock ); - uplink->rttTestResult = RTT_DONTCHANGE; - uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away - mutex_unlock( &uplink->rttLock ); - if ( !image->working ) { - image->working = true; - LOG( LOG_DEBUG1, "[%d] No better alt server found, enabling again", itLink ); - } - } - image_release( image ); - // end of loop over all pending uplinks - assert( pending[itLink] == uplink ); - pending[itLink] = NULL; - atomic_thread_fence( memory_order_release ); + // Select image ++++++++++++++++++++++++++++++ + if ( !dnbd3_select_image( sock, image->name, image->rid, SI_SERVER_FLAGS ) ) { + goto image_failed; } - // Save cache maps of all images if applicable - declare_now; - // TODO: Has nothing to do with alt servers really, maybe move somewhere else? - if ( _closeUnusedFd && timing_reached( &nextCloseUnusedFd, &now ) ) { - timing_gets( &nextCloseUnusedFd, 900 ); - image_closeUnusedFd(); + // See if selecting the image succeeded ++++++++++++++++++++++++++++++ + uint16_t protocolVersion, rid; + uint64_t imageSize; + char *name; + serialized_buffer_t serialized; + if ( !dnbd3_select_image_reply( &serialized, sock, &protocolVersion, &name, &rid, &imageSize ) ) { + goto image_failed; } + if ( protocolVersion < MIN_SUPPORTED_SERVER ) { // Server version unsupported; global fail + goto server_failed; + } + if ( name == NULL || strcmp( name, image->name ) != 0 ) { + ERROR_GOTO( image_failed, "[RTT] Server offers image '%s' instead of '%s'", name, image->name ); + } + if ( rid != image->rid ) { + ERROR_GOTO( image_failed, "[RTT] Server provides rid %d instead of %d", (int)rid, (int)image->rid ); + } + if ( imageSize != image->virtualFilesize ) { + ERROR_GOTO( image_failed, "[RTT] Remote size: %" PRIu64 ", expected: %" PRIu64, imageSize, image->virtualFilesize ); + } + // Request first block (NOT random!) ++++++++++++++++++++++++++++++ + if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) { + LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Could not request first block", server ); + } + // See if requesting the block succeeded ++++++++++++++++++++++ + dnbd3_reply_t reply; + if ( !dnbd3_get_reply( sock, &reply ) ) { + LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Received corrupted reply header after CMD_GET_BLOCK", server ); + } + // check reply header + if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) { + // Sanity check failed; count this as global error (malicious/broken server) + ERROR_GOTO( server_failed, "[RTT] Reply to first block request is %" PRIu32 " bytes", reply.size ); + } + // flush payload to include this into measurement + char buffer[DNBD3_BLOCK_SIZE]; + if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) { + ERROR_GOTO( image_failed, "[RTT%d] Could not read first block payload", server ); + } + clock_gettime( BEST_CLOCK_SOURCE, &end ); + // Measurement done - everything fine so far + mutex_lock( &uplink->rttLock ); + const bool isCurrent = ( uplink->current.index == server ); + mutex_unlock( &uplink->rttLock ); + // Penaltize rtt if this was a cycle; this will treat this server with lower priority + // in the near future too, so we prevent alternating between two servers that are both + // part of a cycle and have the lowest latency. + uint32_t rtt = (uint32_t)((end.tv_sec - start.tv_sec) * 1000000 + + (end.tv_nsec - start.tv_nsec) / 1000); // µs + uint32_t avg = altservers_updateRtt( uplink, server, rtt ); + // If a cycle was detected, or we lost connection to the current (last) server, penaltize it one time + if ( ( uplink->cycleDetected || panic ) && isCurrent ) { + avg = (avg * 2) + 50000; + } + if ( !panic && isCurrent ) { + // Was measuring current server + currentRtt = avg; + close( sock ); + } else if ( avg < bestRtt ) { + // Was another server, update "best" + if ( best.fd != -1 ) { + close( best.fd ); + } + best.fd = sock; + bestRtt = avg; + best.index = server; + best.version = protocolVersion; + } else { + // Was too slow, ignore + close( sock ); + } + // We're done, call continue + continue; + // Jump here if anything went wrong + // This will cleanup and continue +image_failed: + altservers_imageFailed( uplink, server ); + goto failed; +server_failed: + altservers_serverFailed( server ); +failed: + close( sock ); } - cleanup: ; - if ( runSignal != NULL ) { - signal_close( runSignal ); + // Done testing all servers. See if we should switch + if ( best.fd != -1 && (panic || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) { + // yep + if ( currentRtt > 10000000 || panic ) { + LOG( LOG_DEBUG1, "Change - best: %luµs, current: -", bestRtt ); + } else { + LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt ); + } + sock_setTimeout( best.fd, _uplinkTimeout ); + mutex_lock( &uplink->rttLock ); + uplink->better = best; + uplink->rttTestResult = RTT_DOCHANGE; + mutex_unlock( &uplink->rttLock ); + signal_call( uplink->signal ); + } else if ( best.fd == -1 && currentRtt == RTT_UNREACHABLE ) { + // No server was reachable, including current + uplink->rttTestResult = RTT_NOT_REACHABLE; + } else { + // nope + if ( best.fd != -1 ) { + close( best.fd ); + } + if ( !image->working || uplink->cycleDetected ) { + image->working = true; + LOG( LOG_DEBUG1, "[RTT] No better alt server found, enabling '%s:%d' again... :-(", image->name, (int)image->rid ); + } + uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away + mutex_lock( &uplink->rttLock ); + uplink->rttTestResult = RTT_DONTCHANGE; + mutex_unlock( &uplink->rttLock ); } - runSignal = NULL; - return NULL ; + image_release( image ); } diff --git a/src/server/altservers.h b/src/server/altservers.h index e03b900..8e2b964 100644 --- a/src/server/altservers.h +++ b/src/server/altservers.h @@ -7,23 +7,27 @@ struct json_t; void altservers_init(); -void altservers_shutdown(); - int altservers_load(); bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate, const int isClientOnly); -void altservers_findUplink(dnbd3_uplink_t *uplink); +void altservers_findUplinkAsync(dnbd3_uplink_t *uplink); -void altservers_removeUplink(dnbd3_uplink_t *uplink); +void altservers_findUplink(dnbd3_uplink_t *uplink); int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size); -int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency); +int altservers_getHostListForReplication(dnbd3_host_t *servers, int size); + +bool altservers_toString(int server, char *buffer, size_t len); int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2); -void altservers_serverFailed(const dnbd3_host_t * const host); +void altservers_serverFailed(int server); + +int altservers_hostToIndex(dnbd3_host_t *host); + +const dnbd3_host_t* altservers_indexToHost(int server); struct json_t* altservers_toJson(); diff --git a/src/server/globals.h b/src/server/globals.h index 659e5a2..4d97c6b 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -30,10 +30,31 @@ typedef struct uint8_t hopCount; // How many hops this request has already taken across proxies } dnbd3_queued_request_t; +typedef struct +{ + int fails; // Hard fail: Connection failed + int rttIndex; + uint32_t rtt[SERVER_RTT_PROBES]; + bool isPrivate, isClientOnly; + bool blocked; // If true count down fails until 0 to enable again + ticks lastFail; // Last hard fail + dnbd3_host_t host; + char comment[COMMENT_LENGTH]; +} dnbd3_alt_server_t; + +typedef struct +{ + int fails; // Soft fail: Image not found + int rttIndex; + uint32_t rtt[SERVER_RTT_PROBES]; + bool blocked; // True if server is to be ignored and fails should be counted down + bool initDone; +} dnbd3_alt_local_t; + typedef struct { - int fd; // Socket fd for this connection - int version; // Protocol version of remote server - dnbd3_host_t host; // IP/Port of remote server + int fd; // Socket fd for this connection + int version; // Protocol version of remote server + int index; // Entry in uplinks list } dnbd3_server_connection_t; #define RTT_IDLE 0 // Not in progress @@ -51,7 +72,7 @@ struct _dnbd3_uplink pthread_mutex_t queueLock; // lock for synchronization on request queue etc. dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer pthread_mutex_t rttLock; // When accessing rttTestResult, betterFd or betterServer - int rttTestResult; // RTT_* + atomic_int rttTestResult; // RTT_* int cacheFd; // used to write to the image, in case it is relayed. ONLY USE FROM UPLINK THREAD! uint8_t *recvBuffer; // Buffer for receiving payload uint32_t recvBufferLen; // Len of ^^ @@ -65,21 +86,11 @@ struct _dnbd3_uplink atomic_int queueLen; // length of queue uint32_t idleTime; // How many seconds the uplink was idle (apart from keep-alives) dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE]; + dnbd3_alt_local_t altData[SERVER_MAX_ALTS]; }; typedef struct { - char comment[COMMENT_LENGTH]; - dnbd3_host_t host; - unsigned int rtt[SERVER_RTT_PROBES]; - unsigned int rttIndex; - bool isPrivate, isClientOnly; - ticks lastFail; - int numFails; -} dnbd3_alt_server_t; - -typedef struct -{ uint8_t host[16]; int bytes; int bitMask; diff --git a/src/server/image.c b/src/server/image.c index d250715..1a6e0f8 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -1178,7 +1178,7 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision, dnbd3_host_t servers[REP_NUM_SRV]; int uplinkSock = -1; dnbd3_host_t uplinkServer; - const int count = altservers_getListForUplink( servers, REP_NUM_SRV, false ); + const int count = altservers_getHostListForReplication( servers, REP_NUM_SRV ); uint16_t remoteProtocolVersion; uint16_t remoteRid = revision; uint64_t remoteImageSize; @@ -1491,7 +1491,7 @@ json_t* image_getListAsJson() json_t *imagesJson = json_array(); json_t *jsonImage; int i; - char uplinkName[100] = { 0 }; + char uplinkName[100]; uint64_t bytesReceived; int completeness, idleTime; declare_now; @@ -1508,7 +1508,7 @@ json_t* image_getListAsJson() uplinkName[0] = '\0'; } else { bytesReceived = image->uplink->bytesReceived; - if ( image->uplink->current.fd == -1 || !host_to_string( &image->uplink->current.host, uplinkName, sizeof(uplinkName) ) ) { + if ( !uplink_getHostString( image->uplink, uplinkName, sizeof(uplinkName) ) ) { uplinkName[0] = '\0'; } } diff --git a/src/server/net.c b/src/server/net.c index 7f3c1ce..4976eea 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -669,11 +669,19 @@ static void removeFromList(dnbd3_client_t *client) { int i; mutex_lock( &_clients_lock ); - for ( i = _num_clients - 1; i >= 0; --i ) { - if ( _clients[i] == client ) { - _clients[i] = NULL; + if ( _num_clients != 0 ) { + for ( i = _num_clients - 1; i >= 0; --i ) { + if ( _clients[i] == client ) { + _clients[i] = NULL; + break; + } + } + if ( i != 0 && i + 1 == _num_clients ) { + do { + i--; + } while ( _clients[i] == NULL && i > 0 ); + _num_clients = i + 1; } - if ( _clients[i] == NULL && i + 1 == _num_clients ) --_num_clients; } mutex_unlock( &_clients_lock ); } diff --git a/src/server/server.c b/src/server/server.c index 838aec2..640048a 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -121,9 +121,6 @@ void dnbd3_cleanup() // Disable threadpool threadpool_close(); - // Terminate the altserver checking thread - altservers_shutdown(); - // Terminate all uplinks image_killUplinks(); @@ -198,6 +195,11 @@ int main(int argc, char *argv[]) case LONGOPT_CRC4: return image_generateCrcFile( optarg ) ? 0 : EXIT_FAILURE; case LONGOPT_ASSERT: + printf( "Testing use after free:\n" ); + volatile char * volatile test = malloc( 10 ); + test[0] = 1; + free( test ); + test[1] = 2; printf( "Testing a failing assertion:\n" ); assert( 4 == 5 ); printf( "Assertion 4 == 5 seems to hold. ;-)\n" ); diff --git a/src/server/uplink.c b/src/server/uplink.c index e21e28c..6c85580 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -96,17 +96,18 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version uplink->bytesReceived = 0; uplink->idleTime = 0; uplink->queueLen = 0; - mutex_lock( &uplink->sendMutex ); - uplink->current.fd = -1; - mutex_unlock( &uplink->sendMutex ); uplink->cacheFd = -1; uplink->signal = NULL; uplink->replicationHandle = REP_NONE; mutex_lock( &uplink->rttLock ); + mutex_lock( &uplink->sendMutex ); + uplink->current.fd = -1; + mutex_unlock( &uplink->sendMutex ); uplink->cycleDetected = false; - if ( sock >= 0 ) { + if ( sock != -1 ) { uplink->better.fd = sock; - uplink->better.host = *host; + int index = altservers_hostToIndex( host ); + uplink->better.index = index == -1 ? 0 : index; // Prevent invalid array access uplink->rttTestResult = RTT_DOCHANGE; uplink->better.version = version; } else { @@ -116,7 +117,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version mutex_unlock( &uplink->rttLock ); uplink->recvBufferLen = 0; uplink->shutdown = false; - if ( 0 != thread_create( &(uplink->thread), NULL, &uplink_mainloop, (void *)link ) ) { + if ( 0 != thread_create( &(uplink->thread), NULL, &uplink_mainloop, (void *)uplink ) ) { logadd( LOG_ERROR, "Could not start thread for new uplink." ); goto failure; } @@ -148,8 +149,8 @@ void uplink_shutdown(dnbd3_image_t *image) } dnbd3_uplink_t * const uplink = image->uplink; mutex_lock( &uplink->queueLock ); - if ( !uplink->shutdown ) { - uplink->shutdown = true; + bool exp = false; + if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) { signal_call( uplink->signal ); thread = uplink->thread; join = true; @@ -211,13 +212,11 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin } // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain // This might be a false positive if there are multiple instances running on the same host (IP) - if ( hops != 0 && isSameAddress( &uplink->current.host, &client->host ) ) { - mutex_unlock( &client->image->lock ); - logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); - mutex_lock( &uplink->rttLock ); + if ( hops != 0 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) { uplink->cycleDetected = true; - mutex_unlock( &uplink->rttLock ); signal_call( uplink->signal ); + mutex_unlock( &client->image->lock ); + logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); return false; } @@ -256,12 +255,10 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin } } if ( unlikely( requestLoop ) ) { - mutex_unlock( &uplink->queueLock ); - logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops ); - mutex_lock( &uplink->rttLock ); uplink->cycleDetected = true; - mutex_unlock( &uplink->rttLock ); signal_call( uplink->signal ); + mutex_unlock( &uplink->queueLock ); + logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops ); return false; } if ( freeSlot < firstUsedSlot && firstUsedSlot < 10 && existingType != ULR_PROCESSING ) { @@ -311,6 +308,8 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin if ( foundExisting != -1 ) return true; // Attached to pending request, do nothing + usleep( 10000 ); + // See if we can fire away the request if ( mutex_trylock( &uplink->sendMutex ) != 0 ) { logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" ); @@ -342,7 +341,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin if ( state == -1 ) { logadd( LOG_DEBUG2, "Direct uplink request queue entry gone after sending and re-locking queue. *shrug*" ); } else if ( state == ULR_NEW ) { - logadd( LOG_DEBUG2, "Succesful direct uplink request" ); + //logadd( LOG_DEBUG2, "Direct uplink request" ); } else { logadd( LOG_DEBUG2, "Direct uplink request queue entry changed to %s afte sending (expected ULR_NEW).", NAMES_ULR[uplink->queue[freeSlot].status] ); } @@ -352,10 +351,8 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin } } - if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed - if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { - logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); - } + if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { + logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); } return true; } @@ -443,7 +440,7 @@ static void* uplink_mainloop(void *data) uplink->image->working = true; uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received buffer[0] = '@'; - if ( host_to_string( &uplink->current.host, buffer + 1, sizeof(buffer) - 1 ) ) { + if ( altservers_toString( uplink->current.index, buffer + 1, sizeof(buffer) - 1 ) ) { logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", uplink->image->name, buffer + 1 ); setThreadName( buffer ); } @@ -525,9 +522,7 @@ static void* uplink_mainloop(void *data) } } // See if we should trigger an RTT measurement - mutex_lock( &uplink->rttLock ); - const int rttTestResult = uplink->rttTestResult; - mutex_unlock( &uplink->rttLock ); + int rttTestResult = uplink->rttTestResult; if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) { if ( timing_reached( &nextAltCheck, &now ) || ( uplink->current.fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) || uplink->cycleDetected ) { // It seems it's time for a check @@ -538,7 +533,7 @@ static void* uplink_mainloop(void *data) goto cleanup; } else if ( !uplink_connectionShouldShutdown( uplink ) ) { // Not complete - do measurement - altservers_findUplink( uplink ); // This will set RTT_INPROGRESS (synchronous) + altservers_findUplinkAsync( uplink ); // This will set RTT_INPROGRESS (synchronous) if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) { uplink->nextReplicationIndex = 0; } @@ -547,11 +542,9 @@ static void* uplink_mainloop(void *data) timing_set( &nextAltCheck, &now, altCheckInterval ); } } else if ( rttTestResult == RTT_NOT_REACHABLE ) { - mutex_lock( &uplink->rttLock ); - uplink->rttTestResult = RTT_IDLE; - mutex_unlock( &uplink->rttLock ); + atomic_compare_exchange_strong( &uplink->rttTestResult, &rttTestResult, RTT_IDLE ); discoverFailCount++; - timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) ); + timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_MAX_UNREACH ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) ); } #ifdef _DEBUG if ( uplink->current.fd != -1 && !uplink->shutdown ) { @@ -581,36 +574,38 @@ static void* uplink_mainloop(void *data) #endif } cleanup: ; - if ( !uplink->shutdown ) { - uplink->shutdown = true; + // Detach depends on whether someone is joining this thread... + bool exp = false; + if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) { thread_detach( uplink->thread ); } - altservers_removeUplink( uplink ); uplink_saveCacheMap( uplink ); - mutex_lock( &uplink->image->lock ); - if ( uplink->image->uplink == uplink ) { - uplink->image->uplink = NULL; + dnbd3_image_t *image = uplink->image; + mutex_lock( &image->lock ); + // in the list anymore, but we want to prevent it from being freed in either case + if ( image->uplink == uplink ) { + image->uplink = NULL; } + mutex_unlock( &image->lock ); // Do NOT use image without locking it mutex_lock( &uplink->queueLock ); - const int fd = uplink->current.fd; - const dnbd3_signal_t* signal = uplink->signal; - mutex_lock( &uplink->sendMutex ); - uplink->current.fd = -1; - mutex_unlock( &uplink->sendMutex ); - uplink->signal = NULL; - // Do not access uplink->image after unlocking, since we set - // image->uplink to NULL. Acquire with image_lock first, - // like done below when checking whether to re-init uplink - mutex_unlock( &uplink->image->lock ); - mutex_unlock( &uplink->queueLock ); - if ( fd != -1 ) close( fd ); - if ( signal != NULL ) signal_close( signal ); - // Wait for the RTT check to finish/fail if it's in progress - while ( uplink->rttTestResult == RTT_INPROGRESS ) + // Wait for active RTT measurement to finish + while ( uplink->rttTestResult == RTT_INPROGRESS ) { usleep( 10000 ); + } + signal_close( uplink->signal ); + mutex_lock( &uplink->rttLock ); + mutex_lock( &uplink->sendMutex ); + if ( uplink->current.fd != -1 ) { + close( uplink->current.fd ); + uplink->current.fd = -1; + } if ( uplink->better.fd != -1 ) { close( uplink->better.fd ); + uplink->better.fd = -1; } + mutex_unlock( &uplink->sendMutex ); + mutex_unlock( &uplink->rttLock ); + mutex_unlock( &uplink->queueLock ); mutex_destroy( &uplink->queueLock ); mutex_destroy( &uplink->rttLock ); mutex_destroy( &uplink->sendMutex ); @@ -619,9 +614,9 @@ static void* uplink_mainloop(void *data) if ( uplink->cacheFd != -1 ) { close( uplink->cacheFd ); } - dnbd3_image_t *image = image_lock( uplink->image ); free( uplink ); // !!! - if ( image != NULL ) { + if ( image_lock( image ) != NULL ) { + // Image is still in list... if ( !_shutdown && image->cache_map != NULL ) { // Ingegrity checker must have found something in the meantime uplink_init( image, -1, NULL, 0 ); @@ -656,7 +651,7 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly) // the thread will re-send this request as soon as the connection // is reestablished. logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" ); - altservers_serverFailed( &uplink->current.host ); + altservers_serverFailed( uplink->current.index ); return; } mutex_lock( &uplink->queueLock ); @@ -973,7 +968,7 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew) { if ( uplink->current.fd == -1 ) return; - altservers_serverFailed( &uplink->current.host ); + altservers_serverFailed( uplink->current.index ); mutex_lock( &uplink->sendMutex ); close( uplink->current.fd ); uplink->current.fd = -1; @@ -1138,3 +1133,13 @@ static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink) && ( _backgroundReplication != BGR_FULL || _bgrMinClients > uplink->image->users ) ); } +bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len) +{ + int current; + mutex_lock( &uplink->rttLock ); + current = uplink->current.fd == -1 ? -1 : uplink->current.index; + mutex_unlock( &uplink->rttLock ); + if ( current == -1 ) + return false; + return altservers_toString( current, buffer, len ); +} diff --git a/src/server/uplink.h b/src/server/uplink.h index 4fd41b0..acc8e11 100644 --- a/src/server/uplink.h +++ b/src/server/uplink.h @@ -16,4 +16,6 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin void uplink_shutdown(dnbd3_image_t *image); +bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len); + #endif /* UPLINK_H_ */ |