summary refs log tree commit diff stats
path: root/src/server/altservers.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/altservers.c')
-rw-r--r-- src/server/altservers.c 79
1 files changed, 57 insertions, 22 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index 943345c..4413ca6 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -5,16 +5,16 @@
#include "helper.h"
#include "image.h"
#include "fileutil.h"
-#include "../shared/protocol.h"
-#include "../shared/timing.h"
-#include "../serverconfig.h"
+#include <dnbd3/shared/protocol.h>
+#include <dnbd3/shared/timing.h>
+#include <dnbd3/config/server.h>
#include "reference.h"
#include <assert.h>
#include <inttypes.h>
#include <jansson.h>
-#define LOG(lvl, msg, ...) logadd(lvl, msg " (%s:%d)", __VA_ARGS__, image->name, (int)image->rid)
+#define LOG(lvl, msg, ...) logadd(lvl, msg " (%s:%d)", __VA_ARGS__, PIMG(image))
#define LOG_GOTO(jumplabel, lvl, ...) do { LOG(lvl, __VA_ARGS__); goto jumplabel; } while (0);
#define ERROR_GOTO(jumplabel, ...) LOG_GOTO(jumplabel, LOG_ERROR, __VA_ARGS__)
@@ -172,7 +172,7 @@ void altservers_findUplinkAsync(dnbd3_uplink_t *uplink)
if ( uplink->rttTestResult != RTT_INPROGRESS ) {
dnbd3_uplink_t *current = ref_get_uplink( &uplink->image->uplinkref );
if ( current == uplink ) {
- threadpool_run( &altservers_runCheck, uplink );
+ threadpool_run( &altservers_runCheck, uplink, "UPLINK" );
} else if ( current != NULL ) {
ref_put( &current->reference );
}
@@ -268,12 +268,32 @@ int altservers_getHostListForReplication(const char *image, dnbd3_host_t *server
int idx[size];
int num = altservers_getListForUplink( NULL, image, idx, size, -1 );
for ( int i = 0; i < num; ++i ) {
- servers[i] = altServers[i].host;
+ servers[i] = altServers[idx[i]].host;
}
return num;
}
/**
+ * Returns true if there is at least one alt-server the
+ * given image name would be allowed to be cloned from.
+ */
+bool altservers_imageHasAltServers(const char *image)
+{
+ bool ret = false;
+ mutex_lock( &altServersLock );
+ for ( int i = 0; i < numAltServers; ++i ) {
+ if ( altServers[i].isClientOnly || ( !altServers[i].isPrivate && _proxyPrivateOnly ) )
+ continue;
+ if ( !isImageAllowed( &altServers[i], image ) )
+ continue;
+ ret = true;
+ break;
+ }
+ mutex_unlock( &altServersLock );
+ return ret;
+}
+
+/**
* Get <size> alt servers. If there are more alt servers than
* requested, random servers will be picked.
* This function is suited for finding uplink servers as
@@ -450,6 +470,11 @@ static void *altservers_runCheck(void *data)
void altservers_findUplink(dnbd3_uplink_t *uplink)
{
altservers_findUplinkInternal( uplink );
+ // Above function is sync, which means normally when it
+ // returns, rttTestResult will not be RTT_INPROGRESS.
+ // But we might have an async call running in parallel, which would
+ // mean the above call returns immediately. Wait for that check
+ // to finish too.
while ( uplink->rttTestResult == RTT_INPROGRESS ) {
usleep( 5000 );
}
@@ -504,17 +529,29 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink)
logadd( LOG_WARNING, "Image has gone away that was queued for RTT measurement" );
return;
}
- LOG( LOG_DEBUG2, "Running alt check for %s:%d", image->name, (int)image->rid );
+ logadd( LOG_DEBUG2, "Running alt check for %s:%d", PIMG(image) );
assert( uplink->rttTestResult == RTT_INPROGRESS );
// Test them all
dnbd3_server_connection_t best = { .fd = -1 };
unsigned long bestRtt = RTT_UNREACHABLE;
unsigned long currentRtt = RTT_UNREACHABLE;
+ uint64_t offset = 0;
+ uint32_t length = DNBD3_BLOCK_SIZE;
+ // Try to use the range of the first request in the queue as RTT block.
+ // In case we have a cluster of servers where none of them has a complete
+ // copy, we at least make sure the one we're potentially switching to
+ // has the next block we're about to request.
+ mutex_lock( &uplink->queueLock );
+ if ( uplink->queue != NULL ) {
+ offset = uplink->queue->from;
+ length = (uint32_t)( uplink->queue->to - offset );
+ }
+ mutex_unlock( &uplink->queueLock );
for (itAlt = 0; itAlt < numAlts; ++itAlt) {
int server = servers[itAlt];
// Connect
clock_gettime( BEST_CLOCK_SOURCE, &start );
- int sock = sock_connect( &altServers[server].host, 750, 1000 );
+ int sock = sock_connect( &altServers[server].host, 750, _uplinkTimeout );
if ( sock == -1 ) { // Connection failed means global error
altservers_serverFailed( server );
continue;
@@ -524,7 +561,8 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink)
goto image_failed;
}
// See if selecting the image succeeded ++++++++++++++++++++++++++++++
- uint16_t protocolVersion, rid;
+ uint16_t protocolVersion = 0;
+ uint16_t rid;
uint64_t imageSize;
char *name;
serialized_buffer_t serialized;
@@ -543,9 +581,9 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink)
if ( imageSize != image->virtualFilesize ) {
ERROR_GOTO( image_failed, "[RTT] Remote size: %" PRIu64 ", expected: %" PRIu64, imageSize, image->virtualFilesize );
}
- // Request first block (NOT random!) ++++++++++++++++++++++++++++++
- if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) {
- LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Could not request first block", server );
+ // Request block (NOT random! First or from queue) ++++++++++++
+ if ( !dnbd3_get_block( sock, offset, length, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) {
+ LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Could not request block", server );
}
// See if requesting the block succeeded ++++++++++++++++++++++
dnbd3_reply_t reply;
@@ -553,13 +591,18 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink)
LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Received corrupted reply header after CMD_GET_BLOCK", server );
}
// check reply header
- if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) {
+ if ( reply.cmd != CMD_GET_BLOCK || reply.size != length ) {
// Sanity check failed; count this as global error (malicious/broken server)
ERROR_GOTO( server_failed, "[RTT] Reply to first block request is %" PRIu32 " bytes", reply.size );
}
// flush payload to include this into measurement
char buffer[DNBD3_BLOCK_SIZE];
- if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) {
+ uint32_t todo = length;
+ ssize_t ret;
+ while ( todo != 0 && ( ret = recv( sock, buffer, MIN( DNBD3_BLOCK_SIZE, todo ), MSG_WAITALL ) ) > 0 ) {
+ todo -= (uint32_t)ret;
+ }
+ if ( todo != 0 ) {
ERROR_GOTO( image_failed, "[RTT%d] Could not read first block payload", server );
}
clock_gettime( BEST_CLOCK_SOURCE, &end );
@@ -567,9 +610,6 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink)
mutex_lock( &uplink->rttLock );
const bool isCurrent = ( uplink->current.index == server );
mutex_unlock( &uplink->rttLock );
- // Penaltize rtt if this was a cycle; this will treat this server with lower priority
- // in the near future too, so we prevent alternating between two servers that are both
- // part of a cycle and have the lowest latency.
uint32_t rtt = (uint32_t)((end.tv_sec - start.tv_sec) * 1000000
+ (end.tv_nsec - start.tv_nsec) / 1000); // µs
uint32_t avg = altservers_updateRtt( uplink, server, rtt );
@@ -614,7 +654,6 @@ failed:
} else {
LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt );
}
- sock_setTimeout( best.fd, _uplinkTimeout );
mutex_lock( &uplink->rttLock );
uplink->better = best;
uplink->rttTestResult = RTT_DOCHANGE;
@@ -628,10 +667,6 @@ failed:
if ( best.fd != -1 ) {
close( best.fd );
}
- if ( !image->working || uplink->cycleDetected ) {
- image->working = true;
- LOG( LOG_DEBUG1, "[RTT] No better alt server found, enabling '%s:%d' again... :-(", image->name, (int)image->rid );
- }
uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away
mutex_lock( &uplink->rttLock );
uplink->rttTestResult = RTT_DONTCHANGE;