summaryrefslogtreecommitdiffstats
path: root/src/fuse
diff options
context:
space:
mode:
authorSimon Rettberg2015-11-24 12:30:46 +0100
committerSimon Rettberg2015-11-24 12:30:46 +0100
commit7b51c287a60d2f202fb131eeed9d1bf19b65a7a3 (patch)
tree031573c708b50aeafe9a6fe6f0992b3ae6456e7c /src/fuse
parent[SERVER] Fix race condition potentially leading to use after release (diff)
downloaddnbd3-7b51c287a60d2f202fb131eeed9d1bf19b65a7a3.tar.gz
dnbd3-7b51c287a60d2f202fb131eeed9d1bf19b65a7a3.tar.xz
dnbd3-7b51c287a60d2f202fb131eeed9d1bf19b65a7a3.zip
[FUSE] Mid-refactoring, does not compile
Diffstat (limited to 'src/fuse')
-rw-r--r--src/fuse/connection.c245
-rw-r--r--src/fuse/helper.c51
-rw-r--r--src/fuse/helper.h8
-rw-r--r--src/fuse/log.h43
-rw-r--r--src/fuse/main.c2
5 files changed, 272 insertions, 77 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c
index 039c532..cdfc1df 100644
--- a/src/fuse/connection.c
+++ b/src/fuse/connection.c
@@ -3,21 +3,28 @@
#include "../config.h"
#include "../shared/protocol.h"
#include "../shared/signal.h"
+#include "../shared/sockhelper.h"
#include <pthread.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
+#include <errno.h>
+#include <time.h>
/* Constants */
static const size_t SHORTBUF = 100;
#define MAX_ALTS (8)
+#define MAX_HOSTS_PER_ADDRESS (2)
+static const int MAX_CONSECUTIVE_FAILURES = 16;
+#define RTT_COUNT (4)
/* Module variables */
// Init guard
static bool initDone = false;
static pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER;
+static bool keepRunning = true;
// List of pending requests
static struct {
@@ -31,14 +38,23 @@ static struct {
char *name;
uint16_t rid;
uint64_t size;
+} image;
+
+static struct {
int sockFd;
pthread_mutex_t sendMutex;
pthread_t receiveThread;
-} image;
+ int panicSignalFd;
+ bool panicMode;
+} connection;
// Known alt servers
static struct _alt_server {
-
+ dnbd3_host_t host;
+ int consecutiveFails;
+ int rtt;
+ int rtts[RTT_COUNT];
+ int rttIndex;
} altservers[MAX_ALTS];
typedef struct _alt_server alt_server_t;
@@ -47,22 +63,28 @@ typedef struct _alt_server alt_server_t;
static void* connection_receiveThreadMain(void *sock);
+static void probeAltServers();
static bool throwDataAway(int sockFd, uint32_t amount);
static void enqueueRequest(dnbd3_async_t *request);
static dnbd3_async_t* removeRequest(dnbd3_async_t *request);
+static uint64_t nowMilli();
+static uint64_t nowMicro();
bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid)
{
int sock = -1;
char host[SHORTBUF];
- const char *current, *end;
serialized_buffer_t buffer;
uint16_t remoteVersion, remoteRid;
char *remoteName;
uint64_t remoteSize;
pthread_mutex_lock( &mutexInit );
- if ( !initDone ) {
+ if ( !initDone && keepRunning ) {
+ dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS];
+ const char *current, *end;
+ int altIndex = 0;
+ memset( altservers, 0, sizeof altservers );
current = hosts;
do {
// Get next host from string
@@ -71,9 +93,23 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r
size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1);
if ( len > SHORTBUF ) len = SHORTBUF;
snprintf( host, len, "%s", current );
+ int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS );
+ for ( int i = 0; i < newHosts; ++i ) {
+ if ( altIndex >= MAX_ALTS )
+ break;
+ altservers[altIndex].host = tempHosts[i];
+ altIndex += 1;
+ }
current = end + 1;
+ } while ( end != NULL && altIndex < MAX_ALTS );
+ printf( "Got %d servers from init call\n", altIndex );
+ // Connect
+ for ( int i = 0; i < altIndex; ++i ) {
+ if ( altservers[i].host.type == 0 )
+ continue;
// Try to connect
- sock = connect_to_server( host, PORT ); // TODO: Parse port from host
+ sock = sock_connect( &altservers[i].host, 500, SOCKET_KEEPALIVE_TIMEOUT * 2000 ); // TODO timeout...
+ printf( "Got socket %d\n", sock );
if ( sock != -1 && dnbd3_select_image( sock, lowerImage, rid, 0 )
&& dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize )
&& ( rid == 0 || rid == remoteRid ) ) {
@@ -87,16 +123,18 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r
close( sock );
sock = -1;
}
- // TODO: Add to alt list
- } while ( end != NULL );
+ }
if ( sock != -1 ) {
- if ( pthread_mutex_init( &image.sendMutex, NULL ) != 0
+ printf( "Initializing stuff\n" );
+ if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0
|| pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0
- || pthread_create( &image.receiveThread, NULL, &connection_receiveThreadMain, (void*)(size_t)sock ) != 0 ) {
+ || pthread_create( &connection.receiveThread, NULL, &connection_receiveThreadMain, (void*)(size_t)sock ) != 0 ) {
close( sock );
sock = -1;
} else {
- image.sockFd = sock;
+ connection.sockFd = sock;
+ connection.panicMode = false;
+ connection.panicSignalFd = signal_new();
requests.head = NULL;
requests.tail = NULL;
}
@@ -111,31 +149,42 @@ bool connection_read(dnbd3_async_t *request)
{
if (!initDone) return false;
enqueueRequest( request );
- pthread_mutex_lock( &image.sendMutex );
- if ( image.sockFd != -1 ) {
- while ( !dnbd3_get_block( image.sockFd, request->offset, request->length, (uint64_t)request ) ) {
- shutdown( image.sockFd, SHUT_RDWR );
- image.sockFd = -1;
+ pthread_mutex_lock( &connection.sendMutex );
+ if ( connection.sockFd != -1 ) {
+ while ( !dnbd3_get_block( connection.sockFd, request->offset, request->length, (uint64_t)request ) ) {
+ shutdown( connection.sockFd, SHUT_RDWR );
+ connection.sockFd = -1;
// TODO reconnect!
- pthread_mutex_unlock( &image.sendMutex );
+ pthread_mutex_unlock( &connection.sendMutex );
return false;
}
}
- pthread_mutex_unlock( &image.sendMutex );
+ pthread_mutex_unlock( &connection.sendMutex );
return true;
}
void connection_close()
{
- //
+ pthread_mutex_lock( &mutexInit );
+ keepRunning = false;
+ if ( !initDone ) {
+ pthread_mutex_unlock( &mutexInit );
+ return;
+ }
+ pthread_mutex_unlock( &mutexInit );
+ pthread_mutex_lock( &connection.sendMutex );
+ if ( connection.sockFd != -1 ) {
+ shutdown( connection.sockFd, SHUT_RDWR );
+ }
+ pthread_mutex_unlock( &connection.sendMutex );
}
static void* connection_receiveThreadMain(void *sockPtr)
{
int sockFd = (int)(size_t)sockPtr;
dnbd3_reply_t reply;
- for ( ;; ) {
- if ( !dnbd3_get_reply( image.sockFd, &reply ) )
+ while ( keepRunning ) {
+ if ( !dnbd3_get_reply( connection.sockFd, &reply ) )
goto fail;
// TODO: Ignoring anything but block replies for now; handle the others
if ( reply.cmd != CMD_GET_BLOCK ) {
@@ -167,26 +216,152 @@ static void* connection_receiveThreadMain(void *sockPtr)
}
fail:;
// Make sure noone is trying to use the socket for sending by locking,
- pthread_mutex_lock( &image.sendMutex );
+ pthread_mutex_lock( &connection.sendMutex );
// then just set the fd to -1, but only if it's the same fd as ours,
// as someone could have established a new connection already
- if ( image.sockFd == sockFd ) {
- image.sockFd = -1;
+ if ( connection.sockFd == sockFd ) {
+ connection.sockFd = -1;
}
- pthread_mutex_unlock( &image.sendMutex );
+ pthread_mutex_unlock( &connection.sendMutex );
// As we're the only reader, it's safe to close the socket now
close( sockFd );
return NULL;
}
+static void* connection_backgroundThread(void *something UNUSED)
+{
+ uint64_t nextKeepalive = 0;
+ uint64_t nextRttCheck = 0;
+ const uint64_t startupTime = nowMilli();
+
+ while ( keepRunning ) {
+ const uint64_t now = nowMilli();
+ if ( now < nextKeepalive && now < nextRttCheck ) {
+ int waitTime = (int)( MIN( nextKeepalive, nextRttCheck ) - now );
+ int waitRes = signal_wait( connection.panicSignalFd, waitTime );
+ if ( waitRes == SIGNAL_ERROR ) {
+ printf( "Error waiting on signal in background thread! Errno = %d\n", errno );
+ }
+ }
+ // Woken up, see what we have to do
+ // Check alt servers
+ if ( connection.panicMode || now < nextRttCheck ) {
+ probeAltServers();
+ if ( connection.panicMode || startupTime + ( STARTUP_MODE_DURATION * 1000ull ) > now ) {
+ nextRttCheck = now + TIMER_INTERVAL_PROBE_STARTUP * 1000ull;
+ } else {
+ nextRttCheck = now + TIMER_INTERVAL_PROBE_NORMAL * 1000ull;
+ }
+ }
+ // Send keepalive packet
+ if ( now < nextKeepalive ) {
+ pthread_mutex_lock( &connection.sendMutex );
+ if ( connection.sockFd != -1 ) {
+ printf( "Sending keepalive...\n" );
+ dnbd3_request_t request;
+ request.magic = dnbd3_packet_magic;
+ request.cmd = CMD_KEEPALIVE;
+ request.size = 0;
+ fixup_request( request );
+ ssize_t ret = sock_sendAll( connection.sockFd, &request, sizeof request, 2 );
+ if ( (size_t)ret != sizeof request ) {
+ shutdown( connection.sockFd, SHUT_RDWR );
+ connection.sockFd = -1;
+ connection.panicMode = true;
+ }
+ }
+ pthread_mutex_unlock( &connection.sendMutex );
+ nextKeepalive = now + TIMER_INTERVAL_KEEPALIVE_PACKET * 1000ull;
+ }
+ }
+ return NULL;
+}
+
// Private quick helpers
+static void probeAltServers()
+{
+ serialized_buffer_t buffer;
+ dnbd3_request_t request;
+ dnbd3_reply_t reply;
+ int bestIndex = -1;
+ int bestSock = -1;
+ uint16_t remoteRid, remoteProto;
+ uint64_t remoteSize;
+ char *remoteName;
+
+ for ( int altIndex = 0; altIndex < MAX_ALTS; ++altIndex ) {
+ alt_server_t * const srv = &altservers[altIndex];
+ if ( srv->host.type == 0 )
+ continue;
+ if ( !connection.panicMode && srv->consecutiveFails > MAX_CONSECUTIVE_FAILURES
+ && srv->consecutiveFails % ( srv->consecutiveFails / 8 ) != 0 ) {
+ continue;
+ }
+ if (srv->rttIndex >= RTT_COUNT) {
+ srv->rttIndex = 0;
+ } else {
+ srv->rttIndex += 1;
+ }
+ // Probe
+ const uint64_t start = nowMicro();
+ int sock = sock_connect( &srv->host, connection.panicMode ? 1000 : 333, 1000 );
+ if ( sock == -1 ) {
+ printf( "Could not crrate socket for probing. errno = %d\n", errno );
+ continue;
+ }
+ if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) {
+ goto fail;
+ }
+ if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize )) {
+ goto fail;
+ }
+ if ( remoteProto < MIN_SUPPORTED_SERVER || remoteProto > PROTOCOL_VERSION ) {
+ printf( "Unsupported remote version\n" );
+ goto fail;
+ }
+ if ( remoteRid != image.rid || strcmp( remoteName, image.name ) != 0 ) {
+ printf( "Remote rid or name mismatch\n" );
+ goto fail;
+ }
+ if ( !dnbd3_get_block( sock, 0, RTT_BLOCK_SIZE, 0 ) ) {
+ goto fail;
+ }
+ if ( !dnbd3_get_reply( sock, &reply ) || reply.size != RTT_BLOCK_SIZE
+ || !throwDataAway( sock, RTT_BLOCK_SIZE ) ) {
+ goto fail;
+ }
+ // Yay, success
+ const uint64_t end = nowMicro();
+ srv->consecutiveFails = 0;
+ srv->rtts[srv->rttIndex] = (int)(end - start);
+ srv->rtt = 0;
+ for ( int i = 0; i < RTT_COUNT; ++i ) {
+ srv->rtt += srv->rtts[i];
+ }
+ srv->rtt /= RTT_COUNT;
+ if ( bestIndex == -1 || altservers[bestIndex].rtt > srv->rtt ) {
+ bestIndex = altIndex;
+ close( bestSock );
+ bestSock = sock;
+ } else {
+ close( sock );
+ }
+ continue; // XXX: Remember current server, compare to it, update value on change,
+fail:;
+ close( sock );
+ srv->rtts[srv->rttIndex] = RTT_UNREACHABLE;
+ srv->consecutiveFails += 1;
+ }
+}
+
static bool throwDataAway(int sockFd, uint32_t amount)
{
uint32_t done = 0;
char tempBuffer[SHORTBUF];
while ( done < amount ) {
- if ( recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ), 0 ) <= 0 )
+ const ssize_t ret = recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ), MSG_NOSIGNAL );
+ if ( ret == 0 || ( ret < 0 && ret != EINTR ) )
return false;
}
return true;
@@ -227,3 +402,23 @@ static dnbd3_async_t* removeRequest(dnbd3_async_t *request)
pthread_spin_unlock( &requests.lock );
return iterator;
}
+
+static uint64_t nowMilli()
+{
+ struct timespec ts;
+ if ( clock_gettime( CLOCK_MONOTONIC_RAW, &ts ) != 0 ) {
+ printf( "clock_gettime() failed. Errno: %d\n", errno );
+ return 0;
+ }
+ return ( ts.tv_sec * 1000ull ) + ( ts.tv_nsec / 1000000ull );
+}
+
+static uint64_t nowMicro()
+{
+ struct timespec ts;
+ if ( clock_gettime( CLOCK_MONOTONIC_RAW, &ts ) != 0 ) {
+ printf( "clock_gettime() failed. Errno: %d\n", errno );
+ return 0;
+ }
+ return ( ts.tv_sec * 1000000ull ) + ( ts.tv_nsec / 1000ull );
+}
diff --git a/src/fuse/helper.c b/src/fuse/helper.c
index 65644f8..6e46352 100644
--- a/src/fuse/helper.c
+++ b/src/fuse/helper.c
@@ -47,54 +47,3 @@ bool sock_printable( struct sockaddr *addr, socklen_t addrLen, char *output, int
}
return ret == 0;
}
-
-// TODO: Pretty much same as in server/*
-int connect_to_server( char *server_address, int port )
-{
- const int on = 1;
- int sock = -1;
- struct addrinfo hints, *res, *ptr;
- char portStr[6];
-
- // Set hints for local addresses.
- memset( &hints, 0, sizeof( hints ) );
- hints.ai_flags = AI_PASSIVE;
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
- snprintf( portStr, sizeof portStr, "%d", port );
- if ( getaddrinfo( server_address, portStr, &hints, &res ) != 0 || res == NULL ) {
- return false;
- }
- // Attempt to bind to all of the addresses as long as there's room in the poll list
- for ( ptr = res; ptr != NULL; ptr = ptr->ai_next ) {
- char bla[100];
- if ( !sock_printable( ( struct sockaddr * ) ptr->ai_addr, ptr->ai_addrlen, bla, 100 ) ) {
- snprintf( bla, 100, "[invalid]" );
- }
- printf( "Trying to connect to %s ", bla );
- sock = socket( ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol );
- if ( sock < 0 ) {
- printf( "...cannot create socket, errno=%d\n", errno );
- sock = -1;
- continue;
- }
- setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof( on ) );
- if ( ptr->ai_family == PF_INET6 ) {
- setsockopt( sock, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof( on ) );
- }
- if ( connect( sock, ptr->ai_addr, ptr->ai_addrlen ) < 0 ) {
- // if ( bind( sock, ptr->ai_addr, ptr->ai_addrlen ) == -1 ) {
- printf( "...socket Error, errno=%d\n", errno );
- close( sock );
- sock = -1;
- continue;
- } else {
- printf( "... connecting successful!\n" );
- break;
- }
- }
-
- freeaddrinfo( res );
- return sock;
-}
-
diff --git a/src/fuse/helper.h b/src/fuse/helper.h
index bbba44c..35cdc8a 100644
--- a/src/fuse/helper.h
+++ b/src/fuse/helper.h
@@ -4,6 +4,9 @@
#include <netdb.h>
#include <stdbool.h>
#include <stdint.h>
+#include <string.h>
+
+#include "../types.h"
typedef struct log_info {
@@ -21,4 +24,9 @@ bool sock_printable(struct sockaddr *addr, socklen_t addrLen, char *output, int
int connect_to_server(char *server_adress, int port);
+static inline bool isSameAddressPort(const dnbd3_host_t * const a, const dnbd3_host_t * const b)
+{
+ return (a->type == b->type) && (a->port == b->port) && (0 == memcmp( a->addr, b->addr, (a->type == AF_INET ? 4 : 16) ));
+}
+
#endif
diff --git a/src/fuse/log.h b/src/fuse/log.h
new file mode 100644
index 0000000..e429861
--- /dev/null
+++ b/src/fuse/log.h
@@ -0,0 +1,43 @@
+#ifndef LOG_H_
+#define LOG_H_
+
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdarg.h>
+
+typedef unsigned int logmask_t;
+#define LOG_ERROR ((logmask_t)1) // Fatal error, server will terminate
+#define LOG_WARNING ((logmask_t)2) // Major issue, something is broken but keep running
+#define LOG_MINOR ((logmask_t)4) // Minor issue, more of a hickup than serious problem
+#define LOG_INFO ((logmask_t)8) // Informational message
+#define LOG_DEBUG1 ((logmask_t)16) // Debug information, use this for non-spammy stuff
+#define LOG_DEBUG2 ((logmask_t)32) // Use this for debug messages that will show up a lot
+
+//void log_setFileMask(logmask_t mask);
+
+//void log_setConsoleMask(logmask_t mask);
+
+/**
+ * Open or reopen the log file. If path is NULL and the
+ * function was called with a path before, the same path
+ * will be used again.
+ */
+//bool log_openLogFile(const char *path);
+
+/**
+ * Add a line to the log
+ */
+void logadd(const logmask_t mask, const char *text, ...)
+{
+ va_list args;
+ va_start( args, text );
+ vprintf( text, args );
+ va_end( args );
+}
+
+/**
+ * Return last size bytes of log.
+ */
+//bool log_fetch(char *buffer, int size);
+
+#endif /* LOG_H_ */
diff --git a/src/fuse/main.c b/src/fuse/main.c
index d6a4d98..7889023 100644
--- a/src/fuse/main.c
+++ b/src/fuse/main.c
@@ -58,7 +58,7 @@ static void dnbd3_connect()
if ( sock != -1 ) {
close( sock );
}
- sock = connect_to_server( server_address, portno );
+ sock = -1; // connect_to_server( server_address, portno );
if ( sock == -1 ) {
debugf( "[ERROR] Connection Error!\n" );