summaryrefslogtreecommitdiffstats
path: root/src/server
diff options
context:
space:
mode:
Diffstat (limited to 'src/server')
-rw-r--r--src/server/CMakeLists.txt112
-rw-r--r--src/server/altservers.c79
-rw-r--r--src/server/altservers.h2
-rw-r--r--src/server/fileutil.c2
-rw-r--r--src/server/fuse.c661
-rw-r--r--src/server/fuse.h10
-rw-r--r--src/server/globals.c72
-rw-r--r--src/server/globals.h93
-rw-r--r--src/server/helper.h4
-rw-r--r--src/server/image.c685
-rw-r--r--src/server/image.h48
-rw-r--r--src/server/ini.c2
-rw-r--r--src/server/integrity.c20
-rw-r--r--src/server/locks.c4
-rw-r--r--src/server/locks.h20
-rw-r--r--src/server/net.c174
-rw-r--r--src/server/net.h4
-rw-r--r--src/server/picohttpparser/CMakeLists.txt11
-rw-r--r--src/server/reference.h5
-rw-r--r--src/server/rpc.c29
-rw-r--r--src/server/serialize.c5
-rw-r--r--src/server/server.c84
-rw-r--r--src/server/server.h4
-rw-r--r--src/server/threadpool.c19
-rw-r--r--src/server/threadpool.h5
-rw-r--r--src/server/uplink.c1127
-rw-r--r--src/server/uplink.h8
27 files changed, 2378 insertions, 911 deletions
diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt
new file mode 100644
index 0000000..9a1e1c4
--- /dev/null
+++ b/src/server/CMakeLists.txt
@@ -0,0 +1,112 @@
+cmake_minimum_required(VERSION 3.10)
+
+# set the project name
+project(dnbd3-server
+ LANGUAGES C)
+
+# find Jansson package required by the dnbd3-server
+find_package(Jansson)
+if(NOT JANSSON_FOUND)
+ message(FATAL_ERROR "*** No jansson lib found, can't build dnbd3-server!")
+endif(NOT JANSSON_FOUND)
+
+# find atomic library required by the dnbd3-server
+find_package(Stdatomic REQUIRED)
+find_package(Libatomic REQUIRED)
+
+# add compile option to enable enhanced POSIX features
+add_definitions(-D_GNU_SOURCE)
+
+if(DNBD3_SERVER_AFL)
+ # check if DNBD3_RELEASE_HARDEN is disabled
+ if(DNBD3_RELEASE_HARDEN)
+ message(FATAL_ERROR "DNBD3_SERVER_AFL can only be enabled if DNBD3_RELEASE_HARDEN is disabled")
+ endif(DNBD3_RELEASE_HARDEN)
+
+ # build dnbd3-server with AFL support
+ message(STATUS "Building dnbd3-server with AFL support")
+ add_definitions(-DDNBD3_SERVER_AFL)
+
+ # change compiler for dnbd3-server sources if AFL enabled
+ include(CheckAFLCCompiler)
+ check_afl_c_compiler(AFL_C_COMPILER AFL_C_COMPILER_NAME ${CMAKE_C_COMPILER} ${CMAKE_C_COMPILER_ID})
+ if(AFL_C_COMPILER)
+ message(STATUS "Check for working AFL C compiler: ${AFL_C_COMPILER} - done")
+ # change C compiler to a corresponding AFL C compiler
+ set(CMAKE_C_COMPILER "${AFL_C_COMPILER}")
+ else(AFL_C_COMPILER)
+ # no corresponding AFL C compiler found
+ message(STATUS "Check for working AFL C compiler: ${AFL_C_COMPILER_NAME} - failed")
+ message(FATAL_ERROR "No corresponding AFL C compiler ${AFL_C_COMPILER_NAME} was found for the C compiler ${CMAKE_C_COMPILER}!")
+ endif(AFL_C_COMPILER)
+endif(DNBD3_SERVER_AFL)
+
+set(DNBD3_SERVER_SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/altservers.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/fileutil.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/fuse.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/globals.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/helper.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/image.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/ini.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/integrity.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/locks.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/net.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/reference.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/rpc.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/server.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/threadpool.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/uplink.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/urldecode.c)
+set(DNBD3_SERVER_HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/altservers.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/fileutil.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/fuse.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/globals.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/helper.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/image.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/ini.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/integrity.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/locks.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/net.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/reference.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/reftypes.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rpc.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/server.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/threadpool.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/uplink.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/urldecode.h)
+
+add_executable(dnbd3-server ${DNBD3_SERVER_SOURCE_FILES})
+target_include_directories(dnbd3-server PRIVATE ${JANSSON_INCLUDE_DIR})
+target_link_libraries(dnbd3-server dnbd3-version dnbd3-build dnbd3-shared picohttpparser Libatomic::Libatomic ${CMAKE_THREAD_LIBS_INIT} ${JANSSON_LIBRARIES})
+
+if(DNBD3_SERVER_FUSE)
+ find_package(Fuse REQUIRED)
+ # include Fuse headers and link with Fuse library
+ target_compile_options(dnbd3-server PRIVATE -DDNBD3_SERVER_FUSE)
+ target_include_directories(dnbd3-server PRIVATE ${FUSE_INCLUDE_DIRS})
+ target_link_libraries(dnbd3-server ${FUSE_LIBRARIES})
+endif(DNBD3_SERVER_FUSE)
+
+if(UNIX AND NOT APPLE)
+ # link dnbd3-server with librt if server is compiled for a Unix system
+ target_link_libraries(dnbd3-server rt)
+endif(UNIX AND NOT APPLE)
+
+if(DNBD3_SERVER_DEBUG_LOCKS)
+ # enable debugging of locks used in the dnbd3-server
+ target_compile_options(dnbd3-server PRIVATE -DDNBD3_SERVER_DEBUG_LOCKS)
+endif(DNBD3_SERVER_DEBUG_LOCKS)
+
+if(DNBD3_SERVER_DEBUG_THREADS)
+ # enable debugging of threads used in the dnbd3-server
+ target_compile_options(dnbd3-server PRIVATE -DDNBD3_SERVER_DEBUG_THREADS)
+endif(DNBD3_SERVER_DEBUG_THREADS)
+
+install(TARGETS dnbd3-server RUNTIME DESTINATION bin
+ COMPONENT server)
+
+add_linter(dnbd3-server-lint "${DNBD3_SERVER_SOURCE_FILES}" "${DNBD3_SERVER_HEADER_FILES}")
+add_linter_fix(dnbd3-server-lint-fix "${DNBD3_SERVER_SOURCE_FILES}" "${DNBD3_SERVER_HEADER_FILES}")
+
+# add external dependency (HTTP parser) for the dnbd3-server
+add_subdirectory(picohttpparser)
diff --git a/src/server/altservers.c b/src/server/altservers.c
index 943345c..4413ca6 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -5,16 +5,16 @@
#include "helper.h"
#include "image.h"
#include "fileutil.h"
-#include "../shared/protocol.h"
-#include "../shared/timing.h"
-#include "../serverconfig.h"
+#include <dnbd3/shared/protocol.h>
+#include <dnbd3/shared/timing.h>
+#include <dnbd3/config/server.h>
#include "reference.h"
#include <assert.h>
#include <inttypes.h>
#include <jansson.h>
-#define LOG(lvl, msg, ...) logadd(lvl, msg " (%s:%d)", __VA_ARGS__, image->name, (int)image->rid)
+#define LOG(lvl, msg, ...) logadd(lvl, msg " (%s:%d)", __VA_ARGS__, PIMG(image))
#define LOG_GOTO(jumplabel, lvl, ...) do { LOG(lvl, __VA_ARGS__); goto jumplabel; } while (0);
#define ERROR_GOTO(jumplabel, ...) LOG_GOTO(jumplabel, LOG_ERROR, __VA_ARGS__)
@@ -172,7 +172,7 @@ void altservers_findUplinkAsync(dnbd3_uplink_t *uplink)
if ( uplink->rttTestResult != RTT_INPROGRESS ) {
dnbd3_uplink_t *current = ref_get_uplink( &uplink->image->uplinkref );
if ( current == uplink ) {
- threadpool_run( &altservers_runCheck, uplink );
+ threadpool_run( &altservers_runCheck, uplink, "UPLINK" );
} else if ( current != NULL ) {
ref_put( &current->reference );
}
@@ -268,12 +268,32 @@ int altservers_getHostListForReplication(const char *image, dnbd3_host_t *server
int idx[size];
int num = altservers_getListForUplink( NULL, image, idx, size, -1 );
for ( int i = 0; i < num; ++i ) {
- servers[i] = altServers[i].host;
+ servers[i] = altServers[idx[i]].host;
}
return num;
}
/**
+ * Returns true if there is at least one alt-server the
+ * given image name would be allowed to be cloned from.
+ */
+bool altservers_imageHasAltServers(const char *image)
+{
+ bool ret = false;
+ mutex_lock( &altServersLock );
+ for ( int i = 0; i < numAltServers; ++i ) {
+ if ( altServers[i].isClientOnly || ( !altServers[i].isPrivate && _proxyPrivateOnly ) )
+ continue;
+ if ( !isImageAllowed( &altServers[i], image ) )
+ continue;
+ ret = true;
+ break;
+ }
+ mutex_unlock( &altServersLock );
+ return ret;
+}
+
+/**
* Get <size> alt servers. If there are more alt servers than
* requested, random servers will be picked.
* This function is suited for finding uplink servers as
@@ -450,6 +470,11 @@ static void *altservers_runCheck(void *data)
void altservers_findUplink(dnbd3_uplink_t *uplink)
{
altservers_findUplinkInternal( uplink );
+ // Above function is sync, which means normally when it
+ // returns, rttTestResult will not be RTT_INPROGRESS.
+ // But we might have an ansync call running in parallel, which would
+ // mean the above call returns immediately. Wait for that check
+ // to finish too.
while ( uplink->rttTestResult == RTT_INPROGRESS ) {
usleep( 5000 );
}
@@ -504,17 +529,29 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink)
logadd( LOG_WARNING, "Image has gone away that was queued for RTT measurement" );
return;
}
- LOG( LOG_DEBUG2, "Running alt check for %s:%d", image->name, (int)image->rid );
+ logadd( LOG_DEBUG2, "Running alt check for %s:%d", PIMG(image) );
assert( uplink->rttTestResult == RTT_INPROGRESS );
// Test them all
dnbd3_server_connection_t best = { .fd = -1 };
unsigned long bestRtt = RTT_UNREACHABLE;
unsigned long currentRtt = RTT_UNREACHABLE;
+ uint64_t offset = 0;
+ uint32_t length = DNBD3_BLOCK_SIZE;
+ // Try to use the range of the first request in the queue as RTT block.
+ // In case we have a cluster of servers where none of them has a complete
+ // copy, we at least make sure the one we're potentially switching to
+ // has the next block we're about to request.
+ mutex_lock( &uplink->queueLock );
+ if ( uplink->queue != NULL ) {
+ offset = uplink->queue->from;
+ length = (uint32_t)( uplink->queue->to - offset );
+ }
+ mutex_unlock( &uplink->queueLock );
for (itAlt = 0; itAlt < numAlts; ++itAlt) {
int server = servers[itAlt];
// Connect
clock_gettime( BEST_CLOCK_SOURCE, &start );
- int sock = sock_connect( &altServers[server].host, 750, 1000 );
+ int sock = sock_connect( &altServers[server].host, 750, _uplinkTimeout );
if ( sock == -1 ) { // Connection failed means global error
altservers_serverFailed( server );
continue;
@@ -524,7 +561,8 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink)
goto image_failed;
}
// See if selecting the image succeeded ++++++++++++++++++++++++++++++
- uint16_t protocolVersion, rid;
+ uint16_t protocolVersion = 0;
+ uint16_t rid;
uint64_t imageSize;
char *name;
serialized_buffer_t serialized;
@@ -543,9 +581,9 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink)
if ( imageSize != image->virtualFilesize ) {
ERROR_GOTO( image_failed, "[RTT] Remote size: %" PRIu64 ", expected: %" PRIu64, imageSize, image->virtualFilesize );
}
- // Request first block (NOT random!) ++++++++++++++++++++++++++++++
- if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) {
- LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Could not request first block", server );
+ // Request block (NOT random! First or from queue) ++++++++++++
+ if ( !dnbd3_get_block( sock, offset, length, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) {
+ LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Could not request block", server );
}
// See if requesting the block succeeded ++++++++++++++++++++++
dnbd3_reply_t reply;
@@ -553,13 +591,18 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink)
LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Received corrupted reply header after CMD_GET_BLOCK", server );
}
// check reply header
- if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) {
+ if ( reply.cmd != CMD_GET_BLOCK || reply.size != length ) {
// Sanity check failed; count this as global error (malicious/broken server)
ERROR_GOTO( server_failed, "[RTT] Reply to first block request is %" PRIu32 " bytes", reply.size );
}
// flush payload to include this into measurement
char buffer[DNBD3_BLOCK_SIZE];
- if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) {
+ uint32_t todo = length;
+ ssize_t ret;
+ while ( todo != 0 && ( ret = recv( sock, buffer, MIN( DNBD3_BLOCK_SIZE, todo ), MSG_WAITALL ) ) > 0 ) {
+ todo -= (uint32_t)ret;
+ }
+ if ( todo != 0 ) {
ERROR_GOTO( image_failed, "[RTT%d] Could not read first block payload", server );
}
clock_gettime( BEST_CLOCK_SOURCE, &end );
@@ -567,9 +610,6 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink)
mutex_lock( &uplink->rttLock );
const bool isCurrent = ( uplink->current.index == server );
mutex_unlock( &uplink->rttLock );
- // Penaltize rtt if this was a cycle; this will treat this server with lower priority
- // in the near future too, so we prevent alternating between two servers that are both
- // part of a cycle and have the lowest latency.
uint32_t rtt = (uint32_t)((end.tv_sec - start.tv_sec) * 1000000
+ (end.tv_nsec - start.tv_nsec) / 1000); // µs
uint32_t avg = altservers_updateRtt( uplink, server, rtt );
@@ -614,7 +654,6 @@ failed:
} else {
LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt );
}
- sock_setTimeout( best.fd, _uplinkTimeout );
mutex_lock( &uplink->rttLock );
uplink->better = best;
uplink->rttTestResult = RTT_DOCHANGE;
@@ -628,10 +667,6 @@ failed:
if ( best.fd != -1 ) {
close( best.fd );
}
- if ( !image->working || uplink->cycleDetected ) {
- image->working = true;
- LOG( LOG_DEBUG1, "[RTT] No better alt server found, enabling '%s:%d' again... :-(", image->name, (int)image->rid );
- }
uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away
mutex_lock( &uplink->rttLock );
uplink->rttTestResult = RTT_DONTCHANGE;
diff --git a/src/server/altservers.h b/src/server/altservers.h
index 8e29aaa..78f6fcc 100644
--- a/src/server/altservers.h
+++ b/src/server/altservers.h
@@ -19,6 +19,8 @@ int altservers_getListForClient(dnbd3_client_t *client, dnbd3_server_entry_t *ou
int altservers_getHostListForReplication(const char *image, dnbd3_host_t *servers, int size);
+bool altservers_imageHasAltServers(const char *image);
+
bool altservers_toString(int server, char *buffer, size_t len);
int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2);
diff --git a/src/server/fileutil.c b/src/server/fileutil.c
index 336ab68..9a9f066 100644
--- a/src/server/fileutil.c
+++ b/src/server/fileutil.c
@@ -68,7 +68,7 @@ bool file_setSize(int fd, uint64_t size)
// Try really hard... image loading logic relies on the file
// having the proper apparent size
uint8_t byte = 0;
- pread( fd, &byte, 1, size - 1 );
+ (void)!pread( fd, &byte, 1, size - 1 );
if ( pwrite( fd, &byte, 1, size - 1 ) == 1 ) return true;
return false;
}
diff --git a/src/server/fuse.c b/src/server/fuse.c
new file mode 100644
index 0000000..12913a6
--- /dev/null
+++ b/src/server/fuse.c
@@ -0,0 +1,661 @@
+#include "fuse.h"
+#include <dnbd3/types.h>
+#include <dnbd3/shared/log.h>
+
+#ifndef DNBD3_SERVER_FUSE
+
+//
+bool dfuse_init(const char *opts UNUSED, const char *dir UNUSED)
+{
+ logadd( LOG_ERROR, "FUSE: Not compiled in" );
+ return false;
+}
+
+void dfuse_shutdown()
+{
+}
+
+#else
+
+#define PATHLEN (2000)
+static char nullbytes[DNBD3_BLOCK_SIZE];
+
+// FUSE ENABLED
+#define FUSE_USE_VERSION 30
+//
+#include <dnbd3/config.h>
+#include "locks.h"
+#include "threadpool.h"
+#include "image.h"
+#include "uplink.h"
+#include "reference.h"
+#include "helper.h"
+
+#include <fuse_lowlevel.h>
+#include <ctype.h>
+#include <assert.h>
+#include <string.h>
+#include <signal.h>
+
+#define INO_ROOT (1)
+#define INO_CTRL (2)
+#define INO_DIR (3)
+static const char *NAME_CTRL = "control";
+static const char *NAME_DIR = "images";
+
+typedef struct {
+ fuse_req_t req;
+ uint16_t rid;
+ char name[PATHLEN];
+} lookup_t;
+
+static fuse_ino_t inoCounter = 10;
+typedef struct _dfuse_dir {
+ struct _dfuse_dir *next;
+ struct _dfuse_dir *child;
+ const char *name;
+ uint64_t size;
+ fuse_ino_t ino;
+ int refcount;
+ lookup_t *img;
+} dfuse_entry_t;
+
+typedef struct {
+ dfuse_entry_t *entry;
+ dnbd3_image_t *image;
+} cmdopen_t;
+
+static dfuse_entry_t sroot = {
+ .name = "images",
+ .ino = INO_DIR,
+ .refcount = 2,
+}, *root = &sroot;
+static pthread_mutex_t dirLock;
+
+#define INIT_NONE (0)
+#define INIT_DONE (1)
+#define INIT_SHUTDOWN (2)
+#define INIT_INPROGRESS (3)
+
+static struct fuse_session *fuseSession = NULL;
+static struct fuse_chan *fuseChannel = NULL;
+static char *fuseMountPoint = NULL;
+static pthread_t fuseThreadId;
+static bool haveThread = false;
+static _Atomic(int) initState = INIT_NONE;
+static pthread_mutex_t initLock;
+static struct timespec startupTime;
+
+static dfuse_entry_t* dirLookup(dfuse_entry_t *dir, const char *name);
+static dfuse_entry_t* inoRecursive(dfuse_entry_t *dir, fuse_ino_t ino);
+
+static void uplinkCallback(void *data, uint64_t handle, uint64_t start UNUSED, uint32_t length, const char *buffer);
+static void cleanupFuse();
+static void* fuseMainLoop(void *data);
+
+static void ll_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
+{
+ fi->fh = 0;
+ if ( ino == INO_CTRL ) {
+ if ( ( fi->flags & 3 ) != O_WRONLY ) {
+ fuse_reply_err( req, EINVAL );
+ } else {
+ fi->nonseekable = 1;
+ fuse_reply_open( req, fi );
+ }
+ } else if ( ino == INO_ROOT ) {
+ fuse_reply_err( req, EISDIR );
+ } else {
+ if ( ( fi->flags & 3 ) != O_RDONLY ) {
+ fuse_reply_err( req, EINVAL );
+ return;
+ }
+ mutex_lock( &dirLock );
+ dfuse_entry_t *entry = inoRecursive( root, ino );
+ if ( entry == NULL ) {
+ mutex_unlock( &dirLock );
+ fuse_reply_err( req, ENOENT );
+ } else if ( entry->img == NULL ) {
+ mutex_unlock( &dirLock );
+ fuse_reply_err( req, EISDIR );
+ } else if ( entry->img->rid == 0 ) {
+ mutex_unlock( &dirLock );
+ fuse_reply_err( req, ENOENT );
+ } else {
+ entry->refcount++;
+ mutex_unlock( &dirLock );
+ dnbd3_image_t *image = image_get( entry->img->name, entry->img->rid, true );
+ if ( image == NULL ) {
+ fuse_reply_err( req, ENOENT );
+ mutex_lock( &dirLock );
+ entry->refcount--;
+ mutex_unlock( &dirLock );
+ } else {
+ cmdopen_t *handle = malloc( sizeof(cmdopen_t) );
+ handle->entry = entry;
+ handle->image = image;
+ fi->fh = (uintptr_t)handle;
+ fi->keep_cache = 1;
+ fuse_reply_open( req, fi );
+ }
+ }
+ }
+}
+
+static dfuse_entry_t* addImage(dfuse_entry_t **dir, const char *name, lookup_t *img)
+{
+ const char *slash = strchr( name, '/' );
+ if ( slash == NULL ) {
+ // Name portion at the end
+ char *path = NULL;
+ if ( asprintf( &path, "%s:%d", name, (int)img->rid ) == -1 )
+ abort();
+ dfuse_entry_t *entry = dirLookup( *dir, path );
+ if ( entry == NULL ) {
+ entry = calloc( 1, sizeof( *entry ) );
+ entry->next = *dir;
+ *dir = entry;
+ entry->name = path;
+ entry->ino = inoCounter++;
+ entry->img = img;
+ } else {
+ free( path );
+ if ( entry->img == NULL ) {
+ return NULL;
+ }
+ }
+ return entry;
+ } else {
+ // Dirname
+ char *path = NULL;
+ if ( asprintf( &path, "%.*s", (int)( slash - name ), name ) == -1 )
+ abort();
+ dfuse_entry_t *entry = dirLookup( *dir, path );
+ if ( entry == NULL ) {
+ entry = calloc( 1, sizeof( *entry ) );
+ entry->next = *dir;
+ *dir = entry;
+ entry->name = path;
+ entry->ino = inoCounter++;
+ } else {
+ free( path );
+ }
+ return addImage( &entry->child, slash + 1, img );
+ }
+}
+
+static void ll_write(fuse_req_t req, fuse_ino_t ino, const char *buf, size_t size, off_t off, struct fuse_file_info *fi UNUSED)
+{
+ if ( ino != INO_CTRL ) {
+ fuse_reply_err( req, EROFS );
+ return;
+ }
+ if ( off != 0 ) {
+ fuse_reply_err( req, ESPIPE );
+ return;
+ }
+ if ( size >= PATHLEN ) {
+ fuse_reply_err( req, ENOSPC );
+ return;
+ }
+ size_t colon = 0;
+ int rid = 0;
+ for ( size_t i = 0; i < size; ++i ) {
+ if ( buf[i] == '\0' || buf[i] == '\n' ) {
+ if ( colon == 0 ) {
+ colon = i;
+ }
+ break;
+ }
+ if ( colon != 0 ) {
+ if ( !isdigit( buf[i] ) ) {
+ logadd( LOG_WARNING, "FUSE: Malformed rid" );
+ fuse_reply_err( req, EINVAL );
+ return;
+ }
+ rid = rid * 10 + ( buf[i] - '0' ); // Can overflow but who cares
+ } else if ( buf[i] == ':' ) {
+ colon = i; // Image name starting with ':' would be broken...
+ }
+ }
+ if ( rid < 0 || rid > 65535 ) {
+ logadd( LOG_WARNING, "FUSE: Invalid rid '%d'", rid );
+ fuse_reply_err( req, EINVAL );
+ return;
+ }
+ if ( colon == 0 ) {
+ colon = size;
+ }
+ lookup_t *lu = malloc( sizeof(lookup_t) );
+ lu->rid = (uint16_t)rid;
+ lu->req = req;
+ if ( snprintf( lu->name, PATHLEN, "%.*s", (int)colon, buf ) == -1 ) {
+ free( lu );
+ fuse_reply_err( req, ENOSPC );
+ return;
+ }
+ logadd( LOG_DEBUG1, "FUSE: Request for '%s:%d'", lu->name, (int)lu->rid );
+ dnbd3_image_t *image = image_getOrLoad( lu->name, lu->rid );
+ if ( image == NULL ) {
+ fuse_reply_err( lu->req, ENOENT );
+ free( lu );
+ } else {
+ mutex_lock( &dirLock );
+ dfuse_entry_t *entry = addImage( &root->child, lu->name, lu );
+ if ( entry != NULL ) {
+ entry->size = image->virtualFilesize;
+ }
+ lu->rid = image->rid; // In case it was 0
+ mutex_unlock( &dirLock );
+ image_release( image );
+ if ( entry == NULL ) {
+ fuse_reply_err( lu->req, EINVAL );
+ free( lu );
+ } else {
+ fuse_reply_write( lu->req, size );
+ }
+ }
+}
+
+static void ll_read( fuse_req_t req, fuse_ino_t ino UNUSED, size_t size, off_t off, struct fuse_file_info *fi )
+{
+ if ( fi->fh == 0 ) {
+ fuse_reply_err( req, 0 );
+ return;
+ }
+ cmdopen_t *handle = (cmdopen_t*)fi->fh;
+ dnbd3_image_t *image = handle->image;
+ if ( off < 0 || (uint64_t)off >= image->virtualFilesize ) {
+ fuse_reply_err( req, 0 );
+ return;
+ }
+ if ( off + size > image->virtualFilesize ) {
+ size = image->virtualFilesize - off;
+ }
+
+ // Check if cached locally
+ dnbd3_cache_map_t *cache = ref_get_cachemap( image );
+ if ( cache != NULL ) {
+ // This is a proxyed image, check if we need to relay the request...
+ const uint64_t start = (uint64_t)off & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ const uint64_t end = (off + size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ if ( !image_isRangeCachedUnsafe( cache, start, end ) ) {
+ ref_put( &cache->reference );
+ if ( size > (uint32_t)_maxPayload ) {
+ size = (uint32_t)_maxPayload;
+ }
+ if ( !uplink_request( image, req, &uplinkCallback, 0, off, (uint32_t)size ) ) {
+ logadd( LOG_DEBUG1, "FUSE: Could not relay uncached request to upstream proxy for image %s:%d",
+ image->name, image->rid );
+ fuse_reply_err( req, EIO );
+ }
+ return; // ASYNC
+ }
+ ref_put( &cache->reference );
+ }
+
+ // Is cached
+ size_t readSize = size;
+ if ( off + readSize > image->realFilesize ) {
+ if ( (uint64_t)off >= image->realFilesize ) {
+ readSize = 0;
+ } else {
+ readSize = image->realFilesize - off;
+ }
+ }
+ struct fuse_bufvec *vec = calloc( 1, sizeof(*vec) + sizeof(struct fuse_buf) );
+ if ( readSize != 0 ) {
+ // Real data from file
+ vec->buf[vec->count++] = (struct fuse_buf){
+ .size = readSize,
+ .flags = FUSE_BUF_IS_FD | FUSE_BUF_FD_RETRY | FUSE_BUF_FD_SEEK,
+ .fd = image->readFd,
+ .pos = off,
+ };
+ }
+ if ( readSize != size ) {
+ vec->buf[vec->count++] = (struct fuse_buf){
+ .size = size - readSize,
+ .mem = nullbytes,
+ .fd = -1,
+ };
+ }
+ fuse_reply_data( req, vec, FUSE_BUF_SPLICE_MOVE );
+ free( vec );
+}
+
+static bool statInternal(fuse_ino_t ino, struct stat *stbuf)
+{
+ switch ( ino ) {
+ case INO_ROOT:
+ case INO_DIR:
+ stbuf->st_mode = S_IFDIR | 0555;
+ stbuf->st_nlink = 2;
+ stbuf->st_mtim = startupTime;
+ break;
+ case INO_CTRL:
+ stbuf->st_mode = S_IFREG | 0222;
+ stbuf->st_nlink = 1;
+ stbuf->st_size = 0;
+ clock_gettime( CLOCK_REALTIME, &stbuf->st_mtim );
+ break;
+ default:
+ return false;
+ }
+ stbuf->st_ctim = stbuf->st_atim = startupTime;
+ stbuf->st_uid = 0;
+ stbuf->st_ino = ino;
+ return true;
+}
+
+/**
+ * HOLD LOCK
+ */
+static dfuse_entry_t* dirLookup(dfuse_entry_t *dir, const char *name)
+{
+ if ( dir == NULL )
+ return NULL;
+ for ( dfuse_entry_t *it = dir; it != NULL; it = it->next ) {
+ if ( strcmp( it->name, name ) == 0 )
+ return it;
+ }
+ return NULL;
+}
+
+static dfuse_entry_t* inoRecursive(dfuse_entry_t *dir, fuse_ino_t ino)
+{
+ for ( dfuse_entry_t *it = dir; it != NULL; it = it->next ) {
+ logadd( LOG_DEBUG1, "ino %d is %s", (int)it->ino, it->name );
+ if ( it->ino == ino )
+ return it;
+ if ( it->img == NULL ) {
+ dir = inoRecursive( it->child, ino );
+ if ( dir != NULL )
+ return dir;
+ }
+ }
+ return NULL;
+}
+
+/**
+ * HOLD LOCK
+ */
+static void entryToStat(dfuse_entry_t *entry, struct stat *stbuf)
+{
+ if ( entry->img == NULL ) {
+ stbuf->st_mode = S_IFDIR | 0555;
+ stbuf->st_nlink = 2;
+ } else {
+ stbuf->st_mode = S_IFREG | 0444;
+ stbuf->st_nlink = 1;
+ stbuf->st_size = entry->size;
+ }
+ stbuf->st_ino = entry->ino;
+ stbuf->st_uid = 0;
+ stbuf->st_ctim = stbuf->st_atim = stbuf->st_mtim = startupTime;
+}
+
+static void ll_lookup(fuse_req_t req, fuse_ino_t parent, const char *name)
+{
+ logadd( LOG_DEBUG2, "Lookup at ino %d for '%s'", (int)parent, name );
+ if ( parent == INO_ROOT ) {
+ struct fuse_entry_param e = { 0 };
+ if ( strcmp( name, NAME_DIR ) == 0 ) {
+ e.ino = INO_DIR;
+ } else if ( strcmp( name, NAME_CTRL ) == 0 ) {
+ e.ino = INO_CTRL;
+ e.attr_timeout = e.entry_timeout = 3600;
+ }
+ if ( e.ino != 0 && statInternal( e.ino, &e.attr ) ) {
+ fuse_reply_entry( req, &e );
+ return;
+ }
+ } else {
+ mutex_lock( &dirLock );
+ dfuse_entry_t *dir = inoRecursive( root, parent );
+ if ( dir != NULL ) {
+ if ( dir->img != NULL ) {
+ mutex_unlock( &dirLock );
+ fuse_reply_err( req, ENOTDIR );
+ return;
+ }
+ dfuse_entry_t *entry = dirLookup( dir->child, name );
+ if ( entry != NULL ) {
+ struct fuse_entry_param e = { .ino = entry->ino };
+ entryToStat( entry, &e.attr );
+ mutex_unlock( &dirLock );
+ fuse_reply_entry( req, &e );
+ return;
+ }
+ }
+ mutex_unlock( &dirLock );
+ }
+ fuse_reply_err( req, ENOENT );
+}
+
+struct dirbuf {
+ char *p;
+ size_t size;
+};
+
+static void dirbuf_add( fuse_req_t req, struct dirbuf *b, const char *name, fuse_ino_t ino )
+{
+ struct stat stbuf = { .st_ino = ino };
+ size_t oldsize = b->size;
+ b->size += fuse_add_direntry( req, NULL, 0, name, NULL, 0 );
+ b->p = ( char * ) realloc( b->p, b->size );
+ fuse_add_direntry( req, b->p + oldsize, b->size - oldsize, name, &stbuf, b->size );
+ return;
+}
+
+static int reply_buf_limited( fuse_req_t req, const char *buf, size_t bufsize, off_t off, size_t maxsize )
+{
+ if ( off >= 0 && off < (off_t)bufsize ) {
+ return fuse_reply_buf( req, buf + off, MIN( bufsize - off, maxsize ) );
+ }
+ return fuse_reply_buf( req, NULL, 0 );
+}
+
+static void ll_readdir(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info *fi UNUSED)
+{
+ if ( ino != INO_ROOT ) {
+ fuse_reply_err( req, EACCES );
+ } else {
+ struct dirbuf b;
+ memset( &b, 0, sizeof( b ) );
+ dirbuf_add( req, &b, ".", INO_ROOT );
+ dirbuf_add( req, &b, "..", INO_ROOT );
+ dirbuf_add( req, &b, NAME_CTRL, INO_CTRL );
+ dirbuf_add( req, &b, NAME_DIR, INO_DIR );
+ reply_buf_limited( req, b.p, b.size, off, size );
+ free( b.p );
+ }
+}
+
+static void ll_getattr(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi UNUSED)
+{
+ struct stat stbuf = { .st_ino = 0 };
+ if ( !statInternal( ino, &stbuf ) ) {
+ mutex_lock( &dirLock );
+ dfuse_entry_t *entry = inoRecursive( root, ino );
+ if ( entry != NULL ) {
+ entryToStat( entry, &stbuf );
+ }
+ mutex_unlock( &dirLock );
+ }
+ if ( stbuf.st_ino == 0 ) {
+ fuse_reply_err( req, ENOENT );
+ } else {
+ fuse_reply_attr( req, &stbuf, 0 );
+ }
+}
+
+void ll_setattr(fuse_req_t req, fuse_ino_t ino, struct stat *attr UNUSED, int to_set UNUSED, struct fuse_file_info *fi)
+{
+ ll_getattr( req, ino, fi );
+}
+
+void ll_release(fuse_req_t req, fuse_ino_t ino UNUSED, struct fuse_file_info *fi)
+{
+ if ( fi->fh != 0 ) {
+ cmdopen_t *handle = (cmdopen_t*)fi->fh;
+ image_release( handle->image );
+ mutex_lock( &dirLock );
+ handle->entry->refcount--;
+ mutex_unlock( &dirLock );
+ free( handle );
+ }
+ fuse_reply_err( req, 0 );
+}
+
+static void uplinkCallback(void *data, uint64_t handle UNUSED, uint64_t start UNUSED, uint32_t length, const char *buffer)
+{
+ fuse_req_t req = (fuse_req_t)data;
+ if ( buffer == NULL ) {
+ fuse_reply_err( req, EIO );
+ } else {
+ fuse_reply_buf( req, buffer, length );
+ }
+}
+
+#define DUMP(key,type) logadd( LOG_DEBUG1, "FUSE: " #key ": " type, conn->key )
+void ll_init(void *userdata, struct fuse_conn_info *conn)
+{
+ DUMP( capable, "%u" );
+ DUMP( congestion_threshold, "%u" );
+ DUMP( max_background, "%u" );
+ //DUMP( max_read, "%u" );
+ DUMP( max_readahead, "%u" );
+ DUMP( max_write, "%u" );
+ DUMP( want, "%u" );
+ conn->want |= FUSE_CAP_SPLICE_READ | FUSE_CAP_SPLICE_WRITE | FUSE_CAP_SPLICE_MOVE;
+}
+#undef DUMP
+
+/* map the implemented fuse operations */
+static struct fuse_lowlevel_ops fuseOps = {
+ .lookup = ll_lookup,
+ .getattr = ll_getattr,
+ .setattr = ll_setattr,
+ .readdir = ll_readdir,
+ .open = ll_open,
+ .release = ll_release,
+ .read = ll_read,
+ .write = ll_write,
+ .init = ll_init,
+ //.destroy = ll_destroy,
+};
+
+bool dfuse_init(const char *opts, const char *dir)
+{
+ int ex = INIT_NONE;
+ if ( !atomic_compare_exchange_strong( &initState, &ex, INIT_INPROGRESS ) ) {
+ logadd( LOG_ERROR, "Calling dfuse_init twice" );
+ exit( 1 );
+ }
+ mutex_init( &initLock, LOCK_FUSE_INIT );
+ mutex_lock( &initLock );
+ mutex_init( &dirLock, LOCK_FUSE_DIR );
+ clock_gettime( CLOCK_REALTIME, &startupTime );
+ struct fuse_args args = FUSE_ARGS_INIT( 0, NULL );
+ fuse_opt_add_arg( &args, "dnbd3fs" ); // argv[0]
+ if ( opts != NULL ) {
+ fuse_opt_add_arg( &args, opts );
+ }
+ fuse_opt_add_arg( &args, "-odefault_permissions" );
+ fuse_opt_add_arg( &args, dir ); // last param is mount point
+ //
+ if ( fuse_parse_cmdline( &args, &fuseMountPoint, NULL, NULL ) == -1 ) {
+ logadd( LOG_ERROR, "FUSE: Error parsing command line" );
+ goto fail;
+ }
+ fuseChannel = fuse_mount( fuseMountPoint, &args );
+ if ( fuseChannel == NULL ) {
+ logadd( LOG_ERROR, "FUSE: Cannot mount to %s", dir );
+ goto fail;
+ }
+ fuseSession = fuse_lowlevel_new( &args, &fuseOps, sizeof( fuseOps ), NULL );
+ if ( fuseSession == NULL ) {
+ logadd( LOG_ERROR, "FUSE: Error initializing fuse session" );
+ goto fail;
+ }
+ fuse_session_add_chan( fuseSession, fuseChannel );
+ if ( 0 != thread_create( &fuseThreadId, NULL, &fuseMainLoop, (void *)NULL ) ) {
+ logadd( LOG_ERROR, "FUSE: Could not start thread" );
+ goto fail;
+ }
+ haveThread = true;
+ // Init OK
+ mutex_unlock( &initLock );
+ return true;
+fail:
+ cleanupFuse();
+ fuse_opt_free_args( &args );
+ initState = INIT_SHUTDOWN;
+ mutex_unlock( &initLock );
+ return false;
+}
+
+void dfuse_shutdown()
+{
+ if ( initState == INIT_NONE )
+ return;
+ for ( ;; ) {
+ int ex = INIT_DONE;
+ if ( atomic_compare_exchange_strong( &initState, &ex, INIT_SHUTDOWN ) )
+ break; // OK, do the shutdown
+ if ( ex == INIT_INPROGRESS )
+ continue; // dfuse_init in progress, wait for mutex
+ // Wrong state
+ logadd( LOG_WARNING, "Called dfuse_shutdown without dfuse_init first" );
+ return;
+ }
+ logadd( LOG_INFO, "Shutting down fuse mainloop..." );
+ mutex_lock( &initLock );
+ if ( fuseSession != NULL ) {
+ fuse_session_exit( fuseSession );
+ }
+ if ( !haveThread ) {
+ cleanupFuse();
+ }
+ mutex_unlock( &initLock );
+ if ( haveThread ) {
+ logadd( LOG_DEBUG1, "FUSE: Sending USR1 to mainloop thread" );
+ pthread_kill( fuseThreadId, SIGUSR1 );
+ pthread_join( fuseThreadId, NULL );
+ }
+}
+
+static void* fuseMainLoop(void *data UNUSED)
+{
+ int ex = INIT_INPROGRESS;
+ if ( !atomic_compare_exchange_strong( &initState, &ex, INIT_DONE ) ) {
+ logadd( LOG_WARNING, "FUSE: Unexpected state in fuseMainLoop: %d", ex );
+ return NULL;
+ }
+ setThreadName( "fuse" );
+ logadd( LOG_INFO, "FUSE: Starting mainloop" );
+ fuse_session_loop_mt( fuseSession );
+ logadd( LOG_INFO, "FUSE: Left mainloop" );
+ mutex_lock( &initLock );
+ cleanupFuse();
+ mutex_unlock( &initLock );
+ return NULL;
+}
+
+static void cleanupFuse()
+{
+ if ( fuseChannel != NULL ) {
+ fuse_session_remove_chan( fuseChannel );
+ }
+ if ( fuseSession != NULL ) {
+ fuse_session_destroy( fuseSession );
+ fuseSession = NULL;
+ }
+ if ( fuseMountPoint != NULL && fuseChannel != NULL ) {
+ fuse_unmount( fuseMountPoint, fuseChannel );
+ }
+ fuseChannel = NULL;
+}
+
+#endif // DNBD3_SERVER_FUSE
diff --git a/src/server/fuse.h b/src/server/fuse.h
new file mode 100644
index 0000000..f01ad58
--- /dev/null
+++ b/src/server/fuse.h
@@ -0,0 +1,10 @@
+#ifndef _FUSE_H_
+#define _FUSE_H_
+
+#include <stdbool.h>
+
+bool dfuse_init(const char *opts, const char *dir);
+
+void dfuse_shutdown();
+
+#endif
diff --git a/src/server/globals.c b/src/server/globals.c
index f8c3f66..f6432cb 100644
--- a/src/server/globals.c
+++ b/src/server/globals.c
@@ -1,7 +1,7 @@
#include "globals.h"
#include "ini.h"
#include "locks.h"
-#include "../shared/log.h"
+#include <dnbd3/shared/log.h>
#include <string.h>
#include <stdlib.h>
#include <inttypes.h>
@@ -19,22 +19,26 @@ atomic_int _clientPenalty = 0;
atomic_bool _isProxy = false;
atomic_int _backgroundReplication = BGR_FULL;
atomic_int _bgrMinClients = 0;
+atomic_int _bgrWindowSize = 1;
atomic_bool _lookupMissingForProxy = true;
atomic_bool _sparseFiles = false;
+atomic_bool _ignoreAllocErrors = false;
atomic_bool _removeMissingImages = true;
-atomic_int _uplinkTimeout = SOCKET_TIMEOUT_UPLINK;
-atomic_int _clientTimeout = SOCKET_TIMEOUT_CLIENT;
+atomic_uint _uplinkTimeout = SOCKET_TIMEOUT_UPLINK;
+atomic_uint _clientTimeout = SOCKET_TIMEOUT_CLIENT;
atomic_bool _closeUnusedFd = false;
atomic_bool _vmdkLegacyMode = false;
// Not really needed anymore since we have '+' and '-' in alt-servers
atomic_bool _proxyPrivateOnly = false;
+atomic_bool _pretendClient = false;
atomic_int _autoFreeDiskSpaceDelay = 3600 * 10;
// [limits]
atomic_int _maxClients = SERVER_MAX_CLIENTS;
atomic_int _maxImages = SERVER_MAX_IMAGES;
-atomic_int _maxPayload = 9000000; // 9MB
+atomic_uint _maxPayload = 9000000; // 9MB
atomic_uint_fast64_t _maxReplicationSize = (uint64_t)100000000000LL;
-atomic_bool _pretendClient = false;
+atomic_uint _maxPrefetch = 262144; // 256KB
+atomic_uint _minRequestSize = 0;
/**
* True when loading config the first time. Consecutive loads will
@@ -58,31 +62,35 @@ static const char* units = "KMGTPEZY";
static bool parse64(const char *in, atomic_int_fast64_t *out, const char *optname);
static bool parse64u(const char *in, atomic_uint_fast64_t *out, const char *optname);
-static bool parse32(const char *in, atomic_int *out, const char *optname) UNUSED;
-static bool parse32u(const char *in, atomic_int *out, const char *optname);
+static bool parse32(const char *in, atomic_int *out, const char *optname);
+static bool parse32u(const char *in, atomic_uint *out, const char *optname);
static int ini_handler(void *custom UNUSED, const char* section, const char* key, const char* value)
{
if ( initialLoad ) {
if ( _basePath == NULL ) SAVE_TO_VAR_STR( dnbd3, basePath );
SAVE_TO_VAR_BOOL( dnbd3, vmdkLegacyMode );
- SAVE_TO_VAR_UINT( dnbd3, listenPort );
- SAVE_TO_VAR_UINT( limits, maxClients );
- SAVE_TO_VAR_UINT( limits, maxImages );
+ SAVE_TO_VAR_INT( dnbd3, listenPort );
+ SAVE_TO_VAR_INT( limits, maxClients );
+ SAVE_TO_VAR_INT( limits, maxImages );
}
SAVE_TO_VAR_BOOL( dnbd3, isProxy );
SAVE_TO_VAR_BOOL( dnbd3, proxyPrivateOnly );
SAVE_TO_VAR_INT( dnbd3, bgrMinClients );
+ SAVE_TO_VAR_INT( dnbd3, bgrWindowSize );
SAVE_TO_VAR_BOOL( dnbd3, lookupMissingForProxy );
SAVE_TO_VAR_BOOL( dnbd3, sparseFiles );
+ SAVE_TO_VAR_BOOL( dnbd3, ignoreAllocErrors );
SAVE_TO_VAR_BOOL( dnbd3, removeMissingImages );
SAVE_TO_VAR_BOOL( dnbd3, closeUnusedFd );
- SAVE_TO_VAR_UINT( dnbd3, serverPenalty );
- SAVE_TO_VAR_UINT( dnbd3, clientPenalty );
+ SAVE_TO_VAR_INT( dnbd3, serverPenalty );
+ SAVE_TO_VAR_INT( dnbd3, clientPenalty );
SAVE_TO_VAR_UINT( dnbd3, uplinkTimeout );
SAVE_TO_VAR_UINT( dnbd3, clientTimeout );
SAVE_TO_VAR_UINT( limits, maxPayload );
SAVE_TO_VAR_UINT64( limits, maxReplicationSize );
+ SAVE_TO_VAR_UINT( limits, maxPrefetch );
+ SAVE_TO_VAR_UINT( limits, minRequestSize );
SAVE_TO_VAR_BOOL( dnbd3, pretendClient );
SAVE_TO_VAR_INT( dnbd3, autoFreeDiskSpaceDelay );
if ( strcmp( section, "dnbd3" ) == 0 && strcmp( key, "backgroundReplication" ) == 0 ) {
@@ -111,7 +119,10 @@ static int ini_handler(void *custom UNUSED, const char* section, const char* key
void globals_loadConfig()
{
char *name = NULL;
- asprintf( &name, "%s/%s", _configDir, CONFIG_FILENAME );
+ if ( asprintf( &name, "%s/%s", _configDir, CONFIG_FILENAME ) == -1 ) {
+ logadd( LOG_ERROR, "Memory allocation error for config filename" );
+ exit( 1 );
+ }
if ( name == NULL ) return;
if ( initialLoad ) {
mutex_init( &loadLock, LOCK_LOAD_CONFIG );
@@ -125,9 +136,30 @@ void globals_loadConfig()
if ( initialLoad ) {
sanitizeFixedConfig();
}
- if ( _backgroundReplication == BGR_FULL && _sparseFiles && _bgrMinClients < 5 ) {
- logadd( LOG_WARNING, "Ignoring 'sparseFiles=true' since backgroundReplication is set to true and bgrMinClients is too low" );
- _sparseFiles = false;
+ if ( _isProxy ) {
+ if ( _backgroundReplication == BGR_FULL && _sparseFiles && _bgrMinClients < 5 ) {
+ logadd( LOG_WARNING, "Ignoring 'sparseFiles=true' since backgroundReplication is set to true and bgrMinClients is too low" );
+ _sparseFiles = false;
+ }
+ if ( _bgrWindowSize < 1 ) {
+ _bgrWindowSize = 1;
+ } else if ( _bgrWindowSize > UPLINK_MAX_QUEUE - 10 ) {
+ _bgrWindowSize = UPLINK_MAX_QUEUE - 10;
+ logadd( LOG_MINOR, "Limiting bgrWindowSize to %d, because of UPLINK_MAX_QUEUE",
+ _bgrWindowSize );
+ }
+ if ( _maxPayload < 256 * 1024 ) {
+ logadd( LOG_WARNING, "maxPayload was increased to 256k" );
+ _maxPayload = 256 * 1024;
+ }
+ if ( _maxPrefetch > _maxPayload ) {
+ logadd( LOG_WARNING, "Reducing maxPrefetch to maxPayload" );
+ _maxPrefetch = _maxPayload;
+ }
+ if ( _minRequestSize > _maxPayload ) {
+ logadd( LOG_WARNING, "Reducing minRequestSize to maxPayload" );
+ _minRequestSize = _maxPayload;
+ }
}
// Dump config as interpreted
char buffer[2000];
@@ -281,7 +313,7 @@ static bool parse32(const char *in, atomic_int *out, const char *optname)
return true;
}
-static bool parse32u(const char *in, atomic_int *out, const char *optname)
+static bool parse32u(const char *in, atomic_uint *out, const char *optname)
{
atomic_int_fast64_t v;
if ( !parse64( in, &v, optname ) ) return false;
@@ -289,7 +321,7 @@ static bool parse32u(const char *in, atomic_int *out, const char *optname)
logadd( LOG_WARNING, "'%s' must be between %d and %d, but is '%s'", optname, (int)0, (int)INT_MAX, in );
return false;
}
- *out = (int)v;
+ *out = (unsigned int)v;
return true;
}
@@ -320,8 +352,10 @@ size_t globals_dumpConfig(char *buffer, size_t size)
PBOOL(backgroundReplication);
}
PINT(bgrMinClients);
+ PINT(bgrWindowSize);
PBOOL(lookupMissingForProxy);
PBOOL(sparseFiles);
+ PBOOL(ignoreAllocErrors);
PBOOL(removeMissingImages);
PINT(uplinkTimeout);
PINT(clientTimeout);
@@ -335,6 +369,8 @@ size_t globals_dumpConfig(char *buffer, size_t size)
PINT(maxImages);
PINT(maxPayload);
PUINT64(maxReplicationSize);
+ PINT(maxPrefetch);
+ PINT(minRequestSize);
return size - rem;
}
diff --git a/src/server/globals.h b/src/server/globals.h
index df8c595..bde1184 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -1,9 +1,9 @@
#ifndef _GLOBALS_H_
#define _GLOBALS_H_
-#include "../types.h"
-#include "../shared/fdsignal.h"
-#include "../serverconfig.h"
+#include <dnbd3/types.h>
+#include <dnbd3/shared/fdsignal.h>
+#include <dnbd3/config/server.h>
#include <stdint.h>
#include <stdatomic.h>
#include <time.h>
@@ -18,18 +18,30 @@ typedef struct _dnbd3_uplink dnbd3_uplink_t;
typedef struct _dnbd3_image dnbd3_image_t;
typedef struct _dnbd3_client dnbd3_client_t;
-typedef struct
+typedef void (*uplink_callback)(void *data, uint64_t handle, uint64_t start, uint32_t length, const char *buffer);
+
+typedef struct _dnbd3_queue_client
{
- uint64_t handle; // Client defined handle to pass back in reply
- uint64_t from; // First byte offset of requested block (ie. 4096)
- uint64_t to; // Last byte + 1 of requested block (ie. 8192, if request len is 4096, resulting in bytes 4096-8191)
- dnbd3_client_t * client; // Client to send reply to
- int status; // status of this entry: ULR_*
-#ifdef _DEBUG
- ticks entered; // When this request entered the queue (for debugging)
+ struct _dnbd3_queue_client *next;
+ void* data; // Passed back to callback
+ uint64_t handle; // Passed back to callback
+ uint64_t from, to; // Client range
+ uplink_callback callback; // Callback function
+} dnbd3_queue_client_t;
+
+typedef struct _dnbd3_queue_entry
+{
+ struct _dnbd3_queue_entry *next;
+ uint64_t handle; // Our handle for this entry
+ uint64_t from; // First byte offset of requested block (ie. 4096)
+ uint64_t to; // Last byte + 1 of requested block (ie. 8192, if request len is 4096, resulting in bytes 4096-8191)
+ dnbd3_queue_client_t *clients;
+#ifdef DEBUG
+ ticks entered; // When this request entered the queue (for debugging)
#endif
- uint8_t hopCount; // How many hops this request has already taken across proxies
-} dnbd3_queued_request_t;
+ uint8_t hopCount; // How many hops this request has already taken across proxies
+ bool sent; // Already sent to uplink?
+} dnbd3_queue_entry_t;
typedef struct _ns
{
@@ -91,11 +103,12 @@ struct _dnbd3_uplink
bool cycleDetected; // connection cycle between proxies detected for current remote server
int nextReplicationIndex; // Which index in the cache map we should start looking for incomplete blocks at
// If BGR == BGR_HASHBLOCK, -1 means "currently no incomplete block"
- uint64_t replicationHandle; // Handle of pending replication request
atomic_uint_fast64_t bytesReceived; // Number of bytes received by the uplink since startup.
- atomic_int queueLen; // length of queue
- uint32_t idleTime; // How many seconds the uplink was idle (apart from keep-alives)
- dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE];
+ atomic_uint_fast64_t bytesReceivedLastSave; // Number of bytes received when we last saved the cache map
+ int queueLen; // length of queue
+ int idleTime; // How many seconds the uplink was idle (apart from keep-alives)
+ dnbd3_queue_entry_t *queue;
+ atomic_uint_fast32_t queueId;
dnbd3_alt_local_t altData[SERVER_MAX_ALTS];
};
@@ -110,6 +123,8 @@ typedef struct
typedef struct
{
ref reference;
+ atomic_bool dirty; // Cache map has been modified outside uplink (only integrity checker for now)
+ bool unchanged; // How many times in a row a reloaded cache map went unchanged
_Atomic uint8_t map[];
} dnbd3_cache_map_t;
@@ -128,7 +143,6 @@ struct _dnbd3_image
uint64_t virtualFilesize; // virtual size of image (real size rounded up to multiple of 4k)
uint64_t realFilesize; // actual file size on disk
ticks atime; // last access time
- ticks lastWorkCheck; // last time a non-working image has been checked
ticks nextCompletenessEstimate; // next time the completeness estimate should be updated
uint32_t *crc32; // list of crc32 checksums for each 16MiB block in image
uint32_t masterCrc32; // CRC-32 of the crc-32 list
@@ -136,10 +150,18 @@ struct _dnbd3_image
atomic_int completenessEstimate; // Completeness estimate in percent
atomic_int users; // clients currently using this image. XXX Lock on imageListLock when modifying and checking whether the image should be freed. Reading it elsewhere is fine without the lock.
int id; // Unique ID of this image. Only unique in the context of this running instance of DNBD3-Server
- atomic_bool working; // true if image exists and completeness is == 100% or a working upstream proxy is connected
+ struct {
+ atomic_bool read; // Error reading from file
+ atomic_bool write; // Error writing to file
+ atomic_bool changed; // File disappeared or changed, thorough check required if it seems to be back
+ atomic_bool uplink; // No uplink connected
+ atomic_bool queue; // Too many requests waiting on uplink
+ } problem;
uint16_t rid; // revision of image
+ bool accessed; // image was accessed since .meta was written
pthread_mutex_t lock;
};
+#define PIMG(x) (x)->name, (int)(x)->rid
struct _dnbd3_client
{
@@ -147,6 +169,7 @@ struct _dnbd3_client
atomic_uint_fast64_t bytesSent; // Byte counter for this client.
dnbd3_image_t * _Atomic image; // Image in use by this client, or NULL during handshake
int sock;
+ _Atomic uint8_t relayedCount; // How many requests are in-flight to the uplink server
bool isServer; // true if a server in proxy mode, false if real client
dnbd3_host_t host;
char hostName[HOSTNAMELEN]; // inet_ntop version of host
@@ -206,12 +229,12 @@ extern atomic_bool _removeMissingImages;
/**
* Read timeout when waiting for or sending data on an uplink
*/
-extern atomic_int _uplinkTimeout;
+extern atomic_uint _uplinkTimeout;
/**
* Read timeout when waiting for or sending data from/to client
*/
-extern atomic_int _clientTimeout;
+extern atomic_uint _clientTimeout;
/**
* If true, images with no active client will have their fd closed after some
@@ -234,6 +257,11 @@ extern atomic_int _backgroundReplication;
extern atomic_int _bgrMinClients;
/**
+ * How many in-flight replication requests we should target (per uplink)
+ */
+extern atomic_int _bgrWindowSize;
+
+/**
* (In proxy mode): If connecting client is a proxy, and the requested image
* is not known locally, should we ask our known alt servers for it?
* Otherwise the request is rejected.
@@ -255,6 +283,12 @@ extern atomic_bool _lookupMissingForProxy;
extern atomic_bool _sparseFiles;
/**
+ * If true, don't abort image replication if preallocating
+ * the image fails, but retry with sparse file.
+ */
+extern atomic_bool _ignoreAllocErrors;
+
+/**
* Port to listen on (default: #define PORT (5003))
*/
extern atomic_int _listenPort;
@@ -275,7 +309,7 @@ extern atomic_int _maxImages;
* Usually this isn't even a megabyte for "real" clients (blockdev
* or fuse).
*/
-extern atomic_int _maxPayload;
+extern atomic_uint _maxPayload;
/**
* If in proxy mode, don't replicate images that are
@@ -298,6 +332,21 @@ extern atomic_bool _pretendClient;
extern atomic_int _autoFreeDiskSpaceDelay;
/**
+ * When handling a client request, this sets the maximum amount
+ * of bytes we prefetch offset right at the end of the client request.
+ * The prefetch size will be MIN( length * 3, _maxPrefetch ), if
+ * length <= _maxPrefetch, so effectively, setting this to 0 disables
+ * any prefetching.
+ */
+extern atomic_uint _maxPrefetch;
+
+/**
+ * Use with care. Can severely degrade performance.
+ * Set either 0 or very high.
+ */
+extern atomic_uint _minRequestSize;
+
+/**
* Load the server configuration.
*/
void globals_loadConfig();
diff --git a/src/server/helper.h b/src/server/helper.h
index 102cb36..3e1b661 100644
--- a/src/server/helper.h
+++ b/src/server/helper.h
@@ -2,8 +2,8 @@
#define HELPER_H_
#include "server.h"
-#include "../shared/log.h"
-#include "../types.h"
+#include <dnbd3/shared/log.h>
+#include <dnbd3/types.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
diff --git a/src/server/image.c b/src/server/image.c
index 16dae45..51fd5b6 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -5,9 +5,9 @@
#include "locks.h"
#include "integrity.h"
#include "altservers.h"
-#include "../shared/protocol.h"
-#include "../shared/timing.h"
-#include "../shared/crc32.h"
+#include <dnbd3/shared/protocol.h>
+#include <dnbd3/shared/timing.h>
+#include <dnbd3/shared/crc32.h>
#include "reference.h"
#include <assert.h>
@@ -46,16 +46,21 @@ static dnbd3_image_t* image_remove(dnbd3_image_t *image);
static dnbd3_image_t* image_free(dnbd3_image_t *image);
static bool image_load_all_internal(char *base, char *path);
static bool image_addToList(dnbd3_image_t *image);
-static bool image_load(char *base, char *path, int withUplink);
+static bool image_load(char *base, char *path, bool withUplink);
static bool image_clone(int sock, char *name, uint16_t revision, uint64_t imageSize);
static bool image_calcBlockCrc32(const int fd, const size_t block, const uint64_t realFilesize, uint32_t *crc);
static bool image_ensureDiskSpace(uint64_t size, bool force);
static dnbd3_cache_map_t* image_loadCacheMap(const char * const imagePath, const int64_t fileSize);
static uint32_t* image_loadCrcList(const char * const imagePath, const int64_t fileSize, uint32_t *masterCrc);
-static void image_checkRandomBlocks(dnbd3_image_t *image, const int count);
+static bool image_checkRandomBlocks(dnbd3_image_t *image, const int count, int fromFd);
static void* closeUnusedFds(void*);
+static bool isImageFromUpstream(dnbd3_image_t *image);
+static void* saveLoadAllCacheMaps(void*);
+static void saveCacheMap(dnbd3_image_t *image);
static void allocCacheMap(dnbd3_image_t *image, bool complete);
+static void saveMetaData(dnbd3_image_t *image, ticks *now, time_t walltime);
+static void loadImageMeta(dnbd3_image_t *image);
static void cmfree(ref *ref)
{
@@ -73,6 +78,7 @@ void image_serverStartup()
mutex_init( &remoteCloneLock, LOCK_REMOTE_CLONE );
mutex_init( &reloadLock, LOCK_RELOAD );
server_addJob( &closeUnusedFds, NULL, 10, 900 );
+ server_addJob( &saveLoadAllCacheMaps, NULL, 9, 20 );
}
/**
@@ -118,39 +124,35 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co
const uint64_t firstByteInMap = start >> 15;
const uint64_t lastByteInMap = (end - 1) >> 15;
uint64_t pos;
- // First byte
- uint8_t fb = 0, lb = 0;
- for ( pos = start; firstByteInMap == (pos >> 15) && pos < end; pos += DNBD3_BLOCK_SIZE ) {
- const int map_x = (pos >> 12) & 7; // mod 8
- const uint8_t bit_mask = (uint8_t)( 1 << map_x );
- fb |= bit_mask;
- }
- // Last byte
- if ( lastByteInMap != firstByteInMap ) {
- for ( pos = lastByteInMap << 15; pos < end; pos += DNBD3_BLOCK_SIZE ) {
- assert( lastByteInMap == (pos >> 15) );
- const int map_x = (pos >> 12) & 7; // mod 8
- const uint8_t bit_mask = (uint8_t)( 1 << map_x );
- lb |= bit_mask;
- }
- }
- atomic_thread_fence( memory_order_acquire );
- if ( set ) {
- uint8_t fo = atomic_fetch_or_explicit( &cache->map[firstByteInMap], fb, memory_order_relaxed );
- uint8_t lo = atomic_fetch_or_explicit( &cache->map[lastByteInMap], lb, memory_order_relaxed );
- setNewBlocks = ( fo != cache->map[firstByteInMap] || lo != cache->map[lastByteInMap] );
+ // First and last byte masks
+ const uint8_t fb = (uint8_t)(0xff << ((start >> 12) & 7));
+ const uint8_t lb = (uint8_t)(~(0xff << ((((end - 1) >> 12) & 7) + 1)));
+ if ( firstByteInMap == lastByteInMap ) {
+ if ( set ) {
+ uint8_t o = atomic_fetch_or( &cache->map[firstByteInMap], (uint8_t)(fb & lb) );
+ setNewBlocks = o != ( o | (fb & lb) );
+ } else {
+ atomic_fetch_and( &cache->map[firstByteInMap], (uint8_t)~(fb & lb) );
+ }
} else {
- atomic_fetch_and_explicit( &cache->map[firstByteInMap], (uint8_t)~fb, memory_order_relaxed );
- atomic_fetch_and_explicit( &cache->map[lastByteInMap], (uint8_t)~lb, memory_order_relaxed );
- }
- const uint8_t nval = set ? 0xff : 0;
- // Everything in between
- for ( pos = firstByteInMap + 1; pos < lastByteInMap; ++pos ) {
- if ( atomic_exchange_explicit( &cache->map[pos], nval, memory_order_relaxed ) != nval && set ) {
- setNewBlocks = true;
+ atomic_thread_fence( memory_order_acquire );
+ if ( set ) {
+ uint8_t fo = atomic_fetch_or_explicit( &cache->map[firstByteInMap], fb, memory_order_relaxed );
+ uint8_t lo = atomic_fetch_or_explicit( &cache->map[lastByteInMap], lb, memory_order_relaxed );
+ setNewBlocks = ( fo != ( fo | fb ) || lo != ( lo | lb ) );
+ } else {
+ atomic_fetch_and_explicit( &cache->map[firstByteInMap], (uint8_t)~fb, memory_order_relaxed );
+ atomic_fetch_and_explicit( &cache->map[lastByteInMap], (uint8_t)~lb, memory_order_relaxed );
+ }
+ // Everything in between
+ const uint8_t nval = set ? 0xff : 0;
+ for ( pos = firstByteInMap + 1; pos < lastByteInMap; ++pos ) {
+ if ( atomic_exchange_explicit( &cache->map[pos], nval, memory_order_relaxed ) != nval && set ) {
+ setNewBlocks = true;
+ }
}
+ atomic_thread_fence( memory_order_release );
}
- atomic_thread_fence( memory_order_release );
if ( setNewBlocks && image->crc32 != NULL ) {
// If setNewBlocks is set, at least one of the blocks was not cached before, so queue all hash blocks
// for checking, even though this might lead to checking some hash block again, if it was
@@ -164,6 +166,8 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co
integrity_check( image, block, false );
}
}
+ } else if ( !set ) {
+ cache->dirty = true;
}
ref_put( &cache->reference );
}
@@ -239,35 +243,74 @@ bool image_isComplete(dnbd3_image_t *image)
*/
bool image_ensureOpen(dnbd3_image_t *image)
{
- if ( image->readFd != -1 ) return image;
- int newFd = open( image->path, O_RDONLY );
+ bool sizeChanged = false;
+ if ( image->readFd != -1 && !image->problem.changed )
+ return true;
+ int newFd = image->readFd == -1 ? open( image->path, O_RDONLY ) : dup( image->readFd );
if ( newFd == -1 ) {
- logadd( LOG_WARNING, "Cannot open %s for reading", image->path );
+ if ( !image->problem.read ) {
+ logadd( LOG_WARNING, "[access] Cannot open '%s' for reading (errno=%d)", image->path, errno );
+ image->problem.read = true;
+ }
} else {
- // Check size
+ // Check size + read access
+ char buffer[100];
const off_t flen = lseek( newFd, 0, SEEK_END );
if ( flen == -1 ) {
- logadd( LOG_WARNING, "Could not seek to end of %s (errno %d)", image->path, errno );
+ if ( !image->problem.read ) {
+ logadd( LOG_WARNING, "Could not seek to end of %s (errno=%d)", image->path, errno );
+ image->problem.read = true;
+ }
close( newFd );
newFd = -1;
} else if ( (uint64_t)flen != image->realFilesize ) {
- logadd( LOG_WARNING, "Size of active image with closed fd changed from %" PRIu64 " to %" PRIu64, image->realFilesize, (uint64_t)flen );
+ if ( !image->problem.changed ) {
+ logadd( LOG_WARNING, "Size of active image with closed fd changed from %" PRIu64 " to %" PRIu64,
+ image->realFilesize, (uint64_t)flen );
+ }
+ sizeChanged = true;
+ } else if ( pread( newFd, buffer, sizeof(buffer), 0 ) == -1 ) {
+ if ( !image->problem.read ) {
+ logadd( LOG_WARNING, "Reading first %d bytes from %s failed (errno=%d)",
+ (int)sizeof(buffer), image->path, errno );
+ image->problem.read = true;
+ }
close( newFd );
newFd = -1;
}
}
if ( newFd == -1 ) {
- mutex_lock( &image->lock );
- image->working = false;
- mutex_unlock( &image->lock );
+ if ( sizeChanged ) {
+ image->problem.changed = true;
+ }
return false;
}
+
+ // Re-opened. Check if the "size/content changed" flag was set before and if so, check crc32,
+ // but only if the size we just got above is correct.
+ if ( image->problem.changed && !sizeChanged ) {
+ if ( image->crc32 == NULL ) {
+ // Cannot verify further, hope for the best
+ image->problem.changed = false;
+ logadd( LOG_DEBUG1, "Size of image %s:%d changed back to expected value", PIMG(image) );
+ } else if ( image_checkRandomBlocks( image, 1, newFd ) ) {
+ // This should have checked the first block (if complete) -> All is well again
+ image->problem.changed = false;
+ logadd( LOG_DEBUG1, "Size and CRC of image %s:%d changed back to expected value", PIMG(image) );
+ }
+ } else {
+ image->problem.changed = sizeChanged;
+ }
+
mutex_lock( &image->lock );
if ( image->readFd == -1 ) {
image->readFd = newFd;
+ image->problem.read = false;
mutex_unlock( &image->lock );
} else {
- // There was a race while opening the file (happens cause not locked cause blocking), we lost the race so close new fd and proceed
+ // There was a race while opening the file (happens cause not locked cause blocking),
+ // we lost the race so close new fd and proceed.
+ // *OR* we dup()'ed above for cheating when the image changed before.
mutex_unlock( &image->lock );
close( newFd );
}
@@ -296,10 +339,9 @@ dnbd3_image_t* image_byId(int imgId)
* point...
* Locks on: imageListLock, _images[].lock
*/
-dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
+dnbd3_image_t* image_get(const char *name, uint16_t revision, bool ensureFdOpen)
{
int i;
- const char *removingText = _removeMissingImages ? ", removing from list" : "";
dnbd3_image_t *candidate = NULL;
// Simple sanity check
const size_t slen = strlen( name );
@@ -326,84 +368,36 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
candidate->users++;
mutex_unlock( &imageListLock );
- // Found, see if it works
- // TODO: Also make sure a non-working image still has old fd open but created a new one and removed itself from the list
- // TODO: But remember size-changed images forever
- if ( candidate->working || checkIfWorking ) {
- // Is marked working, but might not have an fd open
- if ( !image_ensureOpen( candidate ) ) {
- mutex_lock( &candidate->lock );
- timing_get( &candidate->lastWorkCheck );
- mutex_unlock( &candidate->lock );
- if ( _removeMissingImages ) {
- candidate = image_remove( candidate ); // No release here, the image is still returned and should be released by caller
- }
- return candidate;
- }
- }
-
- if ( !checkIfWorking ) return candidate; // Not interested in re-cechking working state
-
- // ...not working...
-
- // Don't re-check too often
- mutex_lock( &candidate->lock );
- bool check;
- declare_now;
- check = timing_diff( &candidate->lastWorkCheck, &now ) > NONWORKING_RECHECK_INTERVAL_SECONDS;
- if ( check ) {
- candidate->lastWorkCheck = now;
- }
- mutex_unlock( &candidate->lock );
- if ( !check ) {
+ if ( !ensureFdOpen ) // Don't want to re-check
return candidate;
- }
- // reaching this point means:
- // 1) We should check if the image is working, it might or might not be in working state right now
- // 2) The image is open for reading (or at least was at some point, the fd might be stale if images lie on an NFS share etc.)
- // 3) We made sure not to re-check this image too often
-
- // Common for ro and rw images: Size check, read check
- const off_t len = lseek( candidate->readFd, 0, SEEK_END );
- bool reload = false;
- if ( len == -1 ) {
- logadd( LOG_WARNING, "lseek() on %s failed (errno=%d)%s.", candidate->path, errno, removingText );
- reload = true;
- } else if ( (uint64_t)len != candidate->realFilesize ) {
- logadd( LOG_WARNING, "Size of %s changed at runtime, keeping disabled! Expected: %" PRIu64 ", found: %" PRIu64
- ". Try sending SIGHUP to server if you know what you're doing.",
- candidate->path, candidate->realFilesize, (uint64_t)len );
- } else {
- // Seek worked, file size is same, now see if we can read from file
- char buffer[100];
- if ( pread( candidate->readFd, buffer, sizeof(buffer), 0 ) == -1 ) {
- logadd( LOG_WARNING, "Reading first %d bytes from %s failed (errno=%d)%s.",
- (int)sizeof(buffer), candidate->path, errno, removingText );
- reload = true;
- } else if ( !candidate->working ) {
- // Seems everything is fine again \o/
- candidate->working = true;
- logadd( LOG_INFO, "Changed state of %s:%d to 'working'", candidate->name, candidate->rid );
- }
- }
+ if ( image_ensureOpen( candidate ) && !candidate->problem.read )
+ return candidate; // We have a read fd and no read or changed problems
- if ( reload ) {
+ // -- image could not be opened again, or is open but has problem --
+
+ if ( _removeMissingImages && !file_isReadable( candidate->path ) ) {
+ candidate = image_remove( candidate );
+ // No image_release here, the image is still returned and should be released by caller
+ } else if ( candidate->readFd != -1 ) {
+ // We cannot just close the fd as it might be in use. Make a copy and remove old entry.
+ candidate = image_remove( candidate );
// Could not access the image with exising fd - mark for reload which will re-open the file.
// make a copy of the image struct but keep the old one around. If/When it's not being used
// anymore, it will be freed automatically.
- logadd( LOG_DEBUG1, "Reloading image file %s", candidate->path );
+ logadd( LOG_DEBUG1, "Reloading image file %s because of read problem/changed", candidate->path );
dnbd3_image_t *img = calloc( sizeof(dnbd3_image_t), 1 );
img->path = strdup( candidate->path );
img->name = strdup( candidate->name );
img->virtualFilesize = candidate->virtualFilesize;
img->realFilesize = candidate->realFilesize;
- img->atime = now;
+ timing_get( &img->atime );
img->masterCrc32 = candidate->masterCrc32;
img->readFd = -1;
img->rid = candidate->rid;
img->users = 1;
- img->working = false;
+ img->problem.read = true;
+ img->problem.changed = candidate->problem.changed;
img->ref_cacheMap = NULL;
mutex_init( &img->lock, LOCK_IMAGE );
if ( candidate->crc32 != NULL ) {
@@ -419,18 +413,17 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
if ( image_addToList( img ) ) {
image_release( candidate );
candidate = img;
+ // Check if image is incomplete, initialize uplink
+ if ( candidate->ref_cacheMap != NULL ) {
+ uplink_init( candidate, -1, NULL, -1 );
+ }
+ // Try again with new instance
+ image_ensureOpen( candidate );
} else {
img->users = 0;
image_free( img );
}
- // Check if image is incomplete, initialize uplink
- if ( candidate->ref_cacheMap != NULL ) {
- uplink_init( candidate, -1, NULL, -1 );
- }
- // readFd == -1 and working == FALSE at this point,
- // this function needs some splitting up for handling as we need to run most
- // of the above code again. for now we know that the next call for this
- // name:rid will get ne newly inserted "img" and try to re-open the file.
+ // readFd == -1 and problem.read == true
}
return candidate; // We did all we can, hopefully it's working
@@ -449,6 +442,7 @@ dnbd3_image_t* image_lock(dnbd3_image_t *image)
mutex_lock( &imageListLock );
for (i = 0; i < _num_images; ++i) {
if ( _images[i] == image ) {
+ assert( _images[i]->id == image->id );
image->users++;
mutex_unlock( &imageListLock );
return image;
@@ -479,6 +473,7 @@ dnbd3_image_t* image_release(dnbd3_image_t *image)
// responsible for freeing it
for (int i = 0; i < _num_images; ++i) {
if ( _images[i] == image ) { // Found, do nothing
+ assert( _images[i]->id == image->id );
mutex_unlock( &imageListLock );
return NULL;
}
@@ -518,6 +513,7 @@ static dnbd3_image_t* image_remove(dnbd3_image_t *image)
mutex_lock( &imageListLock );
for ( int i = _num_images - 1; i >= 0; --i ) {
if ( _images[i] == image ) {
+ assert( _images[i]->id == image->id );
_images[i] = NULL;
mustFree = ( image->users == 0 );
}
@@ -630,12 +626,18 @@ static dnbd3_image_t* image_free(dnbd3_image_t *image)
{
assert( image != NULL );
assert( image->users == 0 );
- logadd( ( _shutdown ? LOG_DEBUG1 : LOG_INFO ), "Freeing image %s:%d", image->name, (int)image->rid );
+ logadd( ( _shutdown ? LOG_DEBUG1 : LOG_INFO ), "Freeing image %s:%d", PIMG(image) );
// uplink_shutdown might return false to tell us
// that the shutdown is in progress. Bail out since
// this will get called again when the uplink is done.
if ( !uplink_shutdown( image ) )
return NULL;
+ if ( isImageFromUpstream( image ) ) {
+ saveMetaData( image, NULL, 0 );
+ if ( image->ref_cacheMap != NULL ) {
+ saveCacheMap( image );
+ }
+ }
mutex_lock( &image->lock );
ref_setref( &image->ref_cacheMap, NULL );
free( image->crc32 );
@@ -700,7 +702,8 @@ static bool image_load_all_internal(char *base, char *path)
while ( !_shutdown && (entryPtr = readdir( dir )) != NULL ) {
entry = *entryPtr;
- if ( strcmp( entry.d_name, "." ) == 0 || strcmp( entry.d_name, ".." ) == 0 ) continue;
+ if ( entry.d_name[0] == '.' )
+ continue; // No hidden files, no . or ..
if ( strlen( entry.d_name ) > SUBDIR_LEN ) {
logadd( LOG_WARNING, "Skipping entry %s: Too long (max %d bytes)", entry.d_name, (int)SUBDIR_LEN );
continue;
@@ -717,7 +720,7 @@ static bool image_load_all_internal(char *base, char *path)
if ( S_ISDIR( st.st_mode ) ) {
image_load_all_internal( base, subpath ); // Recurse
} else if ( !isForbiddenExtension( subpath ) ) {
- image_load( base, subpath, true ); // Load image if possible
+ image_load( base, subpath, false ); // Load image if possible
}
}
closedir( dir );
@@ -756,10 +759,9 @@ static bool image_addToList(dnbd3_image_t *image)
* Note that this is NOT THREAD SAFE so make sure its always
* called on one thread only.
*/
-static bool image_load(char *base, char *path, int withUplink)
+static bool image_load(char *base, char *path, bool withUplink)
{
int revision = -1;
- struct stat st;
dnbd3_cache_map_t *cache = NULL;
uint32_t *crc32list = NULL;
dnbd3_image_t *existing = NULL;
@@ -824,7 +826,9 @@ static bool image_load(char *base, char *path, int withUplink)
fdImage = open( path, O_RDONLY );
}
if ( fdImage == -1 ) {
- logadd( LOG_ERROR, "Could not open '%s' for reading...", path );
+ if ( errno != ENOENT ) {
+ logadd( LOG_ERROR, "[load] Cannot open '%s' for reading (errno=%d)", path, errno );
+ }
goto load_error;
}
// Determine file size
@@ -855,16 +859,16 @@ static bool image_load(char *base, char *path, int withUplink)
// Compare data just loaded to identical image we apparently already loaded
if ( existing != NULL ) {
if ( existing->realFilesize != realFilesize ) {
- logadd( LOG_WARNING, "Size of image '%s:%d' has changed.", existing->name, (int)existing->rid );
+ logadd( LOG_WARNING, "Size of image '%s:%d' has changed.", PIMG(existing) );
// Image will be replaced below
} else if ( existing->crc32 != NULL && crc32list != NULL
&& memcmp( existing->crc32, crc32list, sizeof(uint32_t) * hashBlockCount ) != 0 ) {
- logadd( LOG_WARNING, "CRC32 list of image '%s:%d' has changed.", existing->name, (int)existing->rid );
+ logadd( LOG_WARNING, "CRC32 list of image '%s:%d' has changed.", PIMG(existing) );
logadd( LOG_WARNING, "The image will be reloaded, but you should NOT replace existing images while the server is running." );
logadd( LOG_WARNING, "Actually even if it's not running this should never be done. Use a new RID instead!" );
// Image will be replaced below
} else if ( existing->crc32 == NULL && crc32list != NULL ) {
- logadd( LOG_INFO, "Found CRC-32 list for already loaded image '%s:%d', adding...", existing->name, (int)existing->rid );
+ logadd( LOG_INFO, "Found CRC-32 list for already loaded image '%s:%d', adding...", PIMG(existing) );
existing->crc32 = crc32list;
existing->masterCrc32 = masterCrc;
crc32list = NULL;
@@ -872,7 +876,7 @@ static bool image_load(char *base, char *path, int withUplink)
goto load_error; // Keep existing
} else if ( existing->ref_cacheMap != NULL && cache == NULL ) {
// Just ignore that fact, if replication is really complete the cache map will be removed anyways
- logadd( LOG_INFO, "Image '%s:%d' has no cache map on disk!", existing->name, (int)existing->rid );
+ logadd( LOG_INFO, "Image '%s:%d' has no cache map on disk!", PIMG(existing) );
function_return = true;
goto load_error; // Keep existing
} else {
@@ -900,19 +904,10 @@ static bool image_load(char *base, char *path, int withUplink)
image->rid = (uint16_t)revision;
image->users = 0;
image->readFd = -1;
- image->working = ( cache == NULL );
timing_get( &image->nextCompletenessEstimate );
image->completenessEstimate = -1;
mutex_init( &image->lock, LOCK_IMAGE );
- int32_t offset;
- if ( stat( path, &st ) == 0 ) {
- // Negatively offset atime by file modification time
- offset = (int32_t)( st.st_mtime - time( NULL ) );
- if ( offset > 0 ) offset = 0;
- } else {
- offset = 0;
- }
- timing_gets( &image->atime, offset );
+ loadImageMeta( image );
// Prevent freeing in cleanup
cache = NULL;
@@ -925,7 +920,7 @@ static bool image_load(char *base, char *path, int withUplink)
// Image is definitely incomplete, initialize uplink worker
if ( image->ref_cacheMap != NULL ) {
- image->working = false;
+ image->problem.uplink = true;
if ( withUplink ) {
uplink_init( image, -1, NULL, -1 );
}
@@ -937,14 +932,14 @@ static bool image_load(char *base, char *path, int withUplink)
// Keep fd for reading
fdImage = -1;
// Check CRC32
- image_checkRandomBlocks( image, 4 );
+ image_checkRandomBlocks( image, 4, -1 );
} else {
logadd( LOG_ERROR, "Image list full: Could not add image %s", path );
image->readFd = -1; // Keep fdImage instead, will be closed below
image = image_free( image );
goto load_error;
}
- logadd( LOG_DEBUG1, "Loaded image '%s:%d'\n", image->name, (int)image->rid );
+ logadd( LOG_DEBUG1, "Loaded image '%s:%d'\n", PIMG(image) );
function_return = true;
// Clean exit:
@@ -1027,10 +1022,19 @@ static uint32_t* image_loadCrcList(const char * const imagePath, const int64_t f
return retval;
}
-static void image_checkRandomBlocks(dnbd3_image_t *image, const int count)
+/**
+ * Check up to count random blocks from given image. If fromFd is -1, the check will
+ * be run asynchronously using the integrity checker. Otherwise, the check will
+ * happen in the function and return the result of the check.
+ * @param image image to check
+ * @param count number of blocks to check (max)
+ * @param fromFd, check synchronously and use this fd for reading, -1 = async
+ * @return true = OK, false = error. Meaningless if fromFd == -1
+ */
+static bool image_checkRandomBlocks(dnbd3_image_t *image, const int count, int fromFd)
{
if ( image->crc32 == NULL )
- return;
+ return true;
// This checks the first block and (up to) count - 1 random blocks for corruption
// via the known crc32 list. This is very sloppy and is merely supposed to detect
// accidental corruption due to broken dnbd3-proxy functionality or file system
@@ -1038,7 +1042,7 @@ static void image_checkRandomBlocks(dnbd3_image_t *image, const int count)
assert( count > 0 );
dnbd3_cache_map_t *cache = ref_get_cachemap( image );
const int hashBlocks = IMGSIZE_TO_HASHBLOCKS( image->virtualFilesize );
- int blocks[count];
+ int blocks[count+1]; // +1 for "-1" in sync case
int index = 0, j;
int block;
if ( image_isHashBlockComplete( cache, 0, image->virtualFilesize ) ) {
@@ -1062,9 +1066,16 @@ while_end: ;
if ( cache != NULL ) {
ref_put( &cache->reference );
}
- for ( int i = 0; i < index; ++i ) {
- integrity_check( image, blocks[i], true );
+ if ( fromFd == -1 ) {
+ // Async
+ for ( int i = 0; i < index; ++i ) {
+ integrity_check( image, blocks[i], true );
+ }
+ return true;
}
+ // Sync
+ blocks[index] = -1;
+ return image_checkBlocksCrc32( fromFd, image->crc32, blocks, image->realFilesize );
}
/**
@@ -1079,7 +1090,7 @@ bool image_create(char *image, int revision, uint64_t size)
logadd( LOG_ERROR, "revision id invalid: %d", revision );
return false;
}
- char path[PATHLEN], cache[PATHLEN];
+ char path[PATHLEN], cache[PATHLEN+4];
char *lastSlash = strrchr( image, '/' );
if ( lastSlash == NULL ) {
snprintf( path, PATHLEN, "%s/%s.r%d", _basePath, image, revision );
@@ -1090,7 +1101,7 @@ bool image_create(char *image, int revision, uint64_t size)
*lastSlash = '/';
snprintf( path, PATHLEN, "%s/%s.r%d", _basePath, image, revision );
}
- snprintf( cache, PATHLEN, "%s.map", path );
+ snprintf( cache, PATHLEN+4, "%s.map", path );
size = (size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
const int mapsize = IMGSIZE_TO_MAPBYTES(size);
// Write files
@@ -1111,14 +1122,19 @@ bool image_create(char *image, int revision, uint64_t size)
logadd( LOG_DEBUG1, "Could not allocate %d bytes for %s (errno=%d)", mapsize, cache, err );
}
// Now write image
+ bool fallback = false;
if ( !_sparseFiles && !file_alloc( fdImage, 0, size ) ) {
logadd( LOG_ERROR, "Could not allocate %" PRIu64 " bytes for %s (errno=%d)", size, path, errno );
logadd( LOG_ERROR, "It is highly recommended to use a file system that supports preallocating disk"
" space without actually writing all zeroes to the block device." );
logadd( LOG_ERROR, "If you cannot fix this, try setting sparseFiles=true, but don't expect"
" divine performance during replication." );
- goto failure_cleanup;
- } else if ( _sparseFiles && !file_setSize( fdImage, size ) ) {
+ if ( !_ignoreAllocErrors ) {
+ goto failure_cleanup;
+ }
+ fallback = true;
+ }
+ if ( ( _sparseFiles || fallback ) && !file_setSize( fdImage, size ) ) {
logadd( LOG_ERROR, "Could not create sparse file of %" PRIu64 " bytes for %s (errno=%d)", size, path, errno );
logadd( LOG_ERROR, "Make sure you have enough disk space, check directory permissions, fs errors etc." );
goto failure_cleanup;
@@ -1162,14 +1178,18 @@ dnbd3_image_t* image_getOrLoad(char * const name, const uint16_t revision)
// Sanity check
if ( len == 0 || name[len - 1] == '/' || name[0] == '/'
|| name[0] == '.' || strstr( name, "/." ) != NULL ) return NULL;
- // If in proxy mode, check with upstream server first
+ // Re-check latest local revision
+ image = loadImageServer( name, revision );
+ // If in proxy mode, check with upstream servers
if ( _isProxy ) {
+ // Forget the locally loaded one
+ image_release( image );
+ // Check with upstream - if unsuccessful, will return the same
+ // as loadImageServer did
image = loadImageProxy( name, revision, len );
- if ( image != NULL )
- return image;
}
// Lookup on local storage
- return loadImageServer( name, revision );
+ return image;
}
/**
@@ -1227,19 +1247,20 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
int uplinkSock = -1;
dnbd3_host_t uplinkServer;
const int count = altservers_getHostListForReplication( name, servers, REP_NUM_SRV );
- uint16_t remoteProtocolVersion;
uint16_t remoteRid = revision;
- uint64_t remoteImageSize;
+ uint16_t acceptedRemoteRid = 0;
+ uint16_t remoteProtocolVersion = 0;
struct sockaddr_storage sa;
socklen_t salen;
poll_list_t *cons = sock_newPollList();
logadd( LOG_DEBUG2, "Trying to clone %s:%d from %d hosts", name, (int)revision, count );
for (int i = 0; i < count + 5; ++i) { // "i < count + 5" for 5 additional iterations, waiting on pending connects
- char *remoteName;
+ char *remoteName = NULL;
+ uint64_t remoteImageSize = 0;
bool ok = false;
int sock;
if ( i >= count ) {
- sock = sock_multiConnect( cons, NULL, 100, 1000 );
+ sock = sock_multiConnect( cons, NULL, 100, _uplinkTimeout );
if ( sock == -2 ) break;
} else {
if ( log_hasMask( LOG_DEBUG2 ) ) {
@@ -1248,7 +1269,7 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
host[len] = '\0';
logadd( LOG_DEBUG2, "Trying to replicate from %s", host );
}
- sock = sock_multiConnect( cons, &servers[i], 100, 1000 );
+ sock = sock_multiConnect( cons, &servers[i], 100, _uplinkTimeout );
}
if ( sock == -1 || sock == -2 ) continue;
salen = sizeof(sa);
@@ -1273,7 +1294,11 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
} else {
ok = image_ensureDiskSpace( remoteImageSize + ( 10 * 1024 * 1024 ), false ); // some extra space for cache map etc.
}
- ok = ok && image_clone( sock, name, remoteRid, remoteImageSize ); // This sets up the file+map+crc and loads the img
+ if ( ok ) {
+ ok = image_clone( sock, name, remoteRid, remoteImageSize ); // This sets up the file+map+crc and loads the img
+ } else {
+ logadd( LOG_INFO, "Not enough space to replicate '%s:%d'", name, (int)revision );
+ }
mutex_unlock( &reloadLock );
if ( !ok ) goto server_fail;
@@ -1282,26 +1307,32 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
if ( !sock_sockaddrToDnbd3( (struct sockaddr*)&sa, &uplinkServer ) ) {
uplinkServer.type = 0;
}
- break;
+ acceptedRemoteRid = remoteRid;
+ break; // TODO: Maybe we should try the remaining servers if rid == 0, in case there's an even newer one
server_fail: ;
close( sock );
}
sock_destroyPollList( cons );
- // If we still have a pointer to a local image, release the reference
- if ( image != NULL ) image_release( image );
+ // If we still have a pointer to a local image, compare rid
+ if ( image != NULL ) {
+ if ( ( revision == 0 && image->rid >= acceptedRemoteRid ) || ( image->rid == revision ) ) {
+ return image;
+ }
+ // release the reference
+ image_release( image );
+ }
// If everything worked out, this call should now actually return the image
- image = image_get( name, remoteRid, false );
+ image = image_get( name, acceptedRemoteRid, false );
if ( image != NULL && uplinkSock != -1 ) {
// If so, init the uplink and pass it the socket
- sock_setTimeout( uplinkSock, _uplinkTimeout );
if ( !uplink_init( image, uplinkSock, &uplinkServer, remoteProtocolVersion ) ) {
close( uplinkSock );
} else {
// Clumsy busy wait, but this should only take as long as it takes to start a thread, so is it really worth using a signalling mechanism?
int i = 0;
- while ( !image->working && ++i < 100 )
+ while ( image->problem.uplink && ++i < 100 )
usleep( 2000 );
}
} else if ( uplinkSock != -1 ) {
@@ -1318,6 +1349,7 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste
{
char imageFile[PATHLEN] = "";
uint16_t detectedRid = 0;
+ bool isLegacyFile = false;
if ( requestedRid != 0 ) {
snprintf( imageFile, PATHLEN, "%s/%s.r%d", _basePath, name, (int)requestedRid );
@@ -1354,6 +1386,7 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste
&& ( detectedRid == 0 || !file_isReadable( imageFile ) ) ) {
snprintf( imageFile, PATHLEN, "%s/%s", _basePath, name );
detectedRid = 1;
+ isLegacyFile = true;
}
logadd( LOG_DEBUG2, "Trying to load %s:%d ( -> %d) as %s", name, (int)requestedRid, (int)detectedRid, imageFile );
// No file was determined, or it doesn't seem to exist/be readable
@@ -1361,7 +1394,7 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste
logadd( LOG_DEBUG2, "Not found, bailing out" );
return image_get( name, requestedRid, true );
}
- if ( !_vmdkLegacyMode && requestedRid == 0 ) {
+ if ( !isLegacyFile && requestedRid == 0 ) {
// rid 0 requested - check if detected rid is readable, decrease rid if not until we reach 0
while ( detectedRid != 0 ) {
dnbd3_image_t *image = image_get( name, detectedRid, true );
@@ -1429,9 +1462,13 @@ static bool image_clone(int sock, char *name, uint16_t revision, uint64_t imageS
logadd( LOG_WARNING, "OTF-Clone: Corrupted CRC-32 list. ignored. (%s)", name );
} else {
int fd = open( crcFile, O_WRONLY | O_CREAT, 0644 );
- write( fd, &masterCrc, sizeof(uint32_t) );
- write( fd, crc32list, crc32len );
+ ssize_t ret = write( fd, &masterCrc, sizeof(masterCrc) );
+ ret += write( fd, crc32list, crc32len );
close( fd );
+ if ( (size_t)ret != crc32len + sizeof(masterCrc) ) {
+ logadd( LOG_WARNING, "Could not save freshly received crc32 list for %s:%d", name, (int)revision );
+ unlink( crcFile );
+ }
}
}
free( crc32list );
@@ -1564,14 +1601,23 @@ json_t* image_getListAsJson()
ref_put( &uplink->reference );
}
- jsonImage = json_pack( "{sisssisisisisI}",
+ int problems = 0;
+#define addproblem(name,val) if (image->problem.name) problems |= (1 << val)
+ addproblem(read, 0);
+ addproblem(write, 1);
+ addproblem(changed, 2);
+ addproblem(uplink, 3);
+ addproblem(queue, 4);
+
+ jsonImage = json_pack( "{sisssisisisisIsi}",
"id", image->id, // id, name, rid never change, so access them without locking
"name", image->name,
"rid", (int) image->rid,
"users", image->users,
"complete", completeness,
"idle", idleTime,
- "size", (json_int_t)image->virtualFilesize );
+ "size", (json_int_t)image->virtualFilesize,
+ "problems", problems );
if ( bytesReceived != 0 ) {
json_object_set_new( jsonImage, "bytesReceived", json_integer( (json_int_t) bytesReceived ) );
}
@@ -1594,7 +1640,7 @@ int image_getCompletenessEstimate(dnbd3_image_t * const image)
assert( image != NULL );
dnbd3_cache_map_t *cache = ref_get_cachemap( image );
if ( cache == NULL )
- return image->working ? 100 : 0;
+ return 100;
const int len = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
if ( unlikely( len == 0 ) ) {
ref_put( &cache->reference );
@@ -1705,46 +1751,51 @@ bool image_ensureDiskSpaceLocked(uint64_t size, bool force)
/**
* Make sure at least size bytes are available in _basePath.
* Will delete old images to make room for new ones.
- * TODO: Store last access time of images. Currently the
- * last access time is reset to the file modification time
- * on server restart. Thus it will
- * currently only delete images if server uptime is > 24 hours.
+ * It will only delete images if a configurable uptime is
+ * reached.
* This can be overridden by setting force to true, in case
* free space is desperately needed.
* Return true iff enough space is available. false in random other cases
*/
static bool image_ensureDiskSpace(uint64_t size, bool force)
{
- for ( int maxtries = 0; maxtries < 20; ++maxtries ) {
+ for ( int maxtries = 0; maxtries < 50; ++maxtries ) {
uint64_t available;
if ( !file_freeDiskSpace( _basePath, NULL, &available ) ) {
- logadd( LOG_WARNING, "Could not get free disk space (errno %d), will assume there is enough space left... ;-)\n", errno );
+ logadd( LOG_WARNING, "Could not get free disk space (errno %d), will assume there is enough space left.", errno );
return true;
}
if ( available > size )
return true; // Yay
- if ( !_isProxy || _autoFreeDiskSpaceDelay == -1 )
+ if ( !_isProxy || _autoFreeDiskSpaceDelay == -1 ) {
+ logadd( LOG_INFO, "Only %dMiB free, %dMiB requested, but auto-freeing of disk space is disabled.",
+ (int)(available / (1024ll * 1024)),
+ (int)(size / (1024ll * 1024)) );
return false; // If not in proxy mode at all, or explicitly disabled, never delete anything
+ }
if ( !force && dnbd3_serverUptime() < (uint32_t)_autoFreeDiskSpaceDelay ) {
- logadd( LOG_INFO, "Only %dMiB free, %dMiB requested, but server uptime < %d minutes...", (int)(available / (1024ll * 1024ll)),
- (int)(size / (1024 * 1024)), _autoFreeDiskSpaceDelay / 60 );
+ logadd( LOG_INFO, "Only %dMiB free, %dMiB requested, but server uptime < %d minutes...",
+ (int)(available / (1024ll * 1024)),
+ (int)(size / (1024ll * 1024)), _autoFreeDiskSpaceDelay / 60 );
return false;
}
- logadd( LOG_INFO, "Only %dMiB free, %dMiB requested, freeing an image...", (int)(available / (1024ll * 1024ll)),
- (int)(size / (1024 * 1024)) );
+ logadd( LOG_INFO, "Only %dMiB free, %dMiB requested, freeing an image...",
+ (int)(available / (1024ll * 1024)),
+ (int)(size / (1024ll * 1024)) );
// Find least recently used image
dnbd3_image_t *oldest = NULL;
int i;
mutex_lock( &imageListLock );
for (i = 0; i < _num_images; ++i) {
dnbd3_image_t *current = _images[i];
- if ( current == NULL ) continue;
- if ( current->users == 0 ) { // Not in use :-)
- if ( oldest == NULL || timing_1le2( &current->atime, &oldest->atime ) ) {
- // Oldest access time so far
- oldest = current;
- }
- }
+ if ( current == NULL || current->users != 0 )
+ continue; // Empty slot or in use
+ if ( oldest != NULL && timing_1le2( &oldest->atime, &current->atime ) )
+ continue; // Already got a newer one
+ if ( !isImageFromUpstream( current ) )
+ continue; // Not replicated, don't touch
+ // Oldest access time so far
+ oldest = current;
}
if ( oldest != NULL ) {
oldest->users++;
@@ -1760,7 +1811,7 @@ static bool image_ensureDiskSpace(uint64_t size, bool force)
image_release( oldest ); // We did users++ above; image might have to be freed entirely
return false;
}
- logadd( LOG_INFO, "'%s:%d' has to go!", oldest->name, (int)oldest->rid );
+ logadd( LOG_INFO, "'%s:%d' has to go!", PIMG(oldest) );
char *filename = strdup( oldest->path ); // Copy name as we remove the image first
oldest = image_remove( oldest ); // Remove from list first...
oldest = image_release( oldest ); // Decrease users counter; if it falls to 0, image will be freed
@@ -1790,15 +1841,14 @@ static void* closeUnusedFds(void* nix UNUSED)
timing_gets( &deadline, -UNUSED_FD_TIMEOUT );
int fds[FDCOUNT];
int fdindex = 0;
+ setThreadName( "unused-fd-close" );
mutex_lock( &imageListLock );
for ( int i = 0; i < _num_images; ++i ) {
dnbd3_image_t * const image = _images[i];
if ( image == NULL || image->readFd == -1 )
continue;
- // TODO: Also close for idle uplinks (uplink_connectionShouldShutdown)
- // TODO: And close writeFd for idle uplinks....
if ( image->users == 0 && image->uplinkref == NULL && timing_reached( &image->atime, &deadline ) ) {
- logadd( LOG_DEBUG1, "Inactive fd closed for %s:%d", image->name, (int)image->rid );
+ logadd( LOG_DEBUG1, "Inactive fd closed for %s:%d", PIMG(image) );
fds[fdindex++] = image->readFd;
image->readFd = -1; // Not a race; image->users is 0 and to increase it you need imageListLock
if ( fdindex == FDCOUNT )
@@ -1813,6 +1863,177 @@ static void* closeUnusedFds(void* nix UNUSED)
return NULL;
}
+static bool isImageFromUpstream(dnbd3_image_t *image)
+{
+ if ( !_isProxy )
+ return false; // Nothing to do
+ // Check if we're a "hybrid proxy", i.e. there are only some namespaces (directories)
+ // for which we have any upstream servers configured. If there's none, don't touch
+ // the cache map on disk.
+ if ( !altservers_imageHasAltServers( image->name ) )
+ return false; // Nothing to do
+ return true;
+}
+
+static void* saveLoadAllCacheMaps(void* nix UNUSED)
+{
+ static ticks nextSave;
+ declare_now;
+ bool full = timing_reached( &nextSave, &now );
+ time_t walltime = 0;
+ setThreadName( "cache-mapper" );
+ if ( full ) {
+ walltime = time( NULL );
+ // Update at start to avoid concurrent runs
+ timing_addSeconds( &nextSave, &now, CACHE_MAP_MAX_SAVE_DELAY );
+ }
+ mutex_lock( &imageListLock );
+ for ( int i = 0; i < _num_images; ++i ) {
+ dnbd3_image_t * const image = _images[i];
+ if ( image == NULL )
+ continue;
+ image->users++;
+ mutex_unlock( &imageListLock );
+ const bool fromUpstream = isImageFromUpstream( image );
+ dnbd3_cache_map_t *cache = ref_get_cachemap( image );
+ if ( cache != NULL ) {
+ if ( fromUpstream ) {
+ // Replicated image, we're responsible for updating the map, so save it
+ // Save if dirty bit is set, blocks were invalidated
+ bool save = cache->dirty;
+ dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
+ if ( !save ) {
+ // Otherwise, consider longer timeout and byte count limits of uplink
+ if ( uplink != NULL ) {
+ assert( uplink->bytesReceivedLastSave <= uplink->bytesReceived );
+ uint64_t diff = uplink->bytesReceived - uplink->bytesReceivedLastSave;
+ if ( diff > CACHE_MAP_MAX_UNSAVED_BYTES || ( full && diff != 0 ) ) {
+ save = true;
+ }
+ }
+ }
+ if ( save ) {
+ cache->dirty = false;
+ if ( uplink != NULL ) {
+ uplink->bytesReceivedLastSave = uplink->bytesReceived;
+ }
+ saveCacheMap( image );
+ }
+ if ( uplink != NULL ) {
+ ref_put( &uplink->reference );
+ }
+ } else {
+ // We're not replicating this image, if there's a cache map, reload
+ // it periodically, since we might read from a shared storage that
+ // another server instance is writing to.
+ if ( full || ( !cache->unchanged && !image->problem.read ) ) {
+ logadd( LOG_DEBUG2, "Reloading cache map of %s:%d", PIMG(image) );
+ dnbd3_cache_map_t *onDisk = image_loadCacheMap(image->path, image->virtualFilesize);
+ if ( onDisk == NULL ) {
+ // Should be complete now
+ logadd( LOG_DEBUG1, "External replication of %s:%d complete", PIMG(image) );
+ ref_setref( &image->ref_cacheMap, NULL );
+ } else {
+ const int mapSize = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
+ if ( memcmp( cache->map, onDisk->map, mapSize ) == 0 ) {
+ // Unchanged
+ cache->unchanged = true;
+ onDisk->reference.free( &onDisk->reference );
+ } else {
+ // Replace
+ ref_setref( &image->ref_cacheMap, &onDisk->reference );
+ logadd( LOG_DEBUG2, "Map changed" );
+ }
+ }
+ }
+ } // end reload cache map
+ ref_put( &cache->reference );
+ } // end has cache map
+ if ( full && fromUpstream ) {
+ saveMetaData( image, &now, walltime );
+ }
+ image_release( image ); // Always do this instead of users-- to handle freeing
+ mutex_lock( &imageListLock );
+ }
+ mutex_unlock( &imageListLock );
+ return NULL;
+}
+
+/**
+ * Saves the cache map of the given image.
+ * Return false if this image doesn't have a cache map, or if the image
+ * doesn't have any uplink to replicate from. In this case the image might
+ * still have a cache map that was loaded from disk, and should be reloaded
+ * periodically.
+ * @param image the image
+ */
+static void saveCacheMap(dnbd3_image_t *image)
+{
+ dnbd3_cache_map_t *cache = ref_get_cachemap( image );
+ if ( cache == NULL )
+ return; // Race - wasn't NULL in function call above...
+
+ logadd( LOG_DEBUG2, "Saving cache map of %s:%d", PIMG(image) );
+ const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize);
+ char mapfile[strlen( image->path ) + 4 + 1];
+ strcpy( mapfile, image->path );
+ strcat( mapfile, ".map" );
+
+ int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 );
+ if ( fd == -1 ) {
+ const int err = errno;
+ ref_put( &cache->reference );
+ logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile );
+ return;
+ }
+
+ // On Linux we could use readFd, but in general it's not guaranteed to work
+ int imgFd = open( image->path, O_WRONLY );
+ if ( imgFd == -1 ) {
+ logadd( LOG_WARNING, "Cannot open %s for fsync(): errno=%d", image->path, errno );
+ } else {
+ if ( fsync( imgFd ) == -1 ) {
+ logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d. Resetting cache map.", image->path, errno );
+ dnbd3_cache_map_t *old = image_loadCacheMap(image->path, image->virtualFilesize);
+ const int mapSize = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
+ if ( old == NULL ) {
+ // Could not load old map. FS might be toast.
+ logadd( LOG_ERROR, "Cannot load old cache map. Setting all zero." );
+ memset( cache->map, 0, mapSize );
+ } else {
+ // AND the maps together to be safe
+ for ( int i = 0; i < mapSize; ++i ) {
+ cache->map[i] &= old->map[i];
+ }
+ old->reference.free( &old->reference );
+ }
+ }
+ close( imgFd );
+ }
+
+ // Write current map to file
+ size_t done = 0;
+ while ( done < size ) {
+ const ssize_t ret = write( fd, cache->map + done, size - done );
+ if ( ret == -1 ) {
+ if ( errno == EINTR ) continue;
+ logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile );
+ break;
+ }
+ if ( ret <= 0 ) {
+ logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile );
+ break;
+ }
+ done += (size_t)ret;
+ }
+ ref_put( &cache->reference );
+ if ( fsync( fd ) == -1 ) {
+ logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno );
+ }
+ close( fd );
+ // TODO fsync on parent directory
+}
+
static void allocCacheMap(dnbd3_image_t *image, bool complete)
{
const uint8_t val = complete ? 0xff : 0;
@@ -1822,7 +2043,7 @@ static void allocCacheMap(dnbd3_image_t *image, bool complete)
memset( cache->map, val, byteSize );
mutex_lock( &image->lock );
if ( image->ref_cacheMap != NULL ) {
- logadd( LOG_WARNING, "BUG: allocCacheMap called but there already is a cache map for %s:%d", image->name, (int)image->rid );
+ logadd( LOG_WARNING, "BUG: allocCacheMap called but there already is a map for %s:%d", PIMG(image) );
free( cache );
} else {
ref_setref( &image->ref_cacheMap, &cache->reference );
@@ -1830,3 +2051,77 @@ static void allocCacheMap(dnbd3_image_t *image, bool complete)
mutex_unlock( &image->lock );
}
+/**
+ * It's assumed you hold a reference to the image
+ */
+static void saveMetaData(dnbd3_image_t *image, ticks *now, time_t walltime)
+{
+ if ( !image->accessed )
+ return;
+ ticks tmp;
+ uint32_t diff;
+ char *fn;
+ if ( asprintf( &fn, "%s.meta", image->path ) == -1 ) {
+ logadd( LOG_WARNING, "Cannot asprintf meta" );
+ return;
+ }
+ if ( now == NULL ) {
+ timing_get( &tmp );
+ now = &tmp;
+ walltime = time( NULL );
+ }
+ mutex_lock( &image->lock );
+ image->accessed = false;
+ diff = timing_diff( &image->atime, now );
+ mutex_unlock( &image->lock );
+ FILE *f = fopen( fn, "w" );
+ if ( f == NULL ) {
+ logadd( LOG_WARNING, "Cannot open %s for writing", fn );
+ } else {
+ fprintf( f, "[main]\natime=%"PRIu64"\n", (uint64_t)( walltime - diff ) );
+ fclose( f );
+ }
+ free( fn );
+ // TODO: fsync() dir
+}
+
+static void loadImageMeta(dnbd3_image_t *image)
+{
+ int32_t offset = 1;
+ char *fn;
+ if ( asprintf( &fn, "%s.meta", image->path ) == -1 ) {
+ logadd( LOG_WARNING, "asprintf load" );
+ } else {
+ int fh = open( fn, O_RDONLY );
+ free( fn );
+ if ( fh != -1 ) {
+ char buf[200];
+ ssize_t ret = read( fh, buf, sizeof(buf)-1 );
+ close( fh );
+ if ( ret > 0 ) {
+ buf[ret] = '\0';
+ // Do it the cheap way until we actually store more stuff
+ char *pos = strstr( buf, "atime=" );
+ if ( pos != NULL ) {
+ offset = (int32_t)( atol( pos + 6 ) - time( NULL ) );
+ }
+ }
+ }
+ }
+ if ( offset == 1 ) {
+ // Nothing from .meta file, use old guesstimate
+ struct stat st;
+ if ( stat( image->path, &st ) == 0 ) {
+ // Negatively offset atime by file modification time
+ offset = (int32_t)( st.st_mtime - time( NULL ) );
+ } else {
+ offset = 0;
+ }
+ image->accessed = true;
+ }
+ if ( offset > 0 ) {
+ offset = 0;
+ }
+ timing_gets( &image->atime, offset );
+}
+
diff --git a/src/server/image.h b/src/server/image.h
index 89791fc..7b6583c 100644
--- a/src/server/image.h
+++ b/src/server/image.h
@@ -19,7 +19,7 @@ bool image_ensureOpen(dnbd3_image_t *image);
dnbd3_image_t* image_byId(int imgId);
-dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking);
+dnbd3_image_t* image_get(const char *name, uint16_t revision, bool checkIfWorking);
bool image_reopenCacheFd(dnbd3_image_t *image, const bool force);
@@ -49,6 +49,52 @@ void image_closeUnusedFd();
bool image_ensureDiskSpaceLocked(uint64_t size, bool force);
+bool image_saveCacheMap(dnbd3_image_t *image);
+
+/**
+ * Check if given range is cached. Be careful when using this function because:
+ * 1) you need to hold a reference to the cache map
+ * 2) start and end are assumed to be 4k aligned
+ * 3) start and end are not checked to be in bounds (we don't know the image in this context)
+ */
+static inline bool image_isRangeCachedUnsafe(dnbd3_cache_map_t *cache, uint64_t start, uint64_t end)
+{
+ const uint64_t firstByteInMap = start >> 15;
+ const uint64_t lastByteInMap = (end - 1) >> 15;
+ const uint8_t fb = (uint8_t)(0xff << ((start >> 12) & 7));
+ const uint8_t lb = (uint8_t)(~(0xff << ((((end - 1) >> 12) & 7) + 1)));
+ uint64_t pos;
+ uint8_t b;
+ bool isCached;
+ if ( firstByteInMap == lastByteInMap ) { // Single byte to check, much simpler
+ b = cache->map[firstByteInMap];
+ isCached = ( b & ( fb & lb ) ) == ( fb & lb );
+ } else {
+ isCached = true;
+ atomic_thread_fence( memory_order_acquire );
+ // First byte
+ if ( isCached ) {
+ b = atomic_load_explicit( &cache->map[firstByteInMap], memory_order_relaxed );
+ isCached = ( ( b & fb ) == fb );
+ }
+ // Last byte
+ if ( isCached ) {
+ b = atomic_load_explicit( &cache->map[lastByteInMap], memory_order_relaxed );
+ isCached = ( ( b & lb ) == lb );
+ }
+ // Middle, must be all bits set (0xff)
+ if ( isCached ) {
+ for ( pos = firstByteInMap + 1; pos < lastByteInMap; ++pos ) {
+ if ( atomic_load_explicit( &cache->map[pos], memory_order_relaxed ) != 0xff ) {
+ isCached = false;
+ break;
+ }
+ }
+ }
+ }
+ return isCached;
+}
+
// one byte in the map covers 8 4kib blocks, so 32kib per byte
// "+ (1 << 15) - 1" is required to account for the last bit of
// the image that is smaller than 32kib
diff --git a/src/server/ini.c b/src/server/ini.c
index c796d5c..37c44a3 100644
--- a/src/server/ini.c
+++ b/src/server/ini.c
@@ -52,7 +52,7 @@ static char* find_char_or_comment(const char* s, char c)
/* Version of strncpy that ensures dest (size bytes) is null-terminated. */
static char* strncpy0(char* dest, const char* src, size_t size)
{
- strncpy( dest, src, size );
+ strncpy( dest, src, size - 1 );
dest[size - 1] = '\0';
return dest;
}
diff --git a/src/server/integrity.c b/src/server/integrity.c
index 4006dfc..91e53b8 100644
--- a/src/server/integrity.c
+++ b/src/server/integrity.c
@@ -195,9 +195,10 @@ static void* integrity_main(void * data UNUSED)
readFd = directFd;
}
}
- if ( readFd == -1 ) { // Try buffered; flush to disk for that
- image_ensureOpen( image );
- readFd = image->readFd;
+ if ( readFd == -1 ) { // Try buffered as fallback
+ if ( image_ensureOpen( image ) && !image->problem.read ) {
+ readFd = image->readFd;
+ }
}
if ( readFd == -1 ) {
logadd( LOG_MINOR, "Couldn't get any valid fd for integrity check of %s... ignoring...", image->path );
@@ -237,16 +238,6 @@ static void* integrity_main(void * data UNUSED)
// Done with this task as nothing left
checkQueue[i].image = NULL;
if ( i + 1 == queueLen ) queueLen--;
- // Mark as working again if applicable
- if ( !foundCorrupted ) {
- dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
- if ( uplink != NULL ) { // TODO: image_determineWorkingState() helper?
- mutex_lock( &image->lock );
- image->working = uplink->current.fd != -1 && image->readFd != -1;
- mutex_unlock( &image->lock );
- ref_put( &uplink->reference );
- }
- }
} else {
// Still more blocks to go...
checkQueue[i].block = blocks[0];
@@ -254,9 +245,6 @@ static void* integrity_main(void * data UNUSED)
}
if ( foundCorrupted && !_shutdown ) {
// Something was fishy, make sure uplink exists
- mutex_lock( &image->lock );
- image->working = false;
- mutex_unlock( &image->lock );
uplink_init( image, -1, NULL, -1 );
}
// Release :-)
diff --git a/src/server/locks.c b/src/server/locks.c
index b39576b..3be73b3 100644
--- a/src/server/locks.c
+++ b/src/server/locks.c
@@ -7,9 +7,9 @@
#include "locks.h"
#include "helper.h"
-#include "../shared/timing.h"
+#include <dnbd3/shared/timing.h>
-#ifdef _DEBUG
+#ifdef DNBD3_SERVER_DEBUG_LOCKS
#define MAXLOCKS (SERVER_MAX_CLIENTS * 2 + SERVER_MAX_ALTS + 200 + SERVER_MAX_IMAGES)
#define MAXTHREADS (SERVER_MAX_CLIENTS + 100)
#define MAXLPT 20
diff --git a/src/server/locks.h b/src/server/locks.h
index e5c9801..3b04caa 100644
--- a/src/server/locks.h
+++ b/src/server/locks.h
@@ -23,10 +23,12 @@
#define LOCK_UPLINK_RTT 200
#define LOCK_UPLINK_SEND 210
#define LOCK_RPC_ACL 220
+#define LOCK_FUSE_INIT 300
+#define LOCK_FUSE_DIR 310
//
-#ifdef _DEBUG
+#ifdef DNBD3_SERVER_DEBUG_LOCKS
#define mutex_init( lock, prio ) debug_mutex_init( #lock, __FILE__, __LINE__, lock, prio)
#define mutex_lock( lock ) debug_mutex_lock( #lock, __FILE__, __LINE__, lock, false)
@@ -55,10 +57,12 @@ void debug_dump_lock_stats();
#endif
-#ifdef DEBUG_THREADS
+#ifdef DNBD3_SERVER_DEBUG_THREADS
+
+#include <dnbd3/shared/log.h>
extern int debugThreadCount;
-#define thread_create(thread,attr,routine,arg) (logadd( LOG_THREAD CREATE, "%d @ %s:%d\n", debugThreadCount, __FILE__, (int)__LINE__), debug_thread_create(thread, attr, routine, arg))
+#define thread_create(thread,attr,routine,arg) (logadd( LOG_INFO, "THREAD_CREATE: %d @ %s:%d\n", debugThreadCount, __FILE__, (int)__LINE__), debug_thread_create(thread, attr, routine, arg))
static inline pthread_t debug_thread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void*), void *arg)
{
int i;
@@ -68,26 +72,26 @@ static inline pthread_t debug_thread_create(pthread_t *thread, const pthread_att
return pthread_create( thread, attr, start_routine, arg );
}
-#define thread_detach(thread) (logadd( LOG_THREAD DETACH, "%d @ %s:%d\n", debugThreadCount, __FILE__, __LINE__), debug_thread_detach(thread))
+#define thread_detach(thread) (logadd( LOG_INFO, "THREAD_DETACH: %d @ %s:%d\n", debugThreadCount, __FILE__, __LINE__), debug_thread_detach(thread))
static inline int debug_thread_detach(pthread_t thread)
{
const int ret = pthread_detach(thread);
if (ret == 0) {
--debugThreadCount;
} else {
- logadd( LOG_THREAD DETACH, "Tried to detach invalid thread (error %d)\n", (int)errno);
+ logadd( LOG_INFO, "THREAD_DETACH: Tried to detach invalid thread (error %d)\n", (int)errno);
exit(1);
}
return ret;
}
-#define thread_join(thread,value) (logadd( LOG_THREAD JOIN, "%d @ %s:%d\n", debugThreadCount, __FILE__, __LINE__), debug_thread_join(thread,value))
+#define thread_join(thread,value) (logadd( LOG_INFO, "THREAD_JOIN: %d @ %s:%d\n", debugThreadCount, __FILE__, __LINE__), debug_thread_join(thread,value))
static inline int debug_thread_join(pthread_t thread, void **value_ptr)
{
const int ret = pthread_join(thread, value_ptr);
if (ret == 0) {
--debugThreadCount;
} else {
- logadd( LOG_THREAD JOIN, "Tried to join invalid thread (error %d)\n", (int)errno);
+ logadd( LOG_INFO, "THREAD_JOIN: Tried to join invalid thread (error %d)\n", (int)errno);
exit(1);
}
return ret;
@@ -99,6 +103,6 @@ static inline int debug_thread_join(pthread_t thread, void **value_ptr)
#define thread_detach(thread) pthread_detach( thread )
#define thread_join(thread,value) pthread_join( thread, value )
-#endif
+#endif /* DNBD3_SERVER_DEBUG_THREADS */
#endif /* LOCKS_H_ */
diff --git a/src/server/net.c b/src/server/net.c
index aba4e7d..eb51d29 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -3,7 +3,7 @@
*
* Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de>
*
- * This file may be licensed under the terms of of the
+ * This file may be licensed under the terms of the
* GNU General Public License Version 2 (the ``GPL'').
*
* Software distributed under the License is distributed
@@ -26,10 +26,10 @@
#include "altservers.h"
#include "reference.h"
-#include "../shared/sockhelper.h"
-#include "../shared/timing.h"
-#include "../shared/protocol.h"
-#include "../serialize.h"
+#include <dnbd3/shared/sockhelper.h>
+#include <dnbd3/shared/timing.h>
+#include <dnbd3/shared/protocol.h>
+#include <dnbd3/shared/serialize.h>
#include <assert.h>
@@ -58,11 +58,12 @@ static atomic_uint_fast64_t totalBytesSent = 0;
static bool addToList(dnbd3_client_t *client);
static void removeFromList(dnbd3_client_t *client);
static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client);
+static void uplinkCallback(void *data, uint64_t handle, uint64_t start, uint32_t length, const char *buffer);
static inline bool recv_request_header(int sock, dnbd3_request_t *request)
{
ssize_t ret, fails = 0;
-#ifdef AFL_MODE
+#ifdef DNBD3_SERVER_AFL
sock = 0;
#endif
// Read request header from socket
@@ -89,7 +90,7 @@ static inline bool recv_request_header(int sock, dnbd3_request_t *request)
static inline bool recv_request_payload(int sock, uint32_t size, serialized_buffer_t *payload)
{
-#ifdef AFL_MODE
+#ifdef DNBD3_SERVER_AFL
sock = 0;
#endif
if ( size == 0 ) {
@@ -113,7 +114,7 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff
* Send reply with optional payload. payload can be null. The caller has to
* acquire the sendMutex first.
*/
-static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload)
+static inline bool send_reply(int sock, dnbd3_reply_t *reply, const void *payload)
{
const uint32_t size = reply->size;
fixup_reply( *reply );
@@ -159,7 +160,7 @@ void* net_handleNewConnection(void *clientPtr)
// Await data from client. Since this is a fresh connection, we expect data right away
sock_setTimeout( client->sock, _clientTimeout );
do {
-#ifdef AFL_MODE
+#ifdef DNBD3_SERVER_AFL
const int ret = (int)recv( 0, &request, sizeof(request), MSG_WAITALL );
#else
const int ret = (int)recv( client->sock, &request, sizeof(request), MSG_WAITALL );
@@ -197,6 +198,7 @@ void* net_handleNewConnection(void *clientPtr)
client->hostName[HOSTNAMELEN-1] = '\0';
mutex_unlock( &client->lock );
client->bytesSent = 0;
+ client->relayedCount = 0;
if ( !addToList( client ) ) {
freeClientStruct( client );
@@ -207,6 +209,7 @@ void* net_handleNewConnection(void *clientPtr)
dnbd3_reply_t reply;
dnbd3_image_t *image = NULL;
+ dnbd3_cache_map_t *cache = NULL;
int image_file = -1;
int num;
@@ -215,7 +218,6 @@ void* net_handleNewConnection(void *clientPtr)
serialized_buffer_t payload;
uint16_t rid, client_version;
- uint64_t start, end;
dnbd3_server_entry_t server_list[NUMBER_SERVERS];
@@ -262,22 +264,24 @@ void* net_handleNewConnection(void *clientPtr)
atomic_thread_fence( memory_order_release );
if ( unlikely( image == NULL ) ) {
//logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
- } else if ( unlikely( !image->working ) ) {
+ } else if ( unlikely( image->problem.read || image->problem.changed ) ) {
logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n",
client->hostName, image_name, (int)rid );
} else {
// Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable
bOk = true;
if ( image->ref_cacheMap != NULL ) {
- dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
- if ( uplink != NULL && ( uplink->cacheFd == -1 || uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) ) {
+ if ( image->problem.queue || image->problem.write ) {
bOk = ( rand() % 4 ) == 1;
}
- if ( bOk && uplink != NULL && uplink->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this
- usleep( 100000 ); // server gets a penalty and is less likely to be selected
- }
- if ( uplink != NULL ) {
- ref_put( &uplink->reference );
+ if ( bOk ) {
+ if ( image->problem.write ) { // Wait 100ms if local caching is not working so this
+ usleep( 100000 ); // server gets a penalty and is less likely to be selected
+ }
+ if ( image->problem.uplink ) {
+ // Penaltize depending on completeness, if no uplink is available
+ usleep( ( 100 - image->completenessEstimate ) * 100 );
+ }
}
}
if ( bOk ) {
@@ -286,6 +290,7 @@ void* net_handleNewConnection(void *clientPtr)
if ( !client->isServer ) {
// Only update immediately if this is a client. Servers are handled on disconnect.
timing_get( &image->atime );
+ image->accessed = true;
}
mutex_unlock( &image->lock );
serializer_reset_write( &payload );
@@ -313,9 +318,8 @@ void* net_handleNewConnection(void *clientPtr)
// client handling mainloop
while ( recv_request_header( client->sock, &request ) ) {
if ( _shutdown ) break;
- switch ( request.cmd ) {
+ if ( likely ( request.cmd == CMD_GET_BLOCK ) ) {
- case CMD_GET_BLOCK:;
const uint64_t offset = request.offset_small; // Copy to full uint64 to prevent repeated masking
reply.handle = request.handle;
if ( unlikely( offset >= image->virtualFilesize ) ) {
@@ -324,7 +328,7 @@ void* net_handleNewConnection(void *clientPtr)
reply.size = 0;
reply.cmd = CMD_ERROR;
send_reply( client->sock, &reply, NULL );
- break;
+ continue;
}
if ( unlikely( offset + request.size > image->virtualFilesize ) ) {
// Sanity check
@@ -332,63 +336,36 @@ void* net_handleNewConnection(void *clientPtr)
reply.size = 0;
reply.cmd = CMD_ERROR;
send_reply( client->sock, &reply, NULL );
- break;
+ continue;
}
- dnbd3_cache_map_t *cache;
- if ( request.size != 0 && ( cache = ref_get_cachemap( image ) ) != NULL ) {
+ if ( cache == NULL ) {
+ cache = ref_get_cachemap( image );
+ }
+
+ if ( request.size != 0 && cache != NULL ) {
// This is a proxyed image, check if we need to relay the request...
- start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
- end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
- bool isCached = true;
- const uint64_t firstByteInMap = start >> 15;
- const uint64_t lastByteInMap = (end - 1) >> 15;
- uint64_t pos;
- uint8_t b;
- atomic_thread_fence( memory_order_acquire );
- // Middle - quick checking
- if ( isCached ) {
- for ( pos = firstByteInMap + 1; pos < lastByteInMap; ++pos ) {
- if ( atomic_load_explicit( &cache->map[pos], memory_order_relaxed ) != 0xff ) {
- isCached = false;
- break;
- }
- }
- }
- // First byte
- if ( isCached ) {
- b = atomic_load_explicit( &cache->map[firstByteInMap], memory_order_relaxed );
- for ( pos = start; firstByteInMap == (pos >> 15) && pos < end; pos += DNBD3_BLOCK_SIZE ) {
- const int map_x = (pos >> 12) & 7; // mod 8
- const uint8_t bit_mask = (uint8_t)( 1 << map_x );
- if ( (b & bit_mask) == 0 ) {
- isCached = false;
- break;
+ const uint64_t start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ const uint64_t end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ if ( !image_isRangeCachedUnsafe( cache, start, end ) ) {
+ if ( unlikely( client->relayedCount > 250 ) ) {
+ logadd( LOG_DEBUG1, "Client is overloading uplink; throttling" );
+ for ( int i = 0; i < 100 && client->relayedCount > 200; ++i ) {
+ usleep( 10000 );
}
- }
- }
- // Last byte - only check if request spans multiple bytes in cache map
- if ( isCached && firstByteInMap != lastByteInMap ) {
- b = atomic_load_explicit( &cache->map[lastByteInMap], memory_order_relaxed );
- for ( pos = lastByteInMap << 15; pos < end; pos += DNBD3_BLOCK_SIZE ) {
- assert( lastByteInMap == (pos >> 15) );
- const int map_x = (pos >> 12) & 7; // mod 8
- const uint8_t bit_mask = (uint8_t)( 1 << map_x );
- if ( (b & bit_mask) == 0 ) {
- isCached = false;
- break;
+ if ( client->relayedCount > 250 ) {
+ logadd( LOG_WARNING, "Could not lower client's uplink backlog; dropping client" );
+ goto exit_client_cleanup;
}
}
- }
- ref_put( &cache->reference );
- if ( !isCached ) {
- if ( !uplink_request( client, request.handle, offset, request.size, request.hops ) ) {
- logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy, disabling image %s:%d",
+ client->relayedCount++;
+ if ( !uplink_requestClient( client, &uplinkCallback, request.handle, offset, request.size, request.hops ) ) {
+ client->relayedCount--;
+ logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy for image %s:%d",
client->hostName, image->name, image->rid );
- image->working = false;
goto exit_client_cleanup;
}
- break; // DONE, exit request.cmd switch
+ continue; // Reply arrives on uplink some time later, handle next request now
}
}
@@ -419,7 +396,7 @@ void* net_handleNewConnection(void *clientPtr)
// TODO: Should we consider EOPNOTSUPP on BSD for sendfile and fallback to read/write?
// Linux would set EINVAL or ENOSYS instead, which it unfortunately also does for a couple of other failures :/
// read/write would kill performance anyways so a fallback would probably be of little use either way.
-#ifdef AFL_MODE
+#ifdef DNBD3_SERVER_AFL
char buf[1000];
size_t cnt = realBytes - done;
if ( cnt > 1000 ) {
@@ -456,7 +433,7 @@ void* net_handleNewConnection(void *clientPtr)
}
if ( err == EBADF || err == EFAULT || err == EINVAL || err == EIO ) {
logadd( LOG_INFO, "Disabling %s:%d", image->name, image->rid );
- image->working = false;
+ image->problem.read = true;
}
}
goto exit_client_cleanup;
@@ -473,7 +450,16 @@ void* net_handleNewConnection(void *clientPtr)
if ( lock ) mutex_unlock( &client->sendMutex );
// Global per-client counter
client->bytesSent += request.size; // Increase counter for statistics.
- break;
+ continue;
+ }
+ // Any other command
+ // Release cache map every now and then, in case the image was replicated
+ // entirely. Will be re-grabbed on next CMD_GET_BLOCK otherwise.
+ if ( cache != NULL ) {
+ ref_put( &cache->reference );
+ cache = NULL;
+ }
+ switch ( request.cmd ) {
case CMD_GET_SERVERS:
// Build list of known working alt servers
@@ -522,9 +508,9 @@ set_name: ;
logadd( LOG_ERROR, "Unknown command from client %s: %d", client->hostName, (int)request.cmd );
break;
- }
- }
- }
+ } // end switch
+ } // end loop
+ } // end bOk
exit_client_cleanup: ;
// First remove from list, then add to counter to prevent race condition
removeFromList( client );
@@ -533,8 +519,12 @@ exit_client_cleanup: ;
if ( image != NULL && client->bytesSent > DNBD3_BLOCK_SIZE * 10 ) {
mutex_lock( &image->lock );
timing_get( &image->atime );
+ image->accessed = true;
mutex_unlock( &image->lock );
}
+ if ( cache != NULL ) {
+ ref_put( &cache->reference );
+ }
freeClientStruct( client ); // This will also call image_release on client->image
return NULL ;
fail_preadd: ;
@@ -695,9 +685,21 @@ static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client)
if ( client->image != NULL ) {
dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref );
if ( uplink != NULL ) {
- uplink_removeClient( uplink, client );
+ if ( client->relayedCount != 0 ) {
+ uplink_removeEntry( uplink, client, &uplinkCallback );
+ }
ref_put( &uplink->reference );
}
+ if ( client->relayedCount != 0 ) {
+ logadd( LOG_DEBUG1, "Client has relayedCount == %"PRIu8" on disconnect..", client->relayedCount );
+ int i;
+ for ( i = 0; i < 1000 && client->relayedCount != 0; ++i ) {
+ usleep( 10000 );
+ }
+ if ( client->relayedCount != 0 ) {
+ logadd( LOG_WARNING, "Client relayedCount still %"PRIu8" after sleeping!", client->relayedCount );
+ }
+ }
}
mutex_lock( &client->sendMutex );
if ( client->sock != -1 ) {
@@ -739,15 +741,21 @@ static bool addToList(dnbd3_client_t *client)
return true;
}
-void net_sendReply(dnbd3_client_t *client, uint16_t cmd, uint64_t handle)
+static void uplinkCallback(void *data, uint64_t handle, uint64_t start UNUSED, uint32_t length, const char *buffer)
{
- dnbd3_reply_t reply;
- reply.magic = dnbd3_packet_magic;
- reply.cmd = cmd;
- reply.handle = handle;
- reply.size = 0;
+ dnbd3_client_t *client = (dnbd3_client_t*)data;
+ dnbd3_reply_t reply = {
+ .magic = dnbd3_packet_magic,
+ .cmd = buffer == NULL ? CMD_ERROR : CMD_GET_BLOCK,
+ .handle = handle,
+ .size = length,
+ };
mutex_lock( &client->sendMutex );
- send_reply( client->sock, &reply, NULL );
+ send_reply( client->sock, &reply, buffer );
+ if ( buffer == NULL ) {
+ shutdown( client->sock, SHUT_RDWR );
+ }
+ client->relayedCount--;
mutex_unlock( &client->sendMutex );
}
diff --git a/src/server/net.h b/src/server/net.h
index 7719aef..2d6e5e7 100644
--- a/src/server/net.h
+++ b/src/server/net.h
@@ -3,7 +3,7 @@
*
* Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de>
*
- * This file may be licensed under the terms of of the
+ * This file may be licensed under the terms of the
* GNU General Public License Version 2 (the ``GPL'').
*
* Software distributed under the License is distributed
@@ -37,6 +37,4 @@ void net_disconnectAll();
void net_waitForAllDisconnected();
-void net_sendReply(dnbd3_client_t *client, uint16_t cmd, uint64_t handle);
-
#endif /* NET_H_ */
diff --git a/src/server/picohttpparser/CMakeLists.txt b/src/server/picohttpparser/CMakeLists.txt
new file mode 100644
index 0000000..cc6ec96
--- /dev/null
+++ b/src/server/picohttpparser/CMakeLists.txt
@@ -0,0 +1,11 @@
+cmake_minimum_required(VERSION 3.10)
+
+# set the project name
+project(picohttpparser
+ LANGUAGES C)
+
+set(PICOHTTPPARSER_SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/picohttpparser.c)
+set(PICOHTTPPARSER_HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/picohttpparser.h)
+
+add_library(picohttpparser STATIC ${PICOHTTPPARSER_SOURCE_FILES})
+target_include_directories(picohttpparser PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
diff --git a/src/server/reference.h b/src/server/reference.h
index 4eda546..75a681f 100644
--- a/src/server/reference.h
+++ b/src/server/reference.h
@@ -39,6 +39,11 @@ static inline ref *ref_get( weakref *weakref )
return ref;
}
+static inline void ref_inc( ref *ref )
+{
+ ++ref->count;
+}
+
static inline void ref_put( ref *ref )
{
if ( --ref->count == 0 ) {
diff --git a/src/server/rpc.c b/src/server/rpc.c
index a454d6d..119bbd5 100644
--- a/src/server/rpc.c
+++ b/src/server/rpc.c
@@ -5,7 +5,9 @@
#include "locks.h"
#include "image.h"
#include "altservers.h"
-#include "../shared/sockhelper.h"
+#include <dnbd3/shared/sockhelper.h>
+#include <dnbd3/version.h>
+#include <dnbd3/build.h>
#include "fileutil.h"
#include "picohttpparser/picohttpparser.h"
#include "urldecode.h"
@@ -101,8 +103,8 @@ void rpc_init()
int fd = open( "/dev/urandom", O_RDONLY );
if ( fd != -1 ) {
uint32_t bla = 1;
- read( fd, &bla, 4 );
- randomRunId = (randomRunId << 32) | bla;
+ (void)!read( fd, &bla, 4 );
+ randomRunId = ((randomRunId & 0xffffffff) << 32) | bla;
}
close( fd );
}
@@ -144,7 +146,7 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int
while ( !_shutdown ) {
// Read request from client
struct phr_header headers[100];
- size_t numHeaders, prevLen = 0, consumed;
+ size_t numHeaders, prevLen = 0, consumed = 0;
struct string method, path;
int minorVersion;
while ( !_shutdown ) {
@@ -174,7 +176,7 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int
// Reaching here means partial request or parse error
if ( pret == -2 ) { // Partial, keep reading
prevLen = hoff;
-#ifdef AFL_MODE
+#ifdef DNBD3_SERVER_AFL
ssize_t ret = recv( 0, headerBuf + hoff, sizeof(headerBuf) - hoff, 0 );
#else
ssize_t ret = recv( sock, headerBuf + hoff, sizeof(headerBuf) - hoff, 0 );
@@ -259,7 +261,7 @@ static bool handleStatus(int sock, int permissions, struct field *fields, size_t
{
bool ok;
bool stats = false, images = false, clients = false, space = false;
- bool logfile = false, config = false, altservers = false;
+ bool logfile = false, config = false, altservers = false, version = false;
#define SETVAR(var) if ( !var && STRCMP(fields[i].value, #var) ) var = true
for (size_t i = 0; i < fields_num; ++i) {
if ( !equals( &fields[i].name, &STR_Q ) ) continue;
@@ -270,9 +272,10 @@ static bool handleStatus(int sock, int permissions, struct field *fields, size_t
else SETVAR(logfile);
else SETVAR(config);
else SETVAR(altservers);
+ else SETVAR(version);
}
#undef SETVAR
- if ( ( stats || space ) && !(permissions & ACL_STATS) ) {
+ if ( ( stats || space || version ) && !(permissions & ACL_STATS) ) {
return sendReply( sock, "403 Forbidden", "text/plain", "No permission to access statistics", -1, keepAlive );
}
if ( images && !(permissions & ACL_IMAGE_LIST) ) {
@@ -308,6 +311,10 @@ static bool handleStatus(int sock, int permissions, struct field *fields, size_t
statisticsJson = json_pack( "{sI}",
"runId", randomRunId );
}
+ if ( version ) {
+ json_object_set_new( statisticsJson, "version", json_string( DNBD3_VERSION_LONG ", built " DNBD3_BUILD_DATE ) );
+ json_object_set_new( statisticsJson, "build", json_string( DNBD3_BUILD ) );
+ }
if ( space ) {
uint64_t spaceTotal = 0, spaceAvail = 0;
file_freeDiskSpace( _basePath, &spaceTotal, &spaceAvail );
@@ -405,9 +412,11 @@ static bool sendReply(int sock, const char *status, const char *ctype, const cha
if ( keepAlive == HTTP_CLOSE ) {
// Wait for flush
shutdown( sock, SHUT_WR );
-#ifdef AFL_MODE
+#ifdef DNBD3_SERVER_AFL
sock = 0;
#endif
+ // Don't wait too long in case other side ignores the shutdown
+ sock_setTimeout( sock, 600 );
while ( read( sock, buffer, sizeof buffer ) > 0 );
return false;
}
@@ -451,7 +460,7 @@ static int getacl(dnbd3_host_t *host)
if ( aclRules[i].bitMask != 0 && aclRules[i].host[aclRules[i].bytes] != ( host->addr[aclRules[i].bytes] & aclRules[i].bitMask ) ) continue;
return aclRules[i].permissions;
}
-#ifdef AFL_MODE
+#ifdef DNBD3_SERVER_AFL
return 0x7fffff;
#else
return 0;
@@ -487,7 +496,7 @@ static void addacl(int argc, char **argv, void *data UNUSED)
*slash++ = '\0';
}
if ( !parse_address( argv[0], &host ) ) goto unlock_end;
- long int bits;
+ long int bits = 0;
if ( slash != NULL ) {
char *last;
bits = strtol( slash, &last, 10 );
diff --git a/src/server/serialize.c b/src/server/serialize.c
deleted file mode 100644
index 4934132..0000000
--- a/src/server/serialize.c
+++ /dev/null
@@ -1,5 +0,0 @@
-#include <stdio.h>
-#include <string.h>
-#include <stdint.h>
-
-#include "../serialize.c"
diff --git a/src/server/server.c b/src/server/server.c
index 0dddea7..0f75935 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -3,7 +3,7 @@
*
* Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de>
*
- * This file may be licensed under the terms of of the
+ * This file may be licensed under the terms of the
* GNU General Public License Version 2 (the ``GPL'').
*
* Software distributed under the License is distributed
@@ -29,10 +29,12 @@
#include "integrity.h"
#include "threadpool.h"
#include "rpc.h"
+#include "fuse.h"
-#include "../version.h"
-#include "../shared/sockhelper.h"
-#include "../shared/timing.h"
+#include <dnbd3/version.h>
+#include <dnbd3/build.h>
+#include <dnbd3/shared/sockhelper.h>
+#include <dnbd3/shared/timing.h>
#include <signal.h>
#include <getopt.h>
@@ -104,10 +106,14 @@ static void queueJobInternal(job_t *job);
*/
void dnbd3_printHelp(char *argv_0)
{
- printf( "Version: %s\n\n", VERSION_STRING );
+ printf( "Version: %s\n\n", DNBD3_VERSION_LONG );
+ printf( "Built: %s\n", DNBD3_BUILD_DATE );
printf( "Usage: %s [OPTIONS]...\n", argv_0 );
printf( "Start the DNBD3 server\n" );
printf( "-c or --config Configuration directory (default /etc/dnbd3-server/)\n" );
+#ifdef DNBD3_SERVER_FUSE
+ printf( "-m or --mount FUSE mount point\n");
+#endif
printf( "-n or --nodaemon Start server in foreground\n" );
printf( "-b or --bind Local Address to bind to\n" );
printf( "-h or --help Show this help text and quit\n" );
@@ -126,7 +132,8 @@ void dnbd3_printHelp(char *argv_0)
*/
void dnbd3_printVersion()
{
- printf( "Version: %s\n", VERSION_STRING );
+ printf( "dnbd3-server version: %s\n", DNBD3_VERSION_LONG );
+ printf( "Built: %s\n", DNBD3_BUILD_DATE );
exit( 0 );
}
@@ -140,6 +147,8 @@ _Noreturn static void dnbd3_cleanup()
_shutdown = true;
logadd( LOG_INFO, "Cleanup..." );
+ dfuse_shutdown();
+
if ( hasTimerThread ) {
pthread_kill( timerThread, SIGINT );
thread_join( timerThread, NULL );
@@ -190,11 +199,13 @@ int main(int argc, char *argv[])
char *paramCreate = NULL;
char *bindAddress = NULL;
char *errorMsg = NULL;
+ char *mountDir = NULL;
int64_t paramSize = -1;
int paramRevision = -1;
- static const char *optString = "b:c:d:hnv?";
+ static const char *optString = "b:c:m:d:hnv?";
static const struct option longOpts[] = {
{ "config", required_argument, NULL, 'c' },
+ { "mount", required_argument, NULL, 'm' },
{ "nodaemon", no_argument, NULL, 'n' },
{ "reload", no_argument, NULL, 'r' },
{ "help", no_argument, NULL, 'h' },
@@ -209,6 +220,16 @@ int main(int argc, char *argv[])
{ 0, 0, 0, 0 }
};
+ log_init();
+
+ /* set proper output stream for AFL */
+#ifdef DNBD3_SERVER_AFL
+ if ( log_setConsoleOutputStream(stderr) < 0 ) {
+ logadd( LOG_ERROR, "Failed to set output stream for AFL to stderr" );
+ exit( EXIT_FAILURE );
+ }
+#endif
+
mainPid = getpid();
mainThread = pthread_self();
opt = getopt_long( argc, argv, optString, longOpts, &longIndex );
@@ -218,6 +239,13 @@ int main(int argc, char *argv[])
case 'c':
_configDir = strdup( optarg );
break;
+ case 'm':
+#ifndef DNBD3_SERVER_FUSE
+ fprintf( stderr, "FUSE support not enabled at build time.\n" );
+ return 8;
+#endif
+ mountDir = strdup( optarg );
+ break;
case 'n':
demonize = 0;
break;
@@ -263,6 +291,7 @@ int main(int argc, char *argv[])
opt = getopt_long( argc, argv, optString, longOpts, &longIndex );
}
+
// Load general config
if ( _configDir == NULL ) _configDir = strdup( "/etc/dnbd3-server" );
@@ -275,9 +304,7 @@ int main(int argc, char *argv[])
timing_setBase();
timing_get( &startupTime );
-#ifdef AFL_MODE
- // ###### AFL
- //
+#ifdef DNBD3_SERVER_AFL
image_serverStartup();
net_init();
uplink_globalsInit();
@@ -301,9 +328,7 @@ int main(int argc, char *argv[])
net_handleNewConnection( dnbd3_client );
exit( 0 );
}
- //
- // ###### AFL END
-#endif
+#endif /* DNBD3_SERVER_AFL */
// One-shots first:
@@ -315,7 +340,10 @@ int main(int argc, char *argv[])
// No one-shot detected, normal server operation or errormsg serving
if ( demonize ) {
logadd( LOG_INFO, "Forking into background, see log file for further information" );
- daemon( 1, 0 );
+ if ( daemon( 0, 0 ) == -1 ) {
+ logadd( LOG_ERROR, "Could not daemon(): errno=%d", errno );
+ exit( 1 );
+ }
}
if ( errorMsg != NULL ) {
setupNetwork( bindAddress );
@@ -339,7 +367,15 @@ int main(int argc, char *argv[])
net_init();
uplink_globalsInit();
rpc_init();
- logadd( LOG_INFO, "DNBD3 server starting.... Machine type: " ENDIAN_MODE );
+ if ( mountDir != NULL && !dfuse_init( "-oallow_other", mountDir ) ) {
+ logadd( LOG_ERROR, "Cannot mount fuse directory to %s", mountDir );
+ dnbd3_cleanup();
+ return EXIT_FAILURE;
+ }
+ logadd( LOG_INFO, "DNBD3 server starting...." );
+ logadd( LOG_INFO, "Machine type: " DNBD3_ENDIAN_MODE );
+ logadd( LOG_INFO, "Build Type: %s", DNBD3_BUILD );
+ logadd( LOG_INFO, "Version: %s, built %s", DNBD3_VERSION_LONG, DNBD3_BUILD_DATE );
if ( altservers_load() < 0 ) {
logadd( LOG_WARNING, "Could not load alt-servers. Does the file exist in %s?", _configDir );
@@ -379,10 +415,11 @@ int main(int argc, char *argv[])
// Initialize thread pool
if ( !threadpool_init( 8 ) ) {
logadd( LOG_ERROR, "Could not init thread pool!\n" );
+ dnbd3_cleanup();
exit( EXIT_FAILURE );
}
- logadd( LOG_INFO, "Server is ready. (%s)", VERSION_STRING );
+ logadd( LOG_INFO, "Server is ready." );
if ( thread_create( &timerThread, NULL, &timerMainloop, NULL ) == 0 ) {
hasTimerThread = true;
@@ -398,7 +435,7 @@ int main(int argc, char *argv[])
if ( sigReload ) {
sigReload = false;
logadd( LOG_INFO, "SIGHUP received, re-scanning image directory" );
- threadpool_run( &server_asyncImageListLoad, NULL );
+ threadpool_run( &server_asyncImageListLoad, NULL, "IMAGE_RELOAD" );
}
if ( sigLogCycle ) {
sigLogCycle = false;
@@ -425,7 +462,7 @@ int main(int argc, char *argv[])
continue;
}
- if ( !threadpool_run( &net_handleNewConnection, (void *)dnbd3_client ) ) {
+ if ( !threadpool_run( &net_handleNewConnection, (void *)dnbd3_client, "CLIENT" ) ) {
logadd( LOG_ERROR, "Could not start thread for new connection." );
free( dnbd3_client );
continue;
@@ -520,10 +557,11 @@ static void dnbd3_handleSignal2(int signum, siginfo_t *info, void *data UNUSED)
if ( info->si_pid != 0 && !pthread_equal( pthread_self(), mainThread ) ) {
pthread_kill( mainThread, info->si_signo ); // And relay signal if we're not the main thread
}
- }
- if ( pthread_equal( pthread_self(), mainThread ) ) {
- // Signal received by main thread -- handle
- dnbd3_handleSignal( signum );
+ // Source is not this process -- only then do we honor signals
+ if ( pthread_equal( pthread_self(), mainThread ) ) {
+ // Signal received by main thread -- handle
+ dnbd3_handleSignal( signum );
+ }
}
}
@@ -568,7 +606,7 @@ static int handlePendingJobs(void)
jobHead = *temp; // Make it list head
*temp = NULL; // Split off part before that
while ( todo != NULL ) {
- threadpool_run( todo->startRoutine, todo->arg );
+ threadpool_run( todo->startRoutine, todo->arg, "TIMER_TASK" );
old = todo;
todo = todo->next;
if ( old->intervalSecs == 0 ) {
diff --git a/src/server/server.h b/src/server/server.h
index a026eb6..e93d8f5 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -3,7 +3,7 @@
*
* Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de>
*
- * This file may be licensed under the terms of of the
+ * This file may be licensed under the terms of the
* GNU General Public License Version 2 (the ``GPL'').
*
* Software distributed under the License is distributed
@@ -22,7 +22,7 @@
#define SERVER_H_
#include "globals.h"
-#include "../types.h"
+#include <dnbd3/types.h>
uint32_t dnbd3_serverUptime();
void server_addJob(void *(*startRoutine)(void *), void *arg, int delaySecs, int intervalSecs);
diff --git a/src/server/threadpool.c b/src/server/threadpool.c
index 0b46fd6..a21bd0d 100644
--- a/src/server/threadpool.c
+++ b/src/server/threadpool.c
@@ -8,6 +8,7 @@ typedef struct _entry_t {
dnbd3_signal_t* signal;
void *(*startRoutine)(void *);
void * arg;
+ const char *name;
} entry_t;
static void *threadpool_worker(void *entryPtr);
@@ -56,21 +57,22 @@ void threadpool_waitEmpty()
} while ( activeThreads != 0 );
}
-bool threadpool_run(void *(*startRoutine)(void *), void *arg)
+bool threadpool_run(void *(*startRoutine)(void *), void *arg, const char *name)
{
if ( unlikely( _shutdown ) ) {
logadd( LOG_MINOR, "Cannot submit work to threadpool while shutting down!" );
return false;
}
+#ifdef DEBUG
if ( unlikely( startRoutine == NULL ) ) {
logadd( LOG_ERROR, "Trying to queue work for thread pool with NULL startRoutine" );
return false; // Or bail out!?
}
+#endif
entry_t *entry = NULL;
for ( int i = 0; i < maxIdleThreads; ++i ) {
- entry_t *cur = pool[i];
- if ( cur != NULL && atomic_compare_exchange_weak( &pool[i], &cur, NULL ) ) {
- entry = cur;
+ entry = atomic_exchange( &pool[i], NULL );
+ if ( entry != NULL ) {
break;
}
}
@@ -87,7 +89,7 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg)
return false;
}
if ( 0 != thread_create( &(entry->thread), &threadAttrs, threadpool_worker, (void*)entry ) ) {
- logadd( LOG_WARNING, "Could not create new thread for thread pool\n" );
+ logadd( LOG_WARNING, "Could not create new thread for thread pool (%d active)\n", (int)activeThreads );
signal_close( entry->signal );
free( entry );
return false;
@@ -96,6 +98,7 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg)
}
entry->startRoutine = startRoutine;
entry->arg = arg;
+ entry->name = name;
atomic_thread_fence( memory_order_release );
signal_call( entry->signal );
return true;
@@ -120,10 +123,15 @@ keep_going:;
logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret );
continue;
}
+#ifdef DEBUG
if ( entry->startRoutine == NULL ) {
logadd( LOG_ERROR, "Worker woke up but has no work to do!" );
exit( 1 );
}
+ if ( entry->name != NULL ) {
+ setThreadName( entry->name );
+ }
+#endif
// Start assigned work
(*entry->startRoutine)( entry->arg );
// Reset vars for safety
@@ -143,6 +151,7 @@ keep_going:;
// Reaching here means pool is full; just let the thread exit
break;
}
+ setThreadName( "[dead]" );
signal_close( entry->signal );
free( entry );
activeThreads--;
diff --git a/src/server/threadpool.h b/src/server/threadpool.h
index ee0b3aa..c30d44f 100644
--- a/src/server/threadpool.h
+++ b/src/server/threadpool.h
@@ -1,7 +1,7 @@
#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
-#include "../types.h"
+#include <dnbd3/types.h>
/**
* Initialize the thread pool. This must be called before using
@@ -26,9 +26,10 @@ void threadpool_waitEmpty();
* Run a thread using the thread pool.
* @param startRoutine function to run in new thread
* @param arg argument to pass to thead
+ * @param name STRING CONSTANT (literal) for debugging purposes
* @return true if thread was started
*/
-bool threadpool_run(void *(*startRoutine)(void *), void *arg);
+bool threadpool_run(void *(*startRoutine)(void *), void *arg, const char *name);
#endif
diff --git a/src/server/uplink.c b/src/server/uplink.c
index f39e633..8a83124 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -4,10 +4,11 @@
#include "image.h"
#include "altservers.h"
#include "net.h"
-#include "../shared/sockhelper.h"
-#include "../shared/protocol.h"
-#include "../shared/timing.h"
-#include "../shared/crc32.h"
+#include <dnbd3/shared/sockhelper.h>
+#include <dnbd3/shared/protocol.h>
+#include <dnbd3/shared/timing.h>
+#include <dnbd3/shared/crc32.h>
+#include "threadpool.h"
#include "reference.h"
#include <assert.h>
@@ -17,49 +18,35 @@
#include <unistd.h>
#include <stdatomic.h>
+static const uint8_t HOP_FLAG_BGR = 0x80;
+static const uint8_t HOP_FLAG_PREFETCH = 0x40;
#define FILE_BYTES_PER_MAP_BYTE ( DNBD3_BLOCK_SIZE * 8 )
#define MAP_BYTES_PER_HASH_BLOCK (int)( HASH_BLOCK_SIZE / FILE_BYTES_PER_MAP_BYTE )
#define MAP_INDEX_HASH_START_MASK ( ~(int)( MAP_BYTES_PER_HASH_BLOCK - 1 ) )
-#define REP_NONE ( (uint64_t)0xffffffffffffffff )
-
-// Status of request in queue
-
-// Slot is free, can be used.
-// Must only be set in uplink_handle_receive() or uplink_remove_client()
-#define ULR_FREE 0
-// Slot has been filled with a request that hasn't been sent to the upstream server yet, matching request can safely rely on reuse.
-// Must only be set in uplink_request()
-#define ULR_NEW 1
-// Slot is occupied, reply has not yet been received, matching request can safely rely on reuse.
-// Must only be set in uplink_mainloop() or uplink_request()
-#define ULR_PENDING 2
-// Slot is being processed, do not consider for hop on.
-// Must only be set in uplink_handle_receive()
-#define ULR_PROCESSING 3
-
-static const char *const NAMES_ULR[4] = {
- [ULR_FREE] = "ULR_FREE",
- [ULR_NEW] = "ULR_NEW",
- [ULR_PENDING] = "ULR_PENDING",
- [ULR_PROCESSING] = "ULR_PROCESSING",
-};
-
static atomic_uint_fast64_t totalBytesReceived = 0;
+typedef struct {
+ uint64_t start, end, handle;
+} req_t;
+
static void cancelAllRequests(dnbd3_uplink_t *uplink);
-static void uplink_free(ref *ref);
+static void freeUplinkStruct(ref *ref);
static void* uplink_mainloop(void *data);
-static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly);
-static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex);
-static void uplink_handleReceive(dnbd3_uplink_t *uplink);
-static int uplink_sendKeepalive(const int fd);
-static void uplink_addCrc32(dnbd3_uplink_t *uplink);
-static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink);
-static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force);
-static bool uplink_saveCacheMap(dnbd3_uplink_t *uplink);
-static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink);
-static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew);
+static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly);
+static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex);
+static void handleReceive(dnbd3_uplink_t *uplink);
+static bool sendKeepalive(dnbd3_uplink_t *uplink);
+static void requestCrc32List(dnbd3_uplink_t *uplink);
+static bool sendReplicationRequest(dnbd3_uplink_t *uplink);
+static bool reopenCacheFd(dnbd3_uplink_t *uplink, const bool force);
+static bool connectionShouldShutdown(dnbd3_uplink_t *uplink);
+static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew);
+static int numWantedReplicationRequests(dnbd3_uplink_t *uplink);
+static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle);
+static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops);
+
+#define assert_uplink_thread() assert( pthread_equal( uplink->thread, pthread_self() ) )
// ############ uplink connection handling
@@ -81,6 +68,8 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
{
if ( !_isProxy || _shutdown ) return false;
assert( image != NULL );
+ if ( sock == -1 && !altservers_imageHasAltServers( image->name ) )
+ return false; // Nothing to do
mutex_lock( &image->lock );
dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
if ( uplink != NULL ) {
@@ -97,13 +86,15 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
}
uplink = calloc( 1, sizeof(dnbd3_uplink_t) );
// Start with one reference for the uplink thread. We'll return it when the thread finishes
- ref_init( &uplink->reference, uplink_free, 1 );
+ ref_init( &uplink->reference, freeUplinkStruct, 1 );
mutex_init( &uplink->queueLock, LOCK_UPLINK_QUEUE );
mutex_init( &uplink->rttLock, LOCK_UPLINK_RTT );
mutex_init( &uplink->sendMutex, LOCK_UPLINK_SEND );
uplink->image = image;
uplink->bytesReceived = 0;
- uplink->idleTime = 0;
+ uplink->bytesReceivedLastSave = 0;
+ uplink->idleTime = SERVER_UPLINK_IDLE_TIMEOUT - 90;
+ uplink->queue = NULL;
uplink->queueLen = 0;
uplink->cacheFd = -1;
uplink->signal = signal_new();
@@ -111,12 +102,14 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
logadd( LOG_WARNING, "Error creating signal. Uplink unavailable." );
goto failure;
}
- uplink->replicationHandle = REP_NONE;
mutex_lock( &uplink->rttLock );
mutex_lock( &uplink->sendMutex );
uplink->current.fd = -1;
mutex_unlock( &uplink->sendMutex );
uplink->cycleDetected = false;
+ image->problem.uplink = true;
+ image->problem.write = true;
+ image->problem.queue = false;
if ( sock != -1 ) {
uplink->better.fd = sock;
int index = altservers_hostToIndex( host );
@@ -139,7 +132,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
return true;
failure: ;
if ( uplink != NULL ) {
- image->users++; // Expected by uplink_free()
+ image->users++; // Expected by freeUplinkStruct()
ref_put( &uplink->reference ); // The ref for the uplink thread that never was
}
mutex_unlock( &image->lock );
@@ -166,13 +159,13 @@ bool uplink_shutdown(dnbd3_image_t *image)
image->users++; // Prevent free while uplink shuts down
signal_call( uplink->signal );
} else {
- logadd( LOG_ERROR, "This will never happen. '%s:%d'", image->name, (int)image->rid );
+ logadd( LOG_ERROR, "This will never happen. '%s:%d'", PIMG(image) );
}
cancelAllRequests( uplink );
ref_setref( &image->uplinkref, NULL );
- ref_put( &uplink->reference );
mutex_unlock( &uplink->queueLock );
bool retval = ( exp && image->users == 0 );
+ ref_put( &uplink->reference );
mutex_unlock( &image->lock );
return retval;
}
@@ -183,19 +176,28 @@ bool uplink_shutdown(dnbd3_image_t *image)
*/
static void cancelAllRequests(dnbd3_uplink_t *uplink)
{
- for ( int i = 0; i < uplink->queueLen; ++i ) {
- if ( uplink->queue[i].status != ULR_FREE ) {
- net_sendReply( uplink->queue[i].client, CMD_ERROR, uplink->queue[i].handle );
- uplink->queue[i].status = ULR_FREE;
+ dnbd3_queue_entry_t *it = uplink->queue;
+ while ( it != NULL ) {
+ dnbd3_queue_client_t *cit = it->clients;
+ while ( cit != NULL ) {
+ (*cit->callback)( cit->data, cit->handle, 0, 0, NULL );
+ dnbd3_queue_client_t *next = cit->next;
+ free( cit );
+ cit = next;
}
+ dnbd3_queue_entry_t *next = it->next;
+ free( it );
+ it = next;
}
+ uplink->queue = NULL;
uplink->queueLen = 0;
+ uplink->image->problem.queue = false;
}
-static void uplink_free(ref *ref)
+static void freeUplinkStruct(ref *ref)
{
dnbd3_uplink_t *uplink = container_of(ref, dnbd3_uplink_t, reference);
- logadd( LOG_DEBUG1, "Freeing uplink for '%s:%d'", uplink->image->name, (int)uplink->image->rid );
+ logadd( LOG_DEBUG1, "Freeing uplink for '%s:%d'", PIMG(uplink->image) );
assert( uplink->queueLen == 0 );
if ( uplink->signal != NULL ) {
signal_close( uplink->signal );
@@ -226,35 +228,36 @@ static void uplink_free(ref *ref)
* Remove given client from uplink request queue
* Locks on: uplink.queueLock
*/
-void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client)
+void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback callback)
{
mutex_lock( &uplink->queueLock );
- for (int i = uplink->queueLen - 1; i >= 0; --i) {
- if ( uplink->queue[i].client == client ) {
- // Make sure client doesn't get destroyed while we're sending it data
- mutex_lock( &client->sendMutex );
- mutex_unlock( &client->sendMutex );
- uplink->queue[i].client = NULL;
- uplink->queue[i].status = ULR_FREE;
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ for ( dnbd3_queue_client_t **cit = &it->clients; *cit != NULL; ) {
+ if ( (**cit).data == data && (**cit).callback == callback ) {
+ (*(**cit).callback)( (**cit).data, (**cit).handle, 0, 0, NULL );
+ dnbd3_queue_client_t *entry = *cit;
+ *cit = (**cit).next;
+ free( entry );
+ } else {
+ cit = &(**cit).next;
+ }
}
- if ( uplink->queue[i].client == NULL && uplink->queueLen == i + 1 ) uplink->queueLen--;
}
mutex_unlock( &uplink->queueLock );
}
/**
- * Request a chunk of data through an uplink server
- * Locks on: image.lock, uplink.queueLock
+ * Called from a client (proxy) connection to request a missing part of the image.
+ * The caller has made sure that the range is actually missing.
*/
-bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
+bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
{
- if ( client == NULL || client->image == NULL )
- return false;
- if ( length > (uint32_t)_maxPayload ) {
- logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length );
+ assert( client != NULL && callback != NULL );
+ if ( ( hops & 0x3f ) > 60 ) { // This is just silly
+ logadd( LOG_WARNING, "Refusing to relay a request that has > 60 hops" );
return false;
}
- dnbd3_uplink_t * uplink = ref_get_uplink( &client->image->uplinkref );
+ dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref );
if ( unlikely( uplink == NULL ) ) {
uplink_init( client->image, -1, NULL, -1 );
uplink = ref_get_uplink( &client->image->uplinkref );
@@ -263,160 +266,275 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
return false;
}
}
- if ( uplink->shutdown ) {
- logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" );
- goto fail_ref;
- }
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
// This might be a false positive if there are multiple instances running on the same host (IP)
- if ( hops != 0 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
+ bool ret;
+ if ( hops > 1 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
uplink->cycleDetected = true;
signal_call( uplink->signal );
logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
- goto fail_ref;
+ ret = false;
+ } else {
+ ret = uplink_requestInternal( uplink, (void*)client, callback, handle, start, length, hops );
}
+ ref_put( &uplink->reference );
+ return ret;
+}
- int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise
- int existingType = -1; // ULR_* type of existing request
- int i;
- int freeSlot = -1;
- int firstUsedSlot = -1;
- bool requestLoop = false;
- const uint64_t end = start + length;
+/**
+ * Called by integrated fuse module
+ */
+bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback,
+ uint64_t handle, uint64_t start, uint32_t length)
+{
+ dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
+ if ( unlikely( uplink == NULL ) ) {
+ uplink_init( image, -1, NULL, -1 );
+ uplink = ref_get_uplink( &image->uplinkref );
+ if ( uplink == NULL ) {
+ logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
+ return false;
+ }
+ }
+ bool ret = uplink_requestInternal( uplink, data, callback, handle, start, length, 0 );
+ ref_put( &uplink->reference );
+ return ret;
+}
+
+static void extendRequest(uint64_t start, uint64_t *end, const dnbd3_image_t *image, uint32_t wanted)
+{
+ uint32_t length = (uint32_t)( *end - start );
+ if ( length >= wanted )
+ return;
+ length = wanted;
+ if ( unlikely( _backgroundReplication == BGR_HASHBLOCK
+ && *end / HASH_BLOCK_SIZE != (start + length) / HASH_BLOCK_SIZE ) ) {
+ // Don't extend across hash-block border in this mode
+ *end = ( start + length ) & ~( HASH_BLOCK_SIZE - 1 );
+ } else {
+ *end = start + length;
+ }
+ if ( unlikely( *end > image->virtualFilesize ) ) {
+ *end = image->virtualFilesize;
+ }
+ *end = ( *end + DNBD3_BLOCK_SIZE - 1 ) & ~( DNBD3_BLOCK_SIZE - 1 );
+ //logadd( LOG_DEBUG2, "Extended %"PRIx64" from %"PRIx64" to %"PRIx64, start, end, req.end );
+}
+
+static bool requestBlock(dnbd3_uplink_t *uplink, req_t *req, uint8_t hops)
+{
+ if ( uplink->current.fd == -1 )
+ return false;
+ return dnbd3_get_block( uplink->current.fd, req->start,
+ (uint32_t)( req->end - req->start ), req->handle,
+ COND_HOPCOUNT( uplink->current.version, hops ) );
+}
+
+/**
+ * Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL.
+ * If callback is NULL, this is assumed to be a background replication request.
+ * Locks on: uplink.queueLock, uplink.sendMutex
+ */
+static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback,
+ uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
+{
+ assert( uplink != NULL );
+ assert( data == NULL || callback != NULL );
+ if ( ( hops & HOP_FLAG_BGR ) // This is a background replication request
+ && _backgroundReplication != BGR_FULL ) { // Deny if we're not doing BGR
+ // TODO: Allow BGR_HASHBLOCK too, but only if hash block isn't completely empty
+ logadd( LOG_DEBUG2, "Dopping client because of BGR policy" );
+ return false;
+ }
+ if ( uplink->shutdown ) {
+ logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" );
+ return false;
+ }
+ if ( length > (uint32_t)_maxPayload ) {
+ logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload",
+ length );
+ return false;
+ }
+ hops++;
+ if ( callback == NULL ) {
+ // Set upper-most bit for replication requests that we fire
+ // In client mode, at least set prefetch flag to prevent prefetch cascading
+ hops |= (uint8_t)( _pretendClient ? HOP_FLAG_PREFETCH : HOP_FLAG_BGR );
+ }
+
+ req_t req, preReq;
+ dnbd3_queue_entry_t *request = NULL, *last = NULL, *pre = NULL;
+ bool isNew;
+ const uint64_t end = start + length;
+ req.start = start & ~(DNBD3_BLOCK_SIZE - 1);
+ req.end = end;
+ /* Don't do this -- this breaks matching of prefetch jobs, since they'd
+ * be misaligned, and the next client request wouldn't match anything.
+ * To improve this, we need to be able to attach a queue_client to multiple queue_entries
+ * and then serve it once all the queue_entries are done (atomic_int in queue_client).
+ * But currently we directly send the receive buffer's content to the queue_client after
+ * receiving the payload, as this will also work when the local cache is borked (we just
+ * tunnel though the traffic). One could argue that this mode of operation is nonsense,
+ * and we should just drop all affected clients. Then as a next step, don't serve the
+ * clients form the receive buffer, but just issue a normal sendfile() call after writing
+ * the received data to the local cache.
+ */
+ if ( callback != NULL && _minRequestSize != 0 ) {
+ // Not background replication request, extend request size
+ extendRequest( req.start, &req.end, uplink->image, _minRequestSize );
+ }
+ req.end = (req.end + DNBD3_BLOCK_SIZE - 1) & ~(DNBD3_BLOCK_SIZE - 1);
+ // Critical section - work with the queue
mutex_lock( &uplink->queueLock );
if ( uplink->shutdown ) { // Check again after locking to prevent lost requests
goto fail_lock;
}
- for (i = 0; i < uplink->queueLen; ++i) {
- // find free slot to place this request into
- if ( uplink->queue[i].status == ULR_FREE ) {
- if ( freeSlot == -1 || existingType != ULR_PROCESSING ) {
- freeSlot = i;
- }
- continue;
- }
- if ( firstUsedSlot == -1 ) {
- firstUsedSlot = i;
- }
- // find existing request to attach to
- if ( uplink->queue[i].from > start || uplink->queue[i].to < end )
- continue; // Range not suitable
- // Detect potential proxy cycle. New request hopcount is greater, range is same, old request has already been sent -> suspicious
- if ( hops > uplink->queue[i].hopCount && uplink->queue[i].from == start && uplink->queue[i].to == end && uplink->queue[i].status == ULR_PENDING ) {
- requestLoop = true;
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( it->from <= start && it->to >= end ) {
+ // Matching range, attach
+ request = it;
break;
}
- if ( foundExisting == -1 || existingType == ULR_PROCESSING ) {
- foundExisting = i;
- existingType = uplink->queue[i].status;
+ if ( it->next == NULL ) {
+ // Not matching, last in list, remember
+ last = it;
+ break;
}
}
- if ( unlikely( requestLoop ) ) {
- uplink->cycleDetected = true;
- signal_call( uplink->signal );
- logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops );
- goto fail_lock;
- }
- if ( freeSlot < firstUsedSlot && firstUsedSlot < 10 && existingType != ULR_PROCESSING ) {
- freeSlot = -1; // Not attaching to existing request, make it use a higher slot
- }
- if ( freeSlot == -1 ) {
- if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) {
- logadd( LOG_WARNING, "Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." );
+ dnbd3_queue_client_t **c = NULL;
+ if ( request == NULL ) {
+ // No existing request to attach to
+ if ( uplink->queueLen >= UPLINK_MAX_QUEUE ) {
+ logadd( LOG_WARNING,
+ "Uplink queue is full, consider increasing UPLINK_MAX_QUEUE. Dropping client..." );
+ goto fail_lock;
+ }
+ uplink->queueLen++;
+ if ( uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
+ uplink->image->problem.queue = true;
+ }
+ request = malloc( sizeof(*request) );
+ if ( last == NULL ) {
+ uplink->queue = request;
+ } else {
+ last->next = request;
+ }
+ request->next = NULL;
+ request->handle = ++uplink->queueId;
+ request->from = req.start;
+ request->to = req.end;
+#ifdef DEBUG
+ timing_get( &request->entered );
+#endif
+ request->hopCount = hops;
+ request->sent = true; // Optimistic; would be set to false on failure
+ if ( callback == NULL ) {
+ // BGR
+ request->clients = NULL;
+ } else {
+ c = &request->clients;
+ }
+ isNew = true;
+ } else if ( callback == NULL ) {
+ // Replication request that maches existing request. Do nothing
+ isNew = false;
+ } else {
+ // Existing request. Check if potential cycle
+ if ( hops > request->hopCount && request->from == start && request->to == end ) {
+ logadd( LOG_DEBUG1, "Request cycle detected on uplink for %s:%d", PIMG(uplink->image) );
goto fail_lock;
}
- freeSlot = uplink->queueLen++;
+ // Count number if clients, get tail of list
+ int count = 0;
+ c = &request->clients;
+ while ( *c != NULL ) {
+ c = &(**c).next;
+ if ( ++count >= UPLINK_MAX_CLIENTS_PER_REQUEST ) {
+ logadd( LOG_DEBUG2, "Won't accept more than %d clients per request, dropping client", count );
+ goto fail_lock;
+ }
+ }
+ isNew = false;
}
- // Do not send request to uplink server if we have a matching pending request AND the request either has the
- // status ULR_NEW/PENDING OR we found a free slot with LOWER index than the one we attach to. Otherwise
- // explicitly send this request to the uplink server. The second condition mentioned here is to prevent
- // a race condition where the reply for the outstanding request already arrived and the uplink thread
- // is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might
- // already have passed the index of the free slot we determined, but not reached the existing request we just found above.
- if ( foundExisting != -1 && existingType == ULR_PROCESSING && freeSlot > foundExisting ) {
- foundExisting = -1; // -1 means "send request"
+ // Prefetch immediately, without unlocking the list - the old approach of
+ // async prefetching in another thread was sometimes so slow that we'd process
+ // another request from the same client before the prefetch job would execute.
+ if ( callback != NULL && ( isNew || request->clients == NULL || request->clients->data == data )
+ && !( hops & (HOP_FLAG_BGR | HOP_FLAG_PREFETCH) ) // No cascading of prefetches
+ && end == request->to && length <= _maxPrefetch ) {
+ // Only if this is a client request, and the !! end boundary matches exactly !!
+ // (See above for reason why)
+ // - We neither check the local cache, nor other pending requests. Worth it?
+ // Complexity vs. probability
+ preReq.start = end;
+ preReq.end = end;
+ extendRequest( preReq.start, &preReq.end, uplink->image, MIN( length * 3, _maxPrefetch ) );
+ if ( preReq.start < preReq.end ) {
+ //logadd( LOG_DEBUG2, "Prefetching @ %"PRIx64" - %"PRIx64, preReq.start, preReq.end );
+ uplink->queueLen++;
+ pre = malloc( sizeof(*pre) );
+ pre->next = request->next;
+ request->next = pre;
+ pre->handle = preReq.handle = ++uplink->queueId;
+ pre->from = preReq.start;
+ pre->to = preReq.end;
+ pre->hopCount = hops | HOP_FLAG_PREFETCH;
+ pre->sent = true; // Optimistic; would be set to false on failure
+ pre->clients = NULL;
+#ifdef DEBUG
+ timing_get( &pre->entered );
+#endif
+ }
}
-#ifdef _DEBUG
- if ( foundExisting != -1 ) {
- logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, NAMES_ULR[existingType], foundExisting, freeSlot );
- logadd( LOG_DEBUG2, "Original %" PRIu64 "-%" PRIu64 " (%p)\n"
- "New %" PRIu64 "-%" PRIu64 " (%p)\n",
- uplink->queue[foundExisting].from, uplink->queue[foundExisting].to, (void*)uplink->queue[foundExisting].client,
- start, end, (void*)client );
+ // // // //
+ // Copy data - need this after unlocking
+ req.handle = request->handle;
+ if ( callback != NULL ) {
+ assert( c != NULL );
+ *c = malloc( sizeof( *request->clients ) );
+ (**c).next = NULL;
+ (**c).handle = handle;
+ (**c).from = start;
+ (**c).to = end;
+ (**c).data = data;
+ (**c).callback = callback;
}
-#endif
- // Fill structure
- uplink->queue[freeSlot].from = start;
- uplink->queue[freeSlot].to = end;
- uplink->queue[freeSlot].handle = handle;
- uplink->queue[freeSlot].client = client;
- //int old = uplink->queue[freeSlot].status;
- uplink->queue[freeSlot].status = ( foundExisting == -1 ? ULR_NEW :
- ( existingType == ULR_NEW ? ULR_PENDING : existingType ) );
- uplink->queue[freeSlot].hopCount = hops;
-#ifdef _DEBUG
- timing_get( &uplink->queue[freeSlot].entered );
- //logadd( LOG_DEBUG2 %p] Inserting request at slot %d, was %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 "\n", (void*)uplink, freeSlot, old, uplink->queue[freeSlot].status, uplink->queue[freeSlot, ".handle, start, end );
-#endif
mutex_unlock( &uplink->queueLock );
+ // End queue critical section
+ if ( pre == NULL && !isNew )
+ return true; // Nothing to do
- if ( foundExisting != -1 ) {
- ref_put( &uplink->reference );
- return true; // Attached to pending request, do nothing
+ // Fire away the request(s)
+ mutex_lock( &uplink->sendMutex );
+ bool ret1 = true;
+ bool ret2 = true;
+ if ( isNew ) {
+ ret1 = requestBlock( uplink, &req, hops );
}
-
- // See if we can fire away the request
- if ( unlikely( mutex_trylock( &uplink->sendMutex ) != 0 ) ) {
- logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" );
- } else {
- if ( unlikely( uplink->current.fd == -1 ) ) {
- mutex_unlock( &uplink->sendMutex );
- logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
- } else {
- const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
- const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
- if ( hops < 200 ) ++hops;
- const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) );
- mutex_unlock( &uplink->sendMutex );
- if ( unlikely( !ret ) ) {
- logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" );
- } else {
- // Direct send succeeded, update queue entry from NEW to PENDING, so the request won't be sent again
- int state;
- mutex_lock( &uplink->queueLock );
- if ( !uplink->shutdown && uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) {
- state = uplink->queue[freeSlot].status;
- if ( uplink->queue[freeSlot].status == ULR_NEW ) {
- uplink->queue[freeSlot].status = ULR_PENDING;
- }
- } else {
- state = -1;
- }
- mutex_unlock( &uplink->queueLock );
- if ( state == -1 ) {
- logadd( LOG_DEBUG2, "Direct uplink request queue entry gone after sending and re-locking queue. *shrug*" );
- } else if ( state == ULR_NEW ) {
- //logadd( LOG_DEBUG2, "Direct uplink request" );
- } else {
- logadd( LOG_DEBUG2, "Direct uplink request queue entry changed to %s afte sending (expected ULR_NEW).", NAMES_ULR[uplink->queue[freeSlot].status] );
- }
- ref_put( &uplink->reference );
- return true;
- }
- // Fall through to waking up sender thread
- }
+ if ( pre != NULL ) {
+ ret2 = requestBlock( uplink, &preReq, hops | HOP_FLAG_PREFETCH );
+ }
+ if ( !ret1 || !ret2 ) { // Set with send locked
+ uplink->image->problem.uplink = true;
+ }
+ mutex_unlock( &uplink->sendMutex );
+ // markRequestUnsend locks the queue, would violate locking order with send mutex
+ if ( !ret1 ) {
+ markRequestUnsent( uplink, req.handle );
+ logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing (%"PRIu64")", req.handle );
+ }
+ if ( !ret2 ) {
+ markRequestUnsent( uplink, preReq.handle );
}
- if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
+ if ( ( !ret1 || !ret2 ) && signal_call( uplink->signal ) == SIGNAL_ERROR ) {
logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
}
- ref_put( &uplink->reference );
return true;
+
fail_lock:
mutex_unlock( &uplink->queueLock );
-fail_ref:
- ref_put( &uplink->reference );
return false;
}
@@ -431,11 +549,10 @@ static void* uplink_mainloop(void *data)
#define EV_COUNT (2)
struct pollfd events[EV_COUNT];
dnbd3_uplink_t * const uplink = (dnbd3_uplink_t*)data;
- int numSocks, i, waitTime;
+ int numSocks, waitTime;
int altCheckInterval = SERVER_RTT_INTERVAL_INIT;
int rttTestResult;
uint32_t discoverFailCount = 0;
- uint32_t unsavedSeconds = 0;
ticks nextAltCheck, lastKeepalive;
char buffer[200];
memset( events, 0, sizeof(events) );
@@ -447,7 +564,7 @@ static void* uplink_mainloop(void *data)
thread_detach( uplink->thread );
blockNoncriticalSignals();
// Make sure file is open for writing
- if ( !uplink_reopenCacheFd( uplink, false ) ) {
+ if ( !reopenCacheFd( uplink, false ) ) {
// It might have failed - still offer proxy mode, we just can't cache
logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", uplink->image->path, errno );
}
@@ -460,14 +577,14 @@ static void* uplink_mainloop(void *data)
}
while ( !_shutdown && !uplink->shutdown ) {
// poll()
- waitTime = uplink->rttTestResult == RTT_DOCHANGE ? 0 : -1;
- if ( waitTime == 0 ) {
+ if ( uplink->rttTestResult == RTT_DOCHANGE ) {
// 0 means poll, since we're about to change the server
+ waitTime = 0;
} else {
declare_now;
waitTime = (int)timing_diffMs( &now, &nextAltCheck );
if ( waitTime < 100 ) waitTime = 100;
- if ( waitTime > 10000 ) waitTime = 10000;
+ else if ( waitTime > 10000 ) waitTime = 10000;
}
events[EV_SOCKET].fd = uplink->current.fd;
numSocks = poll( events, EV_COUNT, waitTime );
@@ -494,8 +611,7 @@ static void* uplink_mainloop(void *data)
mutex_unlock( &uplink->rttLock );
discoverFailCount = 0;
if ( fd != -1 ) close( fd );
- uplink->replicationHandle = REP_NONE;
- uplink->image->working = true;
+ uplink->image->problem.uplink = false;
uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
buffer[0] = '@';
if ( altservers_toString( uplink->current.index, buffer + 1, sizeof(buffer) - 1 ) ) {
@@ -504,12 +620,17 @@ static void* uplink_mainloop(void *data)
}
// If we don't have a crc32 list yet, see if the new server has one
if ( uplink->image->crc32 == NULL ) {
- uplink_addCrc32( uplink );
+ requestCrc32List( uplink );
}
// Re-send all pending requests
- uplink_sendRequests( uplink, false );
- uplink_sendReplicationRequest( uplink );
+ sendQueuedRequests( uplink, false );
+ sendReplicationRequest( uplink );
events[EV_SOCKET].events = POLLIN | POLLRDHUP;
+ if ( uplink->image->problem.uplink ) {
+ // Some of the requests above must have failed again already :-(
+ logadd( LOG_DEBUG1, "Newly established uplink connection failed during getCRC or sendRequests" );
+ connectionFailed( uplink, true );
+ }
timing_gets( &nextAltCheck, altCheckInterval );
// The rtt worker already did the handshake for our image, so there's nothing
// more to do here
@@ -517,6 +638,7 @@ static void* uplink_mainloop(void *data)
// Check events
// Signal
if ( (events[EV_SIGNAL].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) {
+ uplink->image->problem.uplink = true;
logadd( LOG_WARNING, "poll error on signal in uplink_mainloop!" );
goto cleanup;
} else if ( (events[EV_SIGNAL].revents & POLLIN) ) {
@@ -526,46 +648,37 @@ static void* uplink_mainloop(void *data)
}
if ( uplink->current.fd != -1 ) {
// Uplink seems fine, relay requests to it...
- uplink_sendRequests( uplink, true );
+ sendQueuedRequests( uplink, true );
} else if ( uplink->queueLen != 0 ) { // No uplink; maybe it was shutdown since it was idle for too long
uplink->idleTime = 0;
}
}
// Uplink socket
if ( (events[EV_SOCKET].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) {
- uplink_connectionFailed( uplink, true );
+ connectionFailed( uplink, true );
logadd( LOG_DEBUG1, "Uplink gone away, panic! (revents=%d)\n", (int)events[EV_SOCKET].revents );
setThreadName( "panic-uplink" );
} else if ( (events[EV_SOCKET].revents & POLLIN) ) {
- uplink_handleReceive( uplink );
+ handleReceive( uplink );
if ( _shutdown || uplink->shutdown ) goto cleanup;
}
declare_now;
uint32_t timepassed = timing_diff( &lastKeepalive, &now );
- if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) {
+ if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL
+ || ( timepassed >= 2 && uplink->idleTime < _bgrWindowSize ) ) {
lastKeepalive = now;
uplink->idleTime += timepassed;
- unsavedSeconds += timepassed;
- if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && uplink->idleTime >= 20 && uplink->idleTime <= 70 ) ) {
- // fsync/save every 4 minutes, or every 60 seconds if uplink is idle
- unsavedSeconds = 0;
- uplink_saveCacheMap( uplink );
- }
// Keep-alive
- if ( uplink->current.fd != -1 && uplink->replicationHandle == REP_NONE ) {
- // Send keep-alive if nothing is happening
- if ( uplink_sendKeepalive( uplink->current.fd ) ) {
- // Re-trigger periodically, in case it requires a minimum user count
- uplink_sendReplicationRequest( uplink );
- } else {
- uplink_connectionFailed( uplink, true );
- logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" );
- setThreadName( "panic-uplink" );
+ if ( uplink->current.fd != -1 && uplink->queueLen < _bgrWindowSize ) {
+ // Send keep-alive if nothing is happening, and try to trigger background rep.
+ if ( !sendKeepalive( uplink ) || !sendReplicationRequest( uplink ) ) {
+ connectionFailed( uplink, true );
+ logadd( LOG_DEBUG1, "Error sending keep-alive/BGR, panic!\n" );
}
}
// Don't keep uplink established if we're idle for too much
- if ( uplink_connectionShouldShutdown( uplink ) ) {
- logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", uplink->image->name, (int)uplink->image->rid );
+ if ( connectionShouldShutdown( uplink ) ) {
+ logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", PIMG(uplink->image) );
goto cleanup;
}
}
@@ -578,6 +691,7 @@ static void* uplink_mainloop(void *data)
// Quit work if image is complete
logadd( LOG_INFO, "Replication of %s complete.", uplink->image->name );
setThreadName( "finished-uplink" );
+ uplink->image->problem.uplink = false;
goto cleanup;
} else {
// Not complete - do measurement
@@ -592,46 +706,44 @@ static void* uplink_mainloop(void *data)
} else if ( rttTestResult == RTT_NOT_REACHABLE ) {
if ( atomic_compare_exchange_strong( &uplink->rttTestResult, &rttTestResult, RTT_IDLE ) ) {
discoverFailCount++;
- if ( uplink->image->working && uplink->current.fd == -1 && discoverFailCount > (SERVER_RTT_MAX_UNREACH / 2) ) {
- logadd( LOG_DEBUG1, "Disabling %s:%d since no uplink is available", uplink->image->name, (int)uplink->image->rid );
- uplink->image->working = false;
- }
if ( uplink->current.fd == -1 ) {
uplink->cycleDetected = false;
}
}
timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_MAX_UNREACH) ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED );
}
-#ifdef _DEBUG
+#ifdef DEBUG
if ( uplink->current.fd != -1 && !uplink->shutdown ) {
bool resend = false;
ticks deadline;
timing_set( &deadline, &now, -10 );
mutex_lock( &uplink->queueLock );
- for (i = 0; i < uplink->queueLen; ++i) {
- if ( uplink->queue[i].status != ULR_FREE && timing_reached( &uplink->queue[i].entered, &deadline ) ) {
- snprintf( buffer, sizeof(buffer), "[DEBUG %p] Starving request slot %d detected:\n"
- "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)uplink, i, uplink->queue[i].client->image->name,
- uplink->queue[i].from, uplink->queue[i].to, uplink->queue[i].status );
- uplink->queue[i].entered = now;
-#ifdef _DEBUG_RESEND_STARVING
- uplink->queue[i].status = ULR_NEW;
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( timing_reached( &it->entered, &deadline ) ) {
+ logadd( LOG_WARNING, "Starving request detected:"
+ " (from %" PRIu64 " to %" PRIu64 ", sent: %d) %s:%d",
+ it->from, it->to, (int)it->sent, PIMG(uplink->image) );
+ it->entered = now;
+#ifdef DEBUG_RESEND_STARVING
+ it->sent = false;
resend = true;
#endif
- mutex_unlock( &uplink->queueLock );
- logadd( LOG_WARNING, "%s", buffer );
- mutex_lock( &uplink->queueLock );
}
}
mutex_unlock( &uplink->queueLock );
- if ( resend )
- uplink_sendRequests( uplink, true );
+ if ( resend ) {
+ sendQueuedRequests( uplink, true );
+ }
}
#endif
}
- cleanup: ;
- uplink_saveCacheMap( uplink );
+cleanup: ;
dnbd3_image_t *image = uplink->image;
+ dnbd3_cache_map_t *cache = ref_get_cachemap( image );
+ if ( cache != NULL ) {
+ cache->dirty = true; // Force writeout of cache map
+ ref_put( &cache->reference );
+ }
mutex_lock( &image->lock );
bool exp = false;
if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) {
@@ -653,37 +765,60 @@ static void* uplink_mainloop(void *data)
return NULL ;
}
-static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
+/**
+ * Only called from uplink thread.
+ */
+static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly)
{
- // Scan for new requests
- int j;
+ assert_uplink_thread();
+ // Scan for new requests, or optionally, (re)send all
+ // Build a buffer, so if there aren't too many requests, we can send them after
+ // unlocking the queue again. Otherwise we need flushes during iteration, which
+ // is no ideal, but in that case the uplink is probably overwhelmed anyways.
+ // Try 125 as that's exactly 300bytes, usually 2*MTU.
+#define MAX_RESEND_BATCH 125
+ dnbd3_request_t reqs[MAX_RESEND_BATCH];
+ int count = 0;
mutex_lock( &uplink->queueLock );
- for (j = 0; j < uplink->queueLen; ++j) {
- if ( uplink->queue[j].status != ULR_NEW && (newOnly || uplink->queue[j].status != ULR_PENDING) ) continue;
- uplink->queue[j].status = ULR_PENDING;
- uint8_t hops = uplink->queue[j].hopCount;
- const uint64_t reqStart = uplink->queue[j].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
- const uint32_t reqSize = (uint32_t)(((uplink->queue[j].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
- /*
- logadd( LOG_DEBUG2, "[%p] Sending slot %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 " (%" PRIu64 "-%" PRIu64 ")",
- (void*)uplink, j, uplink->queue[j].status, uplink->queue[j].handle, uplink->queue[j].from, uplink->queue[j].to, reqStart, reqStart+reqSize );
- */
- mutex_unlock( &uplink->queueLock );
- if ( hops < 200 ) ++hops;
- mutex_lock( &uplink->sendMutex );
- const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) );
- mutex_unlock( &uplink->sendMutex );
- if ( !ret ) {
- // Non-critical - if the connection dropped or the server was changed
- // the thread will re-send this request as soon as the connection
- // is reestablished.
- logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" );
- altservers_serverFailed( uplink->current.index );
- return;
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( newOnly && it->sent )
+ continue;
+ it->sent = true;
+ dnbd3_request_t *hdr = &reqs[count++];
+ hdr->magic = dnbd3_packet_magic;
+ hdr->cmd = CMD_GET_BLOCK;
+ hdr->size = (uint32_t)( it->to - it->from );
+ hdr->offset = it->from; // Offset first, then hops! (union)
+ hdr->hops = COND_HOPCOUNT( uplink->current.version, it->hopCount );
+ hdr->handle = it->handle;
+ fixup_request( *hdr );
+ if ( count == MAX_RESEND_BATCH ) {
+ bool ok = false;
+ logadd( LOG_DEBUG2, "BLOCKING resend of %d", count );
+ count = 0;
+ mutex_lock( &uplink->sendMutex );
+ if ( uplink->current.fd != -1 ) {
+ ok = ( sock_sendAll( uplink->current.fd, reqs, DNBD3_REQUEST_SIZE * MAX_RESEND_BATCH, 3 )
+ == DNBD3_REQUEST_SIZE * MAX_RESEND_BATCH );
+ }
+ mutex_unlock( &uplink->sendMutex );
+ if ( !ok ) {
+ uplink->image->problem.uplink = true;
+ break;
+ }
}
- mutex_lock( &uplink->queueLock );
}
mutex_unlock( &uplink->queueLock );
+ if ( count != 0 ) {
+ mutex_lock( &uplink->sendMutex );
+ if ( uplink->current.fd != -1 ) {
+ uplink->image->problem.uplink =
+ ( sock_sendAll( uplink->current.fd, reqs, DNBD3_REQUEST_SIZE * count, 3 )
+ != DNBD3_REQUEST_SIZE * count );
+ }
+ mutex_unlock( &uplink->sendMutex );
+ }
+#undef MAX_RESEND_BATCH
}
/**
@@ -695,73 +830,97 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
* server. This means we might request data we already have, but it makes
* the code simpler. Worst case would be only one bit is zero, which means
* 4kb are missing, but we will request 32kb.
+ *
+ * Only called form uplink thread, so current.fd is assumed to be valid.
+ *
+ * @return false if sending request failed, true otherwise (i.e. not necessary/disabled)
*/
-static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
+static bool sendReplicationRequest(dnbd3_uplink_t *uplink)
{
- if ( uplink == NULL || uplink->current.fd == -1 ) return;
- if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 ) return; // Don't do background replication
- if ( uplink->nextReplicationIndex == -1 || uplink->replicationHandle != REP_NONE )
- return; // Already a replication request on the wire, or no more blocks to replicate
+ assert_uplink_thread();
+ if ( uplink->current.fd == -1 )
+ return false; // Should never be called in this state, consider send error
+ if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 )
+ return true; // Don't do background replication
+ if ( uplink->nextReplicationIndex == -1 )
+ return true; // No more blocks to replicate
dnbd3_image_t * const image = uplink->image;
- if ( image->virtualFilesize < DNBD3_BLOCK_SIZE ) return;
- if ( image->users < _bgrMinClients ) return; // Not enough active users
+ if ( image->users < _bgrMinClients )
+ return true; // Not enough active users
+ const int numNewRequests = numWantedReplicationRequests( uplink );
+ if ( numNewRequests <= 0 )
+ return true; // Already sufficient amount of requests on the wire
dnbd3_cache_map_t *cache = ref_get_cachemap( image );
- if ( cache == NULL || image->users < _bgrMinClients ) {
+ if ( cache == NULL ) {
// No cache map (=image complete)
- ref_put( &cache->reference );
- return;
+ return true;
}
const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
const int lastBlockIndex = mapBytes - 1;
- int endByte;
- if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks
- endByte = uplink->nextReplicationIndex + mapBytes;
- } else { // Hashblock based: Only look for match in current hash block
- endByte = ( uplink->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK;
- if ( endByte > mapBytes ) {
- endByte = mapBytes;
+ for ( int bc = 0; bc < numNewRequests; ++bc ) {
+ int endByte;
+ if ( UPLINK_MAX_QUEUE - uplink->queueLen < 10 )
+ break; // Don't overload queue
+ if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks
+ endByte = uplink->nextReplicationIndex + mapBytes;
+ } else { // Hashblock based: Only look for match in current hash block
+ endByte = ( uplink->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK;
+ if ( endByte > mapBytes ) {
+ endByte = mapBytes;
+ }
}
- }
- atomic_thread_fence( memory_order_acquire );
- int replicationIndex = -1;
- for ( int j = uplink->nextReplicationIndex; j < endByte; ++j ) {
- const int i = j % ( mapBytes ); // Wrap around for BGR_FULL
- if ( atomic_load_explicit( &cache->map[i], memory_order_relaxed ) != 0xff
- && ( i != lastBlockIndex || !uplink->replicatedLastBlock ) ) {
- // Found incomplete one
- replicationIndex = i;
+ atomic_thread_fence( memory_order_acquire );
+ int replicationIndex = -1;
+ for ( int j = uplink->nextReplicationIndex; j < endByte; ++j ) {
+ const int i = j % ( mapBytes ); // Wrap around for BGR_FULL
+ if ( atomic_load_explicit( &cache->map[i], memory_order_relaxed ) != 0xff
+ && ( i != lastBlockIndex || !uplink->replicatedLastBlock ) ) {
+ // Found incomplete one
+ replicationIndex = i;
+ break;
+ }
+ }
+ if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) {
+ // Nothing left in current block, find next one
+ replicationIndex = findNextIncompleteHashBlock( uplink, endByte );
+ }
+ if ( replicationIndex == -1 ) {
+ // Replication might be complete, uplink_mainloop should take care....
+ uplink->nextReplicationIndex = -1;
break;
}
+ const uint64_t handle = ++uplink->queueId;
+ const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
+ uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
+ // Extend the default 32k request size if _minRequestSize is > 32k
+ for ( size_t extra = 1; extra < ( _minRequestSize / FILE_BYTES_PER_MAP_BYTE )
+ && offset + size < image->virtualFilesize
+ && _backgroundReplication == BGR_FULL; ++extra ) {
+ if ( atomic_load_explicit( &cache->map[replicationIndex+1], memory_order_relaxed ) == 0xff )
+ break; // Hit complete 32k block, stop here
+ replicationIndex++;
+ size += (uint32_t)MIN( image->virtualFilesize - offset - size, FILE_BYTES_PER_MAP_BYTE );
+ }
+ if ( !uplink_requestInternal( uplink, NULL, NULL, handle, offset, size, 0 ) ) {
+ logadd( LOG_DEBUG1, "Error sending background replication request to uplink server (%s:%d)",
+ PIMG(uplink->image) );
+ ref_put( &cache->reference );
+ return false;
+ }
+ if ( replicationIndex == lastBlockIndex ) {
+ uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks
+ }
+ uplink->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter
+ if ( _backgroundReplication == BGR_HASHBLOCK
+ && uplink->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) {
+ // Just crossed a hash block boundary, look for new candidate starting at this very index
+ uplink->nextReplicationIndex = findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex );
+ if ( uplink->nextReplicationIndex == -1 )
+ break;
+ }
}
ref_put( &cache->reference );
- if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) {
- // Nothing left in current block, find next one
- replicationIndex = uplink_findNextIncompleteHashBlock( uplink, endByte );
- }
- if ( replicationIndex == -1 ) {
- // Replication might be complete, uplink_mainloop should take care....
- uplink->nextReplicationIndex = -1;
- return;
- }
- const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
- uplink->replicationHandle = offset;
- const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
- mutex_lock( &uplink->sendMutex );
- bool sendOk = dnbd3_get_block( uplink->current.fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->current.version, 1 ) );
- mutex_unlock( &uplink->sendMutex );
- if ( !sendOk ) {
- logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" );
- return;
- }
- if ( replicationIndex == lastBlockIndex ) {
- uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks
- }
- uplink->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter
- if ( _backgroundReplication == BGR_HASHBLOCK
- && uplink->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) {
- // Just crossed a hash block boundary, look for new candidate starting at this very index
- uplink->nextReplicationIndex = uplink_findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex );
- }
+ return true;
}
/**
@@ -769,7 +928,7 @@ static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
* of a hash block which is neither completely empty nor completely
* replicated yet. Returns -1 if no match.
*/
-static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMapIndex)
+static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMapIndex)
{
int retval = -1;
dnbd3_cache_map_t *cache = ref_get_cachemap( uplink->image );
@@ -816,29 +975,32 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int
/**
* Receive data from uplink server and process/dispatch
* Locks on: uplink.lock, images[].lock
+ * Only called from uplink thread, so current.fd is assumed to be valid.
*/
-static void uplink_handleReceive(dnbd3_uplink_t *uplink)
+static void handleReceive(dnbd3_uplink_t *uplink)
{
- dnbd3_reply_t inReply, outReply;
- int ret, i;
+ dnbd3_reply_t inReply;
+ int ret;
+ assert_uplink_thread();
+ assert( uplink->queueLen >= 0 );
for (;;) {
ret = dnbd3_read_reply( uplink->current.fd, &inReply, false );
if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue;
if ( ret == REPLY_AGAIN ) break;
if ( unlikely( ret == REPLY_CLOSED ) ) {
- logadd( LOG_INFO, "Uplink: Remote host hung up (%s)", uplink->image->path );
+ logadd( LOG_INFO, "Uplink: Remote host hung up (%s:%d)", PIMG(uplink->image) );
goto error_cleanup;
}
if ( unlikely( ret == REPLY_WRONGMAGIC ) ) {
- logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s)", uplink->image->path );
+ logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s:%d)", PIMG(uplink->image) );
goto error_cleanup;
}
if ( unlikely( ret != REPLY_OK ) ) {
- logadd( LOG_INFO, "Uplink: Connection error %d (%s)", ret, uplink->image->path );
+ logadd( LOG_INFO, "Uplink: Connection error %d (%s:%d)", ret, PIMG(uplink->image) );
goto error_cleanup;
}
if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) {
- logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s", inReply.size, uplink->image->path );
+ logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s:%d", inReply.size, PIMG(uplink->image) );
goto error_cleanup;
}
@@ -851,21 +1013,41 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
}
}
if ( unlikely( (uint32_t)sock_recv( uplink->current.fd, uplink->recvBuffer, inReply.size ) != inReply.size ) ) {
- logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", uplink->image->path );
+ logadd( LOG_INFO, "Lost connection to uplink server of %s:%d (payload)", PIMG(uplink->image) );
goto error_cleanup;
}
// Payload read completely
// Bail out if we're not interested
- if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) ) continue;
+ if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) )
+ continue;
// Is a legit block reply
- struct iovec iov[2];
- const uint64_t start = inReply.handle;
- const uint64_t end = inReply.handle + inReply.size;
totalBytesReceived += inReply.size;
uplink->bytesReceived += inReply.size;
+ // Get entry from queue
+ dnbd3_queue_entry_t *entry;
+ mutex_lock( &uplink->queueLock );
+ for ( entry = uplink->queue; entry != NULL; entry = entry->next ) {
+ if ( entry->handle == inReply.handle )
+ break;
+ }
+ if ( entry == NULL ) {
+ mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock!
+ logadd( LOG_DEBUG1, "Received block reply on uplink, but handle %"PRIu64" is unknown (%s:%d)",
+ inReply.handle, PIMG(uplink->image) );
+ continue;
+ }
+ const uint64_t start = entry->from;
+ const uint64_t end = entry->to;
+ mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock!
+ // We don't remove the entry from the list here yet, to slightly increase the chance of other
+ // clients attaching to this request while we write the data to disk
+ if ( end - start != inReply.size ) {
+ logadd( LOG_WARNING, "Received payload length does not match! (is: %"PRIu32", expect: %u, %s:%d)",
+ inReply.size, (unsigned int)( end - start ), PIMG(uplink->image) );
+ }
// 1) Write to cache file
if ( unlikely( uplink->cacheFd == -1 ) ) {
- uplink_reopenCacheFd( uplink, false );
+ reopenCacheFd( uplink, false );
}
if ( likely( uplink->cacheFd != -1 ) ) {
int err = 0;
@@ -884,16 +1066,19 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
continue; // Success, retry write
}
if ( err == EBADF || err == EINVAL || err == EIO ) {
- if ( !tryAgain || !uplink_reopenCacheFd( uplink, true ) )
+ uplink->image->problem.write = true;
+ if ( !tryAgain || !reopenCacheFd( uplink, true ) )
break;
tryAgain = false;
continue; // Write handle to image successfully re-opened, try again
}
- logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d", uplink->image->name, (int)uplink->image->rid, err );
+ logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d",
+ PIMG(uplink->image), err );
break;
}
if ( unlikely( ret <= 0 || (uint32_t)ret > inReply.size - done ) ) {
- logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d", ret, uplink->image->name, (int)uplink->image->rid );
+ logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d",
+ ret, PIMG(uplink->image) );
break;
}
done += (uint32_t)ret;
@@ -903,114 +1088,79 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
}
if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) {
logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.",
- uplink->image->name, (int)uplink->image->rid, err );
+ PIMG(uplink->image), err );
}
}
- // 2) Figure out which clients are interested in it
- // Mark as ULR_PROCESSING, since we unlock repeatedly in the second loop
- // below; this prevents uplink_request() from attaching to this request
- // by populating a slot with index greater than the highest matching
- // request with ULR_PROCESSING (assuming there is no ULR_PENDING or ULR_NEW
- // where it's fine if the index is greater)
+ bool found = false;
+ dnbd3_queue_entry_t **it;
mutex_lock( &uplink->queueLock );
- for (i = 0; i < uplink->queueLen; ++i) {
- dnbd3_queued_request_t * const req = &uplink->queue[i];
- assert( req->status != ULR_PROCESSING );
- if ( req->status != ULR_PENDING && req->status != ULR_NEW ) continue;
- assert( req->client != NULL );
- if ( req->from >= start && req->to <= end ) { // Match :-)
- req->status = ULR_PROCESSING;
+ for ( it = &uplink->queue; *it != NULL; it = &(**it).next ) {
+ if ( *it == entry && entry->handle == inReply.handle ) { // ABA check
+ assert( found == false );
+ *it = (**it).next;
+ found = true;
+ uplink->queueLen--;
+ break;
}
}
- // 3) Send to interested clients - iterate backwards so request collaboration works, and
- // so we can decrease queueLen on the fly while iterating. Should you ever change this to start
- // from 0, you also need to change the "attach to existing request"-logic in uplink_request()
- outReply.magic = dnbd3_packet_magic;
- bool served = false;
- for ( i = uplink->queueLen - 1; i >= 0; --i ) {
- dnbd3_queued_request_t * const req = &uplink->queue[i];
- if ( req->status == ULR_PROCESSING ) {
- size_t bytesSent = 0;
- assert( req->from >= start && req->to <= end );
- dnbd3_client_t * const client = req->client;
- outReply.cmd = CMD_GET_BLOCK;
- outReply.handle = req->handle;
- outReply.size = (uint32_t)( req->to - req->from );
- iov[0].iov_base = &outReply;
- iov[0].iov_len = sizeof outReply;
- iov[1].iov_base = uplink->recvBuffer + (req->from - start);
- iov[1].iov_len = outReply.size;
- fixup_reply( outReply );
- req->status = ULR_FREE;
- req->client = NULL;
- served = true;
- mutex_lock( &client->sendMutex );
- mutex_unlock( &uplink->queueLock );
- if ( client->sock != -1 ) {
- ssize_t sent = writev( client->sock, iov, 2 );
- if ( sent > (ssize_t)sizeof outReply ) {
- bytesSent = (size_t)sent - sizeof outReply;
- }
- }
- if ( bytesSent != 0 ) {
- client->bytesSent += bytesSent;
- }
- mutex_unlock( &client->sendMutex );
- mutex_lock( &uplink->queueLock );
- if ( i > uplink->queueLen ) {
- i = uplink->queueLen; // Might have been set to 0 by cancelAllRequests
- }
- }
- if ( req->status == ULR_FREE && i == uplink->queueLen - 1 ) uplink->queueLen--;
+ if ( uplink->queueLen < SERVER_UPLINK_QUEUELEN_THRES ) {
+ uplink->image->problem.queue = false;
}
mutex_unlock( &uplink->queueLock );
-#ifdef _DEBUG
- if ( !served && start != uplink->replicationHandle ) {
- logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)uplink, uplink->image->name, start, end );
+ if ( !found ) {
+ logadd( LOG_DEBUG1, "Replication request vanished from queue after writing to disk (%s:%d)",
+ PIMG(uplink->image) );
+ continue;
}
-#endif
- if ( start == uplink->replicationHandle ) {
- // Was our background replication
- uplink->replicationHandle = REP_NONE;
- // Try to remove from fs cache if no client was interested in this data
- if ( !served && uplink->cacheFd != -1 ) {
- posix_fadvise( uplink->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED );
- }
+ dnbd3_queue_client_t *next;
+ for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) {
+ assert( c->from >= start && c->to <= end );
+ (*c->callback)( c->data, c->handle, c->from, (uint32_t)( c->to - c->from ),
+ (const char*)( uplink->recvBuffer + (c->from - start) ) );
+ next = c->next;
+ free( c );
}
- if ( served ) {
+ if ( entry->clients != NULL ) {
// Was some client -- reset idle counter
uplink->idleTime = 0;
// Re-enable replication if disabled
if ( uplink->nextReplicationIndex == -1 ) {
uplink->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK;
}
+ } else {
+ if ( uplink->cacheFd != -1 ) {
+ // Try to remove from fs cache if no client was interested in this data
+ posix_fadvise( uplink->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED );
+ }
}
+ free( entry );
+ } // main receive loop
+ // Trigger background replication if applicable
+ if ( !sendReplicationRequest( uplink ) ) {
+ goto error_cleanup;
}
- if ( uplink->replicationHandle == REP_NONE ) {
- mutex_lock( &uplink->queueLock );
- const bool rep = ( uplink->queueLen == 0 );
- mutex_unlock( &uplink->queueLock );
- if ( rep ) uplink_sendReplicationRequest( uplink );
- }
+ // Normal end
return;
// Error handling from failed receive or message parsing
- error_cleanup: ;
- uplink_connectionFailed( uplink, true );
+error_cleanup: ;
+ connectionFailed( uplink, true );
}
/**
* Only call from uplink thread
*/
-static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
+static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
{
+ assert_uplink_thread();
if ( uplink->current.fd == -1 )
return;
+ setThreadName( "panic-uplink" );
altservers_serverFailed( uplink->current.index );
mutex_lock( &uplink->sendMutex );
+ uplink->image->problem.uplink = true;
close( uplink->current.fd );
uplink->current.fd = -1;
mutex_unlock( &uplink->sendMutex );
- uplink->replicationHandle = REP_NONE;
if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) {
uplink->nextReplicationIndex = 0;
}
@@ -1025,15 +1175,26 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
}
/**
- * Send keep alive request to server
+ * Send keep alive request to server.
+ * Called from uplink thread, current.fd must be valid.
*/
-static int uplink_sendKeepalive(const int fd)
+static bool sendKeepalive(dnbd3_uplink_t *uplink)
{
static const dnbd3_request_t request = { .magic = dnbd3_packet_magic, .cmd = net_order_16( CMD_KEEPALIVE ) };
- return send( fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request);
+ assert_uplink_thread();
+ mutex_lock( &uplink->sendMutex );
+ bool sendOk = send( uplink->current.fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request);
+ mutex_unlock( &uplink->sendMutex );
+ return sendOk;
}
-static void uplink_addCrc32(dnbd3_uplink_t *uplink)
+/**
+ * Request crclist from uplink.
+ * Called from uplink thread, current.fd must be valid.
+ * FIXME This is broken as it could happen that another message arrives after sending
+ * the request. Refactor, split and move receive into general receive handler.
+ */
+static void requestCrc32List(dnbd3_uplink_t *uplink)
{
dnbd3_image_t *image = uplink->image;
if ( image == NULL || image->virtualFilesize == 0 ) return;
@@ -1042,6 +1203,9 @@ static void uplink_addCrc32(dnbd3_uplink_t *uplink)
uint32_t *buffer = malloc( bytes );
mutex_lock( &uplink->sendMutex );
bool sendOk = dnbd3_get_crc32( uplink->current.fd, &masterCrc, buffer, &bytes );
+ if ( !sendOk ) {
+ uplink->image->problem.uplink = true;
+ }
mutex_unlock( &uplink->sendMutex );
if ( !sendOk || bytes == 0 ) {
free( buffer );
@@ -1051,7 +1215,7 @@ static void uplink_addCrc32(dnbd3_uplink_t *uplink)
lists_crc = crc32( lists_crc, (uint8_t*)buffer, bytes );
lists_crc = net_order_32( lists_crc );
if ( lists_crc != masterCrc ) {
- logadd( LOG_WARNING, "Received corrupted crc32 list from uplink server (%s)!", uplink->image->name );
+ logadd( LOG_WARNING, "Received corrupted crc32 list from uplink server (%s:%d)!", PIMG(uplink->image) );
free( buffer );
return;
}
@@ -1061,10 +1225,14 @@ static void uplink_addCrc32(dnbd3_uplink_t *uplink)
char path[len];
snprintf( path, len, "%s.crc", uplink->image->path );
const int fd = open( path, O_WRONLY | O_CREAT, 0644 );
- if ( fd >= 0 ) {
- write( fd, &masterCrc, sizeof(uint32_t) );
- write( fd, buffer, bytes );
+ if ( fd != -1 ) {
+ ssize_t ret = write( fd, &masterCrc, sizeof(masterCrc) );
+ ret += write( fd, buffer, bytes );
close( fd );
+ if ( (size_t)ret != sizeof(masterCrc) + bytes ) {
+ unlink( path );
+ logadd( LOG_WARNING, "Could not write crc32 file for %s:%d", PIMG(uplink->image) );
+ }
}
}
@@ -1076,80 +1244,24 @@ static void uplink_addCrc32(dnbd3_uplink_t *uplink)
* it will be closed first. Otherwise, nothing will happen and true will be returned
* immediately.
*/
-static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force)
+static bool reopenCacheFd(dnbd3_uplink_t *uplink, const bool force)
{
if ( uplink->cacheFd != -1 ) {
if ( !force ) return true;
close( uplink->cacheFd );
}
uplink->cacheFd = open( uplink->image->path, O_WRONLY | O_CREAT, 0644 );
+ uplink->image->problem.write = uplink->cacheFd == -1;
return uplink->cacheFd != -1;
}
/**
- * Saves the cache map of the given image.
- * Return true on success.
- * Locks on: imageListLock, image.lock
+ * Returns true if the uplink has been idle for some time (apart from
+ * background replication, if it is set to hashblock, or if it has
+ * a minimum number of active clients configured that is not currently
+ * reached)
*/
-static bool uplink_saveCacheMap(dnbd3_uplink_t *uplink)
-{
- dnbd3_image_t *image = uplink->image;
- assert( image != NULL );
-
- if ( uplink->cacheFd != -1 ) {
- if ( fsync( uplink->cacheFd ) == -1 ) {
- // A failing fsync means we have no guarantee that any data
- // since the last fsync (or open if none) has been saved. Apart
- // from keeping the cache map from the last successful fsync
- // around and restoring it there isn't much we can do to recover
- // a consistent state. Bail out.
- logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d", image->path, errno );
- logadd( LOG_ERROR, "Bailing out immediately" );
- exit( 1 );
- }
- }
-
- dnbd3_cache_map_t *cache = ref_get_cachemap( image );
- if ( cache == NULL )
- return true;
- logadd( LOG_DEBUG2, "Saving cache map of %s:%d", image->name, (int)image->rid );
- const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize);
- assert( image->path != NULL );
- char mapfile[strlen( image->path ) + 4 + 1];
- strcpy( mapfile, image->path );
- strcat( mapfile, ".map" );
-
- int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 );
- if ( fd == -1 ) {
- const int err = errno;
- ref_put( &cache->reference );
- logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile );
- return false;
- }
-
- size_t done = 0;
- while ( done < size ) {
- const ssize_t ret = write( fd, cache->map + done, size - done );
- if ( ret == -1 ) {
- if ( errno == EINTR ) continue;
- logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile );
- break;
- }
- if ( ret <= 0 ) {
- logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile );
- break;
- }
- done += (size_t)ret;
- }
- ref_put( &cache->reference );
- if ( fsync( fd ) == -1 ) {
- logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno );
- }
- close( fd );
- return true;
-}
-
-static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink)
+static bool connectionShouldShutdown(dnbd3_uplink_t *uplink)
{
return ( uplink->idleTime > SERVER_UPLINK_IDLE_TIMEOUT
&& ( _backgroundReplication != BGR_FULL || _bgrMinClients > uplink->image->users ) );
@@ -1165,3 +1277,44 @@ bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len)
return false;
return altservers_toString( current, buffer, len );
}
+
+/**
+ * Get number of replication requests that should be sent right now to
+ * meet the configured bgrWindowSize. Returns 0 if any client requests
+ * are pending.
+ * This applies a sort of "slow start" in case the uplink was recently
+ * dealing with actual client requests, in that the uplink's idle time
+ * (in seconds) is an upper bound for the number returned, so we don't
+ * saturate the uplink with loads of requests right away, in case that
+ * client triggers more requests to the uplink server.
+ */
+static int numWantedReplicationRequests(dnbd3_uplink_t *uplink)
+{
+ int ret = MIN( _bgrWindowSize, uplink->idleTime + 1 );
+ if ( uplink->queueLen == 0 )
+ return ret;
+ mutex_lock( &uplink->queueLock );
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( it->clients == NULL ) {
+ ret--;
+ } else {
+ ret = 0; // Do not allow BGR if client requests are being handled
+ break;
+ }
+ }
+ mutex_unlock( &uplink->queueLock );
+ return ret;
+}
+
+static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle)
+{
+ mutex_lock( &uplink->queueLock );
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( it->handle == handle ) {
+ it->sent = false;
+ break;
+ }
+ }
+ mutex_unlock( &uplink->queueLock );
+}
+
diff --git a/src/server/uplink.h b/src/server/uplink.h
index 49ff0b4..b6037d6 100644
--- a/src/server/uplink.h
+++ b/src/server/uplink.h
@@ -2,7 +2,7 @@
#define _UPLINK_H_
#include "globals.h"
-#include "../types.h"
+#include <dnbd3/types.h>
void uplink_globalsInit();
@@ -10,9 +10,11 @@ uint64_t uplink_getTotalBytesReceived();
bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version);
-void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client);
+void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback callback);
-bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hopCount);
+bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops);
+
+bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length);
bool uplink_shutdown(dnbd3_image_t *image);