From ecc6e5dd28821bf1c207e074f1b5903426a7e4c4 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 1 Dec 2015 13:38:10 +0100 Subject: [FUSE] Stability improvements, runs for longer than a couple secs now :) --- src/fuse/connection.c | 60 +++++++++++++++++++++++++++------------------------ 1 file changed, 32 insertions(+), 28 deletions(-) (limited to 'src/fuse') diff --git a/src/fuse/connection.c b/src/fuse/connection.c index 53949ca..20ddd8f 100644 --- a/src/fuse/connection.c +++ b/src/fuse/connection.c @@ -122,7 +122,7 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r } else if ( rid != 0 && rid != remoteRid ) { logadd( LOG_ERROR, "rid mismatch" ); } else { - image.name = strdup(remoteName); + image.name = strdup( remoteName ); image.rid = remoteRid; image.size = remoteSize; connection.currentServer = altservers[i].host; @@ -215,10 +215,9 @@ static void* connection_receiveThreadMain(void *sockPtr) do { ret = dnbd3_read_reply( sockFd, &reply, true ); if ( ret == REPLY_OK ) break; - logadd( LOG_DEBUG1, "Receive return code %d", ret ); } while ( ret == REPLY_INTR || ret == REPLY_AGAIN ); if ( ret != REPLY_OK ) { - logadd( LOG_DEBUG1, "Error receiving reply on receiveThread" ); + logadd( LOG_DEBUG1, "Error receiving reply on receiveThread (%d)", ret ); goto fail; } // TODO: Ignoring anything but block replies for now; handle the others @@ -229,7 +228,6 @@ static void* connection_receiveThreadMain(void *sockPtr) } } else { // get block reply. find matching request - logadd( LOG_DEBUG1, "Got a block reply :)" ); dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle ); if ( request == NULL ) { logadd( LOG_WARNING, "WARNING BUG ALERT SOMETHING: Got block reply with no matching request" ); @@ -239,25 +237,17 @@ static void* connection_receiveThreadMain(void *sockPtr) } } else { // Found a match - logadd( LOG_DEBUG1, "Found pending entry :)" ); - request->finished = true; - uint32_t done = 0; - while ( done < request->length ) { - logadd( LOG_DEBUG1, "Will read %d bytes...", (int)(request->length - done) ); - ssize_t ret = recv( sockFd, request->buffer + done, request->length - done, 0 ); - if ( ret <= 0 ) { - if ( ret == -1 && errno == EINTR ) continue; - logadd( LOG_DEBUG1, "Read failure :(" ); - request->success = false; - signal_call( request->signalFd ); - logadd( LOG_DEBUG1, "receiving payload for a block reply failed" ); - goto fail; - } - done += (uint32_t)ret; + const ssize_t ret = sock_recv( sockFd, request->buffer, request->length ); + if ( ret != (ssize_t)request->length ) { + request->success = false; + request->finished = true; + signal_call( request->signalFd ); + logadd( LOG_DEBUG1, "receiving payload for a block reply failed" ); + goto fail; } // Success, wake up caller - logadd( LOG_DEBUG1, "Read completed :)" ); request->success = true; + request->finished = true; signal_call( request->signalFd ); } } @@ -271,6 +261,7 @@ fail:; logadd( LOG_DEBUG1, "RT: Local sock: %d, global: %d", sockFd, connection.sockFd ); if ( connection.sockFd == sockFd ) { connection.sockFd = -1; + signal_call( connection.panicSignalFd ); } pthread_mutex_unlock( &connection.sendMutex ); // As we're the only reader, it's safe to close the socket now @@ -295,7 +286,7 @@ static void* connection_backgroundThread(void *something UNUSED) // Woken up, see what we have to do const bool panic = connection.sockFd == -1; // Check alt servers - if ( panic || now < nextRttCheck ) { + if ( panic || now >= nextRttCheck ) { probeAltServers(); if ( panic || connection.startupTime + ( STARTUP_MODE_DURATION * 1000ull ) > now ) { nextRttCheck = now + TIMER_INTERVAL_PROBE_STARTUP * 1000ull; @@ -304,7 +295,7 @@ static void* connection_backgroundThread(void *something UNUSED) } } // Send keepalive packet - if ( now < nextKeepalive ) { + if ( now >= nextKeepalive ) { pthread_mutex_lock( &connection.sendMutex ); if ( connection.sockFd != -1 ) { logadd( LOG_DEBUG1, "Sending keepalive..." ); @@ -317,6 +308,7 @@ static void* connection_backgroundThread(void *something UNUSED) if ( (size_t)ret != sizeof request ) { shutdown( connection.sockFd, SHUT_RDWR ); connection.sockFd = -1; + nextRttCheck = now; } } pthread_mutex_unlock( &connection.sendMutex ); @@ -340,6 +332,9 @@ static void probeAltServers() char *remoteName; const bool panic = connection.sockFd == -1; + if ( panic ) { + logadd( LOG_DEBUG1, "C'est la panique, panique!" ); + } for ( int altIndex = 0; altIndex < MAX_ALTS; ++altIndex ) { alt_server_t * const srv = &altservers[altIndex]; if ( srv->host.type == 0 ) @@ -354,6 +349,7 @@ static void probeAltServers() srv->rttIndex += 1; } // Probe + logadd( LOG_DEBUG1, "Trying server %d\n", altIndex ); const uint64_t start = nowMicro(); int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 ); if ( sock == -1 ) { @@ -361,9 +357,11 @@ static void probeAltServers() continue; } if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) { + logadd( LOG_DEBUG1, "-> select fail" ); goto fail; } if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize )) { + logadd( LOG_DEBUG1, "<- select fail" ); goto fail; } if ( remoteProto < MIN_SUPPORTED_SERVER || remoteProto > PROTOCOL_VERSION ) { @@ -377,10 +375,13 @@ static void probeAltServers() goto fail; } if ( !dnbd3_get_block( sock, 0, RTT_BLOCK_SIZE, 0 ) ) { + logadd( LOG_DEBUG1, "-> block request fail" ); goto fail; } - if ( !dnbd3_get_reply( sock, &reply ) || reply.size != RTT_BLOCK_SIZE - || !throwDataAway( sock, RTT_BLOCK_SIZE ) ) { + int a = 111, b = 111; + if ( !(a = dnbd3_get_reply( sock, &reply )) || reply.size != RTT_BLOCK_SIZE + || !(b = throwDataAway( sock, RTT_BLOCK_SIZE )) ) { + logadd( LOG_DEBUG1, "<- block paxload fail %d %d %d", a, (int)reply.size, b ); goto fail; } // Yay, success @@ -421,9 +422,11 @@ fail:; if ( bestIndex != -1 && ( currentRtt > altservers[bestIndex].rtt + RTT_ABSOLUTE_THRESHOLD || RTT_THRESHOLD_FACTOR(currentRtt) > altservers[bestIndex].rtt + 1500 ) ) { + logadd( LOG_INFO, "Current: %dµs, best: %dµs. Will switch!", currentRtt, altservers[bestIndex].rtt ); switchConnection( bestSock, &altservers[bestIndex] ); - } else { + } else if ( bestIndex != -1 ) { // No switch + logadd( LOG_DEBUG1, "Current: %dµs, best: %dµs. Will not switch.", currentRtt, altservers[bestIndex].rtt ); close( bestSock ); } } @@ -482,12 +485,13 @@ static void switchConnection(int sockFd, alt_server_t *srv) static bool throwDataAway(int sockFd, uint32_t amount) { - uint32_t done = 0; + size_t done = 0; char tempBuffer[SHORTBUF]; while ( done < amount ) { - const ssize_t ret = recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ), MSG_NOSIGNAL ); - if ( ret == 0 || ( ret < 0 && ret != EINTR ) ) + const ssize_t ret = sock_recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ) ); + if ( ret <= 0 ) return false; + done += (size_t)ret; } return true; } -- cgit v1.2.3-55-g7522