summaryrefslogtreecommitdiffstats
path: root/src/fuse/connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fuse/connection.c')
-rw-r--r--src/fuse/connection.c194
1 files changed, 115 insertions, 79 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c
index 98b1d36..7bd8018 100644
--- a/src/fuse/connection.c
+++ b/src/fuse/connection.c
@@ -10,10 +10,12 @@
#include <pthread.h>
#include <string.h>
#include <stdio.h>
+#include <stdatomic.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <inttypes.h>
+#include <signal.h>
/* Constants */
static const size_t SHORTBUF = 100;
@@ -30,9 +32,12 @@ static const int FAIL_BACKOFF_START_COUNT = 8;
static bool connectionInitDone = false;
static bool threadInitDone = false;
static pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER;
-static bool keepRunning = true;
+atomic_bool keepRunning = true;
static bool learnNewServers;
+static pthread_t tidReceiver;
+static pthread_t tidBackground;
+
// List of pending requests
static struct {
dnbd3_async_t *head;
@@ -58,12 +63,12 @@ static struct {
// Known alt servers
typedef struct _alt_server {
dnbd3_host_t host;
- int consecutiveFails;
- int rtt;
+ atomic_int consecutiveFails;
+ atomic_int rtt;
int rtts[RTT_COUNT];
int rttIndex;
- int bestCount;
- int liveRtt;
+ atomic_int bestCount;
+ atomic_int liveRtt;
} alt_server_t;
static dnbd3_server_entry_t newservers[MAX_ALTS];
@@ -83,20 +88,22 @@ static pthread_rwlock_t altLock = PTHREAD_RWLOCK_INITIALIZER;
/* Static methods */
-static void* connection_receiveThreadMain(void *sock);
-static void* connection_backgroundThread(void *something);
+static void* connection_receiveThreadMain( void *sock );
+static void* connection_backgroundThread( void *something );
static void addAltServers();
static void sortAltServers();
static void probeAltServers();
-static void switchConnection(int sockFd, alt_server_t *srv);
+static void switchConnection( int sockFd, alt_server_t *srv );
static void requestAltServers();
-static bool throwDataAway(int sockFd, uint32_t amount);
+static bool throwDataAway( int sockFd, uint32_t amount );
+
+static void enqueueRequest( dnbd3_async_t *request );
+static dnbd3_async_t* removeRequest( dnbd3_async_t *request );
-static void enqueueRequest(dnbd3_async_t *request);
-static dnbd3_async_t* removeRequest(dnbd3_async_t *request);
+static void blockSignals();
-bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid, const bool doLearnNew)
+bool connection_init( const char *hosts, const char *lowerImage, const uint16_t rid, const bool doLearnNew )
{
int sock = -1;
char host[SHORTBUF];
@@ -111,7 +118,7 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r
timing_setBase();
pthread_mutex_lock( &mutexInit );
- if ( !connectionInitDone && keepRunning ) {
+ if ( !connectionInitDone ) {
dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS];
const char *current, *end;
int altIndex = 0;
@@ -123,7 +130,7 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r
// Get next host from string
while ( *current == ' ' ) current++;
end = strchr( current, ' ' );
- size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1);
+ size_t len = ( end == NULL ? SHORTBUF : (size_t)( end - current ) + 1 );
if ( len > SHORTBUF ) len = SHORTBUF;
snprintf( host, len, "%s", current );
int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS );
@@ -155,14 +162,14 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r
}
if ( sock == -2 || sock == -1 )
continue;
- salen = sizeof(sa);
+ salen = sizeof( sa );
if ( getpeername( sock, (struct sockaddr*)&sa, &salen ) == -1 ) {
logadd( LOG_ERROR, "getpeername on successful connection failed!? (errno=%d)", errno );
close( sock );
sock = -1;
continue;
}
- hlen = sock_printable( (struct sockaddr*)&sa, salen, host, sizeof(host) );
+ hlen = sock_printable( (struct sockaddr*)&sa, salen, host, sizeof( host ) );
logadd( LOG_INFO, "Connected to %.*s", (int)hlen, host );
if ( !dnbd3_select_image( sock, lowerImage, rid, 0 ) ) {
logadd( LOG_ERROR, "Could not send select image" );
@@ -207,12 +214,11 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r
bool connection_initThreads()
{
pthread_mutex_lock( &mutexInit );
- if ( !keepRunning || !connectionInitDone || threadInitDone || connection.sockFd == -1 ) {
+ if ( !connectionInitDone || threadInitDone || connection.sockFd == -1 ) {
pthread_mutex_unlock( &mutexInit );
return false;
}
bool success = true;
- pthread_t thread;
threadInitDone = true;
logadd( LOG_DEBUG1, "Initializing stuff" );
if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0
@@ -220,10 +226,10 @@ bool connection_initThreads()
logadd( LOG_ERROR, "Mutex or spinlock init failure" );
success = false;
} else {
- if ( pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)connection.sockFd ) != 0 ) {
+ if ( pthread_create( &tidReceiver, NULL, &connection_receiveThreadMain, ( void* )(size_t)connection.sockFd ) != 0 ) {
logadd( LOG_ERROR, "Could not create receive thread" );
success = false;
- } else if ( pthread_create( &thread, NULL, &connection_backgroundThread, NULL ) != 0 ) {
+ } else if ( pthread_create( &tidBackground, NULL, &connection_backgroundThread, NULL ) != 0 ) {
logadd( LOG_ERROR, "Could not create background thread" );
success = false;
}
@@ -241,7 +247,7 @@ uint64_t connection_getImageSize()
return image.size;
}
-bool connection_read(dnbd3_async_t *request)
+bool connection_read( dnbd3_async_t *request )
{
if ( !connectionInitDone ) return false;
pthread_mutex_lock( &connection.sendMutex );
@@ -250,9 +256,7 @@ bool connection_read(dnbd3_async_t *request)
if ( !dnbd3_get_block( connection.sockFd, request->offset, request->length, (uint64_t)request, 0 ) ) {
shutdown( connection.sockFd, SHUT_RDWR );
connection.sockFd = -1;
- pthread_mutex_unlock( &connection.sendMutex );
signal_call( connection.panicSignal );
- return true;
}
}
pthread_mutex_unlock( &connection.sendMutex );
@@ -261,24 +265,36 @@ bool connection_read(dnbd3_async_t *request)
void connection_close()
{
- if ( keepRunning ) {
- logadd( LOG_INFO, "Tearing down dnbd3 connections and workers" );
- }
+ static bool signalled = false;
+ logadd( LOG_INFO, "Tearing down dnbd3 connections and workers" );
pthread_mutex_lock( &mutexInit );
keepRunning = false;
+ if ( threadInitDone && !signalled ) {
+ signalled = true;
+ pthread_kill( tidReceiver, SIGHUP );
+ pthread_kill( tidBackground, SIGHUP );
+ }
+ pthread_mutex_unlock( &mutexInit );
if ( !connectionInitDone ) {
- pthread_mutex_unlock( &mutexInit );
return;
}
- pthread_mutex_unlock( &mutexInit );
pthread_mutex_lock( &connection.sendMutex );
if ( connection.sockFd != -1 ) {
+ logadd( LOG_DEBUG1, "Shutting down socket..." );
shutdown( connection.sockFd, SHUT_RDWR );
}
pthread_mutex_unlock( &connection.sendMutex );
}
-size_t connection_printStats(char *buffer, const size_t len)
+void connection_join()
+{
+ if ( !threadInitDone )
+ return;
+ pthread_join( tidReceiver, NULL );
+ pthread_join( tidBackground, NULL );
+}
+
+size_t connection_printStats( char *buffer, const size_t len )
{
int ret;
size_t remaining = len;
@@ -308,7 +324,7 @@ size_t connection_printStats(char *buffer, const size_t len)
*buffer++ = ' ';
}
const size_t addrlen = sock_printHost( &altservers[i].host, buffer, remaining );
- remaining -= (addrlen + 1); // For space or * above
+ remaining -= ( addrlen + 1 ); // For space or * above
buffer += addrlen;
if ( remaining < 3 )
break;
@@ -324,7 +340,7 @@ size_t connection_printStats(char *buffer, const size_t len)
width += 3;
}
ret = snprintf( buffer, remaining, "% *d %s Unreachable:% 5d BestCount:% 5d Live:% 5dµs\n",
- width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount, altservers[i].liveRtt );
+ width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount, altservers[i].liveRtt );
if ( ret < 0 ) {
ret = 0;
}
@@ -339,23 +355,23 @@ size_t connection_printStats(char *buffer, const size_t len)
return len - remaining;
}
-static void* connection_receiveThreadMain(void *sockPtr)
+static void* connection_receiveThreadMain( void *sockPtr )
{
int sockFd = (int)(size_t)sockPtr;
dnbd3_reply_t reply;
- pthread_detach( pthread_self() );
+ blockSignals();
while ( keepRunning ) {
int ret;
do {
ret = dnbd3_read_reply( sockFd, &reply, true );
+ if ( !keepRunning ) goto fail;
if ( ret == REPLY_OK ) break;
} while ( ret == REPLY_INTR || ret == REPLY_AGAIN );
if ( ret != REPLY_OK ) {
logadd( LOG_DEBUG1, "Error receiving reply on receiveThread (%d)", ret );
goto fail;
}
-
if ( reply.cmd == CMD_GET_BLOCK ) {
// Get block reply. find matching request
dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle );
@@ -390,10 +406,8 @@ static void* connection_receiveThreadMain(void *sockPtr)
}
unlock_rw( &altLock );
}
- // Success, wake up caller
- request->success = true;
- request->finished = true;
- signal_call( request->signal );
+ fuse_reply_buf( request->fuse_req, request->buffer, request->length );
+ free( request );
}
} else if ( reply.cmd == CMD_GET_SERVERS ) {
// List of known alt servers
@@ -416,7 +430,6 @@ static void* connection_receiveThreadMain(void *sockPtr)
}
}
}
- logadd( LOG_DEBUG1, "Aus der Schleife rausgeflogen! ARRRRRRRRRR" );
fail:;
// Make sure noone is trying to use the socket for sending by locking,
pthread_mutex_lock( &connection.sendMutex );
@@ -424,7 +437,9 @@ fail:;
// as someone could have established a new connection already
if ( connection.sockFd == sockFd ) {
connection.sockFd = -1;
- signal_call( connection.panicSignal );
+ if ( keepRunning ) {
+ signal_call( connection.panicSignal );
+ }
}
pthread_mutex_unlock( &connection.sendMutex );
// As we're the only reader, it's safe to close the socket now
@@ -432,11 +447,12 @@ fail:;
return NULL;
}
-static void* connection_backgroundThread(void *something UNUSED)
+static void* connection_backgroundThread( void *something UNUSED )
{
ticks nextKeepalive;
ticks nextRttCheck;
+ blockSignals();
timing_get( &nextKeepalive );
nextRttCheck = nextKeepalive;
while ( keepRunning ) {
@@ -446,6 +462,8 @@ static void* connection_backgroundThread(void *something UNUSED)
uint32_t wt2 = timing_diffMs( &now, &nextRttCheck );
if ( wt1 > 0 && wt2 > 0 ) {
int waitRes = signal_wait( connection.panicSignal, (int)MIN( wt1, wt2 ) + 1 );
+ if ( !keepRunning )
+ break;
if ( waitRes == SIGNAL_ERROR ) {
logadd( LOG_WARNING, "Error waiting on signal in background thread! Errno = %d", errno );
}
@@ -470,10 +488,10 @@ static void* connection_backgroundThread(void *something UNUSED)
if ( timing_reachedPrecise( &nextKeepalive, &now ) ) {
pthread_mutex_lock( &connection.sendMutex );
if ( connection.sockFd != -1 ) {
- dnbd3_request_t request;
- request.magic = dnbd3_packet_magic;
- request.cmd = CMD_KEEPALIVE;
- request.handle = request.offset = request.size = 0;
+ dnbd3_request_t request = {
+ .magic = dnbd3_packet_magic,
+ .cmd = CMD_KEEPALIVE,
+ };
fixup_request( request );
ssize_t ret = sock_sendAll( connection.sockFd, &request, sizeof request, 2 );
if ( (size_t)ret != sizeof request ) {
@@ -528,9 +546,10 @@ static void addAltServers()
altservers[slot].host = newservers[nIdx].host;
altservers[slot].liveRtt = 0;
}
-skip_server:;
+skip_server:
+ ;
}
- memset( newservers, 0, sizeof(newservers) );
+ memset( newservers, 0, sizeof( newservers ) );
unlock_rw( &altLock );
pthread_mutex_unlock( &newAltLock );
}
@@ -604,7 +623,7 @@ static void probeAltServers()
pthread_spin_lock( &requests.lock );
if ( requests.head != NULL ) {
if ( !panic && current != NULL ) {
- const int maxDelay = MAX( current->rtt * 5, 1000000 ); // Give at least one second
+ const uint64_t maxDelay = MAX( current->rtt * 5, 1000000 ); // Give at least one second
dnbd3_async_t *iterator;
for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) {
// A request with measurement tag is pending
@@ -626,7 +645,7 @@ static void probeAltServers()
}
lock_read( &altLock );
- for ( int altIndex = 0; altIndex < (panic ? MAX_ALTS : MAX_ALTS_ACTIVE); ++altIndex ) {
+ for ( int altIndex = 0; altIndex < ( panic ? MAX_ALTS : MAX_ALTS_ACTIVE ); ++altIndex ) {
alt_server_t * const srv = &altservers[altIndex];
if ( srv->host.type == 0 )
continue;
@@ -640,59 +659,60 @@ static void probeAltServers()
srv->rttIndex += 1;
}
// Probe
+ char hstr[100];
+ sock_printHost( &srv->host, hstr, 100 );
ticks start;
timing_get( &start );
errno = 0;
int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 );
if ( sock == -1 ) {
- logadd( LOG_DEBUG1, "Could not connect for probing. errno = %d", errno );
+ logadd( LOG_DEBUG1, "%s probe: Could not connect for probing. errno = %d", hstr, errno );
goto fail;
}
if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) {
- logadd( LOG_DEBUG1, "probe: select_image failed" );
+ logadd( LOG_DEBUG1, "%s probe: select_image failed (sock=%d, errno=%d)", hstr, sock, errno );
goto fail;
}
- if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize )) {
- logadd( LOG_DEBUG1, "probe: select image reply failed" );
+ if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize ) ) {
+ logadd( LOG_DEBUG1, "%s probe: select image reply failed", hstr );
goto fail;
}
if ( remoteProto < MIN_SUPPORTED_SERVER ) {
- logadd( LOG_WARNING, "Unsupported remote version (local: %d, remote: %d)", (int)PROTOCOL_VERSION, (int)remoteProto );
+ logadd( LOG_WARNING, "%s probe: Unsupported remote version (local: %d, remote: %d)", hstr, (int)PROTOCOL_VERSION, (int)remoteProto );
srv->consecutiveFails += 10;
goto fail;
}
if ( remoteRid != image.rid || strcmp( remoteName, image.name ) != 0 ) {
- logadd( LOG_WARNING, "Remote rid or name mismatch (got '%s')", remoteName );
+ logadd( LOG_WARNING, "%s probe: Remote rid or name mismatch (got '%s')", hstr, remoteName );
srv->consecutiveFails += 10;
goto fail;
}
if ( !dnbd3_get_block( sock, testOffset, testLength, 0, 0 ) ) {
- logadd( LOG_DEBUG1, "-> block request fail" );
+ logadd( LOG_DEBUG1, "%s probe: -> block request fail", hstr );
goto fail;
}
int a = 111;
- if ( !(a = dnbd3_get_reply( sock, &reply )) || reply.size != testLength ) {
- logadd( LOG_DEBUG1, "<- get block reply fail %d %d", a, (int)reply.size );
+ if ( !( a = dnbd3_get_reply( sock, &reply ) ) || reply.size != testLength ) {
+ logadd( LOG_DEBUG1, "%s probe: <- get block reply fail %d %d", hstr, a, (int)reply.size );
goto fail;
}
if ( request != NULL && removeRequest( request ) != NULL ) {
// Request successfully removed from queue
const ssize_t ret = sock_recv( sock, request->buffer, request->length );
if ( ret != (ssize_t)request->length ) {
- logadd( LOG_DEBUG1, "[RTT] receiving payload for a block reply failed" );
+ logadd( LOG_DEBUG1, "%s probe: receiving payload for a block reply failed", hstr );
// Failure, add to queue again
connection_read( request );
goto fail;
}
- // Success, wake up caller
- logadd( LOG_DEBUG1, "[RTT] Successful direct probe" );
- request->success = true;
- request->finished = true;
- signal_call( request->signal );
+ // Success, reply to fuse
+ fuse_reply_buf( request->fuse_req, request->buffer, request->length );
+ free( request );
+ logadd( LOG_DEBUG1, "%s probe: Successful direct probe", hstr );
} else {
// Wasn't a request that's in our request queue
if ( !throwDataAway( sock, testLength ) ) {
- logadd( LOG_DEBUG1, "<- get block reply payload fail" );
+ logadd( LOG_DEBUG1, "%s probe: <- get block reply payload fail", hstr );
goto fail;
}
}
@@ -701,7 +721,7 @@ static void probeAltServers()
// Panic mode? Just switch to server
if ( panic ) {
unlock_rw( &altLock );
- switchConnection( sock, srv );
+ if ( keepRunning ) switchConnection( sock, srv );
return;
}
// Non-panic mode:
@@ -733,7 +753,8 @@ static void probeAltServers()
close( sock );
}
continue;
-fail:;
+fail:
+ ;
if ( sock != -1 ) {
close( sock );
}
@@ -774,7 +795,7 @@ fail:;
// Regular logic: Apply threshold when considering switch
if ( !doSwitch && current != NULL ) {
doSwitch = current->rtt > best->rtt + RTT_ABSOLUTE_THRESHOLD
- || RTT_THRESHOLD_FACTOR(current->rtt) > best->rtt + 1000;
+ || RTT_THRESHOLD_FACTOR( current->rtt ) > best->rtt + 1000;
}
}
// Switch if a better server was found
@@ -796,11 +817,10 @@ fail:;
}
}
-static void switchConnection(int sockFd, alt_server_t *srv)
+static void switchConnection( int sockFd, alt_server_t *srv )
{
- pthread_t thread;
struct sockaddr_storage addr;
- socklen_t addrLen = sizeof(addr);
+ socklen_t addrLen = sizeof( addr );
char message[200] = "Connection switched to ";
const size_t len = strlen( message );
int ret;
@@ -829,9 +849,10 @@ static void switchConnection(int sockFd, alt_server_t *srv)
signal_call( connection.panicSignal );
return;
}
+ pthread_detach( tidReceiver );
timing_get( &connection.startupTime );
- pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)sockFd );
- sock_printable( (struct sockaddr*)&addr, sizeof(addr), message + len, sizeof(message) - len );
+ pthread_create( &tidReceiver, NULL, &connection_receiveThreadMain, ( void* )(size_t)sockFd );
+ sock_printable( (struct sockaddr*)&addr, sizeof( addr ), message + len, sizeof( message ) - len );
logadd( LOG_INFO, "%s", message );
// resend queue
if ( queue != NULL ) {
@@ -863,14 +884,14 @@ static void requestAltServers()
request.magic = dnbd3_packet_magic;
request.cmd = CMD_GET_SERVERS;
fixup_request( request );
- if ( sock_sendAll( connection.sockFd, &request, sizeof(request), 2 ) != (ssize_t)sizeof(request) ) {
+ if ( sock_sendAll( connection.sockFd, &request, sizeof( request ), 2 ) != (ssize_t)sizeof( request ) ) {
logadd( LOG_WARNING, "Connection failed while requesting alt server list" );
shutdown( connection.sockFd, SHUT_RDWR );
connection.sockFd = -1;
}
}
-static bool throwDataAway(int sockFd, uint32_t amount)
+static bool throwDataAway( int sockFd, uint32_t amount )
{
size_t done = 0;
char tempBuffer[SHORTBUF];
@@ -883,11 +904,9 @@ static bool throwDataAway(int sockFd, uint32_t amount)
return true;
}
-static void enqueueRequest(dnbd3_async_t *request)
+static void enqueueRequest( dnbd3_async_t *request )
{
request->next = NULL;
- request->finished = false;
- request->success = false;
//logadd( LOG_DEBUG2, "Queue: %p @ %s : %d", request, file, line );
// Measure latency and add to switch formula
timing_get( &request->time );
@@ -901,7 +920,7 @@ static void enqueueRequest(dnbd3_async_t *request)
pthread_spin_unlock( &requests.lock );
}
-static dnbd3_async_t* removeRequest(dnbd3_async_t *request)
+static dnbd3_async_t* removeRequest( dnbd3_async_t *request )
{
pthread_spin_lock( &requests.lock );
//logadd( LOG_DEBUG2, "Remov: %p @ %s : %d", request, file, line );
@@ -925,3 +944,20 @@ static dnbd3_async_t* removeRequest(dnbd3_async_t *request)
return iterator;
}
+static void blockSignals()
+{
+ sigset_t sigmask;
+ if ( pthread_sigmask( 0, NULL, &sigmask ) == -1 ) {
+ logadd( LOG_WARNING, "Cannot get current sigmask of thread" );
+ sigemptyset( &sigmask );
+ }
+ sigaddset( &sigmask, SIGUSR1 );
+ sigaddset( &sigmask, SIGUSR2 );
+ sigaddset( &sigmask, SIGPIPE );
+ sigaddset( &sigmask, SIGINT );
+ sigaddset( &sigmask, SIGTERM );
+ sigdelset( &sigmask, SIGHUP );
+ if ( pthread_sigmask( SIG_SETMASK, &sigmask, NULL ) == -1 ) {
+ logadd( LOG_WARNING, "Cannot set sigmask of thread" );
+ }
+}