summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2013-11-10 18:19:11 +0100
committerSimon Rettberg2013-11-10 18:19:11 +0100
commit53dd7bc20968ddfaa601c517e6feb745bb3a21ab (patch)
tree9629942c7a3d378208d1ebf0c6ccc5aaa0fd182e
parent[SERVER] Extend endianness detection (diff)
downloaddnbd3-53dd7bc20968ddfaa601c517e6feb745bb3a21ab.tar.gz
dnbd3-53dd7bc20968ddfaa601c517e6feb745bb3a21ab.tar.xz
dnbd3-53dd7bc20968ddfaa601c517e6feb745bb3a21ab.zip
[SERVER] Split "pending" lock for alt-server finding into producer and consumer lock to fix a potential NPA when an uplink dies
Also some refactoring of variable names and more comments
-rw-r--r--src/server/altservers.c180
1 files changed, 105 insertions, 75 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index 2cb5061..2eca369 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -17,12 +17,13 @@
#include "protocol.h"
static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS];
-static pthread_spinlock_t pendingLock;
+static pthread_spinlock_t pendingLockProduce; // Lock for adding something to pending. (NULL -> nonNULL)
+static pthread_mutex_t pendingLockConsume = PTHREAD_MUTEX_INITIALIZER; // Lock for removegin something (nunNULL -> NULL)
static int signalPipe = -1;
-static dnbd3_alt_server_t _alt_servers[SERVER_MAX_ALTS];
-static int _num_alts = 0;
-static pthread_spinlock_t _alts_lock;
+static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS];
+static int numAltServers = 0;
+static pthread_spinlock_t altServersLock;
static int initDone = FALSE;
static pthread_t altThread;
@@ -32,13 +33,13 @@ static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const
int altservers_getCount()
{
- return _num_alts;
+ return numAltServers;
}
void altservers_init()
{
- spin_init( &_alts_lock, PTHREAD_PROCESS_PRIVATE );
- memset( _alt_servers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) );
+ spin_init( &altServersLock, PTHREAD_PROCESS_PRIVATE );
+ memset( altServers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) );
if ( 0 != pthread_create( &altThread, NULL, &altservers_main, (void *)NULL ) ) {
memlogf( "[ERROR] Could not start altservers connector thread" );
exit( EXIT_FAILURE );
@@ -49,7 +50,6 @@ void altservers_init()
void altservers_shutdown()
{
if ( !initDone ) return;
- spin_destroy( &_alts_lock );
pthread_join( altThread, NULL );
}
@@ -90,27 +90,27 @@ int altservers_load()
int altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate)
{
int i, freeSlot = -1;
- spin_lock( &_alts_lock );
- for (i = 0; i < _num_alts; ++i) {
- if ( isSameAddressPort( &_alt_servers[i].host, host ) ) {
- spin_unlock( &_alts_lock );
+ spin_lock( &altServersLock );
+ for (i = 0; i < numAltServers; ++i) {
+ if ( isSameAddressPort( &altServers[i].host, host ) ) {
+ spin_unlock( &altServersLock );
return FALSE;
- } else if ( freeSlot == -1 && _alt_servers[i].host.type == 0 ) {
+ } else if ( freeSlot == -1 && altServers[i].host.type == 0 ) {
freeSlot = i;
}
}
if ( freeSlot == -1 ) {
- if ( _num_alts >= SERVER_MAX_ALTS ) {
+ if ( numAltServers >= SERVER_MAX_ALTS ) {
memlogf( "[WARNING] Cannot add another alt server, maximum of %d already reached.", (int)SERVER_MAX_ALTS );
- spin_unlock( &_alts_lock );
+ spin_unlock( &altServersLock );
return FALSE;
}
- freeSlot = _num_alts++;
+ freeSlot = numAltServers++;
}
- _alt_servers[freeSlot].host = *host;
- _alt_servers[freeSlot].isPrivate = isPrivate;
- if ( comment != NULL ) snprintf( _alt_servers[freeSlot].comment, COMMENT_LENGTH, "%s", comment );
- spin_unlock( &_alts_lock );
+ altServers[freeSlot].host = *host;
+ altServers[freeSlot].isPrivate = isPrivate;
+ if ( comment != NULL ) snprintf( altServers[freeSlot].comment, COMMENT_LENGTH, "%s", comment );
+ spin_unlock( &altServersLock );
return TRUE;
}
@@ -120,25 +120,33 @@ int altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate)
void altservers_findUplink(dnbd3_connection_t *uplink)
{
int i;
+ // if betterFd != -1 it means the uplink is supposed to switch to another
+ // server. As this function here is called by the uplink thread, it can
+ // never be that the uplink is supposed to switch, but instead calls
+ // this function.
assert( uplink->betterFd == -1 );
- spin_lock( &pendingLock );
+ spin_lock( &pendingLockProduce );
+ // it is however possible that an RTT measurement is currently in progress,
+ // so check for that case and do nothing if one is in progress
if ( uplink->rttTestResult == RTT_INPROGRESS ) {
for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
if ( pending[i] != uplink ) continue;
- spin_unlock( &pendingLock );
+ // Yep, measuring right now
+ spin_unlock( &pendingLockProduce );
return;
}
}
+ // Find free slot for measurement
for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
if ( pending[i] != NULL ) continue;
pending[i] = uplink;
uplink->rttTestResult = RTT_INPROGRESS;
- spin_unlock( &pendingLock );
- write( signalPipe, "", 1 );
+ spin_unlock( &pendingLockProduce );
+ write( signalPipe, "", 1 ); // Wake altservers thread up
return;
}
// End of loop - no free slot
- spin_unlock( &pendingLock );
+ spin_unlock( &pendingLockProduce );
memlogf( "[WARNING] No more free RTT measurement slots, ignoring a request..." );
}
@@ -147,39 +155,40 @@ void altservers_findUplink(dnbd3_connection_t *uplink)
*/
void altservers_removeUplink(dnbd3_connection_t *uplink)
{
- spin_lock( &pendingLock );
+ pthread_mutex_lock( &pendingLockConsume );
for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
if ( pending[i] == uplink ) pending[i] = NULL;
}
- spin_unlock( &pendingLock );
+ pthread_mutex_unlock( &pendingLockConsume );
}
/**
* Get <size> known (working) alt servers, ordered by network closeness
* (by finding the smallest possible subnet)
- * Private servers are excluded
+ * Private servers are excluded, so this is what you want to call to
+ * get a list of servers you can tell a client about
*/
int altservers_getMatching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size)
{
- if ( host == NULL || host->type == 0 || _num_alts == 0 || output == NULL || size <= 0 ) return 0;
+ if ( host == NULL || host->type == 0 || numAltServers == 0 || output == NULL || size <= 0 ) return 0;
int i, j;
int count = 0;
int distance[size];
- spin_lock( &_alts_lock );
- for (i = 0; i < _num_alts; ++i) {
- if ( host->type != _alt_servers[i].host.type ) continue; // Wrong address family
- if ( _alt_servers[i].isPrivate ) continue; // Do not tell clients about private servers
+ spin_lock( &altServersLock );
+ for (i = 0; i < numAltServers; ++i) {
+ if ( host->type != altServers[i].host.type ) continue; // Wrong address family
+ if ( altServers[i].isPrivate ) continue; // Do not tell clients about private servers
// TODO: Prefer same AF here, but if in the end we got less servers than requested, add
// servers of other AF too (after this loop)
if ( count == 0 ) {
// Trivial - this is the first entry
- output[0].host = _alt_servers[i].host;
+ output[0].host = altServers[i].host;
output[0].failures = 0;
distance[0] = altservers_netCloseness( host, &output[0].host );
count++;
} else {
// Other entries already exist, insert in proper position
- const int dist = altservers_netCloseness( host, &_alt_servers[i].host );
+ const int dist = altservers_netCloseness( host, &altServers[i].host );
for (j = 0; j < size; ++j) {
if ( j < count && dist <= distance[j] ) continue;
if ( j > count ) break; // Should never happen but just in case...
@@ -191,7 +200,7 @@ int altservers_getMatching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int
if ( count < size ) {
count++;
}
- output[j].host = _alt_servers[i].host;
+ output[j].host = altServers[i].host;
output[j].failures = 0;
distance[j] = dist;
break;
@@ -199,38 +208,42 @@ int altservers_getMatching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int
}
}
// TODO: "if count < size then add servers of other address families"
- spin_unlock( &_alts_lock );
+ spin_unlock( &altServersLock );
return count;
}
/**
* Get <size> alt servers. If there are more alt servers than
- * requested, random servers will be picked
+ * requested, random servers will be picked.
+ * This function is suited for finding uplink servers as
+ * it includes private servers and ignores any "client only" servers
*/
int altservers_get(dnbd3_host_t *output, int size)
{
if ( size <= 0 ) return 0;
int count = 0, i;
const time_t now = time( NULL );
- spin_lock( &_alts_lock );
+ spin_lock( &altServersLock );
// Flip first server in list with a random one every time this is called
- if ( _num_alts > 1 ) {
- const dnbd3_alt_server_t tmp = _alt_servers[0];
+ if ( numAltServers > 1 ) {
+ const dnbd3_alt_server_t tmp = altServers[0];
do {
- i = rand() % _num_alts;
+ i = rand() % numAltServers;
} while ( i == 0 );
- _alt_servers[0] = _alt_servers[i];
- _alt_servers[i] = tmp;
+ altServers[0] = altServers[i];
+ altServers[i] = tmp;
}
- for (i = 0; i < _num_alts; ++i) {
- if ( _alt_servers[i].host.type == 0 ) continue;
- if ( _proxyPrivateOnly && !_alt_servers[i].isPrivate ) continue;
- if ( _alt_servers[i].numFails > SERVER_MAX_UPLINK_FAILS && now - _alt_servers[i].lastFail > SERVER_BAD_UPLINK_IGNORE ) continue;
- _alt_servers[i].numFails = 0;
- output[count++] = _alt_servers[i].host;
+ for (i = 0; i < numAltServers; ++i) {
+ if ( altServers[i].host.type == 0 ) continue; // Slot is empty
+ if ( _proxyPrivateOnly && !altServers[i].isPrivate ) continue; // Config says to consider private alt-servers only? ignore!
+ if ( altServers[i].numFails > SERVER_MAX_UPLINK_FAILS // server failed X times in a row
+ && now - altServers[i].lastFail > SERVER_BAD_UPLINK_IGNORE ) continue; // and last fail was not too long ago? ignore!
+ // server seems ok, include in output and reset its fail counter
+ altServers[i].numFails = 0;
+ output[count++] = altServers[i].host;
if ( count >= size ) break;
}
- spin_unlock( &_alts_lock );
+ spin_unlock( &altServersLock );
return count;
}
@@ -241,24 +254,24 @@ static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const
{
unsigned int avg = rtt;
int i;
- spin_lock( &_alts_lock );
- for (i = 0; i < _num_alts; ++i) {
- if ( !isSameAddressPort( host, &_alt_servers[i].host ) ) continue;
- _alt_servers[i].rtt[++_alt_servers[i].rttIndex % SERVER_RTT_PROBES] = rtt;
+ spin_lock( &altServersLock );
+ for (i = 0; i < numAltServers; ++i) {
+ if ( !isSameAddressPort( host, &altServers[i].host ) ) continue;
+ altServers[i].rtt[++altServers[i].rttIndex % SERVER_RTT_PROBES] = rtt;
#if SERVER_RTT_PROBES == 5
- avg = (_alt_servers[i].rtt[0] + _alt_servers[i].rtt[1] + _alt_servers[i].rtt[2] + _alt_servers[i].rtt[3] + _alt_servers[i].rtt[4])
+ avg = (altServers[i].rtt[0] + altServers[i].rtt[1] + altServers[i].rtt[2] + altServers[i].rtt[3] + altServers[i].rtt[4])
/ SERVER_RTT_PROBES;
#else
#warning You might want to change the code in altservers_update_rtt if you changed SERVER_RTT_PROBES
avg = 0;
for (int j = 0; j < SERVER_RTT_PROBES; ++j) {
- avg += _alt_servers[i].rtt[j];
+ avg += altServers[i].rtt[j];
}
avg /= SERVER_RTT_PROBES;
#endif
break;
}
- spin_unlock( &_alts_lock );
+ spin_unlock( &altServersLock );
return avg;
}
@@ -291,18 +304,29 @@ void altservers_serverFailed(const dnbd3_host_t * const host)
{
int i;
const time_t now = time( NULL );
- spin_lock( &_alts_lock );
- for (i = 0; i < _num_alts; ++i) {
- if ( !isSameAddressPort( host, &_alt_servers[i].host ) ) continue;
- if ( now - _alt_servers[i].lastFail > SERVER_RTT_DELAY_INIT ) {
- _alt_servers[i].numFails++;
- _alt_servers[i].lastFail = now;
+ spin_lock( &altServersLock );
+ for (i = 0; i < numAltServers; ++i) {
+ if ( !isSameAddressPort( host, &altServers[i].host ) ) continue;
+ // Do only increase counter if last fail was not too recent. This is
+ // to prevent the counter from increasing rapidly if many images use the
+ // same uplink. If there's a network hickup, all uplinks will call this
+ // function and would increase the counter too quickly, disabling the server.
+ if ( now - altServers[i].lastFail > SERVER_RTT_DELAY_INIT ) {
+ altServers[i].numFails++;
+ altServers[i].lastFail = now;
}
break;
}
- spin_unlock( &_alts_lock );
+ spin_unlock( &altServersLock );
}
-
+/**
+ * Mainloop of this module. It will wait for requests by uplinks to find a
+ * suitable uplink server for them. If found, it will tell the uplink about
+ * the best server found. Currently the RTT history is kept per server and
+ * not per uplink, so if many images use the same uplink server, the history
+ * will update quite quickly. Needs to be improved some time, ie. by only
+ * updating the rtt if the last update was at least X seconds ago.
+ */
static void *altservers_main(void *data)
{
const int MAXEVENTS = 3;
@@ -320,7 +344,7 @@ static void *altservers_main(void *data)
setThreadName( "altserver-check" );
blockNoncriticalSignals();
// Init spinlock
- spin_init( &pendingLock, PTHREAD_PROCESS_PRIVATE );
+ spin_init( &pendingLockProduce, PTHREAD_PROCESS_PRIVATE );
// Init waiting links queue
for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i)
pending[i] = NULL;
@@ -370,10 +394,13 @@ static void *altservers_main(void *data)
}
}
// Work your way through the queue
- spin_lock( &pendingLock );
for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) {
- if ( pending[itLink] == NULL ) continue;
- spin_unlock( &pendingLock );
+ if ( pending[itLink] == NULL ) continue; // Check once before locking, as a mutex is expensive
+ pthread_mutex_lock( &pendingLockConsume );
+ if ( pending[itLink] == NULL ) { // Check again after locking
+ continue;
+ pthread_mutex_unlock( &pendingLockConsume );
+ }
dnbd3_connection_t * const uplink = pending[itLink];
assert( uplink->rttTestResult == RTT_INPROGRESS );
// Now get 4 alt servers
@@ -394,7 +421,7 @@ static void *altservers_main(void *data)
unsigned int bestRtt = 0xfffffff;
unsigned int currentRtt = 0xfffffff;
for (itAlt = 0; itAlt < numAlts; ++itAlt) {
- usleep( 1000 );
+ usleep( 1000 ); // Wait a very short moment for the network to recover (we might be doing lots of measurements...)
// Connect
clock_gettime( CLOCK_MONOTONIC_RAW, &start );
int sock = sock_connect( &servers[itAlt], 750, 1250 );
@@ -425,7 +452,7 @@ static void *altservers_main(void *data)
// Request random block ++++++++++++++++++++++++++++++
fixup_request( request );
if ( !dnbd3_get_block( sock,
- (((uint64_t)start.tv_nsec | (uint64_t)rand()) * DNBD3_BLOCK_SIZE )% uplink->image->filesize,
+ (((uint64_t)start.tv_nsec ^ (uint64_t)rand()) * DNBD3_BLOCK_SIZE )% uplink->image->filesize,
DNBD3_BLOCK_SIZE) ) {
ERROR_GOTO_VA( server_failed, "[ERROR] Could not request random block for %s", uplink->image->lower_name );
}
@@ -449,18 +476,23 @@ static void *altservers_main(void *data)
const unsigned int rtt = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_nsec - start.tv_nsec) / 1000; // µs
const unsigned int avg = altservers_updateRtt( &servers[itAlt], rtt );
if ( uplink->fd != -1 && isSameAddressPort( &servers[itAlt], &uplink->currentServer ) ) {
+ // Was measuring current server
currentRtt = avg;
close( sock );
} else if ( avg < bestRtt ) {
+ // Was another server, update "best"
if ( bestSock != -1 ) close( bestSock );
bestSock = sock;
bestRtt = avg;
bestIndex = itAlt;
} else {
+ // Was too slow, ignore
close( sock );
}
+ // We're done, call continue
continue;
// Jump here if anything went wrong
+ // This will cleanup and continue
server_failed: ;
altservers_serverFailed( &servers[itAlt] );
server_image_not_available: ;
@@ -480,12 +512,10 @@ static void *altservers_main(void *data)
}
// end of loop over all pending uplinks
pending[itLink] = NULL;
- spin_lock( &pendingLock );
+ pthread_mutex_unlock( &pendingLockConsume );
}
- spin_unlock( &pendingLock );
}
cleanup: ;
- spin_destroy( &pendingLock );
if ( fdEpoll != -1 ) close( fdEpoll );
if ( readPipe != -1 ) close( readPipe );
if ( signalPipe != -1 ) close( signalPipe );