summaryrefslogtreecommitdiffstats
path: root/src/fuse
diff options
context:
space:
mode:
authorSimon Rettberg2015-12-02 12:25:27 +0100
committerSimon Rettberg2015-12-02 12:25:27 +0100
commitb455beb789b89152bdfc3b769a87229b0cb7df26 (patch)
tree71402287732010e573c99403e35e318b5b5bffed /src/fuse
parent[FUSE] Clean up command line handling (diff)
downloaddnbd3-b455beb789b89152bdfc3b769a87229b0cb7df26.tar.gz
dnbd3-b455beb789b89152bdfc3b769a87229b0cb7df26.tar.xz
dnbd3-b455beb789b89152bdfc3b769a87229b0cb7df26.zip
[FUSE] Request alt servers from connected server
Diffstat (limited to 'src/fuse')
-rw-r--r--src/fuse/connection.c121
-rw-r--r--src/fuse/helper.h5
2 files changed, 100 insertions, 26 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c
index 0c0374e..43e2907 100644
--- a/src/fuse/connection.c
+++ b/src/fuse/connection.c
@@ -51,14 +51,16 @@ static struct {
} connection;
// Known alt servers
-static struct _alt_server {
+typedef struct _alt_server {
dnbd3_host_t host;
int consecutiveFails;
int rtt;
int rtts[RTT_COUNT];
int rttIndex;
-} altservers[MAX_ALTS];
-typedef struct _alt_server alt_server_t;
+} alt_server_t;
+alt_server_t altservers[MAX_ALTS];
+alt_server_t newservers[MAX_ALTS];
+pthread_spinlock_t altLock;
/* Static methods */
@@ -66,16 +68,14 @@ typedef struct _alt_server alt_server_t;
static void* connection_receiveThreadMain(void *sock);
static void* connection_backgroundThread(void *something);
+static void addAltServers();
static void probeAltServers();
static void switchConnection(int sockFd, alt_server_t *srv);
+static void requestAltServers();
static bool throwDataAway(int sockFd, uint32_t amount);
-//static void enqueueRequest(dnbd3_async_t *request);
-//static dnbd3_async_t* removeRequest(dnbd3_async_t *request);
-static void __enqueueRequest(dnbd3_async_t *request, const char *file, int line);
-static dnbd3_async_t* __removeRequest(dnbd3_async_t *request, const char *file, int line);
-#define enqueueRequest(req) __enqueueRequest(req, __FILE__, __LINE__)
-#define removeRequest(req) __removeRequest(req, __FILE__, __LINE__)
+static void enqueueRequest(dnbd3_async_t *request);
+static dnbd3_async_t* removeRequest(dnbd3_async_t *request);
static uint64_t nowMilli();
static uint64_t nowMicro();
@@ -118,10 +118,9 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r
if ( altservers[i].host.type == 0 )
continue;
// Try to connect
- sock = sock_connect( &altservers[i].host, 500, SOCKET_KEEPALIVE_TIMEOUT * 2000 );
- logadd( LOG_DEBUG1, "Got socket %d", sock );
+ sock = sock_connect( &altservers[i].host, 500, SOCKET_KEEPALIVE_TIMEOUT * 1000 );
if ( sock == -1 ) {
- //
+ logadd( LOG_ERROR, "Could not connect to host" );
} else if ( !dnbd3_select_image( sock, lowerImage, rid, 0 ) ) {
logadd( LOG_ERROR, "Could not send select image" );
} else if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) {
@@ -138,6 +137,7 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r
connection.sockFd = sock;
requests.head = NULL;
requests.tail = NULL;
+ requestAltServers();
break;
}
// Failed
@@ -151,7 +151,9 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r
pthread_t thread;
logadd( LOG_DEBUG1, "Initializing stuff" );
if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0
- || pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0 ) {
+ || pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0
+ || pthread_spin_init( &altLock, PTHREAD_PROCESS_PRIVATE ) != 0 ) {
+ logadd( LOG_ERROR, "Mutex or spinlock init failure" );
close( sock );
sock = -1;
} else {
@@ -227,14 +229,9 @@ static void* connection_receiveThreadMain(void *sockPtr)
logadd( LOG_DEBUG1, "Error receiving reply on receiveThread (%d)", ret );
goto fail;
}
- // TODO: Ignoring anything but block replies for now; handle the others
- if ( reply.cmd != CMD_GET_BLOCK ) {
- if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) {
- logadd( LOG_DEBUG1, "Could not throw %d bytes away on CMD %d", (int)reply.size, (int)reply.cmd );
- goto fail;
- }
- } else {
- // get block reply. find matching request
+
+ if ( reply.cmd == CMD_GET_BLOCK ) {
+ // Get block reply. find matching request
dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle );
if ( request == NULL ) {
logadd( LOG_WARNING, "WARNING BUG ALERT SOMETHING: Got block reply with no matching request" );
@@ -255,6 +252,26 @@ static void* connection_receiveThreadMain(void *sockPtr)
request->finished = true;
signal_call( request->signalFd );
}
+ } else if ( reply.cmd == CMD_GET_SERVERS ) {
+ // List of known alt servers
+ dnbd3_server_entry_t entries[MAX_ALTS];
+ const int count = MIN( reply.size / sizeof(dnbd3_server_entry_t), MAX_ALTS );
+ const size_t relevantSize = sizeof(dnbd3_server_entry_t) * count;
+ if ( sock_recv( sockFd, entries, relevantSize ) != (ssize_t)relevantSize
+ || !throwDataAway( sockFd, reply.size - (uint32_t)relevantSize ) ) {
+ logadd( LOG_DEBUG1, "Error receiving list of alt servers." );
+ goto fail;
+ }
+ logadd( LOG_DEBUG1, "Server sent %d alts", count );
+ pthread_spin_lock( &altLock );
+ memcpy( newservers, entries, relevantSize );
+ pthread_spin_unlock( &altLock );
+ } else {
+ // TODO: Handle the others?
+ if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) {
+ logadd( LOG_DEBUG1, "Could not throw %d bytes away on CMD %d", (int)reply.size, (int)reply.cmd );
+ goto fail;
+ }
}
}
logadd( LOG_DEBUG1, "Aus der Schleife rausgeflogen! ARRRRRRRRRR" );
@@ -292,6 +309,7 @@ static void* connection_backgroundThread(void *something UNUSED)
const bool panic = connection.sockFd == -1;
// Check alt servers
if ( panic || now >= nextRttCheck ) {
+ addAltServers();
probeAltServers();
if ( panic || connection.startupTime + ( STARTUP_MODE_DURATION * 1000ull ) > now ) {
nextRttCheck = now + TIMER_INTERVAL_PROBE_STARTUP * 1000ull;
@@ -324,6 +342,41 @@ static void* connection_backgroundThread(void *something UNUSED)
// Private quick helpers
+static void addAltServers()
+{
+ pthread_spin_lock( &altLock );
+ for ( int nIdx = 0; nIdx < MAX_ALTS; ++nIdx ) {
+ if ( newservers[nIdx].host.type == 0 )
+ continue;
+ // Got a new alt server, see if it's already known
+ for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) {
+ if ( isSameAddress( &newservers[nIdx].host, &altservers[eIdx].host ) )
+ goto skip_server;
+ }
+ // Not known yet, add
+ int slot = -1;
+ for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) {
+ if ( altservers[eIdx].host.type == 0 ) {
+ slot = eIdx;
+ break;
+ }
+ if ( altservers[eIdx].consecutiveFails > MAX_CONSECUTIVE_FAILURES
+ && slot != -1 && altservers[slot].consecutiveFails < altservers[eIdx].consecutiveFails ) {
+ slot = eIdx;
+ }
+ }
+ if ( slot != -1 ) {
+ altservers[slot].consecutiveFails = 0;
+ altservers[slot].rtts[0] = RTT_UNREACHABLE;
+ altservers[slot].rttIndex = 1;
+ altservers[slot].host = newservers[nIdx].host;
+ }
+skip_server:;
+ }
+ memset( newservers, 0, sizeof(newservers) );
+ pthread_spin_unlock( &altLock );
+}
+
static void probeAltServers()
{
serialized_buffer_t buffer;
@@ -336,9 +389,6 @@ static void probeAltServers()
char *remoteName;
const bool panic = connection.sockFd == -1;
- if ( panic ) {
- logadd( LOG_DEBUG1, "C'est la panique, panique!" );
- }
for ( int altIndex = 0; altIndex < MAX_ALTS; ++altIndex ) {
alt_server_t * const srv = &altservers[altIndex];
if ( srv->host.type == 0 )
@@ -459,6 +509,7 @@ static void switchConnection(int sockFd, alt_server_t *srv)
} else {
connection.sockFd = -1;
}
+ requestAltServers();
pthread_mutex_unlock( &connection.sendMutex );
if ( ret != 0 ) {
close( sockFd );
@@ -489,6 +540,24 @@ static void switchConnection(int sockFd, alt_server_t *srv)
}
}
+/**
+ * Does not lock, so get the sendMutex first!
+ */
+static void requestAltServers()
+{
+ if ( connection.sockFd == -1 )
+ return;
+ dnbd3_request_t request = { 0 };
+ request.magic = dnbd3_packet_magic;
+ request.cmd = CMD_GET_SERVERS;
+ fixup_request( request );
+ if ( sock_sendAll( connection.sockFd, &request, sizeof(request), 2 ) != (ssize_t)sizeof(request) ) {
+ logadd( LOG_WARNING, "Connection failed while requesting alt server list" );
+ shutdown( connection.sockFd, SHUT_RDWR );
+ connection.sockFd = -1;
+ }
+}
+
static bool throwDataAway(int sockFd, uint32_t amount)
{
size_t done = 0;
@@ -502,7 +571,7 @@ static bool throwDataAway(int sockFd, uint32_t amount)
return true;
}
-static void __enqueueRequest(dnbd3_async_t *request, const char *file, int line)
+static void enqueueRequest(dnbd3_async_t *request)
{
request->next = NULL;
request->finished = false;
@@ -518,7 +587,7 @@ static void __enqueueRequest(dnbd3_async_t *request, const char *file, int line)
pthread_spin_unlock( &requests.lock );
}
-static dnbd3_async_t* __removeRequest(dnbd3_async_t *request, const char *file, int line)
+static dnbd3_async_t* removeRequest(dnbd3_async_t *request)
{
pthread_spin_lock( &requests.lock );
//logadd( LOG_DEBUG2, "Remov: %p @ %s : %d", request, file, line );
diff --git a/src/fuse/helper.h b/src/fuse/helper.h
index 0b98ded..c0c70c7 100644
--- a/src/fuse/helper.h
+++ b/src/fuse/helper.h
@@ -27,4 +27,9 @@ static inline bool isSameAddressPort(const dnbd3_host_t * const a, const dnbd3_h
return (a->type == b->type) && (a->port == b->port) && (0 == memcmp( a->addr, b->addr, (a->type == AF_INET ? 4 : 16) ));
}
+static inline bool isSameAddress(const dnbd3_host_t * const a, const dnbd3_host_t * const b)
+{
+ return (a->type == b->type) && (0 == memcmp( a->addr, b->addr, (a->type == AF_INET ? 4 : 16) ));
+}
+
#endif