Diffstat (limited to 'src/fuse/connection.c')
-rw-r--r--  src/fuse/connection.c  927
1 file changed, 927 insertions(+), 0 deletions(-)
diff --git a/src/fuse/connection.c b/src/fuse/connection.c
new file mode 100644
index 0000000..fc9f05b
--- /dev/null
+++ b/src/fuse/connection.c
@@ -0,0 +1,927 @@
+#include "connection.h"
+#include "helper.h"
+#include "../clientconfig.h"
+#include "../shared/protocol.h"
+#include "../shared/fdsignal.h"
+#include "../shared/sockhelper.h"
+#include "../shared/log.h"
+
+#include <stdlib.h>
+#include <pthread.h>
+#include <string.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <time.h>
+#include <inttypes.h>
+
+/* Constants */
+static const size_t SHORTBUF = 100;
+#define MAX_ALTS (16)
+#define MAX_ALTS_ACTIVE (5)
+#define MAX_HOSTS_PER_ADDRESS (2)
+// If a server wasn't reachable this many times, we slowly start skipping it on measurements
+static const int FAIL_BACKOFF_START_COUNT = 8;
+#define RTT_COUNT (4)
+
+/* Module variables */
+
+// Init guard
+static bool connectionInitDone = false;
+static bool threadInitDone = false;
+static pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER;
+static bool keepRunning = true;
+static bool learnNewServers;
+
+// List of pending requests
+static struct {
+ dnbd3_async_t *head;
+ dnbd3_async_t *tail;
+ pthread_spinlock_t lock;
+} requests;
+
+// Connection for the image
+static struct {
+ char *name;
+ uint16_t rid;
+ uint64_t size;
+} image;
+
+static struct {
+ int sockFd;
+ pthread_mutex_t sendMutex;
+ dnbd3_signal_t* panicSignal;
+ dnbd3_host_t currentServer;
+ ticks startupTime;
+} connection;
+
+// Known alt servers
+typedef struct _alt_server {
+	dnbd3_host_t host;       // Address of this alt server; type == 0 marks the slot as unused
+	int consecutiveFails;    // Consecutive failed probes/connects; drives the probing back-off
+	int rtt;                 // Averaged RTT of the last RTT_COUNT probes in µs (blended with liveRtt)
+	int rtts[RTT_COUNT];     // Ring buffer of raw probe RTTs in µs
+	int rttIndex;            // Current write position within rtts[]
+	int bestCount;           // Hysteresis counter: how often this server recently won a probe round
+	int liveRtt;             // Moving average of RTTs seen on real block requests, in µs
+} alt_server_t;
+
+static dnbd3_server_entry_t newservers[MAX_ALTS];
+static pthread_mutex_t newAltLock = PTHREAD_MUTEX_INITIALIZER;
+static alt_server_t altservers[MAX_ALTS];
+// WR: Use when re-assigning or sorting altservers, i.e. an index in altservers
+// changes its meaning (host). Also used for newservers.
+// RD: Use when reading the list or modifying an individual entry's data, like RTT
+// and fail count. Not perfectly clean, as we might still have races here, but the
+// code is mostly careful in this regard, so at worst we end up with stale data
+// somewhere, never anything nonsensical.
+static pthread_rwlock_t altLock = PTHREAD_RWLOCK_INITIALIZER;
+#define lock_read pthread_rwlock_rdlock
+#define lock_write pthread_rwlock_wrlock
+#define unlock_rw pthread_rwlock_unlock
+
+/* Static methods */
+
+
+static void* connection_receiveThreadMain(void *sock);
+static void* connection_backgroundThread(void *something);
+
+static void addAltServers();
+static void sortAltServers();
+static void probeAltServers();
+static void switchConnection(int sockFd, alt_server_t *srv);
+static void requestAltServers();
+static bool throwDataAway(int sockFd, uint32_t amount);
+
+static void enqueueRequest(dnbd3_async_t *request);
+static dnbd3_async_t* removeRequest(dnbd3_async_t *request);
+
+bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid, const bool doLearnNew)
+{
+ int sock = -1;
+ char host[SHORTBUF];
+ size_t hlen;
+ serialized_buffer_t buffer;
+ uint16_t remoteVersion, remoteRid;
+ char *remoteName;
+ uint64_t remoteSize;
+ struct sockaddr_storage sa;
+ socklen_t salen;
+ poll_list_t *cons = sock_newPollList();
+
+ timing_setBase();
+ pthread_mutex_lock( &mutexInit );
+ if ( !connectionInitDone && keepRunning ) {
+ dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS];
+ const char *current, *end;
+ int altIndex = 0;
+ learnNewServers = doLearnNew;
+ memset( altservers, 0, sizeof altservers );
+ connection.sockFd = -1;
+ current = hosts;
+ do {
+ // Get next host from string
+ while ( *current == ' ' ) current++;
+ end = strchr( current, ' ' );
+ size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1);
+ if ( len > SHORTBUF ) len = SHORTBUF;
+ snprintf( host, len, "%s", current );
+ int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS );
+ for ( int i = 0; i < newHosts; ++i ) {
+ if ( altIndex >= MAX_ALTS )
+ break;
+ altservers[altIndex].host = tempHosts[i];
+ altIndex += 1;
+ }
+ current = end + 1;
+ } while ( end != NULL && altIndex < MAX_ALTS );
+ logadd( LOG_INFO, "Got %d servers from init call", altIndex );
+ // Connect
+ for ( int i = 0; i < altIndex + 5; ++i ) {
+ if ( i >= altIndex ) {
+				// Additional iteration - no corresponding slot in altservers; this
+				// just lets us make one final call with a longer timeout
+ sock = sock_multiConnect( cons, NULL, 400, 1000 );
+ if ( sock == -2 ) {
+ logadd( LOG_ERROR, "Could not connect to any host" );
+ sock = -1;
+ break;
+ }
+ } else {
+ if ( altservers[i].host.type == 0 )
+ continue;
+ // Try to connect - 100ms timeout
+ sock = sock_multiConnect( cons, &altservers[i].host, 100, 1000 );
+ }
+ if ( sock == -2 || sock == -1 )
+ continue;
+ salen = sizeof(sa);
+ if ( getpeername( sock, (struct sockaddr*)&sa, &salen ) == -1 ) {
+ logadd( LOG_ERROR, "getpeername on successful connection failed!? (errno=%d)", errno );
+ close( sock );
+ sock = -1;
+ continue;
+ }
+ hlen = sock_printable( (struct sockaddr*)&sa, salen, host, sizeof(host) );
+ logadd( LOG_INFO, "Connected to %.*s", (int)hlen, host );
+ if ( !dnbd3_select_image( sock, lowerImage, rid, 0 ) ) {
+ logadd( LOG_ERROR, "Could not send select image" );
+ } else if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) {
+ logadd( LOG_ERROR, "Could not read select image reply (%d)", errno );
+ } else if ( rid != 0 && rid != remoteRid ) {
+ logadd( LOG_ERROR, "rid mismatch (want: %d, got: %d)", (int)rid, (int)remoteRid );
+ } else {
+ logadd( LOG_INFO, "Requested: '%s:%d'", lowerImage, (int)rid );
+ logadd( LOG_INFO, "Returned: '%s:%d'", remoteName, (int)remoteRid );
+ sock_setTimeout( sock, SOCKET_KEEPALIVE_TIMEOUT * 1000 );
+ image.name = strdup( remoteName );
+ image.rid = remoteRid;
+ image.size = remoteSize;
+ if ( !sock_sockaddrToDnbd3( (struct sockaddr*)&sa, &connection.currentServer ) ) {
+ logadd( LOG_ERROR, "sockaddr to dnbd3_host_t failed!?" );
+ connection.currentServer.type = 0;
+ }
+ connection.panicSignal = signal_new();
+ timing_get( &connection.startupTime );
+ connection.sockFd = sock;
+ requests.head = NULL;
+ requests.tail = NULL;
+ requestAltServers();
+ break;
+ }
+ // Failed
+ if ( sock != -1 ) {
+ close( sock );
+ sock = -1;
+ }
+ }
+ if ( sock != -1 ) {
+ connectionInitDone = true;
+ }
+ }
+ pthread_mutex_unlock( &mutexInit );
+ sock_destroyPollList( cons );
+ return sock != -1;
+}
+
+bool connection_initThreads()
+{
+ pthread_mutex_lock( &mutexInit );
+ if ( !keepRunning || !connectionInitDone || threadInitDone || connection.sockFd == -1 ) {
+ pthread_mutex_unlock( &mutexInit );
+ return false;
+ }
+ bool success = true;
+ pthread_t thread;
+ threadInitDone = true;
+ logadd( LOG_DEBUG1, "Initializing stuff" );
+ if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0
+ || pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0 ) {
+ logadd( LOG_ERROR, "Mutex or spinlock init failure" );
+ success = false;
+ } else {
+ if ( pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)connection.sockFd ) != 0 ) {
+ logadd( LOG_ERROR, "Could not create receive thread" );
+ success = false;
+ } else if ( pthread_create( &thread, NULL, &connection_backgroundThread, NULL ) != 0 ) {
+ logadd( LOG_ERROR, "Could not create background thread" );
+ success = false;
+ }
+ }
+ if ( !success ) {
+ close( connection.sockFd );
+ connection.sockFd = -1;
+ }
+ pthread_mutex_unlock( &mutexInit );
+ return success;
+}
+
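+/*
+ * Illustrative call sequence (a sketch only - the real call sites are in the fuse
+ * frontend, not in this file; the host and image names below are made up):
+ *
+ *   if ( connection_init( "dnbd3-a.example.com dnbd3-b.example.com", "some/image", 0, true )
+ *           && connection_initThreads() ) {
+ *       // serve requests via connection_read(), then connection_close() on teardown
+ *   }
+ *
+ * connection_init() must succeed first; connection_initThreads() refuses to start
+ * the receive and background workers otherwise (see the guards above). A rid of 0
+ * accepts whatever image revision the server offers.
+ */
+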
+uint64_t connection_getImageSize()
+{
+ return image.size;
+}
+
+bool connection_read(dnbd3_async_t *request)
+{
+ if ( !connectionInitDone ) return false;
+ pthread_mutex_lock( &connection.sendMutex );
+ enqueueRequest( request );
+ if ( connection.sockFd != -1 ) {
+ if ( !dnbd3_get_block( connection.sockFd, request->offset, request->length, (uint64_t)request, 0 ) ) {
+ shutdown( connection.sockFd, SHUT_RDWR );
+ connection.sockFd = -1;
+ pthread_mutex_unlock( &connection.sendMutex );
+ signal_call( connection.panicSignal );
+ return true;
+ }
+ }
+ pthread_mutex_unlock( &connection.sendMutex );
+ return true;
+}
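+
+// Note on connection_read(): the dnbd3_async_t pointer itself is sent to the server
+// as the 64-bit request handle, so the receive thread can map a CMD_GET_BLOCK reply
+// back to its request via removeRequest(). The request is enqueued *before* sending;
+// if the send fails (or no connection exists), it stays queued and is resent once
+// switchConnection() establishes a new connection.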
+
+void connection_close()
+{
+ if ( keepRunning ) {
+ logadd( LOG_INFO, "Tearing down dnbd3 connections and workers" );
+ }
+ pthread_mutex_lock( &mutexInit );
+ keepRunning = false;
+ if ( !connectionInitDone ) {
+ pthread_mutex_unlock( &mutexInit );
+ return;
+ }
+ pthread_mutex_unlock( &mutexInit );
+ pthread_mutex_lock( &connection.sendMutex );
+ if ( connection.sockFd != -1 ) {
+ shutdown( connection.sockFd, SHUT_RDWR );
+ }
+ pthread_mutex_unlock( &connection.sendMutex );
+}
+
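+/*
+ * Format a human-readable status dump into buffer: image name/rid, time since the
+ * current connection was established, and one line per known alt server. The first
+ * character of each server line is '*' for the currently connected server, '-' for
+ * servers outside the active probing set (index >= MAX_ALTS_ACTIVE), ' ' otherwise.
+ * Returns the number of bytes of buffer used.
+ */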
+size_t connection_printStats(char *buffer, const size_t len)
+{
+ int ret;
+ size_t remaining = len;
+ declare_now;
+ if ( remaining > 0 ) {
+ ret = snprintf( buffer, remaining, "Image: %s\nRevision: %d\n\nCurrent connection time: %" PRIu32 "s\n\n",
+ image.name, (int)image.rid, timing_diff( &connection.startupTime, &now ) );
+ if ( ret < 0 ) {
+ ret = 0;
+ }
+ if ( (size_t)ret >= remaining ) {
+ return len;
+ }
+ remaining -= ret;
+ buffer += ret;
+ }
+ int i = -1;
+ lock_read( &altLock );
+ while ( remaining > 3 && ++i < MAX_ALTS ) {
+ if ( altservers[i].host.type == 0 )
+ continue;
+ if ( isSameAddressPort( &connection.currentServer, &altservers[i].host ) ) {
+ *buffer++ = '*';
+ } else if ( i >= MAX_ALTS_ACTIVE ) {
+ *buffer++ = '-';
+ } else {
+ *buffer++ = ' ';
+ }
+ const size_t addrlen = sock_printHost( &altservers[i].host, buffer, remaining );
+ remaining -= (addrlen + 1); // For space or * above
+ buffer += addrlen;
+ if ( remaining < 3 )
+ break;
+ int width = addrlen >= 35 ? 0 : 35 - (int)addrlen;
+ char *unit;
+ int value;
+ if ( altservers[i].rtt > 5000 ) {
+ unit = "ms ";
+ value = altservers[i].rtt / 1000;
+ } else {
+ unit = "µs";
+ value = altservers[i].rtt;
+ width += 3;
+ }
+ ret = snprintf( buffer, remaining, "% *d %s Unreachable:% 5d BestCount:% 5d Live:% 5dµs\n",
+ width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount, altservers[i].liveRtt );
+ if ( ret < 0 ) {
+ ret = 0;
+ }
+ if ( (size_t)ret >= remaining ) {
+ remaining = 0;
+ break;
+ }
+ remaining -= ret;
+ buffer += ret;
+ }
+ unlock_rw( &altLock );
+ return len - remaining;
+}
+
+static void* connection_receiveThreadMain(void *sockPtr)
+{
+ int sockFd = (int)(size_t)sockPtr;
+ dnbd3_reply_t reply;
+ pthread_detach( pthread_self() );
+
+ while ( keepRunning ) {
+ int ret;
+ do {
+ ret = dnbd3_read_reply( sockFd, &reply, true );
+ if ( ret == REPLY_OK ) break;
+ } while ( ret == REPLY_INTR || ret == REPLY_AGAIN );
+ if ( ret != REPLY_OK ) {
+ logadd( LOG_DEBUG1, "Error receiving reply on receiveThread (%d)", ret );
+ goto fail;
+ }
+
+ if ( reply.cmd == CMD_GET_BLOCK ) {
+			// Block reply - find the matching request
+ dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle );
+ if ( request == NULL ) {
+				// This happens if the alt server probing thread tore down our connection
+				// and satisfied this very request through a direct RTT probe.
+ logadd( LOG_DEBUG1, "Got block reply with no matching request" );
+ if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) {
+ logadd( LOG_DEBUG1, "....and choked on reply payload" );
+ goto fail;
+ }
+ } else {
+ // Found a match
+ const ssize_t ret = sock_recv( sockFd, request->buffer, request->length );
+ if ( ret != (ssize_t)request->length ) {
+ logadd( LOG_DEBUG1, "receiving payload for a block reply failed" );
+ connection_read( request );
+ goto fail;
+ }
+ // Check RTT
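+				// The time from enqueueRequest() to this reply serves as a live RTT sample
+				// for the current server; it is folded into liveRtt as a moving average
+				// weighted 3:1 in favour of the old value: liveRtt = (liveRtt*3 + sample) / 4.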
+ declare_now;
+ uint64_t diff = timing_diffUs( &request->time, &now );
+ if ( diff < 30ull * 1000 * 1000 ) { // Sanity check - ignore if > 30s
+ lock_read( &altLock );
+ for ( int i = 0; i < MAX_ALTS; ++i ) {
+ if ( altservers[i].host.type == 0 )
+ continue;
+ if ( isSameAddressPort( &connection.currentServer, &altservers[i].host ) ) {
+ altservers[i].liveRtt = ( altservers[i].liveRtt * 3 + (int)diff ) / 4;
+ break;
+ }
+ }
+ unlock_rw( &altLock );
+ }
+ // Success, wake up caller
+ request->success = true;
+ request->finished = true;
+ signal_call( request->signal );
+ }
+ } else if ( reply.cmd == CMD_GET_SERVERS ) {
+ // List of known alt servers
+ dnbd3_server_entry_t entries[MAX_ALTS];
+ const int count = MIN( reply.size / sizeof(dnbd3_server_entry_t), MAX_ALTS );
+ const size_t relevantSize = sizeof(dnbd3_server_entry_t) * count;
+ if ( sock_recv( sockFd, entries, relevantSize ) != (ssize_t)relevantSize
+ || !throwDataAway( sockFd, reply.size - (uint32_t)relevantSize ) ) {
+ logadd( LOG_DEBUG1, "Error receiving list of alt servers." );
+ goto fail;
+ }
+ pthread_mutex_lock( &newAltLock );
+ memcpy( newservers, entries, relevantSize );
+ pthread_mutex_unlock( &newAltLock );
+ } else {
+ // TODO: Handle the others?
+ if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) {
+ logadd( LOG_DEBUG1, "Could not throw %d bytes away on CMD %d", (int)reply.size, (int)reply.cmd );
+ goto fail;
+ }
+ }
+ }
+	logadd( LOG_DEBUG1, "Dropped out of the receive loop! ARRRRRRRRRR" );
+fail:;
+	// Make sure no one is trying to use the socket for sending by locking,
+ pthread_mutex_lock( &connection.sendMutex );
+ // then just set the fd to -1, but only if it's the same fd as ours,
+ // as someone could have established a new connection already
+ if ( connection.sockFd == sockFd ) {
+ connection.sockFd = -1;
+ signal_call( connection.panicSignal );
+ }
+ pthread_mutex_unlock( &connection.sendMutex );
+ // As we're the only reader, it's safe to close the socket now
+ close( sockFd );
+ return NULL;
+}
+
+static void* connection_backgroundThread(void *something UNUSED)
+{
+ ticks nextKeepalive;
+ ticks nextRttCheck;
+
+ timing_get( &nextKeepalive );
+ nextRttCheck = nextKeepalive;
+ while ( keepRunning ) {
+ ticks now;
+ timing_get( &now );
+ uint32_t wt1 = timing_diffMs( &now, &nextKeepalive );
+ uint32_t wt2 = timing_diffMs( &now, &nextRttCheck );
+ if ( wt1 > 0 && wt2 > 0 ) {
+ int waitRes = signal_wait( connection.panicSignal, (int)MIN( wt1, wt2 ) + 1 );
+ if ( waitRes == SIGNAL_ERROR ) {
+ logadd( LOG_WARNING, "Error waiting on signal in background thread! Errno = %d", errno );
+ }
+ timing_get( &now );
+ }
+ // Woken up, see what we have to do
+ const bool panic = connection.sockFd == -1;
+ // Check alt servers
+ if ( panic || timing_reachedPrecise( &nextRttCheck, &now ) ) {
+ if ( learnNewServers ) {
+ addAltServers();
+ }
+ sortAltServers();
+ probeAltServers();
+ if ( panic || timing_diff( &connection.startupTime, &now ) <= STARTUP_MODE_DURATION ) {
+ timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_STARTUP );
+ } else {
+ timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_NORMAL );
+ }
+ }
+ // Send keepalive packet
+ if ( timing_reachedPrecise( &nextKeepalive, &now ) ) {
+ pthread_mutex_lock( &connection.sendMutex );
+ if ( connection.sockFd != -1 ) {
+ dnbd3_request_t request;
+ request.magic = dnbd3_packet_magic;
+ request.cmd = CMD_KEEPALIVE;
+ request.handle = request.offset = request.size = 0;
+ fixup_request( request );
+ ssize_t ret = sock_sendAll( connection.sockFd, &request, sizeof request, 2 );
+ if ( (size_t)ret != sizeof request ) {
+ shutdown( connection.sockFd, SHUT_RDWR );
+ connection.sockFd = -1;
+ nextRttCheck = now;
+ }
+ }
+ pthread_mutex_unlock( &connection.sendMutex );
+ timing_addSeconds( &nextKeepalive, &now, TIMER_INTERVAL_KEEPALIVE_PACKET );
+ }
+ }
+ return NULL;
+}
+
+// Private quick helpers
+
+static void addAltServers()
+{
+ pthread_mutex_lock( &newAltLock );
+ lock_write( &altLock );
+ for ( int nIdx = 0; nIdx < MAX_ALTS; ++nIdx ) {
+ if ( newservers[nIdx].host.type == 0 )
+ continue;
+ // Got a new alt server, see if it's already known
+ for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) {
+ if ( isSameAddress( &newservers[nIdx].host, &altservers[eIdx].host ) ) {
+ goto skip_server;
+ }
+ }
+ // Not known yet, add - find free slot
+ int slot = -1;
+ for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) {
+ if ( altservers[eIdx].host.type == 0 ) {
+ slot = eIdx; // free - bail out and use this one
+ break;
+ }
+			if ( altservers[eIdx].consecutiveFails > FAIL_BACKOFF_START_COUNT
+					&& ( slot == -1 || altservers[slot].consecutiveFails < altservers[eIdx].consecutiveFails ) ) {
+				// No free slot left: replace an existing alt server that failed a lot recently
+				slot = eIdx;
+			}
+ }
+ if ( slot != -1 ) {
+ char txt[200];
+ sock_printHost( &newservers[nIdx].host, txt, 200 );
+ logadd( LOG_DEBUG1, "new server %s in slot %d", txt, slot );
+ altservers[slot].consecutiveFails = 0;
+ altservers[slot].bestCount = 0;
+ altservers[slot].rtts[0] = RTT_UNREACHABLE;
+ altservers[slot].rttIndex = 1;
+ altservers[slot].host = newservers[nIdx].host;
+ altservers[slot].liveRtt = 0;
+ }
+skip_server:;
+ }
+ memset( newservers, 0, sizeof(newservers) );
+ unlock_rw( &altLock );
+ pthread_mutex_unlock( &newAltLock );
+}
+
+/**
+ * Find a server at index >= MAX_ALTS_ACTIVE (one that isn't considered for switching over)
+ * that has been inactive for a while, then look if there's an active server that's failed
+ * a couple of times recently. Swap both if found.
+ */
+static void sortAltServers()
+{
+ int ac = 0;
+ lock_write( &altLock );
+ for ( int ia = MAX_ALTS_ACTIVE; ia < MAX_ALTS; ++ia ) {
+ alt_server_t * const inactive = &altservers[ia];
+ if ( inactive->host.type == 0 || inactive->consecutiveFails > 0 )
+ continue;
+ while ( ac < MAX_ALTS_ACTIVE ) {
+ if ( altservers[ac].host.type == 0 || altservers[ac].consecutiveFails > FAIL_BACKOFF_START_COUNT )
+ break;
+ ac++;
+ }
+ if ( ac == MAX_ALTS_ACTIVE )
+ break;
+ // Switch!
+ alt_server_t * const active = &altservers[ac];
+ dnbd3_host_t tmp = inactive->host;
+ inactive->host = active->host;
+ inactive->consecutiveFails = FAIL_BACKOFF_START_COUNT * 4;
+ inactive->bestCount = 0;
+ inactive->rtts[0] = RTT_UNREACHABLE;
+ inactive->rttIndex = 1;
+ inactive->liveRtt = 0;
+ active->host = tmp;
+ active->consecutiveFails = 0;
+ active->bestCount = 0;
+ active->rtts[0] = RTT_UNREACHABLE;
+ active->rttIndex = 1;
+ active->liveRtt = 0;
+ }
+ unlock_rw( &altLock );
+}
+
+static void probeAltServers()
+{
+ serialized_buffer_t buffer;
+ dnbd3_reply_t reply;
+ int bestSock = -1;
+ uint16_t remoteRid, remoteProto;
+ uint64_t remoteSize;
+ char *remoteName;
+ bool doSwitch;
+ bool panic = connection.sockFd == -1;
+ uint64_t testOffset = 0;
+ uint32_t testLength = RTT_BLOCK_SIZE;
+ dnbd3_async_t *request = NULL;
+ alt_server_t *current = NULL, *best = NULL;
+
+ if ( !panic ) {
+ lock_read( &altLock );
+ for ( int altIndex = 0; altIndex < MAX_ALTS; ++altIndex ) {
+ if ( altservers[altIndex].host.type != 0
+ && isSameAddressPort( &altservers[altIndex].host, &connection.currentServer ) ) {
+ current = &altservers[altIndex];
+ break;
+ }
+ }
+ unlock_rw( &altLock );
+ }
+ declare_now;
+ pthread_spin_lock( &requests.lock );
+ if ( requests.head != NULL ) {
+ if ( !panic && current != NULL ) {
+ const int maxDelay = MAX( current->rtt * 5, 1000000 ); // Give at least one second
+ dnbd3_async_t *iterator;
+ for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) {
+ // A request with measurement tag is pending
+ if ( timing_diffUs( &iterator->time, &now ) > maxDelay ) {
+ panic = true;
+ break;
+ }
+ }
+ }
+ if ( panic ) {
+ request = requests.head;
+ testOffset = requests.head->offset;
+ testLength = requests.head->length;
+ }
+ }
+ pthread_spin_unlock( &requests.lock );
+ if ( testOffset != 0 ) {
+ logadd( LOG_DEBUG1, "Panic with pending %" PRIu64 ":%" PRIu32, testOffset, testLength );
+ }
+
+ lock_read( &altLock );
+ for ( int altIndex = 0; altIndex < (panic ? MAX_ALTS : MAX_ALTS_ACTIVE); ++altIndex ) {
+ alt_server_t * const srv = &altservers[altIndex];
+ if ( srv->host.type == 0 )
+ continue;
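+		// Back off from servers that keep failing: once consecutiveFails exceeds
+		// FAIL_BACKOFF_START_COUNT, the server is only probed with probability
+		// FAIL_BACKOFF_START_COUNT / consecutiveFails, so probe frequency keeps
+		// dropping the longer it stays unreachable (unless we're in panic mode).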
+ if ( !panic && srv->consecutiveFails > FAIL_BACKOFF_START_COUNT
+ && rand() % srv->consecutiveFails >= FAIL_BACKOFF_START_COUNT ) {
+ continue;
+ }
+ if ( srv->rttIndex >= RTT_COUNT ) {
+ srv->rttIndex = 0;
+ } else {
+ srv->rttIndex += 1;
+ }
+ // Probe
+ ticks start;
+ timing_get( &start );
+ errno = 0;
+ int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 );
+ if ( sock == -1 ) {
+ logadd( LOG_DEBUG1, "Could not connect for probing. errno = %d", errno );
+ goto fail;
+ }
+ if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) {
+ logadd( LOG_DEBUG1, "probe: select_image failed" );
+ goto fail;
+ }
+ if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize )) {
+ logadd( LOG_DEBUG1, "probe: select image reply failed" );
+ goto fail;
+ }
+ if ( remoteProto < MIN_SUPPORTED_SERVER ) {
+ logadd( LOG_WARNING, "Unsupported remote version (local: %d, remote: %d)", (int)PROTOCOL_VERSION, (int)remoteProto );
+ srv->consecutiveFails += 10;
+ goto fail;
+ }
+ if ( remoteRid != image.rid || strcmp( remoteName, image.name ) != 0 ) {
+ logadd( LOG_WARNING, "Remote rid or name mismatch (got '%s')", remoteName );
+ srv->consecutiveFails += 10;
+ goto fail;
+ }
+ if ( !dnbd3_get_block( sock, testOffset, testLength, 0, 0 ) ) {
+ logadd( LOG_DEBUG1, "-> block request fail" );
+ goto fail;
+ }
+ int a = 111;
+ if ( !(a = dnbd3_get_reply( sock, &reply )) || reply.size != testLength ) {
+ logadd( LOG_DEBUG1, "<- get block reply fail %d %d", a, (int)reply.size );
+ goto fail;
+ }
+ if ( request != NULL && removeRequest( request ) != NULL ) {
+ // Request successfully removed from queue
+ const ssize_t ret = sock_recv( sock, request->buffer, request->length );
+ if ( ret != (ssize_t)request->length ) {
+ logadd( LOG_DEBUG1, "[RTT] receiving payload for a block reply failed" );
+ // Failure, add to queue again
+ connection_read( request );
+ goto fail;
+ }
+ // Success, wake up caller
+ logadd( LOG_DEBUG1, "[RTT] Successful direct probe" );
+ request->success = true;
+ request->finished = true;
+ signal_call( request->signal );
+ } else {
+ // Wasn't a request that's in our request queue
+ if ( !throwDataAway( sock, testLength ) ) {
+ logadd( LOG_DEBUG1, "<- get block reply payload fail" );
+ goto fail;
+ }
+ }
+
+ // Yay, success
+ // Panic mode? Just switch to server
+ if ( panic ) {
+ unlock_rw( &altLock );
+ switchConnection( sock, srv );
+ return;
+ }
+ // Non-panic mode:
+ // Update stats of server
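+		// rtt becomes the mean of the last RTT_COUNT probe results; if we have a live
+		// RTT measurement from real block requests, it is blended in as one extra sample.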
+ ticks end;
+ timing_get( &end );
+ srv->consecutiveFails = 0;
+ srv->rtts[srv->rttIndex] = (int)timing_diffUs( &start, &end );
+ int newRtt = 0;
+ for ( int i = 0; i < RTT_COUNT; ++i ) {
+ newRtt += srv->rtts[i];
+ }
+ if ( srv->liveRtt != 0 ) {
+ // Make live rtt measurement influence result
+ newRtt = ( newRtt + srv->liveRtt ) / ( RTT_COUNT + 1 );
+ } else {
+ newRtt /= RTT_COUNT;
+ }
+ srv->rtt = newRtt;
+
+ // Keep socket open if this is currently the best one
+ if ( best == NULL || best->rtt > srv->rtt ) {
+ best = srv;
+ if ( bestSock != -1 ) {
+ close( bestSock );
+ }
+ bestSock = sock;
+ } else {
+ close( sock );
+ }
+ continue;
+fail:;
+ if ( sock != -1 ) {
+ close( sock );
+ }
+ srv->rtts[srv->rttIndex] = RTT_UNREACHABLE;
+ srv->consecutiveFails += 1;
+ }
+ doSwitch = false;
+ if ( best != NULL ) {
+ // Time-sensitive switch decision: If a server was best for some consecutive measurements,
+ // we switch no matter how small the difference to the current server is
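+		// bestCount implements the hysteresis: the winner of a probe round gains 2
+		// (capped around 50), everyone else loses 1. Once the winner's bestCount
+		// exceeds 12 and its RTT beats the current server's, it may trigger a switch
+		// with a probability that grows with bestCount, independently of the RTT
+		// thresholds applied further below; the bestCount-difference check afterwards
+		// prevents two similar servers from flapping back and forth.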
+ for ( int altIndex = 0; altIndex < MAX_ALTS_ACTIVE; ++altIndex ) {
+ alt_server_t * const srv = &altservers[altIndex];
+ // Decay liveRtt slowly...
+			if ( current != NULL && srv->liveRtt > current->liveRtt && srv->liveRtt > srv->rtt ) {
+ srv->liveRtt -= ( ( srv->liveRtt / 100 ) + 1 );
+ }
+ if ( srv == best ) {
+ if ( srv->bestCount < 50 ) {
+ srv->bestCount += 2;
+ }
+ // Switch with increasing probability the higher the bestCount is
+ if ( srv->bestCount > 12 && ( current == NULL || srv->rtt < current->rtt ) && srv->bestCount > rand() % 50 ) {
+ doSwitch = true;
+ }
+ } else if ( srv->bestCount > 0 ) {
+ srv->bestCount--;
+ }
+ }
+ for ( int i = MAX_ALTS_ACTIVE; i < MAX_ALTS; ++i ) {
+ if ( altservers[i].consecutiveFails > 0 ) {
+ altservers[i].consecutiveFails--;
+ }
+ }
+ // This takes care of the situation where two servers alternate being the best server all the time
+ if ( doSwitch && current != NULL && best->bestCount - current->bestCount < 8 ) {
+ doSwitch = false;
+ }
+ // Regular logic: Apply threshold when considering switch
+ if ( !doSwitch && current != NULL ) {
+ doSwitch = current->rtt > best->rtt + RTT_ABSOLUTE_THRESHOLD
+ || RTT_THRESHOLD_FACTOR(current->rtt) > best->rtt + 1000;
+ }
+ }
+ // Switch if a better server was found
+ if ( doSwitch ) {
+ logadd( LOG_INFO, "Current: %dµs, best: %dµs. Will switch!", current == NULL ? 0 : current->rtt, best->rtt );
+ for ( int i = 0; i < MAX_ALTS; ++i ) {
+ if ( &altservers[i] != best ) {
+ altservers[i].bestCount = 0;
+ }
+ }
+ unlock_rw( &altLock );
+ switchConnection( bestSock, best );
+ return;
+ }
+ // No switch
+ unlock_rw( &altLock );
+ if ( best != NULL ) {
+ close( bestSock );
+ }
+}
+
+static void switchConnection(int sockFd, alt_server_t *srv)
+{
+ pthread_t thread;
+ struct sockaddr_storage addr;
+ socklen_t addrLen = sizeof(addr);
+ char message[200] = "Connection switched to ";
+ const size_t len = strlen( message );
+ int ret;
+ dnbd3_async_t *queue, *it;
+
+ pthread_mutex_lock( &connection.sendMutex );
+ if ( connection.sockFd != -1 ) {
+ shutdown( connection.sockFd, SHUT_RDWR );
+ }
+ ret = getpeername( sockFd, (struct sockaddr*)&addr, &addrLen );
+ if ( ret == 0 ) {
+ connection.currentServer = srv->host;
+ connection.sockFd = sockFd;
+ pthread_spin_lock( &requests.lock );
+ queue = requests.head;
+ requests.head = requests.tail = NULL;
+ pthread_spin_unlock( &requests.lock );
+ } else {
+ connection.sockFd = -1;
+ }
+ requestAltServers();
+ pthread_mutex_unlock( &connection.sendMutex );
+ if ( ret != 0 ) {
+ close( sockFd );
+ logadd( LOG_WARNING, "Could not getpeername after connection switch, assuming connection already dead again. (Errno=%d)", errno );
+ signal_call( connection.panicSignal );
+ return;
+ }
+ timing_get( &connection.startupTime );
+ pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)sockFd );
+ sock_printable( (struct sockaddr*)&addr, sizeof(addr), message + len, sizeof(message) - len );
+ logadd( LOG_INFO, "%s", message );
+ // resend queue
+ if ( queue != NULL ) {
+ pthread_mutex_lock( &connection.sendMutex );
+ dnbd3_async_t *next = NULL;
+ for ( it = queue; it != NULL; it = next ) {
+ logadd( LOG_DEBUG1, "Requeue after server change" );
+ next = it->next;
+ enqueueRequest( it );
+ if ( connection.sockFd != -1 && !dnbd3_get_block( connection.sockFd, it->offset, it->length, (uint64_t)it, 0 ) ) {
+ logadd( LOG_WARNING, "Resending pending request failed, re-entering panic mode" );
+ shutdown( connection.sockFd, SHUT_RDWR );
+ connection.sockFd = -1;
+ signal_call( connection.panicSignal );
+ }
+ }
+ pthread_mutex_unlock( &connection.sendMutex );
+ }
+}
+
+/**
+ * Does not lock, so get the sendMutex first!
+ */
+static void requestAltServers()
+{
+ if ( connection.sockFd == -1 || !learnNewServers )
+ return;
+ dnbd3_request_t request = { 0 };
+ request.magic = dnbd3_packet_magic;
+ request.cmd = CMD_GET_SERVERS;
+ fixup_request( request );
+ if ( sock_sendAll( connection.sockFd, &request, sizeof(request), 2 ) != (ssize_t)sizeof(request) ) {
+ logadd( LOG_WARNING, "Connection failed while requesting alt server list" );
+ shutdown( connection.sockFd, SHUT_RDWR );
+ connection.sockFd = -1;
+ }
+}
+
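+/*
+ * Read and discard `amount` bytes from sockFd in SHORTBUF-sized chunks. Used when a
+ * reply carries a payload we have no consumer for (e.g. a block reply whose request
+ * was already answered by a direct RTT probe). Returns false if the socket dies.
+ */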
+static bool throwDataAway(int sockFd, uint32_t amount)
+{
+ size_t done = 0;
+ char tempBuffer[SHORTBUF];
+ while ( done < amount ) {
+ const ssize_t ret = sock_recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ) );
+ if ( ret <= 0 )
+ return false;
+ done += (size_t)ret;
+ }
+ return true;
+}
+
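+/*
+ * Append a request to the pending-request FIFO (requests.head/tail, spinlock
+ * protected) and timestamp it. The timestamp is later used both for the liveRtt
+ * measurement in the receive thread and to detect requests that have been pending
+ * suspiciously long (see probeAltServers()).
+ */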
+static void enqueueRequest(dnbd3_async_t *request)
+{
+ request->next = NULL;
+ request->finished = false;
+ request->success = false;
+ //logadd( LOG_DEBUG2, "Queue: %p @ %s : %d", request, file, line );
+ // Measure latency and add to switch formula
+ timing_get( &request->time );
+ pthread_spin_lock( &requests.lock );
+ if ( requests.head == NULL ) {
+ requests.head = requests.tail = request;
+ } else {
+ requests.tail->next = request;
+ requests.tail = request;
+ }
+ pthread_spin_unlock( &requests.lock );
+}
+
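+/*
+ * Remove the given request from the pending queue. Returns the request if it was
+ * still queued, or NULL if somebody else (receive thread vs. RTT probe) already
+ * dequeued it - callers use this to decide who gets to complete the request.
+ */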
+static dnbd3_async_t* removeRequest(dnbd3_async_t *request)
+{
+ pthread_spin_lock( &requests.lock );
+ //logadd( LOG_DEBUG2, "Remov: %p @ %s : %d", request, file, line );
+ dnbd3_async_t *iterator, *prev = NULL;
+ for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) {
+ if ( iterator == request ) {
+ // Found it, break!
+ if ( prev != NULL ) {
+ prev->next = iterator->next;
+ } else {
+ requests.head = iterator->next;
+ }
+ if ( requests.tail == iterator ) {
+ requests.tail = prev;
+ }
+ break;
+ }
+ prev = iterator;
+ }
+ pthread_spin_unlock( &requests.lock );
+ return iterator;
+}
+