summaryrefslogtreecommitdiffstats
path: root/src/fuse
diff options
context:
space:
mode:
authorSimon Rettberg2020-07-21 16:44:45 +0200
committerSimon Rettberg2020-07-21 16:44:45 +0200
commitd6cd84358f729fbd01579605e4f325c737728ba1 (patch)
treef98f7e96952a797bb83942584a3577a7d853073f /src/fuse
parent[FUSE] Formatting (diff)
downloaddnbd3-d6cd84358f729fbd01579605e4f325c737728ba1.tar.gz
dnbd3-d6cd84358f729fbd01579605e4f325c737728ba1.tar.xz
dnbd3-d6cd84358f729fbd01579605e4f325c737728ba1.zip
[FUSE] Properly signal worker threads to exit on shutdown
Our main signal handler sends SUGHUP to the receiver and background threads, so if they block in some recv() or poll() they will get EINTR and can check keepRunning.
Diffstat (limited to 'src/fuse')
-rw-r--r--src/fuse/connection.c79
-rw-r--r--src/fuse/connection.h4
-rw-r--r--src/fuse/main.c11
3 files changed, 70 insertions, 24 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c
index c4f8de3..e7787e7 100644
--- a/src/fuse/connection.c
+++ b/src/fuse/connection.c
@@ -15,6 +15,7 @@
#include <errno.h>
#include <time.h>
#include <inttypes.h>
+#include <signal.h>
/* Constants */
static const size_t SHORTBUF = 100;
@@ -34,6 +35,9 @@ static pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER;
atomic_bool keepRunning = true;
static bool learnNewServers;
+static pthread_t tidReceiver;
+static pthread_t tidBackground;
+
// List of pending requests
static struct {
dnbd3_async_t *head;
@@ -97,6 +101,8 @@ static bool throwDataAway( int sockFd, uint32_t amount );
static void enqueueRequest( dnbd3_async_t *request );
static dnbd3_async_t* removeRequest( dnbd3_async_t *request );
+static void blockSignals();
+
bool connection_init( const char *hosts, const char *lowerImage, const uint16_t rid, const bool doLearnNew )
{
int sock = -1;
@@ -213,7 +219,6 @@ bool connection_initThreads()
return false;
}
bool success = true;
- pthread_t thread;
threadInitDone = true;
logadd( LOG_DEBUG1, "Initializing stuff" );
if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0
@@ -221,10 +226,10 @@ bool connection_initThreads()
logadd( LOG_ERROR, "Mutex or spinlock init failure" );
success = false;
} else {
- if ( pthread_create( &thread, NULL, &connection_receiveThreadMain, ( void* )(size_t)connection.sockFd ) != 0 ) {
+ if ( pthread_create( &tidReceiver, NULL, &connection_receiveThreadMain, ( void* )(size_t)connection.sockFd ) != 0 ) {
logadd( LOG_ERROR, "Could not create receive thread" );
success = false;
- } else if ( pthread_create( &thread, NULL, &connection_backgroundThread, NULL ) != 0 ) {
+ } else if ( pthread_create( &tidBackground, NULL, &connection_backgroundThread, NULL ) != 0 ) {
logadd( LOG_ERROR, "Could not create background thread" );
success = false;
}
@@ -280,6 +285,22 @@ void connection_close()
logadd( LOG_DEBUG1, "Connection closed." );
}
+void connection_signalShutdown()
+{
+ if ( !threadInitDone )
+ return;
+ pthread_kill( tidReceiver, SIGHUP );
+ pthread_kill( tidBackground, SIGHUP );
+}
+
+void connection_join()
+{
+ if ( !threadInitDone )
+ return;
+ pthread_join( tidReceiver, NULL );
+ pthread_join( tidBackground, NULL );
+}
+
size_t connection_printStats( char *buffer, const size_t len )
{
int ret;
@@ -287,7 +308,7 @@ size_t connection_printStats( char *buffer, const size_t len )
declare_now;
if ( remaining > 0 ) {
ret = snprintf( buffer, remaining, "Image: %s\nRevision: %d\n\nCurrent connection time: %" PRIu32 "s\n\n",
- image.name, (int)image.rid, timing_diff( &connection.startupTime, &now ) );
+ image.name, (int)image.rid, timing_diff( &connection.startupTime, &now ) );
if ( ret < 0 ) {
ret = 0;
}
@@ -345,12 +366,13 @@ static void* connection_receiveThreadMain( void *sockPtr )
{
int sockFd = (int)(size_t)sockPtr;
dnbd3_reply_t reply;
- pthread_detach( pthread_self() );
+ blockSignals();
while ( keepRunning ) {
int ret;
do {
ret = dnbd3_read_reply( sockFd, &reply, true );
+ if ( !keepRunning ) goto fail;
if ( ret == REPLY_OK ) break;
} while ( ret == REPLY_INTR || ret == REPLY_AGAIN );
if ( ret != REPLY_OK ) {
@@ -425,17 +447,16 @@ static void* connection_receiveThreadMain( void *sockPtr )
}
}
}
- if( !keepRunning ) connection_close();
- logadd( LOG_DEBUG1, "Aus der Schleife rausgeflogen! ARRRRRRRRRR" );
-fail:
- ;
+fail:;
// Make sure noone is trying to use the socket for sending by locking,
pthread_mutex_lock( &connection.sendMutex );
// then just set the fd to -1, but only if it's the same fd as ours,
// as someone could have established a new connection already
if ( connection.sockFd == sockFd ) {
connection.sockFd = -1;
- signal_call( connection.panicSignal );
+ if ( keepRunning ) {
+ signal_call( connection.panicSignal );
+ }
}
pthread_mutex_unlock( &connection.sendMutex );
// As we're the only reader, it's safe to close the socket now
@@ -445,10 +466,10 @@ fail:
static void* connection_backgroundThread( void *something UNUSED )
{
- pthread_detach( pthread_self() ); // fixes thread leak after fuse termination
ticks nextKeepalive;
ticks nextRttCheck;
+ blockSignals();
timing_get( &nextKeepalive );
nextRttCheck = nextKeepalive;
while ( keepRunning ) {
@@ -458,6 +479,8 @@ static void* connection_backgroundThread( void *something UNUSED )
uint32_t wt2 = timing_diffMs( &now, &nextRttCheck );
if ( wt1 > 0 && wt2 > 0 ) {
int waitRes = signal_wait( connection.panicSignal, (int)MIN( wt1, wt2 ) + 1 );
+ if ( !keepRunning )
+ break;
if ( waitRes == SIGNAL_ERROR ) {
logadd( LOG_WARNING, "Error waiting on signal in background thread! Errno = %d", errno );
}
@@ -653,56 +676,58 @@ static void probeAltServers()
srv->rttIndex += 1;
}
// Probe
+ char hstr[100];
+ sock_printHost( &srv->host, hstr, 100 );
ticks start;
timing_get( &start );
errno = 0;
int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 );
if ( sock == -1 ) {
- logadd( LOG_DEBUG1, "Could not connect for probing. errno = %d", errno );
+ logadd( LOG_DEBUG1, "%s probe: Could not connect for probing. errno = %d", hstr, errno );
goto fail;
}
if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) {
- logadd( LOG_DEBUG1, "probe: select_image failed" );
+ logadd( LOG_DEBUG1, "%s probe: select_image failed (sock=%d, errno=%d)", hstr, sock, errno );
goto fail;
}
if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize ) ) {
- logadd( LOG_DEBUG1, "probe: select image reply failed" );
+ logadd( LOG_DEBUG1, "%s probe: select image reply failed", hstr );
goto fail;
}
if ( remoteProto < MIN_SUPPORTED_SERVER ) {
- logadd( LOG_WARNING, "Unsupported remote version (local: %d, remote: %d)", (int)PROTOCOL_VERSION, (int)remoteProto );
+ logadd( LOG_WARNING, "%s probe: Unsupported remote version (local: %d, remote: %d)", hstr, (int)PROTOCOL_VERSION, (int)remoteProto );
srv->consecutiveFails += 10;
goto fail;
}
if ( remoteRid != image.rid || strcmp( remoteName, image.name ) != 0 ) {
- logadd( LOG_WARNING, "Remote rid or name mismatch (got '%s')", remoteName );
+ logadd( LOG_WARNING, "%s probe: Remote rid or name mismatch (got '%s')", hstr, remoteName );
srv->consecutiveFails += 10;
goto fail;
}
if ( !dnbd3_get_block( sock, testOffset, testLength, 0, 0 ) ) {
- logadd( LOG_DEBUG1, "-> block request fail" );
+ logadd( LOG_DEBUG1, "%s probe: -> block request fail", hstr );
goto fail;
}
int a = 111;
if ( !( a = dnbd3_get_reply( sock, &reply ) ) || reply.size != testLength ) {
- logadd( LOG_DEBUG1, "<- get block reply fail %d %d", a, (int)reply.size );
+ logadd( LOG_DEBUG1, "%s probe: <- get block reply fail %d %d", hstr, a, (int)reply.size );
goto fail;
}
if ( request != NULL && removeRequest( request ) != NULL ) {
// Request successfully removed from queue
const ssize_t ret = sock_recv( sock, request->buffer, request->length );
if ( ret != (ssize_t)request->length ) {
- logadd( LOG_DEBUG1, "[RTT] receiving payload for a block reply failed" );
+ logadd( LOG_DEBUG1, "%s probe: receiving payload for a block reply failed", hstr );
// Failure, add to queue again
connection_read( request );
goto fail;
}
// Success, wake up caller
- logadd( LOG_DEBUG1, "[RTT] Successful direct probe" );
+ logadd( LOG_DEBUG1, "%s probe: Successful direct probe", hstr );
} else {
// Wasn't a request that's in our request queue
if ( !throwDataAway( sock, testLength ) ) {
- logadd( LOG_DEBUG1, "<- get block reply payload fail" );
+ logadd( LOG_DEBUG1, "%s probe: <- get block reply payload fail", hstr );
goto fail;
}
}
@@ -934,3 +959,15 @@ static dnbd3_async_t* removeRequest( dnbd3_async_t *request )
return iterator;
}
+static void blockSignals()
+{
+ sigset_t sigmask;
+ sigemptyset( &sigmask );
+ sigaddset( &sigmask, SIGUSR1 );
+ sigaddset( &sigmask, SIGUSR2 );
+ sigaddset( &sigmask, SIGPIPE );
+ sigaddset( &sigmask, SIGINT );
+ sigaddset( &sigmask, SIGTERM );
+ pthread_sigmask( SIG_SETMASK, &sigmask, NULL );
+
+}
diff --git a/src/fuse/connection.h b/src/fuse/connection.h
index cd9846e..04d8894 100644
--- a/src/fuse/connection.h
+++ b/src/fuse/connection.h
@@ -33,6 +33,10 @@ bool connection_read( dnbd3_async_t *request );
void connection_close();
+void connection_signalShutdown();
+
+void connection_join();
+
size_t connection_printStats( char *buffer, const size_t len );
#endif /* CONNECTION_H_ */
diff --git a/src/fuse/main.c b/src/fuse/main.c
index 295c194..3e08203 100644
--- a/src/fuse/main.c
+++ b/src/fuse/main.c
@@ -244,7 +244,6 @@ static void image_ll_read( fuse_req_t req, fuse_ino_t ino, size_t size, off_t of
++logInfo.blockRequestCount[startBlock];
}
}
- if ( !keepRunning ) connection_close();
if ( ino == 2 && size != 0 ) // with size == 0 there is nothing to do
{
dnbd3_async_t *request = malloc( sizeof(dnbd3_async_t) );
@@ -261,10 +260,12 @@ static void image_sigHandler( int signum ) {
if ( signum == SIGINT && fuse_sigIntHandler != NULL ) {
keepRunning = false;
fuse_sigIntHandler( signum );
+ connection_signalShutdown();
}
if ( signum == SIGTERM && fuse_sigTermHandler != NULL ) {
keepRunning = false;
fuse_sigTermHandler( signum );
+ connection_signalShutdown();
}
errno = temp_errno;
}
@@ -494,8 +495,11 @@ int main( int argc, char *argv[] )
if ( fuse_set_signal_handlers( se ) != -1 ) {
fuse_session_add_chan( se, ch );
//fuse_daemonize(foreground);
- if ( single_thread ) fuse_err = fuse_session_loop( se );
- else fuse_err = fuse_session_loop_mt( se ); //MT produces errors (race conditions) in libfuse and didnt improve speed at all
+ if ( single_thread ) {
+ fuse_err = fuse_session_loop( se );
+ } else {
+ fuse_err = fuse_session_loop_mt( se ); //MT produces errors (race conditions) in libfuse and didnt improve speed at all
+ }
fuse_remove_signal_handlers( se );
fuse_session_remove_chan( ch );
}
@@ -506,5 +510,6 @@ int main( int argc, char *argv[] )
fuse_opt_free_args( &args );
free( newArgv );
logadd( LOG_DEBUG1, "Terminating. FUSE REPLIED: %d\n", fuse_err );
+ connection_join();
return fuse_err;
}