Diffstat (limited to 'src/fuse/connection.c')
-rw-r--r-- | src/fuse/connection.c | 927
1 files changed, 927 insertions, 0 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c new file mode 100644 index 0000000..fc9f05b --- /dev/null +++ b/src/fuse/connection.c @@ -0,0 +1,927 @@ +#include "connection.h" +#include "helper.h" +#include "../clientconfig.h" +#include "../shared/protocol.h" +#include "../shared/fdsignal.h" +#include "../shared/sockhelper.h" +#include "../shared/log.h" + +#include <stdlib.h> +#include <pthread.h> +#include <string.h> +#include <stdio.h> +#include <unistd.h> +#include <errno.h> +#include <time.h> +#include <inttypes.h> + +/* Constants */ +static const size_t SHORTBUF = 100; +#define MAX_ALTS (16) +#define MAX_ALTS_ACTIVE (5) +#define MAX_HOSTS_PER_ADDRESS (2) +// If a server wasn't reachable this many times, we slowly start skipping it on measurements +static const int FAIL_BACKOFF_START_COUNT = 8; +#define RTT_COUNT (4) + +/* Module variables */ + +// Init guard +static bool connectionInitDone = false; +static bool threadInitDone = false; +static pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER; +static bool keepRunning = true; +static bool learnNewServers; + +// List of pending requests +static struct { + dnbd3_async_t *head; + dnbd3_async_t *tail; + pthread_spinlock_t lock; +} requests; + +// Connection for the image +static struct { + char *name; + uint16_t rid; + uint64_t size; +} image; + +static struct { + int sockFd; + pthread_mutex_t sendMutex; + dnbd3_signal_t* panicSignal; + dnbd3_host_t currentServer; + ticks startupTime; +} connection; + +// Known alt servers +typedef struct _alt_server { + dnbd3_host_t host; + int consecutiveFails; + int rtt; + int rtts[RTT_COUNT]; + int rttIndex; + int bestCount; + int liveRtt; +} alt_server_t; + +static dnbd3_server_entry_t newservers[MAX_ALTS]; +static pthread_mutex_t newAltLock = PTHREAD_MUTEX_INITIALIZER; +static alt_server_t altservers[MAX_ALTS]; +// WR: Use when re-assigning or sorting altservers, i.e. an index in altservers +// changes its meaning (host). Also used for newservers. +// RD: Use when reading the list or modifying individual entries data, like RTT +// and fail count. Isn't super clean as we still might have races here, but mostly +// the code is clean in this regard, so we should only have stale data somewhere +// but nothing nonsensical. 
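The locking rules spelled out in the comment above boil down to: take the write lock whenever a slot in altservers changes its meaning (re-assignment, sorting, and anything touching newservers), and the read lock when only per-entry data such as RTT samples or fail counters are modified. A minimal stand-alone sketch of that discipline, using illustrative names rather than the real dnbd3 structures:

#include <pthread.h>

#define SLOTS 16

struct server {
	int rtt;
	int fails;
	/* identity fields (host) omitted */
};

static struct server servers[SLOTS];
static pthread_rwlock_t slotLock = PTHREAD_RWLOCK_INITIALIZER;

/* Per-entry update: the slot keeps referring to the same server, so the
 * read lock is enough; at worst a concurrent reader sees a stale value. */
static void update_rtt(int idx, int measured)
{
	pthread_rwlock_rdlock( &slotLock );
	servers[idx].rtt = measured;
	pthread_rwlock_unlock( &slotLock );
}

/* Re-assignment: the slot now means a different server, so the write lock
 * keeps readers from observing a half-replaced entry. */
static void replace_slot(int idx, struct server replacement)
{
	pthread_rwlock_wrlock( &slotLock );
	servers[idx] = replacement;
	pthread_rwlock_unlock( &slotLock );
}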
+static pthread_rwlock_t altLock = PTHREAD_RWLOCK_INITIALIZER; +#define lock_read pthread_rwlock_rdlock +#define lock_write pthread_rwlock_wrlock +#define unlock_rw pthread_rwlock_unlock + +/* Static methods */ + + +static void* connection_receiveThreadMain(void *sock); +static void* connection_backgroundThread(void *something); + +static void addAltServers(); +static void sortAltServers(); +static void probeAltServers(); +static void switchConnection(int sockFd, alt_server_t *srv); +static void requestAltServers(); +static bool throwDataAway(int sockFd, uint32_t amount); + +static void enqueueRequest(dnbd3_async_t *request); +static dnbd3_async_t* removeRequest(dnbd3_async_t *request); + +bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid, const bool doLearnNew) +{ + int sock = -1; + char host[SHORTBUF]; + size_t hlen; + serialized_buffer_t buffer; + uint16_t remoteVersion, remoteRid; + char *remoteName; + uint64_t remoteSize; + struct sockaddr_storage sa; + socklen_t salen; + poll_list_t *cons = sock_newPollList(); + + timing_setBase(); + pthread_mutex_lock( &mutexInit ); + if ( !connectionInitDone && keepRunning ) { + dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS]; + const char *current, *end; + int altIndex = 0; + learnNewServers = doLearnNew; + memset( altservers, 0, sizeof altservers ); + connection.sockFd = -1; + current = hosts; + do { + // Get next host from string + while ( *current == ' ' ) current++; + end = strchr( current, ' ' ); + size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1); + if ( len > SHORTBUF ) len = SHORTBUF; + snprintf( host, len, "%s", current ); + int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS ); + for ( int i = 0; i < newHosts; ++i ) { + if ( altIndex >= MAX_ALTS ) + break; + altservers[altIndex].host = tempHosts[i]; + altIndex += 1; + } + current = end + 1; + } while ( end != NULL && altIndex < MAX_ALTS ); + logadd( LOG_INFO, "Got %d servers from init call", altIndex ); + // Connect + for ( int i = 0; i < altIndex + 5; ++i ) { + if ( i >= altIndex ) { + // Additional iteration - no corresponding slot in altservers, this + // is just so we can make a final calls with longer timeout + sock = sock_multiConnect( cons, NULL, 400, 1000 ); + if ( sock == -2 ) { + logadd( LOG_ERROR, "Could not connect to any host" ); + sock = -1; + break; + } + } else { + if ( altservers[i].host.type == 0 ) + continue; + // Try to connect - 100ms timeout + sock = sock_multiConnect( cons, &altservers[i].host, 100, 1000 ); + } + if ( sock == -2 || sock == -1 ) + continue; + salen = sizeof(sa); + if ( getpeername( sock, (struct sockaddr*)&sa, &salen ) == -1 ) { + logadd( LOG_ERROR, "getpeername on successful connection failed!? 
(errno=%d)", errno ); + close( sock ); + sock = -1; + continue; + } + hlen = sock_printable( (struct sockaddr*)&sa, salen, host, sizeof(host) ); + logadd( LOG_INFO, "Connected to %.*s", (int)hlen, host ); + if ( !dnbd3_select_image( sock, lowerImage, rid, 0 ) ) { + logadd( LOG_ERROR, "Could not send select image" ); + } else if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) { + logadd( LOG_ERROR, "Could not read select image reply (%d)", errno ); + } else if ( rid != 0 && rid != remoteRid ) { + logadd( LOG_ERROR, "rid mismatch (want: %d, got: %d)", (int)rid, (int)remoteRid ); + } else { + logadd( LOG_INFO, "Requested: '%s:%d'", lowerImage, (int)rid ); + logadd( LOG_INFO, "Returned: '%s:%d'", remoteName, (int)remoteRid ); + sock_setTimeout( sock, SOCKET_KEEPALIVE_TIMEOUT * 1000 ); + image.name = strdup( remoteName ); + image.rid = remoteRid; + image.size = remoteSize; + if ( !sock_sockaddrToDnbd3( (struct sockaddr*)&sa, &connection.currentServer ) ) { + logadd( LOG_ERROR, "sockaddr to dnbd3_host_t failed!?" ); + connection.currentServer.type = 0; + } + connection.panicSignal = signal_new(); + timing_get( &connection.startupTime ); + connection.sockFd = sock; + requests.head = NULL; + requests.tail = NULL; + requestAltServers(); + break; + } + // Failed + if ( sock != -1 ) { + close( sock ); + sock = -1; + } + } + if ( sock != -1 ) { + connectionInitDone = true; + } + } + pthread_mutex_unlock( &mutexInit ); + sock_destroyPollList( cons ); + return sock != -1; +} + +bool connection_initThreads() +{ + pthread_mutex_lock( &mutexInit ); + if ( !keepRunning || !connectionInitDone || threadInitDone || connection.sockFd == -1 ) { + pthread_mutex_unlock( &mutexInit ); + return false; + } + bool success = true; + pthread_t thread; + threadInitDone = true; + logadd( LOG_DEBUG1, "Initializing stuff" ); + if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0 + || pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0 ) { + logadd( LOG_ERROR, "Mutex or spinlock init failure" ); + success = false; + } else { + if ( pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)connection.sockFd ) != 0 ) { + logadd( LOG_ERROR, "Could not create receive thread" ); + success = false; + } else if ( pthread_create( &thread, NULL, &connection_backgroundThread, NULL ) != 0 ) { + logadd( LOG_ERROR, "Could not create background thread" ); + success = false; + } + } + if ( !success ) { + close( connection.sockFd ); + connection.sockFd = -1; + } + pthread_mutex_unlock( &mutexInit ); + return success; +} + +uint64_t connection_getImageSize() +{ + return image.size; +} + +bool connection_read(dnbd3_async_t *request) +{ + if ( !connectionInitDone ) return false; + pthread_mutex_lock( &connection.sendMutex ); + enqueueRequest( request ); + if ( connection.sockFd != -1 ) { + if ( !dnbd3_get_block( connection.sockFd, request->offset, request->length, (uint64_t)request, 0 ) ) { + shutdown( connection.sockFd, SHUT_RDWR ); + connection.sockFd = -1; + pthread_mutex_unlock( &connection.sendMutex ); + signal_call( connection.panicSignal ); + return true; + } + } + pthread_mutex_unlock( &connection.sendMutex ); + return true; +} + +void connection_close() +{ + if ( keepRunning ) { + logadd( LOG_INFO, "Tearing down dnbd3 connections and workers" ); + } + pthread_mutex_lock( &mutexInit ); + keepRunning = false; + if ( !connectionInitDone ) { + pthread_mutex_unlock( &mutexInit ); + return; + } + pthread_mutex_unlock( &mutexInit ); + 
pthread_mutex_lock( &connection.sendMutex ); + if ( connection.sockFd != -1 ) { + shutdown( connection.sockFd, SHUT_RDWR ); + } + pthread_mutex_unlock( &connection.sendMutex ); +} + +size_t connection_printStats(char *buffer, const size_t len) +{ + int ret; + size_t remaining = len; + declare_now; + if ( remaining > 0 ) { + ret = snprintf( buffer, remaining, "Image: %s\nRevision: %d\n\nCurrent connection time: %" PRIu32 "s\n\n", + image.name, (int)image.rid, timing_diff( &connection.startupTime, &now ) ); + if ( ret < 0 ) { + ret = 0; + } + if ( (size_t)ret >= remaining ) { + return len; + } + remaining -= ret; + buffer += ret; + } + int i = -1; + lock_read( &altLock ); + while ( remaining > 3 && ++i < MAX_ALTS ) { + if ( altservers[i].host.type == 0 ) + continue; + if ( isSameAddressPort( &connection.currentServer, &altservers[i].host ) ) { + *buffer++ = '*'; + } else if ( i >= MAX_ALTS_ACTIVE ) { + *buffer++ = '-'; + } else { + *buffer++ = ' '; + } + const size_t addrlen = sock_printHost( &altservers[i].host, buffer, remaining ); + remaining -= (addrlen + 1); // For space or * above + buffer += addrlen; + if ( remaining < 3 ) + break; + int width = addrlen >= 35 ? 0 : 35 - (int)addrlen; + char *unit; + int value; + if ( altservers[i].rtt > 5000 ) { + unit = "ms "; + value = altservers[i].rtt / 1000; + } else { + unit = "µs"; + value = altservers[i].rtt; + width += 3; + } + ret = snprintf( buffer, remaining, "% *d %s Unreachable:% 5d BestCount:% 5d Live:% 5dµs\n", + width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount, altservers[i].liveRtt ); + if ( ret < 0 ) { + ret = 0; + } + if ( (size_t)ret >= remaining ) { + remaining = 0; + break; + } + remaining -= ret; + buffer += ret; + } + unlock_rw( &altLock ); + return len - remaining; +} + +static void* connection_receiveThreadMain(void *sockPtr) +{ + int sockFd = (int)(size_t)sockPtr; + dnbd3_reply_t reply; + pthread_detach( pthread_self() ); + + while ( keepRunning ) { + int ret; + do { + ret = dnbd3_read_reply( sockFd, &reply, true ); + if ( ret == REPLY_OK ) break; + } while ( ret == REPLY_INTR || ret == REPLY_AGAIN ); + if ( ret != REPLY_OK ) { + logadd( LOG_DEBUG1, "Error receiving reply on receiveThread (%d)", ret ); + goto fail; + } + + if ( reply.cmd == CMD_GET_BLOCK ) { + // Get block reply. find matching request + dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle ); + if ( request == NULL ) { + // This happens if the alt server probing thread tears down our connection + // and did a direct RTT probe to satisfy this very request. 
+ logadd( LOG_DEBUG1, "Got block reply with no matching request" ); + if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) { + logadd( LOG_DEBUG1, "....and choked on reply payload" ); + goto fail; + } + } else { + // Found a match + const ssize_t ret = sock_recv( sockFd, request->buffer, request->length ); + if ( ret != (ssize_t)request->length ) { + logadd( LOG_DEBUG1, "receiving payload for a block reply failed" ); + connection_read( request ); + goto fail; + } + // Check RTT + declare_now; + uint64_t diff = timing_diffUs( &request->time, &now ); + if ( diff < 30ull * 1000 * 1000 ) { // Sanity check - ignore if > 30s + lock_read( &altLock ); + for ( int i = 0; i < MAX_ALTS; ++i ) { + if ( altservers[i].host.type == 0 ) + continue; + if ( isSameAddressPort( &connection.currentServer, &altservers[i].host ) ) { + altservers[i].liveRtt = ( altservers[i].liveRtt * 3 + (int)diff ) / 4; + break; + } + } + unlock_rw( &altLock ); + } + // Success, wake up caller + request->success = true; + request->finished = true; + signal_call( request->signal ); + } + } else if ( reply.cmd == CMD_GET_SERVERS ) { + // List of known alt servers + dnbd3_server_entry_t entries[MAX_ALTS]; + const int count = MIN( reply.size / sizeof(dnbd3_server_entry_t), MAX_ALTS ); + const size_t relevantSize = sizeof(dnbd3_server_entry_t) * count; + if ( sock_recv( sockFd, entries, relevantSize ) != (ssize_t)relevantSize + || !throwDataAway( sockFd, reply.size - (uint32_t)relevantSize ) ) { + logadd( LOG_DEBUG1, "Error receiving list of alt servers." ); + goto fail; + } + pthread_mutex_lock( &newAltLock ); + memcpy( newservers, entries, relevantSize ); + pthread_mutex_unlock( &newAltLock ); + } else { + // TODO: Handle the others? + if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) { + logadd( LOG_DEBUG1, "Could not throw %d bytes away on CMD %d", (int)reply.size, (int)reply.cmd ); + goto fail; + } + } + } + logadd( LOG_DEBUG1, "Aus der Schleife rausgeflogen! ARRRRRRRRRR" ); +fail:; + // Make sure noone is trying to use the socket for sending by locking, + pthread_mutex_lock( &connection.sendMutex ); + // then just set the fd to -1, but only if it's the same fd as ours, + // as someone could have established a new connection already + if ( connection.sockFd == sockFd ) { + connection.sockFd = -1; + signal_call( connection.panicSignal ); + } + pthread_mutex_unlock( &connection.sendMutex ); + // As we're the only reader, it's safe to close the socket now + close( sockFd ); + return NULL; +} + +static void* connection_backgroundThread(void *something UNUSED) +{ + ticks nextKeepalive; + ticks nextRttCheck; + + timing_get( &nextKeepalive ); + nextRttCheck = nextKeepalive; + while ( keepRunning ) { + ticks now; + timing_get( &now ); + uint32_t wt1 = timing_diffMs( &now, &nextKeepalive ); + uint32_t wt2 = timing_diffMs( &now, &nextRttCheck ); + if ( wt1 > 0 && wt2 > 0 ) { + int waitRes = signal_wait( connection.panicSignal, (int)MIN( wt1, wt2 ) + 1 ); + if ( waitRes == SIGNAL_ERROR ) { + logadd( LOG_WARNING, "Error waiting on signal in background thread! 
Errno = %d", errno ); + } + timing_get( &now ); + } + // Woken up, see what we have to do + const bool panic = connection.sockFd == -1; + // Check alt servers + if ( panic || timing_reachedPrecise( &nextRttCheck, &now ) ) { + if ( learnNewServers ) { + addAltServers(); + } + sortAltServers(); + probeAltServers(); + if ( panic || timing_diff( &connection.startupTime, &now ) <= STARTUP_MODE_DURATION ) { + timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_STARTUP ); + } else { + timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_NORMAL ); + } + } + // Send keepalive packet + if ( timing_reachedPrecise( &nextKeepalive, &now ) ) { + pthread_mutex_lock( &connection.sendMutex ); + if ( connection.sockFd != -1 ) { + dnbd3_request_t request; + request.magic = dnbd3_packet_magic; + request.cmd = CMD_KEEPALIVE; + request.handle = request.offset = request.size = 0; + fixup_request( request ); + ssize_t ret = sock_sendAll( connection.sockFd, &request, sizeof request, 2 ); + if ( (size_t)ret != sizeof request ) { + shutdown( connection.sockFd, SHUT_RDWR ); + connection.sockFd = -1; + nextRttCheck = now; + } + } + pthread_mutex_unlock( &connection.sendMutex ); + timing_addSeconds( &nextKeepalive, &now, TIMER_INTERVAL_KEEPALIVE_PACKET ); + } + } + return NULL; +} + +// Private quick helpers + +static void addAltServers() +{ + pthread_mutex_lock( &newAltLock ); + lock_write( &altLock ); + for ( int nIdx = 0; nIdx < MAX_ALTS; ++nIdx ) { + if ( newservers[nIdx].host.type == 0 ) + continue; + // Got a new alt server, see if it's already known + for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) { + if ( isSameAddress( &newservers[nIdx].host, &altservers[eIdx].host ) ) { + goto skip_server; + } + } + // Not known yet, add - find free slot + int slot = -1; + for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) { + if ( altservers[eIdx].host.type == 0 ) { + slot = eIdx; // free - bail out and use this one + break; + } + if ( altservers[eIdx].consecutiveFails > FAIL_BACKOFF_START_COUNT + && slot != -1 && altservers[slot].consecutiveFails < altservers[eIdx].consecutiveFails ) { + // Replace an existing alt-server that failed recently if we got no more slots + slot = eIdx; + } + } + if ( slot != -1 ) { + char txt[200]; + sock_printHost( &newservers[nIdx].host, txt, 200 ); + logadd( LOG_DEBUG1, "new server %s in slot %d", txt, slot ); + altservers[slot].consecutiveFails = 0; + altservers[slot].bestCount = 0; + altservers[slot].rtts[0] = RTT_UNREACHABLE; + altservers[slot].rttIndex = 1; + altservers[slot].host = newservers[nIdx].host; + altservers[slot].liveRtt = 0; + } +skip_server:; + } + memset( newservers, 0, sizeof(newservers) ); + unlock_rw( &altLock ); + pthread_mutex_unlock( &newAltLock ); +} + +/** + * Find a server at index >= MAX_ALTS_ACTIVE (one that isn't considered for switching over) + * that has been inactive for a while, then look if there's an active server that's failed + * a couple of times recently. Swap both if found. + */ +static void sortAltServers() +{ + int ac = 0; + lock_write( &altLock ); + for ( int ia = MAX_ALTS_ACTIVE; ia < MAX_ALTS; ++ia ) { + alt_server_t * const inactive = &altservers[ia]; + if ( inactive->host.type == 0 || inactive->consecutiveFails > 0 ) + continue; + while ( ac < MAX_ALTS_ACTIVE ) { + if ( altservers[ac].host.type == 0 || altservers[ac].consecutiveFails > FAIL_BACKOFF_START_COUNT ) + break; + ac++; + } + if ( ac == MAX_ALTS_ACTIVE ) + break; + // Switch! 
+ alt_server_t * const active = &altservers[ac]; + dnbd3_host_t tmp = inactive->host; + inactive->host = active->host; + inactive->consecutiveFails = FAIL_BACKOFF_START_COUNT * 4; + inactive->bestCount = 0; + inactive->rtts[0] = RTT_UNREACHABLE; + inactive->rttIndex = 1; + inactive->liveRtt = 0; + active->host = tmp; + active->consecutiveFails = 0; + active->bestCount = 0; + active->rtts[0] = RTT_UNREACHABLE; + active->rttIndex = 1; + active->liveRtt = 0; + } + unlock_rw( &altLock ); +} + +static void probeAltServers() +{ + serialized_buffer_t buffer; + dnbd3_reply_t reply; + int bestSock = -1; + uint16_t remoteRid, remoteProto; + uint64_t remoteSize; + char *remoteName; + bool doSwitch; + bool panic = connection.sockFd == -1; + uint64_t testOffset = 0; + uint32_t testLength = RTT_BLOCK_SIZE; + dnbd3_async_t *request = NULL; + alt_server_t *current = NULL, *best = NULL; + + if ( !panic ) { + lock_read( &altLock ); + for ( int altIndex = 0; altIndex < MAX_ALTS; ++altIndex ) { + if ( altservers[altIndex].host.type != 0 + && isSameAddressPort( &altservers[altIndex].host, &connection.currentServer ) ) { + current = &altservers[altIndex]; + break; + } + } + unlock_rw( &altLock ); + } + declare_now; + pthread_spin_lock( &requests.lock ); + if ( requests.head != NULL ) { + if ( !panic && current != NULL ) { + const int maxDelay = MAX( current->rtt * 5, 1000000 ); // Give at least one second + dnbd3_async_t *iterator; + for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) { + // A request with measurement tag is pending + if ( timing_diffUs( &iterator->time, &now ) > maxDelay ) { + panic = true; + break; + } + } + } + if ( panic ) { + request = requests.head; + testOffset = requests.head->offset; + testLength = requests.head->length; + } + } + pthread_spin_unlock( &requests.lock ); + if ( testOffset != 0 ) { + logadd( LOG_DEBUG1, "Panic with pending %" PRIu64 ":%" PRIu32, testOffset, testLength ); + } + + lock_read( &altLock ); + for ( int altIndex = 0; altIndex < (panic ? MAX_ALTS : MAX_ALTS_ACTIVE); ++altIndex ) { + alt_server_t * const srv = &altservers[altIndex]; + if ( srv->host.type == 0 ) + continue; + if ( !panic && srv->consecutiveFails > FAIL_BACKOFF_START_COUNT + && rand() % srv->consecutiveFails >= FAIL_BACKOFF_START_COUNT ) { + continue; + } + if ( srv->rttIndex >= RTT_COUNT ) { + srv->rttIndex = 0; + } else { + srv->rttIndex += 1; + } + // Probe + ticks start; + timing_get( &start ); + errno = 0; + int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 ); + if ( sock == -1 ) { + logadd( LOG_DEBUG1, "Could not connect for probing. 
errno = %d", errno ); + goto fail; + } + if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) { + logadd( LOG_DEBUG1, "probe: select_image failed" ); + goto fail; + } + if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize )) { + logadd( LOG_DEBUG1, "probe: select image reply failed" ); + goto fail; + } + if ( remoteProto < MIN_SUPPORTED_SERVER ) { + logadd( LOG_WARNING, "Unsupported remote version (local: %d, remote: %d)", (int)PROTOCOL_VERSION, (int)remoteProto ); + srv->consecutiveFails += 10; + goto fail; + } + if ( remoteRid != image.rid || strcmp( remoteName, image.name ) != 0 ) { + logadd( LOG_WARNING, "Remote rid or name mismatch (got '%s')", remoteName ); + srv->consecutiveFails += 10; + goto fail; + } + if ( !dnbd3_get_block( sock, testOffset, testLength, 0, 0 ) ) { + logadd( LOG_DEBUG1, "-> block request fail" ); + goto fail; + } + int a = 111; + if ( !(a = dnbd3_get_reply( sock, &reply )) || reply.size != testLength ) { + logadd( LOG_DEBUG1, "<- get block reply fail %d %d", a, (int)reply.size ); + goto fail; + } + if ( request != NULL && removeRequest( request ) != NULL ) { + // Request successfully removed from queue + const ssize_t ret = sock_recv( sock, request->buffer, request->length ); + if ( ret != (ssize_t)request->length ) { + logadd( LOG_DEBUG1, "[RTT] receiving payload for a block reply failed" ); + // Failure, add to queue again + connection_read( request ); + goto fail; + } + // Success, wake up caller + logadd( LOG_DEBUG1, "[RTT] Successful direct probe" ); + request->success = true; + request->finished = true; + signal_call( request->signal ); + } else { + // Wasn't a request that's in our request queue + if ( !throwDataAway( sock, testLength ) ) { + logadd( LOG_DEBUG1, "<- get block reply payload fail" ); + goto fail; + } + } + + // Yay, success + // Panic mode? Just switch to server + if ( panic ) { + unlock_rw( &altLock ); + switchConnection( sock, srv ); + return; + } + // Non-panic mode: + // Update stats of server + ticks end; + timing_get( &end ); + srv->consecutiveFails = 0; + srv->rtts[srv->rttIndex] = (int)timing_diffUs( &start, &end ); + int newRtt = 0; + for ( int i = 0; i < RTT_COUNT; ++i ) { + newRtt += srv->rtts[i]; + } + if ( srv->liveRtt != 0 ) { + // Make live rtt measurement influence result + newRtt = ( newRtt + srv->liveRtt ) / ( RTT_COUNT + 1 ); + } else { + newRtt /= RTT_COUNT; + } + srv->rtt = newRtt; + + // Keep socket open if this is currently the best one + if ( best == NULL || best->rtt > srv->rtt ) { + best = srv; + if ( bestSock != -1 ) { + close( bestSock ); + } + bestSock = sock; + } else { + close( sock ); + } + continue; +fail:; + if ( sock != -1 ) { + close( sock ); + } + srv->rtts[srv->rttIndex] = RTT_UNREACHABLE; + srv->consecutiveFails += 1; + } + doSwitch = false; + if ( best != NULL ) { + // Time-sensitive switch decision: If a server was best for some consecutive measurements, + // we switch no matter how small the difference to the current server is + for ( int altIndex = 0; altIndex < MAX_ALTS_ACTIVE; ++altIndex ) { + alt_server_t * const srv = &altservers[altIndex]; + // Decay liveRtt slowly... 
+ if ( srv->liveRtt > current->liveRtt && srv->liveRtt > srv->rtt ) { + srv->liveRtt -= ( ( srv->liveRtt / 100 ) + 1 ); + } + if ( srv == best ) { + if ( srv->bestCount < 50 ) { + srv->bestCount += 2; + } + // Switch with increasing probability the higher the bestCount is + if ( srv->bestCount > 12 && ( current == NULL || srv->rtt < current->rtt ) && srv->bestCount > rand() % 50 ) { + doSwitch = true; + } + } else if ( srv->bestCount > 0 ) { + srv->bestCount--; + } + } + for ( int i = MAX_ALTS_ACTIVE; i < MAX_ALTS; ++i ) { + if ( altservers[i].consecutiveFails > 0 ) { + altservers[i].consecutiveFails--; + } + } + // This takes care of the situation where two servers alternate being the best server all the time + if ( doSwitch && current != NULL && best->bestCount - current->bestCount < 8 ) { + doSwitch = false; + } + // Regular logic: Apply threshold when considering switch + if ( !doSwitch && current != NULL ) { + doSwitch = current->rtt > best->rtt + RTT_ABSOLUTE_THRESHOLD + || RTT_THRESHOLD_FACTOR(current->rtt) > best->rtt + 1000; + } + } + // Switch if a better server was found + if ( doSwitch ) { + logadd( LOG_INFO, "Current: %dµs, best: %dµs. Will switch!", current == NULL ? 0 : current->rtt, best->rtt ); + for ( int i = 0; i < MAX_ALTS; ++i ) { + if ( &altservers[i] != best ) { + altservers[i].bestCount = 0; + } + } + unlock_rw( &altLock ); + switchConnection( bestSock, best ); + return; + } + // No switch + unlock_rw( &altLock ); + if ( best != NULL ) { + close( bestSock ); + } +} + +static void switchConnection(int sockFd, alt_server_t *srv) +{ + pthread_t thread; + struct sockaddr_storage addr; + socklen_t addrLen = sizeof(addr); + char message[200] = "Connection switched to "; + const size_t len = strlen( message ); + int ret; + dnbd3_async_t *queue, *it; + + pthread_mutex_lock( &connection.sendMutex ); + if ( connection.sockFd != -1 ) { + shutdown( connection.sockFd, SHUT_RDWR ); + } + ret = getpeername( sockFd, (struct sockaddr*)&addr, &addrLen ); + if ( ret == 0 ) { + connection.currentServer = srv->host; + connection.sockFd = sockFd; + pthread_spin_lock( &requests.lock ); + queue = requests.head; + requests.head = requests.tail = NULL; + pthread_spin_unlock( &requests.lock ); + } else { + connection.sockFd = -1; + } + requestAltServers(); + pthread_mutex_unlock( &connection.sendMutex ); + if ( ret != 0 ) { + close( sockFd ); + logadd( LOG_WARNING, "Could not getpeername after connection switch, assuming connection already dead again. (Errno=%d)", errno ); + signal_call( connection.panicSignal ); + return; + } + timing_get( &connection.startupTime ); + pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)sockFd ); + sock_printable( (struct sockaddr*)&addr, sizeof(addr), message + len, sizeof(message) - len ); + logadd( LOG_INFO, "%s", message ); + // resend queue + if ( queue != NULL ) { + pthread_mutex_lock( &connection.sendMutex ); + dnbd3_async_t *next = NULL; + for ( it = queue; it != NULL; it = next ) { + logadd( LOG_DEBUG1, "Requeue after server change" ); + next = it->next; + enqueueRequest( it ); + if ( connection.sockFd != -1 && !dnbd3_get_block( connection.sockFd, it->offset, it->length, (uint64_t)it, 0 ) ) { + logadd( LOG_WARNING, "Resending pending request failed, re-entering panic mode" ); + shutdown( connection.sockFd, SHUT_RDWR ); + connection.sockFd = -1; + signal_call( connection.panicSignal ); + } + } + pthread_mutex_unlock( &connection.sendMutex ); + } +} + +/** + * Does not lock, so get the sendMutex first! 
+ */ +static void requestAltServers() +{ + if ( connection.sockFd == -1 || !learnNewServers ) + return; + dnbd3_request_t request = { 0 }; + request.magic = dnbd3_packet_magic; + request.cmd = CMD_GET_SERVERS; + fixup_request( request ); + if ( sock_sendAll( connection.sockFd, &request, sizeof(request), 2 ) != (ssize_t)sizeof(request) ) { + logadd( LOG_WARNING, "Connection failed while requesting alt server list" ); + shutdown( connection.sockFd, SHUT_RDWR ); + connection.sockFd = -1; + } +} + +static bool throwDataAway(int sockFd, uint32_t amount) +{ + size_t done = 0; + char tempBuffer[SHORTBUF]; + while ( done < amount ) { + const ssize_t ret = sock_recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ) ); + if ( ret <= 0 ) + return false; + done += (size_t)ret; + } + return true; +} + +static void enqueueRequest(dnbd3_async_t *request) +{ + request->next = NULL; + request->finished = false; + request->success = false; + //logadd( LOG_DEBUG2, "Queue: %p @ %s : %d", request, file, line ); + // Measure latency and add to switch formula + timing_get( &request->time ); + pthread_spin_lock( &requests.lock ); + if ( requests.head == NULL ) { + requests.head = requests.tail = request; + } else { + requests.tail->next = request; + requests.tail = request; + } + pthread_spin_unlock( &requests.lock ); +} + +static dnbd3_async_t* removeRequest(dnbd3_async_t *request) +{ + pthread_spin_lock( &requests.lock ); + //logadd( LOG_DEBUG2, "Remov: %p @ %s : %d", request, file, line ); + dnbd3_async_t *iterator, *prev = NULL; + for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) { + if ( iterator == request ) { + // Found it, break! + if ( prev != NULL ) { + prev->next = iterator->next; + } else { + requests.head = iterator->next; + } + if ( requests.tail == iterator ) { + requests.tail = prev; + } + break; + } + prev = iterator; + } + pthread_spin_unlock( &requests.lock ); + return iterator; +} + |
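A note on the fail backoff used by probeAltServers: once a server has failed more than FAIL_BACKOFF_START_COUNT (8) times in a row, it is only probed with probability FAIL_BACKOFF_START_COUNT divided by its fail count, so a server with 16 consecutive failures is probed in roughly every second round and one with 80 failures in roughly every tenth. The check, extracted into a stand-alone helper with a hypothetical name:

#include <stdbool.h>
#include <stdlib.h>

#define FAIL_BACKOFF_START_COUNT 8

/* Probe probability decays as FAIL_BACKOFF_START_COUNT / consecutiveFails
 * once a server has failed more often than the start count. */
static bool skip_probe(int consecutiveFails)
{
	if ( consecutiveFails <= FAIL_BACKOFF_START_COUNT )
		return false;                     /* still probed every round */
	return rand() % consecutiveFails >= FAIL_BACKOFF_START_COUNT;
}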
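Outside of the bestCount-driven fast path, the switch decision in probeAltServers only fires when the current server is clearly worse than the best probe result: either by more than an absolute threshold, or when the current RTT scaled by a configured factor still exceeds the best RTT by a 1000 µs margin. RTT_ABSOLUTE_THRESHOLD and RTT_THRESHOLD_FACTOR are defined in clientconfig.h; the values below are assumed placeholders for illustration only:

#include <stdbool.h>

/* Assumed placeholder values -- the real definitions live in clientconfig.h. */
#define RTT_ABSOLUTE_THRESHOLD    80000                /* assumption: 80 ms, in microseconds */
#define RTT_THRESHOLD_FACTOR(x)   ( (x) * 2 / 3 )      /* assumption: two thirds of the current RTT */

/* Same shape as the non-panic threshold test near the end of probeAltServers. */
static bool rtt_says_switch(int currentRtt, int bestRtt)
{
	return currentRtt > bestRtt + RTT_ABSOLUTE_THRESHOLD
	    || RTT_THRESHOLD_FACTOR( currentRtt ) > bestRtt + 1000;
}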