summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/server/altservers.c738
-rw-r--r--src/server/altservers.h16
-rw-r--r--src/server/globals.h41
-rw-r--r--src/server/image.c6
-rw-r--r--src/server/net.c16
-rw-r--r--src/server/server.c8
-rw-r--r--src/server/uplink.c117
-rw-r--r--src/server/uplink.h2
-rw-r--r--src/serverconfig.h10
9 files changed, 469 insertions, 485 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index fbe10a8..493ed9e 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -1,5 +1,6 @@
#include "altservers.h"
#include "locks.h"
+#include "threadpool.h"
#include "helper.h"
#include "image.h"
#include "fileutil.h"
@@ -14,46 +15,22 @@
#define LOG_GOTO(jumplabel, lvl, ...) do { LOG(lvl, __VA_ARGS__); goto jumplabel; } while (0);
#define ERROR_GOTO(jumplabel, ...) LOG_GOTO(jumplabel, LOG_ERROR, __VA_ARGS__)
-static dnbd3_uplink_t * _Atomic pending[SERVER_MAX_PENDING_ALT_CHECKS];
-static dnbd3_signal_t * _Atomic runSignal = NULL;
-
static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS];
static atomic_int numAltServers = 0;
static pthread_mutex_t altServersLock;
+static ticks nextCloseUnusedFd; // TODO: Move away
-static pthread_t altThread;
-
-static void *altservers_main(void *data);
-static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const unsigned int rtt);
+static void *altservers_runCheck(void *data);
+static int altservers_getListForUplink(dnbd3_uplink_t *uplink, int *servers, int size, int current);
+static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink);
+static uint32_t altservers_updateRtt(dnbd3_uplink_t *uplink, int index, uint32_t rtt);
+static void altservers_imageFailed(dnbd3_uplink_t *uplink, int server);
void altservers_init()
{
srand( (unsigned int)time( NULL ) );
- // Init spinlock
+ // Init lock
mutex_init( &altServersLock, LOCK_ALT_SERVER_LIST );
- // Init signal
- runSignal = signal_new();
- if ( runSignal == NULL ) {
- logadd( LOG_ERROR, "Error creating signal object. Uplink feature unavailable." );
- exit( EXIT_FAILURE );
- }
- memset( altServers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) );
- if ( 0 != thread_create( &altThread, NULL, &altservers_main, (void *)NULL ) ) {
- logadd( LOG_ERROR, "Could not start altservers connector thread" );
- exit( EXIT_FAILURE );
- }
- // Init waiting links queue -- this is currently a global static array so
- // it will already be zero, but in case we refactor later do it explicitly
- for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
- pending[i] = NULL;
- }
-}
-
-void altservers_shutdown()
-{
- if ( runSignal == NULL ) return;
- signal_call( runSignal ); // Wake altservers thread up
- thread_join( altThread, NULL );
}
static void addalt(int argc, char **argv, void *data)
@@ -121,7 +98,7 @@ bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate
/**
* ONLY called from the passed uplink's main thread
*/
-void altservers_findUplink(dnbd3_uplink_t *uplink)
+void altservers_findUplinkAsync(dnbd3_uplink_t *uplink)
{
if ( uplink->shutdown )
return;
@@ -135,67 +112,11 @@ void altservers_findUplink(dnbd3_uplink_t *uplink)
assert( uplink->better.fd == -1 );
// it is however possible that an RTT measurement is currently in progress,
// so check for that case and do nothing if one is in progress
- // XXX As this function is only ever called by the image's uplink thread,
- // it cannot happen that the uplink ends up in this list concurrently
mutex_lock( &uplink->rttLock );
- if ( uplink->rttTestResult == RTT_INPROGRESS ) {
- for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
- if ( pending[i] != uplink ) continue;
- // Yep, measuring right now
- return;
- }
- }
- // Find free slot for measurement
- uplink->rttTestResult = RTT_INPROGRESS;
- for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
- if ( pending[i] != NULL ) continue;
- dnbd3_uplink_t *null = NULL;
- if ( atomic_compare_exchange_strong( &pending[i], &null, uplink ) ) {
- mutex_unlock( &uplink->rttLock );
- atomic_thread_fence( memory_order_release );
- signal_call( runSignal ); // Wake altservers thread up
- return;
- }
+ if ( uplink->rttTestResult != RTT_INPROGRESS ) {
+ threadpool_run( &altservers_runCheck, uplink );
}
- // End of loop - no free slot
- uplink->rttTestResult = RTT_NOT_REACHABLE;
mutex_unlock( &uplink->rttLock );
- logadd( LOG_WARNING, "No more free RTT measurement slots, ignoring a request..." );
-}
-
-/**
- * The given uplink is about to disappear,
- * wait until any pending RTT check is done.
- */
-void altservers_removeUplink(dnbd3_uplink_t *uplink)
-{
- assert( uplink != NULL );
- assert( uplink->shutdown );
- int i;
- for ( i = 1 ;; ++i ) {
- atomic_thread_fence( memory_order_acquire );
- if ( runSignal == NULL ) {
- // Thread is already done, remove manually
- uplink->rttTestResult = RTT_NOT_REACHABLE;
- break;
- }
- // Thread still running, wait until test is done
- bool found = false;
- for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
- if ( pending[i] == uplink ) {
- found = true;
- break;
- }
- }
- if ( !found ) // No more test running
- break;
- usleep( 10000 ); // 10ms
- signal_call( runSignal ); // Wake altservers thread up
- if ( i % 500 == 0 ) {
- logadd( LOG_INFO, "Still waiting for altserver check for uplink %p...", (void*)uplink );
- }
- }
- logadd( LOG_DEBUG1, "Waited for %d iterations for altservers check when tearing down uplink", i );
}
/**
@@ -209,90 +130,124 @@ int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output
if ( host == NULL || host->type == 0 || numAltServers == 0 || output == NULL || size <= 0 ) return 0;
int i, j;
int count = 0;
- int scores[size];
- int score;
- mutex_lock( &altServersLock );
+ uint16_t scores[SERVER_MAX_ALTS] = { 0 };
if ( size > numAltServers ) size = numAltServers;
- for (i = 0; i < numAltServers; ++i) {
- if ( altServers[i].host.type == 0 ) continue; // Slot is empty
- if ( altServers[i].isPrivate ) continue; // Do not tell clients about private servers
+ mutex_lock( &altServersLock );
+ for ( i = 0; i < numAltServers; ++i ) {
+ if ( altServers[i].host.type == 0 || altServers[i].isPrivate )
+ continue; // Slot is empty or uplink is for replication only
if ( host->type == altServers[i].host.type ) {
- score = altservers_netCloseness( host, &altServers[i].host ) - altServers[i].numFails;
+ scores[i] = 10 + altservers_netCloseness( host, &altServers[i].host );
} else {
- score = -( altServers[i].numFails + 128 ); // Wrong address family
+ scores[i] = 1; // Wrong address family
}
- if ( count == 0 ) {
- // Trivial - this is the first entry
- output[0].host = altServers[i].host;
- output[0].failures = 0;
- scores[0] = score;
- count++;
- } else {
- // Other entries already exist, insert in proper position
- for (j = 0; j < size; ++j) {
- if ( j < count && score <= scores[j] ) continue;
- if ( j > count ) break; // Should never happen but just in case...
- if ( j < count && j + 1 < size ) {
- // Check if we're in the middle and need to move other entries...
- memmove( &output[j + 1], &output[j], sizeof(dnbd3_server_entry_t) * (size - j - 1) );
- memmove( &scores[j + 1], &scores[j], sizeof(int) * (size - j - 1) );
- }
- if ( count < size ) {
- count++;
- }
- output[j].host = altServers[i].host;
- output[j].failures = 0;
- scores[j] = score;
- break;
+ }
+ while ( count < size ) {
+ i = -1;
+ for ( j = 0; j < numAltServers; ++j ) {
+ if ( scores[j] == 0 )
+ continue;
+ if ( i == -1 || scores[j] > scores[i] ) {
+ i = j;
}
}
+ if ( i == -1 )
+ break;
+ output[count].host = altServers[i].host;
+ output[count].failures = 0;
+ count++;
}
mutex_unlock( &altServersLock );
return count;
}
+bool altservers_toString(int server, char *buffer, size_t len)
+{
+ return host_to_string( &altServers[server].host, buffer, len );
+}
+
+static bool isUsableForUplink( dnbd3_uplink_t *uplink, int server, ticks *now )
+{
+ dnbd3_alt_local_t *local = ( uplink == NULL ? NULL : &uplink->altData[server] );
+ dnbd3_alt_server_t *global = &altServers[server];
+ if ( global->isClientOnly || ( !global->isPrivate && _proxyPrivateOnly ) )
+ return false;
+ // Blocked locally (image not found on server...)
+ if ( local != NULL && local->blocked ) {
+ if ( --local->fails > 0 )
+ return false;
+ local->blocked = false;
+ }
+ if ( global->blocked ) {
+ if ( timing_diff( &global->lastFail, now ) < SERVER_GLOBAL_DUP_TIME )
+ return false;
+ global->lastFail = *now;
+ if ( --global->fails > 0 )
+ return false;
+ global->blocked = false;
+ }
+ // Not blocked, depend on both fail counters
+ int fails = ( local == NULL ? 0 : local->fails ) + global->fails;
+ return fails < SERVER_BAD_UPLINK_MIN || ( rand() % fails ) < SERVER_BAD_UPLINK_MIN;
+}
+
+int altservers_getHostListForReplication(dnbd3_host_t *servers, int size)
+{
+ int idx[size];
+ int num = altservers_getListForUplink( NULL, idx, size, -1 );
+ for ( int i = 0; i < num; ++i ) {
+ servers[i] = altServers[i].host;
+ }
+ return num;
+}
+
/**
* Get <size> alt servers. If there are more alt servers than
* requested, random servers will be picked.
* This function is suited for finding uplink servers as
* it includes private servers and ignores any "client only" servers
+ * @param current index of server for current connection, or -1 in panic mode
*/
-int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency)
+static int altservers_getListForUplink(dnbd3_uplink_t *uplink, int *servers, int size, int current)
{
- if ( size <= 0 ) return 0;
- int count = 0, i;
- ticks now;
- timing_get( &now );
+ if ( size <= 0 )
+ return 0;
+ int count = 0;
+ declare_now;
mutex_lock( &altServersLock );
- // Flip first server in list with a random one every time this is called
- if ( numAltServers > 1 ) {
- const dnbd3_alt_server_t tmp = altServers[0];
- do {
- i = rand() % numAltServers;
- } while ( i == 0 );
- altServers[0] = altServers[i];
- altServers[i] = tmp;
- }
- // We iterate over the list twice. First run adds servers with 0 failures only,
- // second one also considers those that failed (not too many times)
- if ( size > numAltServers ) size = numAltServers;
- for (i = 0; i < numAltServers * 2; ++i) {
- dnbd3_alt_server_t *srv = &altServers[i % numAltServers];
- if ( srv->host.type == 0 ) continue; // Slot is empty
- if ( _proxyPrivateOnly && !srv->isPrivate ) continue; // Config says to consider private alt-servers only? ignore!
- if ( srv->isClientOnly ) continue;
- bool first = ( i < numAltServers );
- if ( first ) {
- if ( srv->numFails > 0 ) continue;
- } else {
- if ( srv->numFails == 0 ) continue; // Already added in first iteration
- if ( !emergency && srv->numFails > SERVER_BAD_UPLINK_THRES // server failed X times in a row
- && timing_diff( &srv->lastFail, &now ) < SERVER_BAD_UPLINK_IGNORE ) continue; // and last fail was not too long ago? ignore!
- if ( !emergency ) srv->numFails--;
+ // If we don't have enough servers to randomize, take a shortcut
+ if ( numAltServers <= size ) {
+ for ( int i = 0; i < numAltServers; ++i ) {
+ if ( current == -1 || i == current || isUsableForUplink( uplink, i, &now ) ) {
+ servers[count++] = i;
+ }
+ }
+ } else {
+ // Plenty of alt servers; randomize
+ uint8_t state[SERVER_MAX_ALTS] = { 0 };
+ if ( current != -1 ) { // Make sure we also test the current server
+ servers[count++] = current;
+ state[current] = 2;
+ }
+ for ( int tr = size * 10; tr > 0 && count < size; --tr ) {
+ int idx = rand() % numAltServers;
+ if ( state[idx] != 0 )
+ continue;
+ if ( isUsableForUplink( uplink, idx, &now ) ) {
+ servers[count++] = idx;
+ state[idx] = 2; // Used
+ } else {
+ state[idx] = 1; // Potential
+ }
+ }
+ // If panic mode, consider others too
+ for ( int tr = size * 10; current == -1 && tr > 0 && count < size; --tr ) {
+ int idx = rand() % numAltServers;
+ if ( state[idx] == 2 )
+ continue;
+ servers[count++] = idx;
+ state[idx] = 2; // Used
}
- // server seems ok, include in output and decrease its fail counter
- output[count++] = srv->host;
- if ( count >= size ) break;
}
mutex_unlock( &altServersLock );
return count;
@@ -320,7 +275,7 @@ json_t* altservers_toJson()
"rtt", rtts,
"isPrivate", (int)src[i].isPrivate,
"isClientOnly", (int)src[i].isClientOnly,
- "numFails", src[i].numFails
+ "numFails", src[i].fails
);
json_array_append_new( list, server );
}
@@ -329,32 +284,27 @@ json_t* altservers_toJson()
/**
* Update rtt history of given server - returns the new average for that server.
- * XXX HOLD altServersLock WHEN CALLING THIS!
*/
-static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const unsigned int rtt)
+static uint32_t altservers_updateRtt(dnbd3_uplink_t *uplink, int index, uint32_t rtt)
{
- unsigned int avg = rtt;
- int i;
- for (i = 0; i < numAltServers; ++i) {
- if ( !isSameAddressPort( host, &altServers[i].host ) ) continue;
- altServers[i].rtt[++altServers[i].rttIndex % SERVER_RTT_PROBES] = rtt;
-#if SERVER_RTT_PROBES == 5
- avg = (altServers[i].rtt[0] + altServers[i].rtt[1] + altServers[i].rtt[2]
- + altServers[i].rtt[3] + altServers[i].rtt[4]) / SERVER_RTT_PROBES;
-#else
-#warning You might want to change the code in altservers_update_rtt if you changed SERVER_RTT_PROBES
- avg = 0;
- for (int j = 0; j < SERVER_RTT_PROBES; ++j) {
- avg += altServers[i].rtt[j];
+ uint32_t avg = 0, j;
+ dnbd3_alt_local_t *local = &uplink->altData[index];
+ mutex_lock( &altServersLock );
+ if ( likely( local->initDone ) ) {
+ local->rtt[++local->rttIndex % SERVER_RTT_PROBES] = rtt;
+ for ( j = 0; j < SERVER_RTT_PROBES; ++j ) {
+ avg += local->rtt[j];
}
avg /= SERVER_RTT_PROBES;
-#endif
- // If we got a new rtt value, server must be working
- if ( altServers[i].numFails > 0 ) {
- altServers[i].numFails--;
+ } else { // First rtt measurement -- copy to every slot
+ for ( j = 0; j < SERVER_RTT_PROBES; ++j ) {
+ local->rtt[j] = rtt;
}
- break;
+ avg = rtt;
+ local->initDone = true;
}
+ altServers[index].rtt[++altServers[index].rttIndex % SERVER_RTT_PROBES] = avg;
+ mutex_unlock( &altServersLock );
return avg;
}
@@ -383,40 +333,33 @@ int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2)
* track of how often servers fail, and consider them disabled for some time if they
* fail too many times.
*/
-void altservers_serverFailed(const dnbd3_host_t * const host)
+void altservers_serverFailed(int server)
{
- int i;
- int foundIndex = -1, lastOk = -1;
- ticks now;
- timing_get( &now );
+ declare_now;
mutex_lock( &altServersLock );
- for (i = 0; i < numAltServers; ++i) {
- if ( foundIndex == -1 ) {
- // Looking for the failed server in list
- if ( isSameAddressPort( host, &altServers[i].host ) ) {
- foundIndex = i;
- }
- } else if ( altServers[i].host.type != 0 && altServers[i].numFails == 0 ) {
- lastOk = i;
+ if ( timing_diff( &altServers[server].lastFail, &now ) > SERVER_GLOBAL_DUP_TIME ) {
+ altServers[server].lastFail = now;
+ if ( altServers[server].fails++ >= SERVER_BAD_UPLINK_MAX ) {
+ altServers[server].blocked = true;
}
}
- // Do only increase counter if last fail was not too recent. This is
- // to prevent the counter from increasing rapidly if many images use the
- // same uplink. If there's a network hickup, all uplinks will call this
- // function and would increase the counter too quickly, disabling the server.
- if ( foundIndex != -1 && timing_diff( &altServers[foundIndex].lastFail, &now ) > SERVER_RTT_INTERVAL_INIT ) {
- altServers[foundIndex].numFails += SERVER_UPLINK_FAIL_INCREASE;
- altServers[foundIndex].lastFail = now;
- if ( lastOk != -1 ) {
- // Make sure non-working servers are put at the end of the list, so they're less likely
- // to get picked when testing servers for uplink connections.
- const dnbd3_alt_server_t tmp = altServers[foundIndex];
- altServers[foundIndex] = altServers[lastOk];
- altServers[lastOk] = tmp;
- }
+ mutex_unlock( &altServersLock );
+}
+
+/**
+ * Called from RTT checker if connecting to a server succeeded but
+ * subsequently selecting the given image failed. Handle this within
+ * the uplink and don't increase the global fail counter.
+ */
+static void altservers_imageFailed(dnbd3_uplink_t *uplink, int server)
+{
+ mutex_lock( &altServersLock );
+ if ( uplink->altData[server].fails++ >= SERVER_BAD_UPLINK_MAX ) {
+ uplink->altData[server].blocked = true;
}
mutex_unlock( &altServersLock );
}
+
/**
* Mainloop of this module. It will wait for requests by uplinks to find a
* suitable uplink server for them. If found, it will tell the uplink about
@@ -425,206 +368,213 @@ void altservers_serverFailed(const dnbd3_host_t * const host)
* will update quite quickly. Needs to be improved some time, ie. by only
* updating the rtt if the last update was at least X seconds ago.
*/
-static void *altservers_main(void *data UNUSED)
+static void *altservers_runCheck(void *data)
+{
+ dnbd3_uplink_t * const uplink = (dnbd3_uplink_t*)data;
+
+ assert( uplink != NULL );
+ setThreadName( "altserver-check" );
+ altservers_findUplinkInternal( uplink );
+ // Save cache maps of all images if applicable
+ // TODO: Has nothing to do with alt servers really, maybe move somewhere else?
+ declare_now;
+ if ( _closeUnusedFd && timing_reached( &nextCloseUnusedFd, &now ) ) {
+ timing_gets( &nextCloseUnusedFd, 900 );
+ image_closeUnusedFd();
+ }
+ return NULL;
+}
+
+void altservers_findUplink(dnbd3_uplink_t *uplink)
+{
+ altservers_findUplinkInternal( uplink );
+ while ( uplink->rttTestResult == RTT_INPROGRESS ) {
+ usleep( 5000 );
+ }
+}
+
+int altservers_hostToIndex(dnbd3_host_t *host)
+{
+ for ( int i = 0; i < numAltServers; ++i ) {
+ if ( isSameAddressPort( host, &altServers[i].host ) )
+ return i;
+ }
+ return -1;
+}
+
+const dnbd3_host_t* altservers_indexToHost(int server)
+{
+ return &altServers[server].host;
+}
+
+// XXX Sync call above must block until async worker has finished XXX
+static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink)
{
const int ALTS = 4;
- int ret, itLink, itAlt, numAlts;
- bool found;
- char buffer[DNBD3_BLOCK_SIZE ];
- dnbd3_reply_t reply;
- dnbd3_host_t servers[ALTS + 1];
- serialized_buffer_t serialized;
+ int ret, itAlt, numAlts, current;
+ bool panic;
+ int servers[ALTS + 1];
struct timespec start, end;
- ticks nextCloseUnusedFd;
- setThreadName( "altserver-check" );
- blockNoncriticalSignals();
- timing_gets( &nextCloseUnusedFd, 900 );
- // LOOP
- while ( !_shutdown ) {
- // Wait 5 seconds max.
- ret = signal_wait( runSignal, 5000 );
- if ( _shutdown ) goto cleanup;
- if ( ret == SIGNAL_ERROR ) {
- if ( errno == EAGAIN || errno == EINTR ) continue;
- logadd( LOG_WARNING, "Error %d on signal_clear on alservers_main! Things will break!", errno );
- usleep( 100000 );
+ if ( _shutdown )
+ return;
+ mutex_lock( &uplink->rttLock );
+ // Maybe we already have a result, or check is currently running
+ if ( uplink->better.fd != -1 || uplink->rttTestResult == RTT_INPROGRESS ) {
+ mutex_unlock( &uplink->rttLock );
+ return;
+ }
+ assert( uplink->rttTestResult != RTT_DOCHANGE );
+ uplink->rttTestResult = RTT_INPROGRESS;
+ panic = ( uplink->current.fd == -1 );
+ current = uplink->current.index; // Current server index (or last one in panic mode)
+ mutex_unlock( &uplink->rttLock );
+ // First, get 4 alt servers
+ numAlts = altservers_getListForUplink( uplink, servers, ALTS, panic ? -1 : current );
+ // If we're already connected and only got one server anyways, there isn't much to do
+ if ( numAlts == 0 || ( numAlts == 1 && !panic ) ) {
+ uplink->rttTestResult = RTT_DONTCHANGE;
+ return;
+ }
+ dnbd3_image_t * const image = image_lock( uplink->image );
+ if ( image == NULL ) { // Check again after locking
+ uplink->rttTestResult = RTT_NOT_REACHABLE;
+ logadd( LOG_WARNING, "Image has gone away that was queued for RTT measurement" );
+ return;
+ }
+ LOG( LOG_DEBUG2, "Running alt check for %s:%d", image->name, (int)image->rid );
+ assert( uplink->rttTestResult == RTT_INPROGRESS );
+ // Test them all
+ dnbd3_server_connection_t best = { .fd = -1 };
+ unsigned long bestRtt = RTT_UNREACHABLE;
+ unsigned long currentRtt = RTT_UNREACHABLE;
+ for (itAlt = 0; itAlt < numAlts; ++itAlt) {
+ int server = servers[itAlt];
+ // Connect
+ clock_gettime( BEST_CLOCK_SOURCE, &start );
+ int sock = sock_connect( &altServers[server].host, 750, 1000 );
+ if ( sock == -1 ) { // Connection failed means global error
+ altservers_serverFailed( server );
+ continue;
}
- // Work your way through the queue
- atomic_thread_fence( memory_order_acquire );
- for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) {
- dnbd3_uplink_t * const uplink = pending[itLink];
- if ( uplink == NULL )
- continue;
- // First, get 4 alt servers
- numAlts = altservers_getListForUplink( servers, ALTS, uplink->current.fd == -1 );
- // If we're already connected and only got one server anyways, there isn't much to do
- if ( numAlts <= 1 && uplink->current.fd != -1 ) {
- uplink->rttTestResult = RTT_DONTCHANGE;
- continue;
- }
- dnbd3_image_t * const image = image_lock( uplink->image );
- if ( image == NULL ) { // Check again after locking
- mutex_lock( &uplink->rttLock );
- uplink->rttTestResult = RTT_NOT_REACHABLE;
- assert( pending[itLink] == uplink );
- pending[itLink] = NULL;
- mutex_unlock( &uplink->rttLock );
- logadd( LOG_WARNING, "Image has gone away that was queued for RTT measurement" );
- continue;
- }
- LOG( LOG_DEBUG2, "[%d] Running alt check", itLink );
- assert( uplink->rttTestResult == RTT_INPROGRESS );
- if ( uplink->current.fd != -1 ) {
- // Add current server if not already in list
- found = false;
- for (itAlt = 0; itAlt < numAlts; ++itAlt) {
- if ( !isSameAddressPort( &uplink->current.host, &servers[itAlt] ) ) continue;
- found = true;
- break;
- }
- if ( !found ) servers[numAlts++] = uplink->current.host;
- }
- // Test them all
- int bestSock = -1;
- int bestIndex = -1;
- int bestProtocolVersion = -1;
- unsigned long bestRtt = RTT_UNREACHABLE;
- unsigned long currentRtt = RTT_UNREACHABLE;
- for (itAlt = 0; itAlt < numAlts; ++itAlt) {
- usleep( 1000 ); // Wait a very short moment for the network to recover (we might be doing lots of measurements...)
- // Connect
- clock_gettime( BEST_CLOCK_SOURCE, &start );
- int sock = sock_connect( &servers[itAlt], 750, 1000 );
- if ( sock < 0 ) continue;
- // Select image ++++++++++++++++++++++++++++++
- if ( !dnbd3_select_image( sock, image->name, image->rid, SI_SERVER_FLAGS ) ) {
- goto server_failed;
- }
- // See if selecting the image succeeded ++++++++++++++++++++++++++++++
- uint16_t protocolVersion, rid;
- uint64_t imageSize;
- char *name;
- if ( !dnbd3_select_image_reply( &serialized, sock, &protocolVersion, &name, &rid, &imageSize ) ) {
- goto server_image_not_available;
- }
- if ( protocolVersion < MIN_SUPPORTED_SERVER ) goto server_failed;
- if ( name == NULL || strcmp( name, image->name ) != 0 ) {
- ERROR_GOTO( server_failed, "[RTT] Server offers image '%s'", name );
- }
- if ( rid != image->rid ) {
- ERROR_GOTO( server_failed, "[RTT] Server provides rid %d", (int)rid );
- }
- if ( imageSize != image->virtualFilesize ) {
- ERROR_GOTO( server_failed, "[RTT] Remote size: %" PRIu64 ", expected: %" PRIu64, imageSize, image->virtualFilesize );
- }
- // Request first block (NOT random!) ++++++++++++++++++++++++++++++
- if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) {
- LOG_GOTO( server_failed, LOG_DEBUG1, "[RTT%d] Could not request first block", itLink );
- }
- // See if requesting the block succeeded ++++++++++++++++++++++
- if ( !dnbd3_get_reply( sock, &reply ) ) {
- LOG_GOTO( server_failed, LOG_DEBUG1, "[RTT%d] Received corrupted reply header after CMD_GET_BLOCK", itLink );
- }
- // check reply header
- if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) {
- ERROR_GOTO( server_failed, "[RTT] Reply to first block request is %" PRIu32 " bytes", reply.size );
- }
- if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) {
- ERROR_GOTO( server_failed, "[RTT%d] Could not read first block payload", itLink );
- }
- clock_gettime( BEST_CLOCK_SOURCE, &end );
- // Measurement done - everything fine so far
- mutex_lock( &altServersLock );
- mutex_lock( &uplink->rttLock );
- const bool isCurrent = isSameAddressPort( &servers[itAlt], &uplink->current.host );
- // Penaltize rtt if this was a cycle; this will treat this server with lower priority
- // in the near future too, so we prevent alternating between two servers that are both
- // part of a cycle and have the lowest latency.
- const unsigned int rtt = (unsigned int)((end.tv_sec - start.tv_sec) * 1000000
- + (end.tv_nsec - start.tv_nsec) / 1000
- + ( (isCurrent && uplink->cycleDetected) ? 1000000 : 0 )); // µs
- unsigned int avg = altservers_updateRtt( &servers[itAlt], rtt );
- mutex_unlock( &altServersLock );
- // If a cycle was detected, or we lost connection to the current (last) server, penaltize it one time
- if ( ( uplink->cycleDetected || uplink->current.fd == -1 ) && isCurrent ) avg = (avg * 2) + 50000;
- mutex_unlock( &uplink->rttLock );
- if ( uplink->current.fd != -1 && isCurrent ) {
- // Was measuring current server
- currentRtt = avg;
- close( sock );
- } else if ( avg < bestRtt ) {
- // Was another server, update "best"
- if ( bestSock != -1 ) close( bestSock );
- bestSock = sock;
- bestRtt = avg;
- bestIndex = itAlt;
- bestProtocolVersion = protocolVersion;
- } else {
- // Was too slow, ignore
- close( sock );
- }
- // We're done, call continue
- continue;
- // Jump here if anything went wrong
- // This will cleanup and continue
- server_failed: ;
- altservers_serverFailed( &servers[itAlt] );
- server_image_not_available: ;
- close( sock );
- }
- // Done testing all servers. See if we should switch
- if ( bestSock != -1 && (uplink->current.fd == -1 || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) {
- // yep
- if ( currentRtt > 10000000 || uplink->current.fd == -1 ) {
- LOG( LOG_DEBUG1, "Change - best: %luµs, current: -", bestRtt );
- } else {
- LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt );
- }
- sock_setTimeout( bestSock, _uplinkTimeout );
- mutex_lock( &uplink->rttLock );
- uplink->better.fd = bestSock;
- uplink->better.host = servers[bestIndex];
- uplink->better.version = bestProtocolVersion;
- uplink->rttTestResult = RTT_DOCHANGE;
- mutex_unlock( &uplink->rttLock );
- signal_call( uplink->signal );
- } else if ( bestSock == -1 && currentRtt == RTT_UNREACHABLE ) {
- // No server was reachable
- mutex_lock( &uplink->rttLock );
- uplink->rttTestResult = RTT_NOT_REACHABLE;
- mutex_unlock( &uplink->rttLock );
- } else {
- // nope
- if ( bestSock != -1 ) close( bestSock );
- mutex_lock( &uplink->rttLock );
- uplink->rttTestResult = RTT_DONTCHANGE;
- uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away
- mutex_unlock( &uplink->rttLock );
- if ( !image->working ) {
- image->working = true;
- LOG( LOG_DEBUG1, "[%d] No better alt server found, enabling again", itLink );
- }
- }
- image_release( image );
- // end of loop over all pending uplinks
- assert( pending[itLink] == uplink );
- pending[itLink] = NULL;
- atomic_thread_fence( memory_order_release );
+ // Select image ++++++++++++++++++++++++++++++
+ if ( !dnbd3_select_image( sock, image->name, image->rid, SI_SERVER_FLAGS ) ) {
+ goto image_failed;
}
- // Save cache maps of all images if applicable
- declare_now;
- // TODO: Has nothing to do with alt servers really, maybe move somewhere else?
- if ( _closeUnusedFd && timing_reached( &nextCloseUnusedFd, &now ) ) {
- timing_gets( &nextCloseUnusedFd, 900 );
- image_closeUnusedFd();
+ // See if selecting the image succeeded ++++++++++++++++++++++++++++++
+ uint16_t protocolVersion, rid;
+ uint64_t imageSize;
+ char *name;
+ serialized_buffer_t serialized;
+ if ( !dnbd3_select_image_reply( &serialized, sock, &protocolVersion, &name, &rid, &imageSize ) ) {
+ goto image_failed;
}
+ if ( protocolVersion < MIN_SUPPORTED_SERVER ) { // Server version unsupported; global fail
+ goto server_failed;
+ }
+ if ( name == NULL || strcmp( name, image->name ) != 0 ) {
+ ERROR_GOTO( image_failed, "[RTT] Server offers image '%s' instead of '%s'", name, image->name );
+ }
+ if ( rid != image->rid ) {
+ ERROR_GOTO( image_failed, "[RTT] Server provides rid %d instead of %d", (int)rid, (int)image->rid );
+ }
+ if ( imageSize != image->virtualFilesize ) {
+ ERROR_GOTO( image_failed, "[RTT] Remote size: %" PRIu64 ", expected: %" PRIu64, imageSize, image->virtualFilesize );
+ }
+ // Request first block (NOT random!) ++++++++++++++++++++++++++++++
+ if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) {
+ LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Could not request first block", server );
+ }
+ // See if requesting the block succeeded ++++++++++++++++++++++
+ dnbd3_reply_t reply;
+ if ( !dnbd3_get_reply( sock, &reply ) ) {
+ LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Received corrupted reply header after CMD_GET_BLOCK", server );
+ }
+ // check reply header
+ if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) {
+ // Sanity check failed; count this as global error (malicious/broken server)
+ ERROR_GOTO( server_failed, "[RTT] Reply to first block request is %" PRIu32 " bytes", reply.size );
+ }
+ // flush payload to include this into measurement
+ char buffer[DNBD3_BLOCK_SIZE];
+ if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) {
+ ERROR_GOTO( image_failed, "[RTT%d] Could not read first block payload", server );
+ }
+ clock_gettime( BEST_CLOCK_SOURCE, &end );
+ // Measurement done - everything fine so far
+ mutex_lock( &uplink->rttLock );
+ const bool isCurrent = ( uplink->current.index == server );
+ mutex_unlock( &uplink->rttLock );
+ // Penaltize rtt if this was a cycle; this will treat this server with lower priority
+ // in the near future too, so we prevent alternating between two servers that are both
+ // part of a cycle and have the lowest latency.
+ uint32_t rtt = (uint32_t)((end.tv_sec - start.tv_sec) * 1000000
+ + (end.tv_nsec - start.tv_nsec) / 1000); // µs
+ uint32_t avg = altservers_updateRtt( uplink, server, rtt );
+ // If a cycle was detected, or we lost connection to the current (last) server, penaltize it one time
+ if ( ( uplink->cycleDetected || panic ) && isCurrent ) {
+ avg = (avg * 2) + 50000;
+ }
+ if ( !panic && isCurrent ) {
+ // Was measuring current server
+ currentRtt = avg;
+ close( sock );
+ } else if ( avg < bestRtt ) {
+ // Was another server, update "best"
+ if ( best.fd != -1 ) {
+ close( best.fd );
+ }
+ best.fd = sock;
+ bestRtt = avg;
+ best.index = server;
+ best.version = protocolVersion;
+ } else {
+ // Was too slow, ignore
+ close( sock );
+ }
+ // We're done, call continue
+ continue;
+ // Jump here if anything went wrong
+ // This will cleanup and continue
+image_failed:
+ altservers_imageFailed( uplink, server );
+ goto failed;
+server_failed:
+ altservers_serverFailed( server );
+failed:
+ close( sock );
}
- cleanup: ;
- if ( runSignal != NULL ) {
- signal_close( runSignal );
+ // Done testing all servers. See if we should switch
+ if ( best.fd != -1 && (panic || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) {
+ // yep
+ if ( currentRtt > 10000000 || panic ) {
+ LOG( LOG_DEBUG1, "Change - best: %luµs, current: -", bestRtt );
+ } else {
+ LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt );
+ }
+ sock_setTimeout( best.fd, _uplinkTimeout );
+ mutex_lock( &uplink->rttLock );
+ uplink->better = best;
+ uplink->rttTestResult = RTT_DOCHANGE;
+ mutex_unlock( &uplink->rttLock );
+ signal_call( uplink->signal );
+ } else if ( best.fd == -1 && currentRtt == RTT_UNREACHABLE ) {
+ // No server was reachable, including current
+ uplink->rttTestResult = RTT_NOT_REACHABLE;
+ } else {
+ // nope
+ if ( best.fd != -1 ) {
+ close( best.fd );
+ }
+ if ( !image->working || uplink->cycleDetected ) {
+ image->working = true;
+ LOG( LOG_DEBUG1, "[RTT] No better alt server found, enabling '%s:%d' again... :-(", image->name, (int)image->rid );
+ }
+ uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away
+ mutex_lock( &uplink->rttLock );
+ uplink->rttTestResult = RTT_DONTCHANGE;
+ mutex_unlock( &uplink->rttLock );
}
- runSignal = NULL;
- return NULL ;
+ image_release( image );
}
diff --git a/src/server/altservers.h b/src/server/altservers.h
index e03b900..8e2b964 100644
--- a/src/server/altservers.h
+++ b/src/server/altservers.h
@@ -7,23 +7,27 @@ struct json_t;
void altservers_init();
-void altservers_shutdown();
-
int altservers_load();
bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate, const int isClientOnly);
-void altservers_findUplink(dnbd3_uplink_t *uplink);
+void altservers_findUplinkAsync(dnbd3_uplink_t *uplink);
-void altservers_removeUplink(dnbd3_uplink_t *uplink);
+void altservers_findUplink(dnbd3_uplink_t *uplink);
int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size);
-int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency);
+int altservers_getHostListForReplication(dnbd3_host_t *servers, int size);
+
+bool altservers_toString(int server, char *buffer, size_t len);
int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2);
-void altservers_serverFailed(const dnbd3_host_t * const host);
+void altservers_serverFailed(int server);
+
+int altservers_hostToIndex(dnbd3_host_t *host);
+
+const dnbd3_host_t* altservers_indexToHost(int server);
struct json_t* altservers_toJson();
diff --git a/src/server/globals.h b/src/server/globals.h
index 659e5a2..4d97c6b 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -30,10 +30,31 @@ typedef struct
uint8_t hopCount; // How many hops this request has already taken across proxies
} dnbd3_queued_request_t;
+typedef struct
+{
+ int fails; // Hard fail: Connection failed
+ int rttIndex;
+ uint32_t rtt[SERVER_RTT_PROBES];
+ bool isPrivate, isClientOnly;
+ bool blocked; // If true count down fails until 0 to enable again
+ ticks lastFail; // Last hard fail
+ dnbd3_host_t host;
+ char comment[COMMENT_LENGTH];
+} dnbd3_alt_server_t;
+
+typedef struct
+{
+ int fails; // Soft fail: Image not found
+ int rttIndex;
+ uint32_t rtt[SERVER_RTT_PROBES];
+ bool blocked; // True if server is to be ignored and fails should be counted down
+ bool initDone;
+} dnbd3_alt_local_t;
+
typedef struct {
- int fd; // Socket fd for this connection
- int version; // Protocol version of remote server
- dnbd3_host_t host; // IP/Port of remote server
+ int fd; // Socket fd for this connection
+ int version; // Protocol version of remote server
+ int index; // Entry in uplinks list
} dnbd3_server_connection_t;
#define RTT_IDLE 0 // Not in progress
@@ -51,7 +72,7 @@ struct _dnbd3_uplink
pthread_mutex_t queueLock; // lock for synchronization on request queue etc.
dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer
pthread_mutex_t rttLock; // When accessing rttTestResult, betterFd or betterServer
- int rttTestResult; // RTT_*
+ atomic_int rttTestResult; // RTT_*
int cacheFd; // used to write to the image, in case it is relayed. ONLY USE FROM UPLINK THREAD!
uint8_t *recvBuffer; // Buffer for receiving payload
uint32_t recvBufferLen; // Len of ^^
@@ -65,21 +86,11 @@ struct _dnbd3_uplink
atomic_int queueLen; // length of queue
uint32_t idleTime; // How many seconds the uplink was idle (apart from keep-alives)
dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE];
+ dnbd3_alt_local_t altData[SERVER_MAX_ALTS];
};
typedef struct
{
- char comment[COMMENT_LENGTH];
- dnbd3_host_t host;
- unsigned int rtt[SERVER_RTT_PROBES];
- unsigned int rttIndex;
- bool isPrivate, isClientOnly;
- ticks lastFail;
- int numFails;
-} dnbd3_alt_server_t;
-
-typedef struct
-{
uint8_t host[16];
int bytes;
int bitMask;
diff --git a/src/server/image.c b/src/server/image.c
index d250715..1a6e0f8 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -1178,7 +1178,7 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
dnbd3_host_t servers[REP_NUM_SRV];
int uplinkSock = -1;
dnbd3_host_t uplinkServer;
- const int count = altservers_getListForUplink( servers, REP_NUM_SRV, false );
+ const int count = altservers_getHostListForReplication( servers, REP_NUM_SRV );
uint16_t remoteProtocolVersion;
uint16_t remoteRid = revision;
uint64_t remoteImageSize;
@@ -1491,7 +1491,7 @@ json_t* image_getListAsJson()
json_t *imagesJson = json_array();
json_t *jsonImage;
int i;
- char uplinkName[100] = { 0 };
+ char uplinkName[100];
uint64_t bytesReceived;
int completeness, idleTime;
declare_now;
@@ -1508,7 +1508,7 @@ json_t* image_getListAsJson()
uplinkName[0] = '\0';
} else {
bytesReceived = image->uplink->bytesReceived;
- if ( image->uplink->current.fd == -1 || !host_to_string( &image->uplink->current.host, uplinkName, sizeof(uplinkName) ) ) {
+ if ( !uplink_getHostString( image->uplink, uplinkName, sizeof(uplinkName) ) ) {
uplinkName[0] = '\0';
}
}
diff --git a/src/server/net.c b/src/server/net.c
index 7f3c1ce..4976eea 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -669,11 +669,19 @@ static void removeFromList(dnbd3_client_t *client)
{
int i;
mutex_lock( &_clients_lock );
- for ( i = _num_clients - 1; i >= 0; --i ) {
- if ( _clients[i] == client ) {
- _clients[i] = NULL;
+ if ( _num_clients != 0 ) {
+ for ( i = _num_clients - 1; i >= 0; --i ) {
+ if ( _clients[i] == client ) {
+ _clients[i] = NULL;
+ break;
+ }
+ }
+ if ( i != 0 && i + 1 == _num_clients ) {
+ do {
+ i--;
+ } while ( _clients[i] == NULL && i > 0 );
+ _num_clients = i + 1;
}
- if ( _clients[i] == NULL && i + 1 == _num_clients ) --_num_clients;
}
mutex_unlock( &_clients_lock );
}
diff --git a/src/server/server.c b/src/server/server.c
index 838aec2..640048a 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -121,9 +121,6 @@ void dnbd3_cleanup()
// Disable threadpool
threadpool_close();
- // Terminate the altserver checking thread
- altservers_shutdown();
-
// Terminate all uplinks
image_killUplinks();
@@ -198,6 +195,11 @@ int main(int argc, char *argv[])
case LONGOPT_CRC4:
return image_generateCrcFile( optarg ) ? 0 : EXIT_FAILURE;
case LONGOPT_ASSERT:
+ printf( "Testing use after free:\n" );
+ volatile char * volatile test = malloc( 10 );
+ test[0] = 1;
+ free( test );
+ test[1] = 2;
printf( "Testing a failing assertion:\n" );
assert( 4 == 5 );
printf( "Assertion 4 == 5 seems to hold. ;-)\n" );
diff --git a/src/server/uplink.c b/src/server/uplink.c
index e21e28c..6c85580 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -96,17 +96,18 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
uplink->bytesReceived = 0;
uplink->idleTime = 0;
uplink->queueLen = 0;
- mutex_lock( &uplink->sendMutex );
- uplink->current.fd = -1;
- mutex_unlock( &uplink->sendMutex );
uplink->cacheFd = -1;
uplink->signal = NULL;
uplink->replicationHandle = REP_NONE;
mutex_lock( &uplink->rttLock );
+ mutex_lock( &uplink->sendMutex );
+ uplink->current.fd = -1;
+ mutex_unlock( &uplink->sendMutex );
uplink->cycleDetected = false;
- if ( sock >= 0 ) {
+ if ( sock != -1 ) {
uplink->better.fd = sock;
- uplink->better.host = *host;
+ int index = altservers_hostToIndex( host );
+ uplink->better.index = index == -1 ? 0 : index; // Prevent invalid array access
uplink->rttTestResult = RTT_DOCHANGE;
uplink->better.version = version;
} else {
@@ -116,7 +117,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
mutex_unlock( &uplink->rttLock );
uplink->recvBufferLen = 0;
uplink->shutdown = false;
- if ( 0 != thread_create( &(uplink->thread), NULL, &uplink_mainloop, (void *)link ) ) {
+ if ( 0 != thread_create( &(uplink->thread), NULL, &uplink_mainloop, (void *)uplink ) ) {
logadd( LOG_ERROR, "Could not start thread for new uplink." );
goto failure;
}
@@ -148,8 +149,8 @@ void uplink_shutdown(dnbd3_image_t *image)
}
dnbd3_uplink_t * const uplink = image->uplink;
mutex_lock( &uplink->queueLock );
- if ( !uplink->shutdown ) {
- uplink->shutdown = true;
+ bool exp = false;
+ if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) {
signal_call( uplink->signal );
thread = uplink->thread;
join = true;
@@ -211,13 +212,11 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
}
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
// This might be a false positive if there are multiple instances running on the same host (IP)
- if ( hops != 0 && isSameAddress( &uplink->current.host, &client->host ) ) {
- mutex_unlock( &client->image->lock );
- logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
- mutex_lock( &uplink->rttLock );
+ if ( hops != 0 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
uplink->cycleDetected = true;
- mutex_unlock( &uplink->rttLock );
signal_call( uplink->signal );
+ mutex_unlock( &client->image->lock );
+ logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
return false;
}
@@ -256,12 +255,10 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
}
}
if ( unlikely( requestLoop ) ) {
- mutex_unlock( &uplink->queueLock );
- logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops );
- mutex_lock( &uplink->rttLock );
uplink->cycleDetected = true;
- mutex_unlock( &uplink->rttLock );
signal_call( uplink->signal );
+ mutex_unlock( &uplink->queueLock );
+ logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops );
return false;
}
if ( freeSlot < firstUsedSlot && firstUsedSlot < 10 && existingType != ULR_PROCESSING ) {
@@ -311,6 +308,8 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
if ( foundExisting != -1 )
return true; // Attached to pending request, do nothing
+ usleep( 10000 );
+
// See if we can fire away the request
if ( mutex_trylock( &uplink->sendMutex ) != 0 ) {
logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" );
@@ -342,7 +341,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
if ( state == -1 ) {
logadd( LOG_DEBUG2, "Direct uplink request queue entry gone after sending and re-locking queue. *shrug*" );
} else if ( state == ULR_NEW ) {
- logadd( LOG_DEBUG2, "Succesful direct uplink request" );
+ //logadd( LOG_DEBUG2, "Direct uplink request" );
} else {
logadd( LOG_DEBUG2, "Direct uplink request queue entry changed to %s afte sending (expected ULR_NEW).", NAMES_ULR[uplink->queue[freeSlot].status] );
}
@@ -352,10 +351,8 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
}
}
- if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed
- if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
- logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
- }
+ if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
+ logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
}
return true;
}
@@ -443,7 +440,7 @@ static void* uplink_mainloop(void *data)
uplink->image->working = true;
uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
buffer[0] = '@';
- if ( host_to_string( &uplink->current.host, buffer + 1, sizeof(buffer) - 1 ) ) {
+ if ( altservers_toString( uplink->current.index, buffer + 1, sizeof(buffer) - 1 ) ) {
logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", uplink->image->name, buffer + 1 );
setThreadName( buffer );
}
@@ -525,9 +522,7 @@ static void* uplink_mainloop(void *data)
}
}
// See if we should trigger an RTT measurement
- mutex_lock( &uplink->rttLock );
- const int rttTestResult = uplink->rttTestResult;
- mutex_unlock( &uplink->rttLock );
+ int rttTestResult = uplink->rttTestResult;
if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) {
if ( timing_reached( &nextAltCheck, &now ) || ( uplink->current.fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) || uplink->cycleDetected ) {
// It seems it's time for a check
@@ -538,7 +533,7 @@ static void* uplink_mainloop(void *data)
goto cleanup;
} else if ( !uplink_connectionShouldShutdown( uplink ) ) {
// Not complete - do measurement
- altservers_findUplink( uplink ); // This will set RTT_INPROGRESS (synchronous)
+ altservers_findUplinkAsync( uplink ); // This will set RTT_INPROGRESS (synchronous)
if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) {
uplink->nextReplicationIndex = 0;
}
@@ -547,11 +542,9 @@ static void* uplink_mainloop(void *data)
timing_set( &nextAltCheck, &now, altCheckInterval );
}
} else if ( rttTestResult == RTT_NOT_REACHABLE ) {
- mutex_lock( &uplink->rttLock );
- uplink->rttTestResult = RTT_IDLE;
- mutex_unlock( &uplink->rttLock );
+ atomic_compare_exchange_strong( &uplink->rttTestResult, &rttTestResult, RTT_IDLE );
discoverFailCount++;
- timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) );
+ timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_MAX_UNREACH ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) );
}
#ifdef _DEBUG
if ( uplink->current.fd != -1 && !uplink->shutdown ) {
@@ -581,36 +574,38 @@ static void* uplink_mainloop(void *data)
#endif
}
cleanup: ;
- if ( !uplink->shutdown ) {
- uplink->shutdown = true;
+ // Detach depends on whether someone is joining this thread...
+ bool exp = false;
+ if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) {
thread_detach( uplink->thread );
}
- altservers_removeUplink( uplink );
uplink_saveCacheMap( uplink );
- mutex_lock( &uplink->image->lock );
- if ( uplink->image->uplink == uplink ) {
- uplink->image->uplink = NULL;
+ dnbd3_image_t *image = uplink->image;
+ mutex_lock( &image->lock );
+ // in the list anymore, but we want to prevent it from being freed in either case
+ if ( image->uplink == uplink ) {
+ image->uplink = NULL;
}
+ mutex_unlock( &image->lock ); // Do NOT use image without locking it
mutex_lock( &uplink->queueLock );
- const int fd = uplink->current.fd;
- const dnbd3_signal_t* signal = uplink->signal;
- mutex_lock( &uplink->sendMutex );
- uplink->current.fd = -1;
- mutex_unlock( &uplink->sendMutex );
- uplink->signal = NULL;
- // Do not access uplink->image after unlocking, since we set
- // image->uplink to NULL. Acquire with image_lock first,
- // like done below when checking whether to re-init uplink
- mutex_unlock( &uplink->image->lock );
- mutex_unlock( &uplink->queueLock );
- if ( fd != -1 ) close( fd );
- if ( signal != NULL ) signal_close( signal );
- // Wait for the RTT check to finish/fail if it's in progress
- while ( uplink->rttTestResult == RTT_INPROGRESS )
+ // Wait for active RTT measurement to finish
+ while ( uplink->rttTestResult == RTT_INPROGRESS ) {
usleep( 10000 );
+ }
+ signal_close( uplink->signal );
+ mutex_lock( &uplink->rttLock );
+ mutex_lock( &uplink->sendMutex );
+ if ( uplink->current.fd != -1 ) {
+ close( uplink->current.fd );
+ uplink->current.fd = -1;
+ }
if ( uplink->better.fd != -1 ) {
close( uplink->better.fd );
+ uplink->better.fd = -1;
}
+ mutex_unlock( &uplink->sendMutex );
+ mutex_unlock( &uplink->rttLock );
+ mutex_unlock( &uplink->queueLock );
mutex_destroy( &uplink->queueLock );
mutex_destroy( &uplink->rttLock );
mutex_destroy( &uplink->sendMutex );
@@ -619,9 +614,9 @@ static void* uplink_mainloop(void *data)
if ( uplink->cacheFd != -1 ) {
close( uplink->cacheFd );
}
- dnbd3_image_t *image = image_lock( uplink->image );
free( uplink ); // !!!
- if ( image != NULL ) {
+ if ( image_lock( image ) != NULL ) {
+ // Image is still in list...
if ( !_shutdown && image->cache_map != NULL ) {
// Ingegrity checker must have found something in the meantime
uplink_init( image, -1, NULL, 0 );
@@ -656,7 +651,7 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
// the thread will re-send this request as soon as the connection
// is reestablished.
logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" );
- altservers_serverFailed( &uplink->current.host );
+ altservers_serverFailed( uplink->current.index );
return;
}
mutex_lock( &uplink->queueLock );
@@ -973,7 +968,7 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
{
if ( uplink->current.fd == -1 )
return;
- altservers_serverFailed( &uplink->current.host );
+ altservers_serverFailed( uplink->current.index );
mutex_lock( &uplink->sendMutex );
close( uplink->current.fd );
uplink->current.fd = -1;
@@ -1138,3 +1133,13 @@ static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink)
&& ( _backgroundReplication != BGR_FULL || _bgrMinClients > uplink->image->users ) );
}
+bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len)
+{
+ int current;
+ mutex_lock( &uplink->rttLock );
+ current = uplink->current.fd == -1 ? -1 : uplink->current.index;
+ mutex_unlock( &uplink->rttLock );
+ if ( current == -1 )
+ return false;
+ return altservers_toString( current, buffer, len );
+}
diff --git a/src/server/uplink.h b/src/server/uplink.h
index 4fd41b0..acc8e11 100644
--- a/src/server/uplink.h
+++ b/src/server/uplink.h
@@ -16,4 +16,6 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
void uplink_shutdown(dnbd3_image_t *image);
+bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len);
+
#endif /* UPLINK_H_ */
diff --git a/src/serverconfig.h b/src/serverconfig.h
index 0cbb320..239f0a2 100644
--- a/src/serverconfig.h
+++ b/src/serverconfig.h
@@ -6,10 +6,12 @@
// +++++ Performance/memory related
#define SERVER_MAX_CLIENTS 4000
#define SERVER_MAX_IMAGES 5000
-#define SERVER_MAX_ALTS 100
+#define SERVER_MAX_ALTS 50
// +++++ Uplink handling (proxy mode)
-#define SERVER_UPLINK_FAIL_INCREASE 5 // On server failure, increase numFails by this value
-#define SERVER_BAD_UPLINK_THRES 40 // Thresold for numFails at which we ignore a server for the time span below
+#define SERVER_GLOBAL_DUP_TIME 6 // How many seconds to wait before changing global fail counter again
+#define SERVER_BAD_UPLINK_MIN 10 // Thresold for fails at which we start ignoring the server occasionally
+#define SERVER_BAD_UPLINK_MAX 20 // Hard block server if it failed this many times
+#define SERVER_BAD_UPLINK_LOCAL_BLOCK 10 // If a server didn't supply the requested image this many times, block it for some time
#define SERVER_BAD_UPLINK_IGNORE 180 // How many seconds is a server ignored
#define SERVER_MAX_UPLINK_QUEUE 1500 // Maximum number of queued requests per uplink
#define SERVER_UPLINK_QUEUELEN_THRES 900 // Threshold where we start dropping incoming clients
@@ -33,7 +35,7 @@
#define SERVER_RTT_PROBES 5 // How many probes to average over
#define SERVER_RTT_INTERVAL_INIT 5 // Initial interval between probes
#define SERVER_RTT_INTERVAL_MAX 45 // Maximum interval between probes
-#define SERVER_RTT_BACKOFF_COUNT 5 // If we can't reach any uplink server this many times, consider the uplink bad
+#define SERVER_RTT_MAX_UNREACH 10 // If no server was reachable this many times, stop RTT measurements for a while
#define SERVER_RTT_INTERVAL_FAILED 180 // Interval to use if no uplink server is reachable for above many times
#define SERVER_REMOTE_IMAGE_CHECK_CACHETIME 120 // 2 minutes