author     Simon Rettberg  2019-02-08 16:04:51 +0100
committer  Simon Rettberg  2019-02-08 16:04:51 +0100
commit     8c185da50460916115c66ea8e3be16c71759e06a (patch)
tree       dbd3389aa2b975134b8a85732328ce929be80763 /src/fuse
parent     [SERVER] uplink: Dedicated function for handling link failure (diff)
[FUSE] Consider RTT of active connection for switch-decisions
Diffstat (limited to 'src/fuse')
-rw-r--r--  src/fuse/connection.c  | 175
-rw-r--r--  src/fuse/connection.h  |   3
2 files changed, 123 insertions(+), 55 deletions(-)
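
The core of this change is a per-request latency measurement on the active connection: every queued request is timestamped when it is put on the wire, the receive thread turns the observed reply delay into a moving average (liveRtt), and that average is blended into the probe-based RTT that drives the server-switch decision. Below is a minimal sketch of that weighting in plain C; the helper names and the RTT_COUNT value are illustrative, not part of the patch:

#include <stdint.h>

#define RTT_COUNT 4  /* assumption: the real value is defined elsewhere in the tree */

/* Moving average over per-request latencies (µs): the new sample gets a
 * weight of 1/3, the previous average keeps 2/3. */
static int live_rtt_update( int liveRtt, uint64_t sampleUs )
{
	return ( liveRtt * 2 + (int)sampleUs ) / 3;
}

/* Fold the live measurement into the RTT_COUNT probe samples, so a server
 * that probes well but is slow under real load scores worse. */
static int combined_rtt( const int rtts[RTT_COUNT], int liveRtt )
{
	int sum = 0;
	for ( int i = 0; i < RTT_COUNT; ++i ) {
		sum += rtts[i];
	}
	if ( liveRtt != 0 ) {
		return ( sum + liveRtt ) / ( RTT_COUNT + 1 );
	}
	return sum / RTT_COUNT;
}
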
diff --git a/src/fuse/connection.c b/src/fuse/connection.c
index 8f28c09..fb2d2c8 100644
--- a/src/fuse/connection.c
+++ b/src/fuse/connection.c
@@ -62,11 +62,22 @@ typedef struct _alt_server {
int rtts[RTT_COUNT];
int rttIndex;
int bestCount;
+ int liveRtt;
} alt_server_t;
-static alt_server_t altservers[MAX_ALTS];
static dnbd3_server_entry_t newservers[MAX_ALTS];
-static pthread_spinlock_t altLock;
+static pthread_mutex_t newAltLock = PTHREAD_MUTEX_INITIALIZER;
+static alt_server_t altservers[MAX_ALTS];
+// WR: Use when re-assigning or sorting altservers, i.e. an index in altservers
+// changes its meaning (host). Also used for newservers.
+// RD: Use when reading the list or modifying individual entries data, like RTT
+// and fail count. Isn't super clean as we still might have races here, but mostly
+// the code is clean in this regard, so we should only have stale data somewhere
+// but nothing nonsensical.
+static pthread_rwlock_t altLock = PTHREAD_RWLOCK_INITIALIZER;
+#define lock_read pthread_rwlock_rdlock
+#define lock_write pthread_rwlock_wrlock
+#define unlock_rw pthread_rwlock_unlock
/* Static methods */
@@ -205,8 +216,7 @@ bool connection_initThreads()
threadInitDone = true;
logadd( LOG_DEBUG1, "Initializing stuff" );
if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0
- || pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0
- || pthread_spin_init( &altLock, PTHREAD_PROCESS_PRIVATE ) != 0 ) {
+ || pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0 ) {
logadd( LOG_ERROR, "Mutex or spinlock init failure" );
success = false;
} else {
@@ -285,7 +295,7 @@ size_t connection_printStats(char *buffer, const size_t len)
buffer += ret;
}
int i = -1;
- pthread_spin_lock( &altLock );
+ lock_read( &altLock );
while ( remaining > 3 && ++i < MAX_ALTS ) {
if ( altservers[i].host.type == 0 )
continue;
@@ -312,8 +322,8 @@ size_t connection_printStats(char *buffer, const size_t len)
value = altservers[i].rtt;
width += 3;
}
- ret = snprintf( buffer, remaining, "% *d %s Unreachable: % 4d BestCount: % 4d\n",
- width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount );
+ ret = snprintf( buffer, remaining, "% *d %s Unreachable:% 5d BestCount:% 5d Live:% 5dµs\n",
+ width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount, altservers[i].liveRtt );
if ( ret < 0 ) {
ret = 0;
}
@@ -324,7 +334,7 @@ size_t connection_printStats(char *buffer, const size_t len)
remaining -= ret;
buffer += ret;
}
- pthread_spin_unlock( &altLock );
+ unlock_rw( &altLock );
return len - remaining;
}
@@ -349,7 +359,9 @@ static void* connection_receiveThreadMain(void *sockPtr)
// Get block reply. find matching request
dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle );
if ( request == NULL ) {
- logadd( LOG_WARNING, "WARNING BUG ALERT SOMETHING: Got block reply with no matching request" );
+ // This happens if the alt server probing thread tears down our connection
+ // and did a direct RTT probe to satisfy this very request.
+ logadd( LOG_DEBUG1, "Got block reply with no matching request" );
if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) {
logadd( LOG_DEBUG1, "....and choked on reply payload" );
goto fail;
@@ -362,6 +374,20 @@ static void* connection_receiveThreadMain(void *sockPtr)
connection_read( request );
goto fail;
}
+ // Check RTT
+ uint64_t diff = nowMicro() - request->time;
+ if ( diff < 30ull * 1000 * 1000 ) { // Sanity check - ignore if > 30s
+ lock_read( &altLock );
+ for ( int i = 0; i < MAX_ALTS; ++i ) {
+ if ( altservers[i].host.type == 0 )
+ continue;
+ if ( isSameAddressPort( &connection.currentServer, &altservers[i].host ) ) {
+ altservers[i].liveRtt = ( altservers[i].liveRtt * 2 + (int)diff ) / 3;
+ break;
+ }
+ }
+ unlock_rw( &altLock );
+ }
// Success, wake up caller
request->success = true;
request->finished = true;
@@ -377,9 +403,9 @@ static void* connection_receiveThreadMain(void *sockPtr)
logadd( LOG_DEBUG1, "Error receiving list of alt servers." );
goto fail;
}
- pthread_spin_lock( &altLock );
+ pthread_mutex_lock( &newAltLock );
memcpy( newservers, entries, relevantSize );
- pthread_spin_unlock( &altLock );
+ pthread_mutex_unlock( &newAltLock );
} else {
// TODO: Handle the others?
if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) {
@@ -458,7 +484,8 @@ static void* connection_backgroundThread(void *something UNUSED)
static void addAltServers()
{
- pthread_spin_lock( &altLock );
+ pthread_mutex_lock( &newAltLock );
+ lock_write( &altLock );
for ( int nIdx = 0; nIdx < MAX_ALTS; ++nIdx ) {
if ( newservers[nIdx].host.type == 0 )
continue;
@@ -490,11 +517,13 @@ static void addAltServers()
altservers[slot].rtts[0] = RTT_UNREACHABLE;
altservers[slot].rttIndex = 1;
altservers[slot].host = newservers[nIdx].host;
+ altservers[slot].liveRtt = 0;
}
skip_server:;
}
memset( newservers, 0, sizeof(newservers) );
- pthread_spin_unlock( &altLock );
+ unlock_rw( &altLock );
+ pthread_mutex_unlock( &newAltLock );
}
/**
@@ -505,7 +534,7 @@ skip_server:;
static void sortAltServers()
{
int ac = 0;
- pthread_spin_lock( &altLock );
+ lock_write( &altLock );
for ( int ia = MAX_ALTS_ACTIVE; ia < MAX_ALTS; ++ia ) {
alt_server_t * const inactive = &altservers[ia];
if ( inactive->host.type == 0 || inactive->consecutiveFails > 0 )
@@ -525,45 +554,72 @@ static void sortAltServers()
inactive->bestCount = 0;
inactive->rtts[0] = RTT_UNREACHABLE;
inactive->rttIndex = 1;
+ inactive->liveRtt = 0;
active->host = tmp;
active->consecutiveFails = 0;
active->bestCount = 0;
active->rtts[0] = RTT_UNREACHABLE;
active->rttIndex = 1;
+ active->liveRtt = 0;
}
- pthread_spin_unlock( &altLock );
+ unlock_rw( &altLock );
}
static void probeAltServers()
{
serialized_buffer_t buffer;
dnbd3_reply_t reply;
- int bestIndex = -1;
- int currentIndex = -1;
int bestSock = -1;
- int currentRtt = RTT_UNREACHABLE;
uint16_t remoteRid, remoteProto;
uint64_t remoteSize;
char *remoteName;
bool doSwitch;
- const bool panic = connection.sockFd == -1;
-
+ bool panic = connection.sockFd == -1;
uint64_t testOffset = 0;
uint32_t testLength = RTT_BLOCK_SIZE;
dnbd3_async_t *request = NULL;
- if ( panic ) {
- pthread_spin_lock( &requests.lock );
- if ( requests.head != NULL ) {
+ uint64_t now;
+ alt_server_t *current = NULL, *best = NULL;
+
+ if ( !panic ) {
+ lock_read( &altLock );
+ for ( int altIndex = 0; altIndex < MAX_ALTS; ++altIndex ) {
+ if ( altservers[altIndex].host.type != 0
+ && isSameAddressPort( &altservers[altIndex].host, &connection.currentServer ) ) {
+ current = &altservers[altIndex];
+ break;
+ }
+ }
+ unlock_rw( &altLock );
+ }
+ now = nowMicro();
+ pthread_spin_lock( &requests.lock );
+ if ( requests.head != NULL ) {
+ if ( !panic && current != NULL ) {
+ const int maxDelay = MAX( current->rtt * 5, 1000000 ); // Give at least one second
+ dnbd3_async_t *iterator;
+ for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) {
+ if ( iterator->time == 0 )
+ continue; // Huh?
+ // A request with measurement tag is pending
+ if ( now > iterator->time && now - iterator->time > maxDelay ) {
+ panic = true;
+ break;
+ }
+ }
+ }
+ if ( panic ) {
request = requests.head;
testOffset = requests.head->offset;
testLength = requests.head->length;
}
- pthread_spin_unlock( &requests.lock );
- if ( testOffset != 0 ) {
- logadd( LOG_DEBUG1, "Panic with pending %" PRIu64 ":%" PRIu32, testOffset, testLength );
- }
+ }
+ pthread_spin_unlock( &requests.lock );
+ if ( testOffset != 0 ) {
+ logadd( LOG_DEBUG1, "Panic with pending %" PRIu64 ":%" PRIu32, testOffset, testLength );
}
+ lock_read( &altLock );
for ( int altIndex = 0; altIndex < (panic ? MAX_ALTS : MAX_ALTS_ACTIVE); ++altIndex ) {
alt_server_t * const srv = &altservers[altIndex];
if ( srv->host.type == 0 )
@@ -637,6 +693,7 @@ static void probeAltServers()
// Yay, success
// Panic mode? Just switch to server
if ( panic ) {
+ unlock_rw( &altLock );
switchConnection( sock, srv );
return;
}
@@ -649,15 +706,16 @@ static void probeAltServers()
for ( int i = 0; i < RTT_COUNT; ++i ) {
srv->rtt += srv->rtts[i];
}
- srv->rtt /= RTT_COUNT;
- // Remember rtt if this server matches the current one
- if ( isSameAddressPort( &srv->host, &connection.currentServer ) ) {
- currentRtt = srv->rtt;
- currentIndex = altIndex;
+ if ( srv->liveRtt != 0 ) {
+ // Make live rtt measurement influence result
+ srv->rtt = ( srv->rtt + srv->liveRtt ) / ( RTT_COUNT + 1 );
+ } else {
+ srv->rtt /= RTT_COUNT;
}
+
// Keep socket open if this is currently the best one
- if ( bestIndex == -1 || altservers[bestIndex].rtt > srv->rtt ) {
- bestIndex = altIndex;
+ if ( best == NULL || best->rtt > srv->rtt ) {
+ best = srv;
if ( bestSock != -1 ) {
close( bestSock );
}
@@ -674,21 +732,25 @@ fail:;
srv->consecutiveFails += 1;
}
doSwitch = false;
- if ( bestIndex != -1 ) {
+ if ( best != NULL ) {
// Time-sensitive switch decision: If a server was best for some consecutive measurements,
// we switch no matter how small the difference to the current server is
- pthread_spin_lock( &altLock );
- for ( int i = 0; i < MAX_ALTS_ACTIVE; ++i ) {
- if ( i == bestIndex ) {
- if ( altservers[i].bestCount < 50 ) {
- altservers[i].bestCount += 2;
+ for ( int altIndex = 0; altIndex < MAX_ALTS_ACTIVE; ++altIndex ) {
+ alt_server_t * const srv = &altservers[altIndex];
+ // Decay liveRtt slowly...
+ if ( srv->liveRtt > current->liveRtt && srv->liveRtt > srv->rtt ) {
+ srv->liveRtt -= ( srv->liveRtt / 100 + 1 );
+ }
+ if ( srv == best ) {
+ if ( srv->bestCount < 50 ) {
+ srv->bestCount += 2;
}
// Switch with increasing probability the higher the bestCount is
- if ( altservers[i].bestCount > 12 && altservers[i].rtt < currentRtt && altservers[i].bestCount > rand() % 50 ) {
+ if ( srv->bestCount > 12 && ( current == NULL || srv->rtt < current->rtt ) && srv->bestCount > rand() % 50 ) {
doSwitch = true;
}
- } else if ( altservers[i].bestCount > 0 ) {
- altservers[i].bestCount--;
+ } else if ( srv->bestCount > 0 ) {
+ srv->bestCount--;
}
}
for ( int i = MAX_ALTS_ACTIVE; i < MAX_ALTS; ++i ) {
@@ -696,28 +758,31 @@ fail:;
altservers[i].consecutiveFails--;
}
}
- pthread_spin_unlock( &altLock );
// This takes care of the situation where two servers alternate being the best server all the time
- if ( doSwitch && currentIndex != -1 && altservers[bestIndex].bestCount - altservers[currentIndex].bestCount < 8 ) {
+ if ( doSwitch && current != NULL && best->bestCount - current->bestCount < 8 ) {
doSwitch = false;
}
// Regular logic: Apply threshold when considering switch
- if ( !doSwitch ) {
- doSwitch = currentRtt > altservers[bestIndex].rtt + RTT_ABSOLUTE_THRESHOLD
- || RTT_THRESHOLD_FACTOR(currentRtt) > altservers[bestIndex].rtt + 1000;
+ if ( !doSwitch && current != NULL ) {
+ doSwitch = current->rtt > best->rtt + RTT_ABSOLUTE_THRESHOLD
+ || RTT_THRESHOLD_FACTOR(current->rtt) > best->rtt + 1000;
}
}
// Switch if a better server was found
if ( doSwitch ) {
- logadd( LOG_INFO, "Current: %dµs, best: %dµs. Will switch!", currentRtt, altservers[bestIndex].rtt );
+ logadd( LOG_INFO, "Current: %dµs, best: %dµs. Will switch!", current == NULL ? 0 : current->rtt, best->rtt );
for ( int i = 0; i < MAX_ALTS; ++i ) {
- if ( i != bestIndex ) {
+ if ( &altservers[i] != best ) {
altservers[i].bestCount = 0;
}
}
- switchConnection( bestSock, &altservers[bestIndex] );
- } else if ( bestIndex != -1 ) {
- // No switch
+ unlock_rw( &altLock );
+ switchConnection( bestSock, best );
+ return;
+ }
+ // No switch
+ unlock_rw( &altLock );
+ if ( best != NULL ) {
close( bestSock );
}
}
@@ -814,8 +879,10 @@ static void enqueueRequest(dnbd3_async_t *request)
request->next = NULL;
request->finished = false;
request->success = false;
- pthread_spin_lock( &requests.lock );
//logadd( LOG_DEBUG2, "Queue: %p @ %s : %d", request, file, line );
+ // Measure latency and add to switch formula
+ request->time = nowMicro();
+ pthread_spin_lock( &requests.lock );
if ( requests.head == NULL ) {
requests.head = requests.tail = request;
} else {
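
The patch also uses the new timestamps to detect a stalling active connection: before probing, probeAltServers() walks the pending-request queue, and if any timestamped request has been waiting longer than five times the current server's RTT (but at least one second), it escalates to panic mode and may switch servers immediately. A rough standalone sketch of that check follows; the struct and function names are stand-ins for dnbd3_async_t and the inline loop above:

#include <stdbool.h>
#include <stdint.h>

#ifndef MAX
#define MAX(a, b) ( (a) > (b) ? (a) : (b) )
#endif

typedef struct pending_request {
	struct pending_request *next;
	uint64_t time; /* when the request was put on the wire, 0 = not measured */
} pending_request_t; /* hypothetical stand-in for dnbd3_async_t */

static bool request_stalled( const pending_request_t *head, int currentRttUs, uint64_t nowUs )
{
	const uint64_t maxDelay = MAX( (uint64_t)currentRttUs * 5, 1000000 ); /* give at least one second */
	for ( const pending_request_t *it = head; it != NULL; it = it->next ) {
		if ( it->time == 0 )
			continue; /* not tagged for measurement */
		if ( nowUs > it->time && nowUs - it->time > maxDelay )
			return true; /* reply overdue -> treat like a dead connection */
	}
	return false;
}
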
diff --git a/src/fuse/connection.h b/src/fuse/connection.h
index 37bca88..929777a 100644
--- a/src/fuse/connection.h
+++ b/src/fuse/connection.h
@@ -10,10 +10,11 @@ struct _dnbd3_async;
typedef struct _dnbd3_async {
struct _dnbd3_async *next; // Next in this linked list (private field, not set by caller)
+ dnbd3_signal_t* signal; // Used to signal the caller
char* buffer; // Caller-provided buffer to be filled
+ uint64_t time; // When request was put on wire, 0 if not measuring
uint64_t offset;
uint32_t length;
- dnbd3_signal_t* signal; // Used to signal the caller
bool finished; // Will be set to true if the request has been handled
bool success; // Will be set to true if the request succeeded
} dnbd3_async_t;
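
On the locking side, the former altLock spinlock becomes a rwlock: the write side is taken only when slots in altservers change meaning (adding or re-sorting servers), the read side when individual entries' RTT or fail counters are updated, and the staged newservers list is guarded by its own mutex (newAltLock). A minimal sketch of that pattern, with illustrative function names:

#include <pthread.h>

static pthread_rwlock_t altLock = PTHREAD_RWLOCK_INITIALIZER;
static pthread_mutex_t newAltLock = PTHREAD_MUTEX_INITIALIZER;

static void update_entry_stats( void )
{
	pthread_rwlock_rdlock( &altLock );   /* RD: touching one entry's RTT / fail count */
	/* ... modify fields of individual altservers[i] entries ... */
	pthread_rwlock_unlock( &altLock );
}

static void merge_new_servers( void )
{
	pthread_mutex_lock( &newAltLock );   /* newservers is staged under its own mutex */
	pthread_rwlock_wrlock( &altLock );   /* WR: slots change meaning (hosts reassigned) */
	/* ... merge newservers into altservers, then clear newservers ... */
	pthread_rwlock_unlock( &altLock );
	pthread_mutex_unlock( &newAltLock );
}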