From 53dc1d1bcc76c50e956eb57a9a48ed937dff4972 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 1 Dec 2015 18:15:32 +0100 Subject: [FUSE] Fix losing requests on server change --- src/fuse/connection.c | 32 +++++++++++++++++++++----------- src/fuse/main.c | 10 +++++----- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/src/fuse/connection.c b/src/fuse/connection.c index 20ddd8f..c0bcd6d 100644 --- a/src/fuse/connection.c +++ b/src/fuse/connection.c @@ -6,6 +6,7 @@ #include "../shared/sockhelper.h" #include "../shared/log.h" +#include #include #include #include @@ -68,8 +69,14 @@ static void* connection_backgroundThread(void *something); static void probeAltServers(); static void switchConnection(int sockFd, alt_server_t *srv); static bool throwDataAway(int sockFd, uint32_t amount); -static void enqueueRequest(dnbd3_async_t *request); -static dnbd3_async_t* removeRequest(dnbd3_async_t *request); + +//static void enqueueRequest(dnbd3_async_t *request); +//static dnbd3_async_t* removeRequest(dnbd3_async_t *request); +static void __enqueueRequest(dnbd3_async_t *request, const char *file, int line); +static dnbd3_async_t* __removeRequest(dnbd3_async_t *request, const char *file, int line); +#define enqueueRequest(req) __enqueueRequest(req, __FILE__, __LINE__) +#define removeRequest(req) __removeRequest(req, __FILE__, __LINE__) + static uint64_t nowMilli(); static uint64_t nowMicro(); @@ -239,10 +246,8 @@ static void* connection_receiveThreadMain(void *sockPtr) // Found a match const ssize_t ret = sock_recv( sockFd, request->buffer, request->length ); if ( ret != (ssize_t)request->length ) { - request->success = false; - request->finished = true; - signal_call( request->signalFd ); logadd( LOG_DEBUG1, "receiving payload for a block reply failed" ); + connection_read( request ); goto fail; } // Success, wake up caller @@ -298,7 +303,6 @@ static void* connection_backgroundThread(void *something UNUSED) if ( now >= nextKeepalive ) { pthread_mutex_lock( &connection.sendMutex ); if ( connection.sockFd != -1 ) { - logadd( LOG_DEBUG1, "Sending keepalive..." ); dnbd3_request_t request; request.magic = dnbd3_packet_magic; request.cmd = CMD_KEEPALIVE; @@ -349,7 +353,6 @@ static void probeAltServers() srv->rttIndex += 1; } // Probe - logadd( LOG_DEBUG1, "Trying server %d\n", altIndex ); const uint64_t start = nowMicro(); int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 ); if ( sock == -1 ) { @@ -400,6 +403,7 @@ static void probeAltServers() srv->rtt += srv->rtts[i]; } srv->rtt /= RTT_COUNT; + srv->rtt += rand() % 30000; // Remember rtt if this server matches the current one if ( isSameAddressPort( &srv->host, &connection.currentServer ) ) { currentRtt = srv->rtt; @@ -420,7 +424,7 @@ fail:; } // Switch if a better server was found if ( bestIndex != -1 - && ( currentRtt > altservers[bestIndex].rtt + RTT_ABSOLUTE_THRESHOLD + && ( currentRtt > altservers[bestIndex].rtt || currentRtt > altservers[bestIndex].rtt + RTT_ABSOLUTE_THRESHOLD || RTT_THRESHOLD_FACTOR(currentRtt) > altservers[bestIndex].rtt + 1500 ) ) { logadd( LOG_INFO, "Current: %dµs, best: %dµs. Will switch!", currentRtt, altservers[bestIndex].rtt ); switchConnection( bestSock, &altservers[bestIndex] ); @@ -470,7 +474,10 @@ static void switchConnection(int sockFd, alt_server_t *srv) // resend queue if ( queue != NULL ) { pthread_mutex_lock( &connection.sendMutex ); - for ( it = queue; it != NULL; it = it->next ) { + dnbd3_async_t *next = NULL; + for ( it = queue; it != NULL; it = next ) { + logadd( LOG_DEBUG1, "Requeue after server change" ); + next = it->next; enqueueRequest( it ); if ( connection.sockFd != -1 && !dnbd3_get_block( connection.sockFd, it->offset, it->length, (uint64_t)it ) ) { logadd( LOG_WARNING, "Resending pending request failed, re-entering panic mode" ); @@ -496,12 +503,13 @@ static bool throwDataAway(int sockFd, uint32_t amount) return true; } -static void enqueueRequest(dnbd3_async_t *request) +static void __enqueueRequest(dnbd3_async_t *request, const char *file, int line) { request->next = NULL; request->finished = false; request->success = false; pthread_spin_lock( &requests.lock ); + //logadd( LOG_DEBUG2, "Queue: %p @ %s : %d", request, file, line ); if ( requests.head == NULL ) { requests.head = requests.tail = request; } else { @@ -511,9 +519,10 @@ static void enqueueRequest(dnbd3_async_t *request) pthread_spin_unlock( &requests.lock ); } -static dnbd3_async_t* removeRequest(dnbd3_async_t *request) +static dnbd3_async_t* __removeRequest(dnbd3_async_t *request, const char *file, int line) { pthread_spin_lock( &requests.lock ); + //logadd( LOG_DEBUG2, "Remov: %p @ %s : %d", request, file, line ); dnbd3_async_t *iterator, *prev = NULL; for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) { if ( iterator == request ) { @@ -553,3 +562,4 @@ static uint64_t nowMicro() } return ( ts.tv_sec * 1000000ull ) + ( ts.tv_nsec / 1000ull ); } + diff --git a/src/fuse/main.c b/src/fuse/main.c index 42c3664..f237832 100644 --- a/src/fuse/main.c +++ b/src/fuse/main.c @@ -112,8 +112,8 @@ static int image_read(const char *path, char *buf, size_t size, off_t offset, st } while ( !request.finished ) { int ret = signal_wait( request.signalFd, 10000 ); - if ( ret != SIGNAL_OK ) { - debugf( "signal_wait returned %d", ret ); + if ( ret < 0 ) { + debugf( "fuse_read signal wait returned %d", ret ); } } signal_close( request.signalFd ); @@ -190,7 +190,7 @@ exit_usage: if ( testOpt ) { /* values for testing. */ - server_address = "132.230.4.1"; + server_address = "132.230.4.1 132.230.8.113 132.230.4.60"; image_Name = "windows7-umwelt.vmdk"; useLog = true; } @@ -199,11 +199,11 @@ exit_usage: goto exit_usage; } - int arg_count = 5; + int arg_count = 4; if ( useDebug ) { arg_count++; } - char *args[6] = {"foo", "-o", "ro,allow_other", "-s", mountPoint, "-d"}; + char *args[6] = { "foo", "-o", "ro,allow_other,kernel_cache,max_readahead=262144", mountPoint, "-d" }; if ( !connection_init( server_address, image_Name, 0 ) ) { printf( "Tschüss\n" ); -- cgit v1.2.3-55-g7522