From 43e57ce5e11e9052f5a7db66f2e8613f1784f919 Mon Sep 17 00:00:00 2001 From: Frederic Robra Date: Tue, 25 Jun 2019 17:03:28 +0200 Subject: first version of dnbd3-ng --- src/fuse/connection.c | 927 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/fuse/connection.h | 35 ++ src/fuse/helper.c | 36 ++ src/fuse/helper.h | 35 ++ src/fuse/main.c | 420 +++++++++++++++++++++++ src/fuse/serialize.c | 5 + 6 files changed, 1458 insertions(+) create mode 100644 src/fuse/connection.c create mode 100644 src/fuse/connection.h create mode 100644 src/fuse/helper.c create mode 100644 src/fuse/helper.h create mode 100644 src/fuse/main.c create mode 100644 src/fuse/serialize.c (limited to 'src/fuse') diff --git a/src/fuse/connection.c b/src/fuse/connection.c new file mode 100644 index 0000000..fc9f05b --- /dev/null +++ b/src/fuse/connection.c @@ -0,0 +1,927 @@ +#include "connection.h" +#include "helper.h" +#include "../clientconfig.h" +#include "../shared/protocol.h" +#include "../shared/fdsignal.h" +#include "../shared/sockhelper.h" +#include "../shared/log.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +/* Constants */ +static const size_t SHORTBUF = 100; +#define MAX_ALTS (16) +#define MAX_ALTS_ACTIVE (5) +#define MAX_HOSTS_PER_ADDRESS (2) +// If a server wasn't reachable this many times, we slowly start skipping it on measurements +static const int FAIL_BACKOFF_START_COUNT = 8; +#define RTT_COUNT (4) + +/* Module variables */ + +// Init guard +static bool connectionInitDone = false; +static bool threadInitDone = false; +static pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER; +static bool keepRunning = true; +static bool learnNewServers; + +// List of pending requests +static struct { + dnbd3_async_t *head; + dnbd3_async_t *tail; + pthread_spinlock_t lock; +} requests; + +// Connection for the image +static struct { + char *name; + uint16_t rid; + uint64_t size; +} image; + +static struct { + int sockFd; + pthread_mutex_t sendMutex; + dnbd3_signal_t* panicSignal; + dnbd3_host_t currentServer; + ticks startupTime; +} connection; + +// Known alt servers +typedef struct _alt_server { + dnbd3_host_t host; + int consecutiveFails; + int rtt; + int rtts[RTT_COUNT]; + int rttIndex; + int bestCount; + int liveRtt; +} alt_server_t; + +static dnbd3_server_entry_t newservers[MAX_ALTS]; +static pthread_mutex_t newAltLock = PTHREAD_MUTEX_INITIALIZER; +static alt_server_t altservers[MAX_ALTS]; +// WR: Use when re-assigning or sorting altservers, i.e. an index in altservers +// changes its meaning (host). Also used for newservers. +// RD: Use when reading the list or modifying individual entries data, like RTT +// and fail count. Isn't super clean as we still might have races here, but mostly +// the code is clean in this regard, so we should only have stale data somewhere +// but nothing nonsensical. +static pthread_rwlock_t altLock = PTHREAD_RWLOCK_INITIALIZER; +#define lock_read pthread_rwlock_rdlock +#define lock_write pthread_rwlock_wrlock +#define unlock_rw pthread_rwlock_unlock + +/* Static methods */ + + +static void* connection_receiveThreadMain(void *sock); +static void* connection_backgroundThread(void *something); + +static void addAltServers(); +static void sortAltServers(); +static void probeAltServers(); +static void switchConnection(int sockFd, alt_server_t *srv); +static void requestAltServers(); +static bool throwDataAway(int sockFd, uint32_t amount); + +static void enqueueRequest(dnbd3_async_t *request); +static dnbd3_async_t* removeRequest(dnbd3_async_t *request); + +bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid, const bool doLearnNew) +{ + int sock = -1; + char host[SHORTBUF]; + size_t hlen; + serialized_buffer_t buffer; + uint16_t remoteVersion, remoteRid; + char *remoteName; + uint64_t remoteSize; + struct sockaddr_storage sa; + socklen_t salen; + poll_list_t *cons = sock_newPollList(); + + timing_setBase(); + pthread_mutex_lock( &mutexInit ); + if ( !connectionInitDone && keepRunning ) { + dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS]; + const char *current, *end; + int altIndex = 0; + learnNewServers = doLearnNew; + memset( altservers, 0, sizeof altservers ); + connection.sockFd = -1; + current = hosts; + do { + // Get next host from string + while ( *current == ' ' ) current++; + end = strchr( current, ' ' ); + size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1); + if ( len > SHORTBUF ) len = SHORTBUF; + snprintf( host, len, "%s", current ); + int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS ); + for ( int i = 0; i < newHosts; ++i ) { + if ( altIndex >= MAX_ALTS ) + break; + altservers[altIndex].host = tempHosts[i]; + altIndex += 1; + } + current = end + 1; + } while ( end != NULL && altIndex < MAX_ALTS ); + logadd( LOG_INFO, "Got %d servers from init call", altIndex ); + // Connect + for ( int i = 0; i < altIndex + 5; ++i ) { + if ( i >= altIndex ) { + // Additional iteration - no corresponding slot in altservers, this + // is just so we can make a final calls with longer timeout + sock = sock_multiConnect( cons, NULL, 400, 1000 ); + if ( sock == -2 ) { + logadd( LOG_ERROR, "Could not connect to any host" ); + sock = -1; + break; + } + } else { + if ( altservers[i].host.type == 0 ) + continue; + // Try to connect - 100ms timeout + sock = sock_multiConnect( cons, &altservers[i].host, 100, 1000 ); + } + if ( sock == -2 || sock == -1 ) + continue; + salen = sizeof(sa); + if ( getpeername( sock, (struct sockaddr*)&sa, &salen ) == -1 ) { + logadd( LOG_ERROR, "getpeername on successful connection failed!? (errno=%d)", errno ); + close( sock ); + sock = -1; + continue; + } + hlen = sock_printable( (struct sockaddr*)&sa, salen, host, sizeof(host) ); + logadd( LOG_INFO, "Connected to %.*s", (int)hlen, host ); + if ( !dnbd3_select_image( sock, lowerImage, rid, 0 ) ) { + logadd( LOG_ERROR, "Could not send select image" ); + } else if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) { + logadd( LOG_ERROR, "Could not read select image reply (%d)", errno ); + } else if ( rid != 0 && rid != remoteRid ) { + logadd( LOG_ERROR, "rid mismatch (want: %d, got: %d)", (int)rid, (int)remoteRid ); + } else { + logadd( LOG_INFO, "Requested: '%s:%d'", lowerImage, (int)rid ); + logadd( LOG_INFO, "Returned: '%s:%d'", remoteName, (int)remoteRid ); + sock_setTimeout( sock, SOCKET_KEEPALIVE_TIMEOUT * 1000 ); + image.name = strdup( remoteName ); + image.rid = remoteRid; + image.size = remoteSize; + if ( !sock_sockaddrToDnbd3( (struct sockaddr*)&sa, &connection.currentServer ) ) { + logadd( LOG_ERROR, "sockaddr to dnbd3_host_t failed!?" ); + connection.currentServer.type = 0; + } + connection.panicSignal = signal_new(); + timing_get( &connection.startupTime ); + connection.sockFd = sock; + requests.head = NULL; + requests.tail = NULL; + requestAltServers(); + break; + } + // Failed + if ( sock != -1 ) { + close( sock ); + sock = -1; + } + } + if ( sock != -1 ) { + connectionInitDone = true; + } + } + pthread_mutex_unlock( &mutexInit ); + sock_destroyPollList( cons ); + return sock != -1; +} + +bool connection_initThreads() +{ + pthread_mutex_lock( &mutexInit ); + if ( !keepRunning || !connectionInitDone || threadInitDone || connection.sockFd == -1 ) { + pthread_mutex_unlock( &mutexInit ); + return false; + } + bool success = true; + pthread_t thread; + threadInitDone = true; + logadd( LOG_DEBUG1, "Initializing stuff" ); + if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0 + || pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0 ) { + logadd( LOG_ERROR, "Mutex or spinlock init failure" ); + success = false; + } else { + if ( pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)connection.sockFd ) != 0 ) { + logadd( LOG_ERROR, "Could not create receive thread" ); + success = false; + } else if ( pthread_create( &thread, NULL, &connection_backgroundThread, NULL ) != 0 ) { + logadd( LOG_ERROR, "Could not create background thread" ); + success = false; + } + } + if ( !success ) { + close( connection.sockFd ); + connection.sockFd = -1; + } + pthread_mutex_unlock( &mutexInit ); + return success; +} + +uint64_t connection_getImageSize() +{ + return image.size; +} + +bool connection_read(dnbd3_async_t *request) +{ + if ( !connectionInitDone ) return false; + pthread_mutex_lock( &connection.sendMutex ); + enqueueRequest( request ); + if ( connection.sockFd != -1 ) { + if ( !dnbd3_get_block( connection.sockFd, request->offset, request->length, (uint64_t)request, 0 ) ) { + shutdown( connection.sockFd, SHUT_RDWR ); + connection.sockFd = -1; + pthread_mutex_unlock( &connection.sendMutex ); + signal_call( connection.panicSignal ); + return true; + } + } + pthread_mutex_unlock( &connection.sendMutex ); + return true; +} + +void connection_close() +{ + if ( keepRunning ) { + logadd( LOG_INFO, "Tearing down dnbd3 connections and workers" ); + } + pthread_mutex_lock( &mutexInit ); + keepRunning = false; + if ( !connectionInitDone ) { + pthread_mutex_unlock( &mutexInit ); + return; + } + pthread_mutex_unlock( &mutexInit ); + pthread_mutex_lock( &connection.sendMutex ); + if ( connection.sockFd != -1 ) { + shutdown( connection.sockFd, SHUT_RDWR ); + } + pthread_mutex_unlock( &connection.sendMutex ); +} + +size_t connection_printStats(char *buffer, const size_t len) +{ + int ret; + size_t remaining = len; + declare_now; + if ( remaining > 0 ) { + ret = snprintf( buffer, remaining, "Image: %s\nRevision: %d\n\nCurrent connection time: %" PRIu32 "s\n\n", + image.name, (int)image.rid, timing_diff( &connection.startupTime, &now ) ); + if ( ret < 0 ) { + ret = 0; + } + if ( (size_t)ret >= remaining ) { + return len; + } + remaining -= ret; + buffer += ret; + } + int i = -1; + lock_read( &altLock ); + while ( remaining > 3 && ++i < MAX_ALTS ) { + if ( altservers[i].host.type == 0 ) + continue; + if ( isSameAddressPort( &connection.currentServer, &altservers[i].host ) ) { + *buffer++ = '*'; + } else if ( i >= MAX_ALTS_ACTIVE ) { + *buffer++ = '-'; + } else { + *buffer++ = ' '; + } + const size_t addrlen = sock_printHost( &altservers[i].host, buffer, remaining ); + remaining -= (addrlen + 1); // For space or * above + buffer += addrlen; + if ( remaining < 3 ) + break; + int width = addrlen >= 35 ? 0 : 35 - (int)addrlen; + char *unit; + int value; + if ( altservers[i].rtt > 5000 ) { + unit = "ms "; + value = altservers[i].rtt / 1000; + } else { + unit = "µs"; + value = altservers[i].rtt; + width += 3; + } + ret = snprintf( buffer, remaining, "% *d %s Unreachable:% 5d BestCount:% 5d Live:% 5dµs\n", + width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount, altservers[i].liveRtt ); + if ( ret < 0 ) { + ret = 0; + } + if ( (size_t)ret >= remaining ) { + remaining = 0; + break; + } + remaining -= ret; + buffer += ret; + } + unlock_rw( &altLock ); + return len - remaining; +} + +static void* connection_receiveThreadMain(void *sockPtr) +{ + int sockFd = (int)(size_t)sockPtr; + dnbd3_reply_t reply; + pthread_detach( pthread_self() ); + + while ( keepRunning ) { + int ret; + do { + ret = dnbd3_read_reply( sockFd, &reply, true ); + if ( ret == REPLY_OK ) break; + } while ( ret == REPLY_INTR || ret == REPLY_AGAIN ); + if ( ret != REPLY_OK ) { + logadd( LOG_DEBUG1, "Error receiving reply on receiveThread (%d)", ret ); + goto fail; + } + + if ( reply.cmd == CMD_GET_BLOCK ) { + // Get block reply. find matching request + dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle ); + if ( request == NULL ) { + // This happens if the alt server probing thread tears down our connection + // and did a direct RTT probe to satisfy this very request. + logadd( LOG_DEBUG1, "Got block reply with no matching request" ); + if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) { + logadd( LOG_DEBUG1, "....and choked on reply payload" ); + goto fail; + } + } else { + // Found a match + const ssize_t ret = sock_recv( sockFd, request->buffer, request->length ); + if ( ret != (ssize_t)request->length ) { + logadd( LOG_DEBUG1, "receiving payload for a block reply failed" ); + connection_read( request ); + goto fail; + } + // Check RTT + declare_now; + uint64_t diff = timing_diffUs( &request->time, &now ); + if ( diff < 30ull * 1000 * 1000 ) { // Sanity check - ignore if > 30s + lock_read( &altLock ); + for ( int i = 0; i < MAX_ALTS; ++i ) { + if ( altservers[i].host.type == 0 ) + continue; + if ( isSameAddressPort( &connection.currentServer, &altservers[i].host ) ) { + altservers[i].liveRtt = ( altservers[i].liveRtt * 3 + (int)diff ) / 4; + break; + } + } + unlock_rw( &altLock ); + } + // Success, wake up caller + request->success = true; + request->finished = true; + signal_call( request->signal ); + } + } else if ( reply.cmd == CMD_GET_SERVERS ) { + // List of known alt servers + dnbd3_server_entry_t entries[MAX_ALTS]; + const int count = MIN( reply.size / sizeof(dnbd3_server_entry_t), MAX_ALTS ); + const size_t relevantSize = sizeof(dnbd3_server_entry_t) * count; + if ( sock_recv( sockFd, entries, relevantSize ) != (ssize_t)relevantSize + || !throwDataAway( sockFd, reply.size - (uint32_t)relevantSize ) ) { + logadd( LOG_DEBUG1, "Error receiving list of alt servers." ); + goto fail; + } + pthread_mutex_lock( &newAltLock ); + memcpy( newservers, entries, relevantSize ); + pthread_mutex_unlock( &newAltLock ); + } else { + // TODO: Handle the others? + if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) { + logadd( LOG_DEBUG1, "Could not throw %d bytes away on CMD %d", (int)reply.size, (int)reply.cmd ); + goto fail; + } + } + } + logadd( LOG_DEBUG1, "Aus der Schleife rausgeflogen! ARRRRRRRRRR" ); +fail:; + // Make sure noone is trying to use the socket for sending by locking, + pthread_mutex_lock( &connection.sendMutex ); + // then just set the fd to -1, but only if it's the same fd as ours, + // as someone could have established a new connection already + if ( connection.sockFd == sockFd ) { + connection.sockFd = -1; + signal_call( connection.panicSignal ); + } + pthread_mutex_unlock( &connection.sendMutex ); + // As we're the only reader, it's safe to close the socket now + close( sockFd ); + return NULL; +} + +static void* connection_backgroundThread(void *something UNUSED) +{ + ticks nextKeepalive; + ticks nextRttCheck; + + timing_get( &nextKeepalive ); + nextRttCheck = nextKeepalive; + while ( keepRunning ) { + ticks now; + timing_get( &now ); + uint32_t wt1 = timing_diffMs( &now, &nextKeepalive ); + uint32_t wt2 = timing_diffMs( &now, &nextRttCheck ); + if ( wt1 > 0 && wt2 > 0 ) { + int waitRes = signal_wait( connection.panicSignal, (int)MIN( wt1, wt2 ) + 1 ); + if ( waitRes == SIGNAL_ERROR ) { + logadd( LOG_WARNING, "Error waiting on signal in background thread! Errno = %d", errno ); + } + timing_get( &now ); + } + // Woken up, see what we have to do + const bool panic = connection.sockFd == -1; + // Check alt servers + if ( panic || timing_reachedPrecise( &nextRttCheck, &now ) ) { + if ( learnNewServers ) { + addAltServers(); + } + sortAltServers(); + probeAltServers(); + if ( panic || timing_diff( &connection.startupTime, &now ) <= STARTUP_MODE_DURATION ) { + timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_STARTUP ); + } else { + timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_NORMAL ); + } + } + // Send keepalive packet + if ( timing_reachedPrecise( &nextKeepalive, &now ) ) { + pthread_mutex_lock( &connection.sendMutex ); + if ( connection.sockFd != -1 ) { + dnbd3_request_t request; + request.magic = dnbd3_packet_magic; + request.cmd = CMD_KEEPALIVE; + request.handle = request.offset = request.size = 0; + fixup_request( request ); + ssize_t ret = sock_sendAll( connection.sockFd, &request, sizeof request, 2 ); + if ( (size_t)ret != sizeof request ) { + shutdown( connection.sockFd, SHUT_RDWR ); + connection.sockFd = -1; + nextRttCheck = now; + } + } + pthread_mutex_unlock( &connection.sendMutex ); + timing_addSeconds( &nextKeepalive, &now, TIMER_INTERVAL_KEEPALIVE_PACKET ); + } + } + return NULL; +} + +// Private quick helpers + +static void addAltServers() +{ + pthread_mutex_lock( &newAltLock ); + lock_write( &altLock ); + for ( int nIdx = 0; nIdx < MAX_ALTS; ++nIdx ) { + if ( newservers[nIdx].host.type == 0 ) + continue; + // Got a new alt server, see if it's already known + for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) { + if ( isSameAddress( &newservers[nIdx].host, &altservers[eIdx].host ) ) { + goto skip_server; + } + } + // Not known yet, add - find free slot + int slot = -1; + for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) { + if ( altservers[eIdx].host.type == 0 ) { + slot = eIdx; // free - bail out and use this one + break; + } + if ( altservers[eIdx].consecutiveFails > FAIL_BACKOFF_START_COUNT + && slot != -1 && altservers[slot].consecutiveFails < altservers[eIdx].consecutiveFails ) { + // Replace an existing alt-server that failed recently if we got no more slots + slot = eIdx; + } + } + if ( slot != -1 ) { + char txt[200]; + sock_printHost( &newservers[nIdx].host, txt, 200 ); + logadd( LOG_DEBUG1, "new server %s in slot %d", txt, slot ); + altservers[slot].consecutiveFails = 0; + altservers[slot].bestCount = 0; + altservers[slot].rtts[0] = RTT_UNREACHABLE; + altservers[slot].rttIndex = 1; + altservers[slot].host = newservers[nIdx].host; + altservers[slot].liveRtt = 0; + } +skip_server:; + } + memset( newservers, 0, sizeof(newservers) ); + unlock_rw( &altLock ); + pthread_mutex_unlock( &newAltLock ); +} + +/** + * Find a server at index >= MAX_ALTS_ACTIVE (one that isn't considered for switching over) + * that has been inactive for a while, then look if there's an active server that's failed + * a couple of times recently. Swap both if found. + */ +static void sortAltServers() +{ + int ac = 0; + lock_write( &altLock ); + for ( int ia = MAX_ALTS_ACTIVE; ia < MAX_ALTS; ++ia ) { + alt_server_t * const inactive = &altservers[ia]; + if ( inactive->host.type == 0 || inactive->consecutiveFails > 0 ) + continue; + while ( ac < MAX_ALTS_ACTIVE ) { + if ( altservers[ac].host.type == 0 || altservers[ac].consecutiveFails > FAIL_BACKOFF_START_COUNT ) + break; + ac++; + } + if ( ac == MAX_ALTS_ACTIVE ) + break; + // Switch! + alt_server_t * const active = &altservers[ac]; + dnbd3_host_t tmp = inactive->host; + inactive->host = active->host; + inactive->consecutiveFails = FAIL_BACKOFF_START_COUNT * 4; + inactive->bestCount = 0; + inactive->rtts[0] = RTT_UNREACHABLE; + inactive->rttIndex = 1; + inactive->liveRtt = 0; + active->host = tmp; + active->consecutiveFails = 0; + active->bestCount = 0; + active->rtts[0] = RTT_UNREACHABLE; + active->rttIndex = 1; + active->liveRtt = 0; + } + unlock_rw( &altLock ); +} + +static void probeAltServers() +{ + serialized_buffer_t buffer; + dnbd3_reply_t reply; + int bestSock = -1; + uint16_t remoteRid, remoteProto; + uint64_t remoteSize; + char *remoteName; + bool doSwitch; + bool panic = connection.sockFd == -1; + uint64_t testOffset = 0; + uint32_t testLength = RTT_BLOCK_SIZE; + dnbd3_async_t *request = NULL; + alt_server_t *current = NULL, *best = NULL; + + if ( !panic ) { + lock_read( &altLock ); + for ( int altIndex = 0; altIndex < MAX_ALTS; ++altIndex ) { + if ( altservers[altIndex].host.type != 0 + && isSameAddressPort( &altservers[altIndex].host, &connection.currentServer ) ) { + current = &altservers[altIndex]; + break; + } + } + unlock_rw( &altLock ); + } + declare_now; + pthread_spin_lock( &requests.lock ); + if ( requests.head != NULL ) { + if ( !panic && current != NULL ) { + const int maxDelay = MAX( current->rtt * 5, 1000000 ); // Give at least one second + dnbd3_async_t *iterator; + for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) { + // A request with measurement tag is pending + if ( timing_diffUs( &iterator->time, &now ) > maxDelay ) { + panic = true; + break; + } + } + } + if ( panic ) { + request = requests.head; + testOffset = requests.head->offset; + testLength = requests.head->length; + } + } + pthread_spin_unlock( &requests.lock ); + if ( testOffset != 0 ) { + logadd( LOG_DEBUG1, "Panic with pending %" PRIu64 ":%" PRIu32, testOffset, testLength ); + } + + lock_read( &altLock ); + for ( int altIndex = 0; altIndex < (panic ? MAX_ALTS : MAX_ALTS_ACTIVE); ++altIndex ) { + alt_server_t * const srv = &altservers[altIndex]; + if ( srv->host.type == 0 ) + continue; + if ( !panic && srv->consecutiveFails > FAIL_BACKOFF_START_COUNT + && rand() % srv->consecutiveFails >= FAIL_BACKOFF_START_COUNT ) { + continue; + } + if ( srv->rttIndex >= RTT_COUNT ) { + srv->rttIndex = 0; + } else { + srv->rttIndex += 1; + } + // Probe + ticks start; + timing_get( &start ); + errno = 0; + int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 ); + if ( sock == -1 ) { + logadd( LOG_DEBUG1, "Could not connect for probing. errno = %d", errno ); + goto fail; + } + if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) { + logadd( LOG_DEBUG1, "probe: select_image failed" ); + goto fail; + } + if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize )) { + logadd( LOG_DEBUG1, "probe: select image reply failed" ); + goto fail; + } + if ( remoteProto < MIN_SUPPORTED_SERVER ) { + logadd( LOG_WARNING, "Unsupported remote version (local: %d, remote: %d)", (int)PROTOCOL_VERSION, (int)remoteProto ); + srv->consecutiveFails += 10; + goto fail; + } + if ( remoteRid != image.rid || strcmp( remoteName, image.name ) != 0 ) { + logadd( LOG_WARNING, "Remote rid or name mismatch (got '%s')", remoteName ); + srv->consecutiveFails += 10; + goto fail; + } + if ( !dnbd3_get_block( sock, testOffset, testLength, 0, 0 ) ) { + logadd( LOG_DEBUG1, "-> block request fail" ); + goto fail; + } + int a = 111; + if ( !(a = dnbd3_get_reply( sock, &reply )) || reply.size != testLength ) { + logadd( LOG_DEBUG1, "<- get block reply fail %d %d", a, (int)reply.size ); + goto fail; + } + if ( request != NULL && removeRequest( request ) != NULL ) { + // Request successfully removed from queue + const ssize_t ret = sock_recv( sock, request->buffer, request->length ); + if ( ret != (ssize_t)request->length ) { + logadd( LOG_DEBUG1, "[RTT] receiving payload for a block reply failed" ); + // Failure, add to queue again + connection_read( request ); + goto fail; + } + // Success, wake up caller + logadd( LOG_DEBUG1, "[RTT] Successful direct probe" ); + request->success = true; + request->finished = true; + signal_call( request->signal ); + } else { + // Wasn't a request that's in our request queue + if ( !throwDataAway( sock, testLength ) ) { + logadd( LOG_DEBUG1, "<- get block reply payload fail" ); + goto fail; + } + } + + // Yay, success + // Panic mode? Just switch to server + if ( panic ) { + unlock_rw( &altLock ); + switchConnection( sock, srv ); + return; + } + // Non-panic mode: + // Update stats of server + ticks end; + timing_get( &end ); + srv->consecutiveFails = 0; + srv->rtts[srv->rttIndex] = (int)timing_diffUs( &start, &end ); + int newRtt = 0; + for ( int i = 0; i < RTT_COUNT; ++i ) { + newRtt += srv->rtts[i]; + } + if ( srv->liveRtt != 0 ) { + // Make live rtt measurement influence result + newRtt = ( newRtt + srv->liveRtt ) / ( RTT_COUNT + 1 ); + } else { + newRtt /= RTT_COUNT; + } + srv->rtt = newRtt; + + // Keep socket open if this is currently the best one + if ( best == NULL || best->rtt > srv->rtt ) { + best = srv; + if ( bestSock != -1 ) { + close( bestSock ); + } + bestSock = sock; + } else { + close( sock ); + } + continue; +fail:; + if ( sock != -1 ) { + close( sock ); + } + srv->rtts[srv->rttIndex] = RTT_UNREACHABLE; + srv->consecutiveFails += 1; + } + doSwitch = false; + if ( best != NULL ) { + // Time-sensitive switch decision: If a server was best for some consecutive measurements, + // we switch no matter how small the difference to the current server is + for ( int altIndex = 0; altIndex < MAX_ALTS_ACTIVE; ++altIndex ) { + alt_server_t * const srv = &altservers[altIndex]; + // Decay liveRtt slowly... + if ( srv->liveRtt > current->liveRtt && srv->liveRtt > srv->rtt ) { + srv->liveRtt -= ( ( srv->liveRtt / 100 ) + 1 ); + } + if ( srv == best ) { + if ( srv->bestCount < 50 ) { + srv->bestCount += 2; + } + // Switch with increasing probability the higher the bestCount is + if ( srv->bestCount > 12 && ( current == NULL || srv->rtt < current->rtt ) && srv->bestCount > rand() % 50 ) { + doSwitch = true; + } + } else if ( srv->bestCount > 0 ) { + srv->bestCount--; + } + } + for ( int i = MAX_ALTS_ACTIVE; i < MAX_ALTS; ++i ) { + if ( altservers[i].consecutiveFails > 0 ) { + altservers[i].consecutiveFails--; + } + } + // This takes care of the situation where two servers alternate being the best server all the time + if ( doSwitch && current != NULL && best->bestCount - current->bestCount < 8 ) { + doSwitch = false; + } + // Regular logic: Apply threshold when considering switch + if ( !doSwitch && current != NULL ) { + doSwitch = current->rtt > best->rtt + RTT_ABSOLUTE_THRESHOLD + || RTT_THRESHOLD_FACTOR(current->rtt) > best->rtt + 1000; + } + } + // Switch if a better server was found + if ( doSwitch ) { + logadd( LOG_INFO, "Current: %dµs, best: %dµs. Will switch!", current == NULL ? 0 : current->rtt, best->rtt ); + for ( int i = 0; i < MAX_ALTS; ++i ) { + if ( &altservers[i] != best ) { + altservers[i].bestCount = 0; + } + } + unlock_rw( &altLock ); + switchConnection( bestSock, best ); + return; + } + // No switch + unlock_rw( &altLock ); + if ( best != NULL ) { + close( bestSock ); + } +} + +static void switchConnection(int sockFd, alt_server_t *srv) +{ + pthread_t thread; + struct sockaddr_storage addr; + socklen_t addrLen = sizeof(addr); + char message[200] = "Connection switched to "; + const size_t len = strlen( message ); + int ret; + dnbd3_async_t *queue, *it; + + pthread_mutex_lock( &connection.sendMutex ); + if ( connection.sockFd != -1 ) { + shutdown( connection.sockFd, SHUT_RDWR ); + } + ret = getpeername( sockFd, (struct sockaddr*)&addr, &addrLen ); + if ( ret == 0 ) { + connection.currentServer = srv->host; + connection.sockFd = sockFd; + pthread_spin_lock( &requests.lock ); + queue = requests.head; + requests.head = requests.tail = NULL; + pthread_spin_unlock( &requests.lock ); + } else { + connection.sockFd = -1; + } + requestAltServers(); + pthread_mutex_unlock( &connection.sendMutex ); + if ( ret != 0 ) { + close( sockFd ); + logadd( LOG_WARNING, "Could not getpeername after connection switch, assuming connection already dead again. (Errno=%d)", errno ); + signal_call( connection.panicSignal ); + return; + } + timing_get( &connection.startupTime ); + pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)sockFd ); + sock_printable( (struct sockaddr*)&addr, sizeof(addr), message + len, sizeof(message) - len ); + logadd( LOG_INFO, "%s", message ); + // resend queue + if ( queue != NULL ) { + pthread_mutex_lock( &connection.sendMutex ); + dnbd3_async_t *next = NULL; + for ( it = queue; it != NULL; it = next ) { + logadd( LOG_DEBUG1, "Requeue after server change" ); + next = it->next; + enqueueRequest( it ); + if ( connection.sockFd != -1 && !dnbd3_get_block( connection.sockFd, it->offset, it->length, (uint64_t)it, 0 ) ) { + logadd( LOG_WARNING, "Resending pending request failed, re-entering panic mode" ); + shutdown( connection.sockFd, SHUT_RDWR ); + connection.sockFd = -1; + signal_call( connection.panicSignal ); + } + } + pthread_mutex_unlock( &connection.sendMutex ); + } +} + +/** + * Does not lock, so get the sendMutex first! + */ +static void requestAltServers() +{ + if ( connection.sockFd == -1 || !learnNewServers ) + return; + dnbd3_request_t request = { 0 }; + request.magic = dnbd3_packet_magic; + request.cmd = CMD_GET_SERVERS; + fixup_request( request ); + if ( sock_sendAll( connection.sockFd, &request, sizeof(request), 2 ) != (ssize_t)sizeof(request) ) { + logadd( LOG_WARNING, "Connection failed while requesting alt server list" ); + shutdown( connection.sockFd, SHUT_RDWR ); + connection.sockFd = -1; + } +} + +static bool throwDataAway(int sockFd, uint32_t amount) +{ + size_t done = 0; + char tempBuffer[SHORTBUF]; + while ( done < amount ) { + const ssize_t ret = sock_recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ) ); + if ( ret <= 0 ) + return false; + done += (size_t)ret; + } + return true; +} + +static void enqueueRequest(dnbd3_async_t *request) +{ + request->next = NULL; + request->finished = false; + request->success = false; + //logadd( LOG_DEBUG2, "Queue: %p @ %s : %d", request, file, line ); + // Measure latency and add to switch formula + timing_get( &request->time ); + pthread_spin_lock( &requests.lock ); + if ( requests.head == NULL ) { + requests.head = requests.tail = request; + } else { + requests.tail->next = request; + requests.tail = request; + } + pthread_spin_unlock( &requests.lock ); +} + +static dnbd3_async_t* removeRequest(dnbd3_async_t *request) +{ + pthread_spin_lock( &requests.lock ); + //logadd( LOG_DEBUG2, "Remov: %p @ %s : %d", request, file, line ); + dnbd3_async_t *iterator, *prev = NULL; + for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) { + if ( iterator == request ) { + // Found it, break! + if ( prev != NULL ) { + prev->next = iterator->next; + } else { + requests.head = iterator->next; + } + if ( requests.tail == iterator ) { + requests.tail = prev; + } + break; + } + prev = iterator; + } + pthread_spin_unlock( &requests.lock ); + return iterator; +} + diff --git a/src/fuse/connection.h b/src/fuse/connection.h new file mode 100644 index 0000000..cae554c --- /dev/null +++ b/src/fuse/connection.h @@ -0,0 +1,35 @@ +#ifndef _CONNECTION_H_ +#define _CONNECTION_H_ + +#include "../shared/fdsignal.h" +#include "../shared/timing.h" +#include +#include +#include + +struct _dnbd3_async; + +typedef struct _dnbd3_async { + struct _dnbd3_async *next; // Next in this linked list (provate field, not set by caller) + dnbd3_signal_t* signal; // Used to signal the caller + char* buffer; // Caller-provided buffer to be filled + ticks time; // When request was put on wire, 0 if not measuring + uint64_t offset; + uint32_t length; + bool finished; // Will be set to true if the request has been handled + bool success; // Will be set to true if the request succeeded +} dnbd3_async_t; + +bool connection_init(const char *hosts, const char *image, const uint16_t rid, const bool learnNewServers); + +bool connection_initThreads(); + +uint64_t connection_getImageSize(); + +bool connection_read(dnbd3_async_t *request); + +void connection_close(); + +size_t connection_printStats(char *buffer, const size_t len); + +#endif /* CONNECTION_H_ */ diff --git a/src/fuse/helper.c b/src/fuse/helper.c new file mode 100644 index 0000000..d81b08f --- /dev/null +++ b/src/fuse/helper.c @@ -0,0 +1,36 @@ +#include "helper.h" + +#include +#include +#include + + +void printLog( log_info *info ) +{ + FILE *logFile; + + // Create logfile + + logFile = fopen( "log.txt", "w" ); + if ( logFile == NULL ) { + printf( "Error creating/opening log.txt\n" ); + return; + } + + //rewind(file); + fprintf( logFile, "ImageSize: %"PRIu64" MiB\n", ( uint64_t )( info->imageSize/ ( 1024ll*1024ll ) ) ); + fprintf( logFile, "ReceivedMiB: %"PRIu64" MiB\n", ( uint64_t )( info->receivedBytes/ ( 1024ll*1024ll ) ) ); + fprintf( logFile, "imageBlockCount: %"PRIu64"\n", info->imageBlockCount ); + fprintf( logFile, "Blocksize: 4KiB\n\n" ); + fprintf( logFile, "Block access count:\n" ); + + uint64_t i = 0; + for ( ; i < info->imageBlockCount; i++ ) { + if ( i % 50 == 0 ) { + fprintf( logFile, "\n" ); + } + fprintf( logFile, "%i ", ( int ) info->blockRequestCount[i] ); + } + fprintf( logFile, "\n" ); + fclose( logFile ); +} diff --git a/src/fuse/helper.h b/src/fuse/helper.h new file mode 100644 index 0000000..9e5d127 --- /dev/null +++ b/src/fuse/helper.h @@ -0,0 +1,35 @@ +#ifndef IMAGEHELPER_H +#define IMAGEHELPER_H + +#include "../types.h" + +#include +#include +#include +#include +#include + +typedef struct log_info { + uint64_t imageSize; + uint64_t receivedBytes; + uint64_t imageBlockCount; + uint8_t *blockRequestCount; +} log_info; + + + +void printLog(log_info *info); + +int connect_to_server(char *server_adress, int port); + +static inline bool isSameAddressPort(const dnbd3_host_t * const a, const dnbd3_host_t * const b) +{ + return (a->type == b->type) && (a->port == b->port) && (0 == memcmp( a->addr, b->addr, (a->type == HOST_IP4 ? 4 : 16) )); +} + +static inline bool isSameAddress(const dnbd3_host_t * const a, const dnbd3_host_t * const b) +{ + return (a->type == b->type) && (0 == memcmp( a->addr, b->addr, (a->type == HOST_IP4 ? 4 : 16) )); +} + +#endif diff --git a/src/fuse/main.c b/src/fuse/main.c new file mode 100644 index 0000000..1a5643c --- /dev/null +++ b/src/fuse/main.c @@ -0,0 +1,420 @@ +/* + * FUSE: Filesystem in Userspace + * Copyright (C) 2001-2007 Miklos Szeredi + * This program can be distributed under the terms of the GNU GPL. + * See the file COPYING. + * + * Changed by Stephan Schwaer + * */ + +#include "connection.h" +#include "helper.h" +#include "../shared/protocol.h" +#include "../shared/log.h" + +#define FUSE_USE_VERSION 30 +#include +#include +#include +#include +/* for printing uint */ +#define __STDC_FORMAT_MACROS +#include +#include +#include +#include +#include + +#define debugf(...) do { logadd( LOG_DEBUG1, __VA_ARGS__ ); } while (0) + +static const char * const IMAGE_PATH = "/img"; +static const char * const STATS_PATH = "/status"; + +static uint64_t imageSize; +/* Debug/Benchmark variables */ +static bool useDebug = false; +static log_info logInfo; +static struct timespec startupTime; +static uid_t owner; +static bool keepRunning = true; +static void (*fuse_sigIntHandler)(int) = NULL; +static void (*fuse_sigTermHandler)(int) = NULL; +static struct fuse_operations dnbd3_fuse_no_operations; + +#define SIGPOOLSIZE 6 +static pthread_spinlock_t sigLock; +static dnbd3_signal_t *signalPool[SIGPOOLSIZE]; +static dnbd3_signal_t **sigEnd = signalPool + SIGPOOLSIZE; +static void signalInit() +{ + pthread_spin_init( &sigLock, PTHREAD_PROCESS_PRIVATE ); + for ( size_t i = 0; i < SIGPOOLSIZE; ++i ) { + signalPool[i] = NULL; + } +} +static inline dnbd3_signal_t *signalGet() +{ + pthread_spin_lock( &sigLock ); + for ( dnbd3_signal_t **it = signalPool; it < sigEnd; ++it ) { + if ( *it != NULL ) { + dnbd3_signal_t *ret = *it; + *it = NULL; + pthread_spin_unlock( &sigLock ); + return ret; + } + } + pthread_spin_unlock( &sigLock ); + return signal_newBlocking(); +} +static inline void signalPut(dnbd3_signal_t *signal) +{ + pthread_spin_lock( &sigLock ); + for ( dnbd3_signal_t **it = signalPool; it < sigEnd; ++it ) { + if ( *it == NULL ) { + *it = signal; + pthread_spin_unlock( &sigLock ); + return; + } + } + pthread_spin_unlock( &sigLock ); + signal_close( signal ); +} + +static int image_getattr(const char *path, struct stat *stbuf) +{ + int res = 0; + memset( stbuf, 0, sizeof( struct stat ) ); + stbuf->st_ctim = stbuf->st_atim = stbuf->st_mtim = startupTime; + stbuf->st_uid = owner; + if ( strcmp( path, "/" ) == 0 ) { + stbuf->st_mode = S_IFDIR | 0550; + stbuf->st_nlink = 2; + } else if ( strcmp( path, IMAGE_PATH ) == 0 ) { + stbuf->st_mode = S_IFREG | 0440; + stbuf->st_nlink = 1; + stbuf->st_size = imageSize; + } else if ( strcmp( path, STATS_PATH ) == 0 ) { + stbuf->st_mode = S_IFREG | 0440; + stbuf->st_nlink = 1; + stbuf->st_size = 4096; + clock_gettime( CLOCK_REALTIME, &stbuf->st_mtim ); + } else { + res = -ENOENT; + } + return res; +} + +static int image_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset UNUSED, struct fuse_file_info *fi UNUSED) +{ + if ( strcmp( path, "/" ) != 0 ) { + return -ENOENT; + } + filler( buf, ".", NULL, 0 ); + filler( buf, "..", NULL, 0 ); + filler( buf, IMAGE_PATH + 1, NULL, 0 ); + filler( buf, STATS_PATH + 1, NULL, 0 ); + return 0; +} + +static int image_open(const char *path, struct fuse_file_info *fi) +{ + if ( strcmp( path, IMAGE_PATH ) != 0 && strcmp( path, STATS_PATH ) != 0 ) { + return -ENOENT; + } + if ( ( fi->flags & 3 ) != O_RDONLY ) { + return -EACCES; + } + return 0; +} + +static int fillStatsFile(char *buf, size_t size, off_t offset) { + if ( offset == 0 ) { + return (int)connection_printStats( buf, size ); + } + char buffer[4096]; + int ret = (int)connection_printStats( buffer, sizeof buffer ); + int len = MIN( ret - (int)offset, (int)size ); + if ( len == 0 ) + return 0; + if ( len < 0 ) { + return -EOF; + } + memcpy( buf, buffer + offset, len ); + return len; +} + +static int image_read(const char *path, char *buf, size_t size, off_t offset, struct fuse_file_info *fi UNUSED) +{ + if ( size > __INT_MAX__ ) { + // fuse docs say we MUST fill the buffer with exactly size bytes and return size, + // otherwise the buffer will we padded with zeros. Since the return value is just + // an int, we could not properly fulfill read requests > 2GB. Since there is no + // mention of a guarantee that this will never happen, better add a safety check. + // Way to go fuse. + return -EIO; + } + if ( path[1] == STATS_PATH[1] ) { + return fillStatsFile( buf, size, offset ); + } + + if ( (uint64_t)offset >= imageSize ) { + return 0; + } + + if ( offset + size > imageSize ) { + size = imageSize - offset; + } + + if ( useDebug ) { + /* count the requested blocks */ + uint64_t startBlock = offset / ( 4096 ); + const uint64_t endBlock = ( offset + size - 1 ) / ( 4096 ); + + for ( ; startBlock <= endBlock; startBlock++ ) { + ++logInfo.blockRequestCount[startBlock]; + } + } + + dnbd3_async_t request; + request.buffer = buf; + request.length = (uint32_t)size; + request.offset = offset; + request.signal = signalGet(); + + if ( !connection_read( &request ) ) { + signalPut( request.signal ); + return -EINVAL; + } + while ( !request.finished ) { + int ret = signal_wait( request.signal, 5000 ); + if ( !keepRunning ) { + connection_close(); + break; + } + if ( ret < 0 ) { + debugf( "fuse_read signal wait returned %d", ret ); + } + } + signalPut( request.signal ); + if ( request.success ) { + return request.length; + } else { + return -EIO; + } +} + +static void image_sigHandler(int signum) { + keepRunning = false; + if ( signum == SIGINT && fuse_sigIntHandler != NULL ) { + fuse_sigIntHandler(signum); + } + if ( signum == SIGTERM && fuse_sigTermHandler != NULL ) { + fuse_sigTermHandler(signum); + } +} + +static void* image_init(struct fuse_conn_info *conn UNUSED) +{ + if ( !connection_initThreads() ) { + logadd( LOG_ERROR, "Could not initialize threads for dnbd3 connection, exiting..." ); + exit( EXIT_FAILURE ); + } + // Prepare our handler + struct sigaction newHandler; + memset( &newHandler, 0, sizeof(newHandler) ); + newHandler.sa_handler = &image_sigHandler; + sigemptyset( &newHandler.sa_mask ); + struct sigaction oldHandler; + // Retrieve old handlers when setting + sigaction( SIGINT, &newHandler, &oldHandler ); + fuse_sigIntHandler = oldHandler.sa_handler; + logadd( LOG_DEBUG1, "Previous SIGINT handler was %p", (void*)(uintptr_t)fuse_sigIntHandler ); + sigaction( SIGTERM, &newHandler, &oldHandler ); + fuse_sigTermHandler = oldHandler.sa_handler; + logadd( LOG_DEBUG1, "Previous SIGTERM handler was %p", (void*)(uintptr_t)fuse_sigIntHandler ); + return NULL; +} + +/* close the connection */ +static void image_destroy(void *private_data UNUSED) +{ + if ( useDebug ) { + printLog( &logInfo ); + } + connection_close(); + return; +} + +/* map the implemented fuse operations */ +static struct fuse_operations image_oper = { + .getattr = image_getattr, + .readdir = image_readdir, + .open = image_open, + .read = image_read, + .init = image_init, + .destroy = image_destroy, +}; + +static void printVersion() +{ + char *arg[] = { "foo", "-V" }; + printf( "DNBD3-Fuse Version 1.2.3.4, protocol version %d\n", (int)PROTOCOL_VERSION ); + fuse_main( 2, arg, &dnbd3_fuse_no_operations, NULL ); + exit( 0 ); +} + +static void printUsage(char *argv0, int exitCode) +{ + char *arg[] = { argv0, "-h" }; + fuse_main( 2, arg, &dnbd3_fuse_no_operations, NULL ); + printf( "\n" ); + printf( "Usage: %s [--debug] [--option mountOpts] --host --image [--rid revision] \n", argv0 ); + printf( "Or: %s [-d] [-o mountOpts] -h -i [-r revision] \n", argv0 ); + printf( " -d --debug Don't fork, write stats file, and print debug output (fuse -> stderr, dnbd3 -> stdout)\n" ); + printf( " -f Don't fork (dnbd3 -> stdout)\n" ); + printf( " -h --host List of space separated hosts to use\n" ); + printf( " -i --image Remote image name to request\n" ); + printf( " -l --log Write log to given location\n" ); + printf( " -o --option Mount options to pass to libfuse\n" ); + printf( " -r --rid Revision to use (omit or pass 0 for latest)\n" ); + printf( " -S --sticky Use only servers from command line (no learning from servers)\n" ); + printf( " -s Single threaded mode\n" ); + exit( exitCode ); +} + +static const char *optString = "dfHh:i:l:o:r:SsVv"; +static const struct option longOpts[] = { + { "debug", no_argument, NULL, 'd' }, + { "help", no_argument, NULL, 'H' }, + { "host", required_argument, NULL, 'h' }, + { "image", required_argument, NULL, 'i' }, + { "log", required_argument, NULL, 'l' }, + { "option", required_argument, NULL, 'o' }, + { "rid", required_argument, NULL, 'r' }, + { "sticky", no_argument, NULL, 'S' }, + { "version", no_argument, NULL, 'v' }, + { 0, 0, 0, 0 } +}; + +int main(int argc, char *argv[]) +{ + char *server_address = NULL; + char *image_Name = NULL; + char *log_file = NULL; + uint16_t rid = 0; + char **newArgv; + int newArgc; + int opt, lidx; + bool learnNewServers = true; + + if ( argc <= 1 || strcmp( argv[1], "--help" ) == 0 || strcmp( argv[1], "--usage" ) == 0 ) { + printUsage( argv[0], 0 ); + } + + // TODO Make log mask configurable + log_setConsoleMask( 65535 ); + log_setConsoleTimestamps( true ); + log_setFileMask( 65535 ); + + newArgv = calloc( argc + 10, sizeof(char*) ); + newArgv[0] = argv[0]; + newArgc = 1; + while ( ( opt = getopt_long( argc, argv, optString, longOpts, &lidx ) ) != -1 ) { + switch ( opt ) { + case 'h': + server_address = optarg; + break; + case 'i': + image_Name = optarg; + break; + case 'r': + rid = (uint16_t)atoi(optarg); + break; + case 'o': + newArgv[newArgc++] = "-o"; + newArgv[newArgc++] = optarg; + if ( strstr( optarg, "use_ino" ) != NULL ) { + logadd( LOG_WARNING, "************************" ); + logadd( LOG_WARNING, "* WARNING: use_ino mount option is unsupported, use at your own risk!" ); + logadd( LOG_WARNING, "************************" ); + } + if ( strstr( optarg, "intr" ) != NULL ) { + logadd( LOG_WARNING, "************************" ); + logadd( LOG_WARNING, "* WARNING: intr mount option is unsupported, use at your own risk!" ); + logadd( LOG_WARNING, "************************" ); + } + break; + case 'l': + log_file = optarg; + break; + case 'H': + printUsage( argv[0], 0 ); + break; + case 'v': + case 'V': + printVersion(); + break; + case 'd': + useDebug = true; + newArgv[newArgc++] = "-d"; + break; + case 's': + newArgv[newArgc++] = "-s"; + break; + case 'S': + learnNewServers = false; + break; + case 'f': + newArgv[newArgc++] = "-f"; + break; + default: + printUsage( argv[0], EXIT_FAILURE ); + } + } + + if ( optind >= argc ) { // Missing mount point + printUsage( argv[0], EXIT_FAILURE ); + } + + if ( server_address == NULL || image_Name == NULL ) { + printUsage( argv[0], EXIT_FAILURE ); + } + + if ( log_file != NULL ) { + if ( !log_openLogFile( log_file ) ) { + logadd( LOG_WARNING, "Could not open log file at '%s'", log_file ); + } + } + + if ( !connection_init( server_address, image_Name, rid, learnNewServers ) ) { + logadd( LOG_ERROR, "Could not connect to any server. Bye.\n" ); + return EXIT_FAILURE; + } + imageSize = connection_getImageSize(); + + /* initialize benchmark variables */ + logInfo.receivedBytes = 0; + logInfo.imageSize = imageSize; + logInfo.imageBlockCount = ( imageSize + 4095 ) / 4096; + if ( useDebug ) { + logInfo.blockRequestCount = calloc( logInfo.imageBlockCount, sizeof(uint8_t) ); + } else { + logInfo.blockRequestCount = NULL; + } + + // Since dnbd3 is always read only and the remote image will not change + newArgv[newArgc++] = "-o"; + newArgv[newArgc++] = "ro,auto_cache,default_permissions"; + // Mount point goes last + newArgv[newArgc++] = argv[optind]; + + printf( "ImagePathName: %s\nFuseArgs:",IMAGE_PATH ); + for ( int i = 0; i < newArgc; ++i ) { + printf( " '%s'", newArgv[i] ); + } + putchar('\n'); + clock_gettime( CLOCK_REALTIME, &startupTime ); + owner = getuid(); + signalInit(); + return fuse_main( newArgc, newArgv, &image_oper, NULL ); +} diff --git a/src/fuse/serialize.c b/src/fuse/serialize.c new file mode 100644 index 0000000..4934132 --- /dev/null +++ b/src/fuse/serialize.c @@ -0,0 +1,5 @@ +#include +#include +#include + +#include "../serialize.c" -- cgit v1.2.3-55-g7522