diff options
Diffstat (limited to 'src/fuse')
-rw-r--r-- | src/fuse/connection.c | 79 | ||||
-rw-r--r-- | src/fuse/connection.h | 4 | ||||
-rw-r--r-- | src/fuse/main.c | 11 |
3 files changed, 70 insertions, 24 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c index c4f8de3..e7787e7 100644 --- a/src/fuse/connection.c +++ b/src/fuse/connection.c @@ -15,6 +15,7 @@ #include <errno.h> #include <time.h> #include <inttypes.h> +#include <signal.h> /* Constants */ static const size_t SHORTBUF = 100; @@ -34,6 +35,9 @@ static pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER; atomic_bool keepRunning = true; static bool learnNewServers; +static pthread_t tidReceiver; +static pthread_t tidBackground; + // List of pending requests static struct { dnbd3_async_t *head; @@ -97,6 +101,8 @@ static bool throwDataAway( int sockFd, uint32_t amount ); static void enqueueRequest( dnbd3_async_t *request ); static dnbd3_async_t* removeRequest( dnbd3_async_t *request ); +static void blockSignals(); + bool connection_init( const char *hosts, const char *lowerImage, const uint16_t rid, const bool doLearnNew ) { int sock = -1; @@ -213,7 +219,6 @@ bool connection_initThreads() return false; } bool success = true; - pthread_t thread; threadInitDone = true; logadd( LOG_DEBUG1, "Initializing stuff" ); if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0 @@ -221,10 +226,10 @@ bool connection_initThreads() logadd( LOG_ERROR, "Mutex or spinlock init failure" ); success = false; } else { - if ( pthread_create( &thread, NULL, &connection_receiveThreadMain, ( void* )(size_t)connection.sockFd ) != 0 ) { + if ( pthread_create( &tidReceiver, NULL, &connection_receiveThreadMain, ( void* )(size_t)connection.sockFd ) != 0 ) { logadd( LOG_ERROR, "Could not create receive thread" ); success = false; - } else if ( pthread_create( &thread, NULL, &connection_backgroundThread, NULL ) != 0 ) { + } else if ( pthread_create( &tidBackground, NULL, &connection_backgroundThread, NULL ) != 0 ) { logadd( LOG_ERROR, "Could not create background thread" ); success = false; } @@ -280,6 +285,22 @@ void connection_close() logadd( LOG_DEBUG1, "Connection closed." ); } +void connection_signalShutdown() +{ + if ( !threadInitDone ) + return; + pthread_kill( tidReceiver, SIGHUP ); + pthread_kill( tidBackground, SIGHUP ); +} + +void connection_join() +{ + if ( !threadInitDone ) + return; + pthread_join( tidReceiver, NULL ); + pthread_join( tidBackground, NULL ); +} + size_t connection_printStats( char *buffer, const size_t len ) { int ret; @@ -287,7 +308,7 @@ size_t connection_printStats( char *buffer, const size_t len ) declare_now; if ( remaining > 0 ) { ret = snprintf( buffer, remaining, "Image: %s\nRevision: %d\n\nCurrent connection time: %" PRIu32 "s\n\n", - image.name, (int)image.rid, timing_diff( &connection.startupTime, &now ) ); + image.name, (int)image.rid, timing_diff( &connection.startupTime, &now ) ); if ( ret < 0 ) { ret = 0; } @@ -345,12 +366,13 @@ static void* connection_receiveThreadMain( void *sockPtr ) { int sockFd = (int)(size_t)sockPtr; dnbd3_reply_t reply; - pthread_detach( pthread_self() ); + blockSignals(); while ( keepRunning ) { int ret; do { ret = dnbd3_read_reply( sockFd, &reply, true ); + if ( !keepRunning ) goto fail; if ( ret == REPLY_OK ) break; } while ( ret == REPLY_INTR || ret == REPLY_AGAIN ); if ( ret != REPLY_OK ) { @@ -425,17 +447,16 @@ static void* connection_receiveThreadMain( void *sockPtr ) } } } - if( !keepRunning ) connection_close(); - logadd( LOG_DEBUG1, "Aus der Schleife rausgeflogen! ARRRRRRRRRR" ); -fail: - ; +fail:; // Make sure noone is trying to use the socket for sending by locking, pthread_mutex_lock( &connection.sendMutex ); // then just set the fd to -1, but only if it's the same fd as ours, // as someone could have established a new connection already if ( connection.sockFd == sockFd ) { connection.sockFd = -1; - signal_call( connection.panicSignal ); + if ( keepRunning ) { + signal_call( connection.panicSignal ); + } } pthread_mutex_unlock( &connection.sendMutex ); // As we're the only reader, it's safe to close the socket now @@ -445,10 +466,10 @@ fail: static void* connection_backgroundThread( void *something UNUSED ) { - pthread_detach( pthread_self() ); // fixes thread leak after fuse termination ticks nextKeepalive; ticks nextRttCheck; + blockSignals(); timing_get( &nextKeepalive ); nextRttCheck = nextKeepalive; while ( keepRunning ) { @@ -458,6 +479,8 @@ static void* connection_backgroundThread( void *something UNUSED ) uint32_t wt2 = timing_diffMs( &now, &nextRttCheck ); if ( wt1 > 0 && wt2 > 0 ) { int waitRes = signal_wait( connection.panicSignal, (int)MIN( wt1, wt2 ) + 1 ); + if ( !keepRunning ) + break; if ( waitRes == SIGNAL_ERROR ) { logadd( LOG_WARNING, "Error waiting on signal in background thread! Errno = %d", errno ); } @@ -653,56 +676,58 @@ static void probeAltServers() srv->rttIndex += 1; } // Probe + char hstr[100]; + sock_printHost( &srv->host, hstr, 100 ); ticks start; timing_get( &start ); errno = 0; int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 ); if ( sock == -1 ) { - logadd( LOG_DEBUG1, "Could not connect for probing. errno = %d", errno ); + logadd( LOG_DEBUG1, "%s probe: Could not connect for probing. errno = %d", hstr, errno ); goto fail; } if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) { - logadd( LOG_DEBUG1, "probe: select_image failed" ); + logadd( LOG_DEBUG1, "%s probe: select_image failed (sock=%d, errno=%d)", hstr, sock, errno ); goto fail; } if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize ) ) { - logadd( LOG_DEBUG1, "probe: select image reply failed" ); + logadd( LOG_DEBUG1, "%s probe: select image reply failed", hstr ); goto fail; } if ( remoteProto < MIN_SUPPORTED_SERVER ) { - logadd( LOG_WARNING, "Unsupported remote version (local: %d, remote: %d)", (int)PROTOCOL_VERSION, (int)remoteProto ); + logadd( LOG_WARNING, "%s probe: Unsupported remote version (local: %d, remote: %d)", hstr, (int)PROTOCOL_VERSION, (int)remoteProto ); srv->consecutiveFails += 10; goto fail; } if ( remoteRid != image.rid || strcmp( remoteName, image.name ) != 0 ) { - logadd( LOG_WARNING, "Remote rid or name mismatch (got '%s')", remoteName ); + logadd( LOG_WARNING, "%s probe: Remote rid or name mismatch (got '%s')", hstr, remoteName ); srv->consecutiveFails += 10; goto fail; } if ( !dnbd3_get_block( sock, testOffset, testLength, 0, 0 ) ) { - logadd( LOG_DEBUG1, "-> block request fail" ); + logadd( LOG_DEBUG1, "%s probe: -> block request fail", hstr ); goto fail; } int a = 111; if ( !( a = dnbd3_get_reply( sock, &reply ) ) || reply.size != testLength ) { - logadd( LOG_DEBUG1, "<- get block reply fail %d %d", a, (int)reply.size ); + logadd( LOG_DEBUG1, "%s probe: <- get block reply fail %d %d", hstr, a, (int)reply.size ); goto fail; } if ( request != NULL && removeRequest( request ) != NULL ) { // Request successfully removed from queue const ssize_t ret = sock_recv( sock, request->buffer, request->length ); if ( ret != (ssize_t)request->length ) { - logadd( LOG_DEBUG1, "[RTT] receiving payload for a block reply failed" ); + logadd( LOG_DEBUG1, "%s probe: receiving payload for a block reply failed", hstr ); // Failure, add to queue again connection_read( request ); goto fail; } // Success, wake up caller - logadd( LOG_DEBUG1, "[RTT] Successful direct probe" ); + logadd( LOG_DEBUG1, "%s probe: Successful direct probe", hstr ); } else { // Wasn't a request that's in our request queue if ( !throwDataAway( sock, testLength ) ) { - logadd( LOG_DEBUG1, "<- get block reply payload fail" ); + logadd( LOG_DEBUG1, "%s probe: <- get block reply payload fail", hstr ); goto fail; } } @@ -934,3 +959,15 @@ static dnbd3_async_t* removeRequest( dnbd3_async_t *request ) return iterator; } +static void blockSignals() +{ + sigset_t sigmask; + sigemptyset( &sigmask ); + sigaddset( &sigmask, SIGUSR1 ); + sigaddset( &sigmask, SIGUSR2 ); + sigaddset( &sigmask, SIGPIPE ); + sigaddset( &sigmask, SIGINT ); + sigaddset( &sigmask, SIGTERM ); + pthread_sigmask( SIG_SETMASK, &sigmask, NULL ); + +} diff --git a/src/fuse/connection.h b/src/fuse/connection.h index cd9846e..04d8894 100644 --- a/src/fuse/connection.h +++ b/src/fuse/connection.h @@ -33,6 +33,10 @@ bool connection_read( dnbd3_async_t *request ); void connection_close(); +void connection_signalShutdown(); + +void connection_join(); + size_t connection_printStats( char *buffer, const size_t len ); #endif /* CONNECTION_H_ */ diff --git a/src/fuse/main.c b/src/fuse/main.c index 295c194..3e08203 100644 --- a/src/fuse/main.c +++ b/src/fuse/main.c @@ -244,7 +244,6 @@ static void image_ll_read( fuse_req_t req, fuse_ino_t ino, size_t size, off_t of ++logInfo.blockRequestCount[startBlock]; } } - if ( !keepRunning ) connection_close(); if ( ino == 2 && size != 0 ) // with size == 0 there is nothing to do { dnbd3_async_t *request = malloc( sizeof(dnbd3_async_t) ); @@ -261,10 +260,12 @@ static void image_sigHandler( int signum ) { if ( signum == SIGINT && fuse_sigIntHandler != NULL ) { keepRunning = false; fuse_sigIntHandler( signum ); + connection_signalShutdown(); } if ( signum == SIGTERM && fuse_sigTermHandler != NULL ) { keepRunning = false; fuse_sigTermHandler( signum ); + connection_signalShutdown(); } errno = temp_errno; } @@ -494,8 +495,11 @@ int main( int argc, char *argv[] ) if ( fuse_set_signal_handlers( se ) != -1 ) { fuse_session_add_chan( se, ch ); //fuse_daemonize(foreground); - if ( single_thread ) fuse_err = fuse_session_loop( se ); - else fuse_err = fuse_session_loop_mt( se ); //MT produces errors (race conditions) in libfuse and didnt improve speed at all + if ( single_thread ) { + fuse_err = fuse_session_loop( se ); + } else { + fuse_err = fuse_session_loop_mt( se ); //MT produces errors (race conditions) in libfuse and didnt improve speed at all + } fuse_remove_signal_handlers( se ); fuse_session_remove_chan( ch ); } @@ -506,5 +510,6 @@ int main( int argc, char *argv[] ) fuse_opt_free_args( &args ); free( newArgv ); logadd( LOG_DEBUG1, "Terminating. FUSE REPLIED: %d\n", fuse_err ); + connection_join(); return fuse_err; } |