From b455beb789b89152bdfc3b769a87229b0cb7df26 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Wed, 2 Dec 2015 12:25:27 +0100 Subject: [FUSE] Request alt servers from connected server --- src/fuse/connection.c | 121 +++++++++++++++++++++++++++++++++++++++----------- src/fuse/helper.h | 5 +++ 2 files changed, 100 insertions(+), 26 deletions(-) (limited to 'src/fuse') diff --git a/src/fuse/connection.c b/src/fuse/connection.c index 0c0374e..43e2907 100644 --- a/src/fuse/connection.c +++ b/src/fuse/connection.c @@ -51,14 +51,16 @@ static struct { } connection; // Known alt servers -static struct _alt_server { +typedef struct _alt_server { dnbd3_host_t host; int consecutiveFails; int rtt; int rtts[RTT_COUNT]; int rttIndex; -} altservers[MAX_ALTS]; -typedef struct _alt_server alt_server_t; +} alt_server_t; +alt_server_t altservers[MAX_ALTS]; +alt_server_t newservers[MAX_ALTS]; +pthread_spinlock_t altLock; /* Static methods */ @@ -66,16 +68,14 @@ typedef struct _alt_server alt_server_t; static void* connection_receiveThreadMain(void *sock); static void* connection_backgroundThread(void *something); +static void addAltServers(); static void probeAltServers(); static void switchConnection(int sockFd, alt_server_t *srv); +static void requestAltServers(); static bool throwDataAway(int sockFd, uint32_t amount); -//static void enqueueRequest(dnbd3_async_t *request); -//static dnbd3_async_t* removeRequest(dnbd3_async_t *request); -static void __enqueueRequest(dnbd3_async_t *request, const char *file, int line); -static dnbd3_async_t* __removeRequest(dnbd3_async_t *request, const char *file, int line); -#define enqueueRequest(req) __enqueueRequest(req, __FILE__, __LINE__) -#define removeRequest(req) __removeRequest(req, __FILE__, __LINE__) +static void enqueueRequest(dnbd3_async_t *request); +static dnbd3_async_t* removeRequest(dnbd3_async_t *request); static uint64_t nowMilli(); static uint64_t nowMicro(); @@ -118,10 +118,9 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r if ( altservers[i].host.type == 0 ) continue; // Try to connect - sock = sock_connect( &altservers[i].host, 500, SOCKET_KEEPALIVE_TIMEOUT * 2000 ); - logadd( LOG_DEBUG1, "Got socket %d", sock ); + sock = sock_connect( &altservers[i].host, 500, SOCKET_KEEPALIVE_TIMEOUT * 1000 ); if ( sock == -1 ) { - // + logadd( LOG_ERROR, "Could not connect to host" ); } else if ( !dnbd3_select_image( sock, lowerImage, rid, 0 ) ) { logadd( LOG_ERROR, "Could not send select image" ); } else if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) { @@ -138,6 +137,7 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r connection.sockFd = sock; requests.head = NULL; requests.tail = NULL; + requestAltServers(); break; } // Failed @@ -151,7 +151,9 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r pthread_t thread; logadd( LOG_DEBUG1, "Initializing stuff" ); if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0 - || pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0 ) { + || pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0 + || pthread_spin_init( &altLock, PTHREAD_PROCESS_PRIVATE ) != 0 ) { + logadd( LOG_ERROR, "Mutex or spinlock init failure" ); close( sock ); sock = -1; } else { @@ -227,14 +229,9 @@ static void* connection_receiveThreadMain(void *sockPtr) logadd( LOG_DEBUG1, "Error receiving reply on receiveThread (%d)", ret ); goto fail; } - // TODO: Ignoring anything but block replies for now; handle the others - if ( reply.cmd != CMD_GET_BLOCK ) { - if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) { - logadd( LOG_DEBUG1, "Could not throw %d bytes away on CMD %d", (int)reply.size, (int)reply.cmd ); - goto fail; - } - } else { - // get block reply. find matching request + + if ( reply.cmd == CMD_GET_BLOCK ) { + // Get block reply. find matching request dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle ); if ( request == NULL ) { logadd( LOG_WARNING, "WARNING BUG ALERT SOMETHING: Got block reply with no matching request" ); @@ -255,6 +252,26 @@ static void* connection_receiveThreadMain(void *sockPtr) request->finished = true; signal_call( request->signalFd ); } + } else if ( reply.cmd == CMD_GET_SERVERS ) { + // List of known alt servers + dnbd3_server_entry_t entries[MAX_ALTS]; + const int count = MIN( reply.size / sizeof(dnbd3_server_entry_t), MAX_ALTS ); + const size_t relevantSize = sizeof(dnbd3_server_entry_t) * count; + if ( sock_recv( sockFd, entries, relevantSize ) != (ssize_t)relevantSize + || !throwDataAway( sockFd, reply.size - (uint32_t)relevantSize ) ) { + logadd( LOG_DEBUG1, "Error receiving list of alt servers." ); + goto fail; + } + logadd( LOG_DEBUG1, "Server sent %d alts", count ); + pthread_spin_lock( &altLock ); + memcpy( newservers, entries, relevantSize ); + pthread_spin_unlock( &altLock ); + } else { + // TODO: Handle the others? + if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) { + logadd( LOG_DEBUG1, "Could not throw %d bytes away on CMD %d", (int)reply.size, (int)reply.cmd ); + goto fail; + } } } logadd( LOG_DEBUG1, "Aus der Schleife rausgeflogen! ARRRRRRRRRR" ); @@ -292,6 +309,7 @@ static void* connection_backgroundThread(void *something UNUSED) const bool panic = connection.sockFd == -1; // Check alt servers if ( panic || now >= nextRttCheck ) { + addAltServers(); probeAltServers(); if ( panic || connection.startupTime + ( STARTUP_MODE_DURATION * 1000ull ) > now ) { nextRttCheck = now + TIMER_INTERVAL_PROBE_STARTUP * 1000ull; @@ -324,6 +342,41 @@ static void* connection_backgroundThread(void *something UNUSED) // Private quick helpers +static void addAltServers() +{ + pthread_spin_lock( &altLock ); + for ( int nIdx = 0; nIdx < MAX_ALTS; ++nIdx ) { + if ( newservers[nIdx].host.type == 0 ) + continue; + // Got a new alt server, see if it's already known + for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) { + if ( isSameAddress( &newservers[nIdx].host, &altservers[eIdx].host ) ) + goto skip_server; + } + // Not known yet, add + int slot = -1; + for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) { + if ( altservers[eIdx].host.type == 0 ) { + slot = eIdx; + break; + } + if ( altservers[eIdx].consecutiveFails > MAX_CONSECUTIVE_FAILURES + && slot != -1 && altservers[slot].consecutiveFails < altservers[eIdx].consecutiveFails ) { + slot = eIdx; + } + } + if ( slot != -1 ) { + altservers[slot].consecutiveFails = 0; + altservers[slot].rtts[0] = RTT_UNREACHABLE; + altservers[slot].rttIndex = 1; + altservers[slot].host = newservers[nIdx].host; + } +skip_server:; + } + memset( newservers, 0, sizeof(newservers) ); + pthread_spin_unlock( &altLock ); +} + static void probeAltServers() { serialized_buffer_t buffer; @@ -336,9 +389,6 @@ static void probeAltServers() char *remoteName; const bool panic = connection.sockFd == -1; - if ( panic ) { - logadd( LOG_DEBUG1, "C'est la panique, panique!" ); - } for ( int altIndex = 0; altIndex < MAX_ALTS; ++altIndex ) { alt_server_t * const srv = &altservers[altIndex]; if ( srv->host.type == 0 ) @@ -459,6 +509,7 @@ static void switchConnection(int sockFd, alt_server_t *srv) } else { connection.sockFd = -1; } + requestAltServers(); pthread_mutex_unlock( &connection.sendMutex ); if ( ret != 0 ) { close( sockFd ); @@ -489,6 +540,24 @@ static void switchConnection(int sockFd, alt_server_t *srv) } } +/** + * Does not lock, so get the sendMutex first! + */ +static void requestAltServers() +{ + if ( connection.sockFd == -1 ) + return; + dnbd3_request_t request = { 0 }; + request.magic = dnbd3_packet_magic; + request.cmd = CMD_GET_SERVERS; + fixup_request( request ); + if ( sock_sendAll( connection.sockFd, &request, sizeof(request), 2 ) != (ssize_t)sizeof(request) ) { + logadd( LOG_WARNING, "Connection failed while requesting alt server list" ); + shutdown( connection.sockFd, SHUT_RDWR ); + connection.sockFd = -1; + } +} + static bool throwDataAway(int sockFd, uint32_t amount) { size_t done = 0; @@ -502,7 +571,7 @@ static bool throwDataAway(int sockFd, uint32_t amount) return true; } -static void __enqueueRequest(dnbd3_async_t *request, const char *file, int line) +static void enqueueRequest(dnbd3_async_t *request) { request->next = NULL; request->finished = false; @@ -518,7 +587,7 @@ static void __enqueueRequest(dnbd3_async_t *request, const char *file, int line) pthread_spin_unlock( &requests.lock ); } -static dnbd3_async_t* __removeRequest(dnbd3_async_t *request, const char *file, int line) +static dnbd3_async_t* removeRequest(dnbd3_async_t *request) { pthread_spin_lock( &requests.lock ); //logadd( LOG_DEBUG2, "Remov: %p @ %s : %d", request, file, line ); diff --git a/src/fuse/helper.h b/src/fuse/helper.h index 0b98ded..c0c70c7 100644 --- a/src/fuse/helper.h +++ b/src/fuse/helper.h @@ -27,4 +27,9 @@ static inline bool isSameAddressPort(const dnbd3_host_t * const a, const dnbd3_h return (a->type == b->type) && (a->port == b->port) && (0 == memcmp( a->addr, b->addr, (a->type == AF_INET ? 4 : 16) )); } +static inline bool isSameAddress(const dnbd3_host_t * const a, const dnbd3_host_t * const b) +{ + return (a->type == b->type) && (0 == memcmp( a->addr, b->addr, (a->type == AF_INET ? 4 : 16) )); +} + #endif -- cgit v1.2.3-55-g7522