summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/fuse/connection.c79
-rw-r--r--src/fuse/connection.h4
-rw-r--r--src/fuse/main.c11
3 files changed, 70 insertions, 24 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c
index c4f8de3..e7787e7 100644
--- a/src/fuse/connection.c
+++ b/src/fuse/connection.c
@@ -15,6 +15,7 @@
#include <errno.h>
#include <time.h>
#include <inttypes.h>
+#include <signal.h>
/* Constants */
static const size_t SHORTBUF = 100;
@@ -34,6 +35,9 @@ static pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER;
atomic_bool keepRunning = true;
static bool learnNewServers;
+static pthread_t tidReceiver;
+static pthread_t tidBackground;
+
// List of pending requests
static struct {
dnbd3_async_t *head;
@@ -97,6 +101,8 @@ static bool throwDataAway( int sockFd, uint32_t amount );
static void enqueueRequest( dnbd3_async_t *request );
static dnbd3_async_t* removeRequest( dnbd3_async_t *request );
+static void blockSignals();
+
bool connection_init( const char *hosts, const char *lowerImage, const uint16_t rid, const bool doLearnNew )
{
int sock = -1;
@@ -213,7 +219,6 @@ bool connection_initThreads()
return false;
}
bool success = true;
- pthread_t thread;
threadInitDone = true;
logadd( LOG_DEBUG1, "Initializing stuff" );
if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0
@@ -221,10 +226,10 @@ bool connection_initThreads()
logadd( LOG_ERROR, "Mutex or spinlock init failure" );
success = false;
} else {
- if ( pthread_create( &thread, NULL, &connection_receiveThreadMain, ( void* )(size_t)connection.sockFd ) != 0 ) {
+ if ( pthread_create( &tidReceiver, NULL, &connection_receiveThreadMain, ( void* )(size_t)connection.sockFd ) != 0 ) {
logadd( LOG_ERROR, "Could not create receive thread" );
success = false;
- } else if ( pthread_create( &thread, NULL, &connection_backgroundThread, NULL ) != 0 ) {
+ } else if ( pthread_create( &tidBackground, NULL, &connection_backgroundThread, NULL ) != 0 ) {
logadd( LOG_ERROR, "Could not create background thread" );
success = false;
}
@@ -280,6 +285,22 @@ void connection_close()
logadd( LOG_DEBUG1, "Connection closed." );
}
+void connection_signalShutdown()
+{
+ if ( !threadInitDone )
+ return;
+ pthread_kill( tidReceiver, SIGHUP );
+ pthread_kill( tidBackground, SIGHUP );
+}
+
+void connection_join()
+{
+ if ( !threadInitDone )
+ return;
+ pthread_join( tidReceiver, NULL );
+ pthread_join( tidBackground, NULL );
+}
+
size_t connection_printStats( char *buffer, const size_t len )
{
int ret;
@@ -287,7 +308,7 @@ size_t connection_printStats( char *buffer, const size_t len )
declare_now;
if ( remaining > 0 ) {
ret = snprintf( buffer, remaining, "Image: %s\nRevision: %d\n\nCurrent connection time: %" PRIu32 "s\n\n",
- image.name, (int)image.rid, timing_diff( &connection.startupTime, &now ) );
+ image.name, (int)image.rid, timing_diff( &connection.startupTime, &now ) );
if ( ret < 0 ) {
ret = 0;
}
@@ -345,12 +366,13 @@ static void* connection_receiveThreadMain( void *sockPtr )
{
int sockFd = (int)(size_t)sockPtr;
dnbd3_reply_t reply;
- pthread_detach( pthread_self() );
+ blockSignals();
while ( keepRunning ) {
int ret;
do {
ret = dnbd3_read_reply( sockFd, &reply, true );
+ if ( !keepRunning ) goto fail;
if ( ret == REPLY_OK ) break;
} while ( ret == REPLY_INTR || ret == REPLY_AGAIN );
if ( ret != REPLY_OK ) {
@@ -425,17 +447,16 @@ static void* connection_receiveThreadMain( void *sockPtr )
}
}
}
- if( !keepRunning ) connection_close();
- logadd( LOG_DEBUG1, "Aus der Schleife rausgeflogen! ARRRRRRRRRR" );
-fail:
- ;
+fail:;
// Make sure noone is trying to use the socket for sending by locking,
pthread_mutex_lock( &connection.sendMutex );
// then just set the fd to -1, but only if it's the same fd as ours,
// as someone could have established a new connection already
if ( connection.sockFd == sockFd ) {
connection.sockFd = -1;
- signal_call( connection.panicSignal );
+ if ( keepRunning ) {
+ signal_call( connection.panicSignal );
+ }
}
pthread_mutex_unlock( &connection.sendMutex );
// As we're the only reader, it's safe to close the socket now
@@ -445,10 +466,10 @@ fail:
static void* connection_backgroundThread( void *something UNUSED )
{
- pthread_detach( pthread_self() ); // fixes thread leak after fuse termination
ticks nextKeepalive;
ticks nextRttCheck;
+ blockSignals();
timing_get( &nextKeepalive );
nextRttCheck = nextKeepalive;
while ( keepRunning ) {
@@ -458,6 +479,8 @@ static void* connection_backgroundThread( void *something UNUSED )
uint32_t wt2 = timing_diffMs( &now, &nextRttCheck );
if ( wt1 > 0 && wt2 > 0 ) {
int waitRes = signal_wait( connection.panicSignal, (int)MIN( wt1, wt2 ) + 1 );
+ if ( !keepRunning )
+ break;
if ( waitRes == SIGNAL_ERROR ) {
logadd( LOG_WARNING, "Error waiting on signal in background thread! Errno = %d", errno );
}
@@ -653,56 +676,58 @@ static void probeAltServers()
srv->rttIndex += 1;
}
// Probe
+ char hstr[100];
+ sock_printHost( &srv->host, hstr, 100 );
ticks start;
timing_get( &start );
errno = 0;
int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 );
if ( sock == -1 ) {
- logadd( LOG_DEBUG1, "Could not connect for probing. errno = %d", errno );
+ logadd( LOG_DEBUG1, "%s probe: Could not connect for probing. errno = %d", hstr, errno );
goto fail;
}
if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) {
- logadd( LOG_DEBUG1, "probe: select_image failed" );
+ logadd( LOG_DEBUG1, "%s probe: select_image failed (sock=%d, errno=%d)", hstr, sock, errno );
goto fail;
}
if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize ) ) {
- logadd( LOG_DEBUG1, "probe: select image reply failed" );
+ logadd( LOG_DEBUG1, "%s probe: select image reply failed", hstr );
goto fail;
}
if ( remoteProto < MIN_SUPPORTED_SERVER ) {
- logadd( LOG_WARNING, "Unsupported remote version (local: %d, remote: %d)", (int)PROTOCOL_VERSION, (int)remoteProto );
+ logadd( LOG_WARNING, "%s probe: Unsupported remote version (local: %d, remote: %d)", hstr, (int)PROTOCOL_VERSION, (int)remoteProto );
srv->consecutiveFails += 10;
goto fail;
}
if ( remoteRid != image.rid || strcmp( remoteName, image.name ) != 0 ) {
- logadd( LOG_WARNING, "Remote rid or name mismatch (got '%s')", remoteName );
+ logadd( LOG_WARNING, "%s probe: Remote rid or name mismatch (got '%s')", hstr, remoteName );
srv->consecutiveFails += 10;
goto fail;
}
if ( !dnbd3_get_block( sock, testOffset, testLength, 0, 0 ) ) {
- logadd( LOG_DEBUG1, "-> block request fail" );
+ logadd( LOG_DEBUG1, "%s probe: -> block request fail", hstr );
goto fail;
}
int a = 111;
if ( !( a = dnbd3_get_reply( sock, &reply ) ) || reply.size != testLength ) {
- logadd( LOG_DEBUG1, "<- get block reply fail %d %d", a, (int)reply.size );
+ logadd( LOG_DEBUG1, "%s probe: <- get block reply fail %d %d", hstr, a, (int)reply.size );
goto fail;
}
if ( request != NULL && removeRequest( request ) != NULL ) {
// Request successfully removed from queue
const ssize_t ret = sock_recv( sock, request->buffer, request->length );
if ( ret != (ssize_t)request->length ) {
- logadd( LOG_DEBUG1, "[RTT] receiving payload for a block reply failed" );
+ logadd( LOG_DEBUG1, "%s probe: receiving payload for a block reply failed", hstr );
// Failure, add to queue again
connection_read( request );
goto fail;
}
// Success, wake up caller
- logadd( LOG_DEBUG1, "[RTT] Successful direct probe" );
+ logadd( LOG_DEBUG1, "%s probe: Successful direct probe", hstr );
} else {
// Wasn't a request that's in our request queue
if ( !throwDataAway( sock, testLength ) ) {
- logadd( LOG_DEBUG1, "<- get block reply payload fail" );
+ logadd( LOG_DEBUG1, "%s probe: <- get block reply payload fail", hstr );
goto fail;
}
}
@@ -934,3 +959,15 @@ static dnbd3_async_t* removeRequest( dnbd3_async_t *request )
return iterator;
}
+static void blockSignals()
+{
+ sigset_t sigmask;
+ sigemptyset( &sigmask );
+ sigaddset( &sigmask, SIGUSR1 );
+ sigaddset( &sigmask, SIGUSR2 );
+ sigaddset( &sigmask, SIGPIPE );
+ sigaddset( &sigmask, SIGINT );
+ sigaddset( &sigmask, SIGTERM );
+ pthread_sigmask( SIG_SETMASK, &sigmask, NULL );
+
+}
diff --git a/src/fuse/connection.h b/src/fuse/connection.h
index cd9846e..04d8894 100644
--- a/src/fuse/connection.h
+++ b/src/fuse/connection.h
@@ -33,6 +33,10 @@ bool connection_read( dnbd3_async_t *request );
void connection_close();
+void connection_signalShutdown();
+
+void connection_join();
+
size_t connection_printStats( char *buffer, const size_t len );
#endif /* CONNECTION_H_ */
diff --git a/src/fuse/main.c b/src/fuse/main.c
index 295c194..3e08203 100644
--- a/src/fuse/main.c
+++ b/src/fuse/main.c
@@ -244,7 +244,6 @@ static void image_ll_read( fuse_req_t req, fuse_ino_t ino, size_t size, off_t of
++logInfo.blockRequestCount[startBlock];
}
}
- if ( !keepRunning ) connection_close();
if ( ino == 2 && size != 0 ) // with size == 0 there is nothing to do
{
dnbd3_async_t *request = malloc( sizeof(dnbd3_async_t) );
@@ -261,10 +260,12 @@ static void image_sigHandler( int signum ) {
if ( signum == SIGINT && fuse_sigIntHandler != NULL ) {
keepRunning = false;
fuse_sigIntHandler( signum );
+ connection_signalShutdown();
}
if ( signum == SIGTERM && fuse_sigTermHandler != NULL ) {
keepRunning = false;
fuse_sigTermHandler( signum );
+ connection_signalShutdown();
}
errno = temp_errno;
}
@@ -494,8 +495,11 @@ int main( int argc, char *argv[] )
if ( fuse_set_signal_handlers( se ) != -1 ) {
fuse_session_add_chan( se, ch );
//fuse_daemonize(foreground);
- if ( single_thread ) fuse_err = fuse_session_loop( se );
- else fuse_err = fuse_session_loop_mt( se ); //MT produces errors (race conditions) in libfuse and didnt improve speed at all
+ if ( single_thread ) {
+ fuse_err = fuse_session_loop( se );
+ } else {
+ fuse_err = fuse_session_loop_mt( se ); //MT produces errors (race conditions) in libfuse and didnt improve speed at all
+ }
fuse_remove_signal_handlers( se );
fuse_session_remove_chan( ch );
}
@@ -506,5 +510,6 @@ int main( int argc, char *argv[] )
fuse_opt_free_args( &args );
free( newArgv );
logadd( LOG_DEBUG1, "Terminating. FUSE REPLIED: %d\n", fuse_err );
+ connection_join();
return fuse_err;
}