summaryrefslogtreecommitdiffstats
path: root/src/fuse
diff options
context:
space:
mode:
Diffstat (limited to 'src/fuse')
-rw-r--r--src/fuse/CMakeLists.txt35
-rw-r--r--src/fuse/connection.c585
-rw-r--r--src/fuse/connection.h33
-rw-r--r--src/fuse/cowDoc/img/datastructure.jpgbin0 -> 397688 bytes
-rw-r--r--src/fuse/cowDoc/img/readrequest.svg4
-rw-r--r--src/fuse/cowDoc/readme.md367
-rw-r--r--src/fuse/cowfile.c1777
-rw-r--r--src/fuse/cowfile.h146
-rw-r--r--src/fuse/helper.c6
-rw-r--r--src/fuse/helper.h14
-rw-r--r--src/fuse/main.c604
-rw-r--r--src/fuse/main.h12
-rw-r--r--src/fuse/serialize.c5
13 files changed, 3178 insertions, 410 deletions
diff --git a/src/fuse/CMakeLists.txt b/src/fuse/CMakeLists.txt
new file mode 100644
index 0000000..05b3fcd
--- /dev/null
+++ b/src/fuse/CMakeLists.txt
@@ -0,0 +1,35 @@
+cmake_minimum_required(VERSION 3.10)
+
+# set the project name
+project(dnbd3-fuse
+ LANGUAGES C)
+
+find_package(Fuse REQUIRED)
+
+# find atomic library required by dnbd3-fuse
+find_package(Stdatomic REQUIRED)
+find_package(Libatomic REQUIRED)
+
+# find curl for cow
+find_package(CURL REQUIRED)
+
+# add compile option to enable enhanced POSIX pthread features
+add_definitions(-D_GNU_SOURCE)
+
+set(DNBD3_FUSE_SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/cowfile.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/connection.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/helper.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/main.c)
+set(DNBD3_FUSE_HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/cowfile.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/connection.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/helper.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/main.h)
+
+add_executable(dnbd3-fuse ${DNBD3_FUSE_SOURCE_FILES})
+target_include_directories(dnbd3-fuse PRIVATE ${FUSE_INCLUDE_DIRS} ${CURL_INCLUDE_DIR} )
+target_link_libraries(dnbd3-fuse dnbd3-build dnbd3-version dnbd3-shared ${FUSE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} ${CURL_LIBRARIES} )
+install(TARGETS dnbd3-fuse RUNTIME DESTINATION bin
+ COMPONENT fuse)
+
+add_linter(dnbd3-fuse-lint "${DNBD3_FUSE_SOURCE_FILES}" "${DNBD3_FUSE_HEADER_FILES}")
+add_linter_fix(dnbd3-fuse-lint-fix "${DNBD3_FUSE_SOURCE_FILES}" "${DNBD3_FUSE_HEADER_FILES}")
diff --git a/src/fuse/connection.c b/src/fuse/connection.c
index 98b1d36..dede680 100644
--- a/src/fuse/connection.c
+++ b/src/fuse/connection.c
@@ -1,19 +1,23 @@
#include "connection.h"
#include "helper.h"
-#include "../clientconfig.h"
-#include "../shared/protocol.h"
-#include "../shared/fdsignal.h"
-#include "../shared/sockhelper.h"
-#include "../shared/log.h"
+#include <dnbd3/config/client.h>
+#include <dnbd3/shared/protocol.h>
+#include <dnbd3/shared/fdsignal.h>
+#include <dnbd3/shared/sockhelper.h>
+#include <dnbd3/shared/log.h>
+#include "main.h"
+#include "cowfile.h"
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <stdio.h>
+#include <stdatomic.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <inttypes.h>
+#include <signal.h>
/* Constants */
static const size_t SHORTBUF = 100;
@@ -30,9 +34,18 @@ static const int FAIL_BACKOFF_START_COUNT = 8;
static bool connectionInitDone = false;
static bool threadInitDone = false;
static pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER;
-static bool keepRunning = true;
+// For multi-threaded concurrent connection during init
+static pthread_mutex_t mutexCondConn = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t condConn = PTHREAD_COND_INITIALIZER;
+static atomic_int pendingConnectionAttempts = 0;
+// Shutdown flag
+atomic_bool keepRunning = true;
+// Should we learn new alt-servers from servers we connect to?
static bool learnNewServers;
+static pthread_t tidReceiver;
+static pthread_t tidBackground;
+
// List of pending requests
static struct {
dnbd3_async_t *head;
@@ -55,15 +68,21 @@ static struct {
ticks startupTime;
} connection;
+struct conn_data {
+ char *lowerImage;
+ uint16_t rid;
+ int idx;
+};
+
// Known alt servers
typedef struct _alt_server {
dnbd3_host_t host;
- int consecutiveFails;
- int rtt;
+ atomic_int consecutiveFails;
+ atomic_int rtt;
int rtts[RTT_COUNT];
int rttIndex;
- int bestCount;
- int liveRtt;
+ atomic_int bestCount;
+ atomic_int liveRtt;
} alt_server_t;
static dnbd3_server_entry_t newservers[MAX_ALTS];
@@ -83,136 +102,233 @@ static pthread_rwlock_t altLock = PTHREAD_RWLOCK_INITIALIZER;
/* Static methods */
-static void* connection_receiveThreadMain(void *sock);
-static void* connection_backgroundThread(void *something);
+static void* connectThread(void * data);
+static void* connection_receiveThreadMain( void *sock );
+static void* connection_backgroundThread( void *something );
-static void addAltServers();
+static bool hasAltServer( dnbd3_host_t *host );
+static void addAltServers( void );
static void sortAltServers();
static void probeAltServers();
-static void switchConnection(int sockFd, alt_server_t *srv);
-static void requestAltServers();
-static bool throwDataAway(int sockFd, uint32_t amount);
+static size_t receiveRequest(const int sock, dnbd3_async_t* request );
+static void switchConnection( int sockFd, alt_server_t *srv );
+static void requestAltServers( void );
+static bool sendAltServerRequest( int sock );
+static bool throwDataAway( int sockFd, uint32_t amount );
+
+static void enqueueRequest( dnbd3_async_t *request );
+static dnbd3_async_t* removeRequest( dnbd3_async_t *request );
-static void enqueueRequest(dnbd3_async_t *request);
-static dnbd3_async_t* removeRequest(dnbd3_async_t *request);
+static void blockSignals();
-bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid, const bool doLearnNew)
+bool connection_init( const char *hosts, const char *lowerImage, const uint16_t rid, const bool doLearnNew )
{
- int sock = -1;
char host[SHORTBUF];
- size_t hlen;
- serialized_buffer_t buffer;
- uint16_t remoteVersion, remoteRid;
- char *remoteName;
- uint64_t remoteSize;
- struct sockaddr_storage sa;
- socklen_t salen;
- poll_list_t *cons = sock_newPollList();
+ dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS];
+ const char *current, *end;
+ int altIndex = 0;
timing_setBase();
pthread_mutex_lock( &mutexInit );
- if ( !connectionInitDone && keepRunning ) {
- dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS];
- const char *current, *end;
- int altIndex = 0;
- learnNewServers = doLearnNew;
- memset( altservers, 0, sizeof altservers );
- connection.sockFd = -1;
- current = hosts;
- do {
- // Get next host from string
- while ( *current == ' ' ) current++;
- end = strchr( current, ' ' );
- size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1);
- if ( len > SHORTBUF ) len = SHORTBUF;
- snprintf( host, len, "%s", current );
- int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS );
- for ( int i = 0; i < newHosts; ++i ) {
- if ( altIndex >= MAX_ALTS )
- break;
- altservers[altIndex].host = tempHosts[i];
- altIndex += 1;
- }
- current = end + 1;
- } while ( end != NULL && altIndex < MAX_ALTS );
- logadd( LOG_INFO, "Got %d servers from init call", altIndex );
- // Connect
- for ( int i = 0; i < altIndex + 5; ++i ) {
- if ( i >= altIndex ) {
- // Additional iteration - no corresponding slot in altservers, this
- // is just so we can make a final calls with longer timeout
- sock = sock_multiConnect( cons, NULL, 400, 3000 );
- if ( sock == -2 ) {
- logadd( LOG_ERROR, "Could not connect to any host" );
- sock = -1;
- break;
- }
- } else {
- if ( altservers[i].host.type == 0 )
- continue;
- // Try to connect - 100ms timeout
- sock = sock_multiConnect( cons, &altservers[i].host, 100, 3000 );
- }
- if ( sock == -2 || sock == -1 )
- continue;
- salen = sizeof(sa);
- if ( getpeername( sock, (struct sockaddr*)&sa, &salen ) == -1 ) {
- logadd( LOG_ERROR, "getpeername on successful connection failed!? (errno=%d)", errno );
- close( sock );
- sock = -1;
+ if ( connectionInitDone ) {
+ pthread_mutex_unlock( &mutexInit );
+ return false;
+ }
+ learnNewServers = doLearnNew;
+ memset( altservers, 0, sizeof altservers );
+ connection.sockFd = -1;
+ current = hosts;
+ pthread_attr_t threadAttrs;
+ pthread_attr_init( &threadAttrs );
+ pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED );
+ // Resolve all hosts and connect
+ pthread_mutex_lock( &mutexCondConn );
+ do {
+ // Get next host from string
+ while ( *current == ' ' || *current == '\t' || *current == '\n' ) {
+ current++;
+ }
+ end = current;
+ while ( *end != ' ' && *end != '\t' && *end != '\n' && *end != '\0' ) {
+ end++;
+ }
+ if ( end == current )
+ break;
+ size_t len = (size_t)( end - current ) + 1;
+ if ( len > SHORTBUF ) {
+ len = SHORTBUF;
+ }
+ snprintf( host, len, "%s", current );
+ int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS );
+ for ( int i = 0; i < newHosts; ++i ) {
+ if ( altIndex >= MAX_ALTS )
+ break;
+ if ( hasAltServer( &tempHosts[i] ) )
continue;
- }
- hlen = sock_printable( (struct sockaddr*)&sa, salen, host, sizeof(host) );
- logadd( LOG_INFO, "Connected to %.*s", (int)hlen, host );
- if ( !dnbd3_select_image( sock, lowerImage, rid, 0 ) ) {
- logadd( LOG_ERROR, "Could not send select image" );
- } else if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) {
- logadd( LOG_ERROR, "Could not read select image reply (%d)", errno );
- } else if ( rid != 0 && rid != remoteRid ) {
- logadd( LOG_ERROR, "rid mismatch (want: %d, got: %d)", (int)rid, (int)remoteRid );
- } else {
- logadd( LOG_INFO, "Requested: '%s:%d'", lowerImage, (int)rid );
- logadd( LOG_INFO, "Returned: '%s:%d'", remoteName, (int)remoteRid );
- sock_setTimeout( sock, SOCKET_KEEPALIVE_TIMEOUT * 1000 );
- image.name = strdup( remoteName );
- image.rid = remoteRid;
- image.size = remoteSize;
- if ( !sock_sockaddrToDnbd3( (struct sockaddr*)&sa, &connection.currentServer ) ) {
- logadd( LOG_ERROR, "sockaddr to dnbd3_host_t failed!?" );
- connection.currentServer.type = 0;
+ altservers[altIndex].host = tempHosts[i];
+ // Start thread for async connect if not connected yet
+ atomic_thread_fence( memory_order_acquire );
+ if ( connection.sockFd == -1 ) {
+ pthread_t t;
+ struct conn_data *cd = malloc( sizeof(*cd) );
+ // We cannot be sure a thread is taking longer than this function runs, so better copy
+ cd->lowerImage = strdup( lowerImage );
+ cd->rid = rid;
+ cd->idx = altIndex;
+ pendingConnectionAttempts++;
+ if ( ( errno = pthread_create( &t, &threadAttrs, &connectThread, (void*)cd ) ) != 0 ) {
+ pendingConnectionAttempts--;
+ logadd( LOG_ERROR, "Could not create connect thread %d, errno=%d", cd->idx, errno );
+ free( cd->lowerImage );
+ free( cd );
+ continue;
}
- connection.panicSignal = signal_new();
- timing_get( &connection.startupTime );
- connection.sockFd = sock;
- requests.head = NULL;
- requests.tail = NULL;
- requestAltServers();
- break;
- }
- // Failed
- if ( sock != -1 ) {
- close( sock );
- sock = -1;
+ struct timespec timeout;
+ clock_gettime( CLOCK_REALTIME, &timeout );
+ timeout.tv_nsec += 200 * 1000 * 1000;
+ if ( timeout.tv_nsec >= 1000 * 1000 * 1000 ) {
+ timeout.tv_nsec -= 1000 * 1000 * 1000;
+ timeout.tv_sec += 1;
+ }
+ pthread_cond_timedwait( &condConn, &mutexCondConn, &timeout );
}
+ // End async connect
+ altIndex += 1;
}
- if ( sock != -1 ) {
- connectionInitDone = true;
- }
+ current = end + 1;
+ } while ( *end != '\0' && altIndex < MAX_ALTS );
+ logadd( LOG_INFO, "Got %d servers from init call", altIndex );
+ // Wait a maximum of five seconds if we're not connected yet
+ if ( connection.sockFd == -1 && pendingConnectionAttempts > 0 ) {
+ struct timespec end;
+ clock_gettime( CLOCK_REALTIME, &end );
+ end.tv_sec += 5;
+ pthread_cond_timedwait( &condConn, &mutexCondConn, &end );
+ }
+ pthread_mutex_unlock( &mutexCondConn );
+ pthread_attr_destroy( &threadAttrs );
+ if ( connection.sockFd != -1 ) {
+ connectionInitDone = true;
}
pthread_mutex_unlock( &mutexInit );
- sock_destroyPollList( cons );
- return sock != -1;
+ return connectionInitDone;
+}
+
+static void* connectThread(void * data)
+{
+ struct conn_data *cd = (struct conn_data*)data;
+ int idx = cd->idx;
+ int sock = -1;
+ serialized_buffer_t buffer;
+ uint16_t remoteVersion, remoteRid;
+ char *remoteName;
+ uint64_t remoteSize;
+ char host[SHORTBUF];
+ struct sockaddr_storage sa;
+ socklen_t salen = sizeof(sa);
+
+ if ( idx < 0 || idx >= MAX_ALTS || altservers[idx].host.type == 0 ) {
+ logadd( LOG_ERROR, "BUG: Index out of range, or empty server in connect thread (%d)", idx );
+ goto bailout;
+ }
+
+ sock_printHost( &altservers[idx].host, host, sizeof(host) );
+ logadd( LOG_INFO, "Trying to connect to %s", host );
+ sock = sock_connect( &altservers[idx].host, 1500, SOCKET_TIMEOUT_RECV * 1000 );
+ if ( sock == -1 ) {
+ logadd( LOG_INFO, "[%s] Connection failed", host );
+ goto bailout;
+ }
+
+ salen = sizeof( sa );
+ if ( getpeername( sock, (struct sockaddr*)&sa, &salen ) == -1 ) {
+ logadd( LOG_ERROR, "[%s] getpeername on successful connection failed!? (errno=%d)", host, errno );
+ goto bailout;
+ }
+ atomic_thread_fence( memory_order_acquire );
+ if ( connection.sockFd != -1 )
+ goto bailout;
+
+ sock_printable( (struct sockaddr*)&sa, salen, host, sizeof(host) );
+ logadd( LOG_INFO, "[%s] Connected", host );
+ if ( !dnbd3_select_image( sock, cd->lowerImage, cd->rid, 0 ) ) {
+ logadd( LOG_ERROR, "[%s] Could not send select image", host );
+ goto bailout;
+ }
+
+ if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) {
+ logadd( LOG_ERROR, "[%s] Could not read select image reply (%d)", host, errno );
+ goto bailout;
+ }
+ atomic_thread_fence( memory_order_acquire );
+ if ( connection.sockFd != -1 )
+ goto bailout;
+
+ if ( cd->rid != 0 && cd->rid != remoteRid ) {
+ logadd( LOG_ERROR, "[%s] rid mismatch (want: %d, got: %d)",
+ host, (int)cd->rid, (int)remoteRid );
+ goto bailout;
+ }
+ // Seems we got a winner
+ pthread_mutex_lock( &mutexCondConn );
+ if ( connection.sockFd != -1 || connectionInitDone ) {
+ pthread_mutex_unlock( &mutexCondConn );
+ logadd( LOG_INFO, "[%s] Raced by other connection", host );
+ goto bailout;
+ }
+ logadd( LOG_INFO, "Requested: '%s:%d'", cd->lowerImage, (int)cd->rid );
+ logadd( LOG_INFO, "Returned: '%s:%d'", remoteName, (int)remoteRid );
+ image.name = strdup( remoteName );
+ image.rid = remoteRid;
+ image.size = remoteSize;
+ connection.currentServer = altservers[idx].host;
+ connection.panicSignal = signal_new();
+ timing_get( &connection.startupTime );
+ requests.head = NULL;
+ requests.tail = NULL;
+ if ( learnNewServers && !sendAltServerRequest( sock ) )
+ goto bailout;
+ // Everything good, tell main connect function
+ connection.sockFd = sock;
+ atomic_thread_fence( memory_order_release );
+ pendingConnectionAttempts--;
+ if ( idx != 0 ) {
+ // Make server first in list - enough to swap host, other data has not changed yet
+ lock_write( &altLock );
+ dnbd3_host_t tmp = altservers[idx].host;
+ altservers[idx].host = altservers[0].host;
+ altservers[0].host = tmp;
+ unlock_rw( &altLock );
+ }
+ pthread_cond_signal( &condConn );
+ pthread_mutex_unlock( &mutexCondConn );
+ return NULL;
+
+bailout:
+ if ( sock != -1 ) {
+ close( sock );
+ }
+ free( cd->lowerImage );
+ free( cd );
+ // Last one has to wake up main thread, which is waiting for up to 5 seconds for
+ // any connect thread to succeed. If none succeeded, there is no point in waiting
+ // any longer.
+ if ( --pendingConnectionAttempts == 0 ) {
+ pthread_mutex_lock( &mutexCondConn );
+ pthread_cond_signal( &condConn );
+ pthread_mutex_unlock( &mutexCondConn );
+ }
+ return NULL;
}
bool connection_initThreads()
{
pthread_mutex_lock( &mutexInit );
- if ( !keepRunning || !connectionInitDone || threadInitDone || connection.sockFd == -1 ) {
+ if ( !connectionInitDone || threadInitDone || connection.sockFd == -1 ) {
pthread_mutex_unlock( &mutexInit );
return false;
}
bool success = true;
- pthread_t thread;
threadInitDone = true;
logadd( LOG_DEBUG1, "Initializing stuff" );
if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0
@@ -220,10 +336,10 @@ bool connection_initThreads()
logadd( LOG_ERROR, "Mutex or spinlock init failure" );
success = false;
} else {
- if ( pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)connection.sockFd ) != 0 ) {
+ if ( pthread_create( &tidReceiver, NULL, &connection_receiveThreadMain, ( void* )(size_t)connection.sockFd ) != 0 ) {
logadd( LOG_ERROR, "Could not create receive thread" );
success = false;
- } else if ( pthread_create( &thread, NULL, &connection_backgroundThread, NULL ) != 0 ) {
+ } else if ( pthread_create( &tidBackground, NULL, &connection_backgroundThread, NULL ) != 0 ) {
logadd( LOG_ERROR, "Could not create background thread" );
success = false;
}
@@ -236,12 +352,23 @@ bool connection_initThreads()
return success;
}
+char * connection_getImageName()
+{
+ return image.name;
+}
+
+uint16_t connection_getImageRID()
+{
+ return image.rid;
+}
+
+
uint64_t connection_getImageSize()
{
return image.size;
}
-bool connection_read(dnbd3_async_t *request)
+bool connection_read( dnbd3_async_t *request )
{
if ( !connectionInitDone ) return false;
pthread_mutex_lock( &connection.sendMutex );
@@ -250,9 +377,7 @@ bool connection_read(dnbd3_async_t *request)
if ( !dnbd3_get_block( connection.sockFd, request->offset, request->length, (uint64_t)request, 0 ) ) {
shutdown( connection.sockFd, SHUT_RDWR );
connection.sockFd = -1;
- pthread_mutex_unlock( &connection.sendMutex );
signal_call( connection.panicSignal );
- return true;
}
}
pthread_mutex_unlock( &connection.sendMutex );
@@ -261,24 +386,36 @@ bool connection_read(dnbd3_async_t *request)
void connection_close()
{
- if ( keepRunning ) {
- logadd( LOG_INFO, "Tearing down dnbd3 connections and workers" );
- }
+ static bool signalled = false;
+ logadd( LOG_INFO, "Tearing down dnbd3 connections and workers" );
pthread_mutex_lock( &mutexInit );
keepRunning = false;
+ if ( threadInitDone && !signalled ) {
+ signalled = true;
+ pthread_kill( tidReceiver, SIGHUP );
+ pthread_kill( tidBackground, SIGHUP );
+ }
+ pthread_mutex_unlock( &mutexInit );
if ( !connectionInitDone ) {
- pthread_mutex_unlock( &mutexInit );
return;
}
- pthread_mutex_unlock( &mutexInit );
pthread_mutex_lock( &connection.sendMutex );
if ( connection.sockFd != -1 ) {
+ logadd( LOG_DEBUG1, "Shutting down socket..." );
shutdown( connection.sockFd, SHUT_RDWR );
}
pthread_mutex_unlock( &connection.sendMutex );
}
-size_t connection_printStats(char *buffer, const size_t len)
+void connection_join()
+{
+ if ( !threadInitDone )
+ return;
+ pthread_join( tidReceiver, NULL );
+ pthread_join( tidBackground, NULL );
+}
+
+size_t connection_printStats( char *buffer, const size_t len )
{
int ret;
size_t remaining = len;
@@ -308,7 +445,7 @@ size_t connection_printStats(char *buffer, const size_t len)
*buffer++ = ' ';
}
const size_t addrlen = sock_printHost( &altservers[i].host, buffer, remaining );
- remaining -= (addrlen + 1); // For space or * above
+ remaining -= ( addrlen + 1 ); // For space or * above
buffer += addrlen;
if ( remaining < 3 )
break;
@@ -324,7 +461,7 @@ size_t connection_printStats(char *buffer, const size_t len)
width += 3;
}
ret = snprintf( buffer, remaining, "% *d %s Unreachable:% 5d BestCount:% 5d Live:% 5dµs\n",
- width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount, altservers[i].liveRtt );
+ width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount, altservers[i].liveRtt );
if ( ret < 0 ) {
ret = 0;
}
@@ -339,23 +476,23 @@ size_t connection_printStats(char *buffer, const size_t len)
return len - remaining;
}
-static void* connection_receiveThreadMain(void *sockPtr)
+static void* connection_receiveThreadMain( void *sockPtr )
{
int sockFd = (int)(size_t)sockPtr;
dnbd3_reply_t reply;
- pthread_detach( pthread_self() );
+ blockSignals();
while ( keepRunning ) {
int ret;
do {
ret = dnbd3_read_reply( sockFd, &reply, true );
+ if ( !keepRunning ) goto fail;
if ( ret == REPLY_OK ) break;
} while ( ret == REPLY_INTR || ret == REPLY_AGAIN );
if ( ret != REPLY_OK ) {
logadd( LOG_DEBUG1, "Error receiving reply on receiveThread (%d)", ret );
goto fail;
}
-
if ( reply.cmd == CMD_GET_BLOCK ) {
// Get block reply. find matching request
dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle );
@@ -369,7 +506,7 @@ static void* connection_receiveThreadMain(void *sockPtr)
}
} else {
// Found a match
- const ssize_t ret = sock_recv( sockFd, request->buffer, request->length );
+ const ssize_t ret = receiveRequest( sockFd, request );
if ( ret != (ssize_t)request->length ) {
logadd( LOG_DEBUG1, "receiving payload for a block reply failed" );
connection_read( request );
@@ -390,10 +527,14 @@ static void* connection_receiveThreadMain(void *sockPtr)
}
unlock_rw( &altLock );
}
- // Success, wake up caller
- request->success = true;
- request->finished = true;
- signal_call( request->signal );
+ // TODO: See comment in receiveRequest()
+ if( useCow ) {
+ cowfile_handleCallback( request );
+ }
+ else {
+ fuse_reply_buf( request->fuse_req, container_of( request, dnbd3_async_parent_t, request )->buffer, request->length );
+ free( request );
+ }
}
} else if ( reply.cmd == CMD_GET_SERVERS ) {
// List of known alt servers
@@ -416,7 +557,6 @@ static void* connection_receiveThreadMain(void *sockPtr)
}
}
}
- logadd( LOG_DEBUG1, "Aus der Schleife rausgeflogen! ARRRRRRRRRR" );
fail:;
// Make sure noone is trying to use the socket for sending by locking,
pthread_mutex_lock( &connection.sendMutex );
@@ -424,7 +564,9 @@ fail:;
// as someone could have established a new connection already
if ( connection.sockFd == sockFd ) {
connection.sockFd = -1;
- signal_call( connection.panicSignal );
+ if ( keepRunning ) {
+ signal_call( connection.panicSignal );
+ }
}
pthread_mutex_unlock( &connection.sendMutex );
// As we're the only reader, it's safe to close the socket now
@@ -432,20 +574,23 @@ fail:;
return NULL;
}
-static void* connection_backgroundThread(void *something UNUSED)
+static void* connection_backgroundThread( void *something UNUSED )
{
ticks nextKeepalive;
ticks nextRttCheck;
+ blockSignals();
timing_get( &nextKeepalive );
nextRttCheck = nextKeepalive;
while ( keepRunning ) {
ticks now;
timing_get( &now );
- uint32_t wt1 = timing_diffMs( &now, &nextKeepalive );
- uint32_t wt2 = timing_diffMs( &now, &nextRttCheck );
+ uint32_t wt1 = (uint32_t)timing_diffMs( &now, &nextKeepalive );
+ uint32_t wt2 = (uint32_t)timing_diffMs( &now, &nextRttCheck );
if ( wt1 > 0 && wt2 > 0 ) {
int waitRes = signal_wait( connection.panicSignal, (int)MIN( wt1, wt2 ) + 1 );
+ if ( !keepRunning )
+ break;
if ( waitRes == SIGNAL_ERROR ) {
logadd( LOG_WARNING, "Error waiting on signal in background thread! Errno = %d", errno );
}
@@ -460,20 +605,20 @@ static void* connection_backgroundThread(void *something UNUSED)
}
sortAltServers();
probeAltServers();
- if ( panic || timing_diff( &connection.startupTime, &now ) <= STARTUP_MODE_DURATION ) {
+ if ( panic || timing_diff( &connection.startupTime, &now ) <= DISCOVER_STARTUP_PHASE_COUNT * TIMER_INTERVAL_PROBE_STARTUP ) {
timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_STARTUP );
} else {
- timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_NORMAL );
+ timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_MAX );
}
}
// Send keepalive packet
if ( timing_reachedPrecise( &nextKeepalive, &now ) ) {
pthread_mutex_lock( &connection.sendMutex );
if ( connection.sockFd != -1 ) {
- dnbd3_request_t request;
- request.magic = dnbd3_packet_magic;
- request.cmd = CMD_KEEPALIVE;
- request.handle = request.offset = request.size = 0;
+ dnbd3_request_t request = {
+ .magic = dnbd3_packet_magic,
+ .cmd = CMD_KEEPALIVE,
+ };
fixup_request( request );
ssize_t ret = sock_sendAll( connection.sockFd, &request, sizeof request, 2 );
if ( (size_t)ret != sizeof request ) {
@@ -483,7 +628,7 @@ static void* connection_backgroundThread(void *something UNUSED)
}
}
pthread_mutex_unlock( &connection.sendMutex );
- timing_addSeconds( &nextKeepalive, &now, TIMER_INTERVAL_KEEPALIVE_PACKET );
+ timing_addSeconds( &nextKeepalive, &now, KEEPALIVE_INTERVAL );
}
}
return NULL;
@@ -491,7 +636,20 @@ static void* connection_backgroundThread(void *something UNUSED)
// Private quick helpers
-static void addAltServers()
+/**
+ * Check if given host is in list of altsevers.
+ * Does not lock 'altLock', do so at caller site.
+ */
+static bool hasAltServer( dnbd3_host_t *host )
+{
+ for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) {
+ if ( isSameAddress( host, &altservers[eIdx].host ) )
+ return true;
+ }
+ return false;
+}
+
+static void addAltServers( void )
{
pthread_mutex_lock( &newAltLock );
lock_write( &altLock );
@@ -499,11 +657,8 @@ static void addAltServers()
if ( newservers[nIdx].host.type == 0 )
continue;
// Got a new alt server, see if it's already known
- for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) {
- if ( isSameAddress( &newservers[nIdx].host, &altservers[eIdx].host ) ) {
- goto skip_server;
- }
- }
+ if ( hasAltServer( &newservers[nIdx].host ) )
+ continue;
// Not known yet, add - find free slot
int slot = -1;
for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) {
@@ -528,9 +683,8 @@ static void addAltServers()
altservers[slot].host = newservers[nIdx].host;
altservers[slot].liveRtt = 0;
}
-skip_server:;
}
- memset( newservers, 0, sizeof(newservers) );
+ memset( newservers, 0, sizeof( newservers ) );
unlock_rw( &altLock );
pthread_mutex_unlock( &newAltLock );
}
@@ -604,7 +758,7 @@ static void probeAltServers()
pthread_spin_lock( &requests.lock );
if ( requests.head != NULL ) {
if ( !panic && current != NULL ) {
- const int maxDelay = MAX( current->rtt * 5, 1000000 ); // Give at least one second
+ const uint64_t maxDelay = MAX( current->rtt * 5, 1000000 ); // Give at least one second
dnbd3_async_t *iterator;
for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) {
// A request with measurement tag is pending
@@ -626,7 +780,7 @@ static void probeAltServers()
}
lock_read( &altLock );
- for ( int altIndex = 0; altIndex < (panic ? MAX_ALTS : MAX_ALTS_ACTIVE); ++altIndex ) {
+ for ( int altIndex = 0; altIndex < ( panic ? MAX_ALTS : MAX_ALTS_ACTIVE ); ++altIndex ) {
alt_server_t * const srv = &altservers[altIndex];
if ( srv->host.type == 0 )
continue;
@@ -634,65 +788,70 @@ static void probeAltServers()
&& rand() % srv->consecutiveFails >= FAIL_BACKOFF_START_COUNT ) {
continue;
}
+ srv->rttIndex += 1;
if ( srv->rttIndex >= RTT_COUNT ) {
srv->rttIndex = 0;
- } else {
- srv->rttIndex += 1;
}
// Probe
+ char hstr[100];
+ sock_printHost( &srv->host, hstr, 100 );
ticks start;
timing_get( &start );
errno = 0;
int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 );
if ( sock == -1 ) {
- logadd( LOG_DEBUG1, "Could not connect for probing. errno = %d", errno );
+ logadd( LOG_DEBUG1, "%s probe: Could not connect for probing. errno = %d", hstr, errno );
goto fail;
}
if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) {
- logadd( LOG_DEBUG1, "probe: select_image failed" );
+ logadd( LOG_DEBUG1, "%s probe: select_image failed (sock=%d, errno=%d)", hstr, sock, errno );
goto fail;
}
- if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize )) {
- logadd( LOG_DEBUG1, "probe: select image reply failed" );
+ if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize ) ) {
+ logadd( LOG_DEBUG1, "%s probe: select image reply failed", hstr );
goto fail;
}
if ( remoteProto < MIN_SUPPORTED_SERVER ) {
- logadd( LOG_WARNING, "Unsupported remote version (local: %d, remote: %d)", (int)PROTOCOL_VERSION, (int)remoteProto );
+ logadd( LOG_WARNING, "%s probe: Unsupported remote version (local: %d, remote: %d)", hstr, (int)PROTOCOL_VERSION, (int)remoteProto );
srv->consecutiveFails += 10;
goto fail;
}
if ( remoteRid != image.rid || strcmp( remoteName, image.name ) != 0 ) {
- logadd( LOG_WARNING, "Remote rid or name mismatch (got '%s')", remoteName );
+ logadd( LOG_WARNING, "%s probe: Remote rid or name mismatch (got '%s')", hstr, remoteName );
srv->consecutiveFails += 10;
goto fail;
}
if ( !dnbd3_get_block( sock, testOffset, testLength, 0, 0 ) ) {
- logadd( LOG_DEBUG1, "-> block request fail" );
+ logadd( LOG_DEBUG1, "%s probe: -> block request fail", hstr );
goto fail;
}
int a = 111;
- if ( !(a = dnbd3_get_reply( sock, &reply )) || reply.size != testLength ) {
- logadd( LOG_DEBUG1, "<- get block reply fail %d %d", a, (int)reply.size );
+ if ( !( a = dnbd3_get_reply( sock, &reply ) ) || reply.size != testLength ) {
+ logadd( LOG_DEBUG1, "%s probe: <- get block reply fail %d %d", hstr, a, (int)reply.size );
goto fail;
}
if ( request != NULL && removeRequest( request ) != NULL ) {
// Request successfully removed from queue
- const ssize_t ret = sock_recv( sock, request->buffer, request->length );
+ ssize_t const ret = receiveRequest( sock, request);
if ( ret != (ssize_t)request->length ) {
- logadd( LOG_DEBUG1, "[RTT] receiving payload for a block reply failed" );
+ logadd( LOG_DEBUG1, "%s probe: receiving payload for a block reply failed", hstr );
// Failure, add to queue again
connection_read( request );
goto fail;
}
- // Success, wake up caller
- logadd( LOG_DEBUG1, "[RTT] Successful direct probe" );
- request->success = true;
- request->finished = true;
- signal_call( request->signal );
+ // Success, reply to fuse
+ if( useCow ) {
+ cowfile_handleCallback( request );
+ }
+ else {
+ fuse_reply_buf( request->fuse_req, container_of( request, dnbd3_async_parent_t, request )->buffer, request->length );
+ free( request );
+ }
+ logadd( LOG_DEBUG1, "%s probe: Successful direct probe", hstr );
} else {
// Wasn't a request that's in our request queue
if ( !throwDataAway( sock, testLength ) ) {
- logadd( LOG_DEBUG1, "<- get block reply payload fail" );
+ logadd( LOG_DEBUG1, "%s probe: <- get block reply payload fail", hstr );
goto fail;
}
}
@@ -701,7 +860,7 @@ static void probeAltServers()
// Panic mode? Just switch to server
if ( panic ) {
unlock_rw( &altLock );
- switchConnection( sock, srv );
+ if ( keepRunning ) switchConnection( sock, srv );
return;
}
// Non-panic mode:
@@ -733,7 +892,8 @@ static void probeAltServers()
close( sock );
}
continue;
-fail:;
+fail:
+ ;
if ( sock != -1 ) {
close( sock );
}
@@ -774,7 +934,7 @@ fail:;
// Regular logic: Apply threshold when considering switch
if ( !doSwitch && current != NULL ) {
doSwitch = current->rtt > best->rtt + RTT_ABSOLUTE_THRESHOLD
- || RTT_THRESHOLD_FACTOR(current->rtt) > best->rtt + 1000;
+ || RTT_THRESHOLD_FACTOR( current->rtt ) > best->rtt + 1000;
}
}
// Switch if a better server was found
@@ -796,11 +956,20 @@ fail:;
}
}
-static void switchConnection(int sockFd, alt_server_t *srv)
+static size_t receiveRequest(const int sock, dnbd3_async_t* request )
+{
+ if( useCow ) {
+ cow_sub_request_t * cow_request = container_of( request, cow_sub_request_t, dRequest );
+ return sock_recv( sock, cow_request->buffer, request->length );
+ } else {
+ return sock_recv( sock, container_of( request, dnbd3_async_parent_t, request )->buffer, request->length );
+ }
+}
+
+static void switchConnection( int sockFd, alt_server_t *srv )
{
- pthread_t thread;
struct sockaddr_storage addr;
- socklen_t addrLen = sizeof(addr);
+ socklen_t addrLen = sizeof( addr );
char message[200] = "Connection switched to ";
const size_t len = strlen( message );
int ret;
@@ -829,9 +998,10 @@ static void switchConnection(int sockFd, alt_server_t *srv)
signal_call( connection.panicSignal );
return;
}
+ pthread_detach( tidReceiver );
timing_get( &connection.startupTime );
- pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)sockFd );
- sock_printable( (struct sockaddr*)&addr, sizeof(addr), message + len, sizeof(message) - len );
+ pthread_create( &tidReceiver, NULL, &connection_receiveThreadMain, ( void* )(size_t)sockFd );
+ sock_printable( (struct sockaddr*)&addr, sizeof( addr ), message + len, sizeof( message ) - len );
logadd( LOG_INFO, "%s", message );
// resend queue
if ( queue != NULL ) {
@@ -855,22 +1025,28 @@ static void switchConnection(int sockFd, alt_server_t *srv)
/**
* Does not lock, so get the sendMutex first!
*/
-static void requestAltServers()
+static void requestAltServers( void )
{
if ( connection.sockFd == -1 || !learnNewServers )
return;
- dnbd3_request_t request = { 0 };
- request.magic = dnbd3_packet_magic;
- request.cmd = CMD_GET_SERVERS;
- fixup_request( request );
- if ( sock_sendAll( connection.sockFd, &request, sizeof(request), 2 ) != (ssize_t)sizeof(request) ) {
- logadd( LOG_WARNING, "Connection failed while requesting alt server list" );
+ if ( !sendAltServerRequest( connection.sockFd ) ) {
+ logadd( LOG_WARNING, "Main connection failed while requesting alt server list" );
shutdown( connection.sockFd, SHUT_RDWR );
connection.sockFd = -1;
}
}
-static bool throwDataAway(int sockFd, uint32_t amount)
+static bool sendAltServerRequest( int sock )
+{
+ dnbd3_request_t request = {
+ .magic = dnbd3_packet_magic,
+ .cmd = CMD_GET_SERVERS,
+ };
+ fixup_request( request );
+ return sock_sendAll( sock, &request, sizeof( request ), 2 ) == (ssize_t)sizeof( request );
+}
+
+static bool throwDataAway( int sockFd, uint32_t amount )
{
size_t done = 0;
char tempBuffer[SHORTBUF];
@@ -883,11 +1059,9 @@ static bool throwDataAway(int sockFd, uint32_t amount)
return true;
}
-static void enqueueRequest(dnbd3_async_t *request)
+static void enqueueRequest( dnbd3_async_t *request )
{
request->next = NULL;
- request->finished = false;
- request->success = false;
//logadd( LOG_DEBUG2, "Queue: %p @ %s : %d", request, file, line );
// Measure latency and add to switch formula
timing_get( &request->time );
@@ -901,7 +1075,7 @@ static void enqueueRequest(dnbd3_async_t *request)
pthread_spin_unlock( &requests.lock );
}
-static dnbd3_async_t* removeRequest(dnbd3_async_t *request)
+static dnbd3_async_t* removeRequest( dnbd3_async_t *request )
{
pthread_spin_lock( &requests.lock );
//logadd( LOG_DEBUG2, "Remov: %p @ %s : %d", request, file, line );
@@ -925,3 +1099,20 @@ static dnbd3_async_t* removeRequest(dnbd3_async_t *request)
return iterator;
}
+static void blockSignals()
+{
+ sigset_t sigmask;
+ if ( pthread_sigmask( 0, NULL, &sigmask ) == -1 ) {
+ logadd( LOG_WARNING, "Cannot get current sigmask of thread" );
+ sigemptyset( &sigmask );
+ }
+ sigaddset( &sigmask, SIGUSR1 );
+ sigaddset( &sigmask, SIGUSR2 );
+ sigaddset( &sigmask, SIGPIPE );
+ sigaddset( &sigmask, SIGINT );
+ sigaddset( &sigmask, SIGTERM );
+ sigdelset( &sigmask, SIGHUP );
+ if ( pthread_sigmask( SIG_SETMASK, &sigmask, NULL ) == -1 ) {
+ logadd( LOG_WARNING, "Cannot set sigmask of thread" );
+ }
+}
diff --git a/src/fuse/connection.h b/src/fuse/connection.h
index cae554c..b22e3ce 100644
--- a/src/fuse/connection.h
+++ b/src/fuse/connection.h
@@ -1,35 +1,50 @@
#ifndef _CONNECTION_H_
#define _CONNECTION_H_
-#include "../shared/fdsignal.h"
-#include "../shared/timing.h"
+#include <dnbd3/shared/fdsignal.h>
+#include <dnbd3/shared/timing.h>
+#include <stdatomic.h>
#include <stddef.h>
#include <stdbool.h>
#include <stdint.h>
+#include <sys/socket.h>
+#define FUSE_USE_VERSION 30
+#include <fuse_lowlevel.h>
+
+
+extern atomic_bool keepRunning;
struct _dnbd3_async;
typedef struct _dnbd3_async {
struct _dnbd3_async *next; // Next in this linked list (provate field, not set by caller)
- dnbd3_signal_t* signal; // Used to signal the caller
- char* buffer; // Caller-provided buffer to be filled
ticks time; // When request was put on wire, 0 if not measuring
uint64_t offset;
uint32_t length;
- bool finished; // Will be set to true if the request has been handled
- bool success; // Will be set to true if the request succeeded
+ fuse_req_t fuse_req;
} dnbd3_async_t;
-bool connection_init(const char *hosts, const char *image, const uint16_t rid, const bool learnNewServers);
+typedef struct _dnbd3_async_parent {
+ dnbd3_async_t request;
+ char buffer[]; // Must be last member!
+} dnbd3_async_parent_t;
+
+bool connection_init( const char *hosts, const char *image, const uint16_t rid, const bool learnNewServers );
bool connection_initThreads();
uint64_t connection_getImageSize();
-bool connection_read(dnbd3_async_t *request);
+char * connection_getImageName();
+
+uint16_t connection_getImageRID();
+
+bool connection_read( dnbd3_async_t *request );
void connection_close();
-size_t connection_printStats(char *buffer, const size_t len);
+void connection_join();
+
+size_t connection_printStats( char *buffer, const size_t len );
#endif /* CONNECTION_H_ */
diff --git a/src/fuse/cowDoc/img/datastructure.jpg b/src/fuse/cowDoc/img/datastructure.jpg
new file mode 100644
index 0000000..d471d2a
--- /dev/null
+++ b/src/fuse/cowDoc/img/datastructure.jpg
Binary files differ
diff --git a/src/fuse/cowDoc/img/readrequest.svg b/src/fuse/cowDoc/img/readrequest.svg
new file mode 100644
index 0000000..a16f95c
--- /dev/null
+++ b/src/fuse/cowDoc/img/readrequest.svg
@@ -0,0 +1,4 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Do not edit this file with editors other than diagrams.net -->
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="719px" height="701px" viewBox="-0.5 -0.5 719 701" content="&lt;mxfile host=&quot;app.diagrams.net&quot; modified=&quot;2022-06-27T15:42:31.682Z&quot; agent=&quot;5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.67 Safari/537.36 OPR/87.0.4390.45&quot; etag=&quot;_RkRrWUXEJff4oZ7Tmkg&quot; version=&quot;20.0.3&quot;&gt;&lt;diagram id=&quot;otKvBIWVLEUd5MDrhZ-X&quot; name=&quot;Page-1&quot;&gt;5VpZc5swEP4tffBjOiAO48fm6jHpTGfcmTSPspENLSAihI376yuBZA45NrGxoclMxmFXu5LY3W8lrRgZN2H2mcDY+45dFIyA5mYj43YEgG4CMOJ/mrspOOOJVTCWxHeFUMmY+n+RYGqCm/ouSmqCFOOA+nGdOcdRhOa0xoOE4HVdbIGD+qgxXCKFMZ3DQOU++i71Cq4DxiX/C/KXnhxZtydFSwilsHiTxIMuXldYxt3IuCEY0+IpzG5QwI0n7VLo3b/Qup0YQRFto4BW8ZRkP52Hr4+mt/LJ+ttmdqWbRTcrGKTijcVs6UaaALnMIoLEhHp4iSMY3JXca4LTyEV8HI1RpcwDxjFj6oz5G1G6Ee6FKcWM5dEwEK0LHFHRqPMuijnwgV98WcFKcErmaM8bGiJoIFkiukdusnUJi2WEQ0TJhukRFEDqr+rzgCKolls5ofqJELipCMTYj2hS6fkHZzABgQ/LFMEh0KHbNR+yh6JHSVWmVrJyP7/C54bicoKgm7/rc4oSqgRA6V7uq7XnUzSNYW71NUN93ZWQzIUnTbD15AoRirL9vlRtLxRMp24kabN1iUddgsyrYFHK7fJWxcJHGFBTLLiAQYLeGHLGLZEDToXOSc4Y70hgdsDme53EMKr5w35OebLNbXaV5Eb7xAR0Lc5yy8l29rTk/2cBnv9hAlyZ62uMhnxCTACGPO6LX5XMx58R2RFeLBJEq4pBMY6GIlc22oJ3X8y70FSx6OFwliaHcdgB8GQ22gKvLfKccyEPOIq3KUkHATxmU7L5JRpz4ol3/tGS5G0mBiuojaQyn27V2HNFi1GlEiekTocgd9qC3OwT5AZ4Fxm3rTOMF3B7GWeoGOw84354l/nWAkPLt7qKu6Hk2w5hN2kJO71X2BmT4ebA7eJXWe+easvd7sWvByca4xOdeNRBz9Tq0J40zuqK+AnStnVIfK/8eQ6davB2tmj8p9neaJ79Qe/Z3n4P2V6XZcSD6d7qM90Da7jpvg9v9HsAkdO8ZJlh+CmrdSnufClrV/nnbQHEagmQU/c1p/lBzVYhZoPy2xr2E6GML9Ai0hv+uXwhy2gbt/bZCllqEb6vpbbcu7+ucKWcAUQhSz5fuJDVtlp98qq+e1PduKOw9EbkFPMXSo3g6WB7DdSt2xvLg8Bs6WCn112bep86oDzYLDD1nweNYcbtsTmxy3hvW9Do5eK6me4OVCga4he55gZqyUHcc+fwS8qNdu/7EcvuHYfD/A7k2Eu1DnFotD2g9oLDMXgdEJvyl/ngRD08N5G4IDjk3kCEAWsAK+MAEKmebIXRXBypB4Wev9FpVgZsp6X9jvhIh5HlV3NFjJbfHhp3/wA=&lt;/diagram&gt;&lt;/mxfile&gt;" style="background-color: rgb(255, 255, 255);"><defs/><g><path d="M 520 50 L 520 130 L 446.37 130" fill="none" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 441.12 130 L 448.12 126.5 L 446.37 130 L 448.12 133.5 Z" fill="rgb(0, 0, 0)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><rect x="460" y="10" width="120" height="40" rx="16.8" ry="16.8" fill="rgb(255, 255, 255)" stroke="rgb(0, 0, 0)" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 118px; height: 1px; padding-top: 30px; margin-left: 461px;"><div data-drawio-colors="color: rgb(0, 0, 0); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; white-space: normal; overflow-wrap: normal;">read request</div></div></div></foreignObject><text x="520" y="34" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="12px" text-anchor="middle">read request</text></switch></g><path d="M 200 490 L 200 523.63" fill="none" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 200 528.88 L 196.5 521.88 L 200 523.63 L 203.5 521.88 Z" fill="rgb(0, 0, 0)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 510px; margin-left: 200px;"><div data-drawio-colors="color: rgb(0, 0, 0); background-color: rgb(255, 255, 255); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 10px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; background-color: rgb(255, 255, 255); white-space: nowrap;">false</div></div></div></foreignObject><text x="200" y="513" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="10px" text-anchor="middle">false</text></switch></g><path d="M 200 410 L 260 450 L 200 490 L 140 450 Z" fill="rgb(255, 255, 255)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 118px; height: 1px; padding-top: 450px; margin-left: 141px;"><div data-drawio-colors="color: rgb(0, 0, 0); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; white-space: normal; overflow-wrap: normal;"><span style="font-size: 10px;">block == local &amp;&amp;<br />offset &lt; endoffset</span></div></div></div></foreignObject><text x="200" y="454" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="12px" text-anchor="middle">block == local &amp;&amp;...</text></switch></g><path d="M 620 450 L 640 450 L 640 340 L 626.37 340" fill="none" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 621.12 340 L 628.12 336.5 L 626.37 340 L 628.12 343.5 Z" fill="rgb(0, 0, 0)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 395px; margin-left: 640px;"><div data-drawio-colors="color: rgb(0, 0, 0); background-color: rgb(255, 255, 255); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 10px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; background-color: rgb(255, 255, 255); white-space: nowrap;">true</div></div></div></foreignObject><text x="640" y="398" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="10px" text-anchor="middle">true</text></switch></g><path d="M 560 490 L 560 523.63" fill="none" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 560 528.88 L 556.5 521.88 L 560 523.63 L 563.5 521.88 Z" fill="rgb(0, 0, 0)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 510px; margin-left: 560px;"><div data-drawio-colors="color: rgb(0, 0, 0); background-color: rgb(255, 255, 255); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 10px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; background-color: rgb(255, 255, 255); white-space: nowrap;">false</div></div></div></foreignObject><text x="560" y="513" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="10px" text-anchor="middle">false</text></switch></g><path d="M 560 410 L 620 450 L 560 490 L 500 450 Z" fill="rgb(255, 255, 255)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 118px; height: 1px; padding-top: 450px; margin-left: 501px;"><div data-drawio-colors="color: rgb(0, 0, 0); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; white-space: normal; overflow-wrap: normal;"><span style="font-size: 10px;">block != local &amp;&amp;<br />offset &lt; endoffset</span></div></div></div></foreignObject><text x="560" y="454" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="12px" text-anchor="middle">block != local &amp;&amp;...</text></switch></g><path d="M 380 170 L 380 203.63" fill="none" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 380 208.88 L 376.5 201.88 L 380 203.63 L 383.5 201.88 Z" fill="rgb(0, 0, 0)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 190px; margin-left: 380px;"><div data-drawio-colors="color: rgb(0, 0, 0); background-color: rgb(255, 255, 255); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 10px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; background-color: rgb(255, 255, 255); white-space: nowrap;">true</div></div></div></foreignObject><text x="380" y="193" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="10px" text-anchor="middle">true</text></switch></g><path d="M 380 90 L 380 60 L 20 60 L 20 620 L 380 620 L 380 643.63" fill="none" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 380 648.88 L 376.5 641.88 L 380 643.63 L 383.5 641.88 Z" fill="rgb(0, 0, 0)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 340px; margin-left: 20px;"><div data-drawio-colors="color: rgb(0, 0, 0); background-color: rgb(255, 255, 255); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 10px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; background-color: rgb(255, 255, 255); white-space: nowrap;">false</div></div></div></foreignObject><text x="20" y="343" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="10px" text-anchor="middle">false</text></switch></g><path d="M 380 90 L 440 130 L 380 170 L 320 130 Z" fill="rgb(255, 255, 255)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 118px; height: 1px; padding-top: 130px; margin-left: 321px;"><div data-drawio-colors="color: rgb(0, 0, 0); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; white-space: normal; overflow-wrap: normal;"><span style="font-size: 10px;">offset &lt; endoffset</span></div></div></div></foreignObject><text x="380" y="134" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="12px" text-anchor="middle">offset &lt; endoffset</text></switch></g><path d="M 320 250 L 200 250 L 200 303.63" fill="none" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 200 308.88 L 196.5 301.88 L 200 303.63 L 203.5 301.88 Z" fill="rgb(0, 0, 0)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 250px; margin-left: 230px;"><div data-drawio-colors="color: rgb(0, 0, 0); background-color: rgb(255, 255, 255); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 10px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; background-color: rgb(255, 255, 255); white-space: nowrap;">true</div></div></div></foreignObject><text x="230" y="253" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="10px" text-anchor="middle">true</text></switch></g><path d="M 440 250 L 560 250 L 560 303.63" fill="none" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 560 308.88 L 556.5 301.88 L 560 303.63 L 563.5 301.88 Z" fill="rgb(0, 0, 0)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 250px; margin-left: 530px;"><div data-drawio-colors="color: rgb(0, 0, 0); background-color: rgb(255, 255, 255); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 10px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; background-color: rgb(255, 255, 255); white-space: nowrap;">false</div></div></div></foreignObject><text x="530" y="253" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="10px" text-anchor="middle">false</text></switch></g><path d="M 380 210 L 440 250 L 380 290 L 320 250 Z" fill="rgb(255, 255, 255)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 118px; height: 1px; padding-top: 250px; margin-left: 321px;"><div data-drawio-colors="color: rgb(0, 0, 0); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; white-space: normal; overflow-wrap: normal;"><span style="font-size: 10px;">block == local</span></div></div></div></foreignObject><text x="380" y="254" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="12px" text-anchor="middle">block == local</text></switch></g><path d="M 200 370 L 200 403.63" fill="none" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 200 408.88 L 196.5 401.88 L 200 403.63 L 203.5 401.88 Z" fill="rgb(0, 0, 0)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><rect x="140" y="310" width="120" height="60" fill="rgb(255, 255, 255)" stroke="rgb(0, 0, 0)" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 118px; height: 1px; padding-top: 340px; margin-left: 141px;"><div data-drawio-colors="color: rgb(0, 0, 0); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; white-space: normal; overflow-wrap: normal;">move to next block</div></div></div></foreignObject><text x="200" y="344" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="12px" text-anchor="middle">move to next block</text></switch></g><path d="M 140 450 L 120 450 L 120 340 L 133.63 340" fill="none" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 138.88 340 L 131.88 343.5 L 133.63 340 L 131.88 336.5 Z" fill="rgb(0, 0, 0)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 1px; height: 1px; padding-top: 395px; margin-left: 120px;"><div data-drawio-colors="color: rgb(0, 0, 0); background-color: rgb(255, 255, 255); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 10px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; background-color: rgb(255, 255, 255); white-space: nowrap;">true</div></div></div></foreignObject><text x="120" y="398" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="10px" text-anchor="middle">true</text></switch></g><path d="M 560 370 L 560 403.63" fill="none" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 560 408.88 L 556.5 401.88 L 560 403.63 L 563.5 401.88 Z" fill="rgb(0, 0, 0)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><rect x="500" y="310" width="120" height="60" fill="rgb(255, 255, 255)" stroke="rgb(0, 0, 0)" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 118px; height: 1px; padding-top: 340px; margin-left: 501px;"><div data-drawio-colors="color: rgb(0, 0, 0); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; white-space: normal; overflow-wrap: normal;">move to next block</div></div></div></foreignObject><text x="560" y="344" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="12px" text-anchor="middle">move to next block</text></switch></g><path d="M 140 560 L 60 560 L 60 130 L 313.63 130" fill="none" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 318.88 130 L 311.88 133.5 L 313.63 130 L 311.88 126.5 Z" fill="rgb(0, 0, 0)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><rect x="140" y="530" width="120" height="60" fill="rgb(255, 255, 255)" stroke="rgb(0, 0, 0)" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 118px; height: 1px; padding-top: 560px; margin-left: 141px;"><div data-drawio-colors="color: rgb(0, 0, 0); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; white-space: normal; overflow-wrap: normal;">read blocks local</div></div></div></foreignObject><text x="200" y="564" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="12px" text-anchor="middle">read blocks local</text></switch></g><path d="M 620 560 L 700 560 L 700 130 L 446.37 130" fill="none" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="stroke"/><path d="M 441.12 130 L 448.12 126.5 L 446.37 130 L 448.12 133.5 Z" fill="rgb(0, 0, 0)" stroke="rgb(0, 0, 0)" stroke-miterlimit="10" pointer-events="all"/><rect x="500" y="530" width="120" height="60" fill="rgb(255, 255, 255)" stroke="rgb(0, 0, 0)" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 118px; height: 1px; padding-top: 560px; margin-left: 501px;"><div data-drawio-colors="color: rgb(0, 0, 0); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; white-space: normal; overflow-wrap: normal;">read blocks from server</div></div></div></foreignObject><text x="560" y="564" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="12px" text-anchor="middle">read blocks from ser...</text></switch></g><rect x="320" y="650" width="120" height="40" rx="16.8" ry="16.8" fill="rgb(255, 255, 255)" stroke="rgb(0, 0, 0)" pointer-events="all"/><g transform="translate(-0.5 -0.5)"><switch><foreignObject pointer-events="none" width="100%" height="100%" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility" style="overflow: visible; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display: flex; align-items: unsafe center; justify-content: unsafe center; width: 118px; height: 1px; padding-top: 670px; margin-left: 321px;"><div data-drawio-colors="color: rgb(0, 0, 0); " style="box-sizing: border-box; font-size: 0px; text-align: center;"><div style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; pointer-events: all; white-space: normal; overflow-wrap: normal;">read done</div></div></div></foreignObject><text x="380" y="674" fill="rgb(0, 0, 0)" font-family="Helvetica" font-size="12px" text-anchor="middle">read done</text></switch></g></g><switch><g requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"/><a transform="translate(0,-5)" xlink:href="https://www.diagrams.net/doc/faq/svg-export-text-problems" target="_blank"><text text-anchor="middle" font-size="10px" x="50%" y="100%">Text is not SVG - cannot display</text></a></switch></svg> \ No newline at end of file
diff --git a/src/fuse/cowDoc/readme.md b/src/fuse/cowDoc/readme.md
new file mode 100644
index 0000000..51a0052
--- /dev/null
+++ b/src/fuse/cowDoc/readme.md
@@ -0,0 +1,367 @@
+
+# Fuse Copy on Write (CoW)
+
+### Table of Contents
+1. [Introduction](#introduction)
+2. [Usage](#usage)
+3. [Implementation Details](#implementation-details)
+4. [REST Api](#rest-api)
+
+
+# Introduction
+
+This extension of the fuse dnbd3 client makes it possible to mount images in a writable way. The changes are saved in a separate file ) on the client computer (also called Copy on Write, cow for short). These changes are uploaded to the cow server in the background. As soon as the user unmounts the image, all remaining changes are uploaded. Once all have been uploaded, the changes can be merged into a copy of the original image on the cow server (this can be set in the start parameters).
+
+
+A typical use case is updating or adding software to an existing image.
+
+# Usage
+
+### New Parameters
+- `-c <path>` Enables the cow functionality. The `path` parameter sets the path for the temporary `meta` and `data` files in which the changes are saved.
+- `-C <address>` sets the address of the cow server. The Cow server is responsible for merging the original image with the client's changes.
+
+- `- L <path>` Similar to `-c <path>`, but instead of creating a new session, an existing one is loaded from the specified path.
+- `-m` the client requests a merge after the image has been unmounted and all changes have been uploaded.
+
+- `--cow-stats-file` creates a status file at the same location as the data and meta file. The file contains information about the current session, for more information see [here](#status).
+- `--cow-stats-stdout` similar to `--cow-stats-file` but the information will be printed in the stdout.
+
+Example parameters for creating a new cow session:
+```
+./dnbd3-fuse "/home/user/VMs/mount" -f -h localhost -i imagename -c "/home/user/temp" -C "192.168.178.20:5000" --cow-stats-stdout -m
+
+```
+
+# Implementation Details
+
+
+## Data structure
+
+The data structure is divided into two main parts. The actual data of the writing on the image and the corresponding metadata. It is also important to distinguish between a dnbd3 block, which is 4096 bytes in size, and a cow cluster, which combines 320 dnbd3 blocks. A cow cluster has a `cow_l2_entry_t` structure in its according l2 table that contains the corresponding metadata. The metadata is used to determine if a dnbd3 block has been written to, where that block is stored in the `data` file, when it was last modified and when it was uploaded. But more on this later.
+
+
+### Metadata
+
+![Datastructure](img/datastructure.jpg)
+
+The metadata file, which ultimately stores the `cow_l2_entry_t` structs, contains a layer 1 (L1) and a layer 2 (L2) table for looking up the struct, which ultimately points to the actual data in the data file.
+The entire L1 table is initialised at the beginning and cannot be resized, therefore the size of the L1 table limits the maximum size of the image.
+The L2 tables are created dynamically as needed. So at the beginning, all L1 pointers are invalid (-1).
+
+The L2 tables contain 1024 `cow_l2_entry_t` structs each. An L2 table is created as soon as any data is written to any offset in the image that corresponds to the range it covers, which is a span of 1024 * 320 * 4096 bytes.
+
+```C
+typedef struct cow_l2_entry
+{
+ atomic_int_least64_t offset;
+ atomic_uint_least64_t timeChanged;
+ _Atomic(uint32_t) uploads;
+ _Atomic(uint32_t) fails;
+ atomic_char bitfield[40];
+} cow_l2_entry_t;
+```
+Each `cow_l2_entry_t` contains a 40 byte, 320 bit bit-field. The bit-field indicates whether the corresponding dnbd3 block has been written locally. If, for example, the bit field begins with 01... the first 4096 bytes contain no data and the next 4096 do.
+So each `cow_l2_entry_t` stores the metadata for up to 320\*4096 bytes. The offset field is the offset into the data file where the corresponding data is stored. `timeChanged` contains the unix timestamp when the cluster was last written. It is 0 if it has never been changed or if the latest changes have already been uploaded.
+
+
+For example, to get the `cow_l2_entry_t` for offset 4033085440, one would take L1[3], since
+```
+4033085440 / ( COW_FULL_L2_TABLE_DATA_SIZE ) ≈ 3.005
+```
+
+Then one would take the fifth `cow_l2_entry_t` in the L2 array because of
+```
+(4033085440 mod COW_FULL_L2_TABLE_DATA_SIZE) / COW_DATA_CLUSTER_SIZE = 5
+```
+Where:
+```
+COW_FULL_L2_TABLE_DATA_SIZE = 1024 * 320 * 4096
+COW_DATA_CLUSTER_SIZE = 320 * 4096
+```
+
+Since the result is an integer, the offset refers to the first dnbd3 block in that cluster. Otherwise, the block number within the cluster be calculated via
+```
+(4033085440 % (320 * 4096)) / 4096
+```
+which is the index in the bit-field that tells whether the block has been written to the data file.
+
+
+### Read Request
+
+When a read request is made, for each dnbd3 block block it is checked whether it already exists in the data file (i.e. has already been written to once). If so, it is read from the data file, otherwise it needs to be requested from the dnbd3 server. To increase performance, several subsequent blocks that are also local/non-local are combined into a larger reads from disk or requests from the server.
+
+![readrequest](img/readrequest.svg)
+
+The diagram above is somewhat simplified for clarity. The server's read operations are asynchronous. This means that while iterating over the 4k blocks from the read request, it does not wait for a response from the server for blocks that are missing locally, but fires off a request to the dnbd3 server asynchronously, continuing to check the remaining blocks. As soon as all pending requests to the server are completed, the combined data is handed over to fuse, completing the request.
+To keep track of pending requests, each request to the dnbd3 server increments the field `workCounter` in the according `cow_request_t` by one, and each time a request is completed, it is decreased by one. As soon as `workCounter` reaches `0`, all data is known to be fetched properly and assembled in a buffer that can be handed over to fuse.
+
+### Write Request
+
+If, in a write request, the beginning or end does not match a multiple of 4096, the beginning and/or end block must be padded if the accoding dnbd3 block hasn't been written to before.
+This is because the granularity of the cow bit-field represents a full dnbd3 block of 4096 bytes, so we cannot write partial data to those blocks, as there is no mechanism to annotate which parts of the block have been written to, and which are still missing.
+To work around this limitation, we need to fill the partial block's missing data with data from the dnbd3 server if it is still within the range of the original image size. If it is outside the original image size (because the image has grown), the missing bytes can simply be set to 0 and no request needs to be made.
+The write request calculates the corresponding `cow_l2_entry_t` from the offset. If the corresponding `cow_l2_entry_t` does not yet exist, it is created. The data will be written to the data file, and the offset stored in `cow_l2_entry_t.offset`.
+Then the corresponding bit in the bit-field is set and `timeChanged` is updated. If there is more data to write to the current cluster, the next `cow_l2_entry_t` is calculated and the above steps are repeated.
+The variable `workCounter` is also used here to ensure that the padding of the data occurs before the fuse request returns.
+
+
+### Background Cluster Upload
+
+For uploading clusters, there is a background thread that periodically loops over all cow clusters and checks whether `timeChanged` is not 0 and the time difference between now and `timeChanged` is greater than `COW_MIN_UPLOAD_DELAY`.
+If so, the entire cluster is uploaded. The `timeChanged` before the upload is remembered.
+After the upload, `timeChanged` is set to 0 if it still has the same time as the temporarily stored one (if not, there was a change during the upload and it has to be uploaded again).
+Once the image is unmounted, `COW_MIN_UPLOAD_DELAY` is ignored and all clusters that still have a `timeChanged` other than 0 are uploaded.
+The upload is done via a [rest request](#/api/file/update).
+There are two different limits for the number of parallel uploads in the [config/cow.h](#config-variables).
+
+## Files
+
+When a new CoW session is started, a new `meta`, `data` and, if so set in the command line arguments, a `status` file is created.
+
+### status
+
+The file `status` can be activated with the command line parameter `--cow-stats-file`.
+
+The file will contain the following:
+
+```
+uuid=<uuid>
+state=backgroundUpload
+inQueue=0
+modifiedBlocks=0
+idleClusters=0
+totalClustersUploaded=0
+activeUploads=0
+avgSpeedKb=0.00
+```
+- The `uuid` is the session uuid used by the Cow server to identify the session.
+
+- The `status` is `backgroundUpload` when the image is still mounted and cow clusters are uploaded in the background.
+It is `uploading` when the image has been unmounted and all clusters that have not yet been uploaded are uploaded.
+It is `done` when the image has been unmounted and all clusters have been uploaded.
+- `Queue` are the cow clusters that are currently being uploaded or are waiting for a free slot.
+- `ModifiedClusters` are cow clusters that have changes that have not yet been uploaded to the server because the changes are too recent.
+- `totalClustersUploaded` the total amount of cow clusters uploaded since the image was mounted.
+- `activeUploads` is the number of clusters currently being uploaded.
+- `ulspeed` the current upload speed in kb/s.
+
+Once all clusters have been uploaded, the status is set to `done`.
+If you define `COW_DUMP_BLOCK_UPLOADS`, a list of all clusters, sorted by the number of uploads, is copied to the status file after the cluster upload is completed.
+
+With the command line parameter `--cow-stats-stdout` the same output of the stats file will be printed in stdout.
+
+### meta
+
+The `meta` file contains the following header:
+```C
+// cowfile.h
+typedef struct cowfile_metadata_header
+{
+ uint64_t magicValue; // 8byte
+ atomic_uint_least64_t imageSize; // 8byte
+ int32_t version; // 4byte
+ int32_t blocksize; // 4byte
+ uint64_t validRemoteSize; // 8byte
+ uint32_t startL1; // 4byte
+ uint32_t startL2; // 4byte
+ int32_t bitfieldSize; // 4byte
+ int32_t nextL2; // 4byte
+ atomic_int_least64_t metaSize; // 8byte
+ atomic_int_least64_t nextClusterOffset; // 8byte
+ uint64_t maxImageSize; // 8byte
+ uint64_t creationTime; // 8byte
+ char uuid[40]; // 40byte
+ char imageName[200]; // 200byte
+} cowfile_metadata_header_t;
+```
+After this header, the above-mentioned l1 and then the l2 data structure begins at byte offsets specified by members startL1 and startL2. The offsets are absolute from the beginning of the file.
+
+### data
+
+The `data` file starts with `COW_FILE_DATA_MAGIC_VALUE` and at the `COW_DATA_CLUSTER_SIZE` (40 * 8 * 4096) offset the first cluster starts.
+
+### magic values in the file headers
+
+The magic values in both files are used to ensure that an appropriate file is read and that the machine has the correct endianness.
+```C
+//config.h
+#define COW_FILE_META_MAGIC_VALUE ((uint64_t)0xEBE44D6E72F7825E) // Magic Value to recognize a Cow meta file
+#define COW_FILE_DATA_MAGIC_VALUE ((uint64_t)0xEBE44D6E72F7825F) // Magic Value to recognize a Cow data file
+```
+
+### Threads
+
+This extension uses two new threads:
+```
+tidCowUploader
+tidStatUpdater
+```
+`tidCowUploader` is the thread that uploads blocks to the cow server.
+
+`tidStatUpdater` updates the stats in stdout or the stats files (depending on parameters).
+
+### Locks
+
+This extension uses a new lock `cow.l2CreateLock`. It is used when a new L2 table is allocated.
+
+### Config Variables
+
+The following configuration variables have been added to `config/cow.h`.
+```c
+//config.h
+// +++++ COW +++++
+#define COW_BITFIELD_SIZE 40 // NEVER CHANGE THIS OR THE WORLD WILL ALSO END!
+#define COW_FILE_META_MAGIC_VALUE ((uint64_t)0xEBE44D6E72F7825E) // Magic Value to recognize a Cow meta file
+#define COW_FILE_DATA_MAGIC_VALUE ((uint64_t)0xEBE44D6E72F7825F) // Magic Value to recognize a Cow data file
+#define COW_MIN_UPLOAD_DELAY 60 // in seconds
+#define COW_STATS_UPDATE_TIME 5 // time in seconds the cow status files gets updated (while uploading clusters)
+#define COW_MAX_PARALLEL_UPLOADS 10 // maximum number of parallel uploads
+#define COW_MAX_PARALLEL_BACKGROUND_UPLOADS 2 // maximum number of parallel uploads while the image is still mounted
+#define COW_URL_STRING_SIZE 500 // Max string size for an url
+#define COW_SHOW_UL_SPEED 1 // enable display of ul speed in cow status file
+#define COW_MAX_IMAGE_SIZE 1000LL * 1000LL * 1000LL * 1000LL; // Maximum size an image can have(tb*gb*mb*kb)
+// +++++ COW API Endpoints +++++
+#define COW_API_PREFIX "%s/v1/"
+#define COW_API_CREATE COW_API_PREFIX "file/create"
+#define COW_API_UPDATE COW_API_PREFIX "file/update?uuid=%s&clusterindex=%lu"
+#define COW_API_START_MERGE COW_API_PREFIX "file/merge"
+```
+
+- `COW_MIN_UPLOAD_DELAY` sets the minimum time in seconds that must have elapsed since the last change to a cow cluster before it is uploaded.
+This value can be fine-tuned. A larger value usually reduces redundant uploading of clusters.
+A smaller value reduces the time for the final upload after the image has been unmounted.
+If you set `COW_DUMP_BLOCK_UPLOADS` and set the command line parameter `--cow-stats-file`, then a list of all clusters, sorted by the number of uploads, will be written to the status file after the cluster upload is complete.
+This can help in fine-tuning `COW_MIN_UPLOAD_DELAY`.
+- `COW_STATS_UPDATE_TIME` defines the update frequency of the stdout output/statistics file in seconds. Setting it too low could affect performance as a loop runs over all clusters.
+- `COW_MAX_PARALLEL_BACKGROUND_UPLOADS` defines the maximum number of parallel cluster uploads. This number is used when the image is still mounted and the user is still using it.
+- `COW_MAX_PARALLEL_UPLOADS` defines the maximum number of parallel cluster uploads. This number is used once the image has been unmounted to upload the remaining modified clusters.
+
+
+# REST API
+
+The following Rest API is used to transmit the data and commands to the cow server:
+
+## Required methods
+
+### v1/file/create
+
+#### POST
+##### Parameters
+
+| Name | Located in | Description | Required | Schema |
+| ---- | ---------- | ----------- | -------- | ---- |
+| imageName | post | Name of image | Yes | relative path |
+| revision | post | revision id of image | Yes | integer |
+| bitfieldSize | post | number of bits per L2 cluster | Yes | integer |
+
+##### Responses
+
+| Code | Description |
+| ---- | ----------- |
+| 200 | Success |
+| 404 | Source image not found |
+
+This request is used as soon as a new cow session is created. The returned uuid is used in all subsequent requests to identify the session.
+
+
+### v1/file/update
+
+#### POST
+##### Parameters
+
+| Name | Located in | Description | Required | Schema |
+| ---- | ---------- | ----------- | -------- | ---- |
+| uuid | query | | Yes | string (uuid) |
+| clusterindex | query | | Yes | integer |
+
+##### Responses
+
+| Code | Description |
+| ---- | ----------- |
+| 200 | Success |
+| 503 | Server can't keep up, if Retry-After header is present, it can request a backoff interval, specified in seconds. |
+
+Used to upload a cluster. The cluster number is the absolute cluster number. The body contains an "application/octet-stream", where the first bytes are the bit field, directly followed by the actual cluster data. The cluster data is sparse, i.e. only blocks for which the bit is set are present, all other blocks are skipped.
+
+
+### v1/file/merge
+
+#### POST
+##### Parameters
+
+| Name | Located in | Description | Required | Schema |
+| ---- | ---------- | ----------- | -------- | ---- |
+| uuid | Form | | Yes | string (uuid) |
+| originalFileSize | Form | | Yes | integer |
+| newFileSize | Form | | Yes | integer |
+
+##### Responses
+
+| Code | Description |
+| ---- | ----------- |
+| 200 | Success |
+Used to start the merge on the server.
+
+## Optional methods, not used by dnbd3-fuse
+
+### v1/File/GetTopModifiedBlocks
+
+#### GET
+##### Parameters
+
+| Name | Located in | Description | Required | Schema |
+| ---- | ---------- | ----------- | -------- | ---- |
+| uuid | query | | Yes | string (uuid) |
+| amount | query | | Yes | integer |
+
+##### Responses
+
+| Code | Description |
+| ---- | ----------- |
+| 200 | Success |
+
+This request returns a list containing the cluster IDs and the number of uploads, sorted by the number of uploads. This is useful if you want to fine-tune `COW_MIN_UPLOAD_DELAY`.
+
+### v1/File/Status
+
+#### GET
+##### Parameters
+
+| Name | Located in | Description | Required | Schema |
+| ---- | ---------- | ----------- | -------- | ---- |
+| uuid | query | | Yes | string (uuid) |
+
+##### Responses
+
+| Code | Description |
+| ---- | ----------- |
+| 200 | Success |
+
+Returns the SessionStatus model that provides information about the session.
+
+## Models
+
+#### BlockStatistics
+
+| Name | Type | Description | Required |
+| ---- | ---- | ----------- | -------- |
+| clusterNumber | integer | | Yes |
+| modifications | integer | | Yes |
+
+#### SessionState
+
+| Name | Type | Description | Required |
+| ---- | ---- | ----------- | -------- |
+| SessionState | string | | |
+
+#### SessionStatus
+
+| Name | Type | Description | Required |
+| ---- | ---- | ----------- | -------- |
+| state | string | _Enum:_ `"Copying"`, `"Active"`, `"Merging"`, `"Done"`, `"Failed"` | Yes |
+| imageName | string | | Yes |
+| originalImageVersion | integer | | Yes |
+| newImageVersion | integer | | Yes |
+| mergedClusters | integer | | Yes |
+| totalClusters | integer | | Yes |
diff --git a/src/fuse/cowfile.c b/src/fuse/cowfile.c
new file mode 100644
index 0000000..525eef0
--- /dev/null
+++ b/src/fuse/cowfile.c
@@ -0,0 +1,1777 @@
+#include "cowfile.h"
+#include "main.h"
+#include "connection.h"
+
+#include <dnbd3/config.h>
+#include <dnbd3/types.h>
+#include <dnbd3/shared/log.h>
+#include <sys/mman.h>
+#include <string.h>
+#include <pthread.h>
+#include <errno.h>
+#include <curl/curl.h>
+#include <signal.h>
+#include <inttypes.h>
+#include <assert.h>
+
+#define UUID_STRLEN 36
+// Maximum assumed page size, in case the cow data gets transferred between different architectures
+// 16k should be the largest minimum in existence (Itanium)
+#define MAX_PAGE_SIZE 16384
+
+extern void image_ll_getattr( fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi );
+
+static const int CURRENT_COW_VERSION = 3;
+
+static bool statStdout;
+static bool statFile;
+static pthread_t tidCowUploader;
+static pthread_t tidStatUpdater;
+static const char *cowServerAddress;
+static CURL *curl;
+static cowfile_metadata_header_t *metadata = NULL;
+static atomic_uint_fast64_t bytesUploaded;
+static uint64_t totalBlocksUploaded = 0;
+static int activeUploads = 0;
+static int uploadLoopThrottle = 0;
+static atomic_bool uploadLoop = true; // Keep upload loop running?
+static atomic_bool uploadLoopDone = false; // Upload loop has finished all work?
+static atomic_bool uploadCancelled = false; // Skip uploading remaining blocks
+static struct curl_slist *uploadHeaders = NULL;
+
+static struct cow
+{
+ char *metadata_mmap;
+ l1 *l1;
+ l2 *l2;
+ int fdMeta;
+ int fdData;
+ int fdStats;
+ pthread_mutex_t l2CreateLock;
+} cow;
+
+static size_t curlHeaderCallbackUploadBlock( char *buffer, size_t size, size_t nitems, void *userdata );
+
+static int countOneBits( atomic_uchar *bf, int numBytes )
+{
+ int bitCount = 0;
+ for ( int i = 0; i < numBytes; ++i ) {
+ unsigned char value = bf[i];
+ while ( value > 0 ) {
+ if ( ( value & 1 ) == 1 ) {
+ bitCount++;
+ }
+ value >>= 1;
+ }
+ }
+ return bitCount;
+}
+
+#define IS_4K_ALIGNED(v) ( ( (uint64_t)(v) & DNBD3_BLOCK_MASK ) == 0 )
+
+static bool writeAll( int fd, const char *buf, size_t count, off_t offset )
+{
+ while ( count > 0 ) {
+ ssize_t ret = pwrite( fd, buf, count, offset );
+ if ( ret == (ssize_t)count )
+ return true;
+ if ( ret == -1 ) {
+ if ( errno == EINTR )
+ continue;
+ return false;
+ }
+ if ( ret == 0 )
+ return false;
+ count -= ret;
+ buf += ret;
+ }
+ return true;
+}
+
+/**
+ * @brief Computes the l1 index for an absolute file offset
+ *
+ * @param offset absolute file offset
+ * @return int l1 index
+ */
+static int offsetToL1Index( size_t offset )
+{
+ return (int)( offset / COW_FULL_L2_TABLE_DATA_SIZE );
+}
+
+/**
+ * @brief Computes the l2 index for an absolute file offset
+ *
+ * @param offset absolute file offset
+ * @return int l2 index
+ */
+static int offsetToL2Index( size_t offset )
+{
+ return (int)( ( offset % COW_FULL_L2_TABLE_DATA_SIZE ) / COW_DATA_CLUSTER_SIZE );
+}
+
+/**
+ * @brief Computes the bit in the bitfield from the absolute file offset
+ *
+ * @param offset absolute file offset
+ * @return int bit(0-319) in the bitfield
+ */
+static int getBitfieldOffsetBit( size_t offset )
+{
+ return (int)( offset / DNBD3_BLOCK_SIZE ) % ( COW_BITFIELD_SIZE * 8 );
+}
+
+/**
+ * @brief Sets the specified bits in the specified range threadsafe to 1.
+ *
+ * @param byte of a bitfield
+ * @param from start bit
+ * @param to end bit
+ * @param value set bits to 1 or 0
+ */
+static void setBits( atomic_uchar *byte, int64_t from, int64_t to, bool value )
+{
+ char mask = (char)( ( 255 >> ( 7 - ( to - from ) ) ) << from );
+ if ( value ) {
+ atomic_fetch_or( byte, mask );
+ } else {
+ atomic_fetch_and( byte, ~mask );
+ }
+}
+
+/**
+ * @brief Sets the specified bits in the specified range threadsafe to 1.
+ *
+ * @param bitfield of a cow_l2_entry
+ * @param from start bit
+ * @param to end bit
+ * @param value set bits to 1 or 0
+ */
+static void setBitsInBitfield( atomic_uchar *bitfield, int64_t from, int64_t to, bool value )
+{
+ assert( from >= 0 && to < COW_BITFIELD_SIZE * 8 );
+ int64_t start = from / 8;
+ int64_t end = to / 8;
+
+ for ( int64_t i = start; i <= end; i++ ) {
+ setBits( ( bitfield + i ), from - i * 8, MIN( 7, to - i * 8 ), value );
+ from = ( i + 1 ) * 8;
+ }
+}
+
+/**
+ * @brief Checks if the n bit of a bit field is 0 or 1.
+ *
+ * @param bitfield of a cow_l2_entry
+ * @param n the bit which should be checked
+ */
+static bool checkBit( atomic_uchar *bitfield, int64_t n )
+{
+ return ( bitfield[n / 8] >> ( n % 8 ) ) & 1;
+}
+
+
+/**
+ * Generic callback for writing received data to a 500 byte buffer.
+ * MAKE SURE THE BUFFER IS EMPTY AT THE START! (i.e. buffer[0] = '\0')
+ */
+static size_t curlWriteCb500( char *buffer, size_t itemSize, size_t nitems, void *userpointer )
+{
+ char *dest = (char*)userpointer;
+ size_t done = strlen( dest );
+ size_t bytes = itemSize * nitems;
+
+ assert( done < 500 );
+ if ( done < 499 ) {
+ size_t n = MIN( bytes, 499 - done );
+ memcpy( dest + done, buffer, n );
+ dest[done + n] = '\0';
+ }
+ return bytes;
+}
+
+/**
+ * @brief Create a Session with the cow server and gets the session uuid.
+ */
+static bool createSession( const char *imageName, uint16_t rid )
+{
+ CURLcode res;
+ char url[COW_URL_STRING_SIZE];
+ char body[1000], reply[500];
+ const char *nameEsc;
+
+ curl_easy_reset( curl );
+ snprintf( url, COW_URL_STRING_SIZE, COW_API_CREATE, cowServerAddress );
+ logadd( LOG_INFO, "COW_API_CREATE URL: %s", url );
+ curl_easy_setopt( curl, CURLOPT_POST, 1L );
+ curl_easy_setopt( curl, CURLOPT_URL, url );
+
+ nameEsc = curl_easy_escape( curl, imageName, 0 );
+ if ( nameEsc == NULL ) {
+ logadd( LOG_ERROR, "Error escaping imageName" );
+ nameEsc = imageName; // Hope for the best
+ }
+ snprintf( body, sizeof body, "revision=%d&bitfieldSize=%d&imageName=%s",
+ (int)rid, (int)metadata->bitfieldSize, nameEsc );
+ if ( nameEsc != imageName ) {
+ curl_free( (char*)nameEsc );
+ }
+ curl_easy_setopt( curl, CURLOPT_POSTFIELDS, body );
+
+ reply[0] = '\0';
+ curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, curlWriteCb500 );
+ curl_easy_setopt( curl, CURLOPT_WRITEDATA, reply );
+
+ res = curl_easy_perform( curl );
+
+ /* Check for errors */
+ if ( res != CURLE_OK ) {
+ logadd( LOG_ERROR, "COW_API_CREATE failed: curl says %s", curl_easy_strerror( res ) );
+ return false;
+ }
+
+ long http_code = 0;
+ curl_easy_getinfo( curl, CURLINFO_RESPONSE_CODE, &http_code );
+ if ( http_code < 200 || http_code >= 300 ) {
+ logadd( LOG_ERROR, "COW_API_CREATE failed: http code %ld, %s", http_code, reply );
+ return false;
+ }
+ if ( strlen( reply ) > UUID_STRLEN ) {
+ logadd( LOG_ERROR, "Returned session id is too long: '%s'", reply );
+ return false;
+ }
+ strncpy( metadata->uuid, reply, sizeof(metadata->uuid) );
+ logadd( LOG_DEBUG1, "Cow session started, uuid: %s", metadata->uuid );
+ return true;
+}
+
+/**
+ * @brief Implementation of CURLOPT_READFUNCTION, this function will first send the bit field and
+ * then the block data in one bitstream. this function is usually called multiple times per block,
+ * because the buffer is usually not large for one block and its bitfield.
+ * for more details see: https://curl.se/libcurl/c/CURLOPT_READFUNCTION.html
+ *
+ * @param ptr to the buffer
+ * @param size of one element in buffer
+ * @param nmemb number of elements in buffer
+ * @param userdata from CURLOPT_READFUNCTION
+ * @return size_t size written in buffer
+ */
+static size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *userdata )
+{
+ cow_curl_read_upload_t *uploadBlock = (cow_curl_read_upload_t *)userdata;
+ size_t len = 0;
+ // Check if we're still in the bitfield
+ if ( uploadBlock->position < COW_BITFIELD_SIZE ) {
+ size_t lenCpy = MIN( COW_BITFIELD_SIZE - uploadBlock->position, size * nmemb );
+ memcpy( ptr + uploadBlock->position, uploadBlock->bitfield + uploadBlock->position,
+ lenCpy );
+ uploadBlock->position += lenCpy;
+ len += lenCpy;
+ }
+ // No elseif here, might just have crossed over...
+ if ( uploadBlock->position >= COW_BITFIELD_SIZE ) {
+ // Subtract the bitfield size from everything first
+ off_t inClusterOffset = uploadBlock->position - COW_BITFIELD_SIZE;
+ ssize_t spaceLeft = ( size * nmemb ) - len;
+ // Only read blocks that have been written to the cluster. Saves bandwidth. Not optimal since
+ // we do a lot of 4k/32k reads, but it's not that performance critical I guess...
+ while ( spaceLeft >= (ssize_t)DNBD3_BLOCK_SIZE && inClusterOffset < (off_t)COW_DATA_CLUSTER_SIZE ) {
+ int bitNumber = (int)( inClusterOffset / DNBD3_BLOCK_SIZE );
+ size_t readSize;
+ // Small performance hack: All bits one in a byte, do a 32k instead of 4k read
+ // TODO: preadv with a large iov, reading unchanged blocks into a trash-buffer
+ if ( spaceLeft >= (ssize_t)DNBD3_BLOCK_SIZE * 8
+ && bitNumber % 8 == 0
+ && uploadBlock->bitfield[bitNumber / 8] == 0xff ) {
+ readSize = DNBD3_BLOCK_SIZE * 8;
+ } else {
+ readSize = DNBD3_BLOCK_SIZE;
+ }
+ // If handling single block, check bits in our copy, as global bitfield could change
+ if ( readSize != DNBD3_BLOCK_SIZE || checkBit( uploadBlock->bitfield, bitNumber ) ) {
+ ssize_t lengthRead = pread( cow.fdData, ( ptr + len ), readSize,
+ uploadBlock->cluster->offset + inClusterOffset );
+ if ( lengthRead == -1 ) {
+ if ( errno == EAGAIN )
+ continue;
+ logadd( LOG_ERROR, "Upload: Reading from COW file failed with errno %d", errno );
+ return CURL_READFUNC_ABORT;
+ }
+ if ( lengthRead != (ssize_t)readSize ) {
+ logadd( LOG_ERROR, "Upload: Reading from COW file failed with short read (%d/%d)",
+ (int)lengthRead, (int)readSize );
+ return CURL_READFUNC_ABORT;
+ }
+ len += lengthRead;
+ spaceLeft -= lengthRead;
+ }
+ inClusterOffset += readSize;
+ uploadBlock->position += readSize;
+ }
+ }
+ return len;
+}
+
+
+/**
+ * @brief Requests the merging of the image on the cow server.
+ */
+static bool postMergeRequest()
+{
+ CURLcode res;
+ char url[COW_URL_STRING_SIZE];
+ char body[500], reply[500];
+ char *uuid;
+
+ curl_easy_reset( curl );
+ snprintf( url, COW_URL_STRING_SIZE, COW_API_START_MERGE, cowServerAddress );
+ curl_easy_setopt( curl, CURLOPT_URL, url );
+ curl_easy_setopt( curl, CURLOPT_POST, 1L );
+ curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, curlWriteCb500 );
+ curl_easy_setopt( curl, CURLOPT_WRITEDATA, reply );
+
+ uuid = curl_easy_escape( curl, metadata->uuid, 0 );
+ if ( uuid == NULL ) {
+ logadd( LOG_ERROR, "Error escaping uuid" );
+ uuid = metadata->uuid; // Hope for the best
+ }
+ snprintf( body, sizeof body, "originalFileSize=%"PRIu64"&newFileSize=%"PRIu64"&uuid=%s",
+ metadata->validRemoteSize, metadata->imageSize, uuid );
+ if ( uuid != metadata->uuid ) {
+ curl_free( uuid );
+ }
+ curl_easy_setopt( curl, CURLOPT_POSTFIELDS, body );
+
+ reply[0] = '\0';
+ res = curl_easy_perform( curl );
+ if ( res != CURLE_OK ) {
+ logadd( LOG_WARNING, "COW_API_START_MERGE failed. curl reported: %s", curl_easy_strerror( res ) );
+ return false;
+ }
+ long http_code = 0;
+ curl_easy_getinfo( curl, CURLINFO_RESPONSE_CODE, &http_code );
+ if ( http_code < 200 || http_code >= 300 ) {
+ logadd( LOG_WARNING, "COW_API_START_MERGE failed with http: %ld: %s", http_code, reply );
+ return false;
+ }
+ return true;
+}
+
+/**
+ * @brief Wrapper for postMergeRequest so if its fails it will be tried again.
+ *
+ */
+static void requestRemoteMerge()
+{
+ int fails = 0;
+ bool success = false;
+ success = postMergeRequest();
+ while ( fails <= 5 && !success ) {
+ fails++;
+ logadd( LOG_WARNING, "Trying again. %i/5", fails );
+ sleep( 10 );
+ postMergeRequest();
+ }
+}
+
+/**
+ * @brief Implementation of the CURLOPT_XFERINFOFUNCTION.
+ * For more infos see: https://curl.se/libcurl/c/CURLOPT_XFERINFOFUNCTION.html
+ *
+ * Each active transfer callbacks this function.
+ * This function computes the uploaded bytes between each call and adds it to
+ * bytesUploaded, which is used to compute the kb/s uploaded over all transfers.
+ *
+ * @param ulNow number of bytes uploaded by this transfer so far.
+ * @return int always returns 0 to continue the callbacks.
+ */
+static int progress_callback( void *clientp, UNUSED curl_off_t dlTotal,
+ UNUSED curl_off_t dlNow, UNUSED curl_off_t ulTotal, curl_off_t ulNow )
+{
+ cow_curl_read_upload_t *uploadingCluster = (cow_curl_read_upload_t *)clientp;
+ bytesUploaded += ( ulNow - uploadingCluster->ulLast );
+ uploadingCluster->ulLast = ulNow;
+ return 0;
+}
+
+#ifdef COW_DUMP_BLOCK_UPLOADS
+static int cmpfunc( const void *a, const void *b )
+{
+ return (int)( ( (cow_cluster_statistics_t *)b )->uploads - ( (cow_cluster_statistics_t *)a )->uploads );
+}
+/**
+ * @brief Writes all block numbers sorted by the number of uploads into the statsfile.
+ *
+ */
+static void dumpBlockUploads()
+{
+ long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE );
+
+ cow_cluster_statistics_t blockUploads[l1MaxOffset * COW_L2_TABLE_SIZE];
+ uint64_t currentBlock = 0;
+ for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) {
+ if ( cow.l1[l1Index] == -1 ) {
+ continue;
+ }
+ for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) {
+ cow_l2_entry_t *block = ( cow.l2[cow.l1[l1Index]] + l2Index );
+
+ blockUploads[currentBlock].uploads = block->uploads;
+ blockUploads[currentBlock].clusterNumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index );
+ currentBlock++;
+ }
+ }
+ qsort( blockUploads, currentBlock, sizeof( cow_cluster_statistics_t ), cmpfunc );
+
+ dprintf( cow.fdStats, "\n\n[BlockStats]\n" );
+ for ( uint64_t i = 0; i < currentBlock; i++ ) {
+ dprintf( cow.fdStats, "%" PRIu64 "=%" PRIu64 " \n",
+ blockUploads[i].clusterNumber, blockUploads[i].uploads );
+ }
+}
+#endif
+
+/**
+ * @brief Updates the status to the stdout/statfile depending on the startup parameters.
+ *
+ * @param inQueue Blocks that have changes old enough to be uploaded.
+ * @param modified Blocks that have been changed but whose changes are not old enough to be uploaded.
+ * @param idle Blocks that do not contain changes that have not yet been uploaded.
+ * @param speedBuffer ptr to char array that contains the current upload speed.
+ */
+static void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, char *speedBuffer )
+{
+ char buffer[300];
+ const char *state;
+
+ if ( uploadLoop ) {
+ state = "backgroundUpload";
+ } else if ( !uploadLoopDone ) {
+ state = "uploading";
+ } else {
+ state = "done";
+ }
+
+ int len = snprintf( buffer, sizeof buffer,
+ "[General]\n"
+ "uuid=%s\n"
+ "state=%s\n"
+ "inQueue=%" PRIu64 "\n"
+ "modifiedClusters=%" PRIu64 "\n"
+ "idleClusters=%" PRIu64 "\n"
+ "totalClustersUploaded=%" PRIu64 "\n"
+ "activeUploads=%i\n"
+ "%s%s\n",
+ metadata->uuid,
+ state, inQueue, modified, idle, totalBlocksUploaded, activeUploads,
+ COW_SHOW_UL_SPEED ? "avgSpeedKb=" : "",
+ speedBuffer );
+
+ if ( len == -1 ) {
+ logadd( LOG_ERROR, "snprintf error" );
+ return;
+ }
+
+ if ( statStdout ) {
+ logadd( LOG_INFO, "%s", buffer );
+ }
+
+ if ( statFile ) {
+ // Pad with a bunch of newlines so we don't change the file size all the time
+ ssize_t extra = MIN( 20, (ssize_t)sizeof(buffer) - len - 1 );
+ memset( buffer + len, '\n', extra );
+ if ( pwrite( cow.fdStats, buffer, len + extra, 0 ) != len + extra ) {
+ logadd( LOG_WARNING, "Could not update cow status file" );
+ }
+#ifdef COW_DUMP_BLOCK_UPLOADS
+ if ( !uploadLoop && uploadLoopDone ) {
+ lseek( cow.fdStats, len + extra, SEEK_SET );
+ dumpBlockUploads();
+ }
+#endif
+ }
+}
+
+/**
+ * @brief Starts the upload of a given block.
+ *
+ * @param cm Curl_multi
+ * @param uploadingCluster containing the data for the block to upload.
+ */
+static bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster )
+{
+ CURL *eh = curl_easy_init();
+
+ char url[COW_URL_STRING_SIZE];
+
+ snprintf( url, COW_URL_STRING_SIZE,
+ COW_API_UPDATE, cowServerAddress, metadata->uuid, uploadingCluster->clusterNumber );
+
+ curl_easy_setopt( eh, CURLOPT_URL, url );
+ curl_easy_setopt( eh, CURLOPT_POST, 1L );
+ curl_easy_setopt( eh, CURLOPT_HEADERFUNCTION, curlHeaderCallbackUploadBlock );
+ curl_easy_setopt( eh, CURLOPT_HEADERDATA, (void *)uploadingCluster );
+ curl_easy_setopt( eh, CURLOPT_READFUNCTION, curlReadCallbackUploadBlock );
+ curl_easy_setopt( eh, CURLOPT_READDATA, (void *)uploadingCluster );
+ curl_easy_setopt( eh, CURLOPT_WRITEFUNCTION, curlWriteCb500 );
+ curl_easy_setopt( eh, CURLOPT_WRITEDATA, (void *)uploadingCluster->replyBuffer );
+ curl_easy_setopt( eh, CURLOPT_PRIVATE, (void *)uploadingCluster );
+ // min upload speed of 1kb/s over 10 sec otherwise the upload is canceled.
+ curl_easy_setopt( eh, CURLOPT_LOW_SPEED_TIME, 10L );
+ curl_easy_setopt( eh, CURLOPT_LOW_SPEED_LIMIT, 1000L );
+
+ curl_easy_setopt( eh, CURLOPT_POSTFIELDSIZE_LARGE,
+ (long)( COW_BITFIELD_SIZE
+ + DNBD3_BLOCK_SIZE * countOneBits( uploadingCluster->bitfield, COW_BITFIELD_SIZE ) )
+ );
+
+ if ( COW_SHOW_UL_SPEED ) {
+ uploadingCluster->ulLast = 0;
+ curl_easy_setopt( eh, CURLOPT_NOPROGRESS, 0L );
+ curl_easy_setopt( eh, CURLOPT_XFERINFOFUNCTION, progress_callback );
+ curl_easy_setopt( eh, CURLOPT_XFERINFODATA, uploadingCluster );
+ }
+ curl_easy_setopt( eh, CURLOPT_HTTPHEADER, uploadHeaders );
+ curl_multi_add_handle( cm, eh );
+
+ return true;
+}
+
+static size_t curlHeaderCallbackUploadBlock( char *buffer, size_t size, size_t nitems, void *userdata )
+{
+ size_t len, offset;
+ int delay;
+ cow_curl_read_upload_t *uploadingCluster = (cow_curl_read_upload_t*)userdata;
+
+ // If the "Retry-After" header is set, we interpret this as the server being overloaded
+ // or not ready yet to take another update. We slow down our upload loop then.
+ // We'll only accept a delay in seconds here, not an HTTP Date string.
+ // Otherwise, increase the fails counter.
+ len = size * nitems;
+ if ( len < 13 )
+ return len;
+ for ( int i = 0; i < 11; ++i ) {
+ buffer[i] |= 0x60;
+ }
+ if ( strncmp( buffer, "retry-after:", 12 ) != 0 )
+ return len;
+ offset = 12;
+ while ( offset + 1 < len && buffer[offset] == ' ' ) {
+ offset++;
+ }
+ delay = atoi( buffer + offset );
+ if ( delay > 0 ) {
+ if ( delay > 120 ) {
+ // Cap to two minutes
+ delay = 120;
+ }
+ uploadLoopThrottle = MAX( uploadLoopThrottle, delay );
+ uploadingCluster->retryTime = delay;
+ }
+ return len;
+}
+
+/**
+ * @brief After an upload completes, either successful or unsuccessful this
+ * function cleans everything up. If unsuccessful and there are some tries left
+ * retries to upload the block.
+ *
+ * @param cm Curl_multi
+ * @param msg CURLMsg
+ * @return true returned if the upload was successful or retries are still possible.
+ * @return false returned if the upload was unsuccessful.
+ */
+static bool clusterUploadDoneHandler( CURLM *cm, CURLMsg *msg )
+{
+ bool success = false;
+ cow_curl_read_upload_t *uploadingCluster;
+ CURLcode res;
+ CURLcode res2;
+ res = curl_easy_getinfo( msg->easy_handle, CURLINFO_PRIVATE, &uploadingCluster );
+
+ long http_code = 0;
+ res2 = curl_easy_getinfo( msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_code );
+
+ if ( msg->msg != CURLMSG_DONE ) {
+ logadd( LOG_ERROR, "multi_message->msg unexpectedly not DONE (%d)", (int)msg->msg );
+ } else if ( msg->data.result != CURLE_OK ) {
+ logadd( LOG_ERROR, "curl_easy returned non-OK after multi-finish: %s",
+ curl_easy_strerror( msg->data.result ) );
+ logadd( LOG_ERROR, "(%ld, %s)", http_code, uploadingCluster->replyBuffer );
+ } else if ( res != CURLE_OK || res2 != CURLE_OK ) {
+ logadd( LOG_ERROR, "curl_easy_getinfo failed after multifinish (%d, %d)", (int)res, (int)res2 );
+ } else if ( http_code == 503 ) {
+ if ( uploadingCluster->retryTime > 0 ) {
+ logadd( LOG_INFO, "COW server is asking to backoff for %d seconds", uploadingCluster->retryTime );
+ } else {
+ logadd( LOG_ERROR, "COW server returned 503 without Retry-After value: %s",
+ uploadingCluster->replyBuffer );
+ }
+ } else if ( http_code < 200 || http_code >= 300 ) {
+ logadd( LOG_ERROR, "COW server returned HTTP %ld: %s", http_code, uploadingCluster->replyBuffer );
+ } else {
+ // everything went ok, reset timeChanged of underlying cluster, but only if it
+ // didn't get updated again in the meantime.
+ atomic_compare_exchange_strong( &uploadingCluster->cluster->timeChanged, &uploadingCluster->time, 0 );
+ uploadingCluster->cluster->uploads++;
+ uploadingCluster->cluster->fails = 0;
+ totalBlocksUploaded++;
+ success = true;
+ }
+ if ( !success ) {
+ uploadingCluster->cluster->fails++;
+ if ( uploadingCluster->retryTime > 0 ) {
+ // Don't reset timeChanged timestamp, so the next iteration of uploadModifiedClusters
+ // will queue this upload again after the throttle time expired.
+ } else {
+ logadd( LOG_ERROR, "Uploading cluster failed %i/5 times", uploadingCluster->cluster->fails );
+ // Pretend the block changed again just now, to prevent immediate retry
+ atomic_compare_exchange_strong( &uploadingCluster->cluster->timeChanged, &uploadingCluster->time,
+ time( NULL ) );
+ }
+ }
+ curl_multi_remove_handle( cm, msg->easy_handle );
+ curl_easy_cleanup( msg->easy_handle );
+ free( uploadingCluster );
+
+ return success;
+}
+
+/**
+ * @param cm Curl_multi
+ * @param activeUploads ptr to integer which holds the number of current uploads
+ * @param minNumberUploads break out of loop as soon as there are less than these many transfers running
+ * else COW_MAX_PARALLEL_BACKGROUND_UPLOADS.
+ * @return true returned if all uploads were successful
+ * @return false returned if one ore more upload failed.
+ */
+static bool curlMultiLoop( CURLM *cm, int minNumberUploads )
+{
+ CURLMsg *msg;
+ int msgsLeft = -1;
+ bool status = true;
+
+ if ( minNumberUploads <= 0 ) {
+ minNumberUploads = 1;
+ }
+ for ( ;; ) {
+ CURLMcode mc = curl_multi_perform( cm, &activeUploads );
+ if ( mc != CURLM_OK ) {
+ logadd( LOG_ERROR, "curl_multi_perform error %d, bailing out", (int)mc );
+ status = false;
+ break;
+ }
+
+ while ( ( msg = curl_multi_info_read( cm, &msgsLeft ) ) != NULL ) {
+ if ( !clusterUploadDoneHandler( cm, msg ) ) {
+ status = false;
+ }
+ }
+ if ( activeUploads < minNumberUploads ) {
+ break;
+ }
+ // ony wait if there are active uploads
+ if ( activeUploads > 0 ) {
+ mc = curl_multi_wait( cm, NULL, 0, 1000, NULL );
+ if ( mc != CURLM_OK ) {
+ logadd( LOG_ERROR, "curl_multi_wait error %d, bailing out", (int)mc );
+ status = false;
+ break;
+ }
+ }
+
+ }
+ return status;
+}
+
+/**
+ * @brief loops through all blocks and uploads them.
+ *
+ * @param ignoreMinUploadDelay If true uploads all blocks that have changes while
+ * ignoring COW_MIN_UPLOAD_DELAY
+ * @param cm Curl_multi
+ * @return true if all blocks uploaded successful
+ * @return false if one ore more blocks failed to upload
+ */
+bool uploadModifiedClusters( bool ignoreMinUploadDelay, CURLM *cm )
+{
+ bool success = true;
+ const time_t now = time( NULL );
+
+ long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE );
+ // Iterate over all blocks, L1 first
+ for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) {
+ if ( cow.l1[l1Index] == -1 ) {
+ continue; // Not allocated
+ }
+ // Now all L2 clusters
+ for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) {
+ cow_l2_entry_t *cluster = ( cow.l2[cow.l1[l1Index]] + l2Index );
+ if ( cluster->offset == -1 ) {
+ continue; // Not allocated
+ }
+ if ( cluster->timeChanged == 0 ) {
+ continue; // Not changed
+ }
+ if ( !ignoreMinUploadDelay && ( now - cluster->timeChanged < COW_MIN_UPLOAD_DELAY ) ) {
+ continue; // Last change not old enough
+ }
+ // Run curl mainloop at least one, but keep doing so while max concurrent uploads is reached
+ int minUploads = ignoreMinUploadDelay
+ ? COW_MAX_PARALLEL_UPLOADS
+ : COW_MAX_PARALLEL_BACKGROUND_UPLOADS;
+ if ( !curlMultiLoop( cm, minUploads ) ) {
+ success = false;
+ }
+ // Maybe one of the uploads was rejected by the server asking us to slow down a bit.
+ // Check for that case and don't trigger a new upload.
+ if ( uploadLoopThrottle > 0 ) {
+ goto DONE;
+ }
+ cow_curl_read_upload_t *b = malloc( sizeof( cow_curl_read_upload_t ) );
+ b->cluster = cluster;
+ b->clusterNumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index );
+ b->position = 0;
+ b->retryTime = 0;
+ b->time = cluster->timeChanged;
+ b->replyBuffer[0] = '\0';
+ // Copy, so it doesn't change during upload
+ // when we assemble the data in curlReadCallbackUploadBlock()
+ for ( int i = 0; i < COW_BITFIELD_SIZE; ++i ) {
+ b->bitfield[i] = cluster->bitfield[i];
+ }
+ addUpload( cm, b );
+ if ( !ignoreMinUploadDelay && !uploadLoop ) {
+ goto DONE;
+ }
+ }
+ }
+DONE:
+ // Finish all the transfers still active
+ while ( activeUploads > 0 ) {
+ if ( !curlMultiLoop( cm, 1 ) ) {
+ success = false;
+ break;
+ }
+ }
+ return success;
+}
+
+
+/**
+ * @brief Computes the data for the status to the stdout/statfile every COW_STATS_UPDATE_TIME seconds.
+ *
+ */
+
+void *cowfile_statUpdater( UNUSED void *something )
+{
+ uint64_t lastUpdateTime = time( NULL );
+ time_t now;
+ char speedBuffer[20] = "0";
+
+ while ( !uploadLoopDone ) {
+ int modified = 0;
+ int inQueue = 0;
+ int idle = 0;
+ long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE );
+ now = time( NULL );
+ for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) {
+ if ( cow.l1[l1Index] == -1 ) {
+ continue;
+ }
+ for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) {
+ cow_l2_entry_t *block = ( cow.l2[cow.l1[l1Index]] + l2Index );
+ if ( block->offset == -1 ) {
+ continue;
+ }
+ if ( block->timeChanged != 0 ) {
+ if ( !uploadLoop || now > block->timeChanged + COW_MIN_UPLOAD_DELAY ) {
+ inQueue++;
+ } else {
+ modified++;
+ }
+ } else {
+ idle++;
+ }
+ }
+ }
+
+ if ( COW_SHOW_UL_SPEED ) {
+ double delta;
+ double bytes = (double)atomic_exchange( &bytesUploaded, 0 );
+ now = time( NULL );
+ delta = (double)( now - lastUpdateTime );
+ lastUpdateTime = now;
+ if ( delta > 0 ) {
+ snprintf( speedBuffer, sizeof speedBuffer, "%.2f", bytes / 1000.0 / delta );
+ }
+ }
+
+ updateCowStatsFile( inQueue, modified, idle, speedBuffer );
+ sleep( COW_STATS_UPDATE_TIME );
+ }
+ return NULL;
+}
+
+void quitSigHandler( int sig UNUSED )
+{
+ uploadCancelled = true;
+ uploadLoop = false;
+}
+
+/**
+ * @brief main loop for blockupload in the background
+ */
+static void *uploaderThreadMain( UNUSED void *something )
+{
+ CURLM *cm;
+
+ cm = curl_multi_init();
+ curl_multi_setopt( cm, CURLMOPT_MAXCONNECTS,
+ (long)MAX( COW_MAX_PARALLEL_UPLOADS, COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) );
+
+ do {
+ // Unblock so this very thread gets the signal for abandoning the upload
+ struct sigaction newHandler = { .sa_handler = &quitSigHandler };
+ sigemptyset( &newHandler.sa_mask );
+ sigaction( SIGQUIT, &newHandler, NULL );
+ sigset_t sigmask;
+ sigemptyset( &sigmask );
+ sigaddset( &sigmask, SIGQUIT );
+ pthread_sigmask( SIG_UNBLOCK, &sigmask, NULL );
+ } while ( 0 );
+
+ while ( uploadLoop ) {
+ while ( uploadLoopThrottle > 0 && uploadLoop ) {
+ sleep( 1 );
+ uploadLoopThrottle--;
+ }
+ sleep( 2 );
+ if ( !uploadLoop )
+ break;
+ uploadModifiedClusters( false, cm );
+ }
+
+ if ( uploadCancelled ) {
+ uploadLoopDone = true;
+ logadd( LOG_INFO, "Not uploading remaining clusters, SIGQUIT received" );
+ } else {
+ // force the upload of all remaining blocks because the user dismounted the image
+ logadd( LOG_INFO, "Start uploading the remaining clusters." );
+ if ( !uploadModifiedClusters( true, cm ) ) {
+ uploadLoopDone = true;
+ logadd( LOG_ERROR, "One or more clusters failed to upload" );
+ } else {
+ uploadLoopDone = true;
+ logadd( LOG_DEBUG1, "All clusters uploaded" );
+ if ( cow_merge_after_upload ) {
+ requestRemoteMerge();
+ logadd( LOG_DEBUG1, "Requesting merge" );
+ }
+ }
+ }
+ curl_multi_cleanup( cm );
+ return NULL;
+}
+
+/**
+ * @brief Create a Cow Stats File an inserts the session uuid
+ *
+ * @param path where the file is created
+ * @return true
+ * @return false if failed to create or to write into the file
+ */
+static bool createCowStatsFile( char *path )
+{
+ char pathStatus[strlen( path ) + 12];
+
+ snprintf( pathStatus, strlen( path ) + 12, "%s%s", path, "/status" );
+
+ char buffer[100];
+ int len = snprintf( buffer, 100, "[General]\nuuid=%s\nstate=active\n", metadata->uuid );
+ if ( statStdout ) {
+ logadd( LOG_INFO, "%s", buffer );
+ }
+ if ( statFile ) {
+ if ( ( cow.fdStats = open( pathStatus, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR ) ) == -1 ) {
+ logadd( LOG_ERROR, "Could not create cow status file. Bye.\n" );
+ return false;
+ }
+
+ if ( pwrite( cow.fdStats, buffer, len, 0 ) != len ) {
+ logadd( LOG_ERROR, "Could not write to cow status file. Bye.\n" );
+ return false;
+ }
+ }
+ return true;
+}
+
+static bool commonInit( const char* serverAddress, const char *cowUuid )
+{
+ CURLcode m;
+
+ if ( cowUuid != NULL && strlen( cowUuid ) > UUID_STRLEN ) {
+ logadd( LOG_ERROR, "COW UUID too long: '%s'", cowUuid );
+ return false;
+ }
+ uploadHeaders = curl_slist_append( uploadHeaders, "Content-Type: application/octet-stream" );
+ pthread_mutex_init( &cow.l2CreateLock, NULL );
+ cowServerAddress = serverAddress;
+ if ( ( m = curl_global_init( CURL_GLOBAL_ALL ) ) != CURLE_OK ) {
+ logadd( LOG_ERROR, "curl_global_init failed: %s",
+ curl_easy_strerror( m ) );
+ return false;
+ }
+ curl = curl_easy_init();
+ if ( curl == NULL ) {
+ logadd( LOG_ERROR, "Error on curl_easy_init" );
+ return false;
+ }
+ return true;
+}
+
+/**
+ * @brief initializes the cow functionality, creates the data & meta file.
+ *
+ * @param path where the files should be stored
+ * @param image_Name name of the original file/image
+ * @param imageSizePtr
+ * @param cowUuid optional, use given UUID for talking to COW server instead of creating session
+ */
+bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion,
+ atomic_uint_fast64_t **imageSizePtr,
+ char *serverAddress, bool sStdout, bool sfile, const char *cowUuid )
+{
+ char pathMeta[strlen( path ) + 6];
+ char pathData[strlen( path ) + 6];
+
+ if ( !commonInit( serverAddress, cowUuid ) )
+ return false;
+
+ statStdout = sStdout;
+ statFile = sfile;
+
+ snprintf( pathMeta, strlen( path ) + 6, "%s%s", path, "/meta" );
+ snprintf( pathData, strlen( path ) + 6, "%s%s", path, "/data" );
+
+ if ( ( cow.fdMeta = open( pathMeta, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR ) ) == -1 ) {
+ logadd( LOG_ERROR, "Could not create cow meta file. Bye.\n %s \n", pathMeta );
+ return false;
+ }
+
+ if ( ( cow.fdData = open( pathData, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR ) ) == -1 ) {
+ logadd( LOG_ERROR, "Could not create cow data file. Bye.\n" );
+ return false;
+ }
+ struct stat fs;
+ if ( fstat( cow.fdData, &fs ) == -1 || fs.st_size != 0 ) {
+ logadd( LOG_ERROR, "/data file already exists and is not empty" );
+ return false;
+ }
+
+ size_t metaDataSizeHeader = sizeof( cowfile_metadata_header_t );
+
+ // Calculate how many full l2 tables we need to address COW_MAX_IMAGE_SIZE
+ size_t l1NumEntries = ( ( COW_MAX_IMAGE_SIZE + COW_FULL_L2_TABLE_DATA_SIZE - 1 )
+ / COW_FULL_L2_TABLE_DATA_SIZE );
+ // Make sure l1 and l2 are aligned to struct size
+ size_t sizeL1 = sizeof(cow.l1[0]);
+ size_t sizeL2 = sizeof(cow.l2[0]);
+ size_t startL1 = ( ( metaDataSizeHeader + sizeL1 - 1 ) / sizeL1 ) * sizeL1;
+ size_t startL2 = ( ( startL1 + l1NumEntries * sizeL1 + sizeL2 - 1 ) / sizeL2 ) * sizeL2;
+
+ // size of l1 array + number of l2's * size of l2
+ size_t ps = getpagesize();
+ if ( ps == 0 || ps > INT_MAX ) {
+ logadd( LOG_ERROR, "Cannot get native page size, aborting..." );
+ return false;
+ }
+ size_t metaSize = ( ( startL2 + l1NumEntries * sizeof( l2 ) + ps - 1 ) / ps ) * ps;
+
+ if ( ftruncate( cow.fdMeta, metaSize ) != 0 ) {
+ logadd( LOG_ERROR, "Could not set file size of meta data file (errno=%d). Bye.\n", errno );
+ return false;
+ }
+
+ cow.metadata_mmap = mmap( NULL, metaSize, PROT_READ | PROT_WRITE, MAP_SHARED, cow.fdMeta, 0 );
+
+ if ( cow.metadata_mmap == MAP_FAILED ) {
+ logadd( LOG_ERROR, "Error while mmap()ing meta data, errno=%d", errno );
+ return false;
+ }
+
+ metadata = (cowfile_metadata_header_t *)( cow.metadata_mmap );
+ metadata->magicValue = COW_FILE_META_MAGIC_VALUE;
+ metadata->imageSize = **imageSizePtr;
+ metadata->version = CURRENT_COW_VERSION;
+ metadata->validRemoteSize = **imageSizePtr;
+ metadata->startL1 = (uint32_t)startL1;
+ metadata->startL2 = (uint32_t)startL2;
+ metadata->bitfieldSize = COW_BITFIELD_SIZE;
+ metadata->nextL2 = 0;
+ metadata->metaSize = ATOMIC_VAR_INIT( metaSize );
+ metadata->nextClusterOffset = ATOMIC_VAR_INIT( COW_DATA_CLUSTER_SIZE );
+ metadata->maxImageSize = COW_MAX_IMAGE_SIZE;
+ metadata->creationTime = time( NULL );
+ snprintf( metadata->imageName, 200, "%s", image_Name );
+
+ cow.l1 = (l1 *)( cow.metadata_mmap + startL1 );
+ cow.l2 = (l2 *)( cow.metadata_mmap + startL2 );
+ for ( size_t i = 0; i < l1NumEntries; i++ ) {
+ cow.l1[i] = -1;
+ }
+
+ // write header to data file
+ uint64_t header = COW_FILE_DATA_MAGIC_VALUE;
+ if ( pwrite( cow.fdData, &header, sizeof( uint64_t ), 0 ) != sizeof( uint64_t ) ) {
+ logadd( LOG_ERROR, "Could not write header to cow data file. Bye.\n" );
+ return false;
+ }
+
+ if ( cowUuid != NULL ) {
+ snprintf( metadata->uuid, sizeof(metadata->uuid), "%s", cowUuid );
+ logadd( LOG_INFO, "Using provided upload session id" );
+ } else if ( !createSession( image_Name, imageVersion ) ) {
+ return false;
+ }
+ createCowStatsFile( path );
+ *imageSizePtr = &metadata->imageSize;
+ return true;
+}
+
+/**
+ * @brief loads an existing cow state from the meta & data files
+ *
+ * @param path where the meta & data file is located
+ * @param imageSizePtr
+ */
+bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile, const char *cowUuid )
+{
+ char pathMeta[strlen( path ) + 6];
+ char pathData[strlen( path ) + 6];
+
+ if ( !commonInit( serverAddress, cowUuid ) )
+ return false;
+
+ statStdout = sStdout;
+ statFile = sFile;
+
+ snprintf( pathMeta, strlen( path ) + 6, "%s%s", path, "/meta" );
+ snprintf( pathData, strlen( path ) + 6, "%s%s", path, "/data" );
+
+ if ( ( cow.fdMeta = open( pathMeta, O_RDWR, S_IRUSR | S_IWUSR ) ) == -1 ) {
+ logadd( LOG_ERROR, "Could not open cow meta file. Bye.\n" );
+ return false;
+ }
+ if ( ( cow.fdData = open( pathData, O_RDWR, S_IRUSR | S_IWUSR ) ) == -1 ) {
+ logadd( LOG_ERROR, "Could not open cow data file. Bye.\n" );
+ return false;
+ }
+
+ cowfile_metadata_header_t header;
+ {
+ size_t sizeToRead = sizeof( cowfile_metadata_header_t );
+ size_t readBytes = 0;
+ while ( readBytes < sizeToRead ) {
+ ssize_t bytes = pread( cow.fdMeta, ( ( &header ) + readBytes ), sizeToRead - readBytes, 0 );
+ if ( bytes <= 0 ) {
+ logadd( LOG_ERROR, "Error while reading meta file header. Bye.\n" );
+ return false;
+ }
+ readBytes += bytes;
+ }
+
+
+ if ( header.magicValue != COW_FILE_META_MAGIC_VALUE ) {
+ if ( __builtin_bswap64( header.magicValue ) == COW_FILE_META_MAGIC_VALUE ) {
+ logadd( LOG_ERROR, "cow meta file of wrong endianess. Bye.\n" );
+ return false;
+ }
+ logadd( LOG_ERROR, "cow meta file of unkown format. Bye.\n" );
+ return false;
+ }
+
+ if ( header.bitfieldSize != COW_BITFIELD_SIZE ) {
+ logadd( LOG_ERROR, "cow meta file has unexpected bitfield size %d", (int)header.bitfieldSize );
+ return false;
+ }
+ if ( header.startL1 >= header.startL2 || header.startL2 >= header.metaSize ) {
+ logadd( LOG_ERROR, "l1/l2 offset messed up in metadata." );
+ return false;
+ }
+
+ struct stat st;
+ fstat( cow.fdMeta, &st );
+ if ( st.st_size < (off_t)header.metaSize ) {
+ logadd( LOG_ERROR, "cow meta file too small. Bye." );
+ return false;
+ }
+ }
+ {
+ uint64_t magicValueDataFile;
+ if ( pread( cow.fdData, &magicValueDataFile, sizeof( uint64_t ), 0 ) != sizeof( uint64_t ) ) {
+ logadd( LOG_ERROR, "Error while reading cow data file, wrong file?. Bye." );
+ return false;
+ }
+
+ if ( magicValueDataFile != COW_FILE_DATA_MAGIC_VALUE ) {
+ if ( __builtin_bswap64( magicValueDataFile ) == COW_FILE_DATA_MAGIC_VALUE ) {
+ logadd( LOG_ERROR, "cow data file of wrong endianess. Bye." );
+ return false;
+ }
+ logadd( LOG_ERROR, "cow data file of unkown format. Bye." );
+ return false;
+ }
+ struct stat st;
+ fstat( cow.fdData, &st ); // add cluster size, since we don't preallocate
+ if ( header.nextClusterOffset > st.st_size + (int)COW_DATA_CLUSTER_SIZE ) {
+ logadd( LOG_ERROR, "cow data file too small. Expected=%jd, Is=%jd.",
+ (intmax_t)header.nextClusterOffset, (intmax_t)st.st_size );
+ return false;
+ }
+ }
+
+ cow.metadata_mmap = mmap( NULL, header.metaSize, PROT_READ | PROT_WRITE, MAP_SHARED, cow.fdMeta, 0 );
+
+ if ( cow.metadata_mmap == MAP_FAILED ) {
+ logadd( LOG_ERROR, "Error while mapping mmap, errno=%d.", errno );
+ return false;
+ }
+ if ( header.version != CURRENT_COW_VERSION ) {
+ logadd( LOG_ERROR, "Error wrong file version got: %i expected: %i. Bye.",
+ metadata->version, CURRENT_COW_VERSION );
+ return false;
+ }
+
+
+ metadata = (cowfile_metadata_header_t *)( cow.metadata_mmap );
+
+ if ( cowUuid != NULL ) {
+ logadd( LOG_INFO, "Overriding stored upload session id with provided one" );
+ snprintf( metadata->uuid, sizeof(metadata->uuid), "%s", cowUuid );
+ }
+
+ *imageSizePtr = &metadata->imageSize;
+ cow.l1 = (l1 *)( cow.metadata_mmap + metadata->startL1 );
+ cow.l2 = (l2 *)( cow.metadata_mmap + metadata->startL2 );
+ createCowStatsFile( path );
+ return true;
+}
+/**
+ * @brief Starts the cow BackgroundThreads which are needed for stats and data upload
+ *
+ */
+bool cowfile_startBackgroundThreads()
+{
+ if( pthread_create( &tidCowUploader, NULL, &uploaderThreadMain, NULL ) != 0 ) {
+ logadd( LOG_ERROR, "Could not create cow uploader thread");
+ return false;
+ }
+ if ( statFile || statStdout ) {
+ if(pthread_create( &tidStatUpdater, NULL, &cowfile_statUpdater, NULL ) != 0 ) {
+ logadd( LOG_ERROR, "Could not create stat updater thread");
+ return false;
+ }
+ }
+ return true;
+}
+
+/**
+ * Check if block at given offset is local, i.e. has been modified.
+ * @param meta The cow_l2_entry for the according cluster MUST be provided
+ * @param offset offset of data, can be absolute image offset as it will be transformed into cluster offset
+ */
+static bool isBlockLocal( cow_l2_entry_t *meta, off_t offset )
+{
+ if ( meta == NULL )
+ return false;
+ return checkBit( meta->bitfield, ( offset % COW_DATA_CLUSTER_SIZE ) / DNBD3_BLOCK_SIZE );
+}
+
+/**
+ * @brief Get the cow_l2_entry_t from l1Index and l2Index.
+ * l1 offset must be valid
+ *
+ * @param l1Index
+ * @param l2Index
+ * @return cow_l2_entry_t*
+ */
+static cow_l2_entry_t *getL2Entry( int l1Index, int l2Index, bool create )
+{
+ if ( cow.l1[l1Index] == -1 )
+ return NULL;
+ cow_l2_entry_t *block = cow.l2[cow.l1[l1Index]] + l2Index;
+ if ( block->offset == -1 ) {
+ if ( !create )
+ return NULL;
+ block->offset = atomic_fetch_add( &metadata->nextClusterOffset, COW_DATA_CLUSTER_SIZE );
+ }
+ return block;
+}
+
+/**
+ * @brief creates an new L2 table and initializes the containing cow_l2_entry_t
+ *
+ * @param l1Index
+ */
+static bool createL2Table( int l1Index )
+{
+ pthread_mutex_lock( &cow.l2CreateLock );
+ if ( cow.l1[l1Index] == -1 ) {
+ int idx = metadata->nextL2++;
+ for ( int i = 0; i < COW_L2_TABLE_SIZE; i++ ) {
+ cow.l2[idx][i].offset = -1;
+ cow.l2[idx][i].timeChanged = ATOMIC_VAR_INIT( 0 );
+ cow.l2[idx][i].uploads = ATOMIC_VAR_INIT( 0 );
+ for ( int j = 0; j < COW_BITFIELD_SIZE; j++ ) {
+ cow.l2[idx][i].bitfield[j] = ATOMIC_VAR_INIT( 0 );
+ }
+ }
+ cow.l1[l1Index] = idx;
+ }
+ pthread_mutex_unlock( &cow.l2CreateLock );
+ return true;
+}
+
+/**
+ * @brief Is called once a fuse write request ist finished.
+ * Calls the corrsponding fuse reply depending on the type and
+ * success of the request.
+ *
+ * @param req fuse_req_t
+ * @param cowRequest
+ */
+
+static void finishWriteRequest( fuse_req_t req, cow_request_t *cowRequest )
+{
+ if ( atomic_fetch_sub( &cowRequest->workCounter, 1 ) != 1 )
+ return; // More sub-requests are pending, bail out
+ if ( cowRequest->errorCode != 0 ) {
+ fuse_reply_err( req, cowRequest->errorCode );
+ } else {
+ uint64_t newSize = cowRequest->bytesWorkedOn + cowRequest->fuseRequestOffset;
+ if ( newSize > metadata->imageSize ) {
+ uint64_t oldSize;
+ do {
+ oldSize = metadata->imageSize;
+ newSize = MAX( oldSize, newSize );
+ } while ( !atomic_compare_exchange_weak( &metadata->imageSize, &oldSize, newSize ) );
+ }
+ fuse_reply_write( req, cowRequest->bytesWorkedOn );
+ }
+ free( cowRequest );
+}
+
+/**
+ * @brief Called after the padding data was received from the dnbd3 server.
+ * The data from the write request will be combined with the data from the server
+ * so that we get a full DNBD3_BLOCK and is then written on the disk.
+ * @param sRequest
+ */
+static void writePaddedBlock( cow_sub_request_t *sRequest )
+{
+ assert( ( sRequest->inClusterOffset % DNBD3_BLOCK_SIZE ) + sRequest->size <= DNBD3_BLOCK_SIZE );
+ // Here, we again check if the block is written locally - there might have been a second write
+ // that wrote the full block, hence didn't have to wait for remote data and finished faster.
+ // In that case, don't pad from remote as we'd overwrite newer data.
+ if ( isBlockLocal( sRequest->cluster, sRequest->inClusterOffset ) ) {
+ logadd( LOG_INFO, "It happened!" );
+ } else {
+ // copy write Data
+ // writeBuffer is the received data, patch data from fuse write into it
+ memcpy( sRequest->writeBuffer + ( sRequest->inClusterOffset % DNBD3_BLOCK_SIZE ), sRequest->writeSrc,
+ sRequest->size );
+ if ( !writeAll( cow.fdData, sRequest->writeBuffer, DNBD3_BLOCK_SIZE,
+ sRequest->cluster->offset + ( sRequest->inClusterOffset & ~DNBD3_BLOCK_MASK ) ) ) {
+ sRequest->cowRequest->errorCode = errno;
+ } else {
+ sRequest->cowRequest->bytesWorkedOn += sRequest->size;
+ int64_t bit = sRequest->inClusterOffset / DNBD3_BLOCK_SIZE;
+ setBitsInBitfield( sRequest->cluster->bitfield, bit, bit, true );
+ sRequest->cluster->timeChanged = time( NULL );
+ }
+ }
+
+ finishWriteRequest( sRequest->dRequest.fuse_req, sRequest->cowRequest );
+ free( sRequest );
+}
+
+/**
+ * @brief If a block does not start or finish on an multiple of DNBD3_BLOCK_SIZE, the blocks need to be
+ * padded. If this block is inside the original image size, the padding data will be read from the server.
+ * Otherwise it will be padded with 0 since the it must be a block after the end of the image.
+ * @param req fuse_req_t
+ * @param cowRequest cow_request_t
+ * @param startOffset Absolute offset where the real data starts
+ * @param endOffset Absolute offset where the real data ends
+ * @param srcBuffer pointer to the data that needs to be padded, ie. data from user space.
+ */
+static bool padBlockForWrite( fuse_req_t req, cow_request_t *cowRequest,
+ off_t startOffset, off_t endOffset, const char *srcBuffer )
+{
+ // Make sure we pad exactly one block
+ endOffset = MIN( (uint64_t)endOffset, ( startOffset + DNBD3_BLOCK_SIZE ) & ~DNBD3_BLOCK_MASK );
+ assert( startOffset < endOffset );
+ size_t size = (size_t)( endOffset - startOffset );
+ int l1Index = offsetToL1Index( startOffset );
+ int l2Index = offsetToL2Index( startOffset );
+ off_t inClusterOffset = startOffset % COW_DATA_CLUSTER_SIZE;
+ cow_l2_entry_t *cluster = getL2Entry( l1Index, l2Index, true );
+ if ( isBlockLocal( cluster, startOffset ) ) {
+ // No padding at all, keep existing data
+ bool ret = writeAll( cow.fdData, srcBuffer, size, cluster->offset + inClusterOffset );
+ if ( ret ) {
+ cowRequest->bytesWorkedOn += size;
+ cluster->timeChanged = time( NULL );
+ }
+ return ret;
+ }
+ // Not local, need some form of padding
+ createL2Table( l1Index );
+ if ( cluster == NULL ) {
+ cluster = getL2Entry( l1Index, l2Index, true );
+ }
+ uint64_t validImageSize = metadata->validRemoteSize; // As we don't lock
+ if ( startOffset >= (off_t)validImageSize ) {
+ // After end of remote valid data, pad with zeros entirely
+ char buf[DNBD3_BLOCK_SIZE] = {0};
+ off_t start = startOffset % DNBD3_BLOCK_SIZE;
+ assert( start + size <= DNBD3_BLOCK_SIZE );
+ memcpy( buf + start, srcBuffer, size );
+ bool ret = writeAll( cow.fdData, buf, DNBD3_BLOCK_SIZE,
+ cluster->offset + ( inClusterOffset & ~DNBD3_BLOCK_MASK ) );
+ if ( ret ) {
+ int64_t bit = inClusterOffset / DNBD3_BLOCK_SIZE;
+ setBitsInBitfield( cluster->bitfield, bit, bit, true );
+ cowRequest->bytesWorkedOn += size;
+ cluster->timeChanged = time( NULL );
+ }
+ return ret;
+ }
+ // Need to fetch padding from upstream, allocate struct plus one block
+ cow_sub_request_t *sub = calloc( sizeof( *sub ) + DNBD3_BLOCK_SIZE, 1 );
+ sub->callback = writePaddedBlock;
+ sub->inClusterOffset = inClusterOffset;
+ sub->cluster = cluster;
+ sub->size = size;
+ sub->writeSrc = srcBuffer;
+ sub->cowRequest = cowRequest;
+ sub->buffer = sub->writeBuffer;
+
+ sub->dRequest.length = (uint32_t)MIN( DNBD3_BLOCK_SIZE, validImageSize - startOffset );
+ sub->dRequest.offset = startOffset & ~DNBD3_BLOCK_MASK;
+ sub->dRequest.fuse_req = req;
+
+ atomic_fetch_add( &cowRequest->workCounter, 1 );
+
+ if ( !connection_read( &sub->dRequest ) ) {
+ free( sub );
+ errno = ENOTSOCK;
+ // Don't need to go via finishWriteRequest here since the caller will take care of error handling
+ atomic_fetch_sub( &cowRequest->workCounter, 1 );
+ return false;
+ }
+ return true;
+}
+
+/**
+ * @brief Will be called after a dnbd3_async_t is finished.
+ * Calls the corrsponding callback function, either writePaddedBlock or readRemoteCallback
+ * depending if the original fuse request was a write or read.
+ *
+ */
+void cowfile_handleCallback( dnbd3_async_t *request )
+{
+ cow_sub_request_t *sRequest = container_of( request, cow_sub_request_t, dRequest );
+ sRequest->callback( sRequest );
+}
+
+
+/**
+ * @brief called once dnbd3_async_t is finished. Increases bytesWorkedOn by the number of bytes
+ * this request had. Also checks if it was the last dnbd3_async_t to finish the fuse request, if
+ * so replys to fuse and cleans up the request.
+ *
+ */
+static void readRemoteCallback( cow_sub_request_t *sRequest )
+{
+ atomic_fetch_add( &sRequest->cowRequest->bytesWorkedOn, sRequest->dRequest.length );
+
+ if ( atomic_fetch_sub( &sRequest->cowRequest->workCounter, 1 ) == 1 ) {
+ if ( sRequest->cowRequest->bytesWorkedOn != sRequest->cowRequest->fuseRequestSize ) {
+ // Because connection_read() will always return exactly as many bytes as requested,
+ // or simply never finish.
+ logadd( LOG_ERROR, "BUG? Pad read has invalid size. worked on: %"PRIu64", request size: %"
+ PRIu64", offset: %"PRIu64,
+ (uint64_t)sRequest->cowRequest->bytesWorkedOn,
+ (uint64_t)sRequest->cowRequest->fuseRequestSize,
+ (uint64_t)sRequest->cowRequest->fuseRequestOffset );
+ fuse_reply_err( sRequest->dRequest.fuse_req, EIO );
+ } else {
+ fuse_reply_buf( sRequest->dRequest.fuse_req, sRequest->cowRequest->readBuffer,
+ sRequest->cowRequest->bytesWorkedOn );
+ }
+ free( sRequest->cowRequest->readBuffer );
+ free( sRequest->cowRequest );
+ }
+ free( sRequest );
+}
+
+/**
+ * @brief changes the imageSize
+ *
+ * @param req fuse request
+ * @param size new size the image should have
+ * @param ino fuse_ino_t
+ * @param fi fuse_file_info
+ */
+
+void cowfile_setSize( fuse_req_t req, size_t size, fuse_ino_t ino, struct fuse_file_info *fi )
+{
+ if ( size < metadata->imageSize ) {
+ // truncate file
+ if ( size < metadata->validRemoteSize ) {
+ metadata->validRemoteSize = size;
+ }
+ } else if ( size > metadata->imageSize ) {
+ // grow file, pad with zeroes
+ off_t offset = metadata->imageSize;
+ int l1Index = offsetToL1Index( offset );
+ int l2Index = offsetToL2Index( offset );
+ int l1EndIndex = offsetToL1Index( size );
+ int l2EndIndex = offsetToL2Index( size );
+ // Special case, first cluster through which the size change passes
+ cow_l2_entry_t *cluster = getL2Entry( l1Index, l2Index, false );
+ if ( cluster != NULL ) {
+ off_t inClusterOffset = offset % COW_DATA_CLUSTER_SIZE;
+ // if the new size is inside a DNBD3_BLOCK it might still contain old data before a truncate
+ if ( !IS_4K_ALIGNED( metadata->imageSize ) ) {
+ size_t sizeToWrite = DNBD3_BLOCK_SIZE - ( metadata->imageSize % DNBD3_BLOCK_SIZE );
+
+ if ( checkBit( cluster->bitfield, inClusterOffset / DNBD3_BLOCK_SIZE ) ) {
+ char buf[DNBD3_BLOCK_SIZE] = {0};
+ ssize_t bytesWritten = pwrite( cow.fdData, buf, sizeToWrite, cluster->offset + inClusterOffset );
+
+ if ( bytesWritten < (ssize_t)sizeToWrite ) {
+ fuse_reply_err( req, bytesWritten == -1 ? errno : EIO );
+ return;
+ }
+ cluster->timeChanged = time( NULL );
+ offset += sizeToWrite;
+ }
+ }
+ // all remaining bits in cluster will get set to 0
+ inClusterOffset = offset % COW_DATA_CLUSTER_SIZE;
+ setBitsInBitfield( cluster->bitfield, inClusterOffset / DNBD3_BLOCK_SIZE,
+ ( COW_BITFIELD_SIZE * 8 ) - 1, false );
+ cluster->timeChanged = time( NULL );
+ l2Index++;
+ if ( l2Index >= COW_L2_TABLE_SIZE ) {
+ l2Index = 0;
+ l1Index++;
+ }
+ }
+ // normal case, if clusters exist, null bitfields
+ while ( l1Index < l1EndIndex || ( l1Index == l1EndIndex && l2Index <= l2EndIndex ) ) {
+ if ( cow.l1[l1Index] == -1 ) {
+ l1Index++;
+ l2Index = 0;
+ continue;
+ }
+ cluster = getL2Entry( l1Index, l2Index, false );
+ if ( cluster != NULL ) {
+ memset( cluster->bitfield, 0, COW_BITFIELD_SIZE );
+ cluster->timeChanged = time( NULL );
+ }
+ l2Index++;
+ if ( l2Index >= COW_L2_TABLE_SIZE ) {
+ l2Index = 0;
+ l1Index++;
+ }
+ }
+ }
+ metadata->imageSize = size;
+ if ( req != NULL ) {
+ image_ll_getattr( req, ino, fi );
+ }
+}
+
+/**
+ * @brief Implementation of a write request.
+ *
+ * @param req fuse_req_t
+ * @param cowRequest
+ * @param offset Offset where the write starts,
+ * @param size Size of the write.
+ */
+void cowfile_write( fuse_req_t req, cow_request_t *cowRequest, off_t offset, size_t size )
+{
+ // if beyond end of file, pad with 0
+ if ( offset > (off_t)metadata->imageSize ) {
+ cowfile_setSize( NULL, offset, 0, NULL );
+ }
+
+
+ off_t currentOffset = offset;
+ off_t endOffset = offset + size;
+
+ if ( !IS_4K_ALIGNED( currentOffset ) ) {
+ // Handle case where start is not 4k aligned
+ if ( !padBlockForWrite( req, cowRequest, currentOffset, endOffset, cowRequest->writeBuffer ) ) {
+ goto fail;
+ }
+ // Move forward to next block border
+ currentOffset = ( currentOffset + DNBD3_BLOCK_SIZE ) & ~DNBD3_BLOCK_MASK;
+ }
+ if ( currentOffset < endOffset && !IS_4K_ALIGNED( endOffset ) ) {
+ // Handle case where end is not 4k aligned
+ off_t lastBlockStart = endOffset & ~DNBD3_BLOCK_MASK;
+ if ( !padBlockForWrite( req, cowRequest, lastBlockStart, endOffset,
+ cowRequest->writeBuffer + ( lastBlockStart - offset ) ) ) {
+ goto fail;
+ }
+ endOffset = lastBlockStart;
+ }
+
+ // From here on start and end are block-aligned
+ int l1Index = offsetToL1Index( currentOffset );
+ int l2Index = offsetToL2Index( currentOffset );
+ while ( currentOffset < endOffset ) {
+ if ( cow.l1[l1Index] == -1 ) {
+ createL2Table( l1Index );
+ }
+ //loop over L2 array (metadata)
+ while ( currentOffset < endOffset && l2Index < COW_L2_TABLE_SIZE ) {
+ cow_l2_entry_t *cluster = getL2Entry( l1Index, l2Index, true );
+ size_t inClusterOffset = currentOffset % COW_DATA_CLUSTER_SIZE;
+ // How many bytes we can write to this cluster before crossing a boundary,
+ // or before the write request is complete
+ size_t bytesToWriteToCluster =
+ MIN( (size_t)( endOffset - currentOffset ), COW_DATA_CLUSTER_SIZE - inClusterOffset );
+
+ if ( !writeAll( cow.fdData, cowRequest->writeBuffer + ( currentOffset - offset ),
+ bytesToWriteToCluster, cluster->offset + inClusterOffset ) ) {
+ goto fail;
+ }
+ int64_t f = inClusterOffset / DNBD3_BLOCK_SIZE;
+ int64_t t = ( inClusterOffset + bytesToWriteToCluster - 1 ) / DNBD3_BLOCK_SIZE;
+ setBitsInBitfield( cluster->bitfield, f, t, true );
+ cowRequest->bytesWorkedOn += bytesToWriteToCluster;
+ currentOffset += bytesToWriteToCluster;
+ cluster->timeChanged = time( NULL );
+ l2Index++;
+ }
+ l1Index++;
+ l2Index = 0;
+ }
+ goto success;
+
+fail:
+ if ( cowRequest->errorCode == 0 ) {
+ cowRequest->errorCode = errno != 0 ? errno : EIO;
+ }
+ // Fallthrough
+success:
+ finishWriteRequest( req, cowRequest );
+}
+
+
+/**
+ * @brief Request data, that is not available locally, via the network.
+ *
+ * @param req fuse_req_t
+ * @param offset from the start of the file
+ * @param size of data to request
+ * @param buffer into which the data is to be written
+ * @param cowRequest cow_request_t
+ */
+static void readRemote( fuse_req_t req, off_t offset, ssize_t size, char *buffer, cow_request_t *cowRequest )
+{
+ assert( offset < (off_t)metadata->validRemoteSize );
+ assert( offset + size <= (off_t)metadata->validRemoteSize );
+ if ( size == 0 )
+ return;
+ assert( size > 0 );
+ cow_sub_request_t *sRequest = malloc( sizeof( cow_sub_request_t ) );
+ sRequest->callback = readRemoteCallback;
+ sRequest->dRequest.length = (uint32_t)size;
+ sRequest->dRequest.offset = offset;
+ sRequest->dRequest.fuse_req = req;
+ sRequest->cowRequest = cowRequest;
+ sRequest->buffer = buffer;
+
+ atomic_fetch_add( &cowRequest->workCounter, 1 );
+ if ( !connection_read( &sRequest->dRequest ) ) {
+ cowRequest->errorCode = EIO;
+ free( sRequest );
+ if ( atomic_fetch_sub( &cowRequest->workCounter, 1 ) == 1 ) {
+ fuse_reply_err( req, EIO );
+ free( cowRequest->readBuffer );
+ free( cowRequest );
+ }
+ }
+}
+
+/**
+ * @brief Get the Block Data Source object
+ *
+ * @param block
+ * @param bitfieldOffset
+ * @param offset
+ * @return enum dataSource
+ */
+enum dataSource getBlockDataSource( cow_l2_entry_t *block, off_t bitfieldOffset, off_t offset )
+{
+ if ( block != NULL && checkBit( block->bitfield, bitfieldOffset ) ) {
+ return ds_local;
+ }
+ if ( offset >= (off_t)metadata->validRemoteSize ) {
+ return ds_zero;
+ }
+ return ds_remote;
+}
+
+/**
+ * @brief Reads data at given offset. If the data are available locally,
+ * they are read locally, otherwise they are requested remotely.
+ *
+ * @param req fuse_req_t
+ * @param size of date to read
+ * @param offset offset where the read starts.
+ * @return uint64_t Number of bytes read.
+ */
+void cowfile_read( fuse_req_t req, size_t size, off_t startOffset )
+{
+ cow_request_t *cowRequest = malloc( sizeof( cow_request_t ) );
+ cowRequest->fuseRequestSize = size;
+ cowRequest->bytesWorkedOn = ATOMIC_VAR_INIT( 0 );
+ cowRequest->workCounter = ATOMIC_VAR_INIT( 1 );
+ cowRequest->errorCode = ATOMIC_VAR_INIT( 0 );
+ cowRequest->readBuffer = calloc( size, 1 );
+ cowRequest->fuseRequestOffset = startOffset;
+ off_t lastReadOffset = -1;
+ off_t endOffset = startOffset + size;
+ off_t searchOffset = startOffset;
+ int l1Index = offsetToL1Index( startOffset );
+ int l2Index = offsetToL2Index( startOffset );
+ int bitfieldOffset = getBitfieldOffsetBit( startOffset );
+ cow_l2_entry_t *cluster = getL2Entry( l1Index, l2Index, false );
+ enum dataSource dataState = ds_invalid;
+ bool flushCurrentSpan = false; // Set if we need to read the current span and start the next one
+ bool newSourceType = true; // Set if we're starting a new span, and the source type needs to be determined
+
+ while ( searchOffset < endOffset ) {
+ if ( newSourceType ) {
+ newSourceType = false;
+ lastReadOffset = searchOffset;
+ dataState = getBlockDataSource( cluster, bitfieldOffset, searchOffset );
+ } else if ( getBlockDataSource( cluster, bitfieldOffset, searchOffset ) != dataState ) {
+ // Source type changed, obviously need to flush current span
+ flushCurrentSpan = true;
+ } else {
+ bitfieldOffset++;
+ // If reading from local cow file, crossing a cluster border means we need to flush
+ // since the next cluster might be somewhere else in the data file
+ if ( dataState == ds_local && bitfieldOffset == COW_BITFIELD_SIZE * 8 ) {
+ flushCurrentSpan = true;
+ }
+ }
+
+ // compute the absolute image offset from bitfieldOffset, l2Index and l1Index
+ // bitfieldOffset might be out of bounds here, but that doesn't matter for the calculation
+ searchOffset = DNBD3_BLOCK_SIZE * bitfieldOffset + l2Index * COW_DATA_CLUSTER_SIZE
+ + l1Index * COW_FULL_L2_TABLE_DATA_SIZE;
+ if ( flushCurrentSpan || searchOffset >= endOffset ) {
+ ssize_t spanEndOffset = MIN( searchOffset, endOffset );
+ if ( dataState == ds_remote ) {
+ if ( spanEndOffset > (ssize_t)metadata->validRemoteSize ) {
+ // Account for bytes we leave zero, because they're beyond the (truncated) original image size
+ atomic_fetch_add( &cowRequest->bytesWorkedOn, spanEndOffset - metadata->validRemoteSize );
+ spanEndOffset = metadata->validRemoteSize;
+ }
+ readRemote( req, lastReadOffset, spanEndOffset - lastReadOffset,
+ cowRequest->readBuffer + ( lastReadOffset - startOffset ), cowRequest );
+ } else if ( dataState == ds_zero ) {
+ // Past end of image, account for leaving them zero
+ ssize_t numBytes = spanEndOffset - lastReadOffset;
+ atomic_fetch_add( &cowRequest->bytesWorkedOn, numBytes );
+ } else if ( dataState == ds_local ) {
+ ssize_t numBytes = spanEndOffset - lastReadOffset;
+ // Compute the startOffset in the data file where the read starts
+ off_t localRead = cluster->offset + ( lastReadOffset % COW_DATA_CLUSTER_SIZE );
+ ssize_t totalBytesRead = 0;
+ while ( totalBytesRead < numBytes ) {
+ ssize_t bytesRead = pread( cow.fdData, cowRequest->readBuffer + ( lastReadOffset - startOffset ),
+ numBytes - totalBytesRead, localRead + totalBytesRead );
+ if ( bytesRead == -1 ) {
+ cowRequest->errorCode = errno;
+ goto fail;
+ } else if ( bytesRead == 0 ) {
+ logadd( LOG_ERROR, "EOF for read at localRead=%"PRIu64", totalBR=%"PRIu64,
+ (uint64_t)localRead, (uint64_t)totalBytesRead );
+ logadd( LOG_ERROR, "searchOffset=%"PRIu64", endOffset=%"PRIu64", imageSize=%"PRIu64,
+ searchOffset, endOffset, metadata->imageSize );
+ cowRequest->errorCode = EIO;
+ goto fail;
+ }
+ totalBytesRead += bytesRead;
+ }
+
+ atomic_fetch_add( &cowRequest->bytesWorkedOn, numBytes );
+ } else {
+ assert( 4 == 6 );
+ }
+ lastReadOffset = searchOffset;
+ flushCurrentSpan = false;
+ // Since the source type changed, reset
+ newSourceType = true;
+ }
+ if ( bitfieldOffset == COW_BITFIELD_SIZE * 8 ) {
+ // Advance to next cluster in current l2 table
+ bitfieldOffset = 0;
+ l2Index++;
+ if ( l2Index >= COW_L2_TABLE_SIZE ) {
+ // Advance to next l1 entry, reset l2 index
+ l2Index = 0;
+ l1Index++;
+ }
+ cluster = getL2Entry( l1Index, l2Index, false );
+ }
+ }
+fail:;
+ if ( atomic_fetch_sub( &cowRequest->workCounter, 1 ) == 1 ) {
+ if ( cowRequest->errorCode != 0 || cowRequest->bytesWorkedOn != size ) {
+ logadd( LOG_ERROR, "incomplete read or I/O error (errno=%d, workedOn: %"PRIu64", size: %"PRIu64")",
+ cowRequest->errorCode, (uint64_t)cowRequest->bytesWorkedOn, (uint64_t)size );
+ fuse_reply_err( req, cowRequest->errorCode != 0 ? cowRequest->errorCode : EIO );
+ } else {
+ fuse_reply_buf( req, cowRequest->readBuffer, cowRequest->bytesWorkedOn );
+ }
+ free( cowRequest->readBuffer );
+ free( cowRequest );
+ }
+}
+
+
+/**
+ * @brief stops the StatUpdater and CowUploader threads
+ * and waits for them to finish, then cleans up curl.
+ *
+ */
+void cowfile_close()
+{
+ uploadLoop = false;
+ pthread_join( tidCowUploader, NULL );
+ if ( statFile || statStdout ) {
+ // Send a signal in case it's hanging in the sleep call
+ pthread_kill( tidStatUpdater, SIGHUP );
+ pthread_join( tidStatUpdater, NULL );
+ }
+
+ curl_slist_free_all( uploadHeaders );
+ if ( curl ) {
+ curl_easy_cleanup( curl );
+ curl_global_cleanup();
+ }
+}
diff --git a/src/fuse/cowfile.h b/src/fuse/cowfile.h
new file mode 100644
index 0000000..3b1711c
--- /dev/null
+++ b/src/fuse/cowfile.h
@@ -0,0 +1,146 @@
+#ifndef _COWFILE_H_
+#define _COWFILE_H_
+
+#include "connection.h"
+
+#include <dnbd3/config/cow.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <stdatomic.h>
+#include <stdlib.h>
+
+// Net storage capacity of a single cluster in the data file
+#define COW_DATA_CLUSTER_SIZE ( COW_BITFIELD_SIZE * 8 * DNBD3_BLOCK_SIZE )
+// Number of entries per L2 table
+#define COW_L2_TABLE_SIZE 1024
+// Net storage capacity in data file represented by a full L2 table
+#define COW_FULL_L2_TABLE_DATA_SIZE ( COW_L2_TABLE_SIZE * COW_DATA_CLUSTER_SIZE )
+
+_Static_assert( ATOMIC_INT_LOCK_FREE == 2, "ATOMIC INT not lock free" );
+_Static_assert( ATOMIC_LONG_LOCK_FREE == 2, "ATOMIC LONG not lock free" );
+_Static_assert( ATOMIC_LLONG_LOCK_FREE == 2, "ATOMIC LLONG not lock free" );
+_Static_assert( sizeof( atomic_uint_least64_t ) == 8, "atomic_uint_least64_t not 8 byte" );
+_Static_assert( sizeof( _Atomic(uint32_t) ) == 4, "_Atomic(uint32_t) not 4 byte" );
+_Static_assert( sizeof( atomic_int_least64_t ) == 8, "atomic_int_least64_t not 8 byte" );
+
+enum dataSource
+{
+ ds_invalid,
+ ds_local,
+ ds_remote,
+ ds_zero
+};
+
+#define COW_METADATA_HEADER_SIZE 320
+typedef struct cowfile_metadata_header
+{
+ uint64_t magicValue; // 8byte
+ atomic_uint_least64_t imageSize; // 8byte
+ int32_t version; // 4byte
+ int32_t blocksize; // 4byte
+ uint64_t validRemoteSize; // 8byte
+ uint32_t startL1; // 4byte
+ uint32_t startL2; // 4byte
+ int32_t bitfieldSize; // 4byte
+ int32_t nextL2; // 4byte
+ atomic_int_least64_t metaSize; // 8byte
+ atomic_int_least64_t nextClusterOffset; // 8byte
+ uint64_t maxImageSize; // 8byte
+ uint64_t creationTime; // 8byte
+ char uuid[40]; // 40byte
+ char imageName[200]; // 200byte
+} cowfile_metadata_header_t;
+_Static_assert( sizeof( cowfile_metadata_header_t ) == COW_METADATA_HEADER_SIZE,
+ "cowfile_metadata_header is messed up" );
+
+#define COW_L2_ENTRY_SIZE 64
+typedef struct cow_l2_entry
+{
+ atomic_int_least64_t offset;
+ atomic_int_least64_t timeChanged;
+ _Atomic(uint32_t) uploads;
+ _Atomic(uint32_t) fails;
+ atomic_uchar bitfield[COW_BITFIELD_SIZE];
+} cow_l2_entry_t;
+_Static_assert( sizeof( cow_l2_entry_t ) == COW_L2_ENTRY_SIZE, "cow_l2_entry_t is messed up" );
+
+/**
+ * Open request for reading/writing the virtual image we expose.
+ */
+typedef struct cow_request
+{
+ size_t fuseRequestSize; // Number of bytes to be read/written
+ off_t fuseRequestOffset; // Absolute offset into the image, as seen by user space
+ char *readBuffer; // Used only in read case
+ const char *writeBuffer; // Used only in write case
+ atomic_size_t bytesWorkedOn; // Used for tracking how many bytes we have touched (exluding padding etc)
+ atomic_int workCounter; // How many pending sub requests (see below)
+ atomic_int errorCode; // For reporting back to fuse
+ fuse_ino_t ino; // Inode of file, used for ??? (For reporting back to fuse, dont know if needed?)
+ struct fuse_file_info *fi; // Used for ??? (For reporting back to fuse, dont know if needed?)
+ //fuse_req_t req; // Fuse request
+} cow_request_t;
+
+typedef struct cow_sub_request cow_sub_request_t;
+typedef void ( *cow_callback )( cow_sub_request_t *sRequest );
+
+/**
+ * A sub-request for above, which needs to be completed successfully
+ * before the parent cow_request can be completed.
+ * TODO Please verify field comments
+ */
+typedef struct cow_sub_request
+{
+ size_t size; // size of this sub-request
+ off_t inClusterOffset; // offset relative to the beginning of the cluster
+ const char *writeSrc; // pointer to the data of a write request which needs padding
+ char *buffer; // The pointer points to the original read buffer to the place where the sub read request should be copied to.
+ cow_l2_entry_t *cluster; // the cluster inClusterOffset refers to
+ cow_callback callback; // Callback when we're done handling this
+ cow_request_t *cowRequest; // parent request
+ dnbd3_async_t dRequest; // Probably request to dnbd3-server for non-aligned writes (wrt 4k dnbd3 block)
+ char writeBuffer[]; // buffer for a padding write request, gets filled from a remote read, then the writeSrc data gets copied into it.
+} cow_sub_request_t;
+
+typedef struct cow_curl_read_upload
+{
+ atomic_uint_least64_t time;
+ cow_l2_entry_t *cluster;
+ size_t position;
+ uint64_t clusterNumber;
+ int64_t ulLast;
+ int retryTime;
+ atomic_uchar bitfield[COW_BITFIELD_SIZE];
+ char replyBuffer[500];
+} cow_curl_read_upload_t;
+
+
+typedef struct cow_cluster_statistics
+{
+ uint64_t clusterNumber;
+ uint64_t uploads;
+} cow_cluster_statistics_t;
+
+typedef int32_t l1;
+typedef cow_l2_entry_t l2[COW_L2_TABLE_SIZE];
+
+bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion, atomic_uint_fast64_t **imageSizePtr,
+ char *serverAddress, bool sStdout, bool sFile, const char *cowUuid );
+
+bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile, const char *cowUuid );
+bool cowfile_startBackgroundThreads();
+void cowfile_read( fuse_req_t req, size_t size, off_t offset );
+
+void cowfile_write( fuse_req_t req, cow_request_t *cowRequest, off_t offset, size_t size );
+
+void cowfile_handleCallback( dnbd3_async_t *request );
+
+void cowfile_setSize( fuse_req_t req, size_t size, fuse_ino_t ino, struct fuse_file_info *fi );
+
+void readRemoteData( cow_sub_request_t *sRequest );
+
+int cow_printStats( char *buffer, const size_t len );
+
+void cowfile_close();
+
+#endif /* COWFILE_H_ */
diff --git a/src/fuse/helper.c b/src/fuse/helper.c
index d81b08f..f54073b 100644
--- a/src/fuse/helper.c
+++ b/src/fuse/helper.c
@@ -18,8 +18,8 @@ void printLog( log_info *info )
}
//rewind(file);
- fprintf( logFile, "ImageSize: %"PRIu64" MiB\n", ( uint64_t )( info->imageSize/ ( 1024ll*1024ll ) ) );
- fprintf( logFile, "ReceivedMiB: %"PRIu64" MiB\n", ( uint64_t )( info->receivedBytes/ ( 1024ll*1024ll ) ) );
+ fprintf( logFile, "ImageSize: %"PRIu64" MiB\n", (uint64_t)( info->imageSize/ ( 1024ll*1024ll ) ) );
+ fprintf( logFile, "ReceivedMiB: %"PRIu64" MiB\n", (uint64_t)( info->receivedBytes/ ( 1024ll*1024ll ) ) );
fprintf( logFile, "imageBlockCount: %"PRIu64"\n", info->imageBlockCount );
fprintf( logFile, "Blocksize: 4KiB\n\n" );
fprintf( logFile, "Block access count:\n" );
@@ -29,7 +29,7 @@ void printLog( log_info *info )
if ( i % 50 == 0 ) {
fprintf( logFile, "\n" );
}
- fprintf( logFile, "%i ", ( int ) info->blockRequestCount[i] );
+ fprintf( logFile, "%i ", (int) info->blockRequestCount[i] );
}
fprintf( logFile, "\n" );
fclose( logFile );
diff --git a/src/fuse/helper.h b/src/fuse/helper.h
index 9e5d127..b1fa513 100644
--- a/src/fuse/helper.h
+++ b/src/fuse/helper.h
@@ -1,7 +1,7 @@
#ifndef IMAGEHELPER_H
#define IMAGEHELPER_H
-#include "../types.h"
+#include <dnbd3/types.h>
#include <netdb.h>
#include <stdbool.h>
@@ -18,18 +18,18 @@ typedef struct log_info {
-void printLog(log_info *info);
+void printLog( log_info *info );
-int connect_to_server(char *server_adress, int port);
+int connect_to_server( char *server_adress, int port );
-static inline bool isSameAddressPort(const dnbd3_host_t * const a, const dnbd3_host_t * const b)
+static inline bool isSameAddressPort( const dnbd3_host_t * const a, const dnbd3_host_t * const b )
{
- return (a->type == b->type) && (a->port == b->port) && (0 == memcmp( a->addr, b->addr, (a->type == HOST_IP4 ? 4 : 16) ));
+ return ( a->type == b->type ) && ( a->port == b->port ) && ( 0 == memcmp( a->addr, b->addr, ( a->type == HOST_IP4 ? 4 : 16 ) ) );
}
-static inline bool isSameAddress(const dnbd3_host_t * const a, const dnbd3_host_t * const b)
+static inline bool isSameAddress( const dnbd3_host_t * const a, const dnbd3_host_t * const b )
{
- return (a->type == b->type) && (0 == memcmp( a->addr, b->addr, (a->type == HOST_IP4 ? 4 : 16) ));
+ return ( a->type == b->type ) && ( 0 == memcmp( a->addr, b->addr, ( a->type == HOST_IP4 ? 4 : 16 ) ) );
}
#endif
diff --git a/src/fuse/main.c b/src/fuse/main.c
index 1a5643c..13dd168 100644
--- a/src/fuse/main.c
+++ b/src/fuse/main.c
@@ -5,271 +5,359 @@
* See the file COPYING.
*
* Changed by Stephan Schwaer
+ * FUSE lowlevel by Alan Reichert
* */
+#include "main.h"
+#include "cowfile.h"
#include "connection.h"
#include "helper.h"
-#include "../shared/protocol.h"
-#include "../shared/log.h"
+#include <dnbd3/version.h>
+#include <dnbd3/build.h>
+#include <dnbd3/shared/protocol.h>
+#include <dnbd3/shared/log.h>
+#include <dnbd3/config.h>
-#define FUSE_USE_VERSION 30
-#include <fuse.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <assert.h>
/* for printing uint */
-#define __STDC_FORMAT_MACROS
+//#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <getopt.h>
#include <time.h>
#include <signal.h>
#include <pthread.h>
-
#define debugf(...) do { logadd( LOG_DEBUG1, __VA_ARGS__ ); } while (0)
-static const char * const IMAGE_PATH = "/img";
-static const char * const STATS_PATH = "/status";
+#define INO_ROOT (1)
+#define INO_STATS (2)
+#define INO_IMAGE (3)
+
+static const char *IMAGE_NAME = "img";
+static const char *STATS_NAME = "status";
+
+static struct fuse_session *_fuseSession = NULL;
+bool useCow = false;
+bool cow_merge_after_upload = false;
+static atomic_uint_fast64_t imageSize;
+static atomic_uint_fast64_t *imageSizePtr =&imageSize;
-static uint64_t imageSize;
/* Debug/Benchmark variables */
static bool useDebug = false;
static log_info logInfo;
static struct timespec startupTime;
static uid_t owner;
-static bool keepRunning = true;
-static void (*fuse_sigIntHandler)(int) = NULL;
-static void (*fuse_sigTermHandler)(int) = NULL;
-static struct fuse_operations dnbd3_fuse_no_operations;
-
-#define SIGPOOLSIZE 6
-static pthread_spinlock_t sigLock;
-static dnbd3_signal_t *signalPool[SIGPOOLSIZE];
-static dnbd3_signal_t **sigEnd = signalPool + SIGPOOLSIZE;
-static void signalInit()
+static int reply_buf_limited( fuse_req_t req, const char *buf, size_t bufsize, off_t off, size_t maxsize );
+static void fillStatsFile( fuse_req_t req, size_t size, off_t offset );
+static void image_destroy( void *private_data );
+static void image_ll_init( void *userdata, struct fuse_conn_info *conn );
+static void image_ll_lookup( fuse_req_t req, fuse_ino_t parent, const char *name );
+static void image_ll_open( fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi );
+static void image_ll_readdir( fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info *fi );
+static void image_ll_read( fuse_req_t req, fuse_ino_t ino, size_t size, off_t offset, struct fuse_file_info *fi );
+static void image_ll_write( fuse_req_t req, fuse_ino_t ino, const char *buf, size_t size, off_t off, struct fuse_file_info *fi );
+static void image_ll_setattr( fuse_req_t req, fuse_ino_t ino, struct stat *attr, int to_set, struct fuse_file_info *fi );
+static int image_stat( fuse_ino_t ino, struct stat *stbuf );
+static void printUsage( char *argv0, int exitCode );
+static void printVersion();
+
+static int image_stat( fuse_ino_t ino, struct stat *stbuf )
{
- pthread_spin_init( &sigLock, PTHREAD_PROCESS_PRIVATE );
- for ( size_t i = 0; i < SIGPOOLSIZE; ++i ) {
- signalPool[i] = NULL;
+ switch ( ino ) {
+ case INO_ROOT:
+ stbuf->st_mode = S_IFDIR | 0550;
+ if( useCow ) {
+ stbuf->st_mode = S_IFDIR | 0770;
+ }
+ stbuf->st_nlink = 2;
+ stbuf->st_mtim = startupTime;
+ break;
+ case INO_IMAGE:
+ if ( useCow ) {
+ stbuf->st_mode = S_IFREG | 0660;
+ } else {
+ stbuf->st_mode = S_IFREG | 0440;
+ }
+ stbuf->st_nlink = 1;
+ stbuf->st_size = *imageSizePtr;
+ stbuf->st_mtim = startupTime;
+ break;
+ case INO_STATS:
+ stbuf->st_mode = S_IFREG | 0440;
+ stbuf->st_nlink = 1;
+ stbuf->st_size = 4096;
+ clock_gettime( CLOCK_REALTIME, &stbuf->st_mtim );
+ break;
+ default:
+ return -1;
}
+ stbuf->st_ctim = stbuf->st_atim = startupTime;
+ stbuf->st_uid = owner;
+ stbuf->st_ino = ino;
+ return 0;
}
-static inline dnbd3_signal_t *signalGet()
+
+void image_ll_getattr( fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi )
{
- pthread_spin_lock( &sigLock );
- for ( dnbd3_signal_t **it = signalPool; it < sigEnd; ++it ) {
- if ( *it != NULL ) {
- dnbd3_signal_t *ret = *it;
- *it = NULL;
- pthread_spin_unlock( &sigLock );
- return ret;
- }
+ struct stat stbuf = { 0 };
+ ( void ) fi;
+
+ if ( image_stat( ino, &stbuf ) == -1 ) {
+ fuse_reply_err( req, ENOENT );
+ } else {
+ fuse_reply_attr( req, &stbuf, ino == INO_IMAGE ? 1200 : 1 ); // seconds validity timeout
}
- pthread_spin_unlock( &sigLock );
- return signal_newBlocking();
}
-static inline void signalPut(dnbd3_signal_t *signal)
+
+static void image_ll_lookup( fuse_req_t req, fuse_ino_t parent, const char *name )
{
- pthread_spin_lock( &sigLock );
- for ( dnbd3_signal_t **it = signalPool; it < sigEnd; ++it ) {
- if ( *it == NULL ) {
- *it = signal;
- pthread_spin_unlock( &sigLock );
+ ( void )parent;
+
+ if ( strcmp( name, IMAGE_NAME ) == 0 || strcmp( name, STATS_NAME ) == 0 ) {
+ struct fuse_entry_param e = { 0 };
+ if ( strcmp( name, IMAGE_NAME ) == 0 ) {
+ e.ino = INO_IMAGE;
+ e.attr_timeout = e.entry_timeout = 1200;
+ } else {
+ e.ino = INO_STATS;
+ e.attr_timeout = e.entry_timeout = 0;
+ }
+ if ( image_stat( e.ino, &e.attr ) == 0 ) {
+ fuse_reply_entry( req, &e );
return;
}
}
- pthread_spin_unlock( &sigLock );
- signal_close( signal );
+ fuse_reply_err( req, ENOENT );
}
-static int image_getattr(const char *path, struct stat *stbuf)
+struct dirbuf {
+ char *p;
+ size_t size;
+};
+
+static void dirbuf_add( fuse_req_t req, struct dirbuf *b, const char *name, fuse_ino_t ino )
{
- int res = 0;
- memset( stbuf, 0, sizeof( struct stat ) );
- stbuf->st_ctim = stbuf->st_atim = stbuf->st_mtim = startupTime;
- stbuf->st_uid = owner;
- if ( strcmp( path, "/" ) == 0 ) {
- stbuf->st_mode = S_IFDIR | 0550;
- stbuf->st_nlink = 2;
- } else if ( strcmp( path, IMAGE_PATH ) == 0 ) {
- stbuf->st_mode = S_IFREG | 0440;
- stbuf->st_nlink = 1;
- stbuf->st_size = imageSize;
- } else if ( strcmp( path, STATS_PATH ) == 0 ) {
- stbuf->st_mode = S_IFREG | 0440;
- stbuf->st_nlink = 1;
- stbuf->st_size = 4096;
- clock_gettime( CLOCK_REALTIME, &stbuf->st_mtim );
- } else {
- res = -ENOENT;
- }
- return res;
+ struct stat stbuf = { .st_ino = ino };
+ size_t oldsize = b->size;
+ b->size += fuse_add_direntry( req, NULL, 0, name, NULL, 0 );
+ b->p = ( char * ) realloc( b->p, b->size );
+ fuse_add_direntry( req, b->p + oldsize, b->size - oldsize, name, &stbuf, b->size );
+ return;
}
-static int image_readdir(const char *path, void *buf, fuse_fill_dir_t filler, off_t offset UNUSED, struct fuse_file_info *fi UNUSED)
+static int reply_buf_limited( fuse_req_t req, const char *buf, size_t bufsize, off_t off, size_t maxsize )
{
- if ( strcmp( path, "/" ) != 0 ) {
- return -ENOENT;
+ if ( off >= 0 && off < (off_t)bufsize ) {
+ return fuse_reply_buf( req, buf + off, MIN( bufsize - off, maxsize ) );
}
- filler( buf, ".", NULL, 0 );
- filler( buf, "..", NULL, 0 );
- filler( buf, IMAGE_PATH + 1, NULL, 0 );
- filler( buf, STATS_PATH + 1, NULL, 0 );
- return 0;
+ return fuse_reply_buf( req, NULL, 0 );
}
-static int image_open(const char *path, struct fuse_file_info *fi)
+static void image_ll_readdir( fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info *fi )
{
- if ( strcmp( path, IMAGE_PATH ) != 0 && strcmp( path, STATS_PATH ) != 0 ) {
- return -ENOENT;
- }
- if ( ( fi->flags & 3 ) != O_RDONLY ) {
- return -EACCES;
+ ( void ) fi;
+
+ if ( ino != INO_ROOT ) {
+ fuse_reply_err( req, ENOTDIR );
+ } else {
+ struct dirbuf b;
+ memset( &b, 0, sizeof( b ) );
+ dirbuf_add( req, &b, ".", INO_ROOT );
+ dirbuf_add( req, &b, "..", INO_ROOT );
+ dirbuf_add( req, &b, IMAGE_NAME, INO_IMAGE );
+ dirbuf_add( req, &b, STATS_NAME, INO_STATS );
+ reply_buf_limited( req, b.p, b.size, off, size );
+ free( b.p );
}
- return 0;
}
-static int fillStatsFile(char *buf, size_t size, off_t offset) {
- if ( offset == 0 ) {
- return (int)connection_printStats( buf, size );
+static void image_ll_open( fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi )
+{
+ if ( ino != INO_IMAGE && ino != INO_STATS ) {
+ fuse_reply_err( req, EISDIR );
+ } else if ( ( fi->flags & 3 ) != O_RDONLY && !useCow ) {
+ fuse_reply_err( req, EACCES );
+ } else {
+ // auto caching
+ fi->keep_cache = 1;
+ fuse_reply_open( req, fi );
}
+}
+
+static void fillStatsFile( fuse_req_t req, size_t size, off_t offset ) {
char buffer[4096];
int ret = (int)connection_printStats( buffer, sizeof buffer );
int len = MIN( ret - (int)offset, (int)size );
- if ( len == 0 )
- return 0;
if ( len < 0 ) {
- return -EOF;
+ fuse_reply_err( req, 0 );
+ return;
}
- memcpy( buf, buffer + offset, len );
- return len;
+ fuse_reply_buf( req, buffer + offset, len );
}
-static int image_read(const char *path, char *buf, size_t size, off_t offset, struct fuse_file_info *fi UNUSED)
+static void image_ll_read( fuse_req_t req, fuse_ino_t ino, size_t size, off_t offset, struct fuse_file_info *fi )
{
- if ( size > __INT_MAX__ ) {
- // fuse docs say we MUST fill the buffer with exactly size bytes and return size,
- // otherwise the buffer will we padded with zeros. Since the return value is just
- // an int, we could not properly fulfill read requests > 2GB. Since there is no
- // mention of a guarantee that this will never happen, better add a safety check.
- // Way to go fuse.
- return -EIO;
+ assert( ino == INO_STATS || ino == INO_IMAGE );
+
+ ( void )fi;
+
+ if ( ino == INO_STATS ) {
+ fillStatsFile( req, size, offset );
+ return;
}
- if ( path[1] == STATS_PATH[1] ) {
- return fillStatsFile( buf, size, offset );
+
+ if ( size == 0 || size > UINT32_MAX ) {
+ fuse_reply_err( req, 0 );
+ return;
}
- if ( (uint64_t)offset >= imageSize ) {
- return 0;
+ if ( (uint64_t)offset >= *imageSizePtr ) {
+ fuse_reply_err( req, 0 );
+ return;
+ }
+ if ( offset + size > *imageSizePtr ) {
+ size = *imageSizePtr - offset;
}
- if ( offset + size > imageSize ) {
- size = imageSize - offset;
+ if ( useCow ) {
+ cowfile_read(req, size, offset);
+ return;
}
if ( useDebug ) {
- /* count the requested blocks */
uint64_t startBlock = offset / ( 4096 );
const uint64_t endBlock = ( offset + size - 1 ) / ( 4096 );
- for ( ; startBlock <= endBlock; startBlock++ ) {
+ for ( ; startBlock <= endBlock; startBlock++ )
+ {
++logInfo.blockRequestCount[startBlock];
}
}
- dnbd3_async_t request;
- request.buffer = buf;
- request.length = (uint32_t)size;
- request.offset = offset;
- request.signal = signalGet();
- if ( !connection_read( &request ) ) {
- signalPut( request.signal );
- return -EINVAL;
- }
- while ( !request.finished ) {
- int ret = signal_wait( request.signal, 5000 );
- if ( !keepRunning ) {
- connection_close();
- break;
- }
- if ( ret < 0 ) {
- debugf( "fuse_read signal wait returned %d", ret );
- }
- }
- signalPut( request.signal );
- if ( request.success ) {
- return request.length;
- } else {
- return -EIO;
+ dnbd3_async_parent_t *parent = malloc( sizeof(dnbd3_async_parent_t) + size );
+ parent->request.length = (uint32_t)size;
+ parent->request.offset = offset;
+ parent->request.fuse_req = req;
+
+ if ( !connection_read( &parent->request ) ) {
+ fuse_reply_err( req, EIO );
+ free( parent );
}
}
-static void image_sigHandler(int signum) {
- keepRunning = false;
- if ( signum == SIGINT && fuse_sigIntHandler != NULL ) {
- fuse_sigIntHandler(signum);
- }
- if ( signum == SIGTERM && fuse_sigTermHandler != NULL ) {
- fuse_sigTermHandler(signum);
- }
+static void noopSigHandler( int signum )
+{
+ (void)signum;
}
-static void* image_init(struct fuse_conn_info *conn UNUSED)
+static void image_ll_init( void *userdata UNUSED, struct fuse_conn_info *conn UNUSED )
{
+ ( void ) userdata;
+ ( void ) conn;
if ( !connection_initThreads() ) {
logadd( LOG_ERROR, "Could not initialize threads for dnbd3 connection, exiting..." );
- exit( EXIT_FAILURE );
- }
- // Prepare our handler
- struct sigaction newHandler;
- memset( &newHandler, 0, sizeof(newHandler) );
- newHandler.sa_handler = &image_sigHandler;
- sigemptyset( &newHandler.sa_mask );
- struct sigaction oldHandler;
- // Retrieve old handlers when setting
- sigaction( SIGINT, &newHandler, &oldHandler );
- fuse_sigIntHandler = oldHandler.sa_handler;
- logadd( LOG_DEBUG1, "Previous SIGINT handler was %p", (void*)(uintptr_t)fuse_sigIntHandler );
- sigaction( SIGTERM, &newHandler, &oldHandler );
- fuse_sigTermHandler = oldHandler.sa_handler;
- logadd( LOG_DEBUG1, "Previous SIGTERM handler was %p", (void*)(uintptr_t)fuse_sigIntHandler );
- return NULL;
+ if ( _fuseSession != NULL ) {
+ fuse_session_exit( _fuseSession );
+ }
+ }
}
/* close the connection */
-static void image_destroy(void *private_data UNUSED)
+static void image_destroy( void *private_data UNUSED )
{
if ( useDebug ) {
printLog( &logInfo );
}
connection_close();
- return;
+}
+
+
+static void image_ll_write( fuse_req_t req, fuse_ino_t ino, const char *buf, size_t size, off_t off, struct fuse_file_info *fi )
+{
+ assert( ino == INO_STATS || ino == INO_IMAGE );
+
+ ( void )fi;
+
+ if ( ino == INO_STATS ) {
+ fuse_reply_err( req, EACCES );
+ return;
+ }
+
+ cow_request_t* cowRequest = malloc(sizeof(cow_request_t));
+ cowRequest->fuseRequestSize = size;
+ cowRequest->workCounter = ATOMIC_VAR_INIT( 1 );
+ cowRequest->writeBuffer = buf;
+ cowRequest->readBuffer = NULL;
+ cowRequest->errorCode = ATOMIC_VAR_INIT( 0 );
+ cowRequest->fuseRequestOffset = off;
+ cowRequest->bytesWorkedOn = ATOMIC_VAR_INIT( 0 );
+ cowfile_write(req, cowRequest, off, size);
+}
+
+static void image_ll_setattr( fuse_req_t req, fuse_ino_t ino, struct stat *attr, int to_set, struct fuse_file_info *fi )
+{
+ if ( ino != INO_IMAGE ) {
+ fuse_reply_err( req, EACCES );
+ return;
+ }
+ if (to_set & FUSE_SET_ATTR_SIZE) {
+ cowfile_setSize( req, attr->st_size, ino, fi);
+ return;
+ }
+ fuse_reply_err( req, EACCES );
}
/* map the implemented fuse operations */
-static struct fuse_operations image_oper = {
- .getattr = image_getattr,
- .readdir = image_readdir,
- .open = image_open,
- .read = image_read,
- .init = image_init,
+static struct fuse_lowlevel_ops image_oper = {
+ .lookup = image_ll_lookup,
+ .getattr = image_ll_getattr,
+ .readdir = image_ll_readdir,
+ .open = image_ll_open,
+ .read = image_ll_read,
+ .init = image_ll_init,
+ .destroy = image_destroy,
+};
+
+/* map the implemented fuse operations with copy on write */
+static struct fuse_lowlevel_ops image_oper_cow = {
+ .lookup = image_ll_lookup,
+ .getattr = image_ll_getattr,
+ .readdir = image_ll_readdir,
+ .open = image_ll_open,
+ .read = image_ll_read,
+ .init = image_ll_init,
.destroy = image_destroy,
+ .write = image_ll_write,
+ .setattr = image_ll_setattr,
};
+
static void printVersion()
{
char *arg[] = { "foo", "-V" };
- printf( "DNBD3-Fuse Version 1.2.3.4, protocol version %d\n", (int)PROTOCOL_VERSION );
- fuse_main( 2, arg, &dnbd3_fuse_no_operations, NULL );
+ printf( "dnbd3-fuse version: %s\n", DNBD3_VERSION_LONG );
+ printf( "Built: %s\n", DNBD3_BUILD_DATE );
+ printf( "Protocol version: %d\n", (int)PROTOCOL_VERSION );
+ struct fuse_args args = FUSE_ARGS_INIT( 2, arg );
+ fuse_parse_cmdline( &args, NULL, NULL, NULL );
exit( 0 );
}
-static void printUsage(char *argv0, int exitCode)
+static void printUsage( char *argv0, int exitCode )
{
char *arg[] = { argv0, "-h" };
- fuse_main( 2, arg, &dnbd3_fuse_no_operations, NULL );
+ struct fuse_args args = FUSE_ARGS_INIT( 2, arg );
+ fuse_parse_cmdline( &args, NULL, NULL, NULL );
printf( "\n" );
- printf( "Usage: %s [--debug] [--option mountOpts] --host <serverAddress(es)> --image <imageName> [--rid revision] <mountPoint>\n", argv0 );
- printf( "Or: %s [-d] [-o mountOpts] -h <serverAddress(es)> -i <imageName> [-r revision] <mountPoint>\n", argv0 );
+ printf( "Usage: %s [--debug] [--option mountOpts] --host <serverAddress(es)> --image <imageName> [--rid revision] <mountPoint>\n", argv0 );
+ printf( "Or: %s [-d] [-o mountOpts] -h <serverAddress(es)> -i <imageName> [-r revision] <mountPoint>\n", argv0 );
+ printf( "For cow: %s [-d] [-o mountOpts] -h <serverAddress(es)> -i <imageName> [-r revision] -c <path> -C <cowServerAddress> -m [--cow-stats-stdout] [--cow-stats-file] <mountPoint>\n", argv0 );
printf( " -d --debug Don't fork, write stats file, and print debug output (fuse -> stderr, dnbd3 -> stdout)\n" );
printf( " -f Don't fork (dnbd3 -> stdout)\n" );
printf( " -h --host List of space separated hosts to use\n" );
@@ -279,26 +367,41 @@ static void printUsage(char *argv0, int exitCode)
printf( " -r --rid Revision to use (omit or pass 0 for latest)\n" );
printf( " -S --sticky Use only servers from command line (no learning from servers)\n" );
printf( " -s Single threaded mode\n" );
+ printf( " -c Enables cow, creates the cow files at given location\n" );
+ printf( " -L Loads the cow files from the given location\n" );
+ printf( " -C Host address of the cow server\n" );
+ printf( "--upload-uuid <id> Use provided UUID as upload session id instead of asking server/loading from file\n" );
+ printf( "--cow-stats-stdout prints the cow status in stdout\n" );
+ printf( "--cow-stats-file creates and updates the cow status file\n" );
+ printf( " -m --merge tell server to merge and create new revision on exit\n" );
exit( exitCode );
}
-static const char *optString = "dfHh:i:l:o:r:SsVv";
+static const char *optString = "dfHh:i:l:o:r:SsVvc:L:C:m";
static const struct option longOpts[] = {
- { "debug", no_argument, NULL, 'd' },
- { "help", no_argument, NULL, 'H' },
- { "host", required_argument, NULL, 'h' },
- { "image", required_argument, NULL, 'i' },
- { "log", required_argument, NULL, 'l' },
- { "option", required_argument, NULL, 'o' },
- { "rid", required_argument, NULL, 'r' },
- { "sticky", no_argument, NULL, 'S' },
- { "version", no_argument, NULL, 'v' },
- { 0, 0, 0, 0 }
+ { "debug", no_argument, NULL, 'd' },
+ { "help", no_argument, NULL, 'H' },
+ { "host", required_argument, NULL, 'h' },
+ { "image", required_argument, NULL, 'i' },
+ { "log", required_argument, NULL, 'l' },
+ { "option", required_argument, NULL, 'o' },
+ { "rid", required_argument, NULL, 'r' },
+ { "sticky", no_argument, NULL, 'S' },
+ { "version", no_argument, NULL, 'v' },
+ { "cow", required_argument, NULL, 'c' },
+ { "loadcow", required_argument, NULL, 'L' },
+ { "cowServer", required_argument, NULL, 'C' },
+ { "merge", no_argument, NULL, 'm' },
+ { "upload-uuid", required_argument, NULL, 'uuid' },
+ { "cow-stats-stdout", no_argument, NULL, 'sout' },
+ { "cow-stats-file", no_argument, NULL, 'sfil' },
+ { 0, 0, 0, 0 }
};
-int main(int argc, char *argv[])
+int main( int argc, char *argv[] )
{
char *server_address = NULL;
+ char *cow_server_address = NULL;
char *image_Name = NULL;
char *log_file = NULL;
uint16_t rid = 0;
@@ -306,6 +409,17 @@ int main(int argc, char *argv[])
int newArgc;
int opt, lidx;
bool learnNewServers = true;
+ bool single_thread = false;
+ struct fuse_chan *ch;
+ char *mountpoint;
+ int foreground = 0;
+ char *cow_file_path = NULL;
+ bool loadCow = false;
+ bool sStdout = false;
+ bool sFile = false;
+ const char *cowUuidOverride = NULL;
+
+ log_init();
if ( argc <= 1 || strcmp( argv[1], "--help" ) == 0 || strcmp( argv[1], "--usage" ) == 0 ) {
printUsage( argv[0], 0 );
@@ -316,9 +430,10 @@ int main(int argc, char *argv[])
log_setConsoleTimestamps( true );
log_setFileMask( 65535 );
- newArgv = calloc( argc + 10, sizeof(char*) );
+ newArgv = calloc( argc + 10, sizeof( char* ) );
newArgv[0] = argv[0];
newArgc = 1;
+
while ( ( opt = getopt_long( argc, argv, optString, longOpts, &lidx ) ) != -1 ) {
switch ( opt ) {
case 'h':
@@ -328,7 +443,7 @@ int main(int argc, char *argv[])
image_Name = optarg;
break;
case 'r':
- rid = (uint16_t)atoi(optarg);
+ rid = (uint16_t)atoi( optarg );
break;
case 'o':
newArgv[newArgc++] = "-o";
@@ -357,15 +472,40 @@ int main(int argc, char *argv[])
case 'd':
useDebug = true;
newArgv[newArgc++] = "-d";
+ foreground = 1;
break;
case 's':
- newArgv[newArgc++] = "-s";
+ single_thread = true;
break;
case 'S':
learnNewServers = false;
break;
case 'f':
- newArgv[newArgc++] = "-f";
+ foreground = 1;
+ break;
+ case 'c':
+ cow_file_path = optarg;
+ useCow = true;
+ break;
+ case 'C':
+ cow_server_address = optarg;
+ break;
+ case 'm':
+ cow_merge_after_upload = true;
+ break;
+ case 'L':
+ cow_file_path = optarg;
+ useCow = true;
+ loadCow = true;
+ break;
+ case 'sout':
+ sStdout = true;
+ break;
+ case 'sfil':
+ sFile = true;
+ break;
+ case 'uuid':
+ cowUuidOverride = optarg;
break;
default:
printUsage( argv[0], EXIT_FAILURE );
@@ -385,6 +525,37 @@ int main(int argc, char *argv[])
logadd( LOG_WARNING, "Could not open log file at '%s'", log_file );
}
}
+ if( useCow && cow_server_address == NULL ) {
+ printf( "for -c you also need a cow server address. Please also use -C\n" );
+ printUsage( argv[0], EXIT_FAILURE );
+ }
+ if( cow_merge_after_upload && !useCow ) {
+ printf( "-m only works if cow is enabled. \n" );
+ printUsage( argv[0], EXIT_FAILURE );
+ }
+ if ( loadCow ) {
+ if( cow_server_address == NULL ) {
+ printf( "for -L you also need a cow server address. Please also use -C\n" );
+ printUsage( argv[0], EXIT_FAILURE );
+ }
+
+ if ( !cowfile_load( cow_file_path, &imageSizePtr, cow_server_address, sStdout, sFile, cowUuidOverride ) ) {
+ return EXIT_FAILURE;
+ }
+ }
+ do {
+ // The empty handler prevents fuse from registering its own handler
+ struct sigaction newHandler = { .sa_handler = &noopSigHandler };
+ sigemptyset( &newHandler.sa_mask );
+ sigaction( SIGHUP, &newHandler, NULL );
+ sigaction( SIGQUIT, &newHandler, NULL );
+ } while ( 0 );
+ if ( useCow ) {
+ sigset_t sigmask;
+ sigemptyset( &sigmask );
+ sigaddset( &sigmask, SIGQUIT ); // Block here and unblock in cow as abort signal
+ pthread_sigmask( SIG_BLOCK, &sigmask, NULL );
+ }
if ( !connection_init( server_address, image_Name, rid, learnNewServers ) ) {
logadd( LOG_ERROR, "Could not connect to any server. Bye.\n" );
@@ -394,27 +565,82 @@ int main(int argc, char *argv[])
/* initialize benchmark variables */
logInfo.receivedBytes = 0;
- logInfo.imageSize = imageSize;
- logInfo.imageBlockCount = ( imageSize + 4095 ) / 4096;
+ logInfo.imageSize = *imageSizePtr;
+ logInfo.imageBlockCount = ( *imageSizePtr + 4095 ) / 4096;
if ( useDebug ) {
logInfo.blockRequestCount = calloc( logInfo.imageBlockCount, sizeof(uint8_t) );
} else {
logInfo.blockRequestCount = NULL;
}
-
- // Since dnbd3 is always read only and the remote image will not change
+
newArgv[newArgc++] = "-o";
- newArgv[newArgc++] = "ro,auto_cache,default_permissions";
+ if ( useCow ) {
+ newArgv[newArgc++] = "default_permissions";
+ } else {
+ newArgv[newArgc++] = "ro,default_permissions";
+ }
// Mount point goes last
newArgv[newArgc++] = argv[optind];
- printf( "ImagePathName: %s\nFuseArgs:",IMAGE_PATH );
+ printf( "ImagePathName: /%s\nFuseArgs:", IMAGE_NAME );
for ( int i = 0; i < newArgc; ++i ) {
printf( " '%s'", newArgv[i] );
}
- putchar('\n');
+ putchar( '\n' );
clock_gettime( CLOCK_REALTIME, &startupTime );
owner = getuid();
- signalInit();
- return fuse_main( newArgc, newArgv, &image_oper, NULL );
+
+ if ( useCow & !loadCow) {
+ if( !cowfile_init( cow_file_path, connection_getImageName(), connection_getImageRID(), &imageSizePtr, cow_server_address, sStdout, sFile, cowUuidOverride ) ) {
+ return EXIT_FAILURE;
+ }
+ }
+
+ // Fuse lowlevel loop
+ struct fuse_args args = FUSE_ARGS_INIT( newArgc, newArgv );
+ int fuse_err = 1;
+ if ( fuse_parse_cmdline( &args, &mountpoint, NULL, NULL ) == -1 ) {
+ logadd( LOG_ERROR, "FUSE: Parsing command line failed" );
+ } else if ( ( ch = fuse_mount( mountpoint, &args ) ) == NULL ) {
+ logadd( LOG_ERROR, "Mounting file system failed" );
+ } else {
+ if(useCow){
+ _fuseSession = fuse_lowlevel_new( &args, &image_oper_cow, sizeof( image_oper_cow ), NULL );
+ } else{
+ _fuseSession = fuse_lowlevel_new( &args, &image_oper, sizeof( image_oper ), NULL );
+ }
+ if ( _fuseSession == NULL ) {
+ logadd( LOG_ERROR, "Could not initialize fuse session" );
+ } else {
+ fuse_session_add_chan( _fuseSession, ch );
+ // Do not spawn any threads before we daemonize, they'd die at this point
+ fuse_daemonize( foreground );
+ if ( fuse_set_signal_handlers( _fuseSession ) == -1 ) {
+ logadd( LOG_WARNING, "Could not install fuse signal handlers" );
+ }
+ if ( useCow ) {
+ if ( !cowfile_startBackgroundThreads() ) {
+ logadd( LOG_ERROR, "Could not start cow background threads" );
+ }
+ }
+ if ( single_thread ) {
+ fuse_err = fuse_session_loop( _fuseSession );
+ } else {
+ fuse_err = fuse_session_loop_mt( _fuseSession ); //MT produces errors (race conditions) in libfuse and didnt improve speed at all
+ }
+ fuse_remove_signal_handlers( _fuseSession );
+ fuse_session_remove_chan( ch );
+ fuse_session_destroy( _fuseSession );
+ _fuseSession = NULL;
+ }
+ fuse_unmount( mountpoint, ch );
+ if( useCow ) {
+ cowfile_close();
+ }
+ }
+ fuse_opt_free_args( &args );
+ free( newArgv );
+ connection_join();
+ logadd( LOG_DEBUG1, "Terminating. FUSE REPLIED: %d\n", fuse_err );
+ return fuse_err;
}
diff --git a/src/fuse/main.h b/src/fuse/main.h
new file mode 100644
index 0000000..bf21805
--- /dev/null
+++ b/src/fuse/main.h
@@ -0,0 +1,12 @@
+#ifndef _MAIN_H_
+#define _MAIN_H_
+
+#define FUSE_USE_VERSION 30
+#include <fuse_lowlevel.h>
+#include <stdbool.h>
+
+extern bool useCow;
+extern bool cow_merge_after_upload;
+void image_ll_getattr( fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi );
+
+#endif /* main_H_ */
diff --git a/src/fuse/serialize.c b/src/fuse/serialize.c
deleted file mode 100644
index 4934132..0000000
--- a/src/fuse/serialize.c
+++ /dev/null
@@ -1,5 +0,0 @@
-#include <stdio.h>
-#include <string.h>
-#include <stdint.h>
-
-#include "../serialize.c"