diff options
Diffstat (limited to 'src/fuse/connection.c')
-rw-r--r-- | src/fuse/connection.c | 542 |
1 files changed, 349 insertions, 193 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c index fc9f05b..e760d98 100644 --- a/src/fuse/connection.c +++ b/src/fuse/connection.c @@ -1,19 +1,21 @@ #include "connection.h" #include "helper.h" -#include "../clientconfig.h" -#include "../shared/protocol.h" -#include "../shared/fdsignal.h" -#include "../shared/sockhelper.h" -#include "../shared/log.h" +#include <dnbd3/config/client.h> +#include <dnbd3/shared/protocol.h> +#include <dnbd3/shared/fdsignal.h> +#include <dnbd3/shared/sockhelper.h> +#include <dnbd3/shared/log.h> #include <stdlib.h> #include <pthread.h> #include <string.h> #include <stdio.h> +#include <stdatomic.h> #include <unistd.h> #include <errno.h> #include <time.h> #include <inttypes.h> +#include <signal.h> /* Constants */ static const size_t SHORTBUF = 100; @@ -30,9 +32,18 @@ static const int FAIL_BACKOFF_START_COUNT = 8; static bool connectionInitDone = false; static bool threadInitDone = false; static pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER; -static bool keepRunning = true; +// For multi-threaded concurrent connection during init +static pthread_mutex_t mutexCondConn = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t condConn = PTHREAD_COND_INITIALIZER; +static atomic_int pendingConnectionAttempts = 0; +// Shutdown flag +atomic_bool keepRunning = true; +// Should we learn new alt-servers from servers we connect to? static bool learnNewServers; +static pthread_t tidReceiver; +static pthread_t tidBackground; + // List of pending requests static struct { dnbd3_async_t *head; @@ -55,15 +66,21 @@ static struct { ticks startupTime; } connection; +struct conn_data { + char *lowerImage; + uint16_t rid; + int idx; +}; + // Known alt servers typedef struct _alt_server { dnbd3_host_t host; - int consecutiveFails; - int rtt; + atomic_int consecutiveFails; + atomic_int rtt; int rtts[RTT_COUNT]; int rttIndex; - int bestCount; - int liveRtt; + atomic_int bestCount; + atomic_int liveRtt; } alt_server_t; static dnbd3_server_entry_t newservers[MAX_ALTS]; @@ -83,136 +100,232 @@ static pthread_rwlock_t altLock = PTHREAD_RWLOCK_INITIALIZER; /* Static methods */ -static void* connection_receiveThreadMain(void *sock); -static void* connection_backgroundThread(void *something); +static void* connectThread(void * data); +static void* connection_receiveThreadMain( void *sock ); +static void* connection_backgroundThread( void *something ); -static void addAltServers(); +static bool hasAltServer( dnbd3_host_t *host ); +static void addAltServers( void ); static void sortAltServers(); static void probeAltServers(); -static void switchConnection(int sockFd, alt_server_t *srv); -static void requestAltServers(); -static bool throwDataAway(int sockFd, uint32_t amount); +static void switchConnection( int sockFd, alt_server_t *srv ); +static void requestAltServers( void ); +static bool sendAltServerRequest( int sock ); +static bool throwDataAway( int sockFd, uint32_t amount ); + +static void enqueueRequest( dnbd3_async_t *request ); +static dnbd3_async_t* removeRequest( dnbd3_async_t *request ); -static void enqueueRequest(dnbd3_async_t *request); -static dnbd3_async_t* removeRequest(dnbd3_async_t *request); +static void blockSignals(); -bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid, const bool doLearnNew) +bool connection_init( const char *hosts, const char *lowerImage, const uint16_t rid, const bool doLearnNew ) { - int sock = -1; char host[SHORTBUF]; - size_t hlen; - serialized_buffer_t buffer; - uint16_t remoteVersion, remoteRid; - char *remoteName; - uint64_t remoteSize; - struct sockaddr_storage sa; - socklen_t salen; - poll_list_t *cons = sock_newPollList(); + dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS]; + const char *current, *end; + int altIndex = 0; timing_setBase(); pthread_mutex_lock( &mutexInit ); - if ( !connectionInitDone && keepRunning ) { - dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS]; - const char *current, *end; - int altIndex = 0; - learnNewServers = doLearnNew; - memset( altservers, 0, sizeof altservers ); - connection.sockFd = -1; - current = hosts; - do { - // Get next host from string - while ( *current == ' ' ) current++; - end = strchr( current, ' ' ); - size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1); - if ( len > SHORTBUF ) len = SHORTBUF; - snprintf( host, len, "%s", current ); - int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS ); - for ( int i = 0; i < newHosts; ++i ) { - if ( altIndex >= MAX_ALTS ) - break; - altservers[altIndex].host = tempHosts[i]; - altIndex += 1; - } - current = end + 1; - } while ( end != NULL && altIndex < MAX_ALTS ); - logadd( LOG_INFO, "Got %d servers from init call", altIndex ); - // Connect - for ( int i = 0; i < altIndex + 5; ++i ) { - if ( i >= altIndex ) { - // Additional iteration - no corresponding slot in altservers, this - // is just so we can make a final calls with longer timeout - sock = sock_multiConnect( cons, NULL, 400, 1000 ); - if ( sock == -2 ) { - logadd( LOG_ERROR, "Could not connect to any host" ); - sock = -1; - break; - } - } else { - if ( altservers[i].host.type == 0 ) - continue; - // Try to connect - 100ms timeout - sock = sock_multiConnect( cons, &altservers[i].host, 100, 1000 ); - } - if ( sock == -2 || sock == -1 ) - continue; - salen = sizeof(sa); - if ( getpeername( sock, (struct sockaddr*)&sa, &salen ) == -1 ) { - logadd( LOG_ERROR, "getpeername on successful connection failed!? (errno=%d)", errno ); - close( sock ); - sock = -1; + if ( connectionInitDone ) { + pthread_mutex_unlock( &mutexInit ); + return false; + } + learnNewServers = doLearnNew; + memset( altservers, 0, sizeof altservers ); + connection.sockFd = -1; + current = hosts; + pthread_attr_t threadAttrs; + pthread_attr_init( &threadAttrs ); + pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED ); + // Resolve all hosts and connect + pthread_mutex_lock( &mutexCondConn ); + do { + // Get next host from string + while ( *current == ' ' || *current == '\t' || *current == '\n' ) { + current++; + } + end = current; + while ( *end != ' ' && *end != '\t' && *end != '\n' && *end != '\0' ) { + end++; + } + if ( end == current ) + break; + size_t len = (size_t)( end - current ) + 1; + if ( len > SHORTBUF ) { + len = SHORTBUF; + } + snprintf( host, len, "%s", current ); + int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS ); + for ( int i = 0; i < newHosts; ++i ) { + if ( altIndex >= MAX_ALTS ) + break; + if ( hasAltServer( &tempHosts[i] ) ) continue; - } - hlen = sock_printable( (struct sockaddr*)&sa, salen, host, sizeof(host) ); - logadd( LOG_INFO, "Connected to %.*s", (int)hlen, host ); - if ( !dnbd3_select_image( sock, lowerImage, rid, 0 ) ) { - logadd( LOG_ERROR, "Could not send select image" ); - } else if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) { - logadd( LOG_ERROR, "Could not read select image reply (%d)", errno ); - } else if ( rid != 0 && rid != remoteRid ) { - logadd( LOG_ERROR, "rid mismatch (want: %d, got: %d)", (int)rid, (int)remoteRid ); - } else { - logadd( LOG_INFO, "Requested: '%s:%d'", lowerImage, (int)rid ); - logadd( LOG_INFO, "Returned: '%s:%d'", remoteName, (int)remoteRid ); - sock_setTimeout( sock, SOCKET_KEEPALIVE_TIMEOUT * 1000 ); - image.name = strdup( remoteName ); - image.rid = remoteRid; - image.size = remoteSize; - if ( !sock_sockaddrToDnbd3( (struct sockaddr*)&sa, &connection.currentServer ) ) { - logadd( LOG_ERROR, "sockaddr to dnbd3_host_t failed!?" ); - connection.currentServer.type = 0; + altservers[altIndex].host = tempHosts[i]; + // Start thread for async connect if not connected yet + atomic_thread_fence( memory_order_acquire ); + if ( connection.sockFd == -1 ) { + pthread_t t; + struct conn_data *cd = malloc( sizeof(*cd) ); + // We cannot be sure a thread is taking longer than this function runs, so better copy + cd->lowerImage = strdup( lowerImage ); + cd->rid = rid; + cd->idx = altIndex; + pendingConnectionAttempts++; + if ( ( errno = pthread_create( &t, &threadAttrs, &connectThread, (void*)cd ) ) != 0 ) { + pendingConnectionAttempts--; + logadd( LOG_ERROR, "Could not create connect thread %d, errno=%d", cd->idx, errno ); + free( cd->lowerImage ); + free( cd ); + continue; } - connection.panicSignal = signal_new(); - timing_get( &connection.startupTime ); - connection.sockFd = sock; - requests.head = NULL; - requests.tail = NULL; - requestAltServers(); - break; - } - // Failed - if ( sock != -1 ) { - close( sock ); - sock = -1; + struct timespec timeout; + clock_gettime( CLOCK_REALTIME, &timeout ); + timeout.tv_nsec += 200 * 1000 * 1000; + if ( timeout.tv_nsec >= 1000 * 1000 * 1000 ) { + timeout.tv_nsec -= 1000 * 1000 * 1000; + timeout.tv_sec += 1; + } + pthread_cond_timedwait( &condConn, &mutexCondConn, &timeout ); } + // End async connect + altIndex += 1; } - if ( sock != -1 ) { - connectionInitDone = true; - } + current = end + 1; + } while ( *end != '\0' && altIndex < MAX_ALTS ); + logadd( LOG_INFO, "Got %d servers from init call", altIndex ); + // Wait a maximum of five seconds if we're not connected yet + if ( connection.sockFd == -1 && pendingConnectionAttempts > 0 ) { + struct timespec end; + clock_gettime( CLOCK_REALTIME, &end ); + end.tv_sec += 5; + pthread_cond_timedwait( &condConn, &mutexCondConn, &end ); + } + pthread_mutex_unlock( &mutexCondConn ); + pthread_attr_destroy( &threadAttrs ); + if ( connection.sockFd != -1 ) { + connectionInitDone = true; } pthread_mutex_unlock( &mutexInit ); - sock_destroyPollList( cons ); - return sock != -1; + return connectionInitDone; +} + +static void* connectThread(void * data) +{ + struct conn_data *cd = (struct conn_data*)data; + int idx = cd->idx; + int sock = -1; + serialized_buffer_t buffer; + uint16_t remoteVersion, remoteRid; + char *remoteName; + uint64_t remoteSize; + char host[SHORTBUF]; + struct sockaddr_storage sa; + socklen_t salen = sizeof(sa); + + if ( idx < 0 || idx >= MAX_ALTS || altservers[idx].host.type == 0 ) { + logadd( LOG_ERROR, "BUG: Index out of range, or empty server in connect thread (%d)", idx ); + goto bailout; + } + + sock_printHost( &altservers[idx].host, host, sizeof(host) ); + logadd( LOG_INFO, "Trying to connect to %s", host ); + sock = sock_connect( &altservers[idx].host, 1500, SOCKET_TIMEOUT_RECV * 1000 ); + if ( sock == -1 ) { + logadd( LOG_INFO, "[%s] Connection failed", host ); + goto bailout; + } + + salen = sizeof( sa ); + if ( getpeername( sock, (struct sockaddr*)&sa, &salen ) == -1 ) { + logadd( LOG_ERROR, "[%s] getpeername on successful connection failed!? (errno=%d)", host, errno ); + goto bailout; + } + atomic_thread_fence( memory_order_acquire ); + if ( connection.sockFd != -1 ) + goto bailout; + + sock_printable( (struct sockaddr*)&sa, salen, host, sizeof(host) ); + logadd( LOG_INFO, "[%s] Connected", host ); + if ( !dnbd3_select_image( sock, cd->lowerImage, cd->rid, 0 ) ) { + logadd( LOG_ERROR, "[%s] Could not send select image", host ); + goto bailout; + } + + if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) { + logadd( LOG_ERROR, "[%s] Could not read select image reply (%d)", host, errno ); + goto bailout; + } + atomic_thread_fence( memory_order_acquire ); + if ( connection.sockFd != -1 ) + goto bailout; + + if ( cd->rid != 0 && cd->rid != remoteRid ) { + logadd( LOG_ERROR, "[%s] rid mismatch (want: %d, got: %d)", + host, (int)cd->rid, (int)remoteRid ); + goto bailout; + } + // Seems we got a winner + pthread_mutex_lock( &mutexCondConn ); + if ( connection.sockFd != -1 || connectionInitDone ) { + pthread_mutex_unlock( &mutexCondConn ); + logadd( LOG_INFO, "[%s] Raced by other connection", host ); + goto bailout; + } + logadd( LOG_INFO, "Requested: '%s:%d'", cd->lowerImage, (int)cd->rid ); + logadd( LOG_INFO, "Returned: '%s:%d'", remoteName, (int)remoteRid ); + image.name = strdup( remoteName ); + image.rid = remoteRid; + image.size = remoteSize; + connection.currentServer = altservers[idx].host; + connection.panicSignal = signal_new(); + timing_get( &connection.startupTime ); + requests.head = NULL; + requests.tail = NULL; + if ( learnNewServers && !sendAltServerRequest( sock ) ) + goto bailout; + // Everything good, tell main connect function + connection.sockFd = sock; + atomic_thread_fence( memory_order_release ); + pendingConnectionAttempts--; + if ( idx != 0 ) { + // Make server first in list - enough to swap host, other data has not changed yet + lock_write( &altLock ); + dnbd3_host_t tmp = altservers[idx].host; + altservers[idx].host = altservers[0].host; + altservers[0].host = tmp; + unlock_rw( &altLock ); + } + pthread_cond_signal( &condConn ); + pthread_mutex_unlock( &mutexCondConn ); + return NULL; + +bailout: + if ( sock != -1 ) { + close( sock ); + } + free( cd->lowerImage ); + free( cd ); + // Last one has to wake up main thread, which is waiting for up to 5 seconds for + // any connect thread to succeed. If none succeeded, there is no point in waiting + // any longer. + if ( --pendingConnectionAttempts == 0 ) { + pthread_mutex_lock( &mutexCondConn ); + pthread_cond_signal( &condConn ); + pthread_mutex_unlock( &mutexCondConn ); + } + return NULL; } bool connection_initThreads() { pthread_mutex_lock( &mutexInit ); - if ( !keepRunning || !connectionInitDone || threadInitDone || connection.sockFd == -1 ) { + if ( !connectionInitDone || threadInitDone || connection.sockFd == -1 ) { pthread_mutex_unlock( &mutexInit ); return false; } bool success = true; - pthread_t thread; threadInitDone = true; logadd( LOG_DEBUG1, "Initializing stuff" ); if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0 @@ -220,10 +333,10 @@ bool connection_initThreads() logadd( LOG_ERROR, "Mutex or spinlock init failure" ); success = false; } else { - if ( pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)connection.sockFd ) != 0 ) { + if ( pthread_create( &tidReceiver, NULL, &connection_receiveThreadMain, ( void* )(size_t)connection.sockFd ) != 0 ) { logadd( LOG_ERROR, "Could not create receive thread" ); success = false; - } else if ( pthread_create( &thread, NULL, &connection_backgroundThread, NULL ) != 0 ) { + } else if ( pthread_create( &tidBackground, NULL, &connection_backgroundThread, NULL ) != 0 ) { logadd( LOG_ERROR, "Could not create background thread" ); success = false; } @@ -241,7 +354,7 @@ uint64_t connection_getImageSize() return image.size; } -bool connection_read(dnbd3_async_t *request) +bool connection_read( dnbd3_async_t *request ) { if ( !connectionInitDone ) return false; pthread_mutex_lock( &connection.sendMutex ); @@ -250,9 +363,7 @@ bool connection_read(dnbd3_async_t *request) if ( !dnbd3_get_block( connection.sockFd, request->offset, request->length, (uint64_t)request, 0 ) ) { shutdown( connection.sockFd, SHUT_RDWR ); connection.sockFd = -1; - pthread_mutex_unlock( &connection.sendMutex ); signal_call( connection.panicSignal ); - return true; } } pthread_mutex_unlock( &connection.sendMutex ); @@ -261,24 +372,36 @@ bool connection_read(dnbd3_async_t *request) void connection_close() { - if ( keepRunning ) { - logadd( LOG_INFO, "Tearing down dnbd3 connections and workers" ); - } + static bool signalled = false; + logadd( LOG_INFO, "Tearing down dnbd3 connections and workers" ); pthread_mutex_lock( &mutexInit ); keepRunning = false; + if ( threadInitDone && !signalled ) { + signalled = true; + pthread_kill( tidReceiver, SIGHUP ); + pthread_kill( tidBackground, SIGHUP ); + } + pthread_mutex_unlock( &mutexInit ); if ( !connectionInitDone ) { - pthread_mutex_unlock( &mutexInit ); return; } - pthread_mutex_unlock( &mutexInit ); pthread_mutex_lock( &connection.sendMutex ); if ( connection.sockFd != -1 ) { + logadd( LOG_DEBUG1, "Shutting down socket..." ); shutdown( connection.sockFd, SHUT_RDWR ); } pthread_mutex_unlock( &connection.sendMutex ); } -size_t connection_printStats(char *buffer, const size_t len) +void connection_join() +{ + if ( !threadInitDone ) + return; + pthread_join( tidReceiver, NULL ); + pthread_join( tidBackground, NULL ); +} + +size_t connection_printStats( char *buffer, const size_t len ) { int ret; size_t remaining = len; @@ -308,7 +431,7 @@ size_t connection_printStats(char *buffer, const size_t len) *buffer++ = ' '; } const size_t addrlen = sock_printHost( &altservers[i].host, buffer, remaining ); - remaining -= (addrlen + 1); // For space or * above + remaining -= ( addrlen + 1 ); // For space or * above buffer += addrlen; if ( remaining < 3 ) break; @@ -324,7 +447,7 @@ size_t connection_printStats(char *buffer, const size_t len) width += 3; } ret = snprintf( buffer, remaining, "% *d %s Unreachable:% 5d BestCount:% 5d Live:% 5dµs\n", - width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount, altservers[i].liveRtt ); + width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount, altservers[i].liveRtt ); if ( ret < 0 ) { ret = 0; } @@ -339,23 +462,23 @@ size_t connection_printStats(char *buffer, const size_t len) return len - remaining; } -static void* connection_receiveThreadMain(void *sockPtr) +static void* connection_receiveThreadMain( void *sockPtr ) { int sockFd = (int)(size_t)sockPtr; dnbd3_reply_t reply; - pthread_detach( pthread_self() ); + blockSignals(); while ( keepRunning ) { int ret; do { ret = dnbd3_read_reply( sockFd, &reply, true ); + if ( !keepRunning ) goto fail; if ( ret == REPLY_OK ) break; } while ( ret == REPLY_INTR || ret == REPLY_AGAIN ); if ( ret != REPLY_OK ) { logadd( LOG_DEBUG1, "Error receiving reply on receiveThread (%d)", ret ); goto fail; } - if ( reply.cmd == CMD_GET_BLOCK ) { // Get block reply. find matching request dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle ); @@ -390,10 +513,8 @@ static void* connection_receiveThreadMain(void *sockPtr) } unlock_rw( &altLock ); } - // Success, wake up caller - request->success = true; - request->finished = true; - signal_call( request->signal ); + fuse_reply_buf( request->fuse_req, request->buffer, request->length ); + free( request ); } } else if ( reply.cmd == CMD_GET_SERVERS ) { // List of known alt servers @@ -416,7 +537,6 @@ static void* connection_receiveThreadMain(void *sockPtr) } } } - logadd( LOG_DEBUG1, "Aus der Schleife rausgeflogen! ARRRRRRRRRR" ); fail:; // Make sure noone is trying to use the socket for sending by locking, pthread_mutex_lock( &connection.sendMutex ); @@ -424,7 +544,9 @@ fail:; // as someone could have established a new connection already if ( connection.sockFd == sockFd ) { connection.sockFd = -1; - signal_call( connection.panicSignal ); + if ( keepRunning ) { + signal_call( connection.panicSignal ); + } } pthread_mutex_unlock( &connection.sendMutex ); // As we're the only reader, it's safe to close the socket now @@ -432,11 +554,12 @@ fail:; return NULL; } -static void* connection_backgroundThread(void *something UNUSED) +static void* connection_backgroundThread( void *something UNUSED ) { ticks nextKeepalive; ticks nextRttCheck; + blockSignals(); timing_get( &nextKeepalive ); nextRttCheck = nextKeepalive; while ( keepRunning ) { @@ -446,6 +569,8 @@ static void* connection_backgroundThread(void *something UNUSED) uint32_t wt2 = timing_diffMs( &now, &nextRttCheck ); if ( wt1 > 0 && wt2 > 0 ) { int waitRes = signal_wait( connection.panicSignal, (int)MIN( wt1, wt2 ) + 1 ); + if ( !keepRunning ) + break; if ( waitRes == SIGNAL_ERROR ) { logadd( LOG_WARNING, "Error waiting on signal in background thread! Errno = %d", errno ); } @@ -460,20 +585,20 @@ static void* connection_backgroundThread(void *something UNUSED) } sortAltServers(); probeAltServers(); - if ( panic || timing_diff( &connection.startupTime, &now ) <= STARTUP_MODE_DURATION ) { + if ( panic || timing_diff( &connection.startupTime, &now ) <= DISCOVER_STARTUP_PHASE_COUNT * TIMER_INTERVAL_PROBE_STARTUP ) { timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_STARTUP ); } else { - timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_NORMAL ); + timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_MAX ); } } // Send keepalive packet if ( timing_reachedPrecise( &nextKeepalive, &now ) ) { pthread_mutex_lock( &connection.sendMutex ); if ( connection.sockFd != -1 ) { - dnbd3_request_t request; - request.magic = dnbd3_packet_magic; - request.cmd = CMD_KEEPALIVE; - request.handle = request.offset = request.size = 0; + dnbd3_request_t request = { + .magic = dnbd3_packet_magic, + .cmd = CMD_KEEPALIVE, + }; fixup_request( request ); ssize_t ret = sock_sendAll( connection.sockFd, &request, sizeof request, 2 ); if ( (size_t)ret != sizeof request ) { @@ -483,7 +608,7 @@ static void* connection_backgroundThread(void *something UNUSED) } } pthread_mutex_unlock( &connection.sendMutex ); - timing_addSeconds( &nextKeepalive, &now, TIMER_INTERVAL_KEEPALIVE_PACKET ); + timing_addSeconds( &nextKeepalive, &now, KEEPALIVE_INTERVAL ); } } return NULL; @@ -491,7 +616,20 @@ static void* connection_backgroundThread(void *something UNUSED) // Private quick helpers -static void addAltServers() +/** + * Check if given host is in list of altsevers. + * Does not lock 'altLock', do so at caller site. + */ +static bool hasAltServer( dnbd3_host_t *host ) +{ + for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) { + if ( isSameAddress( host, &altservers[eIdx].host ) ) + return true; + } + return false; +} + +static void addAltServers( void ) { pthread_mutex_lock( &newAltLock ); lock_write( &altLock ); @@ -499,11 +637,8 @@ static void addAltServers() if ( newservers[nIdx].host.type == 0 ) continue; // Got a new alt server, see if it's already known - for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) { - if ( isSameAddress( &newservers[nIdx].host, &altservers[eIdx].host ) ) { - goto skip_server; - } - } + if ( hasAltServer( &newservers[nIdx].host ) ) + continue; // Not known yet, add - find free slot int slot = -1; for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) { @@ -528,9 +663,8 @@ static void addAltServers() altservers[slot].host = newservers[nIdx].host; altservers[slot].liveRtt = 0; } -skip_server:; } - memset( newservers, 0, sizeof(newservers) ); + memset( newservers, 0, sizeof( newservers ) ); unlock_rw( &altLock ); pthread_mutex_unlock( &newAltLock ); } @@ -604,7 +738,7 @@ static void probeAltServers() pthread_spin_lock( &requests.lock ); if ( requests.head != NULL ) { if ( !panic && current != NULL ) { - const int maxDelay = MAX( current->rtt * 5, 1000000 ); // Give at least one second + const uint64_t maxDelay = MAX( current->rtt * 5, 1000000 ); // Give at least one second dnbd3_async_t *iterator; for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) { // A request with measurement tag is pending @@ -626,7 +760,7 @@ static void probeAltServers() } lock_read( &altLock ); - for ( int altIndex = 0; altIndex < (panic ? MAX_ALTS : MAX_ALTS_ACTIVE); ++altIndex ) { + for ( int altIndex = 0; altIndex < ( panic ? MAX_ALTS : MAX_ALTS_ACTIVE ); ++altIndex ) { alt_server_t * const srv = &altservers[altIndex]; if ( srv->host.type == 0 ) continue; @@ -634,65 +768,65 @@ static void probeAltServers() && rand() % srv->consecutiveFails >= FAIL_BACKOFF_START_COUNT ) { continue; } + srv->rttIndex += 1; if ( srv->rttIndex >= RTT_COUNT ) { srv->rttIndex = 0; - } else { - srv->rttIndex += 1; } // Probe + char hstr[100]; + sock_printHost( &srv->host, hstr, 100 ); ticks start; timing_get( &start ); errno = 0; int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 ); if ( sock == -1 ) { - logadd( LOG_DEBUG1, "Could not connect for probing. errno = %d", errno ); + logadd( LOG_DEBUG1, "%s probe: Could not connect for probing. errno = %d", hstr, errno ); goto fail; } if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) { - logadd( LOG_DEBUG1, "probe: select_image failed" ); + logadd( LOG_DEBUG1, "%s probe: select_image failed (sock=%d, errno=%d)", hstr, sock, errno ); goto fail; } - if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize )) { - logadd( LOG_DEBUG1, "probe: select image reply failed" ); + if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize ) ) { + logadd( LOG_DEBUG1, "%s probe: select image reply failed", hstr ); goto fail; } if ( remoteProto < MIN_SUPPORTED_SERVER ) { - logadd( LOG_WARNING, "Unsupported remote version (local: %d, remote: %d)", (int)PROTOCOL_VERSION, (int)remoteProto ); + logadd( LOG_WARNING, "%s probe: Unsupported remote version (local: %d, remote: %d)", hstr, (int)PROTOCOL_VERSION, (int)remoteProto ); srv->consecutiveFails += 10; goto fail; } if ( remoteRid != image.rid || strcmp( remoteName, image.name ) != 0 ) { - logadd( LOG_WARNING, "Remote rid or name mismatch (got '%s')", remoteName ); + logadd( LOG_WARNING, "%s probe: Remote rid or name mismatch (got '%s')", hstr, remoteName ); srv->consecutiveFails += 10; goto fail; } if ( !dnbd3_get_block( sock, testOffset, testLength, 0, 0 ) ) { - logadd( LOG_DEBUG1, "-> block request fail" ); + logadd( LOG_DEBUG1, "%s probe: -> block request fail", hstr ); goto fail; } int a = 111; - if ( !(a = dnbd3_get_reply( sock, &reply )) || reply.size != testLength ) { - logadd( LOG_DEBUG1, "<- get block reply fail %d %d", a, (int)reply.size ); + if ( !( a = dnbd3_get_reply( sock, &reply ) ) || reply.size != testLength ) { + logadd( LOG_DEBUG1, "%s probe: <- get block reply fail %d %d", hstr, a, (int)reply.size ); goto fail; } if ( request != NULL && removeRequest( request ) != NULL ) { // Request successfully removed from queue const ssize_t ret = sock_recv( sock, request->buffer, request->length ); if ( ret != (ssize_t)request->length ) { - logadd( LOG_DEBUG1, "[RTT] receiving payload for a block reply failed" ); + logadd( LOG_DEBUG1, "%s probe: receiving payload for a block reply failed", hstr ); // Failure, add to queue again connection_read( request ); goto fail; } - // Success, wake up caller - logadd( LOG_DEBUG1, "[RTT] Successful direct probe" ); - request->success = true; - request->finished = true; - signal_call( request->signal ); + // Success, reply to fuse + fuse_reply_buf( request->fuse_req, request->buffer, request->length ); + free( request ); + logadd( LOG_DEBUG1, "%s probe: Successful direct probe", hstr ); } else { // Wasn't a request that's in our request queue if ( !throwDataAway( sock, testLength ) ) { - logadd( LOG_DEBUG1, "<- get block reply payload fail" ); + logadd( LOG_DEBUG1, "%s probe: <- get block reply payload fail", hstr ); goto fail; } } @@ -701,7 +835,7 @@ static void probeAltServers() // Panic mode? Just switch to server if ( panic ) { unlock_rw( &altLock ); - switchConnection( sock, srv ); + if ( keepRunning ) switchConnection( sock, srv ); return; } // Non-panic mode: @@ -733,7 +867,8 @@ static void probeAltServers() close( sock ); } continue; -fail:; +fail: + ; if ( sock != -1 ) { close( sock ); } @@ -774,7 +909,7 @@ fail:; // Regular logic: Apply threshold when considering switch if ( !doSwitch && current != NULL ) { doSwitch = current->rtt > best->rtt + RTT_ABSOLUTE_THRESHOLD - || RTT_THRESHOLD_FACTOR(current->rtt) > best->rtt + 1000; + || RTT_THRESHOLD_FACTOR( current->rtt ) > best->rtt + 1000; } } // Switch if a better server was found @@ -796,11 +931,10 @@ fail:; } } -static void switchConnection(int sockFd, alt_server_t *srv) +static void switchConnection( int sockFd, alt_server_t *srv ) { - pthread_t thread; struct sockaddr_storage addr; - socklen_t addrLen = sizeof(addr); + socklen_t addrLen = sizeof( addr ); char message[200] = "Connection switched to "; const size_t len = strlen( message ); int ret; @@ -829,9 +963,10 @@ static void switchConnection(int sockFd, alt_server_t *srv) signal_call( connection.panicSignal ); return; } + pthread_detach( tidReceiver ); timing_get( &connection.startupTime ); - pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)sockFd ); - sock_printable( (struct sockaddr*)&addr, sizeof(addr), message + len, sizeof(message) - len ); + pthread_create( &tidReceiver, NULL, &connection_receiveThreadMain, ( void* )(size_t)sockFd ); + sock_printable( (struct sockaddr*)&addr, sizeof( addr ), message + len, sizeof( message ) - len ); logadd( LOG_INFO, "%s", message ); // resend queue if ( queue != NULL ) { @@ -855,22 +990,28 @@ static void switchConnection(int sockFd, alt_server_t *srv) /** * Does not lock, so get the sendMutex first! */ -static void requestAltServers() +static void requestAltServers( void ) { if ( connection.sockFd == -1 || !learnNewServers ) return; - dnbd3_request_t request = { 0 }; - request.magic = dnbd3_packet_magic; - request.cmd = CMD_GET_SERVERS; - fixup_request( request ); - if ( sock_sendAll( connection.sockFd, &request, sizeof(request), 2 ) != (ssize_t)sizeof(request) ) { - logadd( LOG_WARNING, "Connection failed while requesting alt server list" ); + if ( !sendAltServerRequest( connection.sockFd ) ) { + logadd( LOG_WARNING, "Main connection failed while requesting alt server list" ); shutdown( connection.sockFd, SHUT_RDWR ); connection.sockFd = -1; } } -static bool throwDataAway(int sockFd, uint32_t amount) +static bool sendAltServerRequest( int sock ) +{ + dnbd3_request_t request = { + .magic = dnbd3_packet_magic, + .cmd = CMD_GET_SERVERS, + }; + fixup_request( request ); + return sock_sendAll( sock, &request, sizeof( request ), 2 ) == (ssize_t)sizeof( request ); +} + +static bool throwDataAway( int sockFd, uint32_t amount ) { size_t done = 0; char tempBuffer[SHORTBUF]; @@ -883,11 +1024,9 @@ static bool throwDataAway(int sockFd, uint32_t amount) return true; } -static void enqueueRequest(dnbd3_async_t *request) +static void enqueueRequest( dnbd3_async_t *request ) { request->next = NULL; - request->finished = false; - request->success = false; //logadd( LOG_DEBUG2, "Queue: %p @ %s : %d", request, file, line ); // Measure latency and add to switch formula timing_get( &request->time ); @@ -901,7 +1040,7 @@ static void enqueueRequest(dnbd3_async_t *request) pthread_spin_unlock( &requests.lock ); } -static dnbd3_async_t* removeRequest(dnbd3_async_t *request) +static dnbd3_async_t* removeRequest( dnbd3_async_t *request ) { pthread_spin_lock( &requests.lock ); //logadd( LOG_DEBUG2, "Remov: %p @ %s : %d", request, file, line ); @@ -925,3 +1064,20 @@ static dnbd3_async_t* removeRequest(dnbd3_async_t *request) return iterator; } +static void blockSignals() +{ + sigset_t sigmask; + if ( pthread_sigmask( 0, NULL, &sigmask ) == -1 ) { + logadd( LOG_WARNING, "Cannot get current sigmask of thread" ); + sigemptyset( &sigmask ); + } + sigaddset( &sigmask, SIGUSR1 ); + sigaddset( &sigmask, SIGUSR2 ); + sigaddset( &sigmask, SIGPIPE ); + sigaddset( &sigmask, SIGINT ); + sigaddset( &sigmask, SIGTERM ); + sigdelset( &sigmask, SIGHUP ); + if ( pthread_sigmask( SIG_SETMASK, &sigmask, NULL ) == -1 ) { + logadd( LOG_WARNING, "Cannot set sigmask of thread" ); + } +} |