diff options
Diffstat (limited to 'src/server/altservers.c')
-rw-r--r-- | src/server/altservers.c | 79 |
1 files changed, 57 insertions, 22 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c index 943345c..4413ca6 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -5,16 +5,16 @@ #include "helper.h" #include "image.h" #include "fileutil.h" -#include "../shared/protocol.h" -#include "../shared/timing.h" -#include "../serverconfig.h" +#include <dnbd3/shared/protocol.h> +#include <dnbd3/shared/timing.h> +#include <dnbd3/config/server.h> #include "reference.h" #include <assert.h> #include <inttypes.h> #include <jansson.h> -#define LOG(lvl, msg, ...) logadd(lvl, msg " (%s:%d)", __VA_ARGS__, image->name, (int)image->rid) +#define LOG(lvl, msg, ...) logadd(lvl, msg " (%s:%d)", __VA_ARGS__, PIMG(image)) #define LOG_GOTO(jumplabel, lvl, ...) do { LOG(lvl, __VA_ARGS__); goto jumplabel; } while (0); #define ERROR_GOTO(jumplabel, ...) LOG_GOTO(jumplabel, LOG_ERROR, __VA_ARGS__) @@ -172,7 +172,7 @@ void altservers_findUplinkAsync(dnbd3_uplink_t *uplink) if ( uplink->rttTestResult != RTT_INPROGRESS ) { dnbd3_uplink_t *current = ref_get_uplink( &uplink->image->uplinkref ); if ( current == uplink ) { - threadpool_run( &altservers_runCheck, uplink ); + threadpool_run( &altservers_runCheck, uplink, "UPLINK" ); } else if ( current != NULL ) { ref_put( ¤t->reference ); } @@ -268,12 +268,32 @@ int altservers_getHostListForReplication(const char *image, dnbd3_host_t *server int idx[size]; int num = altservers_getListForUplink( NULL, image, idx, size, -1 ); for ( int i = 0; i < num; ++i ) { - servers[i] = altServers[i].host; + servers[i] = altServers[idx[i]].host; } return num; } /** + * Returns true if there is at least one alt-server the + * given image name would be allowed to be cloned from. + */ +bool altservers_imageHasAltServers(const char *image) +{ + bool ret = false; + mutex_lock( &altServersLock ); + for ( int i = 0; i < numAltServers; ++i ) { + if ( altServers[i].isClientOnly || ( !altServers[i].isPrivate && _proxyPrivateOnly ) ) + continue; + if ( !isImageAllowed( &altServers[i], image ) ) + continue; + ret = true; + break; + } + mutex_unlock( &altServersLock ); + return ret; +} + +/** * Get <size> alt servers. If there are more alt servers than * requested, random servers will be picked. * This function is suited for finding uplink servers as @@ -450,6 +470,11 @@ static void *altservers_runCheck(void *data) void altservers_findUplink(dnbd3_uplink_t *uplink) { altservers_findUplinkInternal( uplink ); + // Above function is sync, which means normally when it + // returns, rttTestResult will not be RTT_INPROGRESS. + // But we might have an ansync call running in parallel, which would + // mean the above call returns immediately. Wait for that check + // to finish too. while ( uplink->rttTestResult == RTT_INPROGRESS ) { usleep( 5000 ); } @@ -504,17 +529,29 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink) logadd( LOG_WARNING, "Image has gone away that was queued for RTT measurement" ); return; } - LOG( LOG_DEBUG2, "Running alt check for %s:%d", image->name, (int)image->rid ); + logadd( LOG_DEBUG2, "Running alt check for %s:%d", PIMG(image) ); assert( uplink->rttTestResult == RTT_INPROGRESS ); // Test them all dnbd3_server_connection_t best = { .fd = -1 }; unsigned long bestRtt = RTT_UNREACHABLE; unsigned long currentRtt = RTT_UNREACHABLE; + uint64_t offset = 0; + uint32_t length = DNBD3_BLOCK_SIZE; + // Try to use the range of the first request in the queue as RTT block. + // In case we have a cluster of servers where none of them has a complete + // copy, we at least make sure the one we're potentially switching to + // has the next block we're about to request. + mutex_lock( &uplink->queueLock ); + if ( uplink->queue != NULL ) { + offset = uplink->queue->from; + length = (uint32_t)( uplink->queue->to - offset ); + } + mutex_unlock( &uplink->queueLock ); for (itAlt = 0; itAlt < numAlts; ++itAlt) { int server = servers[itAlt]; // Connect clock_gettime( BEST_CLOCK_SOURCE, &start ); - int sock = sock_connect( &altServers[server].host, 750, 1000 ); + int sock = sock_connect( &altServers[server].host, 750, _uplinkTimeout ); if ( sock == -1 ) { // Connection failed means global error altservers_serverFailed( server ); continue; @@ -524,7 +561,8 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink) goto image_failed; } // See if selecting the image succeeded ++++++++++++++++++++++++++++++ - uint16_t protocolVersion, rid; + uint16_t protocolVersion = 0; + uint16_t rid; uint64_t imageSize; char *name; serialized_buffer_t serialized; @@ -543,9 +581,9 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink) if ( imageSize != image->virtualFilesize ) { ERROR_GOTO( image_failed, "[RTT] Remote size: %" PRIu64 ", expected: %" PRIu64, imageSize, image->virtualFilesize ); } - // Request first block (NOT random!) ++++++++++++++++++++++++++++++ - if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) { - LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Could not request first block", server ); + // Request block (NOT random! First or from queue) ++++++++++++ + if ( !dnbd3_get_block( sock, offset, length, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) { + LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Could not request block", server ); } // See if requesting the block succeeded ++++++++++++++++++++++ dnbd3_reply_t reply; @@ -553,13 +591,18 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink) LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Received corrupted reply header after CMD_GET_BLOCK", server ); } // check reply header - if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) { + if ( reply.cmd != CMD_GET_BLOCK || reply.size != length ) { // Sanity check failed; count this as global error (malicious/broken server) ERROR_GOTO( server_failed, "[RTT] Reply to first block request is %" PRIu32 " bytes", reply.size ); } // flush payload to include this into measurement char buffer[DNBD3_BLOCK_SIZE]; - if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) { + uint32_t todo = length; + ssize_t ret; + while ( todo != 0 && ( ret = recv( sock, buffer, MIN( DNBD3_BLOCK_SIZE, todo ), MSG_WAITALL ) ) > 0 ) { + todo -= (uint32_t)ret; + } + if ( todo != 0 ) { ERROR_GOTO( image_failed, "[RTT%d] Could not read first block payload", server ); } clock_gettime( BEST_CLOCK_SOURCE, &end ); @@ -567,9 +610,6 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink) mutex_lock( &uplink->rttLock ); const bool isCurrent = ( uplink->current.index == server ); mutex_unlock( &uplink->rttLock ); - // Penaltize rtt if this was a cycle; this will treat this server with lower priority - // in the near future too, so we prevent alternating between two servers that are both - // part of a cycle and have the lowest latency. uint32_t rtt = (uint32_t)((end.tv_sec - start.tv_sec) * 1000000 + (end.tv_nsec - start.tv_nsec) / 1000); // µs uint32_t avg = altservers_updateRtt( uplink, server, rtt ); @@ -614,7 +654,6 @@ failed: } else { LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt ); } - sock_setTimeout( best.fd, _uplinkTimeout ); mutex_lock( &uplink->rttLock ); uplink->better = best; uplink->rttTestResult = RTT_DOCHANGE; @@ -628,10 +667,6 @@ failed: if ( best.fd != -1 ) { close( best.fd ); } - if ( !image->working || uplink->cycleDetected ) { - image->working = true; - LOG( LOG_DEBUG1, "[RTT] No better alt server found, enabling '%s:%d' again... :-(", image->name, (int)image->rid ); - } uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away mutex_lock( &uplink->rttLock ); uplink->rttTestResult = RTT_DONTCHANGE; |