diff options
Diffstat (limited to 'src/server/altservers.c')
-rw-r--r-- | src/server/altservers.c | 866 |
1 files changed, 465 insertions, 401 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c index bbbc584..4413ca6 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -1,69 +1,41 @@ +#include "ini.h" #include "altservers.h" #include "locks.h" +#include "threadpool.h" #include "helper.h" #include "image.h" #include "fileutil.h" -#include "../shared/protocol.h" -#include "../shared/timing.h" -#include "../serverconfig.h" +#include <dnbd3/shared/protocol.h> +#include <dnbd3/shared/timing.h> +#include <dnbd3/config/server.h> +#include "reference.h" + #include <assert.h> #include <inttypes.h> #include <jansson.h> -#define LOG(lvl, msg, ...) logadd(lvl, msg " (%s:%d)", __VA_ARGS__, image->name, (int)image->rid) +#define LOG(lvl, msg, ...) logadd(lvl, msg " (%s:%d)", __VA_ARGS__, PIMG(image)) #define LOG_GOTO(jumplabel, lvl, ...) do { LOG(lvl, __VA_ARGS__); goto jumplabel; } while (0); #define ERROR_GOTO(jumplabel, ...) LOG_GOTO(jumplabel, LOG_ERROR, __VA_ARGS__) -static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS]; -static pthread_mutex_t pendingLockWrite; // Lock for adding something to pending. (NULL -> nonNULL) -static pthread_mutex_t pendingLockConsume; // Lock for removing something (nonNULL -> NULL) -static dnbd3_signal_t* runSignal = NULL; - static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS]; -static int numAltServers = 0; +static atomic_int numAltServers = 0; static pthread_mutex_t altServersLock; -static pthread_t altThread; - -static void *altservers_main(void *data); -static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const unsigned int rtt); +static void *altservers_runCheck(void *data); +static int altservers_getListForUplink(dnbd3_uplink_t *uplink, const char *image, int *servers, int size, int current); +static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink); +static uint32_t altservers_updateRtt(dnbd3_uplink_t *uplink, int index, uint32_t rtt); +static void altservers_imageFailed(dnbd3_uplink_t *uplink, int server); void altservers_init() { srand( (unsigned int)time( NULL ) ); - // Init spinlock - mutex_init( &pendingLockWrite ); - mutex_init( &pendingLockConsume ); - mutex_init( &altServersLock ); - // Init signal - runSignal = signal_new(); - if ( runSignal == NULL ) { - logadd( LOG_ERROR, "Error creating signal object. Uplink feature unavailable." ); - exit( EXIT_FAILURE ); - } - memset( altServers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) ); - if ( 0 != thread_create( &altThread, NULL, &altservers_main, (void *)NULL ) ) { - logadd( LOG_ERROR, "Could not start altservers connector thread" ); - exit( EXIT_FAILURE ); - } - // Init waiting links queue -- this is currently a global static array so - // it will already be zero, but in case we refactor later do it explicitly - // while also holding the write lock so thread sanitizer is happy - mutex_lock( &pendingLockWrite ); - for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { - pending[i] = NULL; - } - mutex_unlock( &pendingLockWrite ); -} - -void altservers_shutdown() -{ - if ( runSignal == NULL ) return; - signal_call( runSignal ); // Wake altservers thread up - thread_join( altThread, NULL ); + // Init lock + mutex_init( &altServersLock, LOCK_ALT_SERVER_LIST ); } -static void addalt(int argc, char **argv, void *data) +static void addAltFromLegacy(int argc, char **argv, void *data) { char *shost; dnbd3_host_t host; @@ -81,29 +53,82 @@ static void addalt(int argc, char **argv, void *data) return; } if ( argc == 1 ) argv[1] = ""; - if ( altservers_add( &host, argv[1], isPrivate, isClientOnly ) ) { + if ( altservers_add( &host, argv[1], isPrivate, isClientOnly, NULL ) ) { (*(int*)data)++; } } +static int addAltFromIni(void *countptr, const char* section, const char* key, const char* value) +{ + dnbd3_host_t host; + char *strhost = strdup( section ); + if ( !parse_address( strhost, &host ) ) { + free( strhost ); + logadd( LOG_WARNING, "Invalid host section in alt-servers file ignored: '%s'", section ); + return 1; + } + free( strhost ); + int index; + if ( altservers_add( &host, "", false, false, &index ) ) { + (*(int*)countptr)++; + } + if ( index == -1 ) + return 1; + if ( strcmp( key, "for" ) == 0 ) { + if ( strncmp( value, "client", 6 ) == 0 ) { + altServers[index].isClientOnly = true; + altServers[index].isPrivate = false; + } else if ( strcmp( value, "replication" ) == 0 ) { + altServers[index].isClientOnly = false; + altServers[index].isPrivate = true; + } else { + logadd( LOG_WARNING, "Invalid value in alt-servers section %s for key %s: '%s'", section, key, value ); + } + } else if ( strcmp( key, "comment" ) == 0 ) { + snprintf( altServers[index].comment, COMMENT_LENGTH, "%s", value ); + } else if ( strcmp( key, "namespace" ) == 0 ) { + dnbd3_ns_t *elem = malloc( sizeof(*elem) ); + elem->name = strdup( value ); + elem->len = strlen( value ); + do { + elem->next = altServers[index].nameSpaces; + } while ( !atomic_compare_exchange_weak( &altServers[index].nameSpaces, &elem->next, elem ) ); + } else { + logadd( LOG_DEBUG1, "Unknown key in alt-servers section: '%s'", key ); + } + return 1; +} + int altservers_load() { int count = 0; char *name; if ( asprintf( &name, "%s/%s", _configDir, "alt-servers" ) == -1 ) return -1; - file_loadLineBased( name, 1, 2, &addalt, (void*)&count ); + if ( !file_isReadable( name ) ) { + free( name ); + return 0; + } + ini_parse( name, &addAltFromIni, &count ); + if ( numAltServers == 0 ) { + logadd( LOG_INFO, "Could not parse %s as .ini file, trying to load as legacy format.", name ); + file_loadLineBased( name, 1, 2, &addAltFromLegacy, (void*)&count ); + } free( name ); logadd( LOG_DEBUG1, "Added %d alt servers\n", count ); return count; } -bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate, const int isClientOnly) +bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate, const int isClientOnly, int *index) { int i, freeSlot = -1; + if ( index == NULL ) { + index = &freeSlot; + } mutex_lock( &altServersLock ); for (i = 0; i < numAltServers; ++i) { if ( isSameAddressPort( &altServers[i].host, host ) ) { mutex_unlock( &altServersLock ); + *index = i; return false; } else if ( freeSlot == -1 && altServers[i].host.type == 0 ) { freeSlot = i; @@ -113,6 +138,7 @@ bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate if ( numAltServers >= SERVER_MAX_ALTS ) { logadd( LOG_WARNING, "Cannot add another alt server, maximum of %d already reached.", (int)SERVER_MAX_ALTS ); mutex_unlock( &altServersLock ); + *index = -1; return false; } freeSlot = numAltServers++; @@ -120,62 +146,48 @@ bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate altServers[freeSlot].host = *host; altServers[freeSlot].isPrivate = isPrivate; altServers[freeSlot].isClientOnly = isClientOnly; + altServers[freeSlot].nameSpaces = NULL; if ( comment != NULL ) snprintf( altServers[freeSlot].comment, COMMENT_LENGTH, "%s", comment ); mutex_unlock( &altServersLock ); + *index = freeSlot; return true; } /** * ONLY called from the passed uplink's main thread */ -void altservers_findUplink(dnbd3_connection_t *uplink) +void altservers_findUplinkAsync(dnbd3_uplink_t *uplink) { - int i; + if ( uplink->shutdown ) + return; + if ( uplink->current.fd != -1 && numAltServers <= 1 ) + return; // if betterFd != -1 it means the uplink is supposed to switch to another // server. As this function here is called by the uplink thread, it can // never be that the uplink is supposed to switch, but instead calls // this function. - assert( uplink->betterFd == -1 ); - mutex_lock( &pendingLockWrite ); + assert( uplink->better.fd == -1 ); // it is however possible that an RTT measurement is currently in progress, // so check for that case and do nothing if one is in progress - if ( uplink->rttTestResult == RTT_INPROGRESS ) { - for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { - if ( pending[i] != uplink ) continue; - // Yep, measuring right now - mutex_unlock( &pendingLockWrite ); - return; + if ( uplink->rttTestResult != RTT_INPROGRESS ) { + dnbd3_uplink_t *current = ref_get_uplink( &uplink->image->uplinkref ); + if ( current == uplink ) { + threadpool_run( &altservers_runCheck, uplink, "UPLINK" ); + } else if ( current != NULL ) { + ref_put( ¤t->reference ); } } - // Find free slot for measurement - for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { - if ( pending[i] != NULL ) continue; - pending[i] = uplink; - uplink->rttTestResult = RTT_INPROGRESS; - mutex_unlock( &pendingLockWrite ); - signal_call( runSignal ); // Wake altservers thread up - return; - } - // End of loop - no free slot - mutex_unlock( &pendingLockWrite ); - logadd( LOG_WARNING, "No more free RTT measurement slots, ignoring a request..." ); } -/** - * The given uplink is about to disappear, so remove it from any queues - */ -void altservers_removeUplink(dnbd3_connection_t *uplink) +static bool isImageAllowed(dnbd3_alt_server_t *alt, const char *image) { - mutex_lock( &pendingLockConsume ); - mutex_lock( &pendingLockWrite ); - for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { - if ( pending[i] == uplink ) { - uplink->rttTestResult = RTT_NOT_REACHABLE; - pending[i] = NULL; - } + if ( alt->nameSpaces == NULL ) + return true; + for ( dnbd3_ns_t *it = alt->nameSpaces; it != NULL; it = it->next ) { + if ( strncmp( it->name, image, it->len ) == 0 ) + return true; } - mutex_unlock( &pendingLockWrite ); - mutex_unlock( &pendingLockConsume ); + return false; } /** @@ -184,95 +196,154 @@ void altservers_removeUplink(dnbd3_connection_t *uplink) * Private servers are excluded, so this is what you want to call to * get a list of servers you can tell a client about */ -int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size) +int altservers_getListForClient(dnbd3_client_t *client, dnbd3_server_entry_t *output, int size) { - if ( host == NULL || host->type == 0 || numAltServers == 0 || output == NULL || size <= 0 ) return 0; + dnbd3_host_t *host = &client->host; + if ( host->type == 0 || numAltServers == 0 || output == NULL || size <= 0 ) + return 0; int i, j; int count = 0; - int scores[size]; - int score; - mutex_lock( &altServersLock ); + uint16_t scores[SERVER_MAX_ALTS] = { 0 }; if ( size > numAltServers ) size = numAltServers; - for (i = 0; i < numAltServers; ++i) { - if ( altServers[i].host.type == 0 ) continue; // Slot is empty - if ( altServers[i].isPrivate ) continue; // Do not tell clients about private servers - if ( host->type == altServers[i].host.type ) { - score = altservers_netCloseness( host, &altServers[i].host ) - altServers[i].numFails; - } else { - score = -( altServers[i].numFails + 128 ); // Wrong address family - } - if ( count == 0 ) { - // Trivial - this is the first entry - output[0].host = altServers[i].host; - output[0].failures = 0; - scores[0] = score; - count++; - } else { - // Other entries already exist, insert in proper position - for (j = 0; j < size; ++j) { - if ( j < count && score <= scores[j] ) continue; - if ( j > count ) break; // Should never happen but just in case... - if ( j < count && j + 1 < size ) { - // Check if we're in the middle and need to move other entries... - memmove( &output[j + 1], &output[j], sizeof(dnbd3_server_entry_t) * (size - j - 1) ); - memmove( &scores[j + 1], &scores[j], sizeof(int) * (size - j - 1) ); - } - if ( count < size ) { - count++; - } - output[j].host = altServers[i].host; - output[j].failures = 0; - scores[j] = score; - break; + mutex_lock( &altServersLock ); + for ( i = 0; i < numAltServers; ++i ) { + if ( altServers[i].host.type == 0 || altServers[i].isPrivate ) + continue; // Slot is empty or uplink is for replication only + if ( !isImageAllowed( &altServers[i], client->image->name ) ) + continue; + scores[i] = (uint16_t)( 10 + altservers_netCloseness( host, &altServers[i].host ) ); + } + while ( count < size ) { + i = -1; + for ( j = 0; j < numAltServers; ++j ) { + if ( scores[j] == 0 ) + continue; + if ( i == -1 || scores[j] > scores[i] ) { + i = j; } } + if ( i == -1 ) + break; + scores[i] = 0; + output[count].host = altServers[i].host; + output[count].failures = 0; + count++; } mutex_unlock( &altServersLock ); return count; } +bool altservers_toString(int server, char *buffer, size_t len) +{ + return host_to_string( &altServers[server].host, buffer, len ); +} + +static bool isUsableForUplink( dnbd3_uplink_t *uplink, int server, ticks *now ) +{ + dnbd3_alt_local_t *local = ( uplink == NULL ? NULL : &uplink->altData[server] ); + dnbd3_alt_server_t *global = &altServers[server]; + if ( global->isClientOnly || ( !global->isPrivate && _proxyPrivateOnly ) ) + return false; + // Blocked locally (image not found on server...) + if ( local != NULL && local->blocked ) { + if ( --local->fails > 0 ) + return false; + local->blocked = false; + } + if ( global->blocked ) { + if ( timing_diff( &global->lastFail, now ) < SERVER_GLOBAL_DUP_TIME ) + return false; + global->lastFail = *now; + if ( --global->fails > 0 ) + return false; + global->blocked = false; + } + // Not blocked, depend on both fail counters + int fails = ( local == NULL ? 0 : local->fails ) + global->fails; + return fails < SERVER_BAD_UPLINK_MIN || ( rand() % fails ) < SERVER_BAD_UPLINK_MIN; +} + +int altservers_getHostListForReplication(const char *image, dnbd3_host_t *servers, int size) +{ + int idx[size]; + int num = altservers_getListForUplink( NULL, image, idx, size, -1 ); + for ( int i = 0; i < num; ++i ) { + servers[i] = altServers[idx[i]].host; + } + return num; +} + +/** + * Returns true if there is at least one alt-server the + * given image name would be allowed to be cloned from. + */ +bool altservers_imageHasAltServers(const char *image) +{ + bool ret = false; + mutex_lock( &altServersLock ); + for ( int i = 0; i < numAltServers; ++i ) { + if ( altServers[i].isClientOnly || ( !altServers[i].isPrivate && _proxyPrivateOnly ) ) + continue; + if ( !isImageAllowed( &altServers[i], image ) ) + continue; + ret = true; + break; + } + mutex_unlock( &altServersLock ); + return ret; +} + /** * Get <size> alt servers. If there are more alt servers than * requested, random servers will be picked. * This function is suited for finding uplink servers as * it includes private servers and ignores any "client only" servers + * @param current index of server for current connection, or -1 in panic mode */ -int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency) +static int altservers_getListForUplink(dnbd3_uplink_t *uplink, const char *image, int *servers, int size, int current) { - if ( size <= 0 ) return 0; - int count = 0, i; - ticks now; - timing_get( &now ); + if ( size <= 0 ) + return 0; + int count = 0; + declare_now; mutex_lock( &altServersLock ); - // Flip first server in list with a random one every time this is called - if ( numAltServers > 1 ) { - const dnbd3_alt_server_t tmp = altServers[0]; - do { - i = rand() % numAltServers; - } while ( i == 0 ); - altServers[0] = altServers[i]; - altServers[i] = tmp; - } - // We iterate over the list twice. First run adds servers with 0 failures only, - // second one also considers those that failed (not too many times) - if ( size > numAltServers ) size = numAltServers; - for (i = 0; i < numAltServers * 2; ++i) { - dnbd3_alt_server_t *srv = &altServers[i % numAltServers]; - if ( srv->host.type == 0 ) continue; // Slot is empty - if ( _proxyPrivateOnly && !srv->isPrivate ) continue; // Config says to consider private alt-servers only? ignore! - if ( srv->isClientOnly ) continue; - bool first = ( i < numAltServers ); - if ( first ) { - if ( srv->numFails > 0 ) continue; - } else { - if ( srv->numFails == 0 ) continue; // Already added in first iteration - if ( !emergency && srv->numFails > SERVER_BAD_UPLINK_THRES // server failed X times in a row - && timing_diff( &srv->lastFail, &now ) < SERVER_BAD_UPLINK_IGNORE ) continue; // and last fail was not too long ago? ignore! - if ( !emergency ) srv->numFails--; + // If we don't have enough servers to randomize, take a shortcut + if ( numAltServers <= size ) { + for ( int i = 0; i < numAltServers; ++i ) { + if ( current == -1 || i == current || isUsableForUplink( uplink, i, &now ) ) { + if ( isImageAllowed( &altServers[i], image ) ) { + servers[count++] = i; + } + } + } + } else { + // Plenty of alt servers; randomize + uint8_t state[SERVER_MAX_ALTS] = { 0 }; + if ( current != -1 ) { // Make sure we also test the current server + servers[count++] = current; + state[current] = 2; + } + for ( int tr = size * 10; tr > 0 && count < size; --tr ) { + int idx = rand() % numAltServers; + if ( state[idx] != 0 ) + continue; + if ( !isImageAllowed( &altServers[idx], image ) ) { + state[idx] = 2; // Mark as used without adding, so it will be ignored in panic loop + } else if ( isUsableForUplink( uplink, idx, &now ) ) { + servers[count++] = idx; + state[idx] = 2; // Used + } else { + state[idx] = 1; // Potential + } + } + // If panic mode, consider others too + for ( int tr = size * 10; current == -1 && tr > 0 && count < size; --tr ) { + int idx = rand() % numAltServers; + if ( state[idx] == 2 ) + continue; + servers[count++] = idx; + state[idx] = 2; // Used } - // server seems ok, include in output and decrease its fail counter - output[count++] = srv->host; - if ( count >= size ) break; } mutex_unlock( &altServersLock ); return count; @@ -300,7 +371,7 @@ json_t* altservers_toJson() "rtt", rtts, "isPrivate", (int)src[i].isPrivate, "isClientOnly", (int)src[i].isClientOnly, - "numFails", src[i].numFails + "numFails", src[i].fails ); json_array_append_new( list, server ); } @@ -308,33 +379,27 @@ json_t* altservers_toJson() } /** - * Update rtt history of given server - returns the new average for that server + * Update rtt history of given server - returns the new average for that server. */ -static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const unsigned int rtt) +static uint32_t altservers_updateRtt(dnbd3_uplink_t *uplink, int index, uint32_t rtt) { - unsigned int avg = rtt; - int i; + uint32_t avg = 0, j; + dnbd3_alt_local_t *local = &uplink->altData[index]; mutex_lock( &altServersLock ); - for (i = 0; i < numAltServers; ++i) { - if ( !isSameAddressPort( host, &altServers[i].host ) ) continue; - altServers[i].rtt[++altServers[i].rttIndex % SERVER_RTT_PROBES] = rtt; -#if SERVER_RTT_PROBES == 5 - avg = (altServers[i].rtt[0] + altServers[i].rtt[1] + altServers[i].rtt[2] - + altServers[i].rtt[3] + altServers[i].rtt[4]) / SERVER_RTT_PROBES; -#else -#warning You might want to change the code in altservers_update_rtt if you changed SERVER_RTT_PROBES - avg = 0; - for (int j = 0; j < SERVER_RTT_PROBES; ++j) { - avg += altServers[i].rtt[j]; + if ( likely( local->initDone ) ) { + local->rtt[++local->rttIndex % SERVER_RTT_PROBES] = rtt; + for ( j = 0; j < SERVER_RTT_PROBES; ++j ) { + avg += local->rtt[j]; } avg /= SERVER_RTT_PROBES; -#endif - // If we got a new rtt value, server must be working - if ( altServers[i].numFails > 0 ) { - altServers[i].numFails--; + } else { // First rtt measurement -- copy to every slot + for ( j = 0; j < SERVER_RTT_PROBES; ++j ) { + local->rtt[j] = rtt; } - break; + avg = rtt; + local->initDone = true; } + altServers[index].rtt[++altServers[index].rttIndex % SERVER_RTT_PROBES] = avg; mutex_unlock( &altServersLock ); return avg; } @@ -364,250 +429,249 @@ int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2) * track of how often servers fail, and consider them disabled for some time if they * fail too many times. */ -void altservers_serverFailed(const dnbd3_host_t * const host) +void altservers_serverFailed(int server) { - int i; - int foundIndex = -1, lastOk = -1; - ticks now; - timing_get( &now ); + declare_now; mutex_lock( &altServersLock ); - for (i = 0; i < numAltServers; ++i) { - if ( foundIndex == -1 ) { - // Looking for the failed server in list - if ( isSameAddressPort( host, &altServers[i].host ) ) { - foundIndex = i; - } - } else if ( altServers[i].host.type != 0 && altServers[i].numFails == 0 ) { - lastOk = i; - } - } - // Do only increase counter if last fail was not too recent. This is - // to prevent the counter from increasing rapidly if many images use the - // same uplink. If there's a network hickup, all uplinks will call this - // function and would increase the counter too quickly, disabling the server. - if ( foundIndex != -1 && timing_diff( &altServers[foundIndex].lastFail, &now ) > SERVER_RTT_INTERVAL_INIT ) { - altServers[foundIndex].numFails += SERVER_UPLINK_FAIL_INCREASE; - altServers[foundIndex].lastFail = now; - if ( lastOk != -1 ) { - // Make sure non-working servers are put at the end of the list, so they're less likely - // to get picked when testing servers for uplink connections. - const dnbd3_alt_server_t tmp = altServers[foundIndex]; - altServers[foundIndex] = altServers[lastOk]; - altServers[lastOk] = tmp; + if ( timing_diff( &altServers[server].lastFail, &now ) > SERVER_GLOBAL_DUP_TIME ) { + altServers[server].lastFail = now; + if ( altServers[server].fails++ >= SERVER_BAD_UPLINK_MAX ) { + altServers[server].blocked = true; } } mutex_unlock( &altServersLock ); } + /** - * Mainloop of this module. It will wait for requests by uplinks to find a - * suitable uplink server for them. If found, it will tell the uplink about - * the best server found. Currently the RTT history is kept per server and - * not per uplink, so if many images use the same uplink server, the history - * will update quite quickly. Needs to be improved some time, ie. by only - * updating the rtt if the last update was at least X seconds ago. + * Called from RTT checker if connecting to a server succeeded but + * subsequently selecting the given image failed. Handle this within + * the uplink and don't increase the global fail counter. */ -static void *altservers_main(void *data UNUSED) +static void altservers_imageFailed(dnbd3_uplink_t *uplink, int server) +{ + mutex_lock( &altServersLock ); + if ( uplink->altData[server].fails++ >= SERVER_BAD_UPLINK_MAX ) { + uplink->altData[server].blocked = true; + } + mutex_unlock( &altServersLock ); +} + +static void *altservers_runCheck(void *data) +{ + dnbd3_uplink_t * const uplink = (dnbd3_uplink_t*)data; + + assert( uplink != NULL ); + setThreadName( "altserver-check" ); + altservers_findUplinkInternal( uplink ); + ref_put( &uplink->reference ); // Acquired in findUplinkAsync + return NULL; +} + +void altservers_findUplink(dnbd3_uplink_t *uplink) +{ + altservers_findUplinkInternal( uplink ); + // Above function is sync, which means normally when it + // returns, rttTestResult will not be RTT_INPROGRESS. + // But we might have an ansync call running in parallel, which would + // mean the above call returns immediately. Wait for that check + // to finish too. + while ( uplink->rttTestResult == RTT_INPROGRESS ) { + usleep( 5000 ); + } +} + +int altservers_hostToIndex(dnbd3_host_t *host) +{ + for ( int i = 0; i < numAltServers; ++i ) { + if ( isSameAddressPort( host, &altServers[i].host ) ) + return i; + } + return -1; +} + +const dnbd3_host_t* altservers_indexToHost(int server) +{ + return &altServers[server].host; +} + +// XXX Sync call above must block until async worker has finished XXX +static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink) { const int ALTS = 4; - int ret, itLink, itAlt, numAlts; - bool found; - char buffer[DNBD3_BLOCK_SIZE ]; - dnbd3_reply_t reply; - dnbd3_host_t servers[ALTS + 1]; - serialized_buffer_t serialized; + int itAlt, numAlts, current; + bool panic; + int servers[ALTS + 1]; struct timespec start, end; - ticks nextCloseUnusedFd; - setThreadName( "altserver-check" ); - blockNoncriticalSignals(); - timing_gets( &nextCloseUnusedFd, 900 ); - // LOOP - while ( !_shutdown ) { - // Wait 5 seconds max. - ret = signal_wait( runSignal, 5000 ); - if ( _shutdown ) goto cleanup; - if ( ret == SIGNAL_ERROR ) { - if ( errno == EAGAIN || errno == EINTR ) continue; - logadd( LOG_WARNING, "Error %d on signal_clear on alservers_main! Things will break!", errno ); - usleep( 100000 ); + if ( _shutdown ) + return; + mutex_lock( &uplink->rttLock ); + // Maybe we already have a result, or check is currently running + if ( uplink->better.fd != -1 || uplink->rttTestResult == RTT_INPROGRESS ) { + mutex_unlock( &uplink->rttLock ); + return; + } + assert( uplink->rttTestResult != RTT_DOCHANGE ); + uplink->rttTestResult = RTT_INPROGRESS; + panic = ( uplink->current.fd == -1 ); + current = uplink->current.index; // Current server index (or last one in panic mode) + mutex_unlock( &uplink->rttLock ); + // First, get 4 alt servers + numAlts = altservers_getListForUplink( uplink, uplink->image->name, servers, ALTS, panic ? -1 : current ); + // If we're already connected and only got one server anyways, there isn't much to do + if ( numAlts == 0 || ( numAlts == 1 && !panic ) ) { + uplink->rttTestResult = RTT_DONTCHANGE; + return; + } + dnbd3_image_t * const image = image_lock( uplink->image ); + if ( image == NULL ) { // Check again after locking + uplink->rttTestResult = RTT_NOT_REACHABLE; + logadd( LOG_WARNING, "Image has gone away that was queued for RTT measurement" ); + return; + } + logadd( LOG_DEBUG2, "Running alt check for %s:%d", PIMG(image) ); + assert( uplink->rttTestResult == RTT_INPROGRESS ); + // Test them all + dnbd3_server_connection_t best = { .fd = -1 }; + unsigned long bestRtt = RTT_UNREACHABLE; + unsigned long currentRtt = RTT_UNREACHABLE; + uint64_t offset = 0; + uint32_t length = DNBD3_BLOCK_SIZE; + // Try to use the range of the first request in the queue as RTT block. + // In case we have a cluster of servers where none of them has a complete + // copy, we at least make sure the one we're potentially switching to + // has the next block we're about to request. + mutex_lock( &uplink->queueLock ); + if ( uplink->queue != NULL ) { + offset = uplink->queue->from; + length = (uint32_t)( uplink->queue->to - offset ); + } + mutex_unlock( &uplink->queueLock ); + for (itAlt = 0; itAlt < numAlts; ++itAlt) { + int server = servers[itAlt]; + // Connect + clock_gettime( BEST_CLOCK_SOURCE, &start ); + int sock = sock_connect( &altServers[server].host, 750, _uplinkTimeout ); + if ( sock == -1 ) { // Connection failed means global error + altservers_serverFailed( server ); + continue; } - // Work your way through the queue - for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) { - mutex_lock( &pendingLockWrite ); - if ( pending[itLink] == NULL ) { - mutex_unlock( &pendingLockWrite ); - continue; // Check once before locking, as a mutex is expensive - } - mutex_unlock( &pendingLockWrite ); - mutex_lock( &pendingLockConsume ); - mutex_lock( &pendingLockWrite ); - dnbd3_connection_t * const uplink = pending[itLink]; - mutex_unlock( &pendingLockWrite ); - if ( uplink == NULL ) { // Check again after locking - mutex_unlock( &pendingLockConsume ); - continue; - } - dnbd3_image_t * const image = image_lock( uplink->image ); - if ( image == NULL ) { // Check again after locking - uplink->rttTestResult = RTT_NOT_REACHABLE; - mutex_lock( &pendingLockWrite ); - pending[itLink] = NULL; - mutex_unlock( &pendingLockWrite ); - mutex_unlock( &pendingLockConsume ); - logadd( LOG_DEBUG1, "Image has gone away that was queued for RTT measurement" ); - continue; - } - LOG( LOG_DEBUG2, "[%d] Running alt check", itLink ); - assert( uplink->rttTestResult == RTT_INPROGRESS ); - // Now get 4 alt servers - numAlts = altservers_getListForUplink( servers, ALTS, uplink->fd == -1 ); - if ( uplink->fd != -1 ) { - // Add current server if not already in list - found = false; - for (itAlt = 0; itAlt < numAlts; ++itAlt) { - if ( !isSameAddressPort( &uplink->currentServer, &servers[itAlt] ) ) continue; - found = true; - break; - } - if ( !found ) servers[numAlts++] = uplink->currentServer; - } - // Test them all - int bestSock = -1; - int bestIndex = -1; - int bestProtocolVersion = -1; - unsigned long bestRtt = RTT_UNREACHABLE; - unsigned long currentRtt = RTT_UNREACHABLE; - for (itAlt = 0; itAlt < numAlts; ++itAlt) { - usleep( 1000 ); // Wait a very short moment for the network to recover (we might be doing lots of measurements...) - // Connect - clock_gettime( BEST_CLOCK_SOURCE, &start ); - int sock = sock_connect( &servers[itAlt], 750, 1000 ); - if ( sock < 0 ) continue; - // Select image ++++++++++++++++++++++++++++++ - if ( !dnbd3_select_image( sock, image->name, image->rid, SI_SERVER_FLAGS ) ) { - goto server_failed; - } - // See if selecting the image succeeded ++++++++++++++++++++++++++++++ - uint16_t protocolVersion, rid; - uint64_t imageSize; - char *name; - if ( !dnbd3_select_image_reply( &serialized, sock, &protocolVersion, &name, &rid, &imageSize ) ) { - goto server_image_not_available; - } - if ( protocolVersion < MIN_SUPPORTED_SERVER ) goto server_failed; - if ( name == NULL || strcmp( name, image->name ) != 0 ) { - ERROR_GOTO( server_failed, "[RTT] Server offers image '%s'", name ); - } - if ( rid != image->rid ) { - ERROR_GOTO( server_failed, "[RTT] Server provides rid %d", (int)rid ); - } - if ( imageSize != image->virtualFilesize ) { - ERROR_GOTO( server_failed, "[RTT] Remote size: %" PRIu64 ", expected: %" PRIu64, imageSize, image->virtualFilesize ); - } - // Request first block (NOT random!) ++++++++++++++++++++++++++++++ - if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) { - LOG_GOTO( server_failed, LOG_DEBUG1, "[RTT%d] Could not request first block", itLink ); - } - // See if requesting the block succeeded ++++++++++++++++++++++ - if ( !dnbd3_get_reply( sock, &reply ) ) { - LOG_GOTO( server_failed, LOG_DEBUG1, "[RTT%d] Received corrupted reply header after CMD_GET_BLOCK", itLink ); - } - // check reply header - if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) { - ERROR_GOTO( server_failed, "[RTT] Reply to first block request is %" PRIu32 " bytes", reply.size ); - } - if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) { - ERROR_GOTO( server_failed, "[RTT%d] Could not read first block payload", itLink ); - } - clock_gettime( BEST_CLOCK_SOURCE, &end ); - // Measurement done - everything fine so far - mutex_lock( &uplink->rttLock ); - const bool isCurrent = isSameAddressPort( &servers[itAlt], &uplink->currentServer ); - // Penaltize rtt if this was a cycle; this will treat this server with lower priority - // in the near future too, so we prevent alternating between two servers that are both - // part of a cycle and have the lowest latency. - const unsigned int rtt = (unsigned int)((end.tv_sec - start.tv_sec) * 1000000 - + (end.tv_nsec - start.tv_nsec) / 1000 - + ( (isCurrent && uplink->cycleDetected) ? 1000000 : 0 )); // µs - unsigned int avg = altservers_updateRtt( &servers[itAlt], rtt ); - // If a cycle was detected, or we lost connection to the current (last) server, penaltize it one time - if ( ( uplink->cycleDetected || uplink->fd == -1 ) && isCurrent ) avg = (avg * 2) + 50000; - mutex_unlock( &uplink->rttLock ); - if ( uplink->fd != -1 && isCurrent ) { - // Was measuring current server - currentRtt = avg; - close( sock ); - } else if ( avg < bestRtt ) { - // Was another server, update "best" - if ( bestSock != -1 ) close( bestSock ); - bestSock = sock; - bestRtt = avg; - bestIndex = itAlt; - bestProtocolVersion = protocolVersion; - } else { - // Was too slow, ignore - close( sock ); - } - // We're done, call continue - continue; - // Jump here if anything went wrong - // This will cleanup and continue - server_failed: ; - altservers_serverFailed( &servers[itAlt] ); - server_image_not_available: ; - close( sock ); - } - // Done testing all servers. See if we should switch - if ( bestSock != -1 && (uplink->fd == -1 || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) { - // yep - if ( currentRtt > 10000000 || uplink->fd == -1 ) { - LOG( LOG_DEBUG1, "Change - best: %luµs, current: -", bestRtt ); - } else { - LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt ); - } - sock_setTimeout( bestSock, _uplinkTimeout ); - mutex_lock( &uplink->rttLock ); - uplink->betterFd = bestSock; - uplink->betterServer = servers[bestIndex]; - uplink->betterVersion = bestProtocolVersion; - uplink->rttTestResult = RTT_DOCHANGE; - mutex_unlock( &uplink->rttLock ); - signal_call( uplink->signal ); - } else if ( bestSock == -1 && currentRtt == RTT_UNREACHABLE ) { - // No server was reachable - mutex_lock( &uplink->rttLock ); - uplink->rttTestResult = RTT_NOT_REACHABLE; - mutex_unlock( &uplink->rttLock ); - } else { - // nope - if ( bestSock != -1 ) close( bestSock ); - mutex_lock( &uplink->rttLock ); - uplink->rttTestResult = RTT_DONTCHANGE; - uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away - mutex_unlock( &uplink->rttLock ); - if ( !image->working ) { - image->working = true; - LOG( LOG_DEBUG1, "[%d] No better alt server found, enabling again", itLink ); - } + // Select image ++++++++++++++++++++++++++++++ + if ( !dnbd3_select_image( sock, image->name, image->rid, SI_SERVER_FLAGS ) ) { + goto image_failed; + } + // See if selecting the image succeeded ++++++++++++++++++++++++++++++ + uint16_t protocolVersion = 0; + uint16_t rid; + uint64_t imageSize; + char *name; + serialized_buffer_t serialized; + if ( !dnbd3_select_image_reply( &serialized, sock, &protocolVersion, &name, &rid, &imageSize ) ) { + goto image_failed; + } + if ( protocolVersion < MIN_SUPPORTED_SERVER ) { // Server version unsupported; global fail + goto server_failed; + } + if ( name == NULL || strcmp( name, image->name ) != 0 ) { + ERROR_GOTO( image_failed, "[RTT] Server offers image '%s' instead of '%s'", name, image->name ); + } + if ( rid != image->rid ) { + ERROR_GOTO( image_failed, "[RTT] Server provides rid %d instead of %d", (int)rid, (int)image->rid ); + } + if ( imageSize != image->virtualFilesize ) { + ERROR_GOTO( image_failed, "[RTT] Remote size: %" PRIu64 ", expected: %" PRIu64, imageSize, image->virtualFilesize ); + } + // Request block (NOT random! First or from queue) ++++++++++++ + if ( !dnbd3_get_block( sock, offset, length, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) { + LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Could not request block", server ); + } + // See if requesting the block succeeded ++++++++++++++++++++++ + dnbd3_reply_t reply; + if ( !dnbd3_get_reply( sock, &reply ) ) { + LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Received corrupted reply header after CMD_GET_BLOCK", server ); + } + // check reply header + if ( reply.cmd != CMD_GET_BLOCK || reply.size != length ) { + // Sanity check failed; count this as global error (malicious/broken server) + ERROR_GOTO( server_failed, "[RTT] Reply to first block request is %" PRIu32 " bytes", reply.size ); + } + // flush payload to include this into measurement + char buffer[DNBD3_BLOCK_SIZE]; + uint32_t todo = length; + ssize_t ret; + while ( todo != 0 && ( ret = recv( sock, buffer, MIN( DNBD3_BLOCK_SIZE, todo ), MSG_WAITALL ) ) > 0 ) { + todo -= (uint32_t)ret; + } + if ( todo != 0 ) { + ERROR_GOTO( image_failed, "[RTT%d] Could not read first block payload", server ); + } + clock_gettime( BEST_CLOCK_SOURCE, &end ); + // Measurement done - everything fine so far + mutex_lock( &uplink->rttLock ); + const bool isCurrent = ( uplink->current.index == server ); + mutex_unlock( &uplink->rttLock ); + uint32_t rtt = (uint32_t)((end.tv_sec - start.tv_sec) * 1000000 + + (end.tv_nsec - start.tv_nsec) / 1000); // µs + uint32_t avg = altservers_updateRtt( uplink, server, rtt ); + // If a cycle was detected, or we lost connection to the current (last) server, penaltize it one time + if ( ( uplink->cycleDetected || panic ) && isCurrent ) { + avg = (avg * 2) + 50000; + } + if ( !panic && isCurrent ) { + // Was measuring current server + currentRtt = avg; + close( sock ); + } else if ( avg < bestRtt ) { + // Was another server, update "best" + if ( best.fd != -1 ) { + close( best.fd ); } - image_release( image ); - // end of loop over all pending uplinks - mutex_lock( &pendingLockWrite ); - pending[itLink] = NULL; - mutex_unlock( &pendingLockWrite ); - mutex_unlock( &pendingLockConsume ); + best.fd = sock; + bestRtt = avg; + best.index = server; + best.version = protocolVersion; + } else { + // Was too slow, ignore + close( sock ); + } + // We're done, call continue + continue; + // Jump here if anything went wrong + // This will cleanup and continue +image_failed: + altservers_imageFailed( uplink, server ); + goto failed; +server_failed: + altservers_serverFailed( server ); +failed: + close( sock ); + } + // Done testing all servers. See if we should switch + if ( best.fd != -1 && (panic || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) { + // yep + if ( currentRtt > 10000000 || panic ) { + LOG( LOG_DEBUG1, "Change - best: %luµs, current: -", bestRtt ); + } else { + LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt ); } - // Save cache maps of all images if applicable - declare_now; - // TODO: Has nothing to do with alt servers really, maybe move somewhere else? - if ( _closeUnusedFd && timing_reached( &nextCloseUnusedFd, &now ) ) { - timing_gets( &nextCloseUnusedFd, 900 ); - image_closeUnusedFd(); + mutex_lock( &uplink->rttLock ); + uplink->better = best; + uplink->rttTestResult = RTT_DOCHANGE; + mutex_unlock( &uplink->rttLock ); + signal_call( uplink->signal ); + } else if ( best.fd == -1 && currentRtt == RTT_UNREACHABLE ) { + // No server was reachable, including current + uplink->rttTestResult = RTT_NOT_REACHABLE; + } else { + // nope + if ( best.fd != -1 ) { + close( best.fd ); } + uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away + mutex_lock( &uplink->rttLock ); + uplink->rttTestResult = RTT_DONTCHANGE; + mutex_unlock( &uplink->rttLock ); } - cleanup: ; - if ( runSignal != NULL ) signal_close( runSignal ); - runSignal = NULL; - return NULL ; + image_release( image ); } |