summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Simon Rettberg 2019-08-22 16:14:27 +0200
committer Simon Rettberg 2019-08-22 16:14:27 +0200
commit 5fb4ef278be86fb6bda487f65ec4855d830bf4e5 (patch)
tree f06c1ce5466b14bb4ff47dbd2939e11b8eedf6d0
parent [SERVER] Put request handle into CMD_ERROR reply (diff)
download dnbd3-5fb4ef278be86fb6bda487f65ec4855d830bf4e5.tar.gz
dnbd3-5fb4ef278be86fb6bda487f65ec4855d830bf4e5.tar.xz
dnbd3-5fb4ef278be86fb6bda487f65ec4855d830bf4e5.zip
[SERVER] Get rid of alt-servers thread, per-uplink rtt history
Alt-server checks are now run using the thread pool, so we no longer need a queue and a dedicated thread. The RTT history is now kept per uplink; previously, many uplinks sharing one history would overwhelm it, making its time window very short. The fail counter is also split up: a global one for when the server actually isn't reachable, and a local (per-uplink) one for when the server is reachable but doesn't serve the requested image.
-rw-r--r-- src/server/altservers.c 738
-rw-r--r-- src/server/altservers.h 16
-rw-r--r-- src/server/globals.h 41
-rw-r--r-- src/server/image.c 6
-rw-r--r-- src/server/net.c 16
-rw-r--r-- src/server/server.c 8
-rw-r--r-- src/server/uplink.c 117
-rw-r--r-- src/server/uplink.h 2
-rw-r--r-- src/serverconfig.h 10
9 files changed, 469 insertions, 485 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index fbe10a8..493ed9e 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -1,5 +1,6 @@
#include "altservers.h"
#include "locks.h"
+#include "threadpool.h"
#include "helper.h"
#include "image.h"
#include "fileutil.h"
@@ -14,46 +15,22 @@
#define LOG_GOTO(jumplabel, lvl, ...) do { LOG(lvl, __VA_ARGS__); goto jumplabel; } while (0);
#define ERROR_GOTO(jumplabel, ...) LOG_GOTO(jumplabel, LOG_ERROR, __VA_ARGS__)
-static dnbd3_uplink_t * _Atomic pending[SERVER_MAX_PENDING_ALT_CHECKS];
-static dnbd3_signal_t * _Atomic runSignal = NULL;
-
static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS];
static atomic_int numAltServers = 0;
static pthread_mutex_t altServersLock;
+static ticks nextCloseUnusedFd; // TODO: Move away
-static pthread_t altThread;
-
-static void *altservers_main(void *data);
-static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const unsigned int rtt);
+static void *altservers_runCheck(void *data);
+static int altservers_getListForUplink(dnbd3_uplink_t *uplink, int *servers, int size, int current);
+static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink);
+static uint32_t altservers_updateRtt(dnbd3_uplink_t *uplink, int index, uint32_t rtt);
+static void altservers_imageFailed(dnbd3_uplink_t *uplink, int server);
void altservers_init()
{
srand( (unsigned int)time( NULL ) );
- // Init spinlock
+ // Init lock
mutex_init( &altServersLock, LOCK_ALT_SERVER_LIST );
- // Init signal
- runSignal = signal_new();
- if ( runSignal == NULL ) {
- logadd( LOG_ERROR, "Error creating signal object. Uplink feature unavailable." );
- exit( EXIT_FAILURE );
- }
- memset( altServers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) );
- if ( 0 != thread_create( &altThread, NULL, &altservers_main, (void *)NULL ) ) {
- logadd( LOG_ERROR, "Could not start altservers connector thread" );
- exit( EXIT_FAILURE );
- }
- // Init waiting links queue -- this is currently a global static array so
- // it will already be zero, but in case we refactor later do it explicitly
- for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
- pending[i] = NULL;
- }
-}
-
-void altservers_shutdown()
-{
- if ( runSignal == NULL ) return;
- signal_call( runSignal ); // Wake altservers thread up
- thread_join( altThread, NULL );
}
static void addalt(int argc, char **argv, void *data)
@@ -121,7 +98,7 @@ bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate
/**
* ONLY called from the passed uplink's main thread
*/
-void altservers_findUplink(dnbd3_uplink_t *uplink)
+void altservers_findUplinkAsync(dnbd3_uplink_t *uplink)
{
if ( uplink->shutdown )
return;
@@ -135,67 +112,11 @@ void altservers_findUplink(dnbd3_uplink_t *uplink)
assert( uplink->better.fd == -1 );
// it is however possible that an RTT measurement is currently in progress,
// so check for that case and do nothing if one is in progress
- // XXX As this function is only ever called by the image's uplink thread,
- // it cannot happen that the uplink ends up in this list concurrently
mutex_lock( &uplink->rttLock );
- if ( uplink->rttTestResult == RTT_INPROGRESS ) {
- for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
- if ( pending[i] != uplink ) continue;
- // Yep, measuring right now
- return;
- }
- }
- // Find free slot for measurement
- uplink->rttTestResult = RTT_INPROGRESS;
- for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
- if ( pending[i] != NULL ) continue;
- dnbd3_uplink_t *null = NULL;
- if ( atomic_compare_exchange_strong( &pending[i], &null, uplink ) ) {
- mutex_unlock( &uplink->rttLock );
- atomic_thread_fence( memory_order_release );
- signal_call( runSignal ); // Wake altservers thread up
- return;
- }
+ if ( uplink->rttTestResult != RTT_INPROGRESS ) {
+ threadpool_run( &altservers_runCheck, uplink );
}
- // End of loop - no free slot
- uplink->rttTestResult = RTT_NOT_REACHABLE;
mutex_unlock( &uplink->rttLock );
- logadd( LOG_WARNING, "No more free RTT measurement slots, ignoring a request..." );
-}
-
-/**
- * The given uplink is about to disappear,
- * wait until any pending RTT check is done.
- */
-void altservers_removeUplink(dnbd3_uplink_t *uplink)
-{
- assert( uplink != NULL );
- assert( uplink->shutdown );
- int i;
- for ( i = 1 ;; ++i ) {
- atomic_thread_fence( memory_order_acquire );
- if ( runSignal == NULL ) {
- // Thread is already done, remove manually
- uplink->rttTestResult = RTT_NOT_REACHABLE;
- break;
- }
- // Thread still running, wait until test is done
- bool found = false;
- for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
- if ( pending[i] == uplink ) {
- found = true;
- break;
- }
- }
- if ( !found ) // No more test running
- break;
- usleep( 10000 ); // 10ms
- signal_call( runSignal ); // Wake altservers thread up
- if ( i % 500 == 0 ) {
- logadd( LOG_INFO, "Still waiting for altserver check for uplink %p...", (void*)uplink );
- }
- }
- logadd( LOG_DEBUG1, "Waited for %d iterations for altservers check when tearing down uplink", i );
}
/**
@@ -209,90 +130,124 @@ int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output
if ( host == NULL || host->type == 0 || numAltServers == 0 || output == NULL || size <= 0 ) return 0;
int i, j;
int count = 0;
- int scores[size];
- int score;
- mutex_lock( &altServersLock );
+ uint16_t scores[SERVER_MAX_ALTS] = { 0 };
if ( size > numAltServers ) size = numAltServers;
- for (i = 0; i < numAltServers; ++i) {
- if ( altServers[i].host.type == 0 ) continue; // Slot is empty
- if ( altServers[i].isPrivate ) continue; // Do not tell clients about private servers
+ mutex_lock( &altServersLock );
+ for ( i = 0; i < numAltServers; ++i ) {
+ if ( altServers[i].host.type == 0 || altServers[i].isPrivate )
+ continue; // Slot is empty or uplink is for replication only
if ( host->type == altServers[i].host.type ) {
- score = altservers_netCloseness( host, &altServers[i].host ) - altServers[i].numFails;
+ scores[i] = 10 + altservers_netCloseness( host, &altServers[i].host );
} else {
- score = -( altServers[i].numFails + 128 ); // Wrong address family
+ scores[i] = 1; // Wrong address family
}
- if ( count == 0 ) {
- // Trivial - this is the first entry
- output[0].host = altServers[i].host;
- output[0].failures = 0;
- scores[0] = score;
- count++;
- } else {
- // Other entries already exist, insert in proper position
- for (j = 0; j < size; ++j) {
- if ( j < count && score <= scores[j] ) continue;
- if ( j > count ) break; // Should never happen but just in case...
- if ( j < count && j + 1 < size ) {
- // Check if we're in the middle and need to move other entries...
- memmove( &output[j + 1], &output[j], sizeof(dnbd3_server_entry_t) * (size - j - 1) );
- memmove( &scores[j + 1], &scores[j], sizeof(int) * (size - j - 1) );
- }
- if ( count < size ) {
- count++;
- }
- output[j].host = altServers[i].host;
- output[j].failures = 0;
- scores[j] = score;
- break;
+ }
+ while ( count < size ) {
+ i = -1;
+ for ( j = 0; j < numAltServers; ++j ) {
+ if ( scores[j] == 0 )
+ continue;
+ if ( i == -1 || scores[j] > scores[i] ) {
+ i = j;
}
}
+ if ( i == -1 )
+ break;
+ output[count].host = altServers[i].host;
+ output[count].failures = 0;
+ count++;
}
mutex_unlock( &altServersLock );
return count;
}
+/**
+ * Render the host of the alt server at the given table index as text.
+ * Thin wrapper: forwards to host_to_string() and returns its result.
+ * @param server index into altServers; not range-checked here
+ * @param buffer destination buffer, passed through unchanged
+ * @param len passed through unchanged (presumably the size of buffer)
+ */
+bool altservers_toString(int server, char *buffer, size_t len)
+{
+	dnbd3_host_t *entryHost = &altServers[server].host;
+	return host_to_string( entryHost, buffer, len );
+}
+
+/**
+ * Decide whether the alt server at index 'server' may be used as an
+ * uplink candidate right now.
+ * NOTE: This is not a pure query. While a server is blocked, calls to this
+ * function count its fail counter(s) down, so calling it has side effects.
+ * The callers visible in this file invoke it while holding altServersLock.
+ * @param uplink uplink whose per-uplink data should be considered, or NULL
+ *        to look at the global server state only
+ * @param server index into altServers (and uplink->altData)
+ * @param now current time, compared against the server's last global fail
+ * @return true if the server should be offered as a candidate
+ */
+static bool isUsableForUplink( dnbd3_uplink_t *uplink, int server, ticks *now )
+{
+ dnbd3_alt_local_t *local = ( uplink == NULL ? NULL : &uplink->altData[server] );
+ dnbd3_alt_server_t *global = &altServers[server];
+ // Never use client-only servers; skip non-private ones if config demands private only
+ if ( global->isClientOnly || ( !global->isPrivate && _proxyPrivateOnly ) )
+ return false;
+ // Blocked locally (image not found on server...)
+ // Every call decrements the local counter; unblock once it is no longer positive
+ if ( local != NULL && local->blocked ) {
+ if ( --local->fails > 0 )
+ return false;
+ local->blocked = false;
+ }
+ // Blocked globally: count down at most once per SERVER_GLOBAL_DUP_TIME, since
+ // lastFail is moved forward to 'now' whenever the counter is decremented
+ if ( global->blocked ) {
+ if ( timing_diff( &global->lastFail, now ) < SERVER_GLOBAL_DUP_TIME )
+ return false;
+ global->lastFail = *now;
+ if ( --global->fails > 0 )
+ return false;
+ global->blocked = false;
+ }
+ // Not blocked, depend on both fail counters
+ // Below SERVER_BAD_UPLINK_MIN combined fails: always usable. Above that: usable
+ // only if a random draw in [0, fails) lands below SERVER_BAD_UPLINK_MIN, so the
+ // chance of being picked shrinks as the fail count grows.
+ int fails = ( local == NULL ? 0 : local->fails ) + global->fails;
+ return fails < SERVER_BAD_UPLINK_MIN || ( rand() % fails ) < SERVER_BAD_UPLINK_MIN;
+}
+
+/**
+ * Fill 'servers' with the hosts of up to 'size' alt servers, as selected by
+ * altservers_getListForUplink() without any uplink context (uplink == NULL,
+ * current == -1).
+ * @param servers output array with room for at least 'size' entries
+ * @param size maximum number of hosts to return
+ * @return number of entries written to 'servers' (0 if servers is NULL or
+ *         size is not positive)
+ */
+int altservers_getHostListForReplication(dnbd3_host_t *servers, int size)
+{
+	// Bail out early; this also keeps the VLA below from getting a non-positive length
+	if ( servers == NULL || size <= 0 )
+		return 0;
+	int idx[size];
+	const int num = altservers_getListForUplink( NULL, idx, size, -1 );
+	for ( int i = 0; i < num; ++i ) {
+		// idx[] holds indexes into altServers, not the positions 0..num-1.
+		// Using 'i' directly would return the wrong hosts whenever the
+		// selection was randomized.
+		servers[i] = altServers[idx[i]].host;
+	}
+	return num;
+}
+
/**
* Get <size> alt servers. If there are more alt servers than
* requested, random servers will be picked.
* This function is suited for finding uplink servers as
* it includes private servers and ignores any "client only" servers
+ * @param current index of server for current connection, or -1 in panic mode
*/
-int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency)
+static int altservers_getListForUplink(dnbd3_uplink_t *uplink, int *servers, int size, int current)
{
- if ( size <= 0 ) return 0;
- int count = 0, i;
- ticks now;
- timing_get( &now );
+ if ( size <= 0 )
+ return 0;
+ int count = 0;
+ declare_now;
mutex_lock( &altServersLock );
- // Flip first server in list with a random one every time this is called
- if ( numAltServers > 1 ) {
- const dnbd3_alt_server_t tmp = altServers[0];
- do {
- i = rand() % numAltServers;
- } while ( i == 0 );
- altServers[0] = altServers[i];
- altServers[i] = tmp;
- }
- // We iterate over the list twice. First run adds servers with 0 failures only,
- // second one also considers those that failed (not too many times)
- if ( size > numAltServers ) size = numAltServers;
- for (i = 0; i < numAltServers * 2; ++i) {
- dnbd3_alt_server_t *srv = &altServers[i % numAltServers];
- if ( srv->host.type == 0 ) continue; // Slot is empty
- if ( _proxyPrivateOnly && !srv->isPrivate ) continue; // Config says to consider private alt-servers only? ignore!
- if ( srv->isClientOnly ) continue;
- bool first = ( i < numAltServers );
- if ( first ) {
- if ( srv->numFails > 0 ) continue;
- } else {
- if ( srv->numFails == 0 ) continue; // Already added in first iteration
- if ( !emergency && srv->numFails > SERVER_BAD_UPLINK_THRES // server failed X times in a row
- && timing_diff( &srv->lastFail, &now ) < SERVER_BAD_UPLINK_IGNORE ) continue; // and last fail was not too long ago? ignore!
- if ( !emergency ) srv->numFails--;
+ // If we don't have enough servers to randomize, take a shortcut
+ if ( numAltServers <= size ) {
+ for ( int i = 0; i < numAltServers; ++i ) {
+ if ( current == -1 || i == current || isUsableForUplink( uplink, i, &now ) ) {
+ servers[count++] = i;
+ }
+ }
+ } else {
+ // Plenty of alt servers; randomize
+ uint8_t state[SERVER_MAX_ALTS] = { 0 };
+ if ( current != -1 ) { // Make sure we also test the current server
+ servers[count++] = current;
+ state[current] = 2;
+ }
+ for ( int tr = size * 10; tr > 0 && count < size; --tr ) {
+ int idx = rand() % numAltServers;
+ if ( state[idx] != 0 )
+ continue;
+ if ( isUsableForUplink( uplink, idx, &now ) ) {
+ servers[count++] = idx;
+ state[idx] = 2; // Used
+ } else {
+ state[idx] = 1; // Potential
+ }
+ }
+ // If panic mode, consider others too
+ for ( int tr = size * 10; current == -1 && tr > 0 && count < size; --tr ) {
+ int idx = rand() % numAltServers;
+ if ( state[idx] == 2 )
+ continue;
+ servers[count++] = idx;
+ state[idx] = 2; // Used
}
- // server seems ok, include in output and decrease its fail counter
- output[count++] = srv->host;
- if ( count >= size ) break;
}
mutex_unlock( &altServersLock );
return count;
@@ -320,7 +275,7 @@ json_t* altservers_toJson()
"rtt", rtts,
"isPrivate", (int)src[i].isPrivate,
"isClientOnly", (int)src[i].isClientOnly,
- "numFails", src[i].numFails
+ "numFails", src[i].fails
);
json_array_append_new( list, server );
}
@@ -329,32 +284,27 @@ json_t* altservers_toJson()
/**
* Update rtt history of given server - returns the new average for that server.
- * XXX HOLD altServersLock WHEN CALLING THIS!
*/
-static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const unsigned int rtt)
+static uint32_t altservers_updateRtt(dnbd3_uplink_t *uplink, int index, uint32_t rtt)
{
- unsigned int avg = rtt;
- int i;
- for (i = 0; i < numAltServers; ++i) {
- if ( !isSameAddressPort( host, &altServers[i].host ) ) continue;
- altServers[i].rtt[++altServers[i].rttIndex % SERVER_RTT_PROBES] = rtt;
-#if SERVER_RTT_PROBES == 5
- avg = (altServers[i].rtt[0] + altServers[i].rtt[1] + altServers[i].rtt[2]
- + altServers[i].rtt[3] + altServers[i].rtt[4]) / SERVER_RTT_PROBES;
-#else
-#warning You might want to change the code in altservers_update_rtt if you changed SERVER_RTT_PROBES
- avg = 0;
- for (int j = 0; j < SERVER_RTT_PROBES; ++j) {
- avg += altServers[i].rtt[j];
+ uint32_t avg = 0, j;
+ dnbd3_alt_local_t *local = &uplink->altData[index];
+ mutex_lock( &altServersLock );
+ if ( likely( local->initDone ) ) {
+ local->rtt[++local->rttIndex % SERVER_RTT_PROBES] = rtt;
+ for ( j = 0; j < SERVER_RTT_PROBES; ++j ) {
+ avg += local->rtt[j];
}
avg /= SERVER_RTT_PROBES;
-#endif
- // If we got a new rtt value, server must be working
- if ( altServers[i].numFails > 0 ) {
- altServers[i].numFails--;
+ } else { // First rtt measurement -- copy to every slot
+ for ( j = 0; j < SERVER_RTT_PROBES; ++j ) {
+ local->rtt[j] = rtt;
}
- break;
+ avg = rtt;
+ local->initDone = true;
}
+ altServers[index].rtt[++altServers[index].rttIndex % SERVER_RTT_PROBES] = avg;
+ mutex_unlock( &altServersLock );
return avg;
}
@@ -383,40 +333,33 @@ int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2)
* track of how often servers fail, and consider them disabled for some time if they
* fail too many times.
*/
-void altservers_serverFailed(const dnbd3_host_t * const host)
+void altservers_serverFailed(int server)
{
- int i;
- int foundIndex = -1, lastOk = -1;
- ticks now;
- timing_get( &now );
+ declare_now;
mutex_lock( &altServersLock );
- for (i = 0; i < numAltServers; ++i) {
- if ( foundIndex == -1 ) {
- // Looking for the failed server in list
- if ( isSameAddressPort( host, &altServers[i].host ) ) {
- foundIndex = i;
- }
- } else if ( altServers[i].host.type != 0 && altServers[i].numFails == 0 ) {
- lastOk = i;
+ if ( timing_diff( &altServers[server].lastFail, &now ) > SERVER_GLOBAL_DUP_TIME ) {
+ altServers[server].lastFail = now;
+ if ( altServers[server].fails++ >= SERVER_BAD_UPLINK_MAX ) {
+ altServers[server].blocked = true;
}
}
- // Do only increase counter if last fail was not too recent. This is
- // to prevent the counter from increasing rapidly if many images use the
- // same uplink. If there's a network hickup, all uplinks will call this
- // function and would increase the counter too quickly, disabling the server.
- if ( foundIndex != -1 && timing_diff( &altServers[foundIndex].lastFail, &now ) > SERVER_RTT_INTERVAL_INIT ) {
- altServers[foundIndex].numFails += SERVER_UPLINK_FAIL_INCREASE;
- altServers[foundIndex].lastFail = now;
- if ( lastOk != -1 ) {
- // Make sure non-working servers are put at the end of the list, so they're less likely
- // to get picked when testing servers for uplink connections.
- const dnbd3_alt_server_t tmp = altServers[foundIndex];
- altServers[foundIndex] = altServers[lastOk];
- altServers[lastOk] = tmp;
- }
+ mutex_unlock( &altServersLock );
+}
+
+/**
+ * Called from RTT checker if connecting to a server succeeded but
+ * subsequently selecting the given image failed. Handle this within
+ * the uplink and don't increase the global fail counter.
+ */
+static void altservers_imageFailed(dnbd3_uplink_t *uplink, int server)
+{
+ mutex_lock( &altServersLock );
+ if ( uplink->altData[server].fails++ >= SERVER_BAD_UPLINK_MAX ) {
+ uplink->altData[server].blocked = true;
}
mutex_unlock( &altServersLock );
}
+
/**
* Mainloop of this module. It will wait for requests by uplinks to find a
* suitable uplink server for them. If found, it will tell the uplink about
@@ -425,206 +368,213 @@ void altservers_serverFailed(const dnbd3_host_t * const host)
* will update quite quickly. Needs to be improved some time, ie. by only
* updating the rtt if the last update was at least X seconds ago.
*/
-static void *altservers_main(void *data UNUSED)
+/**
+ * Thread pool entry point for one alt server check.
+ * @param data the dnbd3_uplink_t to run the check for; must not be NULL
+ * @return always NULL
+ */
+static void *altservers_runCheck(void *data)
+{
+	dnbd3_uplink_t * const link = data;
+
+	assert( link != NULL );
+	setThreadName( "altserver-check" );
+	altservers_findUplinkInternal( link );
+	// Piggy-backed housekeeping: if enabled and the deadline has passed,
+	// re-arm the deadline and call image_closeUnusedFd()
+	// TODO: Has nothing to do with alt servers really, maybe move somewhere else?
+	declare_now;
+	if ( _closeUnusedFd ) {
+		if ( timing_reached( &nextCloseUnusedFd, &now ) ) {
+			timing_gets( &nextCloseUnusedFd, 900 );
+			image_closeUnusedFd();
+		}
+	}
+	return NULL;
+}
+
+/**
+ * Synchronous variant of the alt server check: runs the check on the
+ * calling thread and does not return while the uplink's rttTestResult is
+ * still RTT_INPROGRESS.
+ * @param uplink uplink to find a server for
+ */
+void altservers_findUplink(dnbd3_uplink_t *uplink)
+{
+	altservers_findUplinkInternal( uplink );
+	// The internal call returns right away if a check is already running
+	// elsewhere, so poll in 5ms steps until that one has finished too
+	for ( ;; ) {
+		if ( uplink->rttTestResult != RTT_INPROGRESS )
+			break;
+		usleep( 5000 );
+	}
+}
+
+/**
+ * Look up the table index of the alt server matching the given host,
+ * compared via isSameAddressPort().
+ * @param host host to search for
+ * @return index of the first matching entry, or -1 if there is none
+ */
+int altservers_hostToIndex(dnbd3_host_t *host)
+{
+	int pos = 0;
+	while ( pos < numAltServers ) {
+		if ( isSameAddressPort( host, &altServers[pos].host ) )
+			return pos;
+		++pos;
+	}
+	return -1;
+}
+
+/**
+ * Get a read-only pointer to the host of the alt server at the given index.
+ * The pointer refers directly into the altServers table (no copy is made).
+ * @param server index into altServers; not range-checked here
+ */
+const dnbd3_host_t* altservers_indexToHost(int server)
+{
+	const dnbd3_alt_server_t * const entry = &altServers[server];
+	return &entry->host;
+}
+
+// XXX Sync call above must block until async worker has finished XXX
+static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink)
{
const int ALTS = 4;
- int ret, itLink, itAlt, numAlts;
- bool found;
- char buffer[DNBD3_BLOCK_SIZE ];
- dnbd3_reply_t reply;
- dnbd3_host_t servers[ALTS + 1];
- serialized_buffer_t serialized;
+ int ret, itAlt, numAlts, current;
+ bool panic;
+ int servers[ALTS + 1];
struct timespec start, end;
- ticks nextCloseUnusedFd;
- setThreadName( "altserver-check" );
- blockNoncriticalSignals();
- timing_gets( &nextCloseUnusedFd, 900 );
- // LOOP
- while ( !_shutdown ) {
- // Wait 5 seconds max.
- ret = signal_wait( runSignal, 5000 );
- if ( _shutdown ) goto cleanup;
- if ( ret == SIGNAL_ERROR ) {
- if ( errno == EAGAIN || errno == EINTR ) continue;
- logadd( LOG_WARNING, "Error %d on signal_clear on alservers_main! Things will break!", errno );
- usleep( 100000 );
+ if ( _shutdown )
+ return;
+ mutex_lock( &uplink->rttLock );
+ // Maybe we already have a result, or check is currently running
+ if ( uplink->better.fd != -1 || uplink->rttTestResult == RTT_INPROGRESS ) {
+ mutex_unlock( &uplink->rttLock );
+ return;
+ }
+ assert( uplink->rttTestResult != RTT_DOCHANGE );
+ uplink->rttTestResult = RTT_INPROGRESS;
+ panic = ( uplink->current.fd == -1 );
+ current = uplink->current.index; // Current server index (or last one in panic mode)
+ mutex_unlock( &uplink->rttLock );
+ // First, get 4 alt servers
+ numAlts = altservers_getListForUplink( uplink, servers, ALTS, panic ? -1 : current );
+ // If we're already connected and only got one server anyways, there isn't much to do
+ if ( numAlts == 0 || ( numAlts == 1 && !panic ) ) {
+ uplink->rttTestResult = RTT_DONTCHANGE;
+ return;
+ }
+ dnbd3_image_t * const image = image_lock( uplink->image );
+ if ( image == NULL ) { // Check again after locking
+ uplink->rttTestResult = RTT_NOT_REACHABLE;
+ logadd( LOG_WARNING, "Image has gone away that was queued for RTT measurement" );
+ return;
+ }
+ LOG( LOG_DEBUG2, "Running alt check for %s:%d", image->name, (int)image->rid );
+ assert( uplink->rttTestResult == RTT_INPROGRESS );
+ // Test them all
+ dnbd3_server_connection_t best = { .fd = -1 };
+ unsigned long bestRtt = RTT_UNREACHABLE;
+ unsigned long currentRtt = RTT_UNREACHABLE;
+ for (itAlt = 0; itAlt < numAlts; ++itAlt) {
+ int server = servers[itAlt];
+ // Connect
+ clock_gettime( BEST_CLOCK_SOURCE, &start );
+ int sock = sock_connect( &altServers[server].host, 750, 1000 );
+ if ( sock == -1 ) { // Connection failed means global error
+ altservers_serverFailed( server );
+ continue;
}
- // Work your way through the queue
- atomic_thread_fence( memory_order_acquire );
- for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) {
- dnbd3_uplink_t * const uplink = pending[itLink];
- if ( uplink == NULL )
- continue;
- // First, get 4 alt servers
- numAlts = altservers_getListForUplink( servers, ALTS, uplink->current.fd == -1 );
- // If we're already connected and only got one server anyways, there isn't much to do
- if ( numAlts <= 1 && uplink->current.fd != -1 ) {
- uplink->rttTestResult = RTT_DONTCHANGE;
- continue;
- }
- dnbd3_image_t * const image = image_lock( uplink->image );
- if ( image == NULL ) { // Check again after locking
- mutex_lock( &uplink->rttLock );
- uplink->rttTestResult = RTT_NOT_REACHABLE;
- assert( pending[itLink] == uplink );
- pending[itLink] = NULL;
- mutex_unlock( &uplink->rttLock );
- logadd( LOG_WARNING, "Image has gone away that was queued for RTT measurement" );
- continue;
- }
- LOG( LOG_DEBUG2, "[%d] Running alt check", itLink );
- assert( uplink->rttTestResult == RTT_INPROGRESS );
- if ( uplink->current.fd != -1 ) {
- // Add current server if not already in list
- found = false;
- for (itAlt = 0; itAlt < numAlts; ++itAlt) {
- if ( !isSameAddressPort( &uplink->current.host, &servers[itAlt] ) ) continue;
- found = true;
- break;
- }
- if ( !found ) servers[numAlts++] = uplink->current.host;
- }
- // Test them all
- int bestSock = -1;
- int bestIndex = -1;
- int bestProtocolVersion = -1;
- unsigned long bestRtt = RTT_UNREACHABLE;
- unsigned long currentRtt = RTT_UNREACHABLE;
- for (itAlt = 0; itAlt < numAlts; ++itAlt) {
- usleep( 1000 ); // Wait a very short moment for the network to recover (we might be doing lots of measurements...)
- // Connect
- clock_gettime( BEST_CLOCK_SOURCE, &start );
- int sock = sock_connect( &servers[itAlt], 750, 1000 );
- if ( sock < 0 ) continue;
- // Select image ++++++++++++++++++++++++++++++
- if ( !dnbd3_select_image( sock, image->name, image->rid, SI_SERVER_FLAGS ) ) {
- goto server_failed;
- }
- // See if selecting the image succeeded ++++++++++++++++++++++++++++++
- uint16_t protocolVersion, rid;
- uint64_t imageSize;
- char *name;
- if ( !dnbd3_select_image_reply( &serialized, sock, &protocolVersion, &name, &rid, &imageSize ) ) {
- goto server_image_not_available;
- }
- if ( protocolVersion < MIN_SUPPORTED_SERVER ) goto server_failed;
- if ( name == NULL || strcmp( name, image->name ) != 0 ) {
- ERROR_GOTO( server_failed, "[RTT] Server offers image '%s'", name );
- }
- if ( rid != image->rid ) {
- ERROR_GOTO( server_failed, "[RTT] Server provides rid %d", (int)rid );
- }
- if ( imageSize != image->virtualFilesize ) {
- ERROR_GOTO( server_failed, "[RTT] Remote size: %" PRIu64 ", expected: %" PRIu64, imageSize, image->virtualFilesize );
- }
- // Request first block (NOT random!) ++++++++++++++++++++++++++++++
- if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) {
- LOG_GOTO( server_failed, LOG_DEBUG1, "[RTT%d] Could not request first block", itLink );
- }
- // See if requesting the block succeeded ++++++++++++++++++++++
- if ( !dnbd3_get_reply( sock, &reply ) ) {
- LOG_GOTO( server_failed, LOG_DEBUG1, "[RTT%d] Received corrupted reply header after CMD_GET_BLOCK", itLink );
- }
- // check reply header
- if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) {
- ERROR_GOTO( server_failed, "[RTT] Reply to first block request is %" PRIu32 " bytes", reply.size );
- }
- if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) {
- ERROR_GOTO( server_failed, "[RTT%d] Could not read first block payload", itLink );
- }
- clock_gettime( BEST_CLOCK_SOURCE, &end );
- // Measurement done - everything fine so far
- mutex_lock( &altServersLock );
- mutex_lock( &uplink->rttLock );
- const bool isCurrent = isSameAddressPort( &servers[itAlt], &uplink->current.host );
- // Penaltize rtt if this was a cycle; this will treat this server with lower priority
- // in the near future too, so we prevent alternating between two servers that are both
- // part of a cycle and have the lowest latency.
- const unsigned int rtt = (unsigned int)((end.tv_sec - start.tv_sec) * 1000000
- + (end.tv_nsec - start.tv_nsec) / 1000
- + ( (isCurrent && uplink->cycleDetected) ? 1000000 : 0 )); // µs
- unsigned int avg = altservers_updateRtt( &servers[itAlt], rtt );
- mutex_unlock( &altServersLock );
- // If a cycle was detected, or we lost connection to the current (last) server, penaltize it one time
- if ( ( uplink->cycleDetected || uplink->current.fd == -1 ) && isCurrent ) avg = (avg * 2) + 50000;
- mutex_unlock( &uplink->rttLock );
- if ( uplink->current.fd != -1 && isCurrent ) {
- // Was measuring current server
- currentRtt = avg;
- close( sock );
- } else if ( avg < bestRtt ) {
- // Was another server, update "best"
- if ( bestSock != -1 ) close( bestSock );
- bestSock = sock;
- bestRtt = avg;
- bestIndex = itAlt;
- bestProtocolVersion = protocolVersion;
- } else {
- // Was too slow, ignore
- close( sock );
- }
- // We're done, call continue
- continue;
- // Jump here if anything went wrong
- // This will cleanup and continue
- server_failed: ;
- altservers_serverFailed( &servers[itAlt] );
- server_image_not_available: ;
- close( sock );
- }
- // Done testing all servers. See if we should switch
- if ( bestSock != -1 && (uplink->current.fd == -1 || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) {
- // yep
- if ( currentRtt > 10000000 || uplink->current.fd == -1 ) {
- LOG( LOG_DEBUG1, "Change - best: %luµs, current: -", bestRtt );
- } else {
- LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt );
- }
- sock_setTimeout( bestSock, _uplinkTimeout );
- mutex_lock( &uplink->rttLock );
- uplink->better.fd = bestSock;
- uplink->better.host = servers[bestIndex];
- uplink->better.version = bestProtocolVersion;
- uplink->rttTestResult = RTT_DOCHANGE;
- mutex_unlock( &uplink->rttLock );
- signal_call( uplink->signal );
- } else if ( bestSock == -1 && currentRtt == RTT_UNREACHABLE ) {
- // No server was reachable
- mutex_lock( &uplink->rttLock );
- uplink->rttTestResult = RTT_NOT_REACHABLE;
- mutex_unlock( &uplink->rttLock );
- } else {
- // nope
- if ( bestSock != -1 ) close( bestSock );
- mutex_lock( &uplink->rttLock );
- uplink->rttTestResult = RTT_DONTCHANGE;
- uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away
- mutex_unlock( &uplink->rttLock );
- if ( !image->working ) {
- image->working = true;
- LOG( LOG_DEBUG1, "[%d] No better alt server found, enabling again", itLink );
- }
- }
- image_release( image );
- // end of loop over all pending uplinks
- assert( pending[itLink] == uplink );
- pending[itLink] = NULL;
- atomic_thread_fence( memory_order_release );
+ // Select image ++++++++++++++++++++++++++++++
+ if ( !dnbd3_select_image( sock, image->name, image->rid, SI_SERVER_FLAGS ) ) {
+ goto image_failed;
}
- // Save cache maps of all images if applicable
- declare_now;
- // TODO: Has nothing to do with alt servers really, maybe move somewhere else?
- if ( _closeUnusedFd && timing_reached( &nextCloseUnusedFd, &now ) ) {
- timing_gets( &nextCloseUnusedFd, 900 );
- image_closeUnusedFd();
+ // See if selecting the image succeeded ++++++++++++++++++++++++++++++
+ uint16_t protocolVersion, rid;
+ uint64_t imageSize;
+ char *name;
+ serialized_buffer_t serialized;
+ if ( !dnbd3_select_image_reply( &serialized, sock, &protocolVersion, &name, &rid, &imageSize ) ) {
+ goto image_failed;
}
+ if ( protocolVersion < MIN_SUPPORTED_SERVER ) { // Server version unsupported; global fail
+ goto server_failed;
+ }
+ if ( name == NULL || strcmp( name, image->name ) != 0 ) {
+ ERROR_GOTO( image_failed, "[RTT] Server offers image '%s' instead of '%s'", name, image->name );
+ }
+ if ( rid != image->rid ) {
+ ERROR_GOTO( image_failed, "[RTT] Server provides rid %d instead of %d", (int)rid, (int)image->rid );
+ }
+ if ( imageSize != image->virtualFilesize ) {
+ ERROR_GOTO( image_failed, "[RTT] Remote size: %" PRIu64 ", expected: %" PRIu64, imageSize, image->virtualFilesize );
+ }
+ // Request first block (NOT random!) ++++++++++++++++++++++++++++++
+ if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) {
+ LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Could not request first block", server );
+ }
+ // See if requesting the block succeeded ++++++++++++++++++++++
+ dnbd3_reply_t reply;
+ if ( !dnbd3_get_reply( sock, &reply ) ) {
+ LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Received corrupted reply header after CMD_GET_BLOCK", server );
+ }
+ // check reply header
+ if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) {
+ // Sanity check failed; count this as global error (malicious/broken server)
+ ERROR_GOTO( server_failed, "[RTT] Reply to first block request is %" PRIu32 " bytes", reply.size );
+ }
+ // flush payload to include this into measurement
+ char buffer[DNBD3_BLOCK_SIZE];
+ if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) {
+ ERROR_GOTO( image_failed, "[RTT%d] Could not read first block payload", server );
+ }
+ clock_gettime( BEST_CLOCK_SOURCE, &end );
+ // Measurement done - everything fine so far
+ mutex_lock( &uplink->rttLock );
+ const bool isCurrent = ( uplink->current.index == server );
+ mutex_unlock( &uplink->rttLock );
+ // Penaltize rtt if this was a cycle; this will treat this server with lower priority
+ // in the near future too, so we prevent alternating between two servers that are both
+ // part of a cycle and have the lowest latency.
+ uint32_t rtt = (uint32_t)((end.tv_sec - start.tv_sec) * 1000000
+ + (end.tv_nsec - start.tv_nsec) / 1000); // µs
+ uint32_t avg = altservers_updateRtt( uplink, server, rtt );
+ // If a cycle was detected, or we lost connection to the current (last) server, penaltize it one time
+ if ( ( uplink->cycleDetected || panic ) && isCurrent ) {
+ avg = (avg * 2) + 50000;
+ }
+ if ( !panic && isCurrent ) {
+ // Was measuring current server
+ currentRtt = avg;
+ close( sock );
+ } else if ( avg < bestRtt ) {
+ // Was another server, update "best"
+ if ( best.fd != -1 ) {
+ close( best.fd );
+ }
+ best.fd = sock;
+ bestRtt = avg;
+ best.index = server;
+ best.version = protocolVersion;
+ } else {
+ // Was too slow, ignore
+ close( sock );
+ }
+ // We're done, call continue
+ continue;
+ // Jump here if anything went wrong
+ // This will cleanup and continue
+image_failed:
+ altservers_imageFailed( uplink, server );
+ goto failed;
+server_failed:
+ altservers_serverFailed( server );
+failed:
+ close( sock );
}
- cleanup: ;
- if ( runSignal != NULL ) {
- signal_close( runSignal );
+ // Done testing all servers. See if we should switch
+ if ( best.fd != -1 && (panic || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) {
+ // yep
+ if ( currentRtt > 10000000 || panic ) {
+ LOG( LOG_DEBUG1, "Change - best: %luµs, current: -", bestRtt );
+ } else {
+ LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt );
+ }
+ sock_setTimeout( best.fd, _uplinkTimeout );
+ mutex_lock( &uplink->rttLock );
+ uplink->better = best;
+ uplink->rttTestResult = RTT_DOCHANGE;
+ mutex_unlock( &uplink->rttLock );
+ signal_call( uplink->signal );
+ } else if ( best.fd == -1 && currentRtt == RTT_UNREACHABLE ) {
+ // No server was reachable, including current
+ uplink->rttTestResult = RTT_NOT_REACHABLE;
+ } else {
+ // nope
+ if ( best.fd != -1 ) {
+ close( best.fd );
+ }
+ if ( !image->working || uplink->cycleDetected ) {
+ image->working = true;
+ LOG( LOG_DEBUG1, "[RTT] No better alt server found, enabling '%s:%d' again... :-(", image->name, (int)image->rid );
+ }
+ uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away
+ mutex_lock( &uplink->rttLock );
+ uplink->rttTestResult = RTT_DONTCHANGE;
+ mutex_unlock( &uplink->rttLock );
}
- runSignal = NULL;
- return NULL ;
+ image_release( image );
}
diff --git a/src/server/altservers.h b/src/server/altservers.h
index e03b900..8e2b964 100644
--- a/src/server/altservers.h
+++ b/src/server/altservers.h
@@ -7,23 +7,27 @@ struct json_t;
void altservers_init();
-void altservers_shutdown();
-
int altservers_load();
bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate, const int isClientOnly);
-void altservers_findUplink(dnbd3_uplink_t *uplink);
+void altservers_findUplinkAsync(dnbd3_uplink_t *uplink);
-void altservers_removeUplink(dnbd3_uplink_t *uplink);
+void altservers_findUplink(dnbd3_uplink_t *uplink);
int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size);
-int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency);
+int altservers_getHostListForReplication(dnbd3_host_t *servers, int size);
+
+bool altservers_toString(int server, char *buffer, size_t len);
int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2);
-void altservers_serverFailed(const dnbd3_host_t * const host);
+void altservers_serverFailed(int server);
+
+int altservers_hostToIndex(dnbd3_host_t *host);
+
+const dnbd3_host_t* altservers_indexToHost(int server);
struct json_t* altservers_toJson();
diff --git a/src/server/globals.h b/src/server/globals.h
index 659e5a2..4d97c6b 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -30,10 +30,31 @@ typedef struct
uint8_t hopCount; // How many hops this request has already taken across proxies
} dnbd3_queued_request_t;
+typedef struct // Global state of one alt server, shared by all uplinks
+{
+ int fails; // Hard fail: Connection failed
+ int rttIndex; // presumably the next slot to write in rtt[] - confirm in altservers.c
+ uint32_t rtt[SERVER_RTT_PROBES]; // Recent RTT samples, SERVER_RTT_PROBES of them
+ bool isPrivate, isClientOnly; // presumably the flags passed to altservers_add() - confirm
+ bool blocked; // If true count down fails until 0 to enable again
+ ticks lastFail; // Last hard fail
+ dnbd3_host_t host; // Address of this server
+ char comment[COMMENT_LENGTH]; // presumably the comment passed to altservers_add() - confirm
+} dnbd3_alt_server_t;
+
+typedef struct // Per-uplink state of one alt server; stored in the uplink's altData[] array
+{
+ int fails; // Soft fail: Image not found
+ int rttIndex; // presumably the next slot to write in rtt[] - confirm in altservers.c
+ uint32_t rtt[SERVER_RTT_PROBES]; // RTT samples kept for this uplink only
+ bool blocked; // True if server is to be ignored and fails should be counted down
+ bool initDone; // presumably set once this entry has been initialized - confirm
+} dnbd3_alt_local_t;
+
typedef struct {
- int fd; // Socket fd for this connection
- int version; // Protocol version of remote server
- dnbd3_host_t host; // IP/Port of remote server
+ int fd; // Socket fd for this connection
+ int version; // Protocol version of remote server
+ int index; // Index of the alt server, as passed to altservers_indexToHost()/altservers_toString()
} dnbd3_server_connection_t;
#define RTT_IDLE 0 // Not in progress
@@ -51,7 +72,7 @@ struct _dnbd3_uplink
pthread_mutex_t queueLock; // lock for synchronization on request queue etc.
dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer
pthread_mutex_t rttLock; // When accessing rttTestResult, betterFd or betterServer
- int rttTestResult; // RTT_*
+ atomic_int rttTestResult; // RTT_*
int cacheFd; // used to write to the image, in case it is relayed. ONLY USE FROM UPLINK THREAD!
uint8_t *recvBuffer; // Buffer for receiving payload
uint32_t recvBufferLen; // Len of ^^
@@ -65,21 +86,11 @@ struct _dnbd3_uplink
atomic_int queueLen; // length of queue
uint32_t idleTime; // How many seconds the uplink was idle (apart from keep-alives)
dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE];
+ dnbd3_alt_local_t altData[SERVER_MAX_ALTS];
};
typedef struct
{
- char comment[COMMENT_LENGTH];
- dnbd3_host_t host;
- unsigned int rtt[SERVER_RTT_PROBES];
- unsigned int rttIndex;
- bool isPrivate, isClientOnly;
- ticks lastFail;
- int numFails;
-} dnbd3_alt_server_t;
-
-typedef struct
-{
uint8_t host[16];
int bytes;
int bitMask;
diff --git a/src/server/image.c b/src/server/image.c
index d250715..1a6e0f8 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -1178,7 +1178,7 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
dnbd3_host_t servers[REP_NUM_SRV];
int uplinkSock = -1;
dnbd3_host_t uplinkServer;
- const int count = altservers_getListForUplink( servers, REP_NUM_SRV, false );
+ const int count = altservers_getHostListForReplication( servers, REP_NUM_SRV );
uint16_t remoteProtocolVersion;
uint16_t remoteRid = revision;
uint64_t remoteImageSize;
@@ -1491,7 +1491,7 @@ json_t* image_getListAsJson()
json_t *imagesJson = json_array();
json_t *jsonImage;
int i;
- char uplinkName[100] = { 0 };
+ char uplinkName[100];
uint64_t bytesReceived;
int completeness, idleTime;
declare_now;
@@ -1508,7 +1508,7 @@ json_t* image_getListAsJson()
uplinkName[0] = '\0';
} else {
bytesReceived = image->uplink->bytesReceived;
- if ( image->uplink->current.fd == -1 || !host_to_string( &image->uplink->current.host, uplinkName, sizeof(uplinkName) ) ) {
+ if ( !uplink_getHostString( image->uplink, uplinkName, sizeof(uplinkName) ) ) {
uplinkName[0] = '\0';
}
}
diff --git a/src/server/net.c b/src/server/net.c
index 7f3c1ce..4976eea 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -669,11 +669,19 @@ static void removeFromList(dnbd3_client_t *client)
{
int i;
mutex_lock( &_clients_lock );
- for ( i = _num_clients - 1; i >= 0; --i ) {
- if ( _clients[i] == client ) {
- _clients[i] = NULL;
+ if ( _num_clients != 0 ) {
+ for ( i = _num_clients - 1; i >= 0; --i ) { // Search from the end; i is -1 afterwards if client was not found
+ if ( _clients[i] == client ) {
+ _clients[i] = NULL;
+ break; // Stop at the first match so i keeps the slot that was cleared
+ }
+ }
+ if ( i != 0 && i + 1 == _num_clients ) { // Cleared the last used slot (and it was not slot 0)
+ do {
+ i--;
+ } while ( _clients[i] == NULL && i > 0 ); // Walk back over trailing NULL slots, stopping at slot 0
+ _num_clients = i + 1; // New count ends right after slot i; never drops below 1 here
}
- if ( _clients[i] == NULL && i + 1 == _num_clients ) --_num_clients;
}
mutex_unlock( &_clients_lock );
}
diff --git a/src/server/server.c b/src/server/server.c
index 838aec2..640048a 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -121,9 +121,6 @@ void dnbd3_cleanup()
// Disable threadpool
threadpool_close();
- // Terminate the altserver checking thread
- altservers_shutdown();
-
// Terminate all uplinks
image_killUplinks();
@@ -198,6 +195,11 @@ int main(int argc, char *argv[])
case LONGOPT_CRC4:
return image_generateCrcFile( optarg ) ? 0 : EXIT_FAILURE;
case LONGOPT_ASSERT:
+ printf( "Testing use after free:\n" );
+ volatile char * volatile test = malloc( 10 );
+ test[0] = 1;
+ free( test );
+ test[1] = 2;
printf( "Testing a failing assertion:\n" );
assert( 4 == 5 );
printf( "Assertion 4 == 5 seems to hold. ;-)\n" );
diff --git a/src/server/uplink.c b/src/server/uplink.c
index e21e28c..6c85580 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -96,17 +96,18 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
uplink->bytesReceived = 0;
uplink->idleTime = 0;
uplink->queueLen = 0;
- mutex_lock( &uplink->sendMutex );
- uplink->current.fd = -1;
- mutex_unlock( &uplink->sendMutex );
uplink->cacheFd = -1;
uplink->signal = NULL;
uplink->replicationHandle = REP_NONE;
mutex_lock( &uplink->rttLock );
+ mutex_lock( &uplink->sendMutex );
+ uplink->current.fd = -1;
+ mutex_unlock( &uplink->sendMutex );
uplink->cycleDetected = false;
- if ( sock >= 0 ) {
+ if ( sock != -1 ) {
uplink->better.fd = sock;
- uplink->better.host = *host;
+ int index = altservers_hostToIndex( host );
+ uplink->better.index = index == -1 ? 0 : index; // Prevent invalid array access
uplink->rttTestResult = RTT_DOCHANGE;
uplink->better.version = version;
} else {
@@ -116,7 +117,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
mutex_unlock( &uplink->rttLock );
uplink->recvBufferLen = 0;
uplink->shutdown = false;
- if ( 0 != thread_create( &(uplink->thread), NULL, &uplink_mainloop, (void *)link ) ) {
+ if ( 0 != thread_create( &(uplink->thread), NULL, &uplink_mainloop, (void *)uplink ) ) {
logadd( LOG_ERROR, "Could not start thread for new uplink." );
goto failure;
}
@@ -148,8 +149,8 @@ void uplink_shutdown(dnbd3_image_t *image)
}
dnbd3_uplink_t * const uplink = image->uplink;
mutex_lock( &uplink->queueLock );
- if ( !uplink->shutdown ) {
- uplink->shutdown = true;
+ bool exp = false;
+ if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) {
signal_call( uplink->signal );
thread = uplink->thread;
join = true;
@@ -211,13 +212,11 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
}
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
// This might be a false positive if there are multiple instances running on the same host (IP)
- if ( hops != 0 && isSameAddress( &uplink->current.host, &client->host ) ) {
- mutex_unlock( &client->image->lock );
- logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
- mutex_lock( &uplink->rttLock );
+ if ( hops != 0 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
uplink->cycleDetected = true;
- mutex_unlock( &uplink->rttLock );
signal_call( uplink->signal );
+ mutex_unlock( &client->image->lock );
+ logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
return false;
}
@@ -256,12 +255,10 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
}
}
if ( unlikely( requestLoop ) ) {
- mutex_unlock( &uplink->queueLock );
- logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops );
- mutex_lock( &uplink->rttLock );
uplink->cycleDetected = true;
- mutex_unlock( &uplink->rttLock );
signal_call( uplink->signal );
+ mutex_unlock( &uplink->queueLock );
+ logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops );
return false;
}
if ( freeSlot < firstUsedSlot && firstUsedSlot < 10 && existingType != ULR_PROCESSING ) {
@@ -311,6 +308,8 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
if ( foundExisting != -1 )
return true; // Attached to pending request, do nothing
+ usleep( 10000 );
+
// See if we can fire away the request
if ( mutex_trylock( &uplink->sendMutex ) != 0 ) {
logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" );
@@ -342,7 +341,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
if ( state == -1 ) {
logadd( LOG_DEBUG2, "Direct uplink request queue entry gone after sending and re-locking queue. *shrug*" );
} else if ( state == ULR_NEW ) {
- logadd( LOG_DEBUG2, "Succesful direct uplink request" );
+ //logadd( LOG_DEBUG2, "Direct uplink request" );
} else {
logadd( LOG_DEBUG2, "Direct uplink request queue entry changed to %s afte sending (expected ULR_NEW).", NAMES_ULR[uplink->queue[freeSlot].status] );
}
@@ -352,10 +351,8 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
}
}
- if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed
- if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
- logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
- }
+ if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
+ logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
}
return true;
}
@@ -443,7 +440,7 @@ static void* uplink_mainloop(void *data)
uplink->image->working = true;
uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
buffer[0] = '@';
- if ( host_to_string( &uplink->current.host, buffer + 1, sizeof(buffer) - 1 ) ) {
+ if ( altservers_toString( uplink->current.index, buffer + 1, sizeof(buffer) - 1 ) ) {
logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", uplink->image->name, buffer + 1 );
setThreadName( buffer );
}
@@ -525,9 +522,7 @@ static void* uplink_mainloop(void *data)
}
}
// See if we should trigger an RTT measurement
- mutex_lock( &uplink->rttLock );
- const int rttTestResult = uplink->rttTestResult;
- mutex_unlock( &uplink->rttLock );
+ int rttTestResult = uplink->rttTestResult;
if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) {
if ( timing_reached( &nextAltCheck, &now ) || ( uplink->current.fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) || uplink->cycleDetected ) {
// It seems it's time for a check
@@ -538,7 +533,7 @@ static void* uplink_mainloop(void *data)
goto cleanup;
} else if ( !uplink_connectionShouldShutdown( uplink ) ) {
// Not complete - do measurement
- altservers_findUplink( uplink ); // This will set RTT_INPROGRESS (synchronous)
+ altservers_findUplinkAsync( uplink ); // This will set RTT_INPROGRESS (synchronous)
if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) {
uplink->nextReplicationIndex = 0;
}
@@ -547,11 +542,9 @@ static void* uplink_mainloop(void *data)
timing_set( &nextAltCheck, &now, altCheckInterval );
}
} else if ( rttTestResult == RTT_NOT_REACHABLE ) {
- mutex_lock( &uplink->rttLock );
- uplink->rttTestResult = RTT_IDLE;
- mutex_unlock( &uplink->rttLock );
+ atomic_compare_exchange_strong( &uplink->rttTestResult, &rttTestResult, RTT_IDLE );
discoverFailCount++;
- timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) );
+ timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_MAX_UNREACH ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) );
}
#ifdef _DEBUG
if ( uplink->current.fd != -1 && !uplink->shutdown ) {
@@ -581,36 +574,38 @@ static void* uplink_mainloop(void *data)
#endif
}
cleanup: ;
- if ( !uplink->shutdown ) {
- uplink->shutdown = true;
+ // Detach depends on whether someone is joining this thread...
+ bool exp = false;
+ if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) {
thread_detach( uplink->thread );
}
- altservers_removeUplink( uplink );
uplink_saveCacheMap( uplink );
- mutex_lock( &uplink->image->lock );
- if ( uplink->image->uplink == uplink ) {
- uplink->image->uplink = NULL;
+ dnbd3_image_t *image = uplink->image;
+ mutex_lock( &image->lock );
+ // in the list anymore, but we want to prevent it from being freed in either case
+ if ( image->uplink == uplink ) {
+ image->uplink = NULL;
}
+ mutex_unlock( &image->lock ); // Do NOT use image without locking it
mutex_lock( &uplink->queueLock );
- const int fd = uplink->current.fd;
- const dnbd3_signal_t* signal = uplink->signal;
- mutex_lock( &uplink->sendMutex );
- uplink->current.fd = -1;
- mutex_unlock( &uplink->sendMutex );
- uplink->signal = NULL;
- // Do not access uplink->image after unlocking, since we set
- // image->uplink to NULL. Acquire with image_lock first,
- // like done below when checking whether to re-init uplink
- mutex_unlock( &uplink->image->lock );
- mutex_unlock( &uplink->queueLock );
- if ( fd != -1 ) close( fd );
- if ( signal != NULL ) signal_close( signal );
- // Wait for the RTT check to finish/fail if it's in progress
- while ( uplink->rttTestResult == RTT_INPROGRESS )
+ // Wait for active RTT measurement to finish
+ while ( uplink->rttTestResult == RTT_INPROGRESS ) {
usleep( 10000 );
+ }
+ signal_close( uplink->signal );
+ mutex_lock( &uplink->rttLock );
+ mutex_lock( &uplink->sendMutex );
+ if ( uplink->current.fd != -1 ) {
+ close( uplink->current.fd );
+ uplink->current.fd = -1;
+ }
if ( uplink->better.fd != -1 ) {
close( uplink->better.fd );
+ uplink->better.fd = -1;
}
+ mutex_unlock( &uplink->sendMutex );
+ mutex_unlock( &uplink->rttLock );
+ mutex_unlock( &uplink->queueLock );
mutex_destroy( &uplink->queueLock );
mutex_destroy( &uplink->rttLock );
mutex_destroy( &uplink->sendMutex );
@@ -619,9 +614,9 @@ static void* uplink_mainloop(void *data)
if ( uplink->cacheFd != -1 ) {
close( uplink->cacheFd );
}
- dnbd3_image_t *image = image_lock( uplink->image );
free( uplink ); // !!!
- if ( image != NULL ) {
+ if ( image_lock( image ) != NULL ) {
+ // Image is still in list...
if ( !_shutdown && image->cache_map != NULL ) {
// Ingegrity checker must have found something in the meantime
uplink_init( image, -1, NULL, 0 );
@@ -656,7 +651,7 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
// the thread will re-send this request as soon as the connection
// is reestablished.
logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" );
- altservers_serverFailed( &uplink->current.host );
+ altservers_serverFailed( uplink->current.index );
return;
}
mutex_lock( &uplink->queueLock );
@@ -973,7 +968,7 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
{
if ( uplink->current.fd == -1 )
return;
- altservers_serverFailed( &uplink->current.host );
+ altservers_serverFailed( uplink->current.index );
mutex_lock( &uplink->sendMutex );
close( uplink->current.fd );
uplink->current.fd = -1;
@@ -1138,3 +1133,13 @@ static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink)
&& ( _backgroundReplication != BGR_FULL || _bgrMinClients > uplink->image->users ) );
}
+bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len) // Write the uplink's current server into buffer via altservers_toString()
+{ // Returns false if there is no connection (current.fd == -1), otherwise the result of altservers_toString()
+ int current;
+ mutex_lock( &uplink->rttLock ); // Read fd and index together while holding rttLock
+ current = uplink->current.fd == -1 ? -1 : uplink->current.index; // -1 marks "not connected"
+ mutex_unlock( &uplink->rttLock );
+ if ( current == -1 )
+ return false;
+ return altservers_toString( current, buffer, len ); // Lock already released; only the copied index is used
+}
diff --git a/src/server/uplink.h b/src/server/uplink.h
index 4fd41b0..acc8e11 100644
--- a/src/server/uplink.h
+++ b/src/server/uplink.h
@@ -16,4 +16,6 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
void uplink_shutdown(dnbd3_image_t *image);
+bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len);
+
#endif /* UPLINK_H_ */
diff --git a/src/serverconfig.h b/src/serverconfig.h
index 0cbb320..239f0a2 100644
--- a/src/serverconfig.h
+++ b/src/serverconfig.h
@@ -6,10 +6,12 @@
// +++++ Performance/memory related
#define SERVER_MAX_CLIENTS 4000
#define SERVER_MAX_IMAGES 5000
-#define SERVER_MAX_ALTS 100
+#define SERVER_MAX_ALTS 50
// +++++ Uplink handling (proxy mode)
-#define SERVER_UPLINK_FAIL_INCREASE 5 // On server failure, increase numFails by this value
-#define SERVER_BAD_UPLINK_THRES 40 // Thresold for numFails at which we ignore a server for the time span below
+#define SERVER_GLOBAL_DUP_TIME 6 // How many seconds to wait before changing global fail counter again
+#define SERVER_BAD_UPLINK_MIN 10 // Threshold for fails at which we start ignoring the server occasionally
+#define SERVER_BAD_UPLINK_MAX 20 // Hard block server if it failed this many times
+#define SERVER_BAD_UPLINK_LOCAL_BLOCK 10 // If a server didn't supply the requested image this many times, block it for some time
#define SERVER_BAD_UPLINK_IGNORE 180 // How many seconds is a server ignored
#define SERVER_MAX_UPLINK_QUEUE 1500 // Maximum number of queued requests per uplink
#define SERVER_UPLINK_QUEUELEN_THRES 900 // Threshold where we start dropping incoming clients
@@ -33,7 +35,7 @@
#define SERVER_RTT_PROBES 5 // How many probes to average over
#define SERVER_RTT_INTERVAL_INIT 5 // Initial interval between probes
#define SERVER_RTT_INTERVAL_MAX 45 // Maximum interval between probes
-#define SERVER_RTT_BACKOFF_COUNT 5 // If we can't reach any uplink server this many times, consider the uplink bad
+#define SERVER_RTT_MAX_UNREACH 10 // If no server was reachable this many times, stop RTT measurements for a while
#define SERVER_RTT_INTERVAL_FAILED 180 // Interval to use if no uplink server is reachable for above many times
#define SERVER_REMOTE_IMAGE_CHECK_CACHETIME 120 // 2 minutes