From 7b51c287a60d2f202fb131eeed9d1bf19b65a7a3 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 24 Nov 2015 12:30:46 +0100 Subject: [FUSE] Mid-refactoring, does not compile --- src/fuse/connection.c | 245 ++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 220 insertions(+), 25 deletions(-) (limited to 'src/fuse/connection.c') diff --git a/src/fuse/connection.c b/src/fuse/connection.c index 039c532..cdfc1df 100644 --- a/src/fuse/connection.c +++ b/src/fuse/connection.c @@ -3,21 +3,28 @@ #include "../config.h" #include "../shared/protocol.h" #include "../shared/signal.h" +#include "../shared/sockhelper.h" #include #include #include #include +#include +#include /* Constants */ static const size_t SHORTBUF = 100; #define MAX_ALTS (8) +#define MAX_HOSTS_PER_ADDRESS (2) +static const int MAX_CONSECUTIVE_FAILURES = 16; +#define RTT_COUNT (4) /* Module variables */ // Init guard static bool initDone = false; static pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER; +static bool keepRunning = true; // List of pending requests static struct { @@ -31,14 +38,23 @@ static struct { char *name; uint16_t rid; uint64_t size; +} image; + +static struct { int sockFd; pthread_mutex_t sendMutex; pthread_t receiveThread; -} image; + int panicSignalFd; + bool panicMode; +} connection; // Known alt servers static struct _alt_server { - + dnbd3_host_t host; + int consecutiveFails; + int rtt; + int rtts[RTT_COUNT]; + int rttIndex; } altservers[MAX_ALTS]; typedef struct _alt_server alt_server_t; @@ -47,22 +63,28 @@ typedef struct _alt_server alt_server_t; static void* connection_receiveThreadMain(void *sock); +static void probeAltServers(); static bool throwDataAway(int sockFd, uint32_t amount); static void enqueueRequest(dnbd3_async_t *request); static dnbd3_async_t* removeRequest(dnbd3_async_t *request); +static uint64_t nowMilli(); +static uint64_t nowMicro(); bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid) { int sock = -1; char host[SHORTBUF]; - const char *current, *end; serialized_buffer_t buffer; uint16_t remoteVersion, remoteRid; char *remoteName; uint64_t remoteSize; pthread_mutex_lock( &mutexInit ); - if ( !initDone ) { + if ( !initDone && keepRunning ) { + dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS]; + const char *current, *end; + int altIndex = 0; + memset( altservers, 0, sizeof altservers ); current = hosts; do { // Get next host from string @@ -71,9 +93,23 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1); if ( len > SHORTBUF ) len = SHORTBUF; snprintf( host, len, "%s", current ); + int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS ); + for ( int i = 0; i < newHosts; ++i ) { + if ( altIndex >= MAX_ALTS ) + break; + altservers[altIndex].host = tempHosts[i]; + altIndex += 1; + } current = end + 1; + } while ( end != NULL && altIndex < MAX_ALTS ); + printf( "Got %d servers from init call\n", altIndex ); + // Connect + for ( int i = 0; i < altIndex; ++i ) { + if ( altservers[i].host.type == 0 ) + continue; // Try to connect - sock = connect_to_server( host, PORT ); // TODO: Parse port from host + sock = sock_connect( &altservers[i].host, 500, SOCKET_KEEPALIVE_TIMEOUT * 2000 ); // TODO timeout... + printf( "Got socket %d\n", sock ); if ( sock != -1 && dnbd3_select_image( sock, lowerImage, rid, 0 ) && dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) && ( rid == 0 || rid == remoteRid ) ) { @@ -87,16 +123,18 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r close( sock ); sock = -1; } - // TODO: Add to alt list - } while ( end != NULL ); + } if ( sock != -1 ) { - if ( pthread_mutex_init( &image.sendMutex, NULL ) != 0 + printf( "Initializing stuff\n" ); + if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0 || pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0 - || pthread_create( &image.receiveThread, NULL, &connection_receiveThreadMain, (void*)(size_t)sock ) != 0 ) { + || pthread_create( &connection.receiveThread, NULL, &connection_receiveThreadMain, (void*)(size_t)sock ) != 0 ) { close( sock ); sock = -1; } else { - image.sockFd = sock; + connection.sockFd = sock; + connection.panicMode = false; + connection.panicSignalFd = signal_new(); requests.head = NULL; requests.tail = NULL; } @@ -111,31 +149,42 @@ bool connection_read(dnbd3_async_t *request) { if (!initDone) return false; enqueueRequest( request ); - pthread_mutex_lock( &image.sendMutex ); - if ( image.sockFd != -1 ) { - while ( !dnbd3_get_block( image.sockFd, request->offset, request->length, (uint64_t)request ) ) { - shutdown( image.sockFd, SHUT_RDWR ); - image.sockFd = -1; + pthread_mutex_lock( &connection.sendMutex ); + if ( connection.sockFd != -1 ) { + while ( !dnbd3_get_block( connection.sockFd, request->offset, request->length, (uint64_t)request ) ) { + shutdown( connection.sockFd, SHUT_RDWR ); + connection.sockFd = -1; // TODO reconnect! - pthread_mutex_unlock( &image.sendMutex ); + pthread_mutex_unlock( &connection.sendMutex ); return false; } } - pthread_mutex_unlock( &image.sendMutex ); + pthread_mutex_unlock( &connection.sendMutex ); return true; } void connection_close() { - // + pthread_mutex_lock( &mutexInit ); + keepRunning = false; + if ( !initDone ) { + pthread_mutex_unlock( &mutexInit ); + return; + } + pthread_mutex_unlock( &mutexInit ); + pthread_mutex_lock( &connection.sendMutex ); + if ( connection.sockFd != -1 ) { + shutdown( connection.sockFd, SHUT_RDWR ); + } + pthread_mutex_unlock( &connection.sendMutex ); } static void* connection_receiveThreadMain(void *sockPtr) { int sockFd = (int)(size_t)sockPtr; dnbd3_reply_t reply; - for ( ;; ) { - if ( !dnbd3_get_reply( image.sockFd, &reply ) ) + while ( keepRunning ) { + if ( !dnbd3_get_reply( connection.sockFd, &reply ) ) goto fail; // TODO: Ignoring anything but block replies for now; handle the others if ( reply.cmd != CMD_GET_BLOCK ) { @@ -167,26 +216,152 @@ static void* connection_receiveThreadMain(void *sockPtr) } fail:; // Make sure noone is trying to use the socket for sending by locking, - pthread_mutex_lock( &image.sendMutex ); + pthread_mutex_lock( &connection.sendMutex ); // then just set the fd to -1, but only if it's the same fd as ours, // as someone could have established a new connection already - if ( image.sockFd == sockFd ) { - image.sockFd = -1; + if ( connection.sockFd == sockFd ) { + connection.sockFd = -1; } - pthread_mutex_unlock( &image.sendMutex ); + pthread_mutex_unlock( &connection.sendMutex ); // As we're the only reader, it's safe to close the socket now close( sockFd ); return NULL; } +static void* connection_backgroundThread(void *something UNUSED) +{ + uint64_t nextKeepalive = 0; + uint64_t nextRttCheck = 0; + const uint64_t startupTime = nowMilli(); + + while ( keepRunning ) { + const uint64_t now = nowMilli(); + if ( now < nextKeepalive && now < nextRttCheck ) { + int waitTime = (int)( MIN( nextKeepalive, nextRttCheck ) - now ); + int waitRes = signal_wait( connection.panicSignalFd, waitTime ); + if ( waitRes == SIGNAL_ERROR ) { + printf( "Error waiting on signal in background thread! Errno = %d\n", errno ); + } + } + // Woken up, see what we have to do + // Check alt servers + if ( connection.panicMode || now < nextRttCheck ) { + probeAltServers(); + if ( connection.panicMode || startupTime + ( STARTUP_MODE_DURATION * 1000ull ) > now ) { + nextRttCheck = now + TIMER_INTERVAL_PROBE_STARTUP * 1000ull; + } else { + nextRttCheck = now + TIMER_INTERVAL_PROBE_NORMAL * 1000ull; + } + } + // Send keepalive packet + if ( now < nextKeepalive ) { + pthread_mutex_lock( &connection.sendMutex ); + if ( connection.sockFd != -1 ) { + printf( "Sending keepalive...\n" ); + dnbd3_request_t request; + request.magic = dnbd3_packet_magic; + request.cmd = CMD_KEEPALIVE; + request.size = 0; + fixup_request( request ); + ssize_t ret = sock_sendAll( connection.sockFd, &request, sizeof request, 2 ); + if ( (size_t)ret != sizeof request ) { + shutdown( connection.sockFd, SHUT_RDWR ); + connection.sockFd = -1; + connection.panicMode = true; + } + } + pthread_mutex_unlock( &connection.sendMutex ); + nextKeepalive = now + TIMER_INTERVAL_KEEPALIVE_PACKET * 1000ull; + } + } + return NULL; +} + // Private quick helpers +static void probeAltServers() +{ + serialized_buffer_t buffer; + dnbd3_request_t request; + dnbd3_reply_t reply; + int bestIndex = -1; + int bestSock = -1; + uint16_t remoteRid, remoteProto; + uint64_t remoteSize; + char *remoteName; + + for ( int altIndex = 0; altIndex < MAX_ALTS; ++altIndex ) { + alt_server_t * const srv = &altservers[altIndex]; + if ( srv->host.type == 0 ) + continue; + if ( !connection.panicMode && srv->consecutiveFails > MAX_CONSECUTIVE_FAILURES + && srv->consecutiveFails % ( srv->consecutiveFails / 8 ) != 0 ) { + continue; + } + if (srv->rttIndex >= RTT_COUNT) { + srv->rttIndex = 0; + } else { + srv->rttIndex += 1; + } + // Probe + const uint64_t start = nowMicro(); + int sock = sock_connect( &srv->host, connection.panicMode ? 1000 : 333, 1000 ); + if ( sock == -1 ) { + printf( "Could not crrate socket for probing. errno = %d\n", errno ); + continue; + } + if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) { + goto fail; + } + if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize )) { + goto fail; + } + if ( remoteProto < MIN_SUPPORTED_SERVER || remoteProto > PROTOCOL_VERSION ) { + printf( "Unsupported remote version\n" ); + goto fail; + } + if ( remoteRid != image.rid || strcmp( remoteName, image.name ) != 0 ) { + printf( "Remote rid or name mismatch\n" ); + goto fail; + } + if ( !dnbd3_get_block( sock, 0, RTT_BLOCK_SIZE, 0 ) ) { + goto fail; + } + if ( !dnbd3_get_reply( sock, &reply ) || reply.size != RTT_BLOCK_SIZE + || !throwDataAway( sock, RTT_BLOCK_SIZE ) ) { + goto fail; + } + // Yay, success + const uint64_t end = nowMicro(); + srv->consecutiveFails = 0; + srv->rtts[srv->rttIndex] = (int)(end - start); + srv->rtt = 0; + for ( int i = 0; i < RTT_COUNT; ++i ) { + srv->rtt += srv->rtts[i]; + } + srv->rtt /= RTT_COUNT; + if ( bestIndex == -1 || altservers[bestIndex].rtt > srv->rtt ) { + bestIndex = altIndex; + close( bestSock ); + bestSock = sock; + } else { + close( sock ); + } + continue; // XXX: Remember current server, compare to it, update value on change, +fail:; + close( sock ); + srv->rtts[srv->rttIndex] = RTT_UNREACHABLE; + srv->consecutiveFails += 1; + } +} + static bool throwDataAway(int sockFd, uint32_t amount) { uint32_t done = 0; char tempBuffer[SHORTBUF]; while ( done < amount ) { - if ( recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ), 0 ) <= 0 ) + const ssize_t ret = recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ), MSG_NOSIGNAL ); + if ( ret == 0 || ( ret < 0 && ret != EINTR ) ) return false; } return true; @@ -227,3 +402,23 @@ static dnbd3_async_t* removeRequest(dnbd3_async_t *request) pthread_spin_unlock( &requests.lock ); return iterator; } + +static uint64_t nowMilli() +{ + struct timespec ts; + if ( clock_gettime( CLOCK_MONOTONIC_RAW, &ts ) != 0 ) { + printf( "clock_gettime() failed. Errno: %d\n", errno ); + return 0; + } + return ( ts.tv_sec * 1000ull ) + ( ts.tv_nsec / 1000000ull ); +} + +static uint64_t nowMicro() +{ + struct timespec ts; + if ( clock_gettime( CLOCK_MONOTONIC_RAW, &ts ) != 0 ) { + printf( "clock_gettime() failed. Errno: %d\n", errno ); + return 0; + } + return ( ts.tv_sec * 1000000ull ) + ( ts.tv_nsec / 1000ull ); +} -- cgit v1.2.3-55-g7522