diff options
Diffstat (limited to 'src/server')
-rw-r--r-- | src/server/CMakeLists.txt | 112 | ||||
-rw-r--r-- | src/server/altservers.c | 79 | ||||
-rw-r--r-- | src/server/altservers.h | 2 | ||||
-rw-r--r-- | src/server/fileutil.c | 2 | ||||
-rw-r--r-- | src/server/fuse.c | 661 | ||||
-rw-r--r-- | src/server/fuse.h | 10 | ||||
-rw-r--r-- | src/server/globals.c | 72 | ||||
-rw-r--r-- | src/server/globals.h | 93 | ||||
-rw-r--r-- | src/server/helper.h | 4 | ||||
-rw-r--r-- | src/server/image.c | 685 | ||||
-rw-r--r-- | src/server/image.h | 48 | ||||
-rw-r--r-- | src/server/ini.c | 2 | ||||
-rw-r--r-- | src/server/integrity.c | 20 | ||||
-rw-r--r-- | src/server/locks.c | 4 | ||||
-rw-r--r-- | src/server/locks.h | 20 | ||||
-rw-r--r-- | src/server/net.c | 174 | ||||
-rw-r--r-- | src/server/net.h | 4 | ||||
-rw-r--r-- | src/server/picohttpparser/CMakeLists.txt | 11 | ||||
-rw-r--r-- | src/server/reference.h | 5 | ||||
-rw-r--r-- | src/server/rpc.c | 29 | ||||
-rw-r--r-- | src/server/serialize.c | 5 | ||||
-rw-r--r-- | src/server/server.c | 84 | ||||
-rw-r--r-- | src/server/server.h | 4 | ||||
-rw-r--r-- | src/server/threadpool.c | 19 | ||||
-rw-r--r-- | src/server/threadpool.h | 5 | ||||
-rw-r--r-- | src/server/uplink.c | 1127 | ||||
-rw-r--r-- | src/server/uplink.h | 8 |
27 files changed, 2378 insertions, 911 deletions
diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt new file mode 100644 index 0000000..9a1e1c4 --- /dev/null +++ b/src/server/CMakeLists.txt @@ -0,0 +1,112 @@ +cmake_minimum_required(VERSION 3.10) + +# set the project name +project(dnbd3-server + LANGUAGES C) + +# find Jansson package required by the dnbd3-server +find_package(Jansson) +if(NOT JANSSON_FOUND) + message(FATAL_ERROR "*** No jansson lib found, can't build dnbd3-server!") +endif(NOT JANSSON_FOUND) + +# find atomic library required by the dnbd3-server +find_package(Stdatomic REQUIRED) +find_package(Libatomic REQUIRED) + +# add compile option to enable enhanced POSIX features +add_definitions(-D_GNU_SOURCE) + +if(DNBD3_SERVER_AFL) + # check if DNBD3_RELEASE_HARDEN is disabled + if(DNBD3_RELEASE_HARDEN) + message(FATAL_ERROR "DNBD3_SERVER_AFL can only be enabled if DNBD3_RELEASE_HARDEN is disabled") + endif(DNBD3_RELEASE_HARDEN) + + # build dnbd3-server with AFL support + message(STATUS "Building dnbd3-server with AFL support") + add_definitions(-DDNBD3_SERVER_AFL) + + # change compiler for dnbd3-server sources if AFL enabled + include(CheckAFLCCompiler) + check_afl_c_compiler(AFL_C_COMPILER AFL_C_COMPILER_NAME ${CMAKE_C_COMPILER} ${CMAKE_C_COMPILER_ID}) + if(AFL_C_COMPILER) + message(STATUS "Check for working AFL C compiler: ${AFL_C_COMPILER} - done") + # change C compiler to a corresponding AFL C compiler + set(CMAKE_C_COMPILER "${AFL_C_COMPILER}") + else(AFL_C_COMPILER) + # no corresponding AFL C compiler found + message(STATUS "Check for working AFL C compiler: ${AFL_C_COMPILER_NAME} - failed") + message(FATAL_ERROR "No corresponding AFL C compiler ${AFL_C_COMPILER_NAME} was found for the C compiler ${CMAKE_C_COMPILER}!") + endif(AFL_C_COMPILER) +endif(DNBD3_SERVER_AFL) + +set(DNBD3_SERVER_SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/altservers.c + ${CMAKE_CURRENT_SOURCE_DIR}/fileutil.c + ${CMAKE_CURRENT_SOURCE_DIR}/fuse.c + ${CMAKE_CURRENT_SOURCE_DIR}/globals.c + 
${CMAKE_CURRENT_SOURCE_DIR}/helper.c + ${CMAKE_CURRENT_SOURCE_DIR}/image.c + ${CMAKE_CURRENT_SOURCE_DIR}/ini.c + ${CMAKE_CURRENT_SOURCE_DIR}/integrity.c + ${CMAKE_CURRENT_SOURCE_DIR}/locks.c + ${CMAKE_CURRENT_SOURCE_DIR}/net.c + ${CMAKE_CURRENT_SOURCE_DIR}/reference.c + ${CMAKE_CURRENT_SOURCE_DIR}/rpc.c + ${CMAKE_CURRENT_SOURCE_DIR}/server.c + ${CMAKE_CURRENT_SOURCE_DIR}/threadpool.c + ${CMAKE_CURRENT_SOURCE_DIR}/uplink.c + ${CMAKE_CURRENT_SOURCE_DIR}/urldecode.c) +set(DNBD3_SERVER_HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/altservers.h + ${CMAKE_CURRENT_SOURCE_DIR}/fileutil.h + ${CMAKE_CURRENT_SOURCE_DIR}/fuse.h + ${CMAKE_CURRENT_SOURCE_DIR}/globals.h + ${CMAKE_CURRENT_SOURCE_DIR}/helper.h + ${CMAKE_CURRENT_SOURCE_DIR}/image.h + ${CMAKE_CURRENT_SOURCE_DIR}/ini.h + ${CMAKE_CURRENT_SOURCE_DIR}/integrity.h + ${CMAKE_CURRENT_SOURCE_DIR}/locks.h + ${CMAKE_CURRENT_SOURCE_DIR}/net.h + ${CMAKE_CURRENT_SOURCE_DIR}/reference.h + ${CMAKE_CURRENT_SOURCE_DIR}/reftypes.h + ${CMAKE_CURRENT_SOURCE_DIR}/rpc.h + ${CMAKE_CURRENT_SOURCE_DIR}/server.h + ${CMAKE_CURRENT_SOURCE_DIR}/threadpool.h + ${CMAKE_CURRENT_SOURCE_DIR}/uplink.h + ${CMAKE_CURRENT_SOURCE_DIR}/urldecode.h) + +add_executable(dnbd3-server ${DNBD3_SERVER_SOURCE_FILES}) +target_include_directories(dnbd3-server PRIVATE ${JANSSON_INCLUDE_DIR}) +target_link_libraries(dnbd3-server dnbd3-version dnbd3-build dnbd3-shared picohttpparser Libatomic::Libatomic ${CMAKE_THREAD_LIBS_INIT} ${JANSSON_LIBRARIES}) + +if(DNBD3_SERVER_FUSE) + find_package(Fuse REQUIRED) + # include Fuse headers and link with Fuse library + target_compile_options(dnbd3-server PRIVATE -DDNBD3_SERVER_FUSE) + target_include_directories(dnbd3-server PRIVATE ${FUSE_INCLUDE_DIRS}) + target_link_libraries(dnbd3-server ${FUSE_LIBRARIES}) +endif(DNBD3_SERVER_FUSE) + +if(UNIX AND NOT APPLE) + # link dnbd3-server with librt if server is compiled for a Unix system + target_link_libraries(dnbd3-server rt) +endif(UNIX AND NOT APPLE) + +if(DNBD3_SERVER_DEBUG_LOCKS) + # 
enable debugging of locks used in the dnbd3-server + target_compile_options(dnbd3-server PRIVATE -DDNBD3_SERVER_DEBUG_LOCKS) +endif(DNBD3_SERVER_DEBUG_LOCKS) + +if(DNBD3_SERVER_DEBUG_THREADS) + # enable debugging of threads used in the dnbd3-server + target_compile_options(dnbd3-server PRIVATE -DDNBD3_SERVER_DEBUG_THREADS) +endif(DNBD3_SERVER_DEBUG_THREADS) + +install(TARGETS dnbd3-server RUNTIME DESTINATION bin + COMPONENT server) + +add_linter(dnbd3-server-lint "${DNBD3_SERVER_SOURCE_FILES}" "${DNBD3_SERVER_HEADER_FILES}") +add_linter_fix(dnbd3-server-lint-fix "${DNBD3_SERVER_SOURCE_FILES}" "${DNBD3_SERVER_HEADER_FILES}") + +# add external dependency (HTTP parser) for the dnbd3-server +add_subdirectory(picohttpparser) diff --git a/src/server/altservers.c b/src/server/altservers.c index 943345c..4413ca6 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -5,16 +5,16 @@ #include "helper.h" #include "image.h" #include "fileutil.h" -#include "../shared/protocol.h" -#include "../shared/timing.h" -#include "../serverconfig.h" +#include <dnbd3/shared/protocol.h> +#include <dnbd3/shared/timing.h> +#include <dnbd3/config/server.h> #include "reference.h" #include <assert.h> #include <inttypes.h> #include <jansson.h> -#define LOG(lvl, msg, ...) logadd(lvl, msg " (%s:%d)", __VA_ARGS__, image->name, (int)image->rid) +#define LOG(lvl, msg, ...) logadd(lvl, msg " (%s:%d)", __VA_ARGS__, PIMG(image)) #define LOG_GOTO(jumplabel, lvl, ...) do { LOG(lvl, __VA_ARGS__); goto jumplabel; } while (0); #define ERROR_GOTO(jumplabel, ...) 
LOG_GOTO(jumplabel, LOG_ERROR, __VA_ARGS__) @@ -172,7 +172,7 @@ void altservers_findUplinkAsync(dnbd3_uplink_t *uplink) if ( uplink->rttTestResult != RTT_INPROGRESS ) { dnbd3_uplink_t *current = ref_get_uplink( &uplink->image->uplinkref ); if ( current == uplink ) { - threadpool_run( &altservers_runCheck, uplink ); + threadpool_run( &altservers_runCheck, uplink, "UPLINK" ); } else if ( current != NULL ) { ref_put( &current->reference ); } @@ -268,12 +268,32 @@ int altservers_getHostListForReplication(const char *image, dnbd3_host_t *server int idx[size]; int num = altservers_getListForUplink( NULL, image, idx, size, -1 ); for ( int i = 0; i < num; ++i ) { - servers[i] = altServers[i].host; + servers[i] = altServers[idx[i]].host; } return num; } /** + * Returns true if there is at least one alt-server the + * given image name would be allowed to be cloned from. + */ +bool altservers_imageHasAltServers(const char *image) +{ + bool ret = false; + mutex_lock( &altServersLock ); + for ( int i = 0; i < numAltServers; ++i ) { + if ( altServers[i].isClientOnly || ( !altServers[i].isPrivate && _proxyPrivateOnly ) ) + continue; + if ( !isImageAllowed( &altServers[i], image ) ) + continue; + ret = true; + break; + } + mutex_unlock( &altServersLock ); + return ret; +} + +/** * Get <size> alt servers. If there are more alt servers than * requested, random servers will be picked. * This function is suited for finding uplink servers as @@ -450,6 +470,11 @@ static void *altservers_runCheck(void *data) void altservers_findUplink(dnbd3_uplink_t *uplink) { altservers_findUplinkInternal( uplink ); + // Above function is sync, which means normally when it + // returns, rttTestResult will not be RTT_INPROGRESS. + // But we might have an async call running in parallel, which would + // mean the above call returns immediately. Wait for that check + // to finish too. 
while ( uplink->rttTestResult == RTT_INPROGRESS ) { usleep( 5000 ); } @@ -504,17 +529,29 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink) logadd( LOG_WARNING, "Image has gone away that was queued for RTT measurement" ); return; } - LOG( LOG_DEBUG2, "Running alt check for %s:%d", image->name, (int)image->rid ); + logadd( LOG_DEBUG2, "Running alt check for %s:%d", PIMG(image) ); assert( uplink->rttTestResult == RTT_INPROGRESS ); // Test them all dnbd3_server_connection_t best = { .fd = -1 }; unsigned long bestRtt = RTT_UNREACHABLE; unsigned long currentRtt = RTT_UNREACHABLE; + uint64_t offset = 0; + uint32_t length = DNBD3_BLOCK_SIZE; + // Try to use the range of the first request in the queue as RTT block. + // In case we have a cluster of servers where none of them has a complete + // copy, we at least make sure the one we're potentially switching to + // has the next block we're about to request. + mutex_lock( &uplink->queueLock ); + if ( uplink->queue != NULL ) { + offset = uplink->queue->from; + length = (uint32_t)( uplink->queue->to - offset ); + } + mutex_unlock( &uplink->queueLock ); for (itAlt = 0; itAlt < numAlts; ++itAlt) { int server = servers[itAlt]; // Connect clock_gettime( BEST_CLOCK_SOURCE, &start ); - int sock = sock_connect( &altServers[server].host, 750, 1000 ); + int sock = sock_connect( &altServers[server].host, 750, _uplinkTimeout ); if ( sock == -1 ) { // Connection failed means global error altservers_serverFailed( server ); continue; @@ -524,7 +561,8 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink) goto image_failed; } // See if selecting the image succeeded ++++++++++++++++++++++++++++++ - uint16_t protocolVersion, rid; + uint16_t protocolVersion = 0; + uint16_t rid; uint64_t imageSize; char *name; serialized_buffer_t serialized; @@ -543,9 +581,9 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink) if ( imageSize != image->virtualFilesize ) { ERROR_GOTO( image_failed, "[RTT] Remote 
size: %" PRIu64 ", expected: %" PRIu64, imageSize, image->virtualFilesize ); } - // Request first block (NOT random!) ++++++++++++++++++++++++++++++ - if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) { - LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Could not request first block", server ); + // Request block (NOT random! First or from queue) ++++++++++++ + if ( !dnbd3_get_block( sock, offset, length, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) { + LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Could not request block", server ); } // See if requesting the block succeeded ++++++++++++++++++++++ dnbd3_reply_t reply; @@ -553,13 +591,18 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink) LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Received corrupted reply header after CMD_GET_BLOCK", server ); } // check reply header - if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) { + if ( reply.cmd != CMD_GET_BLOCK || reply.size != length ) { // Sanity check failed; count this as global error (malicious/broken server) ERROR_GOTO( server_failed, "[RTT] Reply to first block request is %" PRIu32 " bytes", reply.size ); } // flush payload to include this into measurement char buffer[DNBD3_BLOCK_SIZE]; - if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) { + uint32_t todo = length; + ssize_t ret; + while ( todo != 0 && ( ret = recv( sock, buffer, MIN( DNBD3_BLOCK_SIZE, todo ), MSG_WAITALL ) ) > 0 ) { + todo -= (uint32_t)ret; + } + if ( todo != 0 ) { ERROR_GOTO( image_failed, "[RTT%d] Could not read first block payload", server ); } clock_gettime( BEST_CLOCK_SOURCE, &end ); @@ -567,9 +610,6 @@ static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink) mutex_lock( &uplink->rttLock ); const bool isCurrent = ( uplink->current.index == server ); mutex_unlock( &uplink->rttLock ); - // Penaltize rtt if this was a cycle; this will treat this server with lower priority - // in the 
near future too, so we prevent alternating between two servers that are both - // part of a cycle and have the lowest latency. uint32_t rtt = (uint32_t)((end.tv_sec - start.tv_sec) * 1000000 + (end.tv_nsec - start.tv_nsec) / 1000); // µs uint32_t avg = altservers_updateRtt( uplink, server, rtt ); @@ -614,7 +654,6 @@ failed: } else { LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt ); } - sock_setTimeout( best.fd, _uplinkTimeout ); mutex_lock( &uplink->rttLock ); uplink->better = best; uplink->rttTestResult = RTT_DOCHANGE; @@ -628,10 +667,6 @@ failed: if ( best.fd != -1 ) { close( best.fd ); } - if ( !image->working || uplink->cycleDetected ) { - image->working = true; - LOG( LOG_DEBUG1, "[RTT] No better alt server found, enabling '%s:%d' again... :-(", image->name, (int)image->rid ); - } uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away mutex_lock( &uplink->rttLock ); uplink->rttTestResult = RTT_DONTCHANGE; diff --git a/src/server/altservers.h b/src/server/altservers.h index 8e29aaa..78f6fcc 100644 --- a/src/server/altservers.h +++ b/src/server/altservers.h @@ -19,6 +19,8 @@ int altservers_getListForClient(dnbd3_client_t *client, dnbd3_server_entry_t *ou int altservers_getHostListForReplication(const char *image, dnbd3_host_t *servers, int size); +bool altservers_imageHasAltServers(const char *image); + bool altservers_toString(int server, char *buffer, size_t len); int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2); diff --git a/src/server/fileutil.c b/src/server/fileutil.c index 336ab68..9a9f066 100644 --- a/src/server/fileutil.c +++ b/src/server/fileutil.c @@ -68,7 +68,7 @@ bool file_setSize(int fd, uint64_t size) // Try really hard... 
image loading logic relies on the file // having the proper apparent size uint8_t byte = 0; - pread( fd, &byte, 1, size - 1 ); + (void)!pread( fd, &byte, 1, size - 1 ); if ( pwrite( fd, &byte, 1, size - 1 ) == 1 ) return true; return false; } diff --git a/src/server/fuse.c b/src/server/fuse.c new file mode 100644 index 0000000..12913a6 --- /dev/null +++ b/src/server/fuse.c @@ -0,0 +1,661 @@ +#include "fuse.h" +#include <dnbd3/types.h> +#include <dnbd3/shared/log.h> + +#ifndef DNBD3_SERVER_FUSE + +// +bool dfuse_init(const char *opts UNUSED, const char *dir UNUSED) +{ + logadd( LOG_ERROR, "FUSE: Not compiled in" ); + return false; +} + +void dfuse_shutdown() +{ +} + +#else + +#define PATHLEN (2000) +static char nullbytes[DNBD3_BLOCK_SIZE]; + +// FUSE ENABLED +#define FUSE_USE_VERSION 30 +// +#include <dnbd3/config.h> +#include "locks.h" +#include "threadpool.h" +#include "image.h" +#include "uplink.h" +#include "reference.h" +#include "helper.h" + +#include <fuse_lowlevel.h> +#include <ctype.h> +#include <assert.h> +#include <string.h> +#include <signal.h> + +#define INO_ROOT (1) +#define INO_CTRL (2) +#define INO_DIR (3) +static const char *NAME_CTRL = "control"; +static const char *NAME_DIR = "images"; + +typedef struct { + fuse_req_t req; + uint16_t rid; + char name[PATHLEN]; +} lookup_t; + +static fuse_ino_t inoCounter = 10; +typedef struct _dfuse_dir { + struct _dfuse_dir *next; + struct _dfuse_dir *child; + const char *name; + uint64_t size; + fuse_ino_t ino; + int refcount; + lookup_t *img; +} dfuse_entry_t; + +typedef struct { + dfuse_entry_t *entry; + dnbd3_image_t *image; +} cmdopen_t; + +static dfuse_entry_t sroot = { + .name = "images", + .ino = INO_DIR, + .refcount = 2, +}, *root = &sroot; +static pthread_mutex_t dirLock; + +#define INIT_NONE (0) +#define INIT_DONE (1) +#define INIT_SHUTDOWN (2) +#define INIT_INPROGRESS (3) + +static struct fuse_session *fuseSession = NULL; +static struct fuse_chan *fuseChannel = NULL; +static char *fuseMountPoint = 
NULL; +static pthread_t fuseThreadId; +static bool haveThread = false; +static _Atomic(int) initState = INIT_NONE; +static pthread_mutex_t initLock; +static struct timespec startupTime; + +static dfuse_entry_t* dirLookup(dfuse_entry_t *dir, const char *name); +static dfuse_entry_t* inoRecursive(dfuse_entry_t *dir, fuse_ino_t ino); + +static void uplinkCallback(void *data, uint64_t handle, uint64_t start UNUSED, uint32_t length, const char *buffer); +static void cleanupFuse(); +static void* fuseMainLoop(void *data); + +static void ll_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) +{ + fi->fh = 0; + if ( ino == INO_CTRL ) { + if ( ( fi->flags & 3 ) != O_WRONLY ) { + fuse_reply_err( req, EINVAL ); + } else { + fi->nonseekable = 1; + fuse_reply_open( req, fi ); + } + } else if ( ino == INO_ROOT ) { + fuse_reply_err( req, EISDIR ); + } else { + if ( ( fi->flags & 3 ) != O_RDONLY ) { + fuse_reply_err( req, EINVAL ); + return; + } + mutex_lock( &dirLock ); + dfuse_entry_t *entry = inoRecursive( root, ino ); + if ( entry == NULL ) { + mutex_unlock( &dirLock ); + fuse_reply_err( req, ENOENT ); + } else if ( entry->img == NULL ) { + mutex_unlock( &dirLock ); + fuse_reply_err( req, EISDIR ); + } else if ( entry->img->rid == 0 ) { + mutex_unlock( &dirLock ); + fuse_reply_err( req, ENOENT ); + } else { + entry->refcount++; + mutex_unlock( &dirLock ); + dnbd3_image_t *image = image_get( entry->img->name, entry->img->rid, true ); + if ( image == NULL ) { + fuse_reply_err( req, ENOENT ); + mutex_lock( &dirLock ); + entry->refcount--; + mutex_unlock( &dirLock ); + } else { + cmdopen_t *handle = malloc( sizeof(cmdopen_t) ); + handle->entry = entry; + handle->image = image; + fi->fh = (uintptr_t)handle; + fi->keep_cache = 1; + fuse_reply_open( req, fi ); + } + } + } +} + +static dfuse_entry_t* addImage(dfuse_entry_t **dir, const char *name, lookup_t *img) +{ + const char *slash = strchr( name, '/' ); + if ( slash == NULL ) { + // Name portion at the end + char *path 
= NULL; + if ( asprintf( &path, "%s:%d", name, (int)img->rid ) == -1 ) + abort(); + dfuse_entry_t *entry = dirLookup( *dir, path ); + if ( entry == NULL ) { + entry = calloc( 1, sizeof( *entry ) ); + entry->next = *dir; + *dir = entry; + entry->name = path; + entry->ino = inoCounter++; + entry->img = img; + } else { + free( path ); + if ( entry->img == NULL ) { + return NULL; + } + } + return entry; + } else { + // Dirname + char *path = NULL; + if ( asprintf( &path, "%.*s", (int)( slash - name ), name ) == -1 ) + abort(); + dfuse_entry_t *entry = dirLookup( *dir, path ); + if ( entry == NULL ) { + entry = calloc( 1, sizeof( *entry ) ); + entry->next = *dir; + *dir = entry; + entry->name = path; + entry->ino = inoCounter++; + } else { + free( path ); + } + return addImage( &entry->child, slash + 1, img ); + } +} + +static void ll_write(fuse_req_t req, fuse_ino_t ino, const char *buf, size_t size, off_t off, struct fuse_file_info *fi UNUSED) +{ + if ( ino != INO_CTRL ) { + fuse_reply_err( req, EROFS ); + return; + } + if ( off != 0 ) { + fuse_reply_err( req, ESPIPE ); + return; + } + if ( size >= PATHLEN ) { + fuse_reply_err( req, ENOSPC ); + return; + } + size_t colon = 0; + int rid = 0; + for ( size_t i = 0; i < size; ++i ) { + if ( buf[i] == '\0' || buf[i] == '\n' ) { + if ( colon == 0 ) { + colon = i; + } + break; + } + if ( colon != 0 ) { + if ( !isdigit( buf[i] ) ) { + logadd( LOG_WARNING, "FUSE: Malformed rid" ); + fuse_reply_err( req, EINVAL ); + return; + } + rid = rid * 10 + ( buf[i] - '0' ); // Can overflow but who cares + } else if ( buf[i] == ':' ) { + colon = i; // Image name starting with ':' would be broken... 
+ } + } + if ( rid < 0 || rid > 65535 ) { + logadd( LOG_WARNING, "FUSE: Invalid rid '%d'", rid ); + fuse_reply_err( req, EINVAL ); + return; + } + if ( colon == 0 ) { + colon = size; + } + lookup_t *lu = malloc( sizeof(lookup_t) ); + lu->rid = (uint16_t)rid; + lu->req = req; + if ( snprintf( lu->name, PATHLEN, "%.*s", (int)colon, buf ) == -1 ) { + free( lu ); + fuse_reply_err( req, ENOSPC ); + return; + } + logadd( LOG_DEBUG1, "FUSE: Request for '%s:%d'", lu->name, (int)lu->rid ); + dnbd3_image_t *image = image_getOrLoad( lu->name, lu->rid ); + if ( image == NULL ) { + fuse_reply_err( lu->req, ENOENT ); + free( lu ); + } else { + mutex_lock( &dirLock ); + dfuse_entry_t *entry = addImage( &root->child, lu->name, lu ); + if ( entry != NULL ) { + entry->size = image->virtualFilesize; + } + lu->rid = image->rid; // In case it was 0 + mutex_unlock( &dirLock ); + image_release( image ); + if ( entry == NULL ) { + fuse_reply_err( lu->req, EINVAL ); + free( lu ); + } else { + fuse_reply_write( lu->req, size ); + } + } +} + +static void ll_read( fuse_req_t req, fuse_ino_t ino UNUSED, size_t size, off_t off, struct fuse_file_info *fi ) +{ + if ( fi->fh == 0 ) { + fuse_reply_err( req, 0 ); + return; + } + cmdopen_t *handle = (cmdopen_t*)fi->fh; + dnbd3_image_t *image = handle->image; + if ( off < 0 || (uint64_t)off >= image->virtualFilesize ) { + fuse_reply_err( req, 0 ); + return; + } + if ( off + size > image->virtualFilesize ) { + size = image->virtualFilesize - off; + } + + // Check if cached locally + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache != NULL ) { + // This is a proxyed image, check if we need to relay the request... 
+ const uint64_t start = (uint64_t)off & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + const uint64_t end = (off + size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + if ( !image_isRangeCachedUnsafe( cache, start, end ) ) { + ref_put( &cache->reference ); + if ( size > (uint32_t)_maxPayload ) { + size = (uint32_t)_maxPayload; + } + if ( !uplink_request( image, req, &uplinkCallback, 0, off, (uint32_t)size ) ) { + logadd( LOG_DEBUG1, "FUSE: Could not relay uncached request to upstream proxy for image %s:%d", + image->name, image->rid ); + fuse_reply_err( req, EIO ); + } + return; // ASYNC + } + ref_put( &cache->reference ); + } + + // Is cached + size_t readSize = size; + if ( off + readSize > image->realFilesize ) { + if ( (uint64_t)off >= image->realFilesize ) { + readSize = 0; + } else { + readSize = image->realFilesize - off; + } + } + struct fuse_bufvec *vec = calloc( 1, sizeof(*vec) + sizeof(struct fuse_buf) ); + if ( readSize != 0 ) { + // Real data from file + vec->buf[vec->count++] = (struct fuse_buf){ + .size = readSize, + .flags = FUSE_BUF_IS_FD | FUSE_BUF_FD_RETRY | FUSE_BUF_FD_SEEK, + .fd = image->readFd, + .pos = off, + }; + } + if ( readSize != size ) { + vec->buf[vec->count++] = (struct fuse_buf){ + .size = size - readSize, + .mem = nullbytes, + .fd = -1, + }; + } + fuse_reply_data( req, vec, FUSE_BUF_SPLICE_MOVE ); + free( vec ); +} + +static bool statInternal(fuse_ino_t ino, struct stat *stbuf) +{ + switch ( ino ) { + case INO_ROOT: + case INO_DIR: + stbuf->st_mode = S_IFDIR | 0555; + stbuf->st_nlink = 2; + stbuf->st_mtim = startupTime; + break; + case INO_CTRL: + stbuf->st_mode = S_IFREG | 0222; + stbuf->st_nlink = 1; + stbuf->st_size = 0; + clock_gettime( CLOCK_REALTIME, &stbuf->st_mtim ); + break; + default: + return false; + } + stbuf->st_ctim = stbuf->st_atim = startupTime; + stbuf->st_uid = 0; + stbuf->st_ino = ino; + return true; +} + +/** + * HOLD LOCK + */ +static dfuse_entry_t* dirLookup(dfuse_entry_t *dir, const char *name) +{ + if 
( dir == NULL ) + return NULL; + for ( dfuse_entry_t *it = dir; it != NULL; it = it->next ) { + if ( strcmp( it->name, name ) == 0 ) + return it; + } + return NULL; +} + +static dfuse_entry_t* inoRecursive(dfuse_entry_t *dir, fuse_ino_t ino) +{ + for ( dfuse_entry_t *it = dir; it != NULL; it = it->next ) { + logadd( LOG_DEBUG1, "ino %d is %s", (int)it->ino, it->name ); + if ( it->ino == ino ) + return it; + if ( it->img == NULL ) { + dir = inoRecursive( it->child, ino ); + if ( dir != NULL ) + return dir; + } + } + return NULL; +} + +/** + * HOLD LOCK + */ +static void entryToStat(dfuse_entry_t *entry, struct stat *stbuf) +{ + if ( entry->img == NULL ) { + stbuf->st_mode = S_IFDIR | 0555; + stbuf->st_nlink = 2; + } else { + stbuf->st_mode = S_IFREG | 0444; + stbuf->st_nlink = 1; + stbuf->st_size = entry->size; + } + stbuf->st_ino = entry->ino; + stbuf->st_uid = 0; + stbuf->st_ctim = stbuf->st_atim = stbuf->st_mtim = startupTime; +} + +static void ll_lookup(fuse_req_t req, fuse_ino_t parent, const char *name) +{ + logadd( LOG_DEBUG2, "Lookup at ino %d for '%s'", (int)parent, name ); + if ( parent == INO_ROOT ) { + struct fuse_entry_param e = { 0 }; + if ( strcmp( name, NAME_DIR ) == 0 ) { + e.ino = INO_DIR; + } else if ( strcmp( name, NAME_CTRL ) == 0 ) { + e.ino = INO_CTRL; + e.attr_timeout = e.entry_timeout = 3600; + } + if ( e.ino != 0 && statInternal( e.ino, &e.attr ) ) { + fuse_reply_entry( req, &e ); + return; + } + } else { + mutex_lock( &dirLock ); + dfuse_entry_t *dir = inoRecursive( root, parent ); + if ( dir != NULL ) { + if ( dir->img != NULL ) { + mutex_unlock( &dirLock ); + fuse_reply_err( req, ENOTDIR ); + return; + } + dfuse_entry_t *entry = dirLookup( dir->child, name ); + if ( entry != NULL ) { + struct fuse_entry_param e = { .ino = entry->ino }; + entryToStat( entry, &e.attr ); + mutex_unlock( &dirLock ); + fuse_reply_entry( req, &e ); + return; + } + } + mutex_unlock( &dirLock ); + } + fuse_reply_err( req, ENOENT ); +} + +struct dirbuf { + char 
*p; + size_t size; +}; + +static void dirbuf_add( fuse_req_t req, struct dirbuf *b, const char *name, fuse_ino_t ino ) +{ + struct stat stbuf = { .st_ino = ino }; + size_t oldsize = b->size; + b->size += fuse_add_direntry( req, NULL, 0, name, NULL, 0 ); + b->p = ( char * ) realloc( b->p, b->size ); + fuse_add_direntry( req, b->p + oldsize, b->size - oldsize, name, &stbuf, b->size ); + return; +} + +static int reply_buf_limited( fuse_req_t req, const char *buf, size_t bufsize, off_t off, size_t maxsize ) +{ + if ( off >= 0 && off < (off_t)bufsize ) { + return fuse_reply_buf( req, buf + off, MIN( bufsize - off, maxsize ) ); + } + return fuse_reply_buf( req, NULL, 0 ); +} + +static void ll_readdir(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info *fi UNUSED) +{ + if ( ino != INO_ROOT ) { + fuse_reply_err( req, EACCES ); + } else { + struct dirbuf b; + memset( &b, 0, sizeof( b ) ); + dirbuf_add( req, &b, ".", INO_ROOT ); + dirbuf_add( req, &b, "..", INO_ROOT ); + dirbuf_add( req, &b, NAME_CTRL, INO_CTRL ); + dirbuf_add( req, &b, NAME_DIR, INO_DIR ); + reply_buf_limited( req, b.p, b.size, off, size ); + free( b.p ); + } +} + +static void ll_getattr(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi UNUSED) +{ + struct stat stbuf = { .st_ino = 0 }; + if ( !statInternal( ino, &stbuf ) ) { + mutex_lock( &dirLock ); + dfuse_entry_t *entry = inoRecursive( root, ino ); + if ( entry != NULL ) { + entryToStat( entry, &stbuf ); + } + mutex_unlock( &dirLock ); + } + if ( stbuf.st_ino == 0 ) { + fuse_reply_err( req, ENOENT ); + } else { + fuse_reply_attr( req, &stbuf, 0 ); + } +} + +void ll_setattr(fuse_req_t req, fuse_ino_t ino, struct stat *attr UNUSED, int to_set UNUSED, struct fuse_file_info *fi) +{ + ll_getattr( req, ino, fi ); +} + +void ll_release(fuse_req_t req, fuse_ino_t ino UNUSED, struct fuse_file_info *fi) +{ + if ( fi->fh != 0 ) { + cmdopen_t *handle = (cmdopen_t*)fi->fh; + image_release( handle->image ); + mutex_lock( &dirLock ); + 
handle->entry->refcount--; + mutex_unlock( &dirLock ); + free( handle ); + } + fuse_reply_err( req, 0 ); +} + +static void uplinkCallback(void *data, uint64_t handle UNUSED, uint64_t start UNUSED, uint32_t length, const char *buffer) +{ + fuse_req_t req = (fuse_req_t)data; + if ( buffer == NULL ) { + fuse_reply_err( req, EIO ); + } else { + fuse_reply_buf( req, buffer, length ); + } +} + +#define DUMP(key,type) logadd( LOG_DEBUG1, "FUSE: " #key ": " type, conn->key ) +void ll_init(void *userdata, struct fuse_conn_info *conn) +{ + DUMP( capable, "%u" ); + DUMP( congestion_threshold, "%u" ); + DUMP( max_background, "%u" ); + //DUMP( max_read, "%u" ); + DUMP( max_readahead, "%u" ); + DUMP( max_write, "%u" ); + DUMP( want, "%u" ); + conn->want |= FUSE_CAP_SPLICE_READ | FUSE_CAP_SPLICE_WRITE | FUSE_CAP_SPLICE_MOVE; +} +#undef DUMP + +/* map the implemented fuse operations */ +static struct fuse_lowlevel_ops fuseOps = { + .lookup = ll_lookup, + .getattr = ll_getattr, + .setattr = ll_setattr, + .readdir = ll_readdir, + .open = ll_open, + .release = ll_release, + .read = ll_read, + .write = ll_write, + .init = ll_init, + //.destroy = ll_destroy, +}; + +bool dfuse_init(const char *opts, const char *dir) +{ + int ex = INIT_NONE; + if ( !atomic_compare_exchange_strong( &initState, &ex, INIT_INPROGRESS ) ) { + logadd( LOG_ERROR, "Calling dfuse_init twice" ); + exit( 1 ); + } + mutex_init( &initLock, LOCK_FUSE_INIT ); + mutex_lock( &initLock ); + mutex_init( &dirLock, LOCK_FUSE_DIR ); + clock_gettime( CLOCK_REALTIME, &startupTime ); + struct fuse_args args = FUSE_ARGS_INIT( 0, NULL ); + fuse_opt_add_arg( &args, "dnbd3fs" ); // argv[0] + if ( opts != NULL ) { + fuse_opt_add_arg( &args, opts ); + } + fuse_opt_add_arg( &args, "-odefault_permissions" ); + fuse_opt_add_arg( &args, dir ); // last param is mount point + // + if ( fuse_parse_cmdline( &args, &fuseMountPoint, NULL, NULL ) == -1 ) { + logadd( LOG_ERROR, "FUSE: Error parsing command line" ); + goto fail; + } + fuseChannel 
= fuse_mount( fuseMountPoint, &args ); + if ( fuseChannel == NULL ) { + logadd( LOG_ERROR, "FUSE: Cannot mount to %s", dir ); + goto fail; + } + fuseSession = fuse_lowlevel_new( &args, &fuseOps, sizeof( fuseOps ), NULL ); + if ( fuseSession == NULL ) { + logadd( LOG_ERROR, "FUSE: Error initializing fuse session" ); + goto fail; + } + fuse_session_add_chan( fuseSession, fuseChannel ); + if ( 0 != thread_create( &fuseThreadId, NULL, &fuseMainLoop, (void *)NULL ) ) { + logadd( LOG_ERROR, "FUSE: Could not start thread" ); + goto fail; + } + haveThread = true; + // Init OK + mutex_unlock( &initLock ); + return true; +fail: + cleanupFuse(); + fuse_opt_free_args( &args ); + initState = INIT_SHUTDOWN; + mutex_unlock( &initLock ); + return false; +} + +void dfuse_shutdown() +{ + if ( initState == INIT_NONE ) + return; + for ( ;; ) { + int ex = INIT_DONE; + if ( atomic_compare_exchange_strong( &initState, &ex, INIT_SHUTDOWN ) ) + break; // OK, do the shutdown + if ( ex == INIT_INPROGRESS ) + continue; // dfuse_init in progress, wait for mutex + // Wrong state + logadd( LOG_WARNING, "Called dfuse_shutdown without dfuse_init first" ); + return; + } + logadd( LOG_INFO, "Shutting down fuse mainloop..." 
); + mutex_lock( &initLock ); + if ( fuseSession != NULL ) { + fuse_session_exit( fuseSession ); + } + if ( !haveThread ) { + cleanupFuse(); + } + mutex_unlock( &initLock ); + if ( haveThread ) { + logadd( LOG_DEBUG1, "FUSE: Sending USR1 to mainloop thread" ); + pthread_kill( fuseThreadId, SIGUSR1 ); + pthread_join( fuseThreadId, NULL ); + } +} + +static void* fuseMainLoop(void *data UNUSED) +{ + int ex = INIT_INPROGRESS; + if ( !atomic_compare_exchange_strong( &initState, &ex, INIT_DONE ) ) { + logadd( LOG_WARNING, "FUSE: Unexpected state in fuseMainLoop: %d", ex ); + return NULL; + } + setThreadName( "fuse" ); + logadd( LOG_INFO, "FUSE: Starting mainloop" ); + fuse_session_loop_mt( fuseSession ); + logadd( LOG_INFO, "FUSE: Left mainloop" ); + mutex_lock( &initLock ); + cleanupFuse(); + mutex_unlock( &initLock ); + return NULL; +} + +static void cleanupFuse() +{ + if ( fuseChannel != NULL ) { + fuse_session_remove_chan( fuseChannel ); + } + if ( fuseSession != NULL ) { + fuse_session_destroy( fuseSession ); + fuseSession = NULL; + } + if ( fuseMountPoint != NULL && fuseChannel != NULL ) { + fuse_unmount( fuseMountPoint, fuseChannel ); + } + fuseChannel = NULL; +} + +#endif // DNBD3_SERVER_FUSE diff --git a/src/server/fuse.h b/src/server/fuse.h new file mode 100644 index 0000000..f01ad58 --- /dev/null +++ b/src/server/fuse.h @@ -0,0 +1,10 @@ +#ifndef _FUSE_H_ +#define _FUSE_H_ + +#include <stdbool.h> + +bool dfuse_init(const char *opts, const char *dir); + +void dfuse_shutdown(); + +#endif diff --git a/src/server/globals.c b/src/server/globals.c index f8c3f66..f6432cb 100644 --- a/src/server/globals.c +++ b/src/server/globals.c @@ -1,7 +1,7 @@ #include "globals.h" #include "ini.h" #include "locks.h" -#include "../shared/log.h" +#include <dnbd3/shared/log.h> #include <string.h> #include <stdlib.h> #include <inttypes.h> @@ -19,22 +19,26 @@ atomic_int _clientPenalty = 0; atomic_bool _isProxy = false; atomic_int _backgroundReplication = BGR_FULL; atomic_int 
_bgrMinClients = 0; +atomic_int _bgrWindowSize = 1; atomic_bool _lookupMissingForProxy = true; atomic_bool _sparseFiles = false; +atomic_bool _ignoreAllocErrors = false; atomic_bool _removeMissingImages = true; -atomic_int _uplinkTimeout = SOCKET_TIMEOUT_UPLINK; -atomic_int _clientTimeout = SOCKET_TIMEOUT_CLIENT; +atomic_uint _uplinkTimeout = SOCKET_TIMEOUT_UPLINK; +atomic_uint _clientTimeout = SOCKET_TIMEOUT_CLIENT; atomic_bool _closeUnusedFd = false; atomic_bool _vmdkLegacyMode = false; // Not really needed anymore since we have '+' and '-' in alt-servers atomic_bool _proxyPrivateOnly = false; +atomic_bool _pretendClient = false; atomic_int _autoFreeDiskSpaceDelay = 3600 * 10; // [limits] atomic_int _maxClients = SERVER_MAX_CLIENTS; atomic_int _maxImages = SERVER_MAX_IMAGES; -atomic_int _maxPayload = 9000000; // 9MB +atomic_uint _maxPayload = 9000000; // 9MB atomic_uint_fast64_t _maxReplicationSize = (uint64_t)100000000000LL; -atomic_bool _pretendClient = false; +atomic_uint _maxPrefetch = 262144; // 256KB +atomic_uint _minRequestSize = 0; /** * True when loading config the first time. 
Consecutive loads will @@ -58,31 +62,35 @@ static const char* units = "KMGTPEZY"; static bool parse64(const char *in, atomic_int_fast64_t *out, const char *optname); static bool parse64u(const char *in, atomic_uint_fast64_t *out, const char *optname); -static bool parse32(const char *in, atomic_int *out, const char *optname) UNUSED; -static bool parse32u(const char *in, atomic_int *out, const char *optname); +static bool parse32(const char *in, atomic_int *out, const char *optname); +static bool parse32u(const char *in, atomic_uint *out, const char *optname); static int ini_handler(void *custom UNUSED, const char* section, const char* key, const char* value) { if ( initialLoad ) { if ( _basePath == NULL ) SAVE_TO_VAR_STR( dnbd3, basePath ); SAVE_TO_VAR_BOOL( dnbd3, vmdkLegacyMode ); - SAVE_TO_VAR_UINT( dnbd3, listenPort ); - SAVE_TO_VAR_UINT( limits, maxClients ); - SAVE_TO_VAR_UINT( limits, maxImages ); + SAVE_TO_VAR_INT( dnbd3, listenPort ); + SAVE_TO_VAR_INT( limits, maxClients ); + SAVE_TO_VAR_INT( limits, maxImages ); } SAVE_TO_VAR_BOOL( dnbd3, isProxy ); SAVE_TO_VAR_BOOL( dnbd3, proxyPrivateOnly ); SAVE_TO_VAR_INT( dnbd3, bgrMinClients ); + SAVE_TO_VAR_INT( dnbd3, bgrWindowSize ); SAVE_TO_VAR_BOOL( dnbd3, lookupMissingForProxy ); SAVE_TO_VAR_BOOL( dnbd3, sparseFiles ); + SAVE_TO_VAR_BOOL( dnbd3, ignoreAllocErrors ); SAVE_TO_VAR_BOOL( dnbd3, removeMissingImages ); SAVE_TO_VAR_BOOL( dnbd3, closeUnusedFd ); - SAVE_TO_VAR_UINT( dnbd3, serverPenalty ); - SAVE_TO_VAR_UINT( dnbd3, clientPenalty ); + SAVE_TO_VAR_INT( dnbd3, serverPenalty ); + SAVE_TO_VAR_INT( dnbd3, clientPenalty ); SAVE_TO_VAR_UINT( dnbd3, uplinkTimeout ); SAVE_TO_VAR_UINT( dnbd3, clientTimeout ); SAVE_TO_VAR_UINT( limits, maxPayload ); SAVE_TO_VAR_UINT64( limits, maxReplicationSize ); + SAVE_TO_VAR_UINT( limits, maxPrefetch ); + SAVE_TO_VAR_UINT( limits, minRequestSize ); SAVE_TO_VAR_BOOL( dnbd3, pretendClient ); SAVE_TO_VAR_INT( dnbd3, autoFreeDiskSpaceDelay ); if ( strcmp( section, "dnbd3" ) == 0 
&& strcmp( key, "backgroundReplication" ) == 0 ) { @@ -111,7 +119,10 @@ static int ini_handler(void *custom UNUSED, const char* section, const char* key void globals_loadConfig() { char *name = NULL; - asprintf( &name, "%s/%s", _configDir, CONFIG_FILENAME ); + if ( asprintf( &name, "%s/%s", _configDir, CONFIG_FILENAME ) == -1 ) { + logadd( LOG_ERROR, "Memory allocation error for config filename" ); + exit( 1 ); + } if ( name == NULL ) return; if ( initialLoad ) { mutex_init( &loadLock, LOCK_LOAD_CONFIG ); @@ -125,9 +136,30 @@ void globals_loadConfig() if ( initialLoad ) { sanitizeFixedConfig(); } - if ( _backgroundReplication == BGR_FULL && _sparseFiles && _bgrMinClients < 5 ) { - logadd( LOG_WARNING, "Ignoring 'sparseFiles=true' since backgroundReplication is set to true and bgrMinClients is too low" ); - _sparseFiles = false; + if ( _isProxy ) { + if ( _backgroundReplication == BGR_FULL && _sparseFiles && _bgrMinClients < 5 ) { + logadd( LOG_WARNING, "Ignoring 'sparseFiles=true' since backgroundReplication is set to true and bgrMinClients is too low" ); + _sparseFiles = false; + } + if ( _bgrWindowSize < 1 ) { + _bgrWindowSize = 1; + } else if ( _bgrWindowSize > UPLINK_MAX_QUEUE - 10 ) { + _bgrWindowSize = UPLINK_MAX_QUEUE - 10; + logadd( LOG_MINOR, "Limiting bgrWindowSize to %d, because of UPLINK_MAX_QUEUE", + _bgrWindowSize ); + } + if ( _maxPayload < 256 * 1024 ) { + logadd( LOG_WARNING, "maxPayload was increased to 256k" ); + _maxPayload = 256 * 1024; + } + if ( _maxPrefetch > _maxPayload ) { + logadd( LOG_WARNING, "Reducing maxPrefetch to maxPayload" ); + _maxPrefetch = _maxPayload; + } + if ( _minRequestSize > _maxPayload ) { + logadd( LOG_WARNING, "Reducing minRequestSize to maxPayload" ); + _minRequestSize = _maxPayload; + } } // Dump config as interpreted char buffer[2000]; @@ -281,7 +313,7 @@ static bool parse32(const char *in, atomic_int *out, const char *optname) return true; } -static bool parse32u(const char *in, atomic_int *out, const char 
*optname) +static bool parse32u(const char *in, atomic_uint *out, const char *optname) { atomic_int_fast64_t v; if ( !parse64( in, &v, optname ) ) return false; @@ -289,7 +321,7 @@ static bool parse32u(const char *in, atomic_int *out, const char *optname) logadd( LOG_WARNING, "'%s' must be between %d and %d, but is '%s'", optname, (int)0, (int)INT_MAX, in ); return false; } - *out = (int)v; + *out = (unsigned int)v; return true; } @@ -320,8 +352,10 @@ size_t globals_dumpConfig(char *buffer, size_t size) PBOOL(backgroundReplication); } PINT(bgrMinClients); + PINT(bgrWindowSize); PBOOL(lookupMissingForProxy); PBOOL(sparseFiles); + PBOOL(ignoreAllocErrors); PBOOL(removeMissingImages); PINT(uplinkTimeout); PINT(clientTimeout); @@ -335,6 +369,8 @@ size_t globals_dumpConfig(char *buffer, size_t size) PINT(maxImages); PINT(maxPayload); PUINT64(maxReplicationSize); + PINT(maxPrefetch); + PINT(minRequestSize); return size - rem; } diff --git a/src/server/globals.h b/src/server/globals.h index df8c595..bde1184 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -1,9 +1,9 @@ #ifndef _GLOBALS_H_ #define _GLOBALS_H_ -#include "../types.h" -#include "../shared/fdsignal.h" -#include "../serverconfig.h" +#include <dnbd3/types.h> +#include <dnbd3/shared/fdsignal.h> +#include <dnbd3/config/server.h> #include <stdint.h> #include <stdatomic.h> #include <time.h> @@ -18,18 +18,30 @@ typedef struct _dnbd3_uplink dnbd3_uplink_t; typedef struct _dnbd3_image dnbd3_image_t; typedef struct _dnbd3_client dnbd3_client_t; -typedef struct +typedef void (*uplink_callback)(void *data, uint64_t handle, uint64_t start, uint32_t length, const char *buffer); + +typedef struct _dnbd3_queue_client { - uint64_t handle; // Client defined handle to pass back in reply - uint64_t from; // First byte offset of requested block (ie. 4096) - uint64_t to; // Last byte + 1 of requested block (ie. 
8192, if request len is 4096, resulting in bytes 4096-8191) - dnbd3_client_t * client; // Client to send reply to - int status; // status of this entry: ULR_* -#ifdef _DEBUG - ticks entered; // When this request entered the queue (for debugging) + struct _dnbd3_queue_client *next; + void* data; // Passed back to callback + uint64_t handle; // Passed back to callback + uint64_t from, to; // Client range + uplink_callback callback; // Callback function +} dnbd3_queue_client_t; + +typedef struct _dnbd3_queue_entry +{ + struct _dnbd3_queue_entry *next; + uint64_t handle; // Our handle for this entry + uint64_t from; // First byte offset of requested block (ie. 4096) + uint64_t to; // Last byte + 1 of requested block (ie. 8192, if request len is 4096, resulting in bytes 4096-8191) + dnbd3_queue_client_t *clients; +#ifdef DEBUG + ticks entered; // When this request entered the queue (for debugging) #endif - uint8_t hopCount; // How many hops this request has already taken across proxies -} dnbd3_queued_request_t; + uint8_t hopCount; // How many hops this request has already taken across proxies + bool sent; // Already sent to uplink? +} dnbd3_queue_entry_t; typedef struct _ns { @@ -91,11 +103,12 @@ struct _dnbd3_uplink bool cycleDetected; // connection cycle between proxies detected for current remote server int nextReplicationIndex; // Which index in the cache map we should start looking for incomplete blocks at // If BGR == BGR_HASHBLOCK, -1 means "currently no incomplete block" - uint64_t replicationHandle; // Handle of pending replication request atomic_uint_fast64_t bytesReceived; // Number of bytes received by the uplink since startup. 
- atomic_int queueLen; // length of queue - uint32_t idleTime; // How many seconds the uplink was idle (apart from keep-alives) - dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE]; + atomic_uint_fast64_t bytesReceivedLastSave; // Number of bytes received when we last saved the cache map + int queueLen; // length of queue + int idleTime; // How many seconds the uplink was idle (apart from keep-alives) + dnbd3_queue_entry_t *queue; + atomic_uint_fast32_t queueId; dnbd3_alt_local_t altData[SERVER_MAX_ALTS]; }; @@ -110,6 +123,8 @@ typedef struct typedef struct { ref reference; + atomic_bool dirty; // Cache map has been modified outside uplink (only integrity checker for now) + bool unchanged; // How many times in a row a reloaded cache map went unchanged _Atomic uint8_t map[]; } dnbd3_cache_map_t; @@ -128,7 +143,6 @@ struct _dnbd3_image uint64_t virtualFilesize; // virtual size of image (real size rounded up to multiple of 4k) uint64_t realFilesize; // actual file size on disk ticks atime; // last access time - ticks lastWorkCheck; // last time a non-working image has been checked ticks nextCompletenessEstimate; // next time the completeness estimate should be updated uint32_t *crc32; // list of crc32 checksums for each 16MiB block in image uint32_t masterCrc32; // CRC-32 of the crc-32 list @@ -136,10 +150,18 @@ struct _dnbd3_image atomic_int completenessEstimate; // Completeness estimate in percent atomic_int users; // clients currently using this image. XXX Lock on imageListLock when modifying and checking whether the image should be freed. Reading it elsewhere is fine without the lock. int id; // Unique ID of this image. 
Only unique in the context of this running instance of DNBD3-Server - atomic_bool working; // true if image exists and completeness is == 100% or a working upstream proxy is connected + struct { + atomic_bool read; // Error reading from file + atomic_bool write; // Error writing to file + atomic_bool changed; // File disappeared or changed, thorough check required if it seems to be back + atomic_bool uplink; // No uplink connected + atomic_bool queue; // Too many requests waiting on uplink + } problem; uint16_t rid; // revision of image + bool accessed; // image was accessed since .meta was written pthread_mutex_t lock; }; +#define PIMG(x) (x)->name, (int)(x)->rid struct _dnbd3_client { @@ -147,6 +169,7 @@ struct _dnbd3_client atomic_uint_fast64_t bytesSent; // Byte counter for this client. dnbd3_image_t * _Atomic image; // Image in use by this client, or NULL during handshake int sock; + _Atomic uint8_t relayedCount; // How many requests are in-flight to the uplink server bool isServer; // true if a server in proxy mode, false if real client dnbd3_host_t host; char hostName[HOSTNAMELEN]; // inet_ntop version of host @@ -206,12 +229,12 @@ extern atomic_bool _removeMissingImages; /** * Read timeout when waiting for or sending data on an uplink */ -extern atomic_int _uplinkTimeout; +extern atomic_uint _uplinkTimeout; /** * Read timeout when waiting for or sending data from/to client */ -extern atomic_int _clientTimeout; +extern atomic_uint _clientTimeout; /** * If true, images with no active client will have their fd closed after some @@ -234,6 +257,11 @@ extern atomic_int _backgroundReplication; extern atomic_int _bgrMinClients; /** + * How many in-flight replication requests we should target (per uplink) + */ +extern atomic_int _bgrWindowSize; + +/** * (In proxy mode): If connecting client is a proxy, and the requested image * is not known locally, should we ask our known alt servers for it? * Otherwise the request is rejected. 
@@ -255,6 +283,12 @@ extern atomic_bool _lookupMissingForProxy; extern atomic_bool _sparseFiles; /** + * If true, don't abort image replication if preallocating + * the image fails, but retry with sparse file. + */ +extern atomic_bool _ignoreAllocErrors; + +/** * Port to listen on (default: #define PORT (5003)) */ extern atomic_int _listenPort; @@ -275,7 +309,7 @@ extern atomic_int _maxImages; * Usually this isn't even a megabyte for "real" clients (blockdev * or fuse). */ -extern atomic_int _maxPayload; +extern atomic_uint _maxPayload; /** * If in proxy mode, don't replicate images that are @@ -298,6 +332,21 @@ extern atomic_bool _pretendClient; extern atomic_int _autoFreeDiskSpaceDelay; /** + * When handling a client request, this sets the maximum amount + * of bytes we prefetch offset right at the end of the client request. + * The prefetch size will be MIN( length * 3, _maxPrefetch ), if + * length <= _maxPrefetch, so effectively, setting this to 0 disables + * any prefetching. + */ +extern atomic_uint _maxPrefetch; + +/** + * Use with care. Can severely degrade performance. + * Set either 0 or very high. + */ +extern atomic_uint _minRequestSize; + +/** * Load the server configuration. 
*/ void globals_loadConfig(); diff --git a/src/server/helper.h b/src/server/helper.h index 102cb36..3e1b661 100644 --- a/src/server/helper.h +++ b/src/server/helper.h @@ -2,8 +2,8 @@ #define HELPER_H_ #include "server.h" -#include "../shared/log.h" -#include "../types.h" +#include <dnbd3/shared/log.h> +#include <dnbd3/types.h> #include <netinet/in.h> #include <string.h> #include <unistd.h> diff --git a/src/server/image.c b/src/server/image.c index 16dae45..51fd5b6 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -5,9 +5,9 @@ #include "locks.h" #include "integrity.h" #include "altservers.h" -#include "../shared/protocol.h" -#include "../shared/timing.h" -#include "../shared/crc32.h" +#include <dnbd3/shared/protocol.h> +#include <dnbd3/shared/timing.h> +#include <dnbd3/shared/crc32.h> #include "reference.h" #include <assert.h> @@ -46,16 +46,21 @@ static dnbd3_image_t* image_remove(dnbd3_image_t *image); static dnbd3_image_t* image_free(dnbd3_image_t *image); static bool image_load_all_internal(char *base, char *path); static bool image_addToList(dnbd3_image_t *image); -static bool image_load(char *base, char *path, int withUplink); +static bool image_load(char *base, char *path, bool withUplink); static bool image_clone(int sock, char *name, uint16_t revision, uint64_t imageSize); static bool image_calcBlockCrc32(const int fd, const size_t block, const uint64_t realFilesize, uint32_t *crc); static bool image_ensureDiskSpace(uint64_t size, bool force); static dnbd3_cache_map_t* image_loadCacheMap(const char * const imagePath, const int64_t fileSize); static uint32_t* image_loadCrcList(const char * const imagePath, const int64_t fileSize, uint32_t *masterCrc); -static void image_checkRandomBlocks(dnbd3_image_t *image, const int count); +static bool image_checkRandomBlocks(dnbd3_image_t *image, const int count, int fromFd); static void* closeUnusedFds(void*); +static bool isImageFromUpstream(dnbd3_image_t *image); +static void* saveLoadAllCacheMaps(void*); 
+static void saveCacheMap(dnbd3_image_t *image); static void allocCacheMap(dnbd3_image_t *image, bool complete); +static void saveMetaData(dnbd3_image_t *image, ticks *now, time_t walltime); +static void loadImageMeta(dnbd3_image_t *image); static void cmfree(ref *ref) { @@ -73,6 +78,7 @@ void image_serverStartup() mutex_init( &remoteCloneLock, LOCK_REMOTE_CLONE ); mutex_init( &reloadLock, LOCK_RELOAD ); server_addJob( &closeUnusedFds, NULL, 10, 900 ); + server_addJob( &saveLoadAllCacheMaps, NULL, 9, 20 ); } /** @@ -118,39 +124,35 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co const uint64_t firstByteInMap = start >> 15; const uint64_t lastByteInMap = (end - 1) >> 15; uint64_t pos; - // First byte - uint8_t fb = 0, lb = 0; - for ( pos = start; firstByteInMap == (pos >> 15) && pos < end; pos += DNBD3_BLOCK_SIZE ) { - const int map_x = (pos >> 12) & 7; // mod 8 - const uint8_t bit_mask = (uint8_t)( 1 << map_x ); - fb |= bit_mask; - } - // Last byte - if ( lastByteInMap != firstByteInMap ) { - for ( pos = lastByteInMap << 15; pos < end; pos += DNBD3_BLOCK_SIZE ) { - assert( lastByteInMap == (pos >> 15) ); - const int map_x = (pos >> 12) & 7; // mod 8 - const uint8_t bit_mask = (uint8_t)( 1 << map_x ); - lb |= bit_mask; - } - } - atomic_thread_fence( memory_order_acquire ); - if ( set ) { - uint8_t fo = atomic_fetch_or_explicit( &cache->map[firstByteInMap], fb, memory_order_relaxed ); - uint8_t lo = atomic_fetch_or_explicit( &cache->map[lastByteInMap], lb, memory_order_relaxed ); - setNewBlocks = ( fo != cache->map[firstByteInMap] || lo != cache->map[lastByteInMap] ); + // First and last byte masks + const uint8_t fb = (uint8_t)(0xff << ((start >> 12) & 7)); + const uint8_t lb = (uint8_t)(~(0xff << ((((end - 1) >> 12) & 7) + 1))); + if ( firstByteInMap == lastByteInMap ) { + if ( set ) { + uint8_t o = atomic_fetch_or( &cache->map[firstByteInMap], (uint8_t)(fb & lb) ); + setNewBlocks = o != ( o | (fb & lb) ); + } else { + 
atomic_fetch_and( &cache->map[firstByteInMap], (uint8_t)~(fb & lb) ); + } } else { - atomic_fetch_and_explicit( &cache->map[firstByteInMap], (uint8_t)~fb, memory_order_relaxed ); - atomic_fetch_and_explicit( &cache->map[lastByteInMap], (uint8_t)~lb, memory_order_relaxed ); - } - const uint8_t nval = set ? 0xff : 0; - // Everything in between - for ( pos = firstByteInMap + 1; pos < lastByteInMap; ++pos ) { - if ( atomic_exchange_explicit( &cache->map[pos], nval, memory_order_relaxed ) != nval && set ) { - setNewBlocks = true; + atomic_thread_fence( memory_order_acquire ); + if ( set ) { + uint8_t fo = atomic_fetch_or_explicit( &cache->map[firstByteInMap], fb, memory_order_relaxed ); + uint8_t lo = atomic_fetch_or_explicit( &cache->map[lastByteInMap], lb, memory_order_relaxed ); + setNewBlocks = ( fo != ( fo | fb ) || lo != ( lo | lb ) ); + } else { + atomic_fetch_and_explicit( &cache->map[firstByteInMap], (uint8_t)~fb, memory_order_relaxed ); + atomic_fetch_and_explicit( &cache->map[lastByteInMap], (uint8_t)~lb, memory_order_relaxed ); + } + // Everything in between + const uint8_t nval = set ? 
0xff : 0; + for ( pos = firstByteInMap + 1; pos < lastByteInMap; ++pos ) { + if ( atomic_exchange_explicit( &cache->map[pos], nval, memory_order_relaxed ) != nval && set ) { + setNewBlocks = true; + } } + atomic_thread_fence( memory_order_release ); } - atomic_thread_fence( memory_order_release ); if ( setNewBlocks && image->crc32 != NULL ) { // If setNewBlocks is set, at least one of the blocks was not cached before, so queue all hash blocks // for checking, even though this might lead to checking some hash block again, if it was @@ -164,6 +166,8 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co integrity_check( image, block, false ); } } + } else if ( !set ) { + cache->dirty = true; } ref_put( &cache->reference ); } @@ -239,35 +243,74 @@ bool image_isComplete(dnbd3_image_t *image) */ bool image_ensureOpen(dnbd3_image_t *image) { - if ( image->readFd != -1 ) return image; - int newFd = open( image->path, O_RDONLY ); + bool sizeChanged = false; + if ( image->readFd != -1 && !image->problem.changed ) + return true; + int newFd = image->readFd == -1 ? 
open( image->path, O_RDONLY ) : dup( image->readFd ); if ( newFd == -1 ) { - logadd( LOG_WARNING, "Cannot open %s for reading", image->path ); + if ( !image->problem.read ) { + logadd( LOG_WARNING, "[access] Cannot open '%s' for reading (errno=%d)", image->path, errno ); + image->problem.read = true; + } } else { - // Check size + // Check size + read access + char buffer[100]; const off_t flen = lseek( newFd, 0, SEEK_END ); if ( flen == -1 ) { - logadd( LOG_WARNING, "Could not seek to end of %s (errno %d)", image->path, errno ); + if ( !image->problem.read ) { + logadd( LOG_WARNING, "Could not seek to end of %s (errno=%d)", image->path, errno ); + image->problem.read = true; + } close( newFd ); newFd = -1; } else if ( (uint64_t)flen != image->realFilesize ) { - logadd( LOG_WARNING, "Size of active image with closed fd changed from %" PRIu64 " to %" PRIu64, image->realFilesize, (uint64_t)flen ); + if ( !image->problem.changed ) { + logadd( LOG_WARNING, "Size of active image with closed fd changed from %" PRIu64 " to %" PRIu64, + image->realFilesize, (uint64_t)flen ); + } + sizeChanged = true; + } else if ( pread( newFd, buffer, sizeof(buffer), 0 ) == -1 ) { + if ( !image->problem.read ) { + logadd( LOG_WARNING, "Reading first %d bytes from %s failed (errno=%d)", + (int)sizeof(buffer), image->path, errno ); + image->problem.read = true; + } close( newFd ); newFd = -1; } } if ( newFd == -1 ) { - mutex_lock( &image->lock ); - image->working = false; - mutex_unlock( &image->lock ); + if ( sizeChanged ) { + image->problem.changed = true; + } return false; } + + // Re-opened. Check if the "size/content changed" flag was set before and if so, check crc32, + // but only if the size we just got above is correct. 
+ if ( image->problem.changed && !sizeChanged ) { + if ( image->crc32 == NULL ) { + // Cannot verify further, hope for the best + image->problem.changed = false; + logadd( LOG_DEBUG1, "Size of image %s:%d changed back to expected value", PIMG(image) ); + } else if ( image_checkRandomBlocks( image, 1, newFd ) ) { + // This should have checked the first block (if complete) -> All is well again + image->problem.changed = false; + logadd( LOG_DEBUG1, "Size and CRC of image %s:%d changed back to expected value", PIMG(image) ); + } + } else { + image->problem.changed = sizeChanged; + } + mutex_lock( &image->lock ); if ( image->readFd == -1 ) { image->readFd = newFd; + image->problem.read = false; mutex_unlock( &image->lock ); } else { - // There was a race while opening the file (happens cause not locked cause blocking), we lost the race so close new fd and proceed + // There was a race while opening the file (happens cause not locked cause blocking), + // we lost the race so close new fd and proceed. + // *OR* we dup()'ed above for cheating when the image changed before. mutex_unlock( &image->lock ); close( newFd ); } @@ -296,10 +339,9 @@ dnbd3_image_t* image_byId(int imgId) * point... * Locks on: imageListLock, _images[].lock */ -dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) +dnbd3_image_t* image_get(const char *name, uint16_t revision, bool ensureFdOpen) { int i; - const char *removingText = _removeMissingImages ? 
", removing from list" : ""; dnbd3_image_t *candidate = NULL; // Simple sanity check const size_t slen = strlen( name ); @@ -326,84 +368,36 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) candidate->users++; mutex_unlock( &imageListLock ); - // Found, see if it works - // TODO: Also make sure a non-working image still has old fd open but created a new one and removed itself from the list - // TODO: But remember size-changed images forever - if ( candidate->working || checkIfWorking ) { - // Is marked working, but might not have an fd open - if ( !image_ensureOpen( candidate ) ) { - mutex_lock( &candidate->lock ); - timing_get( &candidate->lastWorkCheck ); - mutex_unlock( &candidate->lock ); - if ( _removeMissingImages ) { - candidate = image_remove( candidate ); // No release here, the image is still returned and should be released by caller - } - return candidate; - } - } - - if ( !checkIfWorking ) return candidate; // Not interested in re-cechking working state - - // ...not working... - - // Don't re-check too often - mutex_lock( &candidate->lock ); - bool check; - declare_now; - check = timing_diff( &candidate->lastWorkCheck, &now ) > NONWORKING_RECHECK_INTERVAL_SECONDS; - if ( check ) { - candidate->lastWorkCheck = now; - } - mutex_unlock( &candidate->lock ); - if ( !check ) { + if ( !ensureFdOpen ) // Don't want to re-check return candidate; - } - // reaching this point means: - // 1) We should check if the image is working, it might or might not be in working state right now - // 2) The image is open for reading (or at least was at some point, the fd might be stale if images lie on an NFS share etc.) 
- // 3) We made sure not to re-check this image too often - - // Common for ro and rw images: Size check, read check - const off_t len = lseek( candidate->readFd, 0, SEEK_END ); - bool reload = false; - if ( len == -1 ) { - logadd( LOG_WARNING, "lseek() on %s failed (errno=%d)%s.", candidate->path, errno, removingText ); - reload = true; - } else if ( (uint64_t)len != candidate->realFilesize ) { - logadd( LOG_WARNING, "Size of %s changed at runtime, keeping disabled! Expected: %" PRIu64 ", found: %" PRIu64 - ". Try sending SIGHUP to server if you know what you're doing.", - candidate->path, candidate->realFilesize, (uint64_t)len ); - } else { - // Seek worked, file size is same, now see if we can read from file - char buffer[100]; - if ( pread( candidate->readFd, buffer, sizeof(buffer), 0 ) == -1 ) { - logadd( LOG_WARNING, "Reading first %d bytes from %s failed (errno=%d)%s.", - (int)sizeof(buffer), candidate->path, errno, removingText ); - reload = true; - } else if ( !candidate->working ) { - // Seems everything is fine again \o/ - candidate->working = true; - logadd( LOG_INFO, "Changed state of %s:%d to 'working'", candidate->name, candidate->rid ); - } - } + if ( image_ensureOpen( candidate ) && !candidate->problem.read ) + return candidate; // We have a read fd and no read or changed problems - if ( reload ) { + // -- image could not be opened again, or is open but has problem -- + + if ( _removeMissingImages && !file_isReadable( candidate->path ) ) { + candidate = image_remove( candidate ); + // No image_release here, the image is still returned and should be released by caller + } else if ( candidate->readFd != -1 ) { + // We cannot just close the fd as it might be in use. Make a copy and remove old entry. + candidate = image_remove( candidate ); // Could not access the image with exising fd - mark for reload which will re-open the file. // make a copy of the image struct but keep the old one around. 
If/When it's not being used // anymore, it will be freed automatically. - logadd( LOG_DEBUG1, "Reloading image file %s", candidate->path ); + logadd( LOG_DEBUG1, "Reloading image file %s because of read problem/changed", candidate->path ); dnbd3_image_t *img = calloc( sizeof(dnbd3_image_t), 1 ); img->path = strdup( candidate->path ); img->name = strdup( candidate->name ); img->virtualFilesize = candidate->virtualFilesize; img->realFilesize = candidate->realFilesize; - img->atime = now; + timing_get( &img->atime ); img->masterCrc32 = candidate->masterCrc32; img->readFd = -1; img->rid = candidate->rid; img->users = 1; - img->working = false; + img->problem.read = true; + img->problem.changed = candidate->problem.changed; img->ref_cacheMap = NULL; mutex_init( &img->lock, LOCK_IMAGE ); if ( candidate->crc32 != NULL ) { @@ -419,18 +413,17 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) if ( image_addToList( img ) ) { image_release( candidate ); candidate = img; + // Check if image is incomplete, initialize uplink + if ( candidate->ref_cacheMap != NULL ) { + uplink_init( candidate, -1, NULL, -1 ); + } + // Try again with new instance + image_ensureOpen( candidate ); } else { img->users = 0; image_free( img ); } - // Check if image is incomplete, initialize uplink - if ( candidate->ref_cacheMap != NULL ) { - uplink_init( candidate, -1, NULL, -1 ); - } - // readFd == -1 and working == FALSE at this point, - // this function needs some splitting up for handling as we need to run most - // of the above code again. for now we know that the next call for this - // name:rid will get ne newly inserted "img" and try to re-open the file. 
+ // readFd == -1 and problem.read == true } return candidate; // We did all we can, hopefully it's working @@ -449,6 +442,7 @@ dnbd3_image_t* image_lock(dnbd3_image_t *image) mutex_lock( &imageListLock ); for (i = 0; i < _num_images; ++i) { if ( _images[i] == image ) { + assert( _images[i]->id == image->id ); image->users++; mutex_unlock( &imageListLock ); return image; @@ -479,6 +473,7 @@ dnbd3_image_t* image_release(dnbd3_image_t *image) // responsible for freeing it for (int i = 0; i < _num_images; ++i) { if ( _images[i] == image ) { // Found, do nothing + assert( _images[i]->id == image->id ); mutex_unlock( &imageListLock ); return NULL; } @@ -518,6 +513,7 @@ static dnbd3_image_t* image_remove(dnbd3_image_t *image) mutex_lock( &imageListLock ); for ( int i = _num_images - 1; i >= 0; --i ) { if ( _images[i] == image ) { + assert( _images[i]->id == image->id ); _images[i] = NULL; mustFree = ( image->users == 0 ); } @@ -630,12 +626,18 @@ static dnbd3_image_t* image_free(dnbd3_image_t *image) { assert( image != NULL ); assert( image->users == 0 ); - logadd( ( _shutdown ? LOG_DEBUG1 : LOG_INFO ), "Freeing image %s:%d", image->name, (int)image->rid ); + logadd( ( _shutdown ? LOG_DEBUG1 : LOG_INFO ), "Freeing image %s:%d", PIMG(image) ); // uplink_shutdown might return false to tell us // that the shutdown is in progress. Bail out since // this will get called again when the uplink is done. if ( !uplink_shutdown( image ) ) return NULL; + if ( isImageFromUpstream( image ) ) { + saveMetaData( image, NULL, 0 ); + if ( image->ref_cacheMap != NULL ) { + saveCacheMap( image ); + } + } mutex_lock( &image->lock ); ref_setref( &image->ref_cacheMap, NULL ); free( image->crc32 ); @@ -700,7 +702,8 @@ static bool image_load_all_internal(char *base, char *path) while ( !_shutdown && (entryPtr = readdir( dir )) != NULL ) { entry = *entryPtr; - if ( strcmp( entry.d_name, "." ) == 0 || strcmp( entry.d_name, ".." ) == 0 ) continue; + if ( entry.d_name[0] == '.' 
) + continue; // No hidden files, no . or .. if ( strlen( entry.d_name ) > SUBDIR_LEN ) { logadd( LOG_WARNING, "Skipping entry %s: Too long (max %d bytes)", entry.d_name, (int)SUBDIR_LEN ); continue; @@ -717,7 +720,7 @@ static bool image_load_all_internal(char *base, char *path) if ( S_ISDIR( st.st_mode ) ) { image_load_all_internal( base, subpath ); // Recurse } else if ( !isForbiddenExtension( subpath ) ) { - image_load( base, subpath, true ); // Load image if possible + image_load( base, subpath, false ); // Load image if possible } } closedir( dir ); @@ -756,10 +759,9 @@ static bool image_addToList(dnbd3_image_t *image) * Note that this is NOT THREAD SAFE so make sure its always * called on one thread only. */ -static bool image_load(char *base, char *path, int withUplink) +static bool image_load(char *base, char *path, bool withUplink) { int revision = -1; - struct stat st; dnbd3_cache_map_t *cache = NULL; uint32_t *crc32list = NULL; dnbd3_image_t *existing = NULL; @@ -824,7 +826,9 @@ static bool image_load(char *base, char *path, int withUplink) fdImage = open( path, O_RDONLY ); } if ( fdImage == -1 ) { - logadd( LOG_ERROR, "Could not open '%s' for reading...", path ); + if ( errno != ENOENT ) { + logadd( LOG_ERROR, "[load] Cannot open '%s' for reading (errno=%d)", path, errno ); + } goto load_error; } // Determine file size @@ -855,16 +859,16 @@ static bool image_load(char *base, char *path, int withUplink) // Compare data just loaded to identical image we apparently already loaded if ( existing != NULL ) { if ( existing->realFilesize != realFilesize ) { - logadd( LOG_WARNING, "Size of image '%s:%d' has changed.", existing->name, (int)existing->rid ); + logadd( LOG_WARNING, "Size of image '%s:%d' has changed.", PIMG(existing) ); // Image will be replaced below } else if ( existing->crc32 != NULL && crc32list != NULL && memcmp( existing->crc32, crc32list, sizeof(uint32_t) * hashBlockCount ) != 0 ) { - logadd( LOG_WARNING, "CRC32 list of image '%s:%d' has 
changed.", existing->name, (int)existing->rid ); + logadd( LOG_WARNING, "CRC32 list of image '%s:%d' has changed.", PIMG(existing) ); logadd( LOG_WARNING, "The image will be reloaded, but you should NOT replace existing images while the server is running." ); logadd( LOG_WARNING, "Actually even if it's not running this should never be done. Use a new RID instead!" ); // Image will be replaced below } else if ( existing->crc32 == NULL && crc32list != NULL ) { - logadd( LOG_INFO, "Found CRC-32 list for already loaded image '%s:%d', adding...", existing->name, (int)existing->rid ); + logadd( LOG_INFO, "Found CRC-32 list for already loaded image '%s:%d', adding...", PIMG(existing) ); existing->crc32 = crc32list; existing->masterCrc32 = masterCrc; crc32list = NULL; @@ -872,7 +876,7 @@ static bool image_load(char *base, char *path, int withUplink) goto load_error; // Keep existing } else if ( existing->ref_cacheMap != NULL && cache == NULL ) { // Just ignore that fact, if replication is really complete the cache map will be removed anyways - logadd( LOG_INFO, "Image '%s:%d' has no cache map on disk!", existing->name, (int)existing->rid ); + logadd( LOG_INFO, "Image '%s:%d' has no cache map on disk!", PIMG(existing) ); function_return = true; goto load_error; // Keep existing } else { @@ -900,19 +904,10 @@ static bool image_load(char *base, char *path, int withUplink) image->rid = (uint16_t)revision; image->users = 0; image->readFd = -1; - image->working = ( cache == NULL ); timing_get( &image->nextCompletenessEstimate ); image->completenessEstimate = -1; mutex_init( &image->lock, LOCK_IMAGE ); - int32_t offset; - if ( stat( path, &st ) == 0 ) { - // Negatively offset atime by file modification time - offset = (int32_t)( st.st_mtime - time( NULL ) ); - if ( offset > 0 ) offset = 0; - } else { - offset = 0; - } - timing_gets( &image->atime, offset ); + loadImageMeta( image ); // Prevent freeing in cleanup cache = NULL; @@ -925,7 +920,7 @@ static bool image_load(char *base, 
char *path, int withUplink) // Image is definitely incomplete, initialize uplink worker if ( image->ref_cacheMap != NULL ) { - image->working = false; + image->problem.uplink = true; if ( withUplink ) { uplink_init( image, -1, NULL, -1 ); } @@ -937,14 +932,14 @@ static bool image_load(char *base, char *path, int withUplink) // Keep fd for reading fdImage = -1; // Check CRC32 - image_checkRandomBlocks( image, 4 ); + image_checkRandomBlocks( image, 4, -1 ); } else { logadd( LOG_ERROR, "Image list full: Could not add image %s", path ); image->readFd = -1; // Keep fdImage instead, will be closed below image = image_free( image ); goto load_error; } - logadd( LOG_DEBUG1, "Loaded image '%s:%d'\n", image->name, (int)image->rid ); + logadd( LOG_DEBUG1, "Loaded image '%s:%d'\n", PIMG(image) ); function_return = true; // Clean exit: @@ -1027,10 +1022,19 @@ static uint32_t* image_loadCrcList(const char * const imagePath, const int64_t f return retval; } -static void image_checkRandomBlocks(dnbd3_image_t *image, const int count) +/** + * Check up to count random blocks from given image. If fromFd is -1, the check will + * be run asynchronously using the integrity checker. Otherwise, the check will + * happen in the function and return the result of the check. + * @param image image to check + * @param count number of blocks to check (max) + * @param fromFd, check synchronously and use this fd for reading, -1 = async + * @return true = OK, false = error. Meaningless if fromFd == -1 + */ +static bool image_checkRandomBlocks(dnbd3_image_t *image, const int count, int fromFd) { if ( image->crc32 == NULL ) - return; + return true; // This checks the first block and (up to) count - 1 random blocks for corruption // via the known crc32 list. 
This is very sloppy and is merely supposed to detect // accidental corruption due to broken dnbd3-proxy functionality or file system @@ -1038,7 +1042,7 @@ static void image_checkRandomBlocks(dnbd3_image_t *image, const int count) assert( count > 0 ); dnbd3_cache_map_t *cache = ref_get_cachemap( image ); const int hashBlocks = IMGSIZE_TO_HASHBLOCKS( image->virtualFilesize ); - int blocks[count]; + int blocks[count+1]; // +1 for "-1" in sync case int index = 0, j; int block; if ( image_isHashBlockComplete( cache, 0, image->virtualFilesize ) ) { @@ -1062,9 +1066,16 @@ while_end: ; if ( cache != NULL ) { ref_put( &cache->reference ); } - for ( int i = 0; i < index; ++i ) { - integrity_check( image, blocks[i], true ); + if ( fromFd == -1 ) { + // Async + for ( int i = 0; i < index; ++i ) { + integrity_check( image, blocks[i], true ); + } + return true; } + // Sync + blocks[index] = -1; + return image_checkBlocksCrc32( fromFd, image->crc32, blocks, image->realFilesize ); } /** @@ -1079,7 +1090,7 @@ bool image_create(char *image, int revision, uint64_t size) logadd( LOG_ERROR, "revision id invalid: %d", revision ); return false; } - char path[PATHLEN], cache[PATHLEN]; + char path[PATHLEN], cache[PATHLEN+4]; char *lastSlash = strrchr( image, '/' ); if ( lastSlash == NULL ) { snprintf( path, PATHLEN, "%s/%s.r%d", _basePath, image, revision ); @@ -1090,7 +1101,7 @@ bool image_create(char *image, int revision, uint64_t size) *lastSlash = '/'; snprintf( path, PATHLEN, "%s/%s.r%d", _basePath, image, revision ); } - snprintf( cache, PATHLEN, "%s.map", path ); + snprintf( cache, PATHLEN+4, "%s.map", path ); size = (size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); const int mapsize = IMGSIZE_TO_MAPBYTES(size); // Write files @@ -1111,14 +1122,19 @@ bool image_create(char *image, int revision, uint64_t size) logadd( LOG_DEBUG1, "Could not allocate %d bytes for %s (errno=%d)", mapsize, cache, err ); } // Now write image + bool fallback = false; if ( !_sparseFiles && 
!file_alloc( fdImage, 0, size ) ) { logadd( LOG_ERROR, "Could not allocate %" PRIu64 " bytes for %s (errno=%d)", size, path, errno ); logadd( LOG_ERROR, "It is highly recommended to use a file system that supports preallocating disk" " space without actually writing all zeroes to the block device." ); logadd( LOG_ERROR, "If you cannot fix this, try setting sparseFiles=true, but don't expect" " divine performance during replication." ); - goto failure_cleanup; - } else if ( _sparseFiles && !file_setSize( fdImage, size ) ) { + if ( !_ignoreAllocErrors ) { + goto failure_cleanup; + } + fallback = true; + } + if ( ( _sparseFiles || fallback ) && !file_setSize( fdImage, size ) ) { logadd( LOG_ERROR, "Could not create sparse file of %" PRIu64 " bytes for %s (errno=%d)", size, path, errno ); logadd( LOG_ERROR, "Make sure you have enough disk space, check directory permissions, fs errors etc." ); goto failure_cleanup; @@ -1162,14 +1178,18 @@ dnbd3_image_t* image_getOrLoad(char * const name, const uint16_t revision) // Sanity check if ( len == 0 || name[len - 1] == '/' || name[0] == '/' || name[0] == '.' || strstr( name, "/." 
) != NULL ) return NULL; - // If in proxy mode, check with upstream server first + // Re-check latest local revision + image = loadImageServer( name, revision ); + // If in proxy mode, check with upstream servers if ( _isProxy ) { + // Forget the locally loaded one + image_release( image ); + // Check with upstream - if unsuccessful, will return the same + // as loadImageServer did image = loadImageProxy( name, revision, len ); - if ( image != NULL ) - return image; } // Lookup on local storage - return loadImageServer( name, revision ); + return image; } /** @@ -1227,19 +1247,20 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision, int uplinkSock = -1; dnbd3_host_t uplinkServer; const int count = altservers_getHostListForReplication( name, servers, REP_NUM_SRV ); - uint16_t remoteProtocolVersion; uint16_t remoteRid = revision; - uint64_t remoteImageSize; + uint16_t acceptedRemoteRid = 0; + uint16_t remoteProtocolVersion = 0; struct sockaddr_storage sa; socklen_t salen; poll_list_t *cons = sock_newPollList(); logadd( LOG_DEBUG2, "Trying to clone %s:%d from %d hosts", name, (int)revision, count ); for (int i = 0; i < count + 5; ++i) { // "i < count + 5" for 5 additional iterations, waiting on pending connects - char *remoteName; + char *remoteName = NULL; + uint64_t remoteImageSize = 0; bool ok = false; int sock; if ( i >= count ) { - sock = sock_multiConnect( cons, NULL, 100, 1000 ); + sock = sock_multiConnect( cons, NULL, 100, _uplinkTimeout ); if ( sock == -2 ) break; } else { if ( log_hasMask( LOG_DEBUG2 ) ) { @@ -1248,7 +1269,7 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision, host[len] = '\0'; logadd( LOG_DEBUG2, "Trying to replicate from %s", host ); } - sock = sock_multiConnect( cons, &servers[i], 100, 1000 ); + sock = sock_multiConnect( cons, &servers[i], 100, _uplinkTimeout ); } if ( sock == -1 || sock == -2 ) continue; salen = sizeof(sa); @@ -1273,7 +1294,11 @@ static dnbd3_image_t 
*loadImageProxy(char * const name, const uint16_t revision, } else { ok = image_ensureDiskSpace( remoteImageSize + ( 10 * 1024 * 1024 ), false ); // some extra space for cache map etc. } - ok = ok && image_clone( sock, name, remoteRid, remoteImageSize ); // This sets up the file+map+crc and loads the img + if ( ok ) { + ok = image_clone( sock, name, remoteRid, remoteImageSize ); // This sets up the file+map+crc and loads the img + } else { + logadd( LOG_INFO, "Not enough space to replicate '%s:%d'", name, (int)revision ); + } mutex_unlock( &reloadLock ); if ( !ok ) goto server_fail; @@ -1282,26 +1307,32 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision, if ( !sock_sockaddrToDnbd3( (struct sockaddr*)&sa, &uplinkServer ) ) { uplinkServer.type = 0; } - break; + acceptedRemoteRid = remoteRid; + break; // TODO: Maybe we should try the remaining servers if rid == 0, in case there's an even newer one server_fail: ; close( sock ); } sock_destroyPollList( cons ); - // If we still have a pointer to a local image, release the reference - if ( image != NULL ) image_release( image ); + // If we still have a pointer to a local image, compare rid + if ( image != NULL ) { + if ( ( revision == 0 && image->rid >= acceptedRemoteRid ) || ( image->rid == revision ) ) { + return image; + } + // release the reference + image_release( image ); + } // If everything worked out, this call should now actually return the image - image = image_get( name, remoteRid, false ); + image = image_get( name, acceptedRemoteRid, false ); if ( image != NULL && uplinkSock != -1 ) { // If so, init the uplink and pass it the socket - sock_setTimeout( uplinkSock, _uplinkTimeout ); if ( !uplink_init( image, uplinkSock, &uplinkServer, remoteProtocolVersion ) ) { close( uplinkSock ); } else { // Clumsy busy wait, but this should only take as long as it takes to start a thread, so is it really worth using a signalling mechanism? 
int i = 0; - while ( !image->working && ++i < 100 ) + while ( image->problem.uplink && ++i < 100 ) usleep( 2000 ); } } else if ( uplinkSock != -1 ) { @@ -1318,6 +1349,7 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste { char imageFile[PATHLEN] = ""; uint16_t detectedRid = 0; + bool isLegacyFile = false; if ( requestedRid != 0 ) { snprintf( imageFile, PATHLEN, "%s/%s.r%d", _basePath, name, (int)requestedRid ); @@ -1354,6 +1386,7 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste && ( detectedRid == 0 || !file_isReadable( imageFile ) ) ) { snprintf( imageFile, PATHLEN, "%s/%s", _basePath, name ); detectedRid = 1; + isLegacyFile = true; } logadd( LOG_DEBUG2, "Trying to load %s:%d ( -> %d) as %s", name, (int)requestedRid, (int)detectedRid, imageFile ); // No file was determined, or it doesn't seem to exist/be readable @@ -1361,7 +1394,7 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste logadd( LOG_DEBUG2, "Not found, bailing out" ); return image_get( name, requestedRid, true ); } - if ( !_vmdkLegacyMode && requestedRid == 0 ) { + if ( !isLegacyFile && requestedRid == 0 ) { // rid 0 requested - check if detected rid is readable, decrease rid if not until we reach 0 while ( detectedRid != 0 ) { dnbd3_image_t *image = image_get( name, detectedRid, true ); @@ -1429,9 +1462,13 @@ static bool image_clone(int sock, char *name, uint16_t revision, uint64_t imageS logadd( LOG_WARNING, "OTF-Clone: Corrupted CRC-32 list. ignored. 
(%s)", name ); } else { int fd = open( crcFile, O_WRONLY | O_CREAT, 0644 ); - write( fd, &masterCrc, sizeof(uint32_t) ); - write( fd, crc32list, crc32len ); + ssize_t ret = write( fd, &masterCrc, sizeof(masterCrc) ); + ret += write( fd, crc32list, crc32len ); close( fd ); + if ( (size_t)ret != crc32len + sizeof(masterCrc) ) { + logadd( LOG_WARNING, "Could not save freshly received crc32 list for %s:%d", name, (int)revision ); + unlink( crcFile ); + } } } free( crc32list ); @@ -1564,14 +1601,23 @@ json_t* image_getListAsJson() ref_put( &uplink->reference ); } - jsonImage = json_pack( "{sisssisisisisI}", + int problems = 0; +#define addproblem(name,val) if (image->problem.name) problems |= (1 << val) + addproblem(read, 0); + addproblem(write, 1); + addproblem(changed, 2); + addproblem(uplink, 3); + addproblem(queue, 4); + + jsonImage = json_pack( "{sisssisisisisIsi}", "id", image->id, // id, name, rid never change, so access them without locking "name", image->name, "rid", (int) image->rid, "users", image->users, "complete", completeness, "idle", idleTime, - "size", (json_int_t)image->virtualFilesize ); + "size", (json_int_t)image->virtualFilesize, + "problems", problems ); if ( bytesReceived != 0 ) { json_object_set_new( jsonImage, "bytesReceived", json_integer( (json_int_t) bytesReceived ) ); } @@ -1594,7 +1640,7 @@ int image_getCompletenessEstimate(dnbd3_image_t * const image) assert( image != NULL ); dnbd3_cache_map_t *cache = ref_get_cachemap( image ); if ( cache == NULL ) - return image->working ? 100 : 0; + return 100; const int len = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); if ( unlikely( len == 0 ) ) { ref_put( &cache->reference ); @@ -1705,46 +1751,51 @@ bool image_ensureDiskSpaceLocked(uint64_t size, bool force) /** * Make sure at least size bytes are available in _basePath. * Will delete old images to make room for new ones. - * TODO: Store last access time of images. 
Currently the - * last access time is reset to the file modification time - * on server restart. Thus it will - * currently only delete images if server uptime is > 24 hours. + * It will only delete images if a configurable uptime is + * reached. * This can be overridden by setting force to true, in case * free space is desperately needed. * Return true iff enough space is available. false in random other cases */ static bool image_ensureDiskSpace(uint64_t size, bool force) { - for ( int maxtries = 0; maxtries < 20; ++maxtries ) { + for ( int maxtries = 0; maxtries < 50; ++maxtries ) { uint64_t available; if ( !file_freeDiskSpace( _basePath, NULL, &available ) ) { - logadd( LOG_WARNING, "Could not get free disk space (errno %d), will assume there is enough space left... ;-)\n", errno ); + logadd( LOG_WARNING, "Could not get free disk space (errno %d), will assume there is enough space left.", errno ); return true; } if ( available > size ) return true; // Yay - if ( !_isProxy || _autoFreeDiskSpaceDelay == -1 ) + if ( !_isProxy || _autoFreeDiskSpaceDelay == -1 ) { + logadd( LOG_INFO, "Only %dMiB free, %dMiB requested, but auto-freeing of disk space is disabled.", + (int)(available / (1024ll * 1024)), + (int)(size / (1024ll * 1024)) ); return false; // If not in proxy mode at all, or explicitly disabled, never delete anything + } if ( !force && dnbd3_serverUptime() < (uint32_t)_autoFreeDiskSpaceDelay ) { - logadd( LOG_INFO, "Only %dMiB free, %dMiB requested, but server uptime < %d minutes...", (int)(available / (1024ll * 1024ll)), - (int)(size / (1024 * 1024)), _autoFreeDiskSpaceDelay / 60 ); + logadd( LOG_INFO, "Only %dMiB free, %dMiB requested, but server uptime < %d minutes...", + (int)(available / (1024ll * 1024)), + (int)(size / (1024ll * 1024)), _autoFreeDiskSpaceDelay / 60 ); return false; } - logadd( LOG_INFO, "Only %dMiB free, %dMiB requested, freeing an image...", (int)(available / (1024ll * 1024ll)), - (int)(size / (1024 * 1024)) ); + logadd( LOG_INFO, 
"Only %dMiB free, %dMiB requested, freeing an image...", + (int)(available / (1024ll * 1024)), + (int)(size / (1024ll * 1024)) ); // Find least recently used image dnbd3_image_t *oldest = NULL; int i; mutex_lock( &imageListLock ); for (i = 0; i < _num_images; ++i) { dnbd3_image_t *current = _images[i]; - if ( current == NULL ) continue; - if ( current->users == 0 ) { // Not in use :-) - if ( oldest == NULL || timing_1le2( &current->atime, &oldest->atime ) ) { - // Oldest access time so far - oldest = current; - } - } + if ( current == NULL || current->users != 0 ) + continue; // Empty slot or in use + if ( oldest != NULL && timing_1le2( &oldest->atime, &current->atime ) ) + continue; // Already got a newer one + if ( !isImageFromUpstream( current ) ) + continue; // Not replicated, don't touch + // Oldest access time so far + oldest = current; } if ( oldest != NULL ) { oldest->users++; @@ -1760,7 +1811,7 @@ static bool image_ensureDiskSpace(uint64_t size, bool force) image_release( oldest ); // We did users++ above; image might have to be freed entirely return false; } - logadd( LOG_INFO, "'%s:%d' has to go!", oldest->name, (int)oldest->rid ); + logadd( LOG_INFO, "'%s:%d' has to go!", PIMG(oldest) ); char *filename = strdup( oldest->path ); // Copy name as we remove the image first oldest = image_remove( oldest ); // Remove from list first... oldest = image_release( oldest ); // Decrease users counter; if it falls to 0, image will be freed @@ -1790,15 +1841,14 @@ static void* closeUnusedFds(void* nix UNUSED) timing_gets( &deadline, -UNUSED_FD_TIMEOUT ); int fds[FDCOUNT]; int fdindex = 0; + setThreadName( "unused-fd-close" ); mutex_lock( &imageListLock ); for ( int i = 0; i < _num_images; ++i ) { dnbd3_image_t * const image = _images[i]; if ( image == NULL || image->readFd == -1 ) continue; - // TODO: Also close for idle uplinks (uplink_connectionShouldShutdown) - // TODO: And close writeFd for idle uplinks.... 
if ( image->users == 0 && image->uplinkref == NULL && timing_reached( &image->atime, &deadline ) ) { - logadd( LOG_DEBUG1, "Inactive fd closed for %s:%d", image->name, (int)image->rid ); + logadd( LOG_DEBUG1, "Inactive fd closed for %s:%d", PIMG(image) ); fds[fdindex++] = image->readFd; image->readFd = -1; // Not a race; image->users is 0 and to increase it you need imageListLock if ( fdindex == FDCOUNT ) @@ -1813,6 +1863,177 @@ static void* closeUnusedFds(void* nix UNUSED) return NULL; } +static bool isImageFromUpstream(dnbd3_image_t *image) +{ + if ( !_isProxy ) + return false; // Nothing to do + // Check if we're a "hybrid proxy", i.e. there are only some namespaces (directories) + // for which we have any upstream servers configured. If there's none, don't touch + // the cache map on disk. + if ( !altservers_imageHasAltServers( image->name ) ) + return false; // Nothing to do + return true; +} + +static void* saveLoadAllCacheMaps(void* nix UNUSED) +{ + static ticks nextSave; + declare_now; + bool full = timing_reached( &nextSave, &now ); + time_t walltime = 0; + setThreadName( "cache-mapper" ); + if ( full ) { + walltime = time( NULL ); + // Update at start to avoid concurrent runs + timing_addSeconds( &nextSave, &now, CACHE_MAP_MAX_SAVE_DELAY ); + } + mutex_lock( &imageListLock ); + for ( int i = 0; i < _num_images; ++i ) { + dnbd3_image_t * const image = _images[i]; + if ( image == NULL ) + continue; + image->users++; + mutex_unlock( &imageListLock ); + const bool fromUpstream = isImageFromUpstream( image ); + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache != NULL ) { + if ( fromUpstream ) { + // Replicated image, we're responsible for updating the map, so save it + // Save if dirty bit is set, blocks were invalidated + bool save = cache->dirty; + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( !save ) { + // Otherwise, consider longer timeout and byte count limits of uplink + if ( uplink != NULL ) { + assert( 
uplink->bytesReceivedLastSave <= uplink->bytesReceived ); + uint64_t diff = uplink->bytesReceived - uplink->bytesReceivedLastSave; + if ( diff > CACHE_MAP_MAX_UNSAVED_BYTES || ( full && diff != 0 ) ) { + save = true; + } + } + } + if ( save ) { + cache->dirty = false; + if ( uplink != NULL ) { + uplink->bytesReceivedLastSave = uplink->bytesReceived; + } + saveCacheMap( image ); + } + if ( uplink != NULL ) { + ref_put( &uplink->reference ); + } + } else { + // We're not replicating this image, if there's a cache map, reload + // it periodically, since we might read from a shared storage that + // another server instance is writing to. + if ( full || ( !cache->unchanged && !image->problem.read ) ) { + logadd( LOG_DEBUG2, "Reloading cache map of %s:%d", PIMG(image) ); + dnbd3_cache_map_t *onDisk = image_loadCacheMap(image->path, image->virtualFilesize); + if ( onDisk == NULL ) { + // Should be complete now + logadd( LOG_DEBUG1, "External replication of %s:%d complete", PIMG(image) ); + ref_setref( &image->ref_cacheMap, NULL ); + } else { + const int mapSize = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); + if ( memcmp( cache->map, onDisk->map, mapSize ) == 0 ) { + // Unchanged + cache->unchanged = true; + onDisk->reference.free( &onDisk->reference ); + } else { + // Replace + ref_setref( &image->ref_cacheMap, &onDisk->reference ); + logadd( LOG_DEBUG2, "Map changed" ); + } + } + } + } // end reload cache map + ref_put( &cache->reference ); + } // end has cache map + if ( full && fromUpstream ) { + saveMetaData( image, &now, walltime ); + } + image_release( image ); // Always do this instead of users-- to handle freeing + mutex_lock( &imageListLock ); + } + mutex_unlock( &imageListLock ); + return NULL; +} + +/** + * Saves the cache map of the given image. + * Return false if this image doesn't have a cache map, or if the image + * doesn't have any uplink to replicate from. 
In this case the image might + * still have a cache map that was loaded from disk, and should be reloaded + * periodically. + * @param image the image + */ +static void saveCacheMap(dnbd3_image_t *image) +{ + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache == NULL ) + return; // Race - wasn't NULL in function call above... + + logadd( LOG_DEBUG2, "Saving cache map of %s:%d", PIMG(image) ); + const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize); + char mapfile[strlen( image->path ) + 4 + 1]; + strcpy( mapfile, image->path ); + strcat( mapfile, ".map" ); + + int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 ); + if ( fd == -1 ) { + const int err = errno; + ref_put( &cache->reference ); + logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile ); + return; + } + + // On Linux we could use readFd, but in general it's not guaranteed to work + int imgFd = open( image->path, O_WRONLY ); + if ( imgFd == -1 ) { + logadd( LOG_WARNING, "Cannot open %s for fsync(): errno=%d", image->path, errno ); + } else { + if ( fsync( imgFd ) == -1 ) { + logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d. Resetting cache map.", image->path, errno ); + dnbd3_cache_map_t *old = image_loadCacheMap(image->path, image->virtualFilesize); + const int mapSize = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); + if ( old == NULL ) { + // Could not load old map. FS might be toast. + logadd( LOG_ERROR, "Cannot load old cache map. Setting all zero." 
); + memset( cache->map, 0, mapSize ); + } else { + // AND the maps together to be safe + for ( int i = 0; i < mapSize; ++i ) { + cache->map[i] &= old->map[i]; + } + old->reference.free( &old->reference ); + } + } + close( imgFd ); + } + + // Write current map to file + size_t done = 0; + while ( done < size ) { + const ssize_t ret = write( fd, cache->map + done, size - done ); + if ( ret == -1 ) { + if ( errno == EINTR ) continue; + logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile ); + break; + } + if ( ret <= 0 ) { + logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile ); + break; + } + done += (size_t)ret; + } + ref_put( &cache->reference ); + if ( fsync( fd ) == -1 ) { + logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno ); + } + close( fd ); + // TODO fsync on parent directory +} + static void allocCacheMap(dnbd3_image_t *image, bool complete) { const uint8_t val = complete ? 0xff : 0; @@ -1822,7 +2043,7 @@ static void allocCacheMap(dnbd3_image_t *image, bool complete) memset( cache->map, val, byteSize ); mutex_lock( &image->lock ); if ( image->ref_cacheMap != NULL ) { - logadd( LOG_WARNING, "BUG: allocCacheMap called but there already is a cache map for %s:%d", image->name, (int)image->rid ); + logadd( LOG_WARNING, "BUG: allocCacheMap called but there already is a map for %s:%d", PIMG(image) ); free( cache ); } else { ref_setref( &image->ref_cacheMap, &cache->reference ); @@ -1830,3 +2051,77 @@ static void allocCacheMap(dnbd3_image_t *image, bool complete) mutex_unlock( &image->lock ); } +/** + * It's assumed you hold a reference to the image + */ +static void saveMetaData(dnbd3_image_t *image, ticks *now, time_t walltime) +{ + if ( !image->accessed ) + return; + ticks tmp; + uint32_t diff; + char *fn; + if ( asprintf( &fn, "%s.meta", image->path ) == -1 ) { + logadd( LOG_WARNING, "Cannot asprintf meta" ); + return; + } + if ( now == NULL ) { + timing_get( 
&tmp ); + now = &tmp; + walltime = time( NULL ); + } + mutex_lock( &image->lock ); + image->accessed = false; + diff = timing_diff( &image->atime, now ); + mutex_unlock( &image->lock ); + FILE *f = fopen( fn, "w" ); + if ( f == NULL ) { + logadd( LOG_WARNING, "Cannot open %s for writing", fn ); + } else { + fprintf( f, "[main]\natime=%"PRIu64"\n", (uint64_t)( walltime - diff ) ); + fclose( f ); + } + free( fn ); + // TODO: fsync() dir +} + +static void loadImageMeta(dnbd3_image_t *image) +{ + int32_t offset = 1; + char *fn; + if ( asprintf( &fn, "%s.meta", image->path ) == -1 ) { + logadd( LOG_WARNING, "asprintf load" ); + } else { + int fh = open( fn, O_RDONLY ); + free( fn ); + if ( fh != -1 ) { + char buf[200]; + ssize_t ret = read( fh, buf, sizeof(buf)-1 ); + close( fh ); + if ( ret > 0 ) { + buf[ret] = '\0'; + // Do it the cheap way until we actually store more stuff + char *pos = strstr( buf, "atime=" ); + if ( pos != NULL ) { + offset = (int32_t)( atol( pos + 6 ) - time( NULL ) ); + } + } + } + } + if ( offset == 1 ) { + // Nothing from .meta file, use old guesstimate + struct stat st; + if ( stat( image->path, &st ) == 0 ) { + // Negatively offset atime by file modification time + offset = (int32_t)( st.st_mtime - time( NULL ) ); + } else { + offset = 0; + } + image->accessed = true; + } + if ( offset > 0 ) { + offset = 0; + } + timing_gets( &image->atime, offset ); +} + diff --git a/src/server/image.h b/src/server/image.h index 89791fc..7b6583c 100644 --- a/src/server/image.h +++ b/src/server/image.h @@ -19,7 +19,7 @@ bool image_ensureOpen(dnbd3_image_t *image); dnbd3_image_t* image_byId(int imgId); -dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking); +dnbd3_image_t* image_get(const char *name, uint16_t revision, bool checkIfWorking); bool image_reopenCacheFd(dnbd3_image_t *image, const bool force); @@ -49,6 +49,52 @@ void image_closeUnusedFd(); bool image_ensureDiskSpaceLocked(uint64_t size, bool force); +bool 
image_saveCacheMap(dnbd3_image_t *image); + +/** + * Check if given range is cached. Be careful when using this function because: + * 1) you need to hold a reference to the cache map + * 2) start and end are assumed to be 4k aligned + * 3) start and end are not checked to be in bounds (we don't know the image in this context) + */ +static inline bool image_isRangeCachedUnsafe(dnbd3_cache_map_t *cache, uint64_t start, uint64_t end) +{ + const uint64_t firstByteInMap = start >> 15; + const uint64_t lastByteInMap = (end - 1) >> 15; + const uint8_t fb = (uint8_t)(0xff << ((start >> 12) & 7)); + const uint8_t lb = (uint8_t)(~(0xff << ((((end - 1) >> 12) & 7) + 1))); + uint64_t pos; + uint8_t b; + bool isCached; + if ( firstByteInMap == lastByteInMap ) { // Single byte to check, much simpler + b = cache->map[firstByteInMap]; + isCached = ( b & ( fb & lb ) ) == ( fb & lb ); + } else { + isCached = true; + atomic_thread_fence( memory_order_acquire ); + // First byte + if ( isCached ) { + b = atomic_load_explicit( &cache->map[firstByteInMap], memory_order_relaxed ); + isCached = ( ( b & fb ) == fb ); + } + // Last byte + if ( isCached ) { + b = atomic_load_explicit( &cache->map[lastByteInMap], memory_order_relaxed ); + isCached = ( ( b & lb ) == lb ); + } + // Middle, must be all bits set (0xff) + if ( isCached ) { + for ( pos = firstByteInMap + 1; pos < lastByteInMap; ++pos ) { + if ( atomic_load_explicit( &cache->map[pos], memory_order_relaxed ) != 0xff ) { + isCached = false; + break; + } + } + } + } + return isCached; +} + // one byte in the map covers 8 4kib blocks, so 32kib per byte // "+ (1 << 15) - 1" is required to account for the last bit of // the image that is smaller than 32kib diff --git a/src/server/ini.c b/src/server/ini.c index c796d5c..37c44a3 100644 --- a/src/server/ini.c +++ b/src/server/ini.c @@ -52,7 +52,7 @@ static char* find_char_or_comment(const char* s, char c) /* Version of strncpy that ensures dest (size bytes) is null-terminated. 
*/ static char* strncpy0(char* dest, const char* src, size_t size) { - strncpy( dest, src, size ); + strncpy( dest, src, size - 1 ); dest[size - 1] = '\0'; return dest; } diff --git a/src/server/integrity.c b/src/server/integrity.c index 4006dfc..91e53b8 100644 --- a/src/server/integrity.c +++ b/src/server/integrity.c @@ -195,9 +195,10 @@ static void* integrity_main(void * data UNUSED) readFd = directFd; } } - if ( readFd == -1 ) { // Try buffered; flush to disk for that - image_ensureOpen( image ); - readFd = image->readFd; + if ( readFd == -1 ) { // Try buffered as fallback + if ( image_ensureOpen( image ) && !image->problem.read ) { + readFd = image->readFd; + } } if ( readFd == -1 ) { logadd( LOG_MINOR, "Couldn't get any valid fd for integrity check of %s... ignoring...", image->path ); @@ -237,16 +238,6 @@ static void* integrity_main(void * data UNUSED) // Done with this task as nothing left checkQueue[i].image = NULL; if ( i + 1 == queueLen ) queueLen--; - // Mark as working again if applicable - if ( !foundCorrupted ) { - dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); - if ( uplink != NULL ) { // TODO: image_determineWorkingState() helper? - mutex_lock( &image->lock ); - image->working = uplink->current.fd != -1 && image->readFd != -1; - mutex_unlock( &image->lock ); - ref_put( &uplink->reference ); - } - } } else { // Still more blocks to go... 
checkQueue[i].block = blocks[0]; @@ -254,9 +245,6 @@ static void* integrity_main(void * data UNUSED) } if ( foundCorrupted && !_shutdown ) { // Something was fishy, make sure uplink exists - mutex_lock( &image->lock ); - image->working = false; - mutex_unlock( &image->lock ); uplink_init( image, -1, NULL, -1 ); } // Release :-) diff --git a/src/server/locks.c b/src/server/locks.c index b39576b..3be73b3 100644 --- a/src/server/locks.c +++ b/src/server/locks.c @@ -7,9 +7,9 @@ #include "locks.h" #include "helper.h" -#include "../shared/timing.h" +#include <dnbd3/shared/timing.h> -#ifdef _DEBUG +#ifdef DNBD3_SERVER_DEBUG_LOCKS #define MAXLOCKS (SERVER_MAX_CLIENTS * 2 + SERVER_MAX_ALTS + 200 + SERVER_MAX_IMAGES) #define MAXTHREADS (SERVER_MAX_CLIENTS + 100) #define MAXLPT 20 diff --git a/src/server/locks.h b/src/server/locks.h index e5c9801..3b04caa 100644 --- a/src/server/locks.h +++ b/src/server/locks.h @@ -23,10 +23,12 @@ #define LOCK_UPLINK_RTT 200 #define LOCK_UPLINK_SEND 210 #define LOCK_RPC_ACL 220 +#define LOCK_FUSE_INIT 300 +#define LOCK_FUSE_DIR 310 // -#ifdef _DEBUG +#ifdef DNBD3_SERVER_DEBUG_LOCKS #define mutex_init( lock, prio ) debug_mutex_init( #lock, __FILE__, __LINE__, lock, prio) #define mutex_lock( lock ) debug_mutex_lock( #lock, __FILE__, __LINE__, lock, false) @@ -55,10 +57,12 @@ void debug_dump_lock_stats(); #endif -#ifdef DEBUG_THREADS +#ifdef DNBD3_SERVER_DEBUG_THREADS + +#include <dnbd3/shared/log.h> extern int debugThreadCount; -#define thread_create(thread,attr,routine,arg) (logadd( LOG_THREAD CREATE, "%d @ %s:%d\n", debugThreadCount, __FILE__, (int)__LINE__), debug_thread_create(thread, attr, routine, arg)) +#define thread_create(thread,attr,routine,arg) (logadd( LOG_INFO, "THREAD_CREATE: %d @ %s:%d\n", debugThreadCount, __FILE__, (int)__LINE__), debug_thread_create(thread, attr, routine, arg)) static inline pthread_t debug_thread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void*), void *arg) { int i; @@ 
-68,26 +72,26 @@ static inline pthread_t debug_thread_create(pthread_t *thread, const pthread_att return pthread_create( thread, attr, start_routine, arg ); } -#define thread_detach(thread) (logadd( LOG_THREAD DETACH, "%d @ %s:%d\n", debugThreadCount, __FILE__, __LINE__), debug_thread_detach(thread)) +#define thread_detach(thread) (logadd( LOG_INFO, "THREAD_DETACH: %d @ %s:%d\n", debugThreadCount, __FILE__, __LINE__), debug_thread_detach(thread)) static inline int debug_thread_detach(pthread_t thread) { const int ret = pthread_detach(thread); if (ret == 0) { --debugThreadCount; } else { - logadd( LOG_THREAD DETACH, "Tried to detach invalid thread (error %d)\n", (int)errno); + logadd( LOG_INFO, "THREAD_DETACH: Tried to detach invalid thread (error %d)\n", (int)errno); exit(1); } return ret; } -#define thread_join(thread,value) (logadd( LOG_THREAD JOIN, "%d @ %s:%d\n", debugThreadCount, __FILE__, __LINE__), debug_thread_join(thread,value)) +#define thread_join(thread,value) (logadd( LOG_INFO, "THREAD_JOIN: %d @ %s:%d\n", debugThreadCount, __FILE__, __LINE__), debug_thread_join(thread,value)) static inline int debug_thread_join(pthread_t thread, void **value_ptr) { const int ret = pthread_join(thread, value_ptr); if (ret == 0) { --debugThreadCount; } else { - logadd( LOG_THREAD JOIN, "Tried to join invalid thread (error %d)\n", (int)errno); + logadd( LOG_INFO, "THREAD_JOIN: Tried to join invalid thread (error %d)\n", (int)errno); exit(1); } return ret; @@ -99,6 +103,6 @@ static inline int debug_thread_join(pthread_t thread, void **value_ptr) #define thread_detach(thread) pthread_detach( thread ) #define thread_join(thread,value) pthread_join( thread, value ) -#endif +#endif /* DNBD3_SERVER_DEBUG_THREADS */ #endif /* LOCKS_H_ */ diff --git a/src/server/net.c b/src/server/net.c index aba4e7d..eb51d29 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -3,7 +3,7 @@ * * Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de> * - * This file may be licensed under the 
terms of of the + * This file may be licensed under the terms of the * GNU General Public License Version 2 (the ``GPL''). * * Software distributed under the License is distributed @@ -26,10 +26,10 @@ #include "altservers.h" #include "reference.h" -#include "../shared/sockhelper.h" -#include "../shared/timing.h" -#include "../shared/protocol.h" -#include "../serialize.h" +#include <dnbd3/shared/sockhelper.h> +#include <dnbd3/shared/timing.h> +#include <dnbd3/shared/protocol.h> +#include <dnbd3/shared/serialize.h> #include <assert.h> @@ -58,11 +58,12 @@ static atomic_uint_fast64_t totalBytesSent = 0; static bool addToList(dnbd3_client_t *client); static void removeFromList(dnbd3_client_t *client); static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client); +static void uplinkCallback(void *data, uint64_t handle, uint64_t start, uint32_t length, const char *buffer); static inline bool recv_request_header(int sock, dnbd3_request_t *request) { ssize_t ret, fails = 0; -#ifdef AFL_MODE +#ifdef DNBD3_SERVER_AFL sock = 0; #endif // Read request header from socket @@ -89,7 +90,7 @@ static inline bool recv_request_header(int sock, dnbd3_request_t *request) static inline bool recv_request_payload(int sock, uint32_t size, serialized_buffer_t *payload) { -#ifdef AFL_MODE +#ifdef DNBD3_SERVER_AFL sock = 0; #endif if ( size == 0 ) { @@ -113,7 +114,7 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff * Send reply with optional payload. payload can be null. The caller has to * acquire the sendMutex first. */ -static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload) +static inline bool send_reply(int sock, dnbd3_reply_t *reply, const void *payload) { const uint32_t size = reply->size; fixup_reply( *reply ); @@ -159,7 +160,7 @@ void* net_handleNewConnection(void *clientPtr) // Await data from client. 
Since this is a fresh connection, we expect data right away sock_setTimeout( client->sock, _clientTimeout ); do { -#ifdef AFL_MODE +#ifdef DNBD3_SERVER_AFL const int ret = (int)recv( 0, &request, sizeof(request), MSG_WAITALL ); #else const int ret = (int)recv( client->sock, &request, sizeof(request), MSG_WAITALL ); @@ -197,6 +198,7 @@ void* net_handleNewConnection(void *clientPtr) client->hostName[HOSTNAMELEN-1] = '\0'; mutex_unlock( &client->lock ); client->bytesSent = 0; + client->relayedCount = 0; if ( !addToList( client ) ) { freeClientStruct( client ); @@ -207,6 +209,7 @@ void* net_handleNewConnection(void *clientPtr) dnbd3_reply_t reply; dnbd3_image_t *image = NULL; + dnbd3_cache_map_t *cache = NULL; int image_file = -1; int num; @@ -215,7 +218,6 @@ void* net_handleNewConnection(void *clientPtr) serialized_buffer_t payload; uint16_t rid, client_version; - uint64_t start, end; dnbd3_server_entry_t server_list[NUMBER_SERVERS]; @@ -262,22 +264,24 @@ void* net_handleNewConnection(void *clientPtr) atomic_thread_fence( memory_order_release ); if ( unlikely( image == NULL ) ) { //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); - } else if ( unlikely( !image->working ) ) { + } else if ( unlikely( image->problem.read || image->problem.changed ) ) { logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n", client->hostName, image_name, (int)rid ); } else { // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable bOk = true; if ( image->ref_cacheMap != NULL ) { - dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); - if ( uplink != NULL && ( uplink->cacheFd == -1 || uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) ) { + if ( image->problem.queue || image->problem.write ) { bOk = ( rand() % 4 ) == 1; } - if ( bOk && uplink != NULL && uplink->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this - 
usleep( 100000 ); // server gets a penalty and is less likely to be selected - } - if ( uplink != NULL ) { - ref_put( &uplink->reference ); + if ( bOk ) { + if ( image->problem.write ) { // Wait 100ms if local caching is not working so this + usleep( 100000 ); // server gets a penalty and is less likely to be selected + } + if ( image->problem.uplink ) { + // Penaltize depending on completeness, if no uplink is available + usleep( ( 100 - image->completenessEstimate ) * 100 ); + } } } if ( bOk ) { @@ -286,6 +290,7 @@ void* net_handleNewConnection(void *clientPtr) if ( !client->isServer ) { // Only update immediately if this is a client. Servers are handled on disconnect. timing_get( &image->atime ); + image->accessed = true; } mutex_unlock( &image->lock ); serializer_reset_write( &payload ); @@ -313,9 +318,8 @@ void* net_handleNewConnection(void *clientPtr) // client handling mainloop while ( recv_request_header( client->sock, &request ) ) { if ( _shutdown ) break; - switch ( request.cmd ) { + if ( likely ( request.cmd == CMD_GET_BLOCK ) ) { - case CMD_GET_BLOCK:; const uint64_t offset = request.offset_small; // Copy to full uint64 to prevent repeated masking reply.handle = request.handle; if ( unlikely( offset >= image->virtualFilesize ) ) { @@ -324,7 +328,7 @@ void* net_handleNewConnection(void *clientPtr) reply.size = 0; reply.cmd = CMD_ERROR; send_reply( client->sock, &reply, NULL ); - break; + continue; } if ( unlikely( offset + request.size > image->virtualFilesize ) ) { // Sanity check @@ -332,63 +336,36 @@ void* net_handleNewConnection(void *clientPtr) reply.size = 0; reply.cmd = CMD_ERROR; send_reply( client->sock, &reply, NULL ); - break; + continue; } - dnbd3_cache_map_t *cache; - if ( request.size != 0 && ( cache = ref_get_cachemap( image ) ) != NULL ) { + if ( cache == NULL ) { + cache = ref_get_cachemap( image ); + } + + if ( request.size != 0 && cache != NULL ) { // This is a proxyed image, check if we need to relay the request... 
- start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); - end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); - bool isCached = true; - const uint64_t firstByteInMap = start >> 15; - const uint64_t lastByteInMap = (end - 1) >> 15; - uint64_t pos; - uint8_t b; - atomic_thread_fence( memory_order_acquire ); - // Middle - quick checking - if ( isCached ) { - for ( pos = firstByteInMap + 1; pos < lastByteInMap; ++pos ) { - if ( atomic_load_explicit( &cache->map[pos], memory_order_relaxed ) != 0xff ) { - isCached = false; - break; - } - } - } - // First byte - if ( isCached ) { - b = atomic_load_explicit( &cache->map[firstByteInMap], memory_order_relaxed ); - for ( pos = start; firstByteInMap == (pos >> 15) && pos < end; pos += DNBD3_BLOCK_SIZE ) { - const int map_x = (pos >> 12) & 7; // mod 8 - const uint8_t bit_mask = (uint8_t)( 1 << map_x ); - if ( (b & bit_mask) == 0 ) { - isCached = false; - break; + const uint64_t start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + const uint64_t end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + if ( !image_isRangeCachedUnsafe( cache, start, end ) ) { + if ( unlikely( client->relayedCount > 250 ) ) { + logadd( LOG_DEBUG1, "Client is overloading uplink; throttling" ); + for ( int i = 0; i < 100 && client->relayedCount > 200; ++i ) { + usleep( 10000 ); } - } - } - // Last byte - only check if request spans multiple bytes in cache map - if ( isCached && firstByteInMap != lastByteInMap ) { - b = atomic_load_explicit( &cache->map[lastByteInMap], memory_order_relaxed ); - for ( pos = lastByteInMap << 15; pos < end; pos += DNBD3_BLOCK_SIZE ) { - assert( lastByteInMap == (pos >> 15) ); - const int map_x = (pos >> 12) & 7; // mod 8 - const uint8_t bit_mask = (uint8_t)( 1 << map_x ); - if ( (b & bit_mask) == 0 ) { - isCached = false; - break; + if ( client->relayedCount > 250 ) { + logadd( LOG_WARNING, "Could not lower client's uplink backlog; dropping client" 
); + goto exit_client_cleanup; } } - } - ref_put( &cache->reference ); - if ( !isCached ) { - if ( !uplink_request( client, request.handle, offset, request.size, request.hops ) ) { - logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy, disabling image %s:%d", + client->relayedCount++; + if ( !uplink_requestClient( client, &uplinkCallback, request.handle, offset, request.size, request.hops ) ) { + client->relayedCount--; + logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy for image %s:%d", client->hostName, image->name, image->rid ); - image->working = false; goto exit_client_cleanup; } - break; // DONE, exit request.cmd switch + continue; // Reply arrives on uplink some time later, handle next request now } } @@ -419,7 +396,7 @@ void* net_handleNewConnection(void *clientPtr) // TODO: Should we consider EOPNOTSUPP on BSD for sendfile and fallback to read/write? // Linux would set EINVAL or ENOSYS instead, which it unfortunately also does for a couple of other failures :/ // read/write would kill performance anyways so a fallback would probably be of little use either way. -#ifdef AFL_MODE +#ifdef DNBD3_SERVER_AFL char buf[1000]; size_t cnt = realBytes - done; if ( cnt > 1000 ) { @@ -456,7 +433,7 @@ void* net_handleNewConnection(void *clientPtr) } if ( err == EBADF || err == EFAULT || err == EINVAL || err == EIO ) { logadd( LOG_INFO, "Disabling %s:%d", image->name, image->rid ); - image->working = false; + image->problem.read = true; } } goto exit_client_cleanup; @@ -473,7 +450,16 @@ void* net_handleNewConnection(void *clientPtr) if ( lock ) mutex_unlock( &client->sendMutex ); // Global per-client counter client->bytesSent += request.size; // Increase counter for statistics. - break; + continue; + } + // Any other command + // Release cache map every now and then, in case the image was replicated + // entirely. Will be re-grabbed on next CMD_GET_BLOCK otherwise. 
+ if ( cache != NULL ) { + ref_put( &cache->reference ); + cache = NULL; + } + switch ( request.cmd ) { case CMD_GET_SERVERS: // Build list of known working alt servers @@ -522,9 +508,9 @@ set_name: ; logadd( LOG_ERROR, "Unknown command from client %s: %d", client->hostName, (int)request.cmd ); break; - } - } - } + } // end switch + } // end loop + } // end bOk exit_client_cleanup: ; // First remove from list, then add to counter to prevent race condition removeFromList( client ); @@ -533,8 +519,12 @@ exit_client_cleanup: ; if ( image != NULL && client->bytesSent > DNBD3_BLOCK_SIZE * 10 ) { mutex_lock( &image->lock ); timing_get( &image->atime ); + image->accessed = true; mutex_unlock( &image->lock ); } + if ( cache != NULL ) { + ref_put( &cache->reference ); + } freeClientStruct( client ); // This will also call image_release on client->image return NULL ; fail_preadd: ; @@ -695,9 +685,21 @@ static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client) if ( client->image != NULL ) { dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref ); if ( uplink != NULL ) { - uplink_removeClient( uplink, client ); + if ( client->relayedCount != 0 ) { + uplink_removeEntry( uplink, client, &uplinkCallback ); + } ref_put( &uplink->reference ); } + if ( client->relayedCount != 0 ) { + logadd( LOG_DEBUG1, "Client has relayedCount == %"PRIu8" on disconnect..", client->relayedCount ); + int i; + for ( i = 0; i < 1000 && client->relayedCount != 0; ++i ) { + usleep( 10000 ); + } + if ( client->relayedCount != 0 ) { + logadd( LOG_WARNING, "Client relayedCount still %"PRIu8" after sleeping!", client->relayedCount ); + } + } } mutex_lock( &client->sendMutex ); if ( client->sock != -1 ) { @@ -739,15 +741,21 @@ static bool addToList(dnbd3_client_t *client) return true; } -void net_sendReply(dnbd3_client_t *client, uint16_t cmd, uint64_t handle) +static void uplinkCallback(void *data, uint64_t handle, uint64_t start UNUSED, uint32_t length, const char *buffer) { - 
dnbd3_reply_t reply; - reply.magic = dnbd3_packet_magic; - reply.cmd = cmd; - reply.handle = handle; - reply.size = 0; + dnbd3_client_t *client = (dnbd3_client_t*)data; + dnbd3_reply_t reply = { + .magic = dnbd3_packet_magic, + .cmd = buffer == NULL ? CMD_ERROR : CMD_GET_BLOCK, + .handle = handle, + .size = length, + }; mutex_lock( &client->sendMutex ); - send_reply( client->sock, &reply, NULL ); + send_reply( client->sock, &reply, buffer ); + if ( buffer == NULL ) { + shutdown( client->sock, SHUT_RDWR ); + } + client->relayedCount--; mutex_unlock( &client->sendMutex ); } diff --git a/src/server/net.h b/src/server/net.h index 7719aef..2d6e5e7 100644 --- a/src/server/net.h +++ b/src/server/net.h @@ -3,7 +3,7 @@ * * Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de> * - * This file may be licensed under the terms of of the + * This file may be licensed under the terms of the * GNU General Public License Version 2 (the ``GPL''). * * Software distributed under the License is distributed @@ -37,6 +37,4 @@ void net_disconnectAll(); void net_waitForAllDisconnected(); -void net_sendReply(dnbd3_client_t *client, uint16_t cmd, uint64_t handle); - #endif /* NET_H_ */ diff --git a/src/server/picohttpparser/CMakeLists.txt b/src/server/picohttpparser/CMakeLists.txt new file mode 100644 index 0000000..cc6ec96 --- /dev/null +++ b/src/server/picohttpparser/CMakeLists.txt @@ -0,0 +1,11 @@ +cmake_minimum_required(VERSION 3.10) + +# set the project name +project(picohttpparser + LANGUAGES C) + +set(PICOHTTPPARSER_SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/picohttpparser.c) +set(PICOHTTPPARSER_HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/picohttpparser.h) + +add_library(picohttpparser STATIC ${PICOHTTPPARSER_SOURCE_FILES}) +target_include_directories(picohttpparser PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) diff --git a/src/server/reference.h b/src/server/reference.h index 4eda546..75a681f 100644 --- a/src/server/reference.h +++ b/src/server/reference.h @@ -39,6 +39,11 @@ static inline ref 
*ref_get( weakref *weakref ) return ref; } +static inline void ref_inc( ref *ref ) +{ + ++ref->count; +} + static inline void ref_put( ref *ref ) { if ( --ref->count == 0 ) { diff --git a/src/server/rpc.c b/src/server/rpc.c index a454d6d..119bbd5 100644 --- a/src/server/rpc.c +++ b/src/server/rpc.c @@ -5,7 +5,9 @@ #include "locks.h" #include "image.h" #include "altservers.h" -#include "../shared/sockhelper.h" +#include <dnbd3/shared/sockhelper.h> +#include <dnbd3/version.h> +#include <dnbd3/build.h> #include "fileutil.h" #include "picohttpparser/picohttpparser.h" #include "urldecode.h" @@ -101,8 +103,8 @@ void rpc_init() int fd = open( "/dev/urandom", O_RDONLY ); if ( fd != -1 ) { uint32_t bla = 1; - read( fd, &bla, 4 ); - randomRunId = (randomRunId << 32) | bla; + (void)!read( fd, &bla, 4 ); + randomRunId = ((randomRunId & 0xffffffff) << 32) | bla; } close( fd ); } @@ -144,7 +146,7 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int while ( !_shutdown ) { // Read request from client struct phr_header headers[100]; - size_t numHeaders, prevLen = 0, consumed; + size_t numHeaders, prevLen = 0, consumed = 0; struct string method, path; int minorVersion; while ( !_shutdown ) { @@ -174,7 +176,7 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int // Reaching here means partial request or parse error if ( pret == -2 ) { // Partial, keep reading prevLen = hoff; -#ifdef AFL_MODE +#ifdef DNBD3_SERVER_AFL ssize_t ret = recv( 0, headerBuf + hoff, sizeof(headerBuf) - hoff, 0 ); #else ssize_t ret = recv( sock, headerBuf + hoff, sizeof(headerBuf) - hoff, 0 ); @@ -259,7 +261,7 @@ static bool handleStatus(int sock, int permissions, struct field *fields, size_t { bool ok; bool stats = false, images = false, clients = false, space = false; - bool logfile = false, config = false, altservers = false; + bool logfile = false, config = false, altservers = false, version = false; #define SETVAR(var) if ( !var && 
STRCMP(fields[i].value, #var) ) var = true for (size_t i = 0; i < fields_num; ++i) { if ( !equals( &fields[i].name, &STR_Q ) ) continue; @@ -270,9 +272,10 @@ static bool handleStatus(int sock, int permissions, struct field *fields, size_t else SETVAR(logfile); else SETVAR(config); else SETVAR(altservers); + else SETVAR(version); } #undef SETVAR - if ( ( stats || space ) && !(permissions & ACL_STATS) ) { + if ( ( stats || space || version ) && !(permissions & ACL_STATS) ) { return sendReply( sock, "403 Forbidden", "text/plain", "No permission to access statistics", -1, keepAlive ); } if ( images && !(permissions & ACL_IMAGE_LIST) ) { @@ -308,6 +311,10 @@ static bool handleStatus(int sock, int permissions, struct field *fields, size_t statisticsJson = json_pack( "{sI}", "runId", randomRunId ); } + if ( version ) { + json_object_set_new( statisticsJson, "version", json_string( DNBD3_VERSION_LONG ", built " DNBD3_BUILD_DATE ) ); + json_object_set_new( statisticsJson, "build", json_string( DNBD3_BUILD ) ); + } if ( space ) { uint64_t spaceTotal = 0, spaceAvail = 0; file_freeDiskSpace( _basePath, &spaceTotal, &spaceAvail ); @@ -405,9 +412,11 @@ static bool sendReply(int sock, const char *status, const char *ctype, const cha if ( keepAlive == HTTP_CLOSE ) { // Wait for flush shutdown( sock, SHUT_WR ); -#ifdef AFL_MODE +#ifdef DNBD3_SERVER_AFL sock = 0; #endif + // Don't wait too long in case other side ignores the shutdown + sock_setTimeout( sock, 600 ); while ( read( sock, buffer, sizeof buffer ) > 0 ); return false; } @@ -451,7 +460,7 @@ static int getacl(dnbd3_host_t *host) if ( aclRules[i].bitMask != 0 && aclRules[i].host[aclRules[i].bytes] != ( host->addr[aclRules[i].bytes] & aclRules[i].bitMask ) ) continue; return aclRules[i].permissions; } -#ifdef AFL_MODE +#ifdef DNBD3_SERVER_AFL return 0x7fffff; #else return 0; @@ -487,7 +496,7 @@ static void addacl(int argc, char **argv, void *data UNUSED) *slash++ = '\0'; } if ( !parse_address( argv[0], &host ) ) goto 
unlock_end; - long int bits; + long int bits = 0; if ( slash != NULL ) { char *last; bits = strtol( slash, &last, 10 ); diff --git a/src/server/serialize.c b/src/server/serialize.c deleted file mode 100644 index 4934132..0000000 --- a/src/server/serialize.c +++ /dev/null @@ -1,5 +0,0 @@ -#include <stdio.h> -#include <string.h> -#include <stdint.h> - -#include "../serialize.c" diff --git a/src/server/server.c b/src/server/server.c index 0dddea7..0f75935 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -3,7 +3,7 @@ * * Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de> * - * This file may be licensed under the terms of of the + * This file may be licensed under the terms of the * GNU General Public License Version 2 (the ``GPL''). * * Software distributed under the License is distributed @@ -29,10 +29,12 @@ #include "integrity.h" #include "threadpool.h" #include "rpc.h" +#include "fuse.h" -#include "../version.h" -#include "../shared/sockhelper.h" -#include "../shared/timing.h" +#include <dnbd3/version.h> +#include <dnbd3/build.h> +#include <dnbd3/shared/sockhelper.h> +#include <dnbd3/shared/timing.h> #include <signal.h> #include <getopt.h> @@ -104,10 +106,14 @@ static void queueJobInternal(job_t *job); */ void dnbd3_printHelp(char *argv_0) { - printf( "Version: %s\n\n", VERSION_STRING ); + printf( "Version: %s\n\n", DNBD3_VERSION_LONG ); + printf( "Built: %s\n", DNBD3_BUILD_DATE ); printf( "Usage: %s [OPTIONS]...\n", argv_0 ); printf( "Start the DNBD3 server\n" ); printf( "-c or --config Configuration directory (default /etc/dnbd3-server/)\n" ); +#ifdef DNBD3_SERVER_FUSE + printf( "-m or --mount FUSE mount point\n"); +#endif printf( "-n or --nodaemon Start server in foreground\n" ); printf( "-b or --bind Local Address to bind to\n" ); printf( "-h or --help Show this help text and quit\n" ); @@ -126,7 +132,8 @@ void dnbd3_printHelp(char *argv_0) */ void dnbd3_printVersion() { - printf( "Version: %s\n", VERSION_STRING ); + printf( "dnbd3-server 
version: %s\n", DNBD3_VERSION_LONG ); + printf( "Built: %s\n", DNBD3_BUILD_DATE ); exit( 0 ); } @@ -140,6 +147,8 @@ _Noreturn static void dnbd3_cleanup() _shutdown = true; logadd( LOG_INFO, "Cleanup..." ); + dfuse_shutdown(); + if ( hasTimerThread ) { pthread_kill( timerThread, SIGINT ); thread_join( timerThread, NULL ); @@ -190,11 +199,13 @@ int main(int argc, char *argv[]) char *paramCreate = NULL; char *bindAddress = NULL; char *errorMsg = NULL; + char *mountDir = NULL; int64_t paramSize = -1; int paramRevision = -1; - static const char *optString = "b:c:d:hnv?"; + static const char *optString = "b:c:m:d:hnv?"; static const struct option longOpts[] = { { "config", required_argument, NULL, 'c' }, + { "mount", required_argument, NULL, 'm' }, { "nodaemon", no_argument, NULL, 'n' }, { "reload", no_argument, NULL, 'r' }, { "help", no_argument, NULL, 'h' }, @@ -209,6 +220,16 @@ int main(int argc, char *argv[]) { 0, 0, 0, 0 } }; + log_init(); + + /* set proper output stream for AFL */ +#ifdef DNBD3_SERVER_AFL + if ( log_setConsoleOutputStream(stderr) < 0 ) { + logadd( LOG_ERROR, "Failed to set output stream for AFL to stderr" ); + exit( EXIT_FAILURE ); + } +#endif + mainPid = getpid(); mainThread = pthread_self(); opt = getopt_long( argc, argv, optString, longOpts, &longIndex ); @@ -218,6 +239,13 @@ int main(int argc, char *argv[]) case 'c': _configDir = strdup( optarg ); break; + case 'm': +#ifndef DNBD3_SERVER_FUSE + fprintf( stderr, "FUSE support not enabled at build time.\n" ); + return 8; +#endif + mountDir = strdup( optarg ); + break; case 'n': demonize = 0; break; @@ -263,6 +291,7 @@ int main(int argc, char *argv[]) opt = getopt_long( argc, argv, optString, longOpts, &longIndex ); } + // Load general config if ( _configDir == NULL ) _configDir = strdup( "/etc/dnbd3-server" ); @@ -275,9 +304,7 @@ int main(int argc, char *argv[]) timing_setBase(); timing_get( &startupTime ); -#ifdef AFL_MODE - // ###### AFL - // +#ifdef DNBD3_SERVER_AFL image_serverStartup(); 
net_init(); uplink_globalsInit(); @@ -301,9 +328,7 @@ int main(int argc, char *argv[]) net_handleNewConnection( dnbd3_client ); exit( 0 ); } - // - // ###### AFL END -#endif +#endif /* DNBD3_SERVER_AFL */ // One-shots first: @@ -315,7 +340,10 @@ int main(int argc, char *argv[]) // No one-shot detected, normal server operation or errormsg serving if ( demonize ) { logadd( LOG_INFO, "Forking into background, see log file for further information" ); - daemon( 1, 0 ); + if ( daemon( 0, 0 ) == -1 ) { + logadd( LOG_ERROR, "Could not daemon(): errno=%d", errno ); + exit( 1 ); + } } if ( errorMsg != NULL ) { setupNetwork( bindAddress ); @@ -339,7 +367,15 @@ int main(int argc, char *argv[]) net_init(); uplink_globalsInit(); rpc_init(); - logadd( LOG_INFO, "DNBD3 server starting.... Machine type: " ENDIAN_MODE ); + if ( mountDir != NULL && !dfuse_init( "-oallow_other", mountDir ) ) { + logadd( LOG_ERROR, "Cannot mount fuse directory to %s", mountDir ); + dnbd3_cleanup(); + return EXIT_FAILURE; + } + logadd( LOG_INFO, "DNBD3 server starting...." ); + logadd( LOG_INFO, "Machine type: " DNBD3_ENDIAN_MODE ); + logadd( LOG_INFO, "Build Type: %s", DNBD3_BUILD ); + logadd( LOG_INFO, "Version: %s, built %s", DNBD3_VERSION_LONG, DNBD3_BUILD_DATE ); if ( altservers_load() < 0 ) { logadd( LOG_WARNING, "Could not load alt-servers. Does the file exist in %s?", _configDir ); @@ -379,10 +415,11 @@ int main(int argc, char *argv[]) // Initialize thread pool if ( !threadpool_init( 8 ) ) { logadd( LOG_ERROR, "Could not init thread pool!\n" ); + dnbd3_cleanup(); exit( EXIT_FAILURE ); } - logadd( LOG_INFO, "Server is ready. (%s)", VERSION_STRING ); + logadd( LOG_INFO, "Server is ready." 
); if ( thread_create( &timerThread, NULL, &timerMainloop, NULL ) == 0 ) { hasTimerThread = true; @@ -398,7 +435,7 @@ int main(int argc, char *argv[]) if ( sigReload ) { sigReload = false; logadd( LOG_INFO, "SIGHUP received, re-scanning image directory" ); - threadpool_run( &server_asyncImageListLoad, NULL ); + threadpool_run( &server_asyncImageListLoad, NULL, "IMAGE_RELOAD" ); } if ( sigLogCycle ) { sigLogCycle = false; @@ -425,7 +462,7 @@ int main(int argc, char *argv[]) continue; } - if ( !threadpool_run( &net_handleNewConnection, (void *)dnbd3_client ) ) { + if ( !threadpool_run( &net_handleNewConnection, (void *)dnbd3_client, "CLIENT" ) ) { logadd( LOG_ERROR, "Could not start thread for new connection." ); free( dnbd3_client ); continue; @@ -520,10 +557,11 @@ static void dnbd3_handleSignal2(int signum, siginfo_t *info, void *data UNUSED) if ( info->si_pid != 0 && !pthread_equal( pthread_self(), mainThread ) ) { pthread_kill( mainThread, info->si_signo ); // And relay signal if we're not the main thread } - } - if ( pthread_equal( pthread_self(), mainThread ) ) { - // Signal received by main thread -- handle - dnbd3_handleSignal( signum ); + // Source is not this process -- only then do we honor signals + if ( pthread_equal( pthread_self(), mainThread ) ) { + // Signal received by main thread -- handle + dnbd3_handleSignal( signum ); + } } } @@ -568,7 +606,7 @@ static int handlePendingJobs(void) jobHead = *temp; // Make it list head *temp = NULL; // Split off part before that while ( todo != NULL ) { - threadpool_run( todo->startRoutine, todo->arg ); + threadpool_run( todo->startRoutine, todo->arg, "TIMER_TASK" ); old = todo; todo = todo->next; if ( old->intervalSecs == 0 ) { diff --git a/src/server/server.h b/src/server/server.h index a026eb6..e93d8f5 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -3,7 +3,7 @@ * * Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de> * - * This file may be licensed under the terms of of the + * This file may 
be licensed under the terms of the * GNU General Public License Version 2 (the ``GPL''). * * Software distributed under the License is distributed @@ -22,7 +22,7 @@ #define SERVER_H_ #include "globals.h" -#include "../types.h" +#include <dnbd3/types.h> uint32_t dnbd3_serverUptime(); void server_addJob(void *(*startRoutine)(void *), void *arg, int delaySecs, int intervalSecs); diff --git a/src/server/threadpool.c b/src/server/threadpool.c index 0b46fd6..a21bd0d 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -8,6 +8,7 @@ typedef struct _entry_t { dnbd3_signal_t* signal; void *(*startRoutine)(void *); void * arg; + const char *name; } entry_t; static void *threadpool_worker(void *entryPtr); @@ -56,21 +57,22 @@ void threadpool_waitEmpty() } while ( activeThreads != 0 ); } -bool threadpool_run(void *(*startRoutine)(void *), void *arg) +bool threadpool_run(void *(*startRoutine)(void *), void *arg, const char *name) { if ( unlikely( _shutdown ) ) { logadd( LOG_MINOR, "Cannot submit work to threadpool while shutting down!" ); return false; } +#ifdef DEBUG if ( unlikely( startRoutine == NULL ) ) { logadd( LOG_ERROR, "Trying to queue work for thread pool with NULL startRoutine" ); return false; // Or bail out!? 
} +#endif entry_t *entry = NULL; for ( int i = 0; i < maxIdleThreads; ++i ) { - entry_t *cur = pool[i]; - if ( cur != NULL && atomic_compare_exchange_weak( &pool[i], &cur, NULL ) ) { - entry = cur; + entry = atomic_exchange( &pool[i], NULL ); + if ( entry != NULL ) { break; } } @@ -87,7 +89,7 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg) return false; } if ( 0 != thread_create( &(entry->thread), &threadAttrs, threadpool_worker, (void*)entry ) ) { - logadd( LOG_WARNING, "Could not create new thread for thread pool\n" ); + logadd( LOG_WARNING, "Could not create new thread for thread pool (%d active)\n", (int)activeThreads ); signal_close( entry->signal ); free( entry ); return false; @@ -96,6 +98,7 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg) } entry->startRoutine = startRoutine; entry->arg = arg; + entry->name = name; atomic_thread_fence( memory_order_release ); signal_call( entry->signal ); return true; @@ -120,10 +123,15 @@ keep_going:; logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret ); continue; } +#ifdef DEBUG if ( entry->startRoutine == NULL ) { logadd( LOG_ERROR, "Worker woke up but has no work to do!" ); exit( 1 ); } + if ( entry->name != NULL ) { + setThreadName( entry->name ); + } +#endif // Start assigned work (*entry->startRoutine)( entry->arg ); // Reset vars for safety @@ -143,6 +151,7 @@ keep_going:; // Reaching here means pool is full; just let the thread exit break; } + setThreadName( "[dead]" ); signal_close( entry->signal ); free( entry ); activeThreads--; diff --git a/src/server/threadpool.h b/src/server/threadpool.h index ee0b3aa..c30d44f 100644 --- a/src/server/threadpool.h +++ b/src/server/threadpool.h @@ -1,7 +1,7 @@ #ifndef _THREADPOOL_H_ #define _THREADPOOL_H_ -#include "../types.h" +#include <dnbd3/types.h> /** * Initialize the thread pool. This must be called before using @@ -26,9 +26,10 @@ void threadpool_waitEmpty(); * Run a thread using the thread pool. 
* @param startRoutine function to run in new thread * @param arg argument to pass to thead + * @param name STRING CONSTANT (literal) for debugging purposes * @return true if thread was started */ -bool threadpool_run(void *(*startRoutine)(void *), void *arg); +bool threadpool_run(void *(*startRoutine)(void *), void *arg, const char *name); #endif diff --git a/src/server/uplink.c b/src/server/uplink.c index f39e633..8a83124 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -4,10 +4,11 @@ #include "image.h" #include "altservers.h" #include "net.h" -#include "../shared/sockhelper.h" -#include "../shared/protocol.h" -#include "../shared/timing.h" -#include "../shared/crc32.h" +#include <dnbd3/shared/sockhelper.h> +#include <dnbd3/shared/protocol.h> +#include <dnbd3/shared/timing.h> +#include <dnbd3/shared/crc32.h> +#include "threadpool.h" #include "reference.h" #include <assert.h> @@ -17,49 +18,35 @@ #include <unistd.h> #include <stdatomic.h> +static const uint8_t HOP_FLAG_BGR = 0x80; +static const uint8_t HOP_FLAG_PREFETCH = 0x40; #define FILE_BYTES_PER_MAP_BYTE ( DNBD3_BLOCK_SIZE * 8 ) #define MAP_BYTES_PER_HASH_BLOCK (int)( HASH_BLOCK_SIZE / FILE_BYTES_PER_MAP_BYTE ) #define MAP_INDEX_HASH_START_MASK ( ~(int)( MAP_BYTES_PER_HASH_BLOCK - 1 ) ) -#define REP_NONE ( (uint64_t)0xffffffffffffffff ) - -// Status of request in queue - -// Slot is free, can be used. -// Must only be set in uplink_handle_receive() or uplink_remove_client() -#define ULR_FREE 0 -// Slot has been filled with a request that hasn't been sent to the upstream server yet, matching request can safely rely on reuse. -// Must only be set in uplink_request() -#define ULR_NEW 1 -// Slot is occupied, reply has not yet been received, matching request can safely rely on reuse. -// Must only be set in uplink_mainloop() or uplink_request() -#define ULR_PENDING 2 -// Slot is being processed, do not consider for hop on. 
-// Must only be set in uplink_handle_receive() -#define ULR_PROCESSING 3 - -static const char *const NAMES_ULR[4] = { - [ULR_FREE] = "ULR_FREE", - [ULR_NEW] = "ULR_NEW", - [ULR_PENDING] = "ULR_PENDING", - [ULR_PROCESSING] = "ULR_PROCESSING", -}; - static atomic_uint_fast64_t totalBytesReceived = 0; +typedef struct { + uint64_t start, end, handle; +} req_t; + static void cancelAllRequests(dnbd3_uplink_t *uplink); -static void uplink_free(ref *ref); +static void freeUplinkStruct(ref *ref); static void* uplink_mainloop(void *data); -static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly); -static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex); -static void uplink_handleReceive(dnbd3_uplink_t *uplink); -static int uplink_sendKeepalive(const int fd); -static void uplink_addCrc32(dnbd3_uplink_t *uplink); -static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink); -static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force); -static bool uplink_saveCacheMap(dnbd3_uplink_t *uplink); -static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink); -static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew); +static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly); +static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex); +static void handleReceive(dnbd3_uplink_t *uplink); +static bool sendKeepalive(dnbd3_uplink_t *uplink); +static void requestCrc32List(dnbd3_uplink_t *uplink); +static bool sendReplicationRequest(dnbd3_uplink_t *uplink); +static bool reopenCacheFd(dnbd3_uplink_t *uplink, const bool force); +static bool connectionShouldShutdown(dnbd3_uplink_t *uplink); +static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew); +static int numWantedReplicationRequests(dnbd3_uplink_t *uplink); +static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle); +static bool uplink_requestInternal(dnbd3_uplink_t *uplink, 
void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops); + +#define assert_uplink_thread() assert( pthread_equal( uplink->thread, pthread_self() ) ) // ############ uplink connection handling @@ -81,6 +68,8 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version { if ( !_isProxy || _shutdown ) return false; assert( image != NULL ); + if ( sock == -1 && !altservers_imageHasAltServers( image->name ) ) + return false; // Nothing to do mutex_lock( &image->lock ); dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); if ( uplink != NULL ) { @@ -97,13 +86,15 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version } uplink = calloc( 1, sizeof(dnbd3_uplink_t) ); // Start with one reference for the uplink thread. We'll return it when the thread finishes - ref_init( &uplink->reference, uplink_free, 1 ); + ref_init( &uplink->reference, freeUplinkStruct, 1 ); mutex_init( &uplink->queueLock, LOCK_UPLINK_QUEUE ); mutex_init( &uplink->rttLock, LOCK_UPLINK_RTT ); mutex_init( &uplink->sendMutex, LOCK_UPLINK_SEND ); uplink->image = image; uplink->bytesReceived = 0; - uplink->idleTime = 0; + uplink->bytesReceivedLastSave = 0; + uplink->idleTime = SERVER_UPLINK_IDLE_TIMEOUT - 90; + uplink->queue = NULL; uplink->queueLen = 0; uplink->cacheFd = -1; uplink->signal = signal_new(); @@ -111,12 +102,14 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version logadd( LOG_WARNING, "Error creating signal. Uplink unavailable." 
); goto failure; } - uplink->replicationHandle = REP_NONE; mutex_lock( &uplink->rttLock ); mutex_lock( &uplink->sendMutex ); uplink->current.fd = -1; mutex_unlock( &uplink->sendMutex ); uplink->cycleDetected = false; + image->problem.uplink = true; + image->problem.write = true; + image->problem.queue = false; if ( sock != -1 ) { uplink->better.fd = sock; int index = altservers_hostToIndex( host ); @@ -139,7 +132,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version return true; failure: ; if ( uplink != NULL ) { - image->users++; // Expected by uplink_free() + image->users++; // Expected by freeUplinkStruct() ref_put( &uplink->reference ); // The ref for the uplink thread that never was } mutex_unlock( &image->lock ); @@ -166,13 +159,13 @@ bool uplink_shutdown(dnbd3_image_t *image) image->users++; // Prevent free while uplink shuts down signal_call( uplink->signal ); } else { - logadd( LOG_ERROR, "This will never happen. '%s:%d'", image->name, (int)image->rid ); + logadd( LOG_ERROR, "This will never happen. 
'%s:%d'", PIMG(image) ); } cancelAllRequests( uplink ); ref_setref( &image->uplinkref, NULL ); - ref_put( &uplink->reference ); mutex_unlock( &uplink->queueLock ); bool retval = ( exp && image->users == 0 ); + ref_put( &uplink->reference ); mutex_unlock( &image->lock ); return retval; } @@ -183,19 +176,28 @@ bool uplink_shutdown(dnbd3_image_t *image) */ static void cancelAllRequests(dnbd3_uplink_t *uplink) { - for ( int i = 0; i < uplink->queueLen; ++i ) { - if ( uplink->queue[i].status != ULR_FREE ) { - net_sendReply( uplink->queue[i].client, CMD_ERROR, uplink->queue[i].handle ); - uplink->queue[i].status = ULR_FREE; + dnbd3_queue_entry_t *it = uplink->queue; + while ( it != NULL ) { + dnbd3_queue_client_t *cit = it->clients; + while ( cit != NULL ) { + (*cit->callback)( cit->data, cit->handle, 0, 0, NULL ); + dnbd3_queue_client_t *next = cit->next; + free( cit ); + cit = next; } + dnbd3_queue_entry_t *next = it->next; + free( it ); + it = next; } + uplink->queue = NULL; uplink->queueLen = 0; + uplink->image->problem.queue = false; } -static void uplink_free(ref *ref) +static void freeUplinkStruct(ref *ref) { dnbd3_uplink_t *uplink = container_of(ref, dnbd3_uplink_t, reference); - logadd( LOG_DEBUG1, "Freeing uplink for '%s:%d'", uplink->image->name, (int)uplink->image->rid ); + logadd( LOG_DEBUG1, "Freeing uplink for '%s:%d'", PIMG(uplink->image) ); assert( uplink->queueLen == 0 ); if ( uplink->signal != NULL ) { signal_close( uplink->signal ); @@ -226,35 +228,36 @@ static void uplink_free(ref *ref) * Remove given client from uplink request queue * Locks on: uplink.queueLock */ -void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client) +void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback callback) { mutex_lock( &uplink->queueLock ); - for (int i = uplink->queueLen - 1; i >= 0; --i) { - if ( uplink->queue[i].client == client ) { - // Make sure client doesn't get destroyed while we're sending it data - mutex_lock( 
&client->sendMutex ); - mutex_unlock( &client->sendMutex ); - uplink->queue[i].client = NULL; - uplink->queue[i].status = ULR_FREE; + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + for ( dnbd3_queue_client_t **cit = &it->clients; *cit != NULL; ) { + if ( (**cit).data == data && (**cit).callback == callback ) { + (*(**cit).callback)( (**cit).data, (**cit).handle, 0, 0, NULL ); + dnbd3_queue_client_t *entry = *cit; + *cit = (**cit).next; + free( entry ); + } else { + cit = &(**cit).next; + } } - if ( uplink->queue[i].client == NULL && uplink->queueLen == i + 1 ) uplink->queueLen--; } mutex_unlock( &uplink->queueLock ); } /** - * Request a chunk of data through an uplink server - * Locks on: image.lock, uplink.queueLock + * Called from a client (proxy) connection to request a missing part of the image. + * The caller has made sure that the range is actually missing. */ -bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) +bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) { - if ( client == NULL || client->image == NULL ) - return false; - if ( length > (uint32_t)_maxPayload ) { - logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length ); + assert( client != NULL && callback != NULL ); + if ( ( hops & 0x3f ) > 60 ) { // This is just silly + logadd( LOG_WARNING, "Refusing to relay a request that has > 60 hops" ); return false; } - dnbd3_uplink_t * uplink = ref_get_uplink( &client->image->uplinkref ); + dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref ); if ( unlikely( uplink == NULL ) ) { uplink_init( client->image, -1, NULL, -1 ); uplink = ref_get_uplink( &client->image->uplinkref ); @@ -263,160 +266,275 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin return false; } } - if ( 
uplink->shutdown ) { - logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" ); - goto fail_ref; - } // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain // This might be a false positive if there are multiple instances running on the same host (IP) - if ( hops != 0 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) { + bool ret; + if ( hops > 1 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) { uplink->cycleDetected = true; signal_call( uplink->signal ); logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); - goto fail_ref; + ret = false; + } else { + ret = uplink_requestInternal( uplink, (void*)client, callback, handle, start, length, hops ); } + ref_put( &uplink->reference ); + return ret; +} - int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise - int existingType = -1; // ULR_* type of existing request - int i; - int freeSlot = -1; - int firstUsedSlot = -1; - bool requestLoop = false; - const uint64_t end = start + length; +/** + * Called by integrated fuse module + */ +bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback, + uint64_t handle, uint64_t start, uint32_t length) +{ + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( unlikely( uplink == NULL ) ) { + uplink_init( image, -1, NULL, -1 ); + uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink == NULL ) { + logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); + return false; + } + } + bool ret = uplink_requestInternal( uplink, data, callback, handle, start, length, 0 ); + ref_put( &uplink->reference ); + return ret; +} + +static void extendRequest(uint64_t start, uint64_t *end, const dnbd3_image_t *image, uint32_t wanted) +{ + uint32_t length = (uint32_t)( *end - start ); + if ( length >= wanted ) + return; + length = wanted; + if ( unlikely( 
_backgroundReplication == BGR_HASHBLOCK + && *end / HASH_BLOCK_SIZE != (start + length) / HASH_BLOCK_SIZE ) ) { + // Don't extend across hash-block border in this mode + *end = ( start + length ) & ~( HASH_BLOCK_SIZE - 1 ); + } else { + *end = start + length; + } + if ( unlikely( *end > image->virtualFilesize ) ) { + *end = image->virtualFilesize; + } + *end = ( *end + DNBD3_BLOCK_SIZE - 1 ) & ~( DNBD3_BLOCK_SIZE - 1 ); + //logadd( LOG_DEBUG2, "Extended %"PRIx64" from %"PRIx64" to %"PRIx64, start, end, req.end ); +} + +static bool requestBlock(dnbd3_uplink_t *uplink, req_t *req, uint8_t hops) +{ + if ( uplink->current.fd == -1 ) + return false; + return dnbd3_get_block( uplink->current.fd, req->start, + (uint32_t)( req->end - req->start ), req->handle, + COND_HOPCOUNT( uplink->current.version, hops ) ); +} + +/** + * Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL. + * If callback is NULL, this is assumed to be a background replication request. 
+ * Locks on: uplink.queueLock, uplink.sendMutex + */ +static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, + uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) +{ + assert( uplink != NULL ); + assert( data == NULL || callback != NULL ); + if ( ( hops & HOP_FLAG_BGR ) // This is a background replication request + && _backgroundReplication != BGR_FULL ) { // Deny if we're not doing BGR + // TODO: Allow BGR_HASHBLOCK too, but only if hash block isn't completely empty + logadd( LOG_DEBUG2, "Dopping client because of BGR policy" ); + return false; + } + if ( uplink->shutdown ) { + logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" ); + return false; + } + if ( length > (uint32_t)_maxPayload ) { + logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload", + length ); + return false; + } + hops++; + if ( callback == NULL ) { + // Set upper-most bit for replication requests that we fire + // In client mode, at least set prefetch flag to prevent prefetch cascading + hops |= (uint8_t)( _pretendClient ? HOP_FLAG_PREFETCH : HOP_FLAG_BGR ); + } + + req_t req, preReq; + dnbd3_queue_entry_t *request = NULL, *last = NULL, *pre = NULL; + bool isNew; + const uint64_t end = start + length; + req.start = start & ~(DNBD3_BLOCK_SIZE - 1); + req.end = end; + /* Don't do this -- this breaks matching of prefetch jobs, since they'd + * be misaligned, and the next client request wouldn't match anything. + * To improve this, we need to be able to attach a queue_client to multiple queue_entries + * and then serve it once all the queue_entries are done (atomic_int in queue_client). + * But currently we directly send the receive buffer's content to the queue_client after + * receiving the payload, as this will also work when the local cache is borked (we just + * tunnel though the traffic). 
One could argue that this mode of operation is nonsense, + * and we should just drop all affected clients. Then as a next step, don't serve the + * clients form the receive buffer, but just issue a normal sendfile() call after writing + * the received data to the local cache. + */ + if ( callback != NULL && _minRequestSize != 0 ) { + // Not background replication request, extend request size + extendRequest( req.start, &req.end, uplink->image, _minRequestSize ); + } + req.end = (req.end + DNBD3_BLOCK_SIZE - 1) & ~(DNBD3_BLOCK_SIZE - 1); + // Critical section - work with the queue mutex_lock( &uplink->queueLock ); if ( uplink->shutdown ) { // Check again after locking to prevent lost requests goto fail_lock; } - for (i = 0; i < uplink->queueLen; ++i) { - // find free slot to place this request into - if ( uplink->queue[i].status == ULR_FREE ) { - if ( freeSlot == -1 || existingType != ULR_PROCESSING ) { - freeSlot = i; - } - continue; - } - if ( firstUsedSlot == -1 ) { - firstUsedSlot = i; - } - // find existing request to attach to - if ( uplink->queue[i].from > start || uplink->queue[i].to < end ) - continue; // Range not suitable - // Detect potential proxy cycle. 
New request hopcount is greater, range is same, old request has already been sent -> suspicious - if ( hops > uplink->queue[i].hopCount && uplink->queue[i].from == start && uplink->queue[i].to == end && uplink->queue[i].status == ULR_PENDING ) { - requestLoop = true; + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + if ( it->from <= start && it->to >= end ) { + // Matching range, attach + request = it; break; } - if ( foundExisting == -1 || existingType == ULR_PROCESSING ) { - foundExisting = i; - existingType = uplink->queue[i].status; + if ( it->next == NULL ) { + // Not matching, last in list, remember + last = it; + break; } } - if ( unlikely( requestLoop ) ) { - uplink->cycleDetected = true; - signal_call( uplink->signal ); - logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops ); - goto fail_lock; - } - if ( freeSlot < firstUsedSlot && firstUsedSlot < 10 && existingType != ULR_PROCESSING ) { - freeSlot = -1; // Not attaching to existing request, make it use a higher slot - } - if ( freeSlot == -1 ) { - if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) { - logadd( LOG_WARNING, "Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." ); + dnbd3_queue_client_t **c = NULL; + if ( request == NULL ) { + // No existing request to attach to + if ( uplink->queueLen >= UPLINK_MAX_QUEUE ) { + logadd( LOG_WARNING, + "Uplink queue is full, consider increasing UPLINK_MAX_QUEUE. Dropping client..." 
); + goto fail_lock; + } + uplink->queueLen++; + if ( uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { + uplink->image->problem.queue = true; + } + request = malloc( sizeof(*request) ); + if ( last == NULL ) { + uplink->queue = request; + } else { + last->next = request; + } + request->next = NULL; + request->handle = ++uplink->queueId; + request->from = req.start; + request->to = req.end; +#ifdef DEBUG + timing_get( &request->entered ); +#endif + request->hopCount = hops; + request->sent = true; // Optimistic; would be set to false on failure + if ( callback == NULL ) { + // BGR + request->clients = NULL; + } else { + c = &request->clients; + } + isNew = true; + } else if ( callback == NULL ) { + // Replication request that maches existing request. Do nothing + isNew = false; + } else { + // Existing request. Check if potential cycle + if ( hops > request->hopCount && request->from == start && request->to == end ) { + logadd( LOG_DEBUG1, "Request cycle detected on uplink for %s:%d", PIMG(uplink->image) ); goto fail_lock; } - freeSlot = uplink->queueLen++; + // Count number if clients, get tail of list + int count = 0; + c = &request->clients; + while ( *c != NULL ) { + c = &(**c).next; + if ( ++count >= UPLINK_MAX_CLIENTS_PER_REQUEST ) { + logadd( LOG_DEBUG2, "Won't accept more than %d clients per request, dropping client", count ); + goto fail_lock; + } + } + isNew = false; } - // Do not send request to uplink server if we have a matching pending request AND the request either has the - // status ULR_NEW/PENDING OR we found a free slot with LOWER index than the one we attach to. Otherwise - // explicitly send this request to the uplink server. The second condition mentioned here is to prevent - // a race condition where the reply for the outstanding request already arrived and the uplink thread - // is currently traversing the request queue. 
As it is processing the queue from highest to lowest index, it might - // already have passed the index of the free slot we determined, but not reached the existing request we just found above. - if ( foundExisting != -1 && existingType == ULR_PROCESSING && freeSlot > foundExisting ) { - foundExisting = -1; // -1 means "send request" + // Prefetch immediately, without unlocking the list - the old approach of + // async prefetching in another thread was sometimes so slow that we'd process + // another request from the same client before the prefetch job would execute. + if ( callback != NULL && ( isNew || request->clients == NULL || request->clients->data == data ) + && !( hops & (HOP_FLAG_BGR | HOP_FLAG_PREFETCH) ) // No cascading of prefetches + && end == request->to && length <= _maxPrefetch ) { + // Only if this is a client request, and the !! end boundary matches exactly !! + // (See above for reason why) + // - We neither check the local cache, nor other pending requests. Worth it? + // Complexity vs. 
probability + preReq.start = end; + preReq.end = end; + extendRequest( preReq.start, &preReq.end, uplink->image, MIN( length * 3, _maxPrefetch ) ); + if ( preReq.start < preReq.end ) { + //logadd( LOG_DEBUG2, "Prefetching @ %"PRIx64" - %"PRIx64, preReq.start, preReq.end ); + uplink->queueLen++; + pre = malloc( sizeof(*pre) ); + pre->next = request->next; + request->next = pre; + pre->handle = preReq.handle = ++uplink->queueId; + pre->from = preReq.start; + pre->to = preReq.end; + pre->hopCount = hops | HOP_FLAG_PREFETCH; + pre->sent = true; // Optimistic; would be set to false on failure + pre->clients = NULL; +#ifdef DEBUG + timing_get( &pre->entered ); +#endif + } } -#ifdef _DEBUG - if ( foundExisting != -1 ) { - logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, NAMES_ULR[existingType], foundExisting, freeSlot ); - logadd( LOG_DEBUG2, "Original %" PRIu64 "-%" PRIu64 " (%p)\n" - "New %" PRIu64 "-%" PRIu64 " (%p)\n", - uplink->queue[foundExisting].from, uplink->queue[foundExisting].to, (void*)uplink->queue[foundExisting].client, - start, end, (void*)client ); + // // // // + // Copy data - need this after unlocking + req.handle = request->handle; + if ( callback != NULL ) { + assert( c != NULL ); + *c = malloc( sizeof( *request->clients ) ); + (**c).next = NULL; + (**c).handle = handle; + (**c).from = start; + (**c).to = end; + (**c).data = data; + (**c).callback = callback; } -#endif - // Fill structure - uplink->queue[freeSlot].from = start; - uplink->queue[freeSlot].to = end; - uplink->queue[freeSlot].handle = handle; - uplink->queue[freeSlot].client = client; - //int old = uplink->queue[freeSlot].status; - uplink->queue[freeSlot].status = ( foundExisting == -1 ? ULR_NEW : - ( existingType == ULR_NEW ? 
ULR_PENDING : existingType ) ); - uplink->queue[freeSlot].hopCount = hops; -#ifdef _DEBUG - timing_get( &uplink->queue[freeSlot].entered ); - //logadd( LOG_DEBUG2 %p] Inserting request at slot %d, was %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 "\n", (void*)uplink, freeSlot, old, uplink->queue[freeSlot].status, uplink->queue[freeSlot, ".handle, start, end ); -#endif mutex_unlock( &uplink->queueLock ); + // End queue critical section + if ( pre == NULL && !isNew ) + return true; // Nothing to do - if ( foundExisting != -1 ) { - ref_put( &uplink->reference ); - return true; // Attached to pending request, do nothing + // Fire away the request(s) + mutex_lock( &uplink->sendMutex ); + bool ret1 = true; + bool ret2 = true; + if ( isNew ) { + ret1 = requestBlock( uplink, &req, hops ); } - - // See if we can fire away the request - if ( unlikely( mutex_trylock( &uplink->sendMutex ) != 0 ) ) { - logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" ); - } else { - if ( unlikely( uplink->current.fd == -1 ) ) { - mutex_unlock( &uplink->sendMutex ); - logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" ); - } else { - const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); - const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); - if ( hops < 200 ) ++hops; - const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) ); - mutex_unlock( &uplink->sendMutex ); - if ( unlikely( !ret ) ) { - logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" ); - } else { - // Direct send succeeded, update queue entry from NEW to PENDING, so the request won't be sent again - int state; - mutex_lock( &uplink->queueLock ); - if ( !uplink->shutdown && uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) 
{ - state = uplink->queue[freeSlot].status; - if ( uplink->queue[freeSlot].status == ULR_NEW ) { - uplink->queue[freeSlot].status = ULR_PENDING; - } - } else { - state = -1; - } - mutex_unlock( &uplink->queueLock ); - if ( state == -1 ) { - logadd( LOG_DEBUG2, "Direct uplink request queue entry gone after sending and re-locking queue. *shrug*" ); - } else if ( state == ULR_NEW ) { - //logadd( LOG_DEBUG2, "Direct uplink request" ); - } else { - logadd( LOG_DEBUG2, "Direct uplink request queue entry changed to %s afte sending (expected ULR_NEW).", NAMES_ULR[uplink->queue[freeSlot].status] ); - } - ref_put( &uplink->reference ); - return true; - } - // Fall through to waking up sender thread - } + if ( pre != NULL ) { + ret2 = requestBlock( uplink, &preReq, hops | HOP_FLAG_PREFETCH ); + } + if ( !ret1 || !ret2 ) { // Set with send locked + uplink->image->problem.uplink = true; + } + mutex_unlock( &uplink->sendMutex ); + // markRequestUnsend locks the queue, would violate locking order with send mutex + if ( !ret1 ) { + markRequestUnsent( uplink, req.handle ); + logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing (%"PRIu64")", req.handle ); + } + if ( !ret2 ) { + markRequestUnsent( uplink, preReq.handle ); } - if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { + if ( ( !ret1 || !ret2 ) && signal_call( uplink->signal ) == SIGNAL_ERROR ) { logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); } - ref_put( &uplink->reference ); return true; + fail_lock: mutex_unlock( &uplink->queueLock ); -fail_ref: - ref_put( &uplink->reference ); return false; } @@ -431,11 +549,10 @@ static void* uplink_mainloop(void *data) #define EV_COUNT (2) struct pollfd events[EV_COUNT]; dnbd3_uplink_t * const uplink = (dnbd3_uplink_t*)data; - int numSocks, i, waitTime; + int numSocks, waitTime; int altCheckInterval = SERVER_RTT_INTERVAL_INIT; int rttTestResult; uint32_t discoverFailCount = 0; - uint32_t unsavedSeconds = 0; ticks nextAltCheck, 
lastKeepalive; char buffer[200]; memset( events, 0, sizeof(events) ); @@ -447,7 +564,7 @@ static void* uplink_mainloop(void *data) thread_detach( uplink->thread ); blockNoncriticalSignals(); // Make sure file is open for writing - if ( !uplink_reopenCacheFd( uplink, false ) ) { + if ( !reopenCacheFd( uplink, false ) ) { // It might have failed - still offer proxy mode, we just can't cache logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", uplink->image->path, errno ); } @@ -460,14 +577,14 @@ static void* uplink_mainloop(void *data) } while ( !_shutdown && !uplink->shutdown ) { // poll() - waitTime = uplink->rttTestResult == RTT_DOCHANGE ? 0 : -1; - if ( waitTime == 0 ) { + if ( uplink->rttTestResult == RTT_DOCHANGE ) { // 0 means poll, since we're about to change the server + waitTime = 0; } else { declare_now; waitTime = (int)timing_diffMs( &now, &nextAltCheck ); if ( waitTime < 100 ) waitTime = 100; - if ( waitTime > 10000 ) waitTime = 10000; + else if ( waitTime > 10000 ) waitTime = 10000; } events[EV_SOCKET].fd = uplink->current.fd; numSocks = poll( events, EV_COUNT, waitTime ); @@ -494,8 +611,7 @@ static void* uplink_mainloop(void *data) mutex_unlock( &uplink->rttLock ); discoverFailCount = 0; if ( fd != -1 ) close( fd ); - uplink->replicationHandle = REP_NONE; - uplink->image->working = true; + uplink->image->problem.uplink = false; uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received buffer[0] = '@'; if ( altservers_toString( uplink->current.index, buffer + 1, sizeof(buffer) - 1 ) ) { @@ -504,12 +620,17 @@ static void* uplink_mainloop(void *data) } // If we don't have a crc32 list yet, see if the new server has one if ( uplink->image->crc32 == NULL ) { - uplink_addCrc32( uplink ); + requestCrc32List( uplink ); } // Re-send all pending requests - uplink_sendRequests( uplink, false ); - uplink_sendReplicationRequest( uplink ); + 
sendQueuedRequests( uplink, false ); + sendReplicationRequest( uplink ); events[EV_SOCKET].events = POLLIN | POLLRDHUP; + if ( uplink->image->problem.uplink ) { + // Some of the requests above must have failed again already :-( + logadd( LOG_DEBUG1, "Newly established uplink connection failed during getCRC or sendRequests" ); + connectionFailed( uplink, true ); + } timing_gets( &nextAltCheck, altCheckInterval ); // The rtt worker already did the handshake for our image, so there's nothing // more to do here @@ -517,6 +638,7 @@ static void* uplink_mainloop(void *data) // Check events // Signal if ( (events[EV_SIGNAL].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) { + uplink->image->problem.uplink = true; logadd( LOG_WARNING, "poll error on signal in uplink_mainloop!" ); goto cleanup; } else if ( (events[EV_SIGNAL].revents & POLLIN) ) { @@ -526,46 +648,37 @@ static void* uplink_mainloop(void *data) } if ( uplink->current.fd != -1 ) { // Uplink seems fine, relay requests to it... - uplink_sendRequests( uplink, true ); + sendQueuedRequests( uplink, true ); } else if ( uplink->queueLen != 0 ) { // No uplink; maybe it was shutdown since it was idle for too long uplink->idleTime = 0; } } // Uplink socket if ( (events[EV_SOCKET].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) { - uplink_connectionFailed( uplink, true ); + connectionFailed( uplink, true ); logadd( LOG_DEBUG1, "Uplink gone away, panic! 
(revents=%d)\n", (int)events[EV_SOCKET].revents ); setThreadName( "panic-uplink" ); } else if ( (events[EV_SOCKET].revents & POLLIN) ) { - uplink_handleReceive( uplink ); + handleReceive( uplink ); if ( _shutdown || uplink->shutdown ) goto cleanup; } declare_now; uint32_t timepassed = timing_diff( &lastKeepalive, &now ); - if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) { + if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL + || ( timepassed >= 2 && uplink->idleTime < _bgrWindowSize ) ) { lastKeepalive = now; uplink->idleTime += timepassed; - unsavedSeconds += timepassed; - if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && uplink->idleTime >= 20 && uplink->idleTime <= 70 ) ) { - // fsync/save every 4 minutes, or every 60 seconds if uplink is idle - unsavedSeconds = 0; - uplink_saveCacheMap( uplink ); - } // Keep-alive - if ( uplink->current.fd != -1 && uplink->replicationHandle == REP_NONE ) { - // Send keep-alive if nothing is happening - if ( uplink_sendKeepalive( uplink->current.fd ) ) { - // Re-trigger periodically, in case it requires a minimum user count - uplink_sendReplicationRequest( uplink ); - } else { - uplink_connectionFailed( uplink, true ); - logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" ); - setThreadName( "panic-uplink" ); + if ( uplink->current.fd != -1 && uplink->queueLen < _bgrWindowSize ) { + // Send keep-alive if nothing is happening, and try to trigger background rep. 
+ if ( !sendKeepalive( uplink ) || !sendReplicationRequest( uplink ) ) { + connectionFailed( uplink, true ); + logadd( LOG_DEBUG1, "Error sending keep-alive/BGR, panic!\n" ); } } // Don't keep uplink established if we're idle for too much - if ( uplink_connectionShouldShutdown( uplink ) ) { - logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", uplink->image->name, (int)uplink->image->rid ); + if ( connectionShouldShutdown( uplink ) ) { + logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", PIMG(uplink->image) ); goto cleanup; } } @@ -578,6 +691,7 @@ static void* uplink_mainloop(void *data) // Quit work if image is complete logadd( LOG_INFO, "Replication of %s complete.", uplink->image->name ); setThreadName( "finished-uplink" ); + uplink->image->problem.uplink = false; goto cleanup; } else { // Not complete - do measurement @@ -592,46 +706,44 @@ static void* uplink_mainloop(void *data) } else if ( rttTestResult == RTT_NOT_REACHABLE ) { if ( atomic_compare_exchange_strong( &uplink->rttTestResult, &rttTestResult, RTT_IDLE ) ) { discoverFailCount++; - if ( uplink->image->working && uplink->current.fd == -1 && discoverFailCount > (SERVER_RTT_MAX_UNREACH / 2) ) { - logadd( LOG_DEBUG1, "Disabling %s:%d since no uplink is available", uplink->image->name, (int)uplink->image->rid ); - uplink->image->working = false; - } if ( uplink->current.fd == -1 ) { uplink->cycleDetected = false; } } timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_MAX_UNREACH) ? 
altCheckInterval : SERVER_RTT_INTERVAL_FAILED ); } -#ifdef _DEBUG +#ifdef DEBUG if ( uplink->current.fd != -1 && !uplink->shutdown ) { bool resend = false; ticks deadline; timing_set( &deadline, &now, -10 ); mutex_lock( &uplink->queueLock ); - for (i = 0; i < uplink->queueLen; ++i) { - if ( uplink->queue[i].status != ULR_FREE && timing_reached( &uplink->queue[i].entered, &deadline ) ) { - snprintf( buffer, sizeof(buffer), "[DEBUG %p] Starving request slot %d detected:\n" - "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)uplink, i, uplink->queue[i].client->image->name, - uplink->queue[i].from, uplink->queue[i].to, uplink->queue[i].status ); - uplink->queue[i].entered = now; -#ifdef _DEBUG_RESEND_STARVING - uplink->queue[i].status = ULR_NEW; + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + if ( timing_reached( &it->entered, &deadline ) ) { + logadd( LOG_WARNING, "Starving request detected:" + " (from %" PRIu64 " to %" PRIu64 ", sent: %d) %s:%d", + it->from, it->to, (int)it->sent, PIMG(uplink->image) ); + it->entered = now; +#ifdef DEBUG_RESEND_STARVING + it->sent = false; resend = true; #endif - mutex_unlock( &uplink->queueLock ); - logadd( LOG_WARNING, "%s", buffer ); - mutex_lock( &uplink->queueLock ); } } mutex_unlock( &uplink->queueLock ); - if ( resend ) - uplink_sendRequests( uplink, true ); + if ( resend ) { + sendQueuedRequests( uplink, true ); + } } #endif } - cleanup: ; - uplink_saveCacheMap( uplink ); +cleanup: ; dnbd3_image_t *image = uplink->image; + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache != NULL ) { + cache->dirty = true; // Force writeout of cache map + ref_put( &cache->reference ); + } mutex_lock( &image->lock ); bool exp = false; if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) { @@ -653,37 +765,60 @@ static void* uplink_mainloop(void *data) return NULL ; } -static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly) +/** + * Only called from 
uplink thread. + */ +static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly) { - // Scan for new requests - int j; + assert_uplink_thread(); + // Scan for new requests, or optionally, (re)send all + // Build a buffer, so if there aren't too many requests, we can send them after + // unlocking the queue again. Otherwise we need flushes during iteration, which + // is no ideal, but in that case the uplink is probably overwhelmed anyways. + // Try 125 as that's exactly 300bytes, usually 2*MTU. +#define MAX_RESEND_BATCH 125 + dnbd3_request_t reqs[MAX_RESEND_BATCH]; + int count = 0; mutex_lock( &uplink->queueLock ); - for (j = 0; j < uplink->queueLen; ++j) { - if ( uplink->queue[j].status != ULR_NEW && (newOnly || uplink->queue[j].status != ULR_PENDING) ) continue; - uplink->queue[j].status = ULR_PENDING; - uint8_t hops = uplink->queue[j].hopCount; - const uint64_t reqStart = uplink->queue[j].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); - const uint32_t reqSize = (uint32_t)(((uplink->queue[j].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); - /* - logadd( LOG_DEBUG2, "[%p] Sending slot %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 " (%" PRIu64 "-%" PRIu64 ")", - (void*)uplink, j, uplink->queue[j].status, uplink->queue[j].handle, uplink->queue[j].from, uplink->queue[j].to, reqStart, reqStart+reqSize ); - */ - mutex_unlock( &uplink->queueLock ); - if ( hops < 200 ) ++hops; - mutex_lock( &uplink->sendMutex ); - const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) ); - mutex_unlock( &uplink->sendMutex ); - if ( !ret ) { - // Non-critical - if the connection dropped or the server was changed - // the thread will re-send this request as soon as the connection - // is reestablished. 
- logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" ); - altservers_serverFailed( uplink->current.index ); - return; + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + if ( newOnly && it->sent ) + continue; + it->sent = true; + dnbd3_request_t *hdr = &reqs[count++]; + hdr->magic = dnbd3_packet_magic; + hdr->cmd = CMD_GET_BLOCK; + hdr->size = (uint32_t)( it->to - it->from ); + hdr->offset = it->from; // Offset first, then hops! (union) + hdr->hops = COND_HOPCOUNT( uplink->current.version, it->hopCount ); + hdr->handle = it->handle; + fixup_request( *hdr ); + if ( count == MAX_RESEND_BATCH ) { + bool ok = false; + logadd( LOG_DEBUG2, "BLOCKING resend of %d", count ); + count = 0; + mutex_lock( &uplink->sendMutex ); + if ( uplink->current.fd != -1 ) { + ok = ( sock_sendAll( uplink->current.fd, reqs, DNBD3_REQUEST_SIZE * MAX_RESEND_BATCH, 3 ) + == DNBD3_REQUEST_SIZE * MAX_RESEND_BATCH ); + } + mutex_unlock( &uplink->sendMutex ); + if ( !ok ) { + uplink->image->problem.uplink = true; + break; + } } - mutex_lock( &uplink->queueLock ); } mutex_unlock( &uplink->queueLock ); + if ( count != 0 ) { + mutex_lock( &uplink->sendMutex ); + if ( uplink->current.fd != -1 ) { + uplink->image->problem.uplink = + ( sock_sendAll( uplink->current.fd, reqs, DNBD3_REQUEST_SIZE * count, 3 ) + != DNBD3_REQUEST_SIZE * count ); + } + mutex_unlock( &uplink->sendMutex ); + } +#undef MAX_RESEND_BATCH } /** @@ -695,73 +830,97 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly) * server. This means we might request data we already have, but it makes * the code simpler. Worst case would be only one bit is zero, which means * 4kb are missing, but we will request 32kb. + * + * Only called form uplink thread, so current.fd is assumed to be valid. + * + * @return false if sending request failed, true otherwise (i.e. 
not necessary/disabled) */ -static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink) +static bool sendReplicationRequest(dnbd3_uplink_t *uplink) { - if ( uplink == NULL || uplink->current.fd == -1 ) return; - if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 ) return; // Don't do background replication - if ( uplink->nextReplicationIndex == -1 || uplink->replicationHandle != REP_NONE ) - return; // Already a replication request on the wire, or no more blocks to replicate + assert_uplink_thread(); + if ( uplink->current.fd == -1 ) + return false; // Should never be called in this state, consider send error + if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 ) + return true; // Don't do background replication + if ( uplink->nextReplicationIndex == -1 ) + return true; // No more blocks to replicate dnbd3_image_t * const image = uplink->image; - if ( image->virtualFilesize < DNBD3_BLOCK_SIZE ) return; - if ( image->users < _bgrMinClients ) return; // Not enough active users + if ( image->users < _bgrMinClients ) + return true; // Not enough active users + const int numNewRequests = numWantedReplicationRequests( uplink ); + if ( numNewRequests <= 0 ) + return true; // Already sufficient amount of requests on the wire dnbd3_cache_map_t *cache = ref_get_cachemap( image ); - if ( cache == NULL || image->users < _bgrMinClients ) { + if ( cache == NULL ) { // No cache map (=image complete) - ref_put( &cache->reference ); - return; + return true; } const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); const int lastBlockIndex = mapBytes - 1; - int endByte; - if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks - endByte = uplink->nextReplicationIndex + mapBytes; - } else { // Hashblock based: Only look for match in current hash block - endByte = ( uplink->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK; - if ( endByte > mapBytes ) { - endByte = mapBytes; + for ( 
int bc = 0; bc < numNewRequests; ++bc ) { + int endByte; + if ( UPLINK_MAX_QUEUE - uplink->queueLen < 10 ) + break; // Don't overload queue + if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks + endByte = uplink->nextReplicationIndex + mapBytes; + } else { // Hashblock based: Only look for match in current hash block + endByte = ( uplink->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK; + if ( endByte > mapBytes ) { + endByte = mapBytes; + } } - } - atomic_thread_fence( memory_order_acquire ); - int replicationIndex = -1; - for ( int j = uplink->nextReplicationIndex; j < endByte; ++j ) { - const int i = j % ( mapBytes ); // Wrap around for BGR_FULL - if ( atomic_load_explicit( &cache->map[i], memory_order_relaxed ) != 0xff - && ( i != lastBlockIndex || !uplink->replicatedLastBlock ) ) { - // Found incomplete one - replicationIndex = i; + atomic_thread_fence( memory_order_acquire ); + int replicationIndex = -1; + for ( int j = uplink->nextReplicationIndex; j < endByte; ++j ) { + const int i = j % ( mapBytes ); // Wrap around for BGR_FULL + if ( atomic_load_explicit( &cache->map[i], memory_order_relaxed ) != 0xff + && ( i != lastBlockIndex || !uplink->replicatedLastBlock ) ) { + // Found incomplete one + replicationIndex = i; + break; + } + } + if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) { + // Nothing left in current block, find next one + replicationIndex = findNextIncompleteHashBlock( uplink, endByte ); + } + if ( replicationIndex == -1 ) { + // Replication might be complete, uplink_mainloop should take care.... 
+ uplink->nextReplicationIndex = -1; break; } + const uint64_t handle = ++uplink->queueId; + const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE; + uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE ); + // Extend the default 32k request size if _minRequestSize is > 32k + for ( size_t extra = 1; extra < ( _minRequestSize / FILE_BYTES_PER_MAP_BYTE ) + && offset + size < image->virtualFilesize + && _backgroundReplication == BGR_FULL; ++extra ) { + if ( atomic_load_explicit( &cache->map[replicationIndex+1], memory_order_relaxed ) == 0xff ) + break; // Hit complete 32k block, stop here + replicationIndex++; + size += (uint32_t)MIN( image->virtualFilesize - offset - size, FILE_BYTES_PER_MAP_BYTE ); + } + if ( !uplink_requestInternal( uplink, NULL, NULL, handle, offset, size, 0 ) ) { + logadd( LOG_DEBUG1, "Error sending background replication request to uplink server (%s:%d)", + PIMG(uplink->image) ); + ref_put( &cache->reference ); + return false; + } + if ( replicationIndex == lastBlockIndex ) { + uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks + } + uplink->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter + if ( _backgroundReplication == BGR_HASHBLOCK + && uplink->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) { + // Just crossed a hash block boundary, look for new candidate starting at this very index + uplink->nextReplicationIndex = findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex ); + if ( uplink->nextReplicationIndex == -1 ) + break; + } } ref_put( &cache->reference ); - if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) { - // Nothing left in current block, find next one - replicationIndex = uplink_findNextIncompleteHashBlock( uplink, endByte ); - } - if ( replicationIndex == -1 ) { - // Replication might be 
complete, uplink_mainloop should take care.... - uplink->nextReplicationIndex = -1; - return; - } - const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE; - uplink->replicationHandle = offset; - const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE ); - mutex_lock( &uplink->sendMutex ); - bool sendOk = dnbd3_get_block( uplink->current.fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->current.version, 1 ) ); - mutex_unlock( &uplink->sendMutex ); - if ( !sendOk ) { - logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" ); - return; - } - if ( replicationIndex == lastBlockIndex ) { - uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks - } - uplink->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter - if ( _backgroundReplication == BGR_HASHBLOCK - && uplink->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) { - // Just crossed a hash block boundary, look for new candidate starting at this very index - uplink->nextReplicationIndex = uplink_findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex ); - } + return true; } /** @@ -769,7 +928,7 @@ static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink) * of a hash block which is neither completely empty nor completely * replicated yet. Returns -1 if no match. 
*/ -static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMapIndex) +static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMapIndex) { int retval = -1; dnbd3_cache_map_t *cache = ref_get_cachemap( uplink->image ); @@ -816,29 +975,32 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int /** * Receive data from uplink server and process/dispatch * Locks on: uplink.lock, images[].lock + * Only called from uplink thread, so current.fd is assumed to be valid. */ -static void uplink_handleReceive(dnbd3_uplink_t *uplink) +static void handleReceive(dnbd3_uplink_t *uplink) { - dnbd3_reply_t inReply, outReply; - int ret, i; + dnbd3_reply_t inReply; + int ret; + assert_uplink_thread(); + assert( uplink->queueLen >= 0 ); for (;;) { ret = dnbd3_read_reply( uplink->current.fd, &inReply, false ); if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue; if ( ret == REPLY_AGAIN ) break; if ( unlikely( ret == REPLY_CLOSED ) ) { - logadd( LOG_INFO, "Uplink: Remote host hung up (%s)", uplink->image->path ); + logadd( LOG_INFO, "Uplink: Remote host hung up (%s:%d)", PIMG(uplink->image) ); goto error_cleanup; } if ( unlikely( ret == REPLY_WRONGMAGIC ) ) { - logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s)", uplink->image->path ); + logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s:%d)", PIMG(uplink->image) ); goto error_cleanup; } if ( unlikely( ret != REPLY_OK ) ) { - logadd( LOG_INFO, "Uplink: Connection error %d (%s)", ret, uplink->image->path ); + logadd( LOG_INFO, "Uplink: Connection error %d (%s:%d)", ret, PIMG(uplink->image) ); goto error_cleanup; } if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) { - logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s", inReply.size, uplink->image->path ); + logadd( LOG_WARNING, "Pure evil: Uplink server sent 
too much payload (%" PRIu32 ") for %s:%d", inReply.size, PIMG(uplink->image) ); goto error_cleanup; } @@ -851,21 +1013,41 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) } } if ( unlikely( (uint32_t)sock_recv( uplink->current.fd, uplink->recvBuffer, inReply.size ) != inReply.size ) ) { - logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", uplink->image->path ); + logadd( LOG_INFO, "Lost connection to uplink server of %s:%d (payload)", PIMG(uplink->image) ); goto error_cleanup; } // Payload read completely // Bail out if we're not interested - if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) ) continue; + if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) ) + continue; // Is a legit block reply - struct iovec iov[2]; - const uint64_t start = inReply.handle; - const uint64_t end = inReply.handle + inReply.size; totalBytesReceived += inReply.size; uplink->bytesReceived += inReply.size; + // Get entry from queue + dnbd3_queue_entry_t *entry; + mutex_lock( &uplink->queueLock ); + for ( entry = uplink->queue; entry != NULL; entry = entry->next ) { + if ( entry->handle == inReply.handle ) + break; + } + if ( entry == NULL ) { + mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock! + logadd( LOG_DEBUG1, "Received block reply on uplink, but handle %"PRIu64" is unknown (%s:%d)", + inReply.handle, PIMG(uplink->image) ); + continue; + } + const uint64_t start = entry->from; + const uint64_t end = entry->to; + mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock! + // We don't remove the entry from the list here yet, to slightly increase the chance of other + // clients attaching to this request while we write the data to disk + if ( end - start != inReply.size ) { + logadd( LOG_WARNING, "Received payload length does not match! 
(is: %"PRIu32", expect: %u, %s:%d)", + inReply.size, (unsigned int)( end - start ), PIMG(uplink->image) ); + } // 1) Write to cache file if ( unlikely( uplink->cacheFd == -1 ) ) { - uplink_reopenCacheFd( uplink, false ); + reopenCacheFd( uplink, false ); } if ( likely( uplink->cacheFd != -1 ) ) { int err = 0; @@ -884,16 +1066,19 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) continue; // Success, retry write } if ( err == EBADF || err == EINVAL || err == EIO ) { - if ( !tryAgain || !uplink_reopenCacheFd( uplink, true ) ) + uplink->image->problem.write = true; + if ( !tryAgain || !reopenCacheFd( uplink, true ) ) break; tryAgain = false; continue; // Write handle to image successfully re-opened, try again } - logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d", uplink->image->name, (int)uplink->image->rid, err ); + logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d", + PIMG(uplink->image), err ); break; } if ( unlikely( ret <= 0 || (uint32_t)ret > inReply.size - done ) ) { - logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d", ret, uplink->image->name, (int)uplink->image->rid ); + logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d", + ret, PIMG(uplink->image) ); break; } done += (uint32_t)ret; @@ -903,114 +1088,79 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) } if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) { logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.", - uplink->image->name, (int)uplink->image->rid, err ); + PIMG(uplink->image), err ); } } - // 2) Figure out which clients are interested in it - // Mark as ULR_PROCESSING, since we unlock repeatedly in the second loop - // below; this prevents uplink_request() from attaching to this request - // by populating a slot with index greater than the highest matching - // request with ULR_PROCESSING (assuming there is no ULR_PENDING or ULR_NEW 
- // where it's fine if the index is greater) + bool found = false; + dnbd3_queue_entry_t **it; mutex_lock( &uplink->queueLock ); - for (i = 0; i < uplink->queueLen; ++i) { - dnbd3_queued_request_t * const req = &uplink->queue[i]; - assert( req->status != ULR_PROCESSING ); - if ( req->status != ULR_PENDING && req->status != ULR_NEW ) continue; - assert( req->client != NULL ); - if ( req->from >= start && req->to <= end ) { // Match :-) - req->status = ULR_PROCESSING; + for ( it = &uplink->queue; *it != NULL; it = &(**it).next ) { + if ( *it == entry && entry->handle == inReply.handle ) { // ABA check + assert( found == false ); + *it = (**it).next; + found = true; + uplink->queueLen--; + break; } } - // 3) Send to interested clients - iterate backwards so request collaboration works, and - // so we can decrease queueLen on the fly while iterating. Should you ever change this to start - // from 0, you also need to change the "attach to existing request"-logic in uplink_request() - outReply.magic = dnbd3_packet_magic; - bool served = false; - for ( i = uplink->queueLen - 1; i >= 0; --i ) { - dnbd3_queued_request_t * const req = &uplink->queue[i]; - if ( req->status == ULR_PROCESSING ) { - size_t bytesSent = 0; - assert( req->from >= start && req->to <= end ); - dnbd3_client_t * const client = req->client; - outReply.cmd = CMD_GET_BLOCK; - outReply.handle = req->handle; - outReply.size = (uint32_t)( req->to - req->from ); - iov[0].iov_base = &outReply; - iov[0].iov_len = sizeof outReply; - iov[1].iov_base = uplink->recvBuffer + (req->from - start); - iov[1].iov_len = outReply.size; - fixup_reply( outReply ); - req->status = ULR_FREE; - req->client = NULL; - served = true; - mutex_lock( &client->sendMutex ); - mutex_unlock( &uplink->queueLock ); - if ( client->sock != -1 ) { - ssize_t sent = writev( client->sock, iov, 2 ); - if ( sent > (ssize_t)sizeof outReply ) { - bytesSent = (size_t)sent - sizeof outReply; - } - } - if ( bytesSent != 0 ) { - client->bytesSent += 
bytesSent; - } - mutex_unlock( &client->sendMutex ); - mutex_lock( &uplink->queueLock ); - if ( i > uplink->queueLen ) { - i = uplink->queueLen; // Might have been set to 0 by cancelAllRequests - } - } - if ( req->status == ULR_FREE && i == uplink->queueLen - 1 ) uplink->queueLen--; + if ( uplink->queueLen < SERVER_UPLINK_QUEUELEN_THRES ) { + uplink->image->problem.queue = false; } mutex_unlock( &uplink->queueLock ); -#ifdef _DEBUG - if ( !served && start != uplink->replicationHandle ) { - logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)uplink, uplink->image->name, start, end ); + if ( !found ) { + logadd( LOG_DEBUG1, "Replication request vanished from queue after writing to disk (%s:%d)", + PIMG(uplink->image) ); + continue; } -#endif - if ( start == uplink->replicationHandle ) { - // Was our background replication - uplink->replicationHandle = REP_NONE; - // Try to remove from fs cache if no client was interested in this data - if ( !served && uplink->cacheFd != -1 ) { - posix_fadvise( uplink->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED ); - } + dnbd3_queue_client_t *next; + for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) { + assert( c->from >= start && c->to <= end ); + (*c->callback)( c->data, c->handle, c->from, (uint32_t)( c->to - c->from ), + (const char*)( uplink->recvBuffer + (c->from - start) ) ); + next = c->next; + free( c ); } - if ( served ) { + if ( entry->clients != NULL ) { // Was some client -- reset idle counter uplink->idleTime = 0; // Re-enable replication if disabled if ( uplink->nextReplicationIndex == -1 ) { uplink->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK; } + } else { + if ( uplink->cacheFd != -1 ) { + // Try to remove from fs cache if no client was interested in this data + posix_fadvise( uplink->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED ); + } } + free( entry ); + } // main receive loop + // Trigger background 
replication if applicable + if ( !sendReplicationRequest( uplink ) ) { + goto error_cleanup; } - if ( uplink->replicationHandle == REP_NONE ) { - mutex_lock( &uplink->queueLock ); - const bool rep = ( uplink->queueLen == 0 ); - mutex_unlock( &uplink->queueLock ); - if ( rep ) uplink_sendReplicationRequest( uplink ); - } + // Normal end return; // Error handling from failed receive or message parsing - error_cleanup: ; - uplink_connectionFailed( uplink, true ); +error_cleanup: ; + connectionFailed( uplink, true ); } /** * Only call from uplink thread */ -static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew) +static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew) { + assert_uplink_thread(); if ( uplink->current.fd == -1 ) return; + setThreadName( "panic-uplink" ); altservers_serverFailed( uplink->current.index ); mutex_lock( &uplink->sendMutex ); + uplink->image->problem.uplink = true; close( uplink->current.fd ); uplink->current.fd = -1; mutex_unlock( &uplink->sendMutex ); - uplink->replicationHandle = REP_NONE; if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) { uplink->nextReplicationIndex = 0; } @@ -1025,15 +1175,26 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew) } /** - * Send keep alive request to server + * Send keep alive request to server. + * Called from uplink thread, current.fd must be valid. 
*/ -static int uplink_sendKeepalive(const int fd) +static bool sendKeepalive(dnbd3_uplink_t *uplink) { static const dnbd3_request_t request = { .magic = dnbd3_packet_magic, .cmd = net_order_16( CMD_KEEPALIVE ) }; - return send( fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request); + assert_uplink_thread(); + mutex_lock( &uplink->sendMutex ); + bool sendOk = send( uplink->current.fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request); + mutex_unlock( &uplink->sendMutex ); + return sendOk; } -static void uplink_addCrc32(dnbd3_uplink_t *uplink) +/** + * Request crclist from uplink. + * Called from uplink thread, current.fd must be valid. + * FIXME This is broken as it could happen that another message arrives after sending + * the request. Refactor, split and move receive into general receive handler. + */ +static void requestCrc32List(dnbd3_uplink_t *uplink) { dnbd3_image_t *image = uplink->image; if ( image == NULL || image->virtualFilesize == 0 ) return; @@ -1042,6 +1203,9 @@ static void uplink_addCrc32(dnbd3_uplink_t *uplink) uint32_t *buffer = malloc( bytes ); mutex_lock( &uplink->sendMutex ); bool sendOk = dnbd3_get_crc32( uplink->current.fd, &masterCrc, buffer, &bytes ); + if ( !sendOk ) { + uplink->image->problem.uplink = true; + } mutex_unlock( &uplink->sendMutex ); if ( !sendOk || bytes == 0 ) { free( buffer ); @@ -1051,7 +1215,7 @@ static void uplink_addCrc32(dnbd3_uplink_t *uplink) lists_crc = crc32( lists_crc, (uint8_t*)buffer, bytes ); lists_crc = net_order_32( lists_crc ); if ( lists_crc != masterCrc ) { - logadd( LOG_WARNING, "Received corrupted crc32 list from uplink server (%s)!", uplink->image->name ); + logadd( LOG_WARNING, "Received corrupted crc32 list from uplink server (%s:%d)!", PIMG(uplink->image) ); free( buffer ); return; } @@ -1061,10 +1225,14 @@ static void uplink_addCrc32(dnbd3_uplink_t *uplink) char path[len]; snprintf( path, len, "%s.crc", uplink->image->path ); const int fd = open( path, O_WRONLY | O_CREAT, 0644 
); - if ( fd >= 0 ) { - write( fd, &masterCrc, sizeof(uint32_t) ); - write( fd, buffer, bytes ); + if ( fd != -1 ) { + ssize_t ret = write( fd, &masterCrc, sizeof(masterCrc) ); + ret += write( fd, buffer, bytes ); close( fd ); + if ( (size_t)ret != sizeof(masterCrc) + bytes ) { + unlink( path ); + logadd( LOG_WARNING, "Could not write crc32 file for %s:%d", PIMG(uplink->image) ); + } } } @@ -1076,80 +1244,24 @@ static void uplink_addCrc32(dnbd3_uplink_t *uplink) * it will be closed first. Otherwise, nothing will happen and true will be returned * immediately. */ -static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force) +static bool reopenCacheFd(dnbd3_uplink_t *uplink, const bool force) { if ( uplink->cacheFd != -1 ) { if ( !force ) return true; close( uplink->cacheFd ); } uplink->cacheFd = open( uplink->image->path, O_WRONLY | O_CREAT, 0644 ); + uplink->image->problem.write = uplink->cacheFd == -1; return uplink->cacheFd != -1; } /** - * Saves the cache map of the given image. - * Return true on success. - * Locks on: imageListLock, image.lock + * Returns true if the uplink has been idle for some time (apart from + * background replication, if it is set to hashblock, or if it has + * a minimum number of active clients configured that is not currently + * reached) */ -static bool uplink_saveCacheMap(dnbd3_uplink_t *uplink) -{ - dnbd3_image_t *image = uplink->image; - assert( image != NULL ); - - if ( uplink->cacheFd != -1 ) { - if ( fsync( uplink->cacheFd ) == -1 ) { - // A failing fsync means we have no guarantee that any data - // since the last fsync (or open if none) has been saved. Apart - // from keeping the cache map from the last successful fsync - // around and restoring it there isn't much we can do to recover - // a consistent state. Bail out. 
- logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d", image->path, errno ); - logadd( LOG_ERROR, "Bailing out immediately" ); - exit( 1 ); - } - } - - dnbd3_cache_map_t *cache = ref_get_cachemap( image ); - if ( cache == NULL ) - return true; - logadd( LOG_DEBUG2, "Saving cache map of %s:%d", image->name, (int)image->rid ); - const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize); - assert( image->path != NULL ); - char mapfile[strlen( image->path ) + 4 + 1]; - strcpy( mapfile, image->path ); - strcat( mapfile, ".map" ); - - int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 ); - if ( fd == -1 ) { - const int err = errno; - ref_put( &cache->reference ); - logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile ); - return false; - } - - size_t done = 0; - while ( done < size ) { - const ssize_t ret = write( fd, cache->map + done, size - done ); - if ( ret == -1 ) { - if ( errno == EINTR ) continue; - logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile ); - break; - } - if ( ret <= 0 ) { - logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile ); - break; - } - done += (size_t)ret; - } - ref_put( &cache->reference ); - if ( fsync( fd ) == -1 ) { - logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno ); - } - close( fd ); - return true; -} - -static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink) +static bool connectionShouldShutdown(dnbd3_uplink_t *uplink) { return ( uplink->idleTime > SERVER_UPLINK_IDLE_TIMEOUT && ( _backgroundReplication != BGR_FULL || _bgrMinClients > uplink->image->users ) ); @@ -1165,3 +1277,44 @@ bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len) return false; return altservers_toString( current, buffer, len ); } + +/** + * Get number of replication requests that should be sent right now to + * meet the configured bgrWindowSize. 
Returns 0 if any client requests + * are pending. + * This applies a sort of "slow start" in case the uplink was recently + * dealing with actual client requests, in that the uplink's idle time + * (in seconds) is an upper bound for the number returned, so we don't + * saturate the uplink with loads of requests right away, in case that + * client triggers more requests to the uplink server. + */ +static int numWantedReplicationRequests(dnbd3_uplink_t *uplink) +{ + int ret = MIN( _bgrWindowSize, uplink->idleTime + 1 ); + if ( uplink->queueLen == 0 ) + return ret; + mutex_lock( &uplink->queueLock ); + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + if ( it->clients == NULL ) { + ret--; + } else { + ret = 0; // Do not allow BGR if client requests are being handled + break; + } + } + mutex_unlock( &uplink->queueLock ); + return ret; +} + +static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle) +{ + mutex_lock( &uplink->queueLock ); + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + if ( it->handle == handle ) { + it->sent = false; + break; + } + } + mutex_unlock( &uplink->queueLock ); +} + diff --git a/src/server/uplink.h b/src/server/uplink.h index 49ff0b4..b6037d6 100644 --- a/src/server/uplink.h +++ b/src/server/uplink.h @@ -2,7 +2,7 @@ #define _UPLINK_H_ #include "globals.h" -#include "../types.h" +#include <dnbd3/types.h> void uplink_globalsInit(); @@ -10,9 +10,11 @@ uint64_t uplink_getTotalBytesReceived(); bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version); -void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client); +void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback callback); -bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hopCount); +bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, 
uint8_t hops); + +bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length); bool uplink_shutdown(dnbd3_image_t *image); |