From 8c185da50460916115c66ea8e3be16c71759e06a Mon Sep 17 00:00:00 2001
From: Simon Rettberg
Date: Fri, 8 Feb 2019 16:04:51 +0100
Subject: [FUSE] Consider RTT of active connection for switch-decisions

---
 src/fuse/connection.c | 175 ++++++++++++++++++++++++++++++++++----------------
 src/fuse/connection.h |   3 +-
 2 files changed, 123 insertions(+), 55 deletions(-)

diff --git a/src/fuse/connection.c b/src/fuse/connection.c
index 8f28c09..fb2d2c8 100644
--- a/src/fuse/connection.c
+++ b/src/fuse/connection.c
@@ -62,11 +62,22 @@ typedef struct _alt_server {
 	int rtts[RTT_COUNT];
 	int rttIndex;
 	int bestCount;
+	int liveRtt;
 } alt_server_t;
 
-static alt_server_t altservers[MAX_ALTS];
 static dnbd3_server_entry_t newservers[MAX_ALTS];
-static pthread_spinlock_t altLock;
+static pthread_mutex_t newAltLock = PTHREAD_MUTEX_INITIALIZER;
+static alt_server_t altservers[MAX_ALTS];
+// WR: Use when re-assigning or sorting altservers, i.e. an index in altservers
+// changes its meaning (host). Also used for newservers.
+// RD: Use when reading the list or modifying individual entries' data, like RTT
+// and fail count. Isn't super clean as we still might have races here, but mostly
+// the code is clean in this regard, so we should only have stale data somewhere
+// but nothing nonsensical.
+static pthread_rwlock_t altLock = PTHREAD_RWLOCK_INITIALIZER;
+#define lock_read pthread_rwlock_rdlock
+#define lock_write pthread_rwlock_wrlock
+#define unlock_rw pthread_rwlock_unlock
 
 /* Static methods */
 
@@ -205,8 +216,7 @@ bool connection_initThreads()
 	threadInitDone = true;
 	logadd( LOG_DEBUG1, "Initializing stuff" );
 	if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0
-			|| pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0
-			|| pthread_spin_init( &altLock, PTHREAD_PROCESS_PRIVATE ) != 0 ) {
+			|| pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0 ) {
 		logadd( LOG_ERROR, "Mutex or spinlock init failure" );
 		success = false;
 	} else {
@@ -285,7 +295,7 @@ size_t connection_printStats(char *buffer, const size_t len)
 		buffer += ret;
 	}
 	int i = -1;
-	pthread_spin_lock( &altLock );
+	lock_read( &altLock );
 	while ( remaining > 3 && ++i < MAX_ALTS ) {
 		if ( altservers[i].host.type == 0 )
 			continue;
@@ -312,8 +322,8 @@ size_t connection_printStats(char *buffer, const size_t len)
 			value = altservers[i].rtt;
 			width += 3;
 		}
-		ret = snprintf( buffer, remaining, "% *d %s Unreachable: % 4d BestCount: % 4d\n",
-				width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount );
+		ret = snprintf( buffer, remaining, "% *d %s Unreachable:% 5d BestCount:% 5d Live:% 5dµs\n",
+				width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount, altservers[i].liveRtt );
 		if ( ret < 0 ) {
 			ret = 0;
 		}
@@ -324,7 +334,7 @@ size_t connection_printStats(char *buffer, const size_t len)
 		remaining -= ret;
 		buffer += ret;
 	}
-	pthread_spin_unlock( &altLock );
+	unlock_rw( &altLock );
 	return len - remaining;
 }
 
@@ -349,7 +359,9 @@ static void* connection_receiveThreadMain(void *sockPtr)
 			// Get block reply. find matching request
 			dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle );
 			if ( request == NULL ) {
-				logadd( LOG_WARNING, "WARNING BUG ALERT SOMETHING: Got block reply with no matching request" );
+				// This happens if the alt server probing thread tears down our connection
+				// and did a direct RTT probe to satisfy this very request.
+				logadd( LOG_DEBUG1, "Got block reply with no matching request" );
 				if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) {
 					logadd( LOG_DEBUG1, "....and choked on reply payload" );
 					goto fail;
@@ -362,6 +374,20 @@ static void* connection_receiveThreadMain(void *sockPtr)
 				connection_read( request );
 				goto fail;
 			}
+			// Check RTT
+			uint64_t diff = nowMicro() - request->time;
+			if ( diff < 30ull * 1000 * 1000 ) { // Sanity check - ignore if > 30s
+				lock_read( &altLock );
+				for ( int i = 0; i < MAX_ALTS; ++i ) {
+					if ( altservers[i].host.type == 0 )
+						continue;
+					if ( isSameAddressPort( &connection.currentServer, &altservers[i].host ) ) {
+						altservers[i].liveRtt = ( altservers[i].liveRtt * 2 + (int)diff ) / 3;
+						break;
+					}
+				}
+				unlock_rw( &altLock );
+			}
 			// Success, wake up caller
 			request->success = true;
 			request->finished = true;
@@ -377,9 +403,9 @@ static void* connection_receiveThreadMain(void *sockPtr)
 				logadd( LOG_DEBUG1, "Error receiving list of alt servers." );
 				goto fail;
 			}
-			pthread_spin_lock( &altLock );
+			pthread_mutex_lock( &newAltLock );
 			memcpy( newservers, entries, relevantSize );
-			pthread_spin_unlock( &altLock );
+			pthread_mutex_unlock( &newAltLock );
 		} else {
 			// TODO: Handle the others?
 			if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) {
@@ -458,7 +484,8 @@ static void* connection_backgroundThread(void *something UNUSED)
 
 static void addAltServers()
 {
-	pthread_spin_lock( &altLock );
+	pthread_mutex_lock( &newAltLock );
+	lock_write( &altLock );
 	for ( int nIdx = 0; nIdx < MAX_ALTS; ++nIdx ) {
 		if ( newservers[nIdx].host.type == 0 )
 			continue;
@@ -490,11 +517,13 @@ static void addAltServers()
 			altservers[slot].rtts[0] = RTT_UNREACHABLE;
 			altservers[slot].rttIndex = 1;
 			altservers[slot].host = newservers[nIdx].host;
+			altservers[slot].liveRtt = 0;
 		}
 skip_server:;
 	}
 	memset( newservers, 0, sizeof(newservers) );
-	pthread_spin_unlock( &altLock );
+	unlock_rw( &altLock );
+	pthread_mutex_unlock( &newAltLock );
 }
 
 /**
@@ -505,7 +534,7 @@ skip_server:;
 static void sortAltServers()
 {
 	int ac = 0;
-	pthread_spin_lock( &altLock );
+	lock_write( &altLock );
 	for ( int ia = MAX_ALTS_ACTIVE; ia < MAX_ALTS; ++ia ) {
 		alt_server_t * const inactive = &altservers[ia];
 		if ( inactive->host.type == 0 || inactive->consecutiveFails > 0 )
@@ -525,45 +554,72 @@ static void sortAltServers()
 		inactive->bestCount = 0;
 		inactive->rtts[0] = RTT_UNREACHABLE;
 		inactive->rttIndex = 1;
+		inactive->liveRtt = 0;
 		active->host = tmp;
 		active->consecutiveFails = 0;
 		active->bestCount = 0;
 		active->rtts[0] = RTT_UNREACHABLE;
 		active->rttIndex = 1;
+		active->liveRtt = 0;
 	}
-	pthread_spin_unlock( &altLock );
+	unlock_rw( &altLock );
 }
 
 static void probeAltServers()
 {
 	serialized_buffer_t buffer;
 	dnbd3_reply_t reply;
-	int bestIndex = -1;
-	int currentIndex = -1;
 	int bestSock = -1;
-	int currentRtt = RTT_UNREACHABLE;
 	uint16_t remoteRid, remoteProto;
 	uint64_t remoteSize;
 	char *remoteName;
 	bool doSwitch;
-	const bool panic = connection.sockFd == -1;
-
+	bool panic = connection.sockFd == -1;
 	uint64_t testOffset = 0;
 	uint32_t testLength = RTT_BLOCK_SIZE;
 	dnbd3_async_t *request = NULL;
-	if ( panic ) {
-		pthread_spin_lock( &requests.lock );
-		if ( requests.head != NULL ) {
+	uint64_t now;
+	alt_server_t *current = NULL, *best = NULL;
+
+	if ( !panic ) {
+		lock_read( &altLock );
+		for ( int altIndex = 0; altIndex < MAX_ALTS; ++altIndex ) {
+			if ( altservers[altIndex].host.type != 0
+					&& isSameAddressPort( &altservers[altIndex].host, &connection.currentServer ) ) {
+				current = &altservers[altIndex];
+				break;
+			}
+		}
+		unlock_rw( &altLock );
+	}
+	now = nowMicro();
+	pthread_spin_lock( &requests.lock );
+	if ( requests.head != NULL ) {
+		if ( !panic && current != NULL ) {
+			const int maxDelay = MAX( current->rtt * 5, 1000000 ); // Give at least one second
+			dnbd3_async_t *iterator;
+			for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) {
+				if ( iterator->time == 0 )
+					continue; // Huh?
+				// A request with measurement tag is pending
+				if ( now > iterator->time && now - iterator->time > maxDelay ) {
+					panic = true;
+					break;
+				}
+			}
+		}
+		if ( panic ) {
 			request = requests.head;
 			testOffset = requests.head->offset;
 			testLength = requests.head->length;
 		}
-		pthread_spin_unlock( &requests.lock );
-		if ( testOffset != 0 ) {
-			logadd( LOG_DEBUG1, "Panic with pending %" PRIu64 ":%" PRIu32, testOffset, testLength );
-		}
+	}
+	pthread_spin_unlock( &requests.lock );
+	if ( testOffset != 0 ) {
+		logadd( LOG_DEBUG1, "Panic with pending %" PRIu64 ":%" PRIu32, testOffset, testLength );
 	}
+	lock_read( &altLock );
 	for ( int altIndex = 0; altIndex < (panic ? MAX_ALTS : MAX_ALTS_ACTIVE); ++altIndex ) {
 		alt_server_t * const srv = &altservers[altIndex];
 		if ( srv->host.type == 0 )
@@ -637,6 +693,7 @@ static void probeAltServers()
 		// Yay, success
 		// Panic mode? Just switch to server
 		if ( panic ) {
+			unlock_rw( &altLock );
 			switchConnection( sock, srv );
 			return;
 		}
@@ -649,15 +706,16 @@ static void probeAltServers()
 		for ( int i = 0; i < RTT_COUNT; ++i ) {
 			srv->rtt += srv->rtts[i];
 		}
-		srv->rtt /= RTT_COUNT;
-		// Remember rtt if this server matches the current one
-		if ( isSameAddressPort( &srv->host, &connection.currentServer ) ) {
-			currentRtt = srv->rtt;
-			currentIndex = altIndex;
+		if ( srv->liveRtt != 0 ) {
+			// Make live rtt measurement influence result
+			srv->rtt = ( srv->rtt + srv->liveRtt ) / ( RTT_COUNT + 1 );
+		} else {
+			srv->rtt /= RTT_COUNT;
 		}
+
 		// Keep socket open if this is currently the best one
-		if ( bestIndex == -1 || altservers[bestIndex].rtt > srv->rtt ) {
-			bestIndex = altIndex;
+		if ( best == NULL || best->rtt > srv->rtt ) {
+			best = srv;
 			if ( bestSock != -1 ) {
 				close( bestSock );
 			}
@@ -674,21 +732,25 @@ fail:;
 			srv->consecutiveFails += 1;
 	}
 	doSwitch = false;
-	if ( bestIndex != -1 ) {
+	if ( best != NULL ) {
 		// Time-sensitive switch decision: If a server was best for some consecutive measurements,
 		// we switch no matter how small the difference to the current server is
-		pthread_spin_lock( &altLock );
-		for ( int i = 0; i < MAX_ALTS_ACTIVE; ++i ) {
-			if ( i == bestIndex ) {
-				if ( altservers[i].bestCount < 50 ) {
-					altservers[i].bestCount += 2;
+		for ( int altIndex = 0; altIndex < MAX_ALTS_ACTIVE; ++altIndex ) {
+			alt_server_t * const srv = &altservers[altIndex];
+			// Decay liveRtt slowly...
+			if ( current != NULL && srv->liveRtt > current->liveRtt && srv->liveRtt > srv->rtt ) {
+				srv->liveRtt -= ( srv->liveRtt / 100 + 1 );
+			}
+			if ( srv == best ) {
+				if ( srv->bestCount < 50 ) {
+					srv->bestCount += 2;
 				}
 				// Switch with increasing probability the higher the bestCount is
-				if ( altservers[i].bestCount > 12 && altservers[i].rtt < currentRtt && altservers[i].bestCount > rand() % 50 ) {
+				if ( srv->bestCount > 12 && ( current == NULL || srv->rtt < current->rtt ) && srv->bestCount > rand() % 50 ) {
 					doSwitch = true;
 				}
-			} else if ( altservers[i].bestCount > 0 ) {
-				altservers[i].bestCount--;
+			} else if ( srv->bestCount > 0 ) {
+				srv->bestCount--;
 			}
 		}
 		for ( int i = MAX_ALTS_ACTIVE; i < MAX_ALTS; ++i ) {
@@ -696,28 +758,31 @@ fail:;
 				altservers[i].consecutiveFails--;
 			}
 		}
-		pthread_spin_unlock( &altLock );
 		// This takes care of the situation where two servers alternate being the best server all the time
-		if ( doSwitch && currentIndex != -1 && altservers[bestIndex].bestCount - altservers[currentIndex].bestCount < 8 ) {
+		if ( doSwitch && current != NULL && best->bestCount - current->bestCount < 8 ) {
 			doSwitch = false;
 		}
 		// Regular logic: Apply threshold when considering switch
-		if ( !doSwitch ) {
-			doSwitch = currentRtt > altservers[bestIndex].rtt + RTT_ABSOLUTE_THRESHOLD
-				|| RTT_THRESHOLD_FACTOR(currentRtt) > altservers[bestIndex].rtt + 1000;
+		if ( !doSwitch && current != NULL ) {
+			doSwitch = current->rtt > best->rtt + RTT_ABSOLUTE_THRESHOLD
+				|| RTT_THRESHOLD_FACTOR(current->rtt) > best->rtt + 1000;
 		}
 	}
 	// Switch if a better server was found
 	if ( doSwitch ) {
-		logadd( LOG_INFO, "Current: %dµs, best: %dµs. Will switch!", currentRtt, altservers[bestIndex].rtt );
+		logadd( LOG_INFO, "Current: %dµs, best: %dµs. Will switch!", current == NULL ? 0 : current->rtt, best->rtt );
 		for ( int i = 0; i < MAX_ALTS; ++i ) {
-			if ( i != bestIndex ) {
+			if ( &altservers[i] != best ) {
 				altservers[i].bestCount = 0;
 			}
 		}
-		switchConnection( bestSock, &altservers[bestIndex] );
-	} else if ( bestIndex != -1 ) {
-		// No switch
+		unlock_rw( &altLock );
+		switchConnection( bestSock, best );
+		return;
+	}
+	// No switch
+	unlock_rw( &altLock );
+	if ( best != NULL ) {
 		close( bestSock );
 	}
 }
@@ -814,8 +879,10 @@ static void enqueueRequest(dnbd3_async_t *request)
 	request->next = NULL;
 	request->finished = false;
 	request->success = false;
-	pthread_spin_lock( &requests.lock );
 	//logadd( LOG_DEBUG2, "Queue: %p @ %s : %d", request, file, line );
+	// Measure latency and add to switch formula
+	request->time = nowMicro();
+	pthread_spin_lock( &requests.lock );
 	if ( requests.head == NULL ) {
 		requests.head = requests.tail = request;
 	} else {
diff --git a/src/fuse/connection.h b/src/fuse/connection.h
index 37bca88..929777a 100644
--- a/src/fuse/connection.h
+++ b/src/fuse/connection.h
@@ -10,10 +10,11 @@ struct _dnbd3_async;
 typedef struct _dnbd3_async {
 	struct _dnbd3_async *next; // Next in this linked list (private field, not set by caller)
+	dnbd3_signal_t* signal; // Used to signal the caller
 	char* buffer; // Caller-provided buffer to be filled
+	uint64_t time; // When request was put on wire, 0 if not measuring
 	uint64_t offset;
 	uint32_t length;
-	dnbd3_signal_t* signal; // Used to signal the caller
 	bool finished; // Will be set to true if the request has been handled
 	bool success; // Will be set to true if the request succeeded
 } dnbd3_async_t;
-- 
cgit v1.2.3-55-g7522
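
The core of this change is the new liveRtt field: enqueueRequest() timestamps every request when it goes on the wire, and the receive thread folds each block reply's observed service time into the current server's liveRtt as a weighted moving average, discarding samples above 30 seconds. Boiled down to a standalone sketch (the helper name updateLiveRtt and the free-standing variable are illustrative, not part of the dnbd3 sources):

#include <stdint.h>

static int liveRtt; // running average of observed request service time, in µs

// diffUs is nowMicro() - request->time, taken when the block reply arrives
static void updateLiveRtt( uint64_t diffUs )
{
	if ( diffUs >= 30ull * 1000 * 1000 )
		return; // sanity check, as in the patch: ignore anything above 30s
	// Weighted moving average: two parts history, one part new sample
	liveRtt = ( liveRtt * 2 + (int)diffUs ) / 3;
}

Unlike the periodic probes, which measure an otherwise idle connection, this value reflects what the active connection delivers under real load, which is why the switch decision now takes it into account.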
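
In probeAltServers(), the live measurement then feeds into each server's score: when present, it is averaged in as if it were one extra probe sample, and a server whose liveRtt exceeds both the current server's liveRtt and its own probe RTT loses roughly 1% of it per round, so one bad phase cannot depress a server's score indefinitely. A sketch of both pieces (RTT_COUNT's value is assumed here, it is defined elsewhere in the dnbd3 sources; the function names are illustrative):

#define RTT_COUNT 5 // assumed value, for illustration only

// Combine the probe samples with the live measurement, as probeAltServers() does
static int combinedRtt( const int rtts[RTT_COUNT], int liveRtt )
{
	int sum = 0;
	for ( int i = 0; i < RTT_COUNT; ++i ) {
		sum += rtts[i];
	}
	if ( liveRtt != 0 ) {
		// The live measurement counts as one additional probe sample
		return ( sum + liveRtt ) / ( RTT_COUNT + 1 );
	}
	return sum / RTT_COUNT;
}

// Slow decay applied once per probe round to suspiciously high live values
static int decayLiveRtt( int liveRtt )
{
	return liveRtt - ( liveRtt / 100 + 1 );
}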
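
The request timestamps also serve a second purpose: panic mode no longer requires a dead socket. Each probe round scans the pending request queue, and if any timestamped request has been waiting longer than five times the current server's RTT (but at least one second), the connection is treated as stalled and the round proceeds as if the connection were lost. Roughly (standalone sketch; requestStalled is not a dnbd3 function):

#include <stdbool.h>
#include <stdint.h>

#define MAX(a, b) ( (a) > (b) ? (a) : (b) )

// true if a request put on wire at sentUs has been pending too long
static bool requestStalled( uint64_t nowUs, uint64_t sentUs, int currentRttUs )
{
	if ( sentUs == 0 || nowUs <= sentUs )
		return false; // request not tagged for measurement, or clock skew
	const int maxDelay = MAX( currentRttUs * 5, 1000000 ); // at least one second
	return nowUs - sentUs > (uint64_t)maxDelay;
}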