summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorSimon Rettberg2023-02-12 11:26:05 +0100
committerSimon Rettberg2023-02-12 11:26:05 +0100
commitd3c25d55cfcbe56d7287c622c3d23814737ca49a (patch)
treef6b7eacb58221b69131fbbf413def60ad606cefa /src
parent[SHARED] sock_printable: Always null-terminate (diff)
downloaddnbd3-d3c25d55cfcbe56d7287c622c3d23814737ca49a.tar.gz
dnbd3-d3c25d55cfcbe56d7287c622c3d23814737ca49a.tar.xz
dnbd3-d3c25d55cfcbe56d7287c622c3d23814737ca49a.zip
[FUSE] Make initial connect entirely parallel wrt servers
Previously, we only did the actual socket connect in a concurrent matter. Once a connection was successfully established, we did a blocking handshake on the protocol level. If the server war particularly slow, this was bad as we would not try other servers until after this. Throw out the previous non-blocking async connect logic, switching to a multi-threaded approach, that spawns one thread per host/ip, offset by 200ms, until one of the attempts succeeds.
Diffstat (limited to 'src')
-rw-r--r--src/fuse/connection.c343
1 files changed, 232 insertions, 111 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c
index da181ea..5ad3ae7 100644
--- a/src/fuse/connection.c
+++ b/src/fuse/connection.c
@@ -32,7 +32,13 @@ static const int FAIL_BACKOFF_START_COUNT = 8;
static bool connectionInitDone = false;
static bool threadInitDone = false;
static pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER;
+// For multi-threaded concurrent connection during init
+static pthread_mutex_t mutexCondConn = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t condConn = PTHREAD_COND_INITIALIZER;
+static atomic_int pendingConnectionAttempts = 0;
+// Shutdown flag
atomic_bool keepRunning = true;
+// Should we learn new alt-servers from servers we connect to?
static bool learnNewServers;
static pthread_t tidReceiver;
@@ -60,6 +66,12 @@ static struct {
ticks startupTime;
} connection;
+struct conn_data {
+ char *lowerImage;
+ uint16_t rid;
+ int idx;
+};
+
// Known alt servers
typedef struct _alt_server {
dnbd3_host_t host;
@@ -88,14 +100,17 @@ static pthread_rwlock_t altLock = PTHREAD_RWLOCK_INITIALIZER;
/* Static methods */
+static void* connectThread(void * data);
static void* connection_receiveThreadMain( void *sock );
static void* connection_backgroundThread( void *something );
-static void addAltServers();
+static bool hasAltServer( dnbd3_host_t *host );
+static void addAltServers( void );
static void sortAltServers();
static void probeAltServers();
static void switchConnection( int sockFd, alt_server_t *srv );
-static void requestAltServers();
+static void requestAltServers( void );
+static bool sendAltServerRequest( int sock );
static bool throwDataAway( int sockFd, uint32_t amount );
static void enqueueRequest( dnbd3_async_t *request );
@@ -105,110 +120,202 @@ static void blockSignals();
bool connection_init( const char *hosts, const char *lowerImage, const uint16_t rid, const bool doLearnNew )
{
- int sock = -1;
char host[SHORTBUF];
- size_t hlen;
- serialized_buffer_t buffer;
- uint16_t remoteVersion, remoteRid;
- char *remoteName;
- uint64_t remoteSize;
- struct sockaddr_storage sa;
- socklen_t salen;
- poll_list_t *cons = sock_newPollList();
+ dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS];
+ const char *current, *end;
+ int altIndex = 0;
timing_setBase();
pthread_mutex_lock( &mutexInit );
- if ( !connectionInitDone ) {
- dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS];
- const char *current, *end;
- int altIndex = 0;
- learnNewServers = doLearnNew;
- memset( altservers, 0, sizeof altservers );
- connection.sockFd = -1;
- current = hosts;
- do {
- // Get next host from string
- while ( *current == ' ' ) current++;
- end = strchr( current, ' ' );
- size_t len = ( end == NULL ? SHORTBUF : (size_t)( end - current ) + 1 );
- if ( len > SHORTBUF ) len = SHORTBUF;
- snprintf( host, len, "%s", current );
- int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS );
- for ( int i = 0; i < newHosts; ++i ) {
- if ( altIndex >= MAX_ALTS )
- break;
- altservers[altIndex].host = tempHosts[i];
- altIndex += 1;
- }
- current = end + 1;
- } while ( end != NULL && altIndex < MAX_ALTS );
- logadd( LOG_INFO, "Got %d servers from init call", altIndex );
- // Connect
- for ( int i = 0; i < altIndex + 5; ++i ) {
- if ( i >= altIndex ) {
- // Additional iteration - no corresponding slot in altservers, this
- // is just so we can make a final calls with longer timeout
- sock = sock_multiConnect( cons, NULL, 400, 3000 );
- if ( sock == -2 ) {
- logadd( LOG_ERROR, "Could not connect to any host" );
- sock = -1;
- break;
- }
- } else {
- if ( altservers[i].host.type == 0 )
- continue;
- // Try to connect - 100ms timeout
- sock = sock_multiConnect( cons, &altservers[i].host, 100, 3000 );
- }
- if ( sock == -2 || sock == -1 )
- continue;
- salen = sizeof( sa );
- if ( getpeername( sock, (struct sockaddr*)&sa, &salen ) == -1 ) {
- logadd( LOG_ERROR, "getpeername on successful connection failed!? (errno=%d)", errno );
- close( sock );
- sock = -1;
+ if ( connectionInitDone ) {
+ pthread_mutex_unlock( &mutexInit );
+ return false;
+ }
+ learnNewServers = doLearnNew;
+ memset( altservers, 0, sizeof altservers );
+ connection.sockFd = -1;
+ current = hosts;
+ pthread_attr_t threadAttrs;
+ pthread_attr_init( &threadAttrs );
+ pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED );
+ // Resolve all hosts and connect
+ pthread_mutex_lock( &mutexCondConn );
+ do {
+ // Get next host from string
+ while ( *current == ' ' || *current == '\t' || *current == '\n' ) {
+ current++;
+ }
+ end = current;
+ while ( *end != ' ' && *end != '\t' && *end != '\n' && *end != '\0' ) {
+ end++;
+ }
+ if ( end == current )
+ break;
+ size_t len = (size_t)( end - current ) + 1;
+ if ( len > SHORTBUF ) {
+ len = SHORTBUF;
+ }
+ snprintf( host, len, "%s", current );
+ int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS );
+ for ( int i = 0; i < newHosts; ++i ) {
+ if ( altIndex >= MAX_ALTS )
+ break;
+ if ( hasAltServer( &tempHosts[i] ) )
continue;
- }
- hlen = sock_printable( (struct sockaddr*)&sa, salen, host, sizeof( host ) );
- logadd( LOG_INFO, "Connected to %.*s", (int)hlen, host );
- if ( !dnbd3_select_image( sock, lowerImage, rid, 0 ) ) {
- logadd( LOG_ERROR, "Could not send select image" );
- } else if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) {
- logadd( LOG_ERROR, "Could not read select image reply (%d)", errno );
- } else if ( rid != 0 && rid != remoteRid ) {
- logadd( LOG_ERROR, "rid mismatch (want: %d, got: %d)", (int)rid, (int)remoteRid );
- } else {
- logadd( LOG_INFO, "Requested: '%s:%d'", lowerImage, (int)rid );
- logadd( LOG_INFO, "Returned: '%s:%d'", remoteName, (int)remoteRid );
- sock_setTimeout( sock, SOCKET_TIMEOUT_RECV * 1000 );
- image.name = strdup( remoteName );
- image.rid = remoteRid;
- image.size = remoteSize;
- if ( !sock_sockaddrToDnbd3( (struct sockaddr*)&sa, &connection.currentServer ) ) {
- logadd( LOG_ERROR, "sockaddr to dnbd3_host_t failed!?" );
- connection.currentServer.type = 0;
+ altservers[altIndex].host = tempHosts[i];
+ // Start thread for async connect if not connected yet
+ atomic_thread_fence( memory_order_acquire );
+ if ( connection.sockFd == -1 ) {
+ pthread_t t;
+ struct conn_data *cd = malloc( sizeof(*cd) );
+ // We cannot be sure a thread is taking longer than this function runs, so better copy
+ cd->lowerImage = strdup( lowerImage );
+ cd->rid = rid;
+ cd->idx = altIndex;
+ pendingConnectionAttempts++;
+ if ( ( errno = pthread_create( &t, &threadAttrs, &connectThread, (void*)cd ) ) != 0 ) {
+ pendingConnectionAttempts--;
+ logadd( LOG_ERROR, "Could not create connect thread %d, errno=%d", cd->idx, errno );
+ free( cd->lowerImage );
+ free( cd );
+ continue;
}
- connection.panicSignal = signal_new();
- timing_get( &connection.startupTime );
- connection.sockFd = sock;
- requests.head = NULL;
- requests.tail = NULL;
- requestAltServers();
- break;
- }
- // Failed
- if ( sock != -1 ) {
- close( sock );
- sock = -1;
+ struct timespec timeout;
+ clock_gettime( CLOCK_REALTIME, &timeout );
+ timeout.tv_nsec += 200 * 1000 * 1000;
+ if ( timeout.tv_nsec >= 1000 * 1000 * 1000 ) {
+ timeout.tv_nsec -= 1000 * 1000 * 1000;
+ timeout.tv_sec += 1;
+ }
+ pthread_cond_timedwait( &condConn, &mutexCondConn, &timeout );
}
+ // End async connect
+ altIndex += 1;
}
- if ( sock != -1 ) {
- connectionInitDone = true;
- }
+ current = end + 1;
+ } while ( end != NULL && altIndex < MAX_ALTS );
+ logadd( LOG_INFO, "Got %d servers from init call", altIndex );
+ // Wait a maximum of five seconds if we're not connected yet
+ if ( connection.sockFd == -1 && pendingConnectionAttempts > 0 ) {
+ struct timespec end;
+ clock_gettime( CLOCK_REALTIME, &end );
+ end.tv_sec += 5;
+ pthread_cond_timedwait( &condConn, &mutexCondConn, &end );
+ }
+ pthread_mutex_unlock( &mutexCondConn );
+ pthread_attr_destroy( &threadAttrs );
+ if ( connection.sockFd != -1 ) {
+ connectionInitDone = true;
}
pthread_mutex_unlock( &mutexInit );
- sock_destroyPollList( cons );
- return sock != -1;
+ return connectionInitDone;
+}
+
+static void* connectThread(void * data)
+{
+ struct conn_data *cd = (struct conn_data*)data;
+ int idx = cd->idx;
+ int sock = -1;
+ serialized_buffer_t buffer;
+ uint16_t remoteVersion, remoteRid;
+ char *remoteName;
+ uint64_t remoteSize;
+ char host[SHORTBUF];
+ struct sockaddr_storage sa;
+ socklen_t salen = sizeof(sa);
+
+ if ( idx < 0 || idx >= MAX_ALTS || altservers[idx].host.type == 0 ) {
+ logadd( LOG_ERROR, "BUG: Index out of range, or empty server in connect thread (%d)", idx );
+ goto bailout;
+ }
+
+ sock_printHost( &altservers[idx].host, host, sizeof(host) );
+ logadd( LOG_INFO, "Trying to connect to %s", host );
+ sock = sock_connect( &altservers[idx].host, 1500, SOCKET_TIMEOUT_RECV * 1000 );
+ if ( sock == -1 ) {
+ logadd( LOG_INFO, "[%s] Connection failed", host );
+ goto bailout;
+ }
+
+ salen = sizeof( sa );
+ if ( getpeername( sock, (struct sockaddr*)&sa, &salen ) == -1 ) {
+ logadd( LOG_ERROR, "[%s] getpeername on successful connection failed!? (errno=%d)", host, errno );
+ goto bailout;
+ }
+ atomic_thread_fence( memory_order_acquire );
+ if ( connection.sockFd != -1 )
+ goto bailout;
+
+ sock_printable( (struct sockaddr*)&sa, salen, host, sizeof(host) );
+ logadd( LOG_INFO, "[%s] Connected", host );
+ if ( !dnbd3_select_image( sock, cd->lowerImage, cd->rid, 0 ) ) {
+ logadd( LOG_ERROR, "[%s] Could not send select image", host );
+ goto bailout;
+ }
+
+ if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) {
+ logadd( LOG_ERROR, "[%s] Could not read select image reply (%d)", host, errno );
+ goto bailout;
+ }
+ atomic_thread_fence( memory_order_acquire );
+ if ( connection.sockFd != -1 )
+ goto bailout;
+
+ if ( cd->rid != 0 && cd->rid != remoteRid ) {
+ logadd( LOG_ERROR, "[%s] rid mismatch (want: %d, got: %d)",
+ host, (int)cd->rid, (int)remoteRid );
+ goto bailout;
+ }
+ // Seems we got a winner
+ pthread_mutex_lock( &mutexCondConn );
+ if ( connection.sockFd != -1 || connectionInitDone ) {
+ pthread_mutex_unlock( &mutexCondConn );
+ logadd( LOG_INFO, "[%s] Raced by other connection", host );
+ goto bailout;
+ }
+ logadd( LOG_INFO, "Requested: '%s:%d'", cd->lowerImage, (int)cd->rid );
+ logadd( LOG_INFO, "Returned: '%s:%d'", remoteName, (int)remoteRid );
+ image.name = strdup( remoteName );
+ image.rid = remoteRid;
+ image.size = remoteSize;
+ connection.currentServer = altservers[idx].host;
+ connection.panicSignal = signal_new();
+ timing_get( &connection.startupTime );
+ requests.head = NULL;
+ requests.tail = NULL;
+ if ( learnNewServers && !sendAltServerRequest( sock ) )
+ goto bailout;
+ // Everything good, tell main connect function
+ connection.sockFd = sock;
+ atomic_thread_fence( memory_order_release );
+ pendingConnectionAttempts--;
+ if ( idx != 0 ) {
+ // Make server first in list - enough to swap host, other data has not changed yet
+ lock_write( &altLock );
+ dnbd3_host_t tmp = altservers[idx].host;
+ altservers[idx].host = altservers[0].host;
+ altservers[0].host = tmp;
+ unlock_rw( &altLock );
+ }
+ pthread_cond_signal( &condConn );
+ pthread_mutex_unlock( &mutexCondConn );
+ return NULL;
+
+bailout:
+ if ( sock != -1 ) {
+ close( sock );
+ }
+ free( cd->lowerImage );
+ free( cd );
+ // Last one has to wake up main thread, which is waiting for up to 5 seconds for
+ // any connect thread to succeed. If none succeeded, there is no point in waiting
+ // any longer.
+ if ( --pendingConnectionAttempts == 0 ) {
+ pthread_mutex_lock( &mutexCondConn );
+ pthread_cond_signal( &condConn );
+ pthread_mutex_unlock( &mutexCondConn );
+ }
+ return NULL;
}
bool connection_initThreads()
@@ -509,7 +616,20 @@ static void* connection_backgroundThread( void *something UNUSED )
// Private quick helpers
-static void addAltServers()
+/**
+ * Check if given host is in list of altsevers.
+ * Does not lock 'altLock', do so at caller site.
+ */
+static bool hasAltServer( dnbd3_host_t *host )
+{
+ for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) {
+ if ( isSameAddress( host, &altservers[eIdx].host ) )
+ return true;
+ }
+ return false;
+}
+
+static void addAltServers( void )
{
pthread_mutex_lock( &newAltLock );
lock_write( &altLock );
@@ -517,11 +637,8 @@ static void addAltServers()
if ( newservers[nIdx].host.type == 0 )
continue;
// Got a new alt server, see if it's already known
- for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) {
- if ( isSameAddress( &newservers[nIdx].host, &altservers[eIdx].host ) ) {
- goto skip_server;
- }
- }
+ if ( hasAltServer( &newservers[nIdx].host ) )
+ continue;
// Not known yet, add - find free slot
int slot = -1;
for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) {
@@ -546,8 +663,6 @@ static void addAltServers()
altservers[slot].host = newservers[nIdx].host;
altservers[slot].liveRtt = 0;
}
-skip_server:
- ;
}
memset( newservers, 0, sizeof( newservers ) );
unlock_rw( &altLock );
@@ -875,21 +990,27 @@ static void switchConnection( int sockFd, alt_server_t *srv )
/**
* Does not lock, so get the sendMutex first!
*/
-static void requestAltServers()
+static void requestAltServers( void )
{
if ( connection.sockFd == -1 || !learnNewServers )
return;
- dnbd3_request_t request = { 0 };
- request.magic = dnbd3_packet_magic;
- request.cmd = CMD_GET_SERVERS;
- fixup_request( request );
- if ( sock_sendAll( connection.sockFd, &request, sizeof( request ), 2 ) != (ssize_t)sizeof( request ) ) {
- logadd( LOG_WARNING, "Connection failed while requesting alt server list" );
+ if ( !sendAltServerRequest( connection.sockFd ) ) {
+ logadd( LOG_WARNING, "Main connection failed while requesting alt server list" );
shutdown( connection.sockFd, SHUT_RDWR );
connection.sockFd = -1;
}
}
+static bool sendAltServerRequest( int sock )
+{
+ dnbd3_request_t request = {
+ .magic = dnbd3_packet_magic,
+ .cmd = CMD_GET_SERVERS,
+ };
+ fixup_request( request );
+ return sock_sendAll( sock, &request, sizeof( request ), 2 ) == (ssize_t)sizeof( request );
+}
+
static bool throwDataAway( int sockFd, uint32_t amount )
{
size_t done = 0;