summaryrefslogtreecommitdiffstats
path: root/src/fuse
diff options
context:
space:
mode:
authorFrederic Robra2019-06-25 17:03:28 +0200
committerFrederic Robra2019-06-25 17:03:28 +0200
commit43e57ce5e11e9052f5a7db66f2e8613f1784f919 (patch)
treec5e1372a160b2601f61b18d617b71799b06b02ae /src/fuse
downloaddnbd3-ng-43e57ce5e11e9052f5a7db66f2e8613f1784f919.tar.gz
dnbd3-ng-43e57ce5e11e9052f5a7db66f2e8613f1784f919.tar.xz
dnbd3-ng-43e57ce5e11e9052f5a7db66f2e8613f1784f919.zip
first version of dnbd3-ng
Diffstat (limited to 'src/fuse')
-rw-r--r--src/fuse/connection.c927
-rw-r--r--src/fuse/connection.h35
-rw-r--r--src/fuse/helper.c36
-rw-r--r--src/fuse/helper.h35
-rw-r--r--src/fuse/main.c420
-rw-r--r--src/fuse/serialize.c5
6 files changed, 1458 insertions, 0 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c
new file mode 100644
index 0000000..fc9f05b
--- /dev/null
+++ b/src/fuse/connection.c
@@ -0,0 +1,927 @@
+#include "connection.h"
+#include "helper.h"
+#include "../clientconfig.h"
+#include "../shared/protocol.h"
+#include "../shared/fdsignal.h"
+#include "../shared/sockhelper.h"
+#include "../shared/log.h"
+
+#include <stdlib.h>
+#include <pthread.h>
+#include <string.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <time.h>
+#include <inttypes.h>
+
+/* Constants */
+// Size of the small stack buffers in this file: the host name buffer in
+// connection_init() and the scratch buffer in throwDataAway().
+static const size_t SHORTBUF = 100;
+// Capacity of the altservers and newservers arrays.
+#define MAX_ALTS (16)
+// While the connection is healthy only the first MAX_ALTS_ACTIVE entries of
+// altservers are probed; in panic mode all MAX_ALTS entries are tried.
+#define MAX_ALTS_ACTIVE (5)
+// Maximum number of addresses taken per host name when resolving in connection_init().
+#define MAX_HOSTS_PER_ADDRESS (2)
+// If a server wasn't reachable this many times, we slowly start skipping it on measurements
+static const int FAIL_BACKOFF_START_COUNT = 8;
+// Number of RTT samples kept per alt server (size of alt_server_t.rtts).
+#define RTT_COUNT (4)
+
+/* Module variables */
+
+// Init guard
+// Set by connection_init() once the initial connection has been established.
+static bool connectionInitDone = false;
+// Set by connection_initThreads() on its first real attempt, even if that attempt fails.
+static bool threadInitDone = false;
+// Taken by connection_init(), connection_initThreads() and connection_close()
+// while they read or change the flags in this group.
+static pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER;
+// Cleared by connection_close(); the loops of both worker threads run while it is true.
+static bool keepRunning = true;
+// Copy of the doLearnNew argument of connection_init(); when false, alt servers
+// are neither requested from the server nor added to altservers.
+static bool learnNewServers;
+
+// List of pending requests
+// Singly linked FIFO, chained through dnbd3_async_t.next. enqueueRequest(),
+// removeRequest(), probeAltServers() and switchConnection() access it while
+// holding 'lock'.
+static struct {
+ dnbd3_async_t *head;
+ dnbd3_async_t *tail;
+ pthread_spinlock_t lock;
+} requests;
+
+// Connection for the image
+// Filled in by connection_init() from the server's select-image reply.
+static struct {
+ char *name; // strdup()ed copy of the image name reported by the server
+ uint16_t rid; // revision id reported by the server
+ uint64_t size; // image size reported by the server
+} image;
+
+// State of the server connection that is currently in use.
+static struct {
+ int sockFd; // socket of the current connection, -1 if there is none
+ pthread_mutex_t sendMutex; // taken by code that sends on sockFd or replaces it
+ dnbd3_signal_t* panicSignal; // signalled when the connection is lost; the background thread waits on it
+ dnbd3_host_t currentServer; // address of the server sockFd is connected to
+ ticks startupTime; // when the current connection was established
+} connection;
+
+// Known alt servers
+typedef struct _alt_server {
+ dnbd3_host_t host; // server address; host.type == 0 marks an unused slot
+ int consecutiveFails; // raised on every failed probe, reset to 0 on a successful one
+ int rtt; // averaged round trip time computed by probeAltServers()
+ int rtts[RTT_COUNT]; // the most recent probe results that 'rtt' is averaged from
+ int rttIndex; // position in rtts[] used for the latest sample
+ int bestCount; // grows while this server keeps being the best one in probes, shrinks otherwise
+ int liveRtt; // moving average of the RTT seen on real block requests (0 = no data)
+} alt_server_t;
+
+// Servers announced by the current server (CMD_GET_SERVERS reply). Written by
+// the receive thread, consumed and cleared by addAltServers().
+static dnbd3_server_entry_t newservers[MAX_ALTS];
+// Guards newservers.
+static pthread_mutex_t newAltLock = PTHREAD_MUTEX_INITIALIZER;
+// Table of servers that are probed and considered for switching over.
+static alt_server_t altservers[MAX_ALTS];
+// WR: Use when re-assigning or sorting altservers, i.e. an index in altservers
+// changes its meaning (host). Also used for newservers.
+// RD: Use when reading the list or modifying individual entries data, like RTT
+// and fail count. Isn't super clean as we still might have races here, but mostly
+// the code is clean in this regard, so we should only have stale data somewhere
+// but nothing nonsensical.
+static pthread_rwlock_t altLock = PTHREAD_RWLOCK_INITIALIZER;
+// Short aliases for the pthread rwlock calls used on altLock.
+#define lock_read pthread_rwlock_rdlock
+#define lock_write pthread_rwlock_wrlock
+#define unlock_rw pthread_rwlock_unlock
+
+/* Static methods */
+
+
+// Thread entry points. The receive thread gets the socket fd smuggled in via
+// its void* argument; the background thread ignores its argument.
+static void* connection_receiveThreadMain(void *sock);
+static void* connection_backgroundThread(void *something);
+
+// Alt server bookkeeping, called from the background thread.
+static void addAltServers();
+static void sortAltServers();
+static void probeAltServers();
+// Make sockFd (already connected to srv) the current connection.
+static void switchConnection(int sockFd, alt_server_t *srv);
+// Send CMD_GET_SERVERS on the current connection; caller must hold connection.sendMutex.
+static void requestAltServers();
+// Read and discard 'amount' bytes from sockFd.
+static bool throwDataAway(int sockFd, uint32_t amount);
+
+// Pending request queue helpers (take requests.lock themselves).
+static void enqueueRequest(dnbd3_async_t *request);
+static dnbd3_async_t* removeRequest(dnbd3_async_t *request);
+
+/**
+ * Establish the initial connection to a dnbd3 server.
+ *
+ * The space-separated list in hosts is resolved into altservers (at most
+ * MAX_HOSTS_PER_ADDRESS addresses per entry, MAX_ALTS in total). The servers
+ * are then tried in order; the first one that answers the image selection
+ * with an acceptable revision becomes the current connection and its reply
+ * is stored in the module-level 'image' and 'connection' state.
+ * Work is only done while connectionInitDone is false and keepRunning is true.
+ *
+ * @param hosts space-separated list of server addresses
+ * @param lowerImage name of the image to request
+ * @param rid requested revision; 0 accepts whatever revision the server reports
+ * @param doLearnNew whether alt servers announced by servers should be requested and added
+ * @return true if this call established the connection, false otherwise
+ *         (including when the connection had already been initialized)
+ */
+bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid, const bool doLearnNew)
+{
+	int sock = -1;
+	char host[SHORTBUF];
+	size_t hlen;
+	serialized_buffer_t buffer;
+	uint16_t remoteVersion, remoteRid;
+	char *remoteName;
+	uint64_t remoteSize;
+	struct sockaddr_storage sa;
+	socklen_t salen;
+	poll_list_t *cons = sock_newPollList();
+
+	timing_setBase();
+	pthread_mutex_lock( &mutexInit );
+	if ( !connectionInitDone && keepRunning ) {
+		dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS];
+		const char *current, *end;
+		int altIndex = 0;
+		learnNewServers = doLearnNew;
+		memset( altservers, 0, sizeof altservers );
+		connection.sockFd = -1;
+		current = hosts;
+		do {
+			// Get next host from string
+			while ( *current == ' ' ) current++;
+			end = strchr( current, ' ' );
+			// snprintf copies at most len - 1 characters, so +1 keeps the whole token
+			size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1);
+			if ( len > SHORTBUF ) len = SHORTBUF;
+			snprintf( host, len, "%s", current );
+			int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS );
+			for ( int i = 0; i < newHosts; ++i ) {
+				if ( altIndex >= MAX_ALTS )
+					break;
+				altservers[altIndex].host = tempHosts[i];
+				altIndex += 1;
+			}
+			// Only advance past a separator that really exists; doing
+			// arithmetic on a NULL 'end' would be undefined behavior
+			if ( end != NULL ) {
+				current = end + 1;
+			}
+		} while ( end != NULL && altIndex < MAX_ALTS );
+		logadd( LOG_INFO, "Got %d servers from init call", altIndex );
+		// Connect
+		for ( int i = 0; i < altIndex + 5; ++i ) {
+			if ( i >= altIndex ) {
+				// Additional iteration - no corresponding slot in altservers, this
+				// is just so we can make a final calls with longer timeout
+				sock = sock_multiConnect( cons, NULL, 400, 1000 );
+				if ( sock == -2 ) {
+					logadd( LOG_ERROR, "Could not connect to any host" );
+					sock = -1;
+					break;
+				}
+			} else {
+				if ( altservers[i].host.type == 0 )
+					continue;
+				// Try to connect - 100ms timeout
+				sock = sock_multiConnect( cons, &altservers[i].host, 100, 1000 );
+			}
+			if ( sock == -2 || sock == -1 )
+				continue;
+			salen = sizeof(sa);
+			if ( getpeername( sock, (struct sockaddr*)&sa, &salen ) == -1 ) {
+				logadd( LOG_ERROR, "getpeername on successful connection failed!? (errno=%d)", errno );
+				close( sock );
+				sock = -1;
+				continue;
+			}
+			hlen = sock_printable( (struct sockaddr*)&sa, salen, host, sizeof(host) );
+			logadd( LOG_INFO, "Connected to %.*s", (int)hlen, host );
+			if ( !dnbd3_select_image( sock, lowerImage, rid, 0 ) ) {
+				logadd( LOG_ERROR, "Could not send select image" );
+			} else if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) {
+				logadd( LOG_ERROR, "Could not read select image reply (%d)", errno );
+			} else if ( rid != 0 && rid != remoteRid ) {
+				logadd( LOG_ERROR, "rid mismatch (want: %d, got: %d)", (int)rid, (int)remoteRid );
+			} else {
+				// image.name is later handed to strcmp() when probing, so a
+				// failed copy must not be accepted as a working connection
+				char *nameCopy = strdup( remoteName );
+				if ( nameCopy == NULL ) {
+					logadd( LOG_ERROR, "Out of memory while storing image name" );
+				} else {
+					logadd( LOG_INFO, "Requested: '%s:%d'", lowerImage, (int)rid );
+					logadd( LOG_INFO, "Returned: '%s:%d'", remoteName, (int)remoteRid );
+					sock_setTimeout( sock, SOCKET_KEEPALIVE_TIMEOUT * 1000 );
+					image.name = nameCopy;
+					image.rid = remoteRid;
+					image.size = remoteSize;
+					if ( !sock_sockaddrToDnbd3( (struct sockaddr*)&sa, &connection.currentServer ) ) {
+						logadd( LOG_ERROR, "sockaddr to dnbd3_host_t failed!?" );
+						connection.currentServer.type = 0;
+					}
+					connection.panicSignal = signal_new();
+					timing_get( &connection.startupTime );
+					connection.sockFd = sock;
+					requests.head = NULL;
+					requests.tail = NULL;
+					requestAltServers();
+					break;
+				}
+			}
+			// Failed
+			if ( sock != -1 ) {
+				close( sock );
+				sock = -1;
+			}
+		}
+		if ( sock != -1 ) {
+			connectionInitDone = true;
+		}
+	}
+	pthread_mutex_unlock( &mutexInit );
+	sock_destroyPollList( cons );
+	return sock != -1;
+}
+
+/**
+ * Initialize the send mutex and the request spinlock, then start the receive
+ * thread for the current socket and the background thread.
+ *
+ * Does nothing and returns false if the module is shutting down, if
+ * connection_init() has not succeeded, if there is no socket, or if this
+ * function has been called before. On failure the connection is torn down.
+ *
+ * @return true if both threads were started
+ */
+bool connection_initThreads()
+{
+	pthread_mutex_lock( &mutexInit );
+	if ( !keepRunning || !connectionInitDone || threadInitDone || connection.sockFd == -1 ) {
+		pthread_mutex_unlock( &mutexInit );
+		return false;
+	}
+	bool success = true;
+	bool receiverRunning = false; // true once the receive thread owns the socket
+	pthread_t thread;
+	threadInitDone = true;
+	logadd( LOG_DEBUG1, "Initializing stuff" );
+	if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0
+			|| pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0 ) {
+		logadd( LOG_ERROR, "Mutex or spinlock init failure" );
+		success = false;
+	} else if ( pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)connection.sockFd ) != 0 ) {
+		logadd( LOG_ERROR, "Could not create receive thread" );
+		success = false;
+	} else {
+		receiverRunning = true;
+		if ( pthread_create( &thread, NULL, &connection_backgroundThread, NULL ) != 0 ) {
+			logadd( LOG_ERROR, "Could not create background thread" );
+			success = false;
+		}
+	}
+	if ( !success ) {
+		if ( receiverRunning ) {
+			// The receive thread closes the fd itself when its read fails.
+			// Closing it here as well would be a double close, so only shut
+			// the socket down to wake the thread up.
+			pthread_mutex_lock( &connection.sendMutex );
+			if ( connection.sockFd != -1 ) {
+				shutdown( connection.sockFd, SHUT_RDWR );
+				connection.sockFd = -1;
+			}
+			pthread_mutex_unlock( &connection.sendMutex );
+		} else {
+			// Nobody else knows the fd yet, we have to release it
+			close( connection.sockFd );
+			connection.sockFd = -1;
+		}
+	}
+	pthread_mutex_unlock( &mutexInit );
+	return success;
+}
+
+/**
+ * @return the image size that was stored by connection_init()
+ */
+uint64_t connection_getImageSize()
+{
+	const uint64_t size = image.size;
+	return size;
+}
+
+/**
+ * Queue a block request and, if a connection exists, put it on the wire.
+ *
+ * The request is always added to the pending list. If sending fails the
+ * socket is shut down, marked as gone and the panic signal is raised; the
+ * request stays queued so it can be served after a reconnect.
+ *
+ * @param request caller-owned request describing offset, length and buffer
+ * @return false if the connection module is not initialized, true otherwise
+ */
+bool connection_read(dnbd3_async_t *request)
+{
+	if ( !connectionInitDone )
+		return false;
+
+	bool sendFailed = false;
+	pthread_mutex_lock( &connection.sendMutex );
+	enqueueRequest( request );
+	if ( connection.sockFd != -1
+			&& !dnbd3_get_block( connection.sockFd, request->offset, request->length, (uint64_t)request, 0 ) ) {
+		shutdown( connection.sockFd, SHUT_RDWR );
+		connection.sockFd = -1;
+		sendFailed = true;
+	}
+	pthread_mutex_unlock( &connection.sendMutex );
+	// Raise the panic signal only after the mutex has been released
+	if ( sendFailed ) {
+		signal_call( connection.panicSignal );
+	}
+	return true;
+}
+
+/**
+ * Ask the worker threads to stop and shut down the current socket.
+ * The socket is only shut down here, not closed.
+ */
+void connection_close()
+{
+	if ( keepRunning ) {
+		logadd( LOG_INFO, "Tearing down dnbd3 connections and workers" );
+	}
+
+	pthread_mutex_lock( &mutexInit );
+	keepRunning = false;
+	const bool wasInitialized = connectionInitDone;
+	pthread_mutex_unlock( &mutexInit );
+	if ( !wasInitialized )
+		return;
+
+	pthread_mutex_lock( &connection.sendMutex );
+	const int fd = connection.sockFd;
+	if ( fd != -1 ) {
+		shutdown( fd, SHUT_RDWR );
+	}
+	pthread_mutex_unlock( &connection.sendMutex );
+}
+
+/**
+ * Write a human readable status report into buffer: image name, revision,
+ * age of the current connection and one line per known alt server.
+ *
+ * Each server line starts with a marker: '*' for the server currently in
+ * use, '-' for slots at index >= MAX_ALTS_ACTIVE, ' ' otherwise.
+ *
+ * @param buffer destination buffer
+ * @param len size of buffer in bytes
+ * @return number of bytes of buffer that were used; len if the output was truncated
+ */
+size_t connection_printStats(char *buffer, const size_t len)
+{
+	int ret;
+	size_t remaining = len;
+	declare_now;
+	if ( remaining > 0 ) {
+		ret = snprintf( buffer, remaining, "Image: %s\nRevision: %d\n\nCurrent connection time: %" PRIu32 "s\n\n",
+				image.name, (int)image.rid, timing_diff( &connection.startupTime, &now ) );
+		if ( ret < 0 ) {
+			ret = 0;
+		}
+		if ( (size_t)ret >= remaining ) {
+			return len;
+		}
+		remaining -= ret;
+		buffer += ret;
+	}
+	int i = -1;
+	lock_read( &altLock );
+	while ( remaining > 3 && ++i < MAX_ALTS ) {
+		if ( altservers[i].host.type == 0 )
+			continue;
+		char marker;
+		if ( isSameAddressPort( &connection.currentServer, &altservers[i].host ) ) {
+			marker = '*';
+		} else if ( i >= MAX_ALTS_ACTIVE ) {
+			marker = '-';
+		} else {
+			marker = ' ';
+		}
+		*buffer++ = marker;
+		// Account for the marker right away so the size handed to
+		// sock_printHost matches the space that is really left
+		remaining -= 1;
+		const size_t addrlen = sock_printHost( &altservers[i].host, buffer, remaining );
+		if ( addrlen >= remaining ) {
+			// Never let the unsigned counter wrap around
+			remaining = 0;
+			break;
+		}
+		remaining -= addrlen;
+		buffer += addrlen;
+		if ( remaining < 3 )
+			break;
+		int width = addrlen >= 35 ? 0 : 35 - (int)addrlen;
+		const char *unit;
+		int value;
+		if ( altservers[i].rtt > 5000 ) {
+			unit = "ms ";
+			value = altservers[i].rtt / 1000;
+		} else {
+			unit = "µs";
+			value = altservers[i].rtt;
+			width += 3;
+		}
+		ret = snprintf( buffer, remaining, "% *d %s Unreachable:% 5d BestCount:% 5d Live:% 5dµs\n",
+				width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount, altservers[i].liveRtt );
+		if ( ret < 0 ) {
+			ret = 0;
+		}
+		if ( (size_t)ret >= remaining ) {
+			remaining = 0;
+			break;
+		}
+		remaining -= ret;
+		buffer += ret;
+	}
+	unlock_rw( &altLock );
+	return len - remaining;
+}
+
+/**
+ * Receive thread: reads replies from one server socket until an error occurs
+ * or keepRunning is cleared, then closes that socket.
+ *
+ * One such thread exists per connection; it is started by
+ * connection_initThreads() and again by switchConnection() for every new
+ * socket. The thread detaches itself.
+ *
+ * @param sockPtr the socket fd, cast to a pointer by the creator
+ * @return always NULL
+ */
+static void* connection_receiveThreadMain(void *sockPtr)
+{
+ int sockFd = (int)(size_t)sockPtr;
+ dnbd3_reply_t reply;
+ pthread_detach( pthread_self() );
+
+ while ( keepRunning ) {
+ int ret;
+ // Retry as long as the read was merely interrupted or timed out
+ do {
+ ret = dnbd3_read_reply( sockFd, &reply, true );
+ if ( ret == REPLY_OK ) break;
+ } while ( ret == REPLY_INTR || ret == REPLY_AGAIN );
+ if ( ret != REPLY_OK ) {
+ logadd( LOG_DEBUG1, "Error receiving reply on receiveThread (%d)", ret );
+ goto fail;
+ }
+
+ if ( reply.cmd == CMD_GET_BLOCK ) {
+ // Get block reply. find matching request
+ // The handle is the request pointer that was sent along with the request;
+ // removeRequest() only hands it back if it is still in the pending list.
+ dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle );
+ if ( request == NULL ) {
+ // This happens if the alt server probing thread tears down our connection
+ // and did a direct RTT probe to satisfy this very request.
+ logadd( LOG_DEBUG1, "Got block reply with no matching request" );
+ if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) {
+ logadd( LOG_DEBUG1, "....and choked on reply payload" );
+ goto fail;
+ }
+ } else {
+ // Found a match
+ // NOTE(review): request->length bytes are read without comparing against reply.size
+ const ssize_t ret = sock_recv( sockFd, request->buffer, request->length );
+ if ( ret != (ssize_t)request->length ) {
+ logadd( LOG_DEBUG1, "receiving payload for a block reply failed" );
+ // Put the request back into the queue so it is not lost
+ connection_read( request );
+ goto fail;
+ }
+ // Check RTT
+ declare_now;
+ uint64_t diff = timing_diffUs( &request->time, &now );
+ if ( diff < 30ull * 1000 * 1000 ) { // Sanity check - ignore if > 30s
+ lock_read( &altLock );
+ // Fold the measurement into the current server's moving average (3/4 old, 1/4 new)
+ for ( int i = 0; i < MAX_ALTS; ++i ) {
+ if ( altservers[i].host.type == 0 )
+ continue;
+ if ( isSameAddressPort( &connection.currentServer, &altservers[i].host ) ) {
+ altservers[i].liveRtt = ( altservers[i].liveRtt * 3 + (int)diff ) / 4;
+ break;
+ }
+ }
+ unlock_rw( &altLock );
+ }
+ // Success, wake up caller
+ request->success = true;
+ request->finished = true;
+ signal_call( request->signal );
+ }
+ } else if ( reply.cmd == CMD_GET_SERVERS ) {
+ // List of known alt servers
+ // Keep at most MAX_ALTS entries and discard the rest of the payload
+ dnbd3_server_entry_t entries[MAX_ALTS];
+ const int count = MIN( reply.size / sizeof(dnbd3_server_entry_t), MAX_ALTS );
+ const size_t relevantSize = sizeof(dnbd3_server_entry_t) * count;
+ if ( sock_recv( sockFd, entries, relevantSize ) != (ssize_t)relevantSize
+ || !throwDataAway( sockFd, reply.size - (uint32_t)relevantSize ) ) {
+ logadd( LOG_DEBUG1, "Error receiving list of alt servers." );
+ goto fail;
+ }
+ // Hand the list over to the background thread, see addAltServers()
+ pthread_mutex_lock( &newAltLock );
+ memcpy( newservers, entries, relevantSize );
+ pthread_mutex_unlock( &newAltLock );
+ } else {
+ // TODO: Handle the others?
+ if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) {
+ logadd( LOG_DEBUG1, "Could not throw %d bytes away on CMD %d", (int)reply.size, (int)reply.cmd );
+ goto fail;
+ }
+ }
+ }
+ // Only reached when keepRunning was cleared, i.e. on regular shutdown
+ logadd( LOG_DEBUG1, "Aus der Schleife rausgeflogen! ARRRRRRRRRR" );
+fail:;
+ // Make sure noone is trying to use the socket for sending by locking,
+ pthread_mutex_lock( &connection.sendMutex );
+ // then just set the fd to -1, but only if it's the same fd as ours,
+ // as someone could have established a new connection already
+ if ( connection.sockFd == sockFd ) {
+ connection.sockFd = -1;
+ signal_call( connection.panicSignal );
+ }
+ pthread_mutex_unlock( &connection.sendMutex );
+ // As we're the only reader, it's safe to close the socket now
+ close( sockFd );
+ return NULL;
+}
+
+/**
+ * Background thread: periodically probes the alt servers and sends keepalive
+ * packets on the current connection. Runs until keepRunning is cleared.
+ *
+ * The thread sleeps on connection.panicSignal, so it wakes up early when
+ * another thread reports that the connection was lost.
+ *
+ * @param something unused
+ * @return always NULL
+ */
+static void* connection_backgroundThread(void *something UNUSED)
+{
+ ticks nextKeepalive;
+ ticks nextRttCheck;
+
+ // Both deadlines start at "now", so the first iteration does both jobs
+ timing_get( &nextKeepalive );
+ nextRttCheck = nextKeepalive;
+ while ( keepRunning ) {
+ ticks now;
+ timing_get( &now );
+ // Time left until each deadline; presumably 0 once a deadline has passed - TODO confirm in timing.h
+ uint32_t wt1 = timing_diffMs( &now, &nextKeepalive );
+ uint32_t wt2 = timing_diffMs( &now, &nextRttCheck );
+ if ( wt1 > 0 && wt2 > 0 ) {
+ // Nothing due yet: sleep until the earlier deadline or until panic is signalled
+ int waitRes = signal_wait( connection.panicSignal, (int)MIN( wt1, wt2 ) + 1 );
+ if ( waitRes == SIGNAL_ERROR ) {
+ logadd( LOG_WARNING, "Error waiting on signal in background thread! Errno = %d", errno );
+ }
+ timing_get( &now );
+ }
+ // Woken up, see what we have to do
+ // Panic mode means there currently is no usable connection
+ const bool panic = connection.sockFd == -1;
+ // Check alt servers
+ if ( panic || timing_reachedPrecise( &nextRttCheck, &now ) ) {
+ if ( learnNewServers ) {
+ addAltServers();
+ }
+ sortAltServers();
+ probeAltServers();
+ // Probe at the startup interval while in panic or while the connection is still young
+ if ( panic || timing_diff( &connection.startupTime, &now ) <= STARTUP_MODE_DURATION ) {
+ timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_STARTUP );
+ } else {
+ timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_NORMAL );
+ }
+ }
+ // Send keepalive packet
+ if ( timing_reachedPrecise( &nextKeepalive, &now ) ) {
+ pthread_mutex_lock( &connection.sendMutex );
+ if ( connection.sockFd != -1 ) {
+ dnbd3_request_t request;
+ request.magic = dnbd3_packet_magic;
+ request.cmd = CMD_KEEPALIVE;
+ request.handle = request.offset = request.size = 0;
+ fixup_request( request );
+ ssize_t ret = sock_sendAll( connection.sockFd, &request, sizeof request, 2 );
+ if ( (size_t)ret != sizeof request ) {
+ // Sending failed: drop the connection and make the alt server check due immediately
+ shutdown( connection.sockFd, SHUT_RDWR );
+ connection.sockFd = -1;
+ nextRttCheck = now;
+ }
+ }
+ pthread_mutex_unlock( &connection.sendMutex );
+ timing_addSeconds( &nextKeepalive, &now, TIMER_INTERVAL_KEEPALIVE_PACKET );
+ }
+ }
+ return NULL;
+}
+
+// Private quick helpers
+
+/**
+ * Merge the servers announced by the current server (newservers) into the
+ * altservers table and clear newservers afterwards.
+ *
+ * Servers whose address is already known are skipped. A new server goes into
+ * the first free slot; if the table is full it replaces the entry with the
+ * highest consecutiveFails above FAIL_BACKOFF_START_COUNT, if there is one.
+ * Takes newAltLock and the write side of altLock.
+ */
+static void addAltServers()
+{
+	pthread_mutex_lock( &newAltLock );
+	lock_write( &altLock );
+	for ( int nIdx = 0; nIdx < MAX_ALTS; ++nIdx ) {
+		if ( newservers[nIdx].host.type == 0 )
+			continue;
+		// Got a new alt server, see if it's already known
+		bool known = false;
+		for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) {
+			if ( isSameAddress( &newservers[nIdx].host, &altservers[eIdx].host ) ) {
+				known = true;
+				break;
+			}
+		}
+		if ( known )
+			continue;
+		// Not known yet, add - find free slot
+		int slot = -1;
+		for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) {
+			if ( altservers[eIdx].host.type == 0 ) {
+				slot = eIdx; // free - bail out and use this one
+				break;
+			}
+			// Replace an existing alt-server that failed recently if we got no more slots.
+			// The first failing candidate has to be accepted while slot is still -1,
+			// otherwise no candidate could ever be picked.
+			if ( altservers[eIdx].consecutiveFails > FAIL_BACKOFF_START_COUNT
+					&& ( slot == -1 || altservers[slot].consecutiveFails < altservers[eIdx].consecutiveFails ) ) {
+				slot = eIdx;
+			}
+		}
+		if ( slot != -1 ) {
+			char txt[200];
+			sock_printHost( &newservers[nIdx].host, txt, sizeof(txt) );
+			logadd( LOG_DEBUG1, "new server %s in slot %d", txt, slot );
+			altservers[slot].consecutiveFails = 0;
+			altservers[slot].bestCount = 0;
+			altservers[slot].rtts[0] = RTT_UNREACHABLE;
+			altservers[slot].rttIndex = 1;
+			altservers[slot].host = newservers[nIdx].host;
+			altservers[slot].liveRtt = 0;
+		}
+	}
+	memset( newservers, 0, sizeof(newservers) );
+	unlock_rw( &altLock );
+	pthread_mutex_unlock( &newAltLock );
+}
+
+/**
+ * Put a different host into a table slot and start its statistics over.
+ * rtt and rtts[1..] are left untouched.
+ */
+static void resetAltSlot(alt_server_t *slot, const dnbd3_host_t *host, const int fails)
+{
+	slot->host = *host;
+	slot->consecutiveFails = fails;
+	slot->bestCount = 0;
+	slot->rtts[0] = RTT_UNREACHABLE;
+	slot->rttIndex = 1;
+	slot->liveRtt = 0;
+}
+
+/**
+ * Promote healthy servers from the inactive part of the table (index >=
+ * MAX_ALTS_ACTIVE, not considered for switching over) into the active part.
+ * An inactive server qualifies if it has no recorded failures; it trades
+ * places with the first active slot that is empty or has failed more than
+ * FAIL_BACKOFF_START_COUNT times. Stops once no such active slot is left.
+ */
+static void sortAltServers()
+{
+	int activeIdx = 0;
+	lock_write( &altLock );
+	for ( int inactiveIdx = MAX_ALTS_ACTIVE; inactiveIdx < MAX_ALTS; ++inactiveIdx ) {
+		alt_server_t * const candidate = &altservers[inactiveIdx];
+		const bool usable = candidate->host.type != 0 && candidate->consecutiveFails <= 0;
+		if ( !usable )
+			continue;
+		// Advance to the next active slot that may be given up
+		for ( ; activeIdx < MAX_ALTS_ACTIVE; ++activeIdx ) {
+			if ( altservers[activeIdx].host.type == 0 )
+				break;
+			if ( altservers[activeIdx].consecutiveFails > FAIL_BACKOFF_START_COUNT )
+				break;
+		}
+		if ( activeIdx == MAX_ALTS_ACTIVE )
+			break;
+		// Switch!
+		alt_server_t * const victim = &altservers[activeIdx];
+		const dnbd3_host_t promoted = candidate->host;
+		const dnbd3_host_t demoted = victim->host;
+		resetAltSlot( candidate, &demoted, FAIL_BACKOFF_START_COUNT * 4 );
+		resetAltSlot( victim, &promoted, 0 );
+	}
+	unlock_rw( &altLock );
+}
+
+/**
+ * Measure the alt servers and switch to a better one if appropriate.
+ *
+ * Normal mode: the first MAX_ALTS_ACTIVE servers are probed by connecting,
+ * selecting the image and requesting one test block. RTT statistics are
+ * updated and the connection to the best server is kept open. A switch
+ * happens if that server has been best often enough or is faster than the
+ * current one by the configured thresholds.
+ *
+ * Panic mode (no connection, or a pending request is overdue): all MAX_ALTS
+ * servers are tried, the probe requests the block of the oldest pending
+ * request so that request can be answered directly, and the first server
+ * that works becomes the new connection.
+ */
+static void probeAltServers()
+{
+	serialized_buffer_t buffer;
+	dnbd3_reply_t reply;
+	int bestSock = -1;
+	uint16_t remoteRid, remoteProto;
+	uint64_t remoteSize;
+	char *remoteName;
+	bool doSwitch;
+	bool panic = connection.sockFd == -1;
+	uint64_t testOffset = 0;
+	uint32_t testLength = RTT_BLOCK_SIZE;
+	dnbd3_async_t *request = NULL;
+	alt_server_t *current = NULL, *best = NULL;
+
+	if ( !panic ) {
+		// Find the table entry of the server we are connected to; may stay NULL
+		lock_read( &altLock );
+		for ( int altIndex = 0; altIndex < MAX_ALTS; ++altIndex ) {
+			if ( altservers[altIndex].host.type != 0
+					&& isSameAddressPort( &altservers[altIndex].host, &connection.currentServer ) ) {
+				current = &altservers[altIndex];
+				break;
+			}
+		}
+		unlock_rw( &altLock );
+	}
+	declare_now;
+	pthread_spin_lock( &requests.lock );
+	if ( requests.head != NULL ) {
+		if ( !panic && current != NULL ) {
+			const int maxDelay = MAX( current->rtt * 5, 1000000 ); // Give at least one second
+			dnbd3_async_t *iterator;
+			for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) {
+				// A request with measurement tag is pending
+				if ( timing_diffUs( &iterator->time, &now ) > maxDelay ) {
+					panic = true;
+					break;
+				}
+			}
+		}
+		if ( panic ) {
+			// Use the oldest pending request as the probe so it gets served directly
+			request = requests.head;
+			testOffset = requests.head->offset;
+			testLength = requests.head->length;
+		}
+	}
+	pthread_spin_unlock( &requests.lock );
+	if ( testOffset != 0 ) {
+		logadd( LOG_DEBUG1, "Panic with pending %" PRIu64 ":%" PRIu32, testOffset, testLength );
+	}
+
+	lock_read( &altLock );
+	for ( int altIndex = 0; altIndex < (panic ? MAX_ALTS : MAX_ALTS_ACTIVE); ++altIndex ) {
+		alt_server_t * const srv = &altservers[altIndex];
+		if ( srv->host.type == 0 )
+			continue;
+		// Back off from servers that keep failing: the more failures, the more often they are skipped
+		if ( !panic && srv->consecutiveFails > FAIL_BACKOFF_START_COUNT
+				&& rand() % srv->consecutiveFails >= FAIL_BACKOFF_START_COUNT ) {
+			continue;
+		}
+		// Advance the sample position, wrapping so that it always stays a
+		// valid index into rtts[] (0 .. RTT_COUNT - 1)
+		if ( srv->rttIndex < 0 || srv->rttIndex >= RTT_COUNT - 1 ) {
+			srv->rttIndex = 0;
+		} else {
+			srv->rttIndex += 1;
+		}
+		// Probe
+		ticks start;
+		timing_get( &start );
+		errno = 0;
+		int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 );
+		if ( sock == -1 ) {
+			logadd( LOG_DEBUG1, "Could not connect for probing. errno = %d", errno );
+			goto fail;
+		}
+		if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) {
+			logadd( LOG_DEBUG1, "probe: select_image failed" );
+			goto fail;
+		}
+		if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize )) {
+			logadd( LOG_DEBUG1, "probe: select image reply failed" );
+			goto fail;
+		}
+		if ( remoteProto < MIN_SUPPORTED_SERVER ) {
+			logadd( LOG_WARNING, "Unsupported remote version (local: %d, remote: %d)", (int)PROTOCOL_VERSION, (int)remoteProto );
+			srv->consecutiveFails += 10;
+			goto fail;
+		}
+		if ( remoteRid != image.rid || strcmp( remoteName, image.name ) != 0 ) {
+			logadd( LOG_WARNING, "Remote rid or name mismatch (got '%s')", remoteName );
+			srv->consecutiveFails += 10;
+			goto fail;
+		}
+		if ( !dnbd3_get_block( sock, testOffset, testLength, 0, 0 ) ) {
+			logadd( LOG_DEBUG1, "-> block request fail" );
+			goto fail;
+		}
+		const int a = dnbd3_get_reply( sock, &reply );
+		if ( !a || reply.size != testLength ) {
+			logadd( LOG_DEBUG1, "<- get block reply fail %d %d", a, (int)reply.size );
+			goto fail;
+		}
+		if ( request != NULL && removeRequest( request ) != NULL ) {
+			// Request successfully removed from queue
+			const ssize_t ret = sock_recv( sock, request->buffer, request->length );
+			if ( ret != (ssize_t)request->length ) {
+				logadd( LOG_DEBUG1, "[RTT] receiving payload for a block reply failed" );
+				// Failure, add to queue again
+				connection_read( request );
+				goto fail;
+			}
+			// Success, wake up caller
+			logadd( LOG_DEBUG1, "[RTT] Successful direct probe" );
+			request->success = true;
+			request->finished = true;
+			signal_call( request->signal );
+		} else {
+			// Wasn't a request that's in our request queue
+			if ( !throwDataAway( sock, testLength ) ) {
+				logadd( LOG_DEBUG1, "<- get block reply payload fail" );
+				goto fail;
+			}
+		}
+
+		// Yay, success
+		// Panic mode? Just switch to server
+		if ( panic ) {
+			unlock_rw( &altLock );
+			switchConnection( sock, srv );
+			return;
+		}
+		// Non-panic mode:
+		// Update stats of server
+		ticks end;
+		timing_get( &end );
+		srv->consecutiveFails = 0;
+		srv->rtts[srv->rttIndex] = (int)timing_diffUs( &start, &end );
+		int newRtt = 0;
+		for ( int i = 0; i < RTT_COUNT; ++i ) {
+			newRtt += srv->rtts[i];
+		}
+		if ( srv->liveRtt != 0 ) {
+			// Make live rtt measurement influence result
+			newRtt = ( newRtt + srv->liveRtt ) / ( RTT_COUNT + 1 );
+		} else {
+			newRtt /= RTT_COUNT;
+		}
+		srv->rtt = newRtt;
+
+		// Keep socket open if this is currently the best one
+		if ( best == NULL || best->rtt > srv->rtt ) {
+			best = srv;
+			if ( bestSock != -1 ) {
+				close( bestSock );
+			}
+			bestSock = sock;
+		} else {
+			close( sock );
+		}
+		continue;
+fail:;
+		if ( sock != -1 ) {
+			close( sock );
+		}
+		srv->rtts[srv->rttIndex] = RTT_UNREACHABLE;
+		srv->consecutiveFails += 1;
+	}
+	doSwitch = false;
+	if ( best != NULL ) {
+		// Time-sensitive switch decision: If a server was best for some consecutive measurements,
+		// we switch no matter how small the difference to the current server is
+		for ( int altIndex = 0; altIndex < MAX_ALTS_ACTIVE; ++altIndex ) {
+			alt_server_t * const srv = &altservers[altIndex];
+			// Decay liveRtt slowly...
+			// current is NULL if the connected server is not in the table; in
+			// that case there is nothing to compare against, so just decay
+			if ( ( current == NULL || srv->liveRtt > current->liveRtt ) && srv->liveRtt > srv->rtt ) {
+				srv->liveRtt -= ( ( srv->liveRtt / 100 ) + 1 );
+			}
+			if ( srv == best ) {
+				if ( srv->bestCount < 50 ) {
+					srv->bestCount += 2;
+				}
+				// Switch with increasing probability the higher the bestCount is
+				if ( srv->bestCount > 12 && ( current == NULL || srv->rtt < current->rtt ) && srv->bestCount > rand() % 50 ) {
+					doSwitch = true;
+				}
+			} else if ( srv->bestCount > 0 ) {
+				srv->bestCount--;
+			}
+		}
+		// Let the fail counters of the inactive servers decay over time
+		for ( int i = MAX_ALTS_ACTIVE; i < MAX_ALTS; ++i ) {
+			if ( altservers[i].consecutiveFails > 0 ) {
+				altservers[i].consecutiveFails--;
+			}
+		}
+		// This takes care of the situation where two servers alternate being the best server all the time
+		if ( doSwitch && current != NULL && best->bestCount - current->bestCount < 8 ) {
+			doSwitch = false;
+		}
+		// Regular logic: Apply threshold when considering switch
+		if ( !doSwitch && current != NULL ) {
+			doSwitch = current->rtt > best->rtt + RTT_ABSOLUTE_THRESHOLD
+					|| RTT_THRESHOLD_FACTOR(current->rtt) > best->rtt + 1000;
+		}
+	}
+	// Switch if a better server was found
+	if ( doSwitch ) {
+		logadd( LOG_INFO, "Current: %dµs, best: %dµs. Will switch!", current == NULL ? 0 : current->rtt, best->rtt );
+		for ( int i = 0; i < MAX_ALTS; ++i ) {
+			if ( &altservers[i] != best ) {
+				altservers[i].bestCount = 0;
+			}
+		}
+		unlock_rw( &altLock );
+		switchConnection( bestSock, best );
+		return;
+	}
+	// No switch
+	unlock_rw( &altLock );
+	if ( best != NULL ) {
+		close( bestSock );
+	}
+}
+
+/**
+ * Make an already connected and image-selected socket the current connection.
+ *
+ * The old socket (if any) is shut down so its receive thread terminates and
+ * closes it. All pending requests are taken off the queue and sent again on
+ * the new socket, and a new receive thread is started for it. If the new
+ * socket turns out to be unusable the module enters panic mode instead.
+ *
+ * @param sockFd connected socket; ownership passes to this function
+ * @param srv table entry of the server sockFd is connected to
+ */
+static void switchConnection(int sockFd, alt_server_t *srv)
+{
+	pthread_t thread;
+	struct sockaddr_storage addr;
+	socklen_t addrLen = sizeof(addr);
+	char message[200] = "Connection switched to ";
+	const size_t len = strlen( message );
+	int ret;
+	int peerErrno = 0;
+	dnbd3_async_t *queue = NULL, *it;
+
+	pthread_mutex_lock( &connection.sendMutex );
+	if ( connection.sockFd != -1 ) {
+		shutdown( connection.sockFd, SHUT_RDWR );
+	}
+	ret = getpeername( sockFd, (struct sockaddr*)&addr, &addrLen );
+	if ( ret == 0 ) {
+		connection.currentServer = srv->host;
+		connection.sockFd = sockFd;
+		// Detach the whole pending list; it is re-sent further down
+		pthread_spin_lock( &requests.lock );
+		queue = requests.head;
+		requests.head = requests.tail = NULL;
+		pthread_spin_unlock( &requests.lock );
+	} else {
+		// Remember the reason now, the calls below may overwrite errno
+		peerErrno = errno;
+		connection.sockFd = -1;
+	}
+	requestAltServers();
+	pthread_mutex_unlock( &connection.sendMutex );
+	if ( ret != 0 ) {
+		close( sockFd );
+		logadd( LOG_WARNING, "Could not getpeername after connection switch, assuming connection already dead again. (Errno=%d)", peerErrno );
+		signal_call( connection.panicSignal );
+		return;
+	}
+	timing_get( &connection.startupTime );
+	if ( pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)sockFd ) != 0 ) {
+		// Without a receive thread nobody would read replies from or ever
+		// close this socket, so give it up and let panic mode find a server
+		logadd( LOG_ERROR, "Could not create receive thread after connection switch, re-entering panic mode" );
+		pthread_mutex_lock( &connection.sendMutex );
+		if ( connection.sockFd == sockFd ) {
+			connection.sockFd = -1;
+		}
+		close( sockFd );
+		pthread_mutex_unlock( &connection.sendMutex );
+		signal_call( connection.panicSignal );
+	} else {
+		sock_printable( (struct sockaddr*)&addr, addrLen, message + len, sizeof(message) - len );
+		logadd( LOG_INFO, "%s", message );
+	}
+	// resend queue
+	// (requests are queued again even if there is no socket, so they are not lost)
+	if ( queue != NULL ) {
+		pthread_mutex_lock( &connection.sendMutex );
+		dnbd3_async_t *next = NULL;
+		for ( it = queue; it != NULL; it = next ) {
+			logadd( LOG_DEBUG1, "Requeue after server change" );
+			// enqueueRequest() resets it->next, so fetch the successor first
+			next = it->next;
+			enqueueRequest( it );
+			if ( connection.sockFd != -1 && !dnbd3_get_block( connection.sockFd, it->offset, it->length, (uint64_t)it, 0 ) ) {
+				logadd( LOG_WARNING, "Resending pending request failed, re-entering panic mode" );
+				shutdown( connection.sockFd, SHUT_RDWR );
+				connection.sockFd = -1;
+				signal_call( connection.panicSignal );
+			}
+		}
+		pthread_mutex_unlock( &connection.sendMutex );
+	}
+}
+
+/**
+ * Ask the current server for its list of alt servers (CMD_GET_SERVERS).
+ * No-op if there is no connection or learning new servers is disabled.
+ * On send failure the socket is shut down and marked as gone.
+ *
+ * Does not lock, so get the sendMutex first!
+ */
+static void requestAltServers()
+{
+	if ( !learnNewServers || connection.sockFd == -1 )
+		return;
+
+	dnbd3_request_t request = { 0 };
+	request.magic = dnbd3_packet_magic;
+	request.cmd = CMD_GET_SERVERS;
+	fixup_request( request );
+
+	const ssize_t sent = sock_sendAll( connection.sockFd, &request, sizeof(request), 2 );
+	if ( sent == (ssize_t)sizeof(request) )
+		return;
+
+	logadd( LOG_WARNING, "Connection failed while requesting alt server list" );
+	shutdown( connection.sockFd, SHUT_RDWR );
+	connection.sockFd = -1;
+}
+
+/**
+ * Read and discard data from a socket, SHORTBUF bytes at a time.
+ *
+ * @param sockFd socket to read from
+ * @param amount number of bytes to discard
+ * @return true if all bytes were read, false as soon as a read returns <= 0
+ */
+static bool throwDataAway(int sockFd, uint32_t amount)
+{
+	char scratch[SHORTBUF];
+	for ( size_t done = 0; done < amount; ) {
+		const size_t left = amount - done;
+		const size_t chunk = left < SHORTBUF ? left : SHORTBUF;
+		const ssize_t got = sock_recv( sockFd, scratch, chunk );
+		if ( got <= 0 )
+			return false;
+		done += (size_t)got;
+	}
+	return true;
+}
+
+/**
+ * Reset the request's status fields, stamp it with the current time (used for
+ * latency measurement) and append it to the pending request list.
+ */
+static void enqueueRequest(dnbd3_async_t *request)
+{
+	request->success = false;
+	request->finished = false;
+	request->next = NULL;
+	// Measure latency and add to switch formula
+	timing_get( &request->time );
+
+	pthread_spin_lock( &requests.lock );
+	if ( requests.head != NULL ) {
+		requests.tail->next = request;
+	} else {
+		requests.head = request;
+	}
+	requests.tail = request;
+	pthread_spin_unlock( &requests.lock );
+}
+
+/**
+ * Unlink a request from the pending request list.
+ * The pointer is only compared against list members, never dereferenced.
+ *
+ * @param request request to look for
+ * @return request if it was in the list and has been removed, NULL otherwise
+ */
+static dnbd3_async_t* removeRequest(dnbd3_async_t *request)
+{
+	dnbd3_async_t *found = NULL;
+	dnbd3_async_t *prev = NULL;
+
+	pthread_spin_lock( &requests.lock );
+	for ( dnbd3_async_t *cur = requests.head; cur != NULL; prev = cur, cur = cur->next ) {
+		if ( cur != request )
+			continue;
+		// Bypass the node, fixing up head and tail where needed
+		if ( prev == NULL ) {
+			requests.head = cur->next;
+		} else {
+			prev->next = cur->next;
+		}
+		if ( requests.tail == cur ) {
+			requests.tail = prev;
+		}
+		found = cur;
+		break;
+	}
+	pthread_spin_unlock( &requests.lock );
+	return found;
+}
+
diff --git a/src/fuse/connection.h b/src/fuse/connection.h
new file mode 100644
index 0000000..cae554c
--- /dev/null
+++ b/src/fuse/connection.h
@@ -0,0 +1,35 @@
+#ifndef _CONNECTION_H_
+#define _CONNECTION_H_
+
+#include "../shared/fdsignal.h"
+#include "../shared/timing.h"
+#include <stddef.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+struct _dnbd3_async;
+
+// One asynchronous block read. The caller fills in signal, buffer, offset and
+// length and passes the struct to connection_read(); the connection module
+// sets finished/success and calls the signal once the data has arrived.
+typedef struct _dnbd3_async {
+ struct _dnbd3_async *next; // Next in this linked list (private field, not set by caller)
+ dnbd3_signal_t* signal; // Used to signal the caller
+ char* buffer; // Caller-provided buffer to be filled
+ ticks time; // When request was put on wire, 0 if not measuring
+ uint64_t offset; // Offset of the requested data within the image
+ uint32_t length; // Number of bytes requested; buffer must hold at least this many
+ bool finished; // Will be set to true if the request has been handled
+ bool success; // Will be set to true if the request succeeded
+} dnbd3_async_t;
+
+// Connect to the first reachable server from the space-separated hosts list
+// that serves the given image/revision (rid 0 accepts any revision).
+// Returns true if this call established the connection.
+bool connection_init(const char *hosts, const char *image, const uint16_t rid, const bool learnNewServers);
+
+// Start the receive and background threads. Requires a successful
+// connection_init(); returns false on failure or if called more than once.
+bool connection_initThreads();
+
+// Size of the image as reported by the server during connection_init().
+uint64_t connection_getImageSize();
+
+// Queue an asynchronous read. Returns false only if the connection has not
+// been initialized; completion is reported through request->signal.
+bool connection_read(dnbd3_async_t *request);
+
+// Tell the worker threads to stop and shut down the current socket.
+void connection_close();
+
+// Write a textual status report into buffer (len bytes); returns the number of bytes used.
+size_t connection_printStats(char *buffer, const size_t len);
+
+#endif /* CONNECTION_H_ */
diff --git a/src/fuse/helper.c b/src/fuse/helper.c
new file mode 100644
index 0000000..d81b08f
--- /dev/null
+++ b/src/fuse/helper.c
@@ -0,0 +1,36 @@
+#include "helper.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <inttypes.h>
+
+
+/**
+ * Dump the collected access statistics to "log.txt" in the current working
+ * directory, overwriting any existing file. Writes a short header followed by
+ * the per-block request counters, 50 values per line.
+ * Prints a message to stdout and returns if the file cannot be opened.
+ */
+void printLog( log_info *info )
+{
+	const uint64_t bytesPerMiB = 1024ull * 1024ull;
+	FILE *out = fopen( "log.txt", "w" );
+
+	if ( out == NULL ) {
+		printf( "Error creating/opening log.txt\n" );
+		return;
+	}
+
+	// Header
+	fprintf( out, "ImageSize: %"PRIu64" MiB\n", (uint64_t)( info->imageSize / bytesPerMiB ) );
+	fprintf( out, "ReceivedMiB: %"PRIu64" MiB\n", (uint64_t)( info->receivedBytes / bytesPerMiB ) );
+	fprintf( out, "imageBlockCount: %"PRIu64"\n", info->imageBlockCount );
+	fputs( "Blocksize: 4KiB\n\n", out );
+	fputs( "Block access count:\n", out );
+
+	// Counters, with a line break in front of every 50th value
+	for ( uint64_t block = 0; block < info->imageBlockCount; ++block ) {
+		if ( block % 50 == 0 ) {
+			fputc( '\n', out );
+		}
+		fprintf( out, "%i ", (int)info->blockRequestCount[block] );
+	}
+	fputc( '\n', out );
+	fclose( out );
+}
diff --git a/src/fuse/helper.h b/src/fuse/helper.h
new file mode 100644
index 0000000..9e5d127
--- /dev/null
+++ b/src/fuse/helper.h
@@ -0,0 +1,35 @@
+#ifndef IMAGEHELPER_H
+#define IMAGEHELPER_H
+
+#include "../types.h"
+
+#include <netdb.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <string.h>
+#include <sys/socket.h>
+
+/* Benchmark/debug statistics collected by the FUSE front end and dumped by printLog(). */
+typedef struct log_info {
+ // Size of the image in bytes (printLog divides by 1024*1024 to print MiB)
+ uint64_t imageSize;
+ // Amount of data received, in bytes; initialised to 0 by main()
+ uint64_t receivedBytes;
+ // Number of 4 KiB blocks in the image, rounded up
+ uint64_t imageBlockCount;
+ // One access counter per block; only allocated in debug mode, NULL otherwise.
+ // The counters are 8 bit wide, so they wrap around after 255 accesses.
+ uint8_t *blockRequestCount;
+} log_info;
+
+
+
+/* Dump the statistics in 'info' to the file "log.txt". */
+void printLog(log_info *info);
+
+/* NOTE(review): no definition of this function is visible in helper.c - confirm it is still needed. */
+int connect_to_server(char *server_adress, int port);
+
+/**
+ * Check whether two hosts agree in address type, port and address bytes.
+ * For HOST_IP4 the first 4 bytes of the address are compared, otherwise 16.
+ */
+static inline bool isSameAddressPort(const dnbd3_host_t * const a, const dnbd3_host_t * const b)
+{
+	if ( a->type != b->type ) {
+		return false;
+	}
+	if ( a->port != b->port ) {
+		return false;
+	}
+	const size_t addrLen = ( a->type == HOST_IP4 ) ? 4 : 16;
+	return memcmp( a->addr, b->addr, addrLen ) == 0;
+}
+
+/**
+ * Check whether two hosts agree in address type and address bytes, ignoring
+ * the port. For HOST_IP4 the first 4 bytes are compared, otherwise 16.
+ */
+static inline bool isSameAddress(const dnbd3_host_t * const a, const dnbd3_host_t * const b)
+{
+	if ( a->type != b->type ) {
+		return false;
+	}
+	const size_t addrLen = ( a->type == HOST_IP4 ) ? 4 : 16;
+	return memcmp( a->addr, b->addr, addrLen ) == 0;
+}
+
+#endif
diff --git a/src/fuse/main.c b/src/fuse/main.c
new file mode 100644
index 0000000..1a5643c
--- /dev/null
+++ b/src/fuse/main.c
@@ -0,0 +1,420 @@
+/*
+ * FUSE: Filesystem in Userspace
+ * Copyright (C) 2001-2007 Miklos Szeredi <miklos@szeredi.hu>
+ * This program can be distributed under the terms of the GNU GPL.
+ * See the file COPYING.
+ *
+ * Changed by Stephan Schwaer
+ * */
+
+#include "connection.h"
+#include "helper.h"
+#include "../shared/protocol.h"
+#include "../shared/log.h"
+
+#define FUSE_USE_VERSION 30
+#include <fuse.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+/* for printing uint */
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+#include <getopt.h>
+#include <time.h>
+#include <signal.h>
+#include <pthread.h>
+
+// Shorthand for a debug-level message through the shared logger
+#define debugf(...) do { logadd( LOG_DEBUG1, __VA_ARGS__ ); } while (0)
+
+// Paths of the two files exposed in the root directory of the mount
+static const char * const IMAGE_PATH = "/img";
+static const char * const STATS_PATH = "/status";
+
+// Size of the remote image in bytes, reported as file size of IMAGE_PATH
+static uint64_t imageSize;
+/* Debug/Benchmark variables */
+static bool useDebug = false;
+static log_info logInfo;
+// Timestamp reported for all files; also owner of all files
+static struct timespec startupTime;
+static uid_t owner;
+// Cleared by image_sigHandler() on SIGINT/SIGTERM. It is written from a signal
+// handler, so it has to be a volatile sig_atomic_t rather than a plain bool.
+static volatile sig_atomic_t keepRunning = 1;
+// Handlers that were installed before ours; chained to from image_sigHandler()
+static void (*fuse_sigIntHandler)(int) = NULL;
+static void (*fuse_sigTermHandler)(int) = NULL;
+// Empty operations table, used when fuse_main is only called to print help/version
+static struct fuse_operations dnbd3_fuse_no_operations;
+
+// Small cache of signal objects so that read requests can reuse them.
+// Number of slots in the cache:
+#define SIGPOOLSIZE 6
+// Protects all accesses to signalPool
+static pthread_spinlock_t sigLock;
+// The cache itself; a NULL entry is an empty slot
+static dnbd3_signal_t *signalPool[SIGPOOLSIZE];
+// One past the last slot, used as loop bound
+static dnbd3_signal_t **sigEnd = signalPool + SIGPOOLSIZE;
+/* Initialise the pool lock and mark every slot of the signal pool as empty. */
+static void signalInit()
+{
+	pthread_spin_init( &sigLock, PTHREAD_PROCESS_PRIVATE );
+	for ( dnbd3_signal_t **slot = signalPool; slot < sigEnd; ++slot ) {
+		*slot = NULL;
+	}
+}
+/*
+ * Take a signal out of the pool. If the pool holds none, a new blocking
+ * signal is created (outside of the lock) and returned instead.
+ */
+static inline dnbd3_signal_t *signalGet()
+{
+	dnbd3_signal_t *cached = NULL;
+
+	pthread_spin_lock( &sigLock );
+	for ( size_t i = 0; i < SIGPOOLSIZE; ++i ) {
+		if ( signalPool[i] == NULL ) {
+			continue;
+		}
+		// First occupied slot wins; empty it and stop searching
+		cached = signalPool[i];
+		signalPool[i] = NULL;
+		break;
+	}
+	pthread_spin_unlock( &sigLock );
+
+	if ( cached != NULL ) {
+		return cached;
+	}
+	return signal_newBlocking();
+}
+/*
+ * Hand a signal back to the pool. If every slot is occupied, the signal is
+ * closed (outside of the lock) instead of being cached.
+ */
+static inline void signalPut(dnbd3_signal_t *signal)
+{
+	bool stored = false;
+
+	pthread_spin_lock( &sigLock );
+	for ( size_t i = 0; i < SIGPOOLSIZE && !stored; ++i ) {
+		if ( signalPool[i] == NULL ) {
+			signalPool[i] = signal;
+			stored = true;
+		}
+	}
+	pthread_spin_unlock( &sigLock );
+
+	if ( !stored ) {
+		signal_close( signal );
+	}
+}
+
+/**
+ * FUSE getattr callback. Knows the root directory, the image file and the
+ * status file; everything else yields -ENOENT.
+ *
+ * All entries carry the startup time as timestamps and belong to 'owner'.
+ * The status file reports a fixed size of 4096 and the current time as mtime.
+ */
+static int image_getattr(const char *path, struct stat *stbuf)
+{
+	memset( stbuf, 0, sizeof( *stbuf ) );
+	stbuf->st_mtim = startupTime;
+	stbuf->st_atim = startupTime;
+	stbuf->st_ctim = startupTime;
+	stbuf->st_uid = owner;
+
+	if ( strcmp( path, "/" ) == 0 ) {
+		stbuf->st_mode = S_IFDIR | 0550;
+		stbuf->st_nlink = 2;
+		return 0;
+	}
+	if ( strcmp( path, IMAGE_PATH ) == 0 ) {
+		stbuf->st_mode = S_IFREG | 0440;
+		stbuf->st_nlink = 1;
+		stbuf->st_size = imageSize;
+		return 0;
+	}
+	if ( strcmp( path, STATS_PATH ) == 0 ) {
+		stbuf->st_mode = S_IFREG | 0440;
+		stbuf->st_nlink = 1;
+		stbuf->st_size = 4096;
+		// Content is generated on the fly, so it is always "just modified"
+		clock_gettime( CLOCK_REALTIME, &stbuf->st_mtim );
+		return 0;
+	}
+	return -ENOENT;
+}
+
+/**
+ * FUSE readdir callback. Only the root directory exists; it lists ".", "..",
+ * the image file and the status file. Any other path yields -ENOENT.
+ */
+static int image_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset UNUSED, struct fuse_file_info *fi UNUSED)
+{
+	if ( strcmp( path, "/" ) != 0 ) {
+		return -ENOENT;
+	}
+	// The path constants start with a slash, which is skipped for the entry name
+	const char * const entries[] = { ".", "..", IMAGE_PATH + 1, STATS_PATH + 1 };
+	for ( size_t i = 0; i < sizeof( entries ) / sizeof( entries[0] ); ++i ) {
+		filler( buf, entries[i], NULL, 0 );
+	}
+	return 0;
+}
+
+/**
+ * FUSE open callback. Only the image file and the status file can be opened,
+ * and only for reading.
+ *
+ * @return 0 on success, -ENOENT for unknown paths, -EACCES if the file is
+ *         opened with any access mode other than read-only
+ */
+static int image_open(const char *path, struct fuse_file_info *fi)
+{
+	if ( strcmp( path, IMAGE_PATH ) != 0 && strcmp( path, STATS_PATH ) != 0 ) {
+		return -ENOENT;
+	}
+	// O_ACCMODE is the portable mask for the access mode bits of the open flags
+	if ( ( fi->flags & O_ACCMODE ) != O_RDONLY ) {
+		return -EACCES;
+	}
+	return 0;
+}
+
+/**
+ * Serve a read request on the status file.
+ *
+ * For offset 0 the statistics are rendered straight into the caller's buffer.
+ * For any other offset they are rendered into a temporary buffer first and
+ * the requested window is copied out of it.
+ *
+ * @param buf destination buffer
+ * @param size capacity of buf; the caller guarantees it fits into an int
+ * @param offset position within the status text to start reading from
+ * @return number of bytes placed in buf, 0 if offset is at or past the end
+ *         of the status text, -EINVAL for a negative offset
+ */
+static int fillStatsFile(char *buf, size_t size, off_t offset) {
+	if ( offset == 0 ) {
+		return (int)connection_printStats( buf, size );
+	}
+	if ( offset < 0 ) {
+		return -EINVAL;
+	}
+	char buffer[4096];
+	size_t total = connection_printStats( buffer, sizeof buffer );
+	if ( total > sizeof buffer ) {
+		// Never trust a length beyond what the temporary buffer can hold
+		total = sizeof buffer;
+	}
+	// Compare in 64 bit so that huge offsets cannot wrap around; reading at
+	// or behind the end of the text is a plain end-of-file, i.e. 0 bytes
+	if ( (uint64_t)offset >= (uint64_t)total ) {
+		return 0;
+	}
+	size_t len = total - (size_t)offset;
+	if ( len > size ) {
+		len = size;
+	}
+	memcpy( buf, buffer + offset, len );
+	return (int)len;
+}
+
+/**
+ * FUSE read callback for the image file and the status file.
+ *
+ * Reads on the status file are forwarded to fillStatsFile(). Reads on the
+ * image are clamped to the image size, submitted through connection_read()
+ * and then waited for until the request is marked finished.
+ *
+ * @return number of bytes read, 0 at or past the end of the image (or for an
+ *         empty request), -EIO if the request is too large or failed,
+ *         -EINVAL if the request could not be submitted
+ */
+static int image_read(const char *path, char *buf, size_t size, off_t offset, struct fuse_file_info *fi UNUSED)
+{
+	if ( size > __INT_MAX__ ) {
+		// fuse docs say we MUST fill the buffer with exactly size bytes and return size,
+		// otherwise the buffer will be padded with zeros. Since the return value is just
+		// an int, we could not properly fulfill read requests > 2GB. Since there is no
+		// mention of a guarantee that this will never happen, better add a safety check.
+		// Way to go fuse.
+		return -EIO;
+	}
+	// Only two files can be opened, so their first character tells them apart
+	if ( path[1] == STATS_PATH[1] ) {
+		return fillStatsFile( buf, size, offset );
+	}
+
+	if ( (uint64_t)offset >= imageSize ) {
+		return 0;
+	}
+
+	if ( offset + size > imageSize ) {
+		size = imageSize - offset;
+	}
+
+	if ( size == 0 ) {
+		// Nothing to fetch. This also keeps the block range below from
+		// underflowing when computing the last block of an empty request.
+		return 0;
+	}
+
+	if ( useDebug && logInfo.blockRequestCount != NULL ) {
+		/* count the requested blocks */
+		uint64_t startBlock = offset / ( 4096 );
+		const uint64_t endBlock = ( offset + size - 1 ) / ( 4096 );
+
+		for ( ; startBlock <= endBlock; startBlock++ ) {
+			++logInfo.blockRequestCount[startBlock];
+		}
+	}
+
+	dnbd3_async_t request;
+	request.buffer = buf;
+	request.length = (uint32_t)size;
+	request.offset = offset;
+	request.signal = signalGet();
+
+	if ( !connection_read( &request ) ) {
+		signalPut( request.signal );
+		return -EINVAL;
+	}
+	while ( !request.finished ) {
+		int ret = signal_wait( request.signal, 5000 );
+		if ( !keepRunning ) {
+			// Shutting down, stop waiting for the reply
+			connection_close();
+			break;
+		}
+		if ( ret < 0 ) {
+			debugf( "fuse_read signal wait returned %d", ret );
+		}
+	}
+	signalPut( request.signal );
+	if ( request.success ) {
+		return request.length;
+	} else {
+		return -EIO;
+	}
+}
+
+/**
+ * Handler for SIGINT and SIGTERM: flags the shutdown and then chains to the
+ * handler that was installed before ours, if there was a real one.
+ */
+static void image_sigHandler(int signum) {
+	keepRunning = false;
+
+	void (*previous)(int) = NULL;
+	if ( signum == SIGINT ) {
+		previous = fuse_sigIntHandler;
+	} else if ( signum == SIGTERM ) {
+		previous = fuse_sigTermHandler;
+	}
+	// SIG_DFL and SIG_IGN are markers, not callable functions. A previous
+	// disposition of "ignore" must not be invoked.
+	if ( previous != NULL && previous != SIG_DFL && previous != SIG_IGN ) {
+		previous( signum );
+	}
+}
+
+/**
+ * FUSE init callback: starts the connection threads and installs our own
+ * SIGINT/SIGTERM handler, remembering the previously installed handlers so
+ * that image_sigHandler() can chain to them.
+ *
+ * Exits the process if the connection threads cannot be started.
+ *
+ * @return always NULL (no private data is used)
+ */
+static void* image_init(struct fuse_conn_info *conn UNUSED)
+{
+	if ( !connection_initThreads() ) {
+		logadd( LOG_ERROR, "Could not initialize threads for dnbd3 connection, exiting..." );
+		exit( EXIT_FAILURE );
+	}
+	// Prepare our handler
+	struct sigaction newHandler;
+	memset( &newHandler, 0, sizeof(newHandler) );
+	newHandler.sa_handler = &image_sigHandler;
+	sigemptyset( &newHandler.sa_mask );
+	struct sigaction oldHandler;
+	// Retrieve old handlers when setting; oldHandler is only valid on success
+	if ( sigaction( SIGINT, &newHandler, &oldHandler ) == 0 ) {
+		fuse_sigIntHandler = oldHandler.sa_handler;
+	}
+	logadd( LOG_DEBUG1, "Previous SIGINT handler was %p", (void*)(uintptr_t)fuse_sigIntHandler );
+	if ( sigaction( SIGTERM, &newHandler, &oldHandler ) == 0 ) {
+		fuse_sigTermHandler = oldHandler.sa_handler;
+	}
+	logadd( LOG_DEBUG1, "Previous SIGTERM handler was %p", (void*)(uintptr_t)fuse_sigTermHandler );
+	return NULL;
+}
+
+/* FUSE destroy callback: dump the statistics in debug mode, then close the connection */
+static void image_destroy(void *private_data UNUSED)
+{
+	if ( useDebug ) {
+		printLog( &logInfo );
+	}
+
+	connection_close();
+}
+
+/*
+ * Map the implemented fuse operations. Only read-only operations are
+ * provided; all members not listed here stay zero-initialised.
+ */
+static struct fuse_operations image_oper = {
+ .getattr = image_getattr,
+ .readdir = image_readdir,
+ .open = image_open,
+ .read = image_read,
+ .init = image_init,
+ .destroy = image_destroy,
+};
+
+/*
+ * Print our own version line, let fuse_main print its version by passing it
+ * "-V", then exit with status 0.
+ */
+static void printVersion()
+{
+	char *fuseArgs[] = { "foo", "-V" };
+	const int protocol = (int)PROTOCOL_VERSION;
+
+	printf( "DNBD3-Fuse Version 1.2.3.4, protocol version %d\n", protocol );
+	fuse_main( (int)( sizeof( fuseArgs ) / sizeof( fuseArgs[0] ) ), fuseArgs, &dnbd3_fuse_no_operations, NULL );
+	exit( 0 );
+}
+
+/*
+ * Print the fuse help (by passing "-h" to fuse_main) followed by our own
+ * usage text, then exit with the given exit code.
+ */
+static void printUsage(char *argv0, int exitCode)
+{
+	// One entry per line of the option overview
+	static const char * const optionHelp[] = {
+		" -d --debug Don't fork, write stats file, and print debug output (fuse -> stderr, dnbd3 -> stdout)\n",
+		" -f Don't fork (dnbd3 -> stdout)\n",
+		" -h --host List of space separated hosts to use\n",
+		" -i --image Remote image name to request\n",
+		" -l --log Write log to given location\n",
+		" -o --option Mount options to pass to libfuse\n",
+		" -r --rid Revision to use (omit or pass 0 for latest)\n",
+		" -S --sticky Use only servers from command line (no learning from servers)\n",
+		" -s Single threaded mode\n",
+	};
+	char *fuseArgs[] = { argv0, "-h" };
+
+	fuse_main( 2, fuseArgs, &dnbd3_fuse_no_operations, NULL );
+	printf( "\n" );
+	printf( "Usage: %s [--debug] [--option mountOpts] --host <serverAddress(es)> --image <imageName> [--rid revision] <mountPoint>\n", argv0 );
+	printf( "Or: %s [-d] [-o mountOpts] -h <serverAddress(es)> -i <imageName> [-r revision] <mountPoint>\n", argv0 );
+	for ( size_t i = 0; i < sizeof( optionHelp ) / sizeof( optionHelp[0] ); ++i ) {
+		fputs( optionHelp[i], stdout );
+	}
+	exit( exitCode );
+}
+
+// Short options accepted by getopt_long; a trailing colon marks options that take an argument
+static const char *optString = "dfHh:i:l:o:r:SsVv";
+// Long options and the short option character each of them is reported as.
+// Note that -h is --host here; help is -H/--help.
+static const struct option longOpts[] = {
+ { "debug", no_argument, NULL, 'd' },
+ { "help", no_argument, NULL, 'H' },
+ { "host", required_argument, NULL, 'h' },
+ { "image", required_argument, NULL, 'i' },
+ { "log", required_argument, NULL, 'l' },
+ { "option", required_argument, NULL, 'o' },
+ { "rid", required_argument, NULL, 'r' },
+ { "sticky", no_argument, NULL, 'S' },
+ { "version", no_argument, NULL, 'v' },
+ // Terminating all-zero entry required by getopt_long
+ { 0, 0, 0, 0 }
+};
+
+/**
+ * Append one argument to the argument vector that is handed to fuse_main.
+ * Terminates the process instead of writing past the end of the vector.
+ *
+ * @param args the argument vector
+ * @param count number of entries used so far, incremented on success
+ * @param capacity total number of slots in args
+ * @param arg argument to append (not copied)
+ */
+static void addFuseArg(char **args, int *count, const int capacity, char *arg)
+{
+	// Always keep one slot free so that the vector stays NULL terminated
+	if ( *count + 1 >= capacity ) {
+		logadd( LOG_ERROR, "Too many command line arguments" );
+		exit( EXIT_FAILURE );
+	}
+	args[(*count)++] = arg;
+}
+
+/**
+ * Entry point: parses the command line, connects to the server(s), builds
+ * the argument vector for libfuse and hands control to fuse_main.
+ *
+ * Options meant for us are consumed here; -d, -f, -s and -o are forwarded to
+ * libfuse. The mount point is the first non-option argument.
+ *
+ * @return EXIT_FAILURE if setup fails, otherwise the result of fuse_main
+ */
+int main(int argc, char *argv[])
+{
+	char *server_address = NULL;
+	char *image_Name = NULL;
+	char *log_file = NULL;
+	uint16_t rid = 0;
+	char **newArgv;
+	int newArgc;
+	int opt, lidx;
+	bool learnNewServers = true;
+
+	if ( argc <= 1 || strcmp( argv[1], "--help" ) == 0 || strcmp( argv[1], "--usage" ) == 0 ) {
+		printUsage( argv[0], 0 );
+	}
+
+	// TODO Make log mask configurable
+	log_setConsoleMask( 65535 );
+	log_setConsoleTimestamps( true );
+	log_setFileMask( 65535 );
+
+	// A single element of argv can turn into several fuse arguments ("-oX"
+	// becomes two, bundled flags like "-dsf" one each), so the number of
+	// entries is not bounded by argc. Every append is therefore checked.
+	const int newArgvSize = argc * 2 + 10;
+	newArgv = calloc( (size_t)newArgvSize, sizeof(char*) );
+	if ( newArgv == NULL ) {
+		logadd( LOG_ERROR, "Out of memory while preparing fuse arguments" );
+		return EXIT_FAILURE;
+	}
+	newArgv[0] = argv[0];
+	newArgc = 1;
+	while ( ( opt = getopt_long( argc, argv, optString, longOpts, &lidx ) ) != -1 ) {
+		switch ( opt ) {
+		case 'h':
+			server_address = optarg;
+			break;
+		case 'i':
+			image_Name = optarg;
+			break;
+		case 'r':
+			rid = (uint16_t)atoi(optarg);
+			break;
+		case 'o':
+			addFuseArg( newArgv, &newArgc, newArgvSize, "-o" );
+			addFuseArg( newArgv, &newArgc, newArgvSize, optarg );
+			if ( strstr( optarg, "use_ino" ) != NULL ) {
+				logadd( LOG_WARNING, "************************" );
+				logadd( LOG_WARNING, "* WARNING: use_ino mount option is unsupported, use at your own risk!" );
+				logadd( LOG_WARNING, "************************" );
+			}
+			if ( strstr( optarg, "intr" ) != NULL ) {
+				logadd( LOG_WARNING, "************************" );
+				logadd( LOG_WARNING, "* WARNING: intr mount option is unsupported, use at your own risk!" );
+				logadd( LOG_WARNING, "************************" );
+			}
+			break;
+		case 'l':
+			log_file = optarg;
+			break;
+		case 'H':
+			printUsage( argv[0], 0 );
+			break;
+		case 'v':
+		case 'V':
+			printVersion();
+			break;
+		case 'd':
+			useDebug = true;
+			addFuseArg( newArgv, &newArgc, newArgvSize, "-d" );
+			break;
+		case 's':
+			addFuseArg( newArgv, &newArgc, newArgvSize, "-s" );
+			break;
+		case 'S':
+			learnNewServers = false;
+			break;
+		case 'f':
+			addFuseArg( newArgv, &newArgc, newArgvSize, "-f" );
+			break;
+		default:
+			printUsage( argv[0], EXIT_FAILURE );
+		}
+	}
+
+	if ( optind >= argc ) { // Missing mount point
+		printUsage( argv[0], EXIT_FAILURE );
+	}
+
+	if ( server_address == NULL || image_Name == NULL ) {
+		printUsage( argv[0], EXIT_FAILURE );
+	}
+
+	if ( log_file != NULL ) {
+		if ( !log_openLogFile( log_file ) ) {
+			logadd( LOG_WARNING, "Could not open log file at '%s'", log_file );
+		}
+	}
+
+	if ( !connection_init( server_address, image_Name, rid, learnNewServers ) ) {
+		logadd( LOG_ERROR, "Could not connect to any server. Bye.\n" );
+		return EXIT_FAILURE;
+	}
+	imageSize = connection_getImageSize();
+
+	/* initialize benchmark variables */
+	logInfo.receivedBytes = 0;
+	logInfo.imageSize = imageSize;
+	logInfo.imageBlockCount = ( imageSize + 4095 ) / 4096;
+	logInfo.blockRequestCount = NULL;
+	if ( useDebug ) {
+		logInfo.blockRequestCount = calloc( logInfo.imageBlockCount, sizeof(uint8_t) );
+		if ( logInfo.blockRequestCount == NULL && logInfo.imageBlockCount != 0 ) {
+			// Without the counter table the per-block statistics cannot be
+			// collected, so switch them off instead of crashing later on
+			logadd( LOG_WARNING, "Could not allocate block access counters, block statistics disabled" );
+			useDebug = false;
+		}
+	}
+
+	// Since dnbd3 is always read only and the remote image will not change
+	addFuseArg( newArgv, &newArgc, newArgvSize, "-o" );
+	addFuseArg( newArgv, &newArgc, newArgvSize, "ro,auto_cache,default_permissions" );
+	// Mount point goes last
+	addFuseArg( newArgv, &newArgc, newArgvSize, argv[optind] );
+
+	printf( "ImagePathName: %s\nFuseArgs:",IMAGE_PATH );
+	for ( int i = 0; i < newArgc; ++i ) {
+		printf( " '%s'", newArgv[i] );
+	}
+	putchar('\n');
+	clock_gettime( CLOCK_REALTIME, &startupTime );
+	owner = getuid();
+	signalInit();
+	return fuse_main( newArgc, newArgv, &image_oper, NULL );
+}
diff --git a/src/fuse/serialize.c b/src/fuse/serialize.c
new file mode 100644
index 0000000..4934132
--- /dev/null
+++ b/src/fuse/serialize.c
@@ -0,0 +1,5 @@
+#include <stdio.h>
+#include <string.h>
+#include <stdint.h>
+
+#include "../serialize.c"