diff options
39 files changed, 2365 insertions, 1722 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 5e3241c..cc8bfb7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -33,19 +33,39 @@ ELSE() OPTION(BUILD_KERNEL_MODULE "Build the dnbd3 Linux kernel module" ON) ENDIF() +INCLUDE(CheckCCompilerFlag) +macro (TRY_ADD_FLAG _FLAG) + UNSET(TMP_TEST CACHE) + CHECK_C_COMPILER_FLAG("${_FLAG}" TMP_TEST) + if (TMP_TEST) + message(":-) Compiler supports ${_FLAG}") + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${_FLAG}") + else() + message(":-( Compiler does not support ${_FLAG}") + endif() + +endmacro() + +# Common for gcc and clang +SET(CMAKE_EXE_LINKER_FLAGS "-Wl,-z,relro,-z,now,-z,defs -pie") +SET(CMAKE_C_FLAGS "-fPIE -std=c11 -Wno-multichar -fno-strict-aliasing -D_GNU_SOURCE -D_FORTIFY_SOURCE=2 ${EXTRA_C_FLAGS}") +SET(CMAKE_C_FLAGS_RELEASE " -O3 -Wno-unused-result -DNDEBUG") +# Hardening. Try as much as is possible. +TRY_ADD_FLAG("-mmitigate-rop") +TRY_ADD_FLAG("-fstack-protector-strong") +TRY_ADD_FLAG("-fstack-clash-protection") +TRY_ADD_FLAG("-Wformat") +TRY_ADD_FLAG("-Wformat-security") +TRY_ADD_FLAG("-Werror=format-security") if(CMAKE_C_COMPILER MATCHES "clang") message( "Using clang flags." ) - SET(CMAKE_C_FLAGS_DEBUG "-std=c11 -O1 -fno-omit-frame-pointer -g -Wall -Wextra -Wpedantic -Wno-unused-result -D_GNU_SOURCE -D_DEBUG -Wno-multichar -fno-strict-aliasing ${EXTRA_C_FLAGS}") - SET(CMAKE_C_FLAGS_RELEASE "-std=c11 -O2 -Wno-unused-result -D_GNU_SOURCE -DNDEBUG -Wno-multichar -fno-strict-aliasing ${EXTRA_C_FLAGS}") + SET(CMAKE_C_FLAGS_DEBUG " -O1 -fno-omit-frame-pointer -g -Wall -Wextra -Wpedantic -Wno-unused-result -D_DEBUG") elseif (CMAKE_C_COMPILER MATCHES "(cc-)|(cc$)") message( "Using (g)cc flags." ) - SET(CMAKE_C_FLAGS_DEBUG "-std=c11 -O0 -g -Wall -Wextra -Wpedantic -Wconversion -Wno-sign-conversion -D_GNU_SOURCE -D_DEBUG -Wno-multichar -fno-strict-aliasing ${EXTRA_C_FLAGS}") - SET(CMAKE_C_FLAGS_RELEASE "-std=c11 -O2 -Wno-unused-result -D_GNU_SOURCE -DNDEBUG -Wno-multichar -fno-strict-aliasing ${EXTRA_C_FLAGS}") + SET(CMAKE_C_FLAGS_DEBUG " -O0 -g -Wall -Wextra -Wpedantic -Wconversion -Wno-sign-conversion -D_DEBUG") else() message( FATAL_ERROR "Could not determine compiler type." ) endif() -#SET(CMAKE_CXX_FLAGS_DEBUG "-std=c99 -O0 -g -Wall -Wno-unused-result -D_GNU_SOURCE -D_DEBUG") -#SET(CMAKE_CXX_FLAGS_RELEASE "-std=c99 -O2 -Wno-unused-result -D_GNU_SOURCE -DNDEBUG" ) set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake/") @@ -16,23 +16,22 @@ requests.lock ===== SERVER ===== This is a list of used locks, in the order they -have to be aquired if you must hold multiple locks: -remoteCloneLock | reloadLock +have to be aquired if you must hold multiple locks. +Note this list might be out of date, take a look at the +defines in lock.h for the effective order. +reloadLock +remoteCloneLock _clients_lock _clients[].lock integrityQueueLock _images_lock _images[].lock -pendingLockConsume -pendingLockProduce uplink.queueLock altServersLock client.sendMutex -client.statsLock -statisticsSentLock -statisticsReceivedLock uplink.rttLock uplink.sendMutex +aclLock If you need to lock multiple clients/images/... at once, lock the client with the lowest array index first. diff --git a/conf/README.server b/conf/README.server index 285758b..08be09f 100644 --- a/conf/README.server +++ b/conf/README.server @@ -7,13 +7,18 @@ There are two files in that dir == alt-servers == List of known alt-servers for this server. -Format: -[PREFIX]<IP:PORT> [Comment] - -Prefix can be: -+ - Only report server to clients as alt-server, but don't use for replication -- - Only use server for replication, but don't advertise to clients -No prefix means server will be advertised to clients and is used for replication +INI Format: +[Address] +comment=Whatever +for=client | replication +namespace=some/path/ + +All fields in a section are optional. If the "for" key is missing, the server +will be used for replication and will be sent to clients that request a list +of alt servers. +The namespace key can be specified multiple times per section. If it is missing, +the server will be used for all image names; otherwise, it will only be used +for images which's name starts with one of the given strings. If you're not running in proxy mode, this file won't do much for you diff --git a/conf/alt-servers b/conf/alt-servers index fd2f2ec..1d5d39e 100644 --- a/conf/alt-servers +++ b/conf/alt-servers @@ -1,4 +1,15 @@ -192.168.100.10 Some alt server -+192.168.100.100 My first alt server that will not be used for replication --192.168.100.50 Super sectret alt server that will be used for replication, but clients don't know about it +[192.168.100.10] +comment=Some alt server +[192.168.100.100] +comment=My first alt server that will not be used for replication +for=client + +[192.168.100.50] +comment=Super sectret alt server that will be used for replication, but clients don't know about it +for=replication + +[192.168.100.123] +comment=Also just for replication, and only for images starting with foobar/baz/ +namespace=foobar/baz/ +for=replication diff --git a/conf/server.conf b/conf/server.conf index 2f43247..a15092f 100644 --- a/conf/server.conf +++ b/conf/server.conf @@ -13,9 +13,10 @@ isProxy=true backgroundReplication=true ; minimum amount of connected clients for background replication to kick in bgrMinClients=0 -; if isProxy==true and another proxy requests and image that we don't have, should we ask our alt-servers for it? +; if another proxy requests and image that we don't have, should we ask our alt-servers for it? lookupMissingForProxy=true -; create sparse files instead of preallocating; ignored if backgroundReplication=true -- only recommended if cache space is small +; create sparse files instead of preallocating; ignored if backgroundReplication=true +; -- only recommended if cache space is small sparseFiles=false ; if true (which is the default), images will automatically be removed from the list if they can't be accessed removeMissingImages=true @@ -27,6 +28,20 @@ clientTimeout=15000 closeUnusedFd=false ; set this to true to load files without the .r[0-9]+ extension too, assuming RID=1 vmdkLegacyMode=false +; Don't set the server flag when connecting to alt-servers +; Intended for if the proxy is used for on-client caching +pretendClient=false +; When running in proxy mode and running out of space, automatically delete oldest image(s) to make +; the newly replicated image fit. In sparse mode, this will make sure at least 2GB of free space are +; available when replicating a new image. During normal operation, it will free at least 256MB whenever +; an attempt to write more data to cache fails. In non-sparse mode, whenever a new image is replicated, +; as much space as is required to store the entire image will be made available. +; However, after startup the proxy will refuse to delete any images for the time span given below, to be +; able to gather up to date usage information for the images available. If unitless, the value is +; interpreted in seconds. Valid suffixes are m, h, d. +; Setting this to -1 disables deletion of images. If the cache partition is full, no more images will +; be replicated unless you manually free up more disk space. +autoFreeDiskSpaceDelay=10h [limits] maxClients=2000 diff --git a/src/bench/connection.c b/src/bench/connection.c index 129ae3c..26be440 100644 --- a/src/bench/connection.c +++ b/src/bench/connection.c @@ -18,23 +18,10 @@ static const size_t SHORTBUF = 100; #define SOCKET_KEEPALIVE_TIMEOUT (3) #define MAX_ALTS (8) #define MAX_HOSTS_PER_ADDRESS (2) -// If a server wasn't reachable this many times, we slowly start skipping it on measurements -static const int FAIL_BACKOFF_START_COUNT = 8; #define RTT_COUNT (4) /* Module variables */ - -// Init guard -static bool connectionInitDone = false; -static bool keepRunning = true; - -static struct { - int sockFd; - pthread_mutex_t sendMutex; - dnbd3_signal_t* panicSignal; - dnbd3_host_t currentServer; - uint64_t startupTime; -} connection; +static char trash[4096]; // Known alt servers typedef struct _alt_server { @@ -54,13 +41,14 @@ bool connection_init_n_times( const char *lowerImage, const uint16_t rid, int ntimes, - BenchCounters* counters, - bool closeSockets + uint64_t blockSize, + BenchCounters* counters ) { for (int run_i = 0; run_i < ntimes; ++run_i) { counters->attempts++; - printf("."); + putchar('.'); + fflush(stdout); int sock = -1; char host[SHORTBUF]; serialized_buffer_t buffer; @@ -68,66 +56,85 @@ bool connection_init_n_times( char *remoteName; uint64_t remoteSize; - if ( !connectionInitDone && keepRunning ) { - dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS]; - const char *current, *end; - int altIndex = 0; - memset( altservers, 0, sizeof altservers ); - connection.sockFd = -1; - current = hosts; - do { - // Get next host from string - while ( *current == ' ' ) current++; - end = strchr( current, ' ' ); - size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1); - if ( len > SHORTBUF ) len = SHORTBUF; - snprintf( host, len, "%s", current ); - int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS ); - for ( int i = 0; i < newHosts; ++i ) { - if ( altIndex >= MAX_ALTS ) + dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS]; + const char *current, *end; + int altIndex = 0; + memset( altservers, 0, sizeof altservers ); + current = hosts; + do { + // Get next host from string + while ( *current == ' ' ) current++; + end = strchr( current, ' ' ); + size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1); + if ( len > SHORTBUF ) len = SHORTBUF; + snprintf( host, len, "%s", current ); + int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS ); + for ( int i = 0; i < newHosts; ++i ) { + if ( altIndex >= MAX_ALTS ) + break; + altservers[altIndex].host = tempHosts[i]; + altIndex += 1; + } + current = end + 1; + } while ( end != NULL && altIndex < MAX_ALTS ); + // Connect + for ( int i = 0; i < altIndex; ++i ) { + if ( altservers[i].host.type == 0 ) + continue; + // Try to connect + dnbd3_reply_t reply; + sock = sock_connect( &altservers[i].host, 3500, 10000 ); + if ( sock == -1 ) { + counters->fails++; + logadd( LOG_ERROR, "Could not connect to host (errno=%d)", errno ); + } else if ( !dnbd3_select_image( sock, lowerImage, rid, 0 ) ) { + counters->fails++; + logadd( LOG_ERROR, "Could not send select image" ); + } else if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) { + counters->fails++; + logadd( LOG_ERROR, "Could not read select image reply (%d)", errno ); + } else if ( rid != 0 && rid != remoteRid ) { + counters->fails++; + logadd( LOG_ERROR, "rid mismatch" ); + //} else if ( !dnbd3_get_block( sock, run_i * blockSize, blockSize, 0, 0 ) ) { + } else if ( !dnbd3_get_block( sock, (((uint64_t)rand()) << 16 + rand()) % (remoteSize - blockSize), blockSize, 0, 0 ) ) { + counters->fails++; + logadd( LOG_ERROR, "send: get block failed" ); + } else if ( !dnbd3_get_reply( sock, &reply ) ) { + counters->fails++; + logadd( LOG_ERROR, "recv: get block header failed" ); + } else if ( reply.cmd != CMD_GET_BLOCK ) { + counters->fails++; + logadd( LOG_ERROR, "recv: get block reply is not CMD_GET_BLOCK" ); + } else { + int rv, togo = blockSize; + do { + rv = recv( sock, trash, MIN( sizeof(trash), togo ), MSG_WAITALL|MSG_NOSIGNAL ); + if ( rv == -1 && errno == EINTR ) + continue; + if ( rv <= 0 ) break; - altservers[altIndex].host = tempHosts[i]; - altIndex += 1; - } - current = end + 1; - } while ( end != NULL && altIndex < MAX_ALTS ); - logadd( LOG_INFO, "Got %d servers from init call", altIndex ); - // Connect - for ( int i = 0; i < altIndex; ++i ) { - if ( altservers[i].host.type == 0 ) - continue; - // Try to connect - sock = sock_connect( &altservers[i].host, 500, SOCKET_KEEPALIVE_TIMEOUT * 1000 ); - if ( sock == -1 ) { - counters->fails++; - logadd( LOG_ERROR, "Could not connect to host" ); - } else if ( !dnbd3_select_image( sock, lowerImage, rid, 0 ) ) { - counters->fails++; - logadd( LOG_ERROR, "Could not send select image" ); - } else if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) { + togo -= rv; + } while ( togo > 0 ); + if ( togo != 0 ) { counters->fails++; - logadd( LOG_ERROR, "Could not read select image reply (%d)", errno ); - } else if ( rid != 0 && rid != remoteRid ) { - counters->fails++; - logadd( LOG_ERROR, "rid mismatch" ); + logadd( LOG_ERROR, "recv: get block payload failed (remaining %d)", togo ); } else { counters->success++; - break; - } - // Failed - logadd( LOG_DEBUG1, "Server does not offer requested image... " ); - if ( sock != -1 ) { close( sock ); sock = -1; + continue; } } + // Failed if ( sock != -1 ) { - // connectionInitDone = true; - if (closeSockets) { - close( sock ); - } + close( sock ); + sock = -1; } } + if ( sock != -1 ) { + close( sock ); + } } return true; } diff --git a/src/bench/connection.h b/src/bench/connection.h index 9cb59ef..770bf0d 100644 --- a/src/bench/connection.h +++ b/src/bench/connection.h @@ -19,7 +19,7 @@ typedef struct _dnbd3_async { } dnbd3_async_t; -bool connection_init_n_times(const char *hosts, const char *image, const uint16_t rid, int ntimes, BenchCounters* counters, bool closeSockets); +bool connection_init_n_times(const char *hosts, const char *image, const uint16_t rid, int ntimes, uint64_t blockSize, BenchCounters* counters); bool connection_init(const char *hosts, const char *image, const uint16_t rid); diff --git a/src/bench/helper.h b/src/bench/helper.h index 8342a79..e0c0262 100644 --- a/src/bench/helper.h +++ b/src/bench/helper.h @@ -29,6 +29,7 @@ typedef struct BenchThreadData { char* server_address; char * image_name; int runs; + int bs; int threadNumber; bool closeSockets; } BenchThreadData; diff --git a/src/bench/main.c b/src/bench/main.c index 2f32dbf..f8c55c3 100644 --- a/src/bench/main.c +++ b/src/bench/main.c @@ -17,10 +17,6 @@ #define debugf(...) do { logadd( LOG_DEBUG1, __VA_ARGS__ ); } while (0) -/* Debug/Benchmark variables */ -static bool useDebug = false; - - static void printUsage(char *argv0, int exitCode) { printf( "Usage: %s [--debug] --host <serverAddress(es)> --image <imageName> [--rid revision]\n", argv0 ); @@ -30,19 +26,18 @@ static void printUsage(char *argv0, int exitCode) printf( " -r --rid Revision to use (omit or pass 0 for latest)\n" ); printf( " -n --runs Number of connection attempts per thread\n" ); printf( " -t --threads number of threads\n" ); - printf( " -l --log Write log to given location\n" ); - printf( " -d --debug Don't fork and print debug output (fuse > stderr, dnbd3 > stdout)\n" ); - // // fuse_main( 2, arg, &dnbd3_fuse_no_operations, NULL ); + printf( " -b --blocksize Size of blocks to request (def. 4096)\n" ); exit( exitCode ); } -static const char *optString = "h:i:n:t:HvVd"; +static const char *optString = "b:h:i:n:t:Hv"; static const struct option longOpts[] = { { "host", required_argument, NULL, 'h' }, { "image", required_argument, NULL, 'i' }, { "nruns", optional_argument, NULL, 'n' }, - { "threads", optional_argument, NULL, 't' }, - { "help", optional_argument, NULL, 'H' }, + { "threads", required_argument, NULL, 't' }, + { "blocksize", required_argument, NULL, 'b' }, + { "help", no_argument, NULL, 'H' }, { "version", no_argument, NULL, 'v' }, { 0, 0, 0, 0 } }; @@ -59,11 +54,11 @@ void* runBenchThread(void* t) { BenchThreadData* data = t; connection_init_n_times( data->server_address, - data->server_address, + data->image_name, 0, data->runs, - data->counter, - data->closeSockets); + data->bs, + data->counter); printf("Thread #%d finished\n", data->threadNumber); return NULL; } @@ -77,6 +72,7 @@ int main(int argc, char *argv[]) bool closeSockets = false; int n_runs = 100; int n_threads = 1; + int bs = 4096; if ( argc <= 1 || strcmp( argv[1], "--help" ) == 0 || strcmp( argv[1], "--usage" ) == 0 ) { printUsage( argv[0], 0 ); @@ -85,10 +81,10 @@ int main(int argc, char *argv[]) while ( ( opt = getopt_long( argc, argv, optString, longOpts, &lidx ) ) != -1 ) { switch ( opt ) { case 'h': - server_address = optarg; + server_address = strdup(optarg); break; case 'i': - image_Name = optarg; + image_Name = strdup(optarg); break; case 'n': n_runs = atoi(optarg); @@ -96,15 +92,15 @@ int main(int argc, char *argv[]) case 't': n_threads = atoi(optarg); break; + case 'b': + bs = atoi(optarg); + break; case 'c': closeSockets = true; break; case 'H': printUsage( argv[0], 0 ); break; - case 'd': - useDebug = true; - break; default: printUsage( argv[0], EXIT_FAILURE ); } @@ -126,6 +122,7 @@ int main(int argc, char *argv[]) server_address, image_Name, n_runs, + bs, i, closeSockets}; threadData[i] = tmp2; diff --git a/src/server/altservers.c b/src/server/altservers.c index bbbc584..943345c 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -1,11 +1,15 @@ +#include "ini.h" #include "altservers.h" #include "locks.h" +#include "threadpool.h" #include "helper.h" #include "image.h" #include "fileutil.h" #include "../shared/protocol.h" #include "../shared/timing.h" #include "../serverconfig.h" +#include "reference.h" + #include <assert.h> #include <inttypes.h> #include <jansson.h> @@ -14,56 +18,24 @@ #define LOG_GOTO(jumplabel, lvl, ...) do { LOG(lvl, __VA_ARGS__); goto jumplabel; } while (0); #define ERROR_GOTO(jumplabel, ...) LOG_GOTO(jumplabel, LOG_ERROR, __VA_ARGS__) -static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS]; -static pthread_mutex_t pendingLockWrite; // Lock for adding something to pending. (NULL -> nonNULL) -static pthread_mutex_t pendingLockConsume; // Lock for removing something (nonNULL -> NULL) -static dnbd3_signal_t* runSignal = NULL; - static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS]; -static int numAltServers = 0; +static atomic_int numAltServers = 0; static pthread_mutex_t altServersLock; -static pthread_t altThread; - -static void *altservers_main(void *data); -static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const unsigned int rtt); +static void *altservers_runCheck(void *data); +static int altservers_getListForUplink(dnbd3_uplink_t *uplink, const char *image, int *servers, int size, int current); +static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink); +static uint32_t altservers_updateRtt(dnbd3_uplink_t *uplink, int index, uint32_t rtt); +static void altservers_imageFailed(dnbd3_uplink_t *uplink, int server); void altservers_init() { srand( (unsigned int)time( NULL ) ); - // Init spinlock - mutex_init( &pendingLockWrite ); - mutex_init( &pendingLockConsume ); - mutex_init( &altServersLock ); - // Init signal - runSignal = signal_new(); - if ( runSignal == NULL ) { - logadd( LOG_ERROR, "Error creating signal object. Uplink feature unavailable." ); - exit( EXIT_FAILURE ); - } - memset( altServers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) ); - if ( 0 != thread_create( &altThread, NULL, &altservers_main, (void *)NULL ) ) { - logadd( LOG_ERROR, "Could not start altservers connector thread" ); - exit( EXIT_FAILURE ); - } - // Init waiting links queue -- this is currently a global static array so - // it will already be zero, but in case we refactor later do it explicitly - // while also holding the write lock so thread sanitizer is happy - mutex_lock( &pendingLockWrite ); - for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { - pending[i] = NULL; - } - mutex_unlock( &pendingLockWrite ); -} - -void altservers_shutdown() -{ - if ( runSignal == NULL ) return; - signal_call( runSignal ); // Wake altservers thread up - thread_join( altThread, NULL ); + // Init lock + mutex_init( &altServersLock, LOCK_ALT_SERVER_LIST ); } -static void addalt(int argc, char **argv, void *data) +static void addAltFromLegacy(int argc, char **argv, void *data) { char *shost; dnbd3_host_t host; @@ -81,29 +53,82 @@ static void addalt(int argc, char **argv, void *data) return; } if ( argc == 1 ) argv[1] = ""; - if ( altservers_add( &host, argv[1], isPrivate, isClientOnly ) ) { + if ( altservers_add( &host, argv[1], isPrivate, isClientOnly, NULL ) ) { (*(int*)data)++; } } +static int addAltFromIni(void *countptr, const char* section, const char* key, const char* value) +{ + dnbd3_host_t host; + char *strhost = strdup( section ); + if ( !parse_address( strhost, &host ) ) { + free( strhost ); + logadd( LOG_WARNING, "Invalid host section in alt-servers file ignored: '%s'", section ); + return 1; + } + free( strhost ); + int index; + if ( altservers_add( &host, "", false, false, &index ) ) { + (*(int*)countptr)++; + } + if ( index == -1 ) + return 1; + if ( strcmp( key, "for" ) == 0 ) { + if ( strncmp( value, "client", 6 ) == 0 ) { + altServers[index].isClientOnly = true; + altServers[index].isPrivate = false; + } else if ( strcmp( value, "replication" ) == 0 ) { + altServers[index].isClientOnly = false; + altServers[index].isPrivate = true; + } else { + logadd( LOG_WARNING, "Invalid value in alt-servers section %s for key %s: '%s'", section, key, value ); + } + } else if ( strcmp( key, "comment" ) == 0 ) { + snprintf( altServers[index].comment, COMMENT_LENGTH, "%s", value ); + } else if ( strcmp( key, "namespace" ) == 0 ) { + dnbd3_ns_t *elem = malloc( sizeof(*elem) ); + elem->name = strdup( value ); + elem->len = strlen( value ); + do { + elem->next = altServers[index].nameSpaces; + } while ( !atomic_compare_exchange_weak( &altServers[index].nameSpaces, &elem->next, elem ) ); + } else { + logadd( LOG_DEBUG1, "Unknown key in alt-servers section: '%s'", key ); + } + return 1; +} + int altservers_load() { int count = 0; char *name; if ( asprintf( &name, "%s/%s", _configDir, "alt-servers" ) == -1 ) return -1; - file_loadLineBased( name, 1, 2, &addalt, (void*)&count ); + if ( !file_isReadable( name ) ) { + free( name ); + return 0; + } + ini_parse( name, &addAltFromIni, &count ); + if ( numAltServers == 0 ) { + logadd( LOG_INFO, "Could not parse %s as .ini file, trying to load as legacy format.", name ); + file_loadLineBased( name, 1, 2, &addAltFromLegacy, (void*)&count ); + } free( name ); logadd( LOG_DEBUG1, "Added %d alt servers\n", count ); return count; } -bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate, const int isClientOnly) +bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate, const int isClientOnly, int *index) { int i, freeSlot = -1; + if ( index == NULL ) { + index = &freeSlot; + } mutex_lock( &altServersLock ); for (i = 0; i < numAltServers; ++i) { if ( isSameAddressPort( &altServers[i].host, host ) ) { mutex_unlock( &altServersLock ); + *index = i; return false; } else if ( freeSlot == -1 && altServers[i].host.type == 0 ) { freeSlot = i; @@ -113,6 +138,7 @@ bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate if ( numAltServers >= SERVER_MAX_ALTS ) { logadd( LOG_WARNING, "Cannot add another alt server, maximum of %d already reached.", (int)SERVER_MAX_ALTS ); mutex_unlock( &altServersLock ); + *index = -1; return false; } freeSlot = numAltServers++; @@ -120,62 +146,48 @@ bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate altServers[freeSlot].host = *host; altServers[freeSlot].isPrivate = isPrivate; altServers[freeSlot].isClientOnly = isClientOnly; + altServers[freeSlot].nameSpaces = NULL; if ( comment != NULL ) snprintf( altServers[freeSlot].comment, COMMENT_LENGTH, "%s", comment ); mutex_unlock( &altServersLock ); + *index = freeSlot; return true; } /** * ONLY called from the passed uplink's main thread */ -void altservers_findUplink(dnbd3_connection_t *uplink) +void altservers_findUplinkAsync(dnbd3_uplink_t *uplink) { - int i; + if ( uplink->shutdown ) + return; + if ( uplink->current.fd != -1 && numAltServers <= 1 ) + return; // if betterFd != -1 it means the uplink is supposed to switch to another // server. As this function here is called by the uplink thread, it can // never be that the uplink is supposed to switch, but instead calls // this function. - assert( uplink->betterFd == -1 ); - mutex_lock( &pendingLockWrite ); + assert( uplink->better.fd == -1 ); // it is however possible that an RTT measurement is currently in progress, // so check for that case and do nothing if one is in progress - if ( uplink->rttTestResult == RTT_INPROGRESS ) { - for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { - if ( pending[i] != uplink ) continue; - // Yep, measuring right now - mutex_unlock( &pendingLockWrite ); - return; + if ( uplink->rttTestResult != RTT_INPROGRESS ) { + dnbd3_uplink_t *current = ref_get_uplink( &uplink->image->uplinkref ); + if ( current == uplink ) { + threadpool_run( &altservers_runCheck, uplink ); + } else if ( current != NULL ) { + ref_put( ¤t->reference ); } } - // Find free slot for measurement - for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { - if ( pending[i] != NULL ) continue; - pending[i] = uplink; - uplink->rttTestResult = RTT_INPROGRESS; - mutex_unlock( &pendingLockWrite ); - signal_call( runSignal ); // Wake altservers thread up - return; - } - // End of loop - no free slot - mutex_unlock( &pendingLockWrite ); - logadd( LOG_WARNING, "No more free RTT measurement slots, ignoring a request..." ); } -/** - * The given uplink is about to disappear, so remove it from any queues - */ -void altservers_removeUplink(dnbd3_connection_t *uplink) +static bool isImageAllowed(dnbd3_alt_server_t *alt, const char *image) { - mutex_lock( &pendingLockConsume ); - mutex_lock( &pendingLockWrite ); - for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { - if ( pending[i] == uplink ) { - uplink->rttTestResult = RTT_NOT_REACHABLE; - pending[i] = NULL; - } + if ( alt->nameSpaces == NULL ) + return true; + for ( dnbd3_ns_t *it = alt->nameSpaces; it != NULL; it = it->next ) { + if ( strncmp( it->name, image, it->len ) == 0 ) + return true; } - mutex_unlock( &pendingLockWrite ); - mutex_unlock( &pendingLockConsume ); + return false; } /** @@ -184,95 +196,134 @@ void altservers_removeUplink(dnbd3_connection_t *uplink) * Private servers are excluded, so this is what you want to call to * get a list of servers you can tell a client about */ -int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size) +int altservers_getListForClient(dnbd3_client_t *client, dnbd3_server_entry_t *output, int size) { - if ( host == NULL || host->type == 0 || numAltServers == 0 || output == NULL || size <= 0 ) return 0; + dnbd3_host_t *host = &client->host; + if ( host->type == 0 || numAltServers == 0 || output == NULL || size <= 0 ) + return 0; int i, j; int count = 0; - int scores[size]; - int score; - mutex_lock( &altServersLock ); + uint16_t scores[SERVER_MAX_ALTS] = { 0 }; if ( size > numAltServers ) size = numAltServers; - for (i = 0; i < numAltServers; ++i) { - if ( altServers[i].host.type == 0 ) continue; // Slot is empty - if ( altServers[i].isPrivate ) continue; // Do not tell clients about private servers - if ( host->type == altServers[i].host.type ) { - score = altservers_netCloseness( host, &altServers[i].host ) - altServers[i].numFails; - } else { - score = -( altServers[i].numFails + 128 ); // Wrong address family - } - if ( count == 0 ) { - // Trivial - this is the first entry - output[0].host = altServers[i].host; - output[0].failures = 0; - scores[0] = score; - count++; - } else { - // Other entries already exist, insert in proper position - for (j = 0; j < size; ++j) { - if ( j < count && score <= scores[j] ) continue; - if ( j > count ) break; // Should never happen but just in case... - if ( j < count && j + 1 < size ) { - // Check if we're in the middle and need to move other entries... - memmove( &output[j + 1], &output[j], sizeof(dnbd3_server_entry_t) * (size - j - 1) ); - memmove( &scores[j + 1], &scores[j], sizeof(int) * (size - j - 1) ); - } - if ( count < size ) { - count++; - } - output[j].host = altServers[i].host; - output[j].failures = 0; - scores[j] = score; - break; + mutex_lock( &altServersLock ); + for ( i = 0; i < numAltServers; ++i ) { + if ( altServers[i].host.type == 0 || altServers[i].isPrivate ) + continue; // Slot is empty or uplink is for replication only + if ( !isImageAllowed( &altServers[i], client->image->name ) ) + continue; + scores[i] = (uint16_t)( 10 + altservers_netCloseness( host, &altServers[i].host ) ); + } + while ( count < size ) { + i = -1; + for ( j = 0; j < numAltServers; ++j ) { + if ( scores[j] == 0 ) + continue; + if ( i == -1 || scores[j] > scores[i] ) { + i = j; } } + if ( i == -1 ) + break; + scores[i] = 0; + output[count].host = altServers[i].host; + output[count].failures = 0; + count++; } mutex_unlock( &altServersLock ); return count; } +bool altservers_toString(int server, char *buffer, size_t len) +{ + return host_to_string( &altServers[server].host, buffer, len ); +} + +static bool isUsableForUplink( dnbd3_uplink_t *uplink, int server, ticks *now ) +{ + dnbd3_alt_local_t *local = ( uplink == NULL ? NULL : &uplink->altData[server] ); + dnbd3_alt_server_t *global = &altServers[server]; + if ( global->isClientOnly || ( !global->isPrivate && _proxyPrivateOnly ) ) + return false; + // Blocked locally (image not found on server...) + if ( local != NULL && local->blocked ) { + if ( --local->fails > 0 ) + return false; + local->blocked = false; + } + if ( global->blocked ) { + if ( timing_diff( &global->lastFail, now ) < SERVER_GLOBAL_DUP_TIME ) + return false; + global->lastFail = *now; + if ( --global->fails > 0 ) + return false; + global->blocked = false; + } + // Not blocked, depend on both fail counters + int fails = ( local == NULL ? 0 : local->fails ) + global->fails; + return fails < SERVER_BAD_UPLINK_MIN || ( rand() % fails ) < SERVER_BAD_UPLINK_MIN; +} + +int altservers_getHostListForReplication(const char *image, dnbd3_host_t *servers, int size) +{ + int idx[size]; + int num = altservers_getListForUplink( NULL, image, idx, size, -1 ); + for ( int i = 0; i < num; ++i ) { + servers[i] = altServers[i].host; + } + return num; +} + /** * Get <size> alt servers. If there are more alt servers than * requested, random servers will be picked. * This function is suited for finding uplink servers as * it includes private servers and ignores any "client only" servers + * @param current index of server for current connection, or -1 in panic mode */ -int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency) +static int altservers_getListForUplink(dnbd3_uplink_t *uplink, const char *image, int *servers, int size, int current) { - if ( size <= 0 ) return 0; - int count = 0, i; - ticks now; - timing_get( &now ); + if ( size <= 0 ) + return 0; + int count = 0; + declare_now; mutex_lock( &altServersLock ); - // Flip first server in list with a random one every time this is called - if ( numAltServers > 1 ) { - const dnbd3_alt_server_t tmp = altServers[0]; - do { - i = rand() % numAltServers; - } while ( i == 0 ); - altServers[0] = altServers[i]; - altServers[i] = tmp; - } - // We iterate over the list twice. First run adds servers with 0 failures only, - // second one also considers those that failed (not too many times) - if ( size > numAltServers ) size = numAltServers; - for (i = 0; i < numAltServers * 2; ++i) { - dnbd3_alt_server_t *srv = &altServers[i % numAltServers]; - if ( srv->host.type == 0 ) continue; // Slot is empty - if ( _proxyPrivateOnly && !srv->isPrivate ) continue; // Config says to consider private alt-servers only? ignore! - if ( srv->isClientOnly ) continue; - bool first = ( i < numAltServers ); - if ( first ) { - if ( srv->numFails > 0 ) continue; - } else { - if ( srv->numFails == 0 ) continue; // Already added in first iteration - if ( !emergency && srv->numFails > SERVER_BAD_UPLINK_THRES // server failed X times in a row - && timing_diff( &srv->lastFail, &now ) < SERVER_BAD_UPLINK_IGNORE ) continue; // and last fail was not too long ago? ignore! - if ( !emergency ) srv->numFails--; + // If we don't have enough servers to randomize, take a shortcut + if ( numAltServers <= size ) { + for ( int i = 0; i < numAltServers; ++i ) { + if ( current == -1 || i == current || isUsableForUplink( uplink, i, &now ) ) { + if ( isImageAllowed( &altServers[i], image ) ) { + servers[count++] = i; + } + } + } + } else { + // Plenty of alt servers; randomize + uint8_t state[SERVER_MAX_ALTS] = { 0 }; + if ( current != -1 ) { // Make sure we also test the current server + servers[count++] = current; + state[current] = 2; + } + for ( int tr = size * 10; tr > 0 && count < size; --tr ) { + int idx = rand() % numAltServers; + if ( state[idx] != 0 ) + continue; + if ( !isImageAllowed( &altServers[idx], image ) ) { + state[idx] = 2; // Mark as used without adding, so it will be ignored in panic loop + } else if ( isUsableForUplink( uplink, idx, &now ) ) { + servers[count++] = idx; + state[idx] = 2; // Used + } else { + state[idx] = 1; // Potential + } + } + // If panic mode, consider others too + for ( int tr = size * 10; current == -1 && tr > 0 && count < size; --tr ) { + int idx = rand() % numAltServers; + if ( state[idx] == 2 ) + continue; + servers[count++] = idx; + state[idx] = 2; // Used } - // server seems ok, include in output and decrease its fail counter - output[count++] = srv->host; - if ( count >= size ) break; } mutex_unlock( &altServersLock ); return count; @@ -300,7 +351,7 @@ json_t* altservers_toJson() "rtt", rtts, "isPrivate", (int)src[i].isPrivate, "isClientOnly", (int)src[i].isClientOnly, - "numFails", src[i].numFails + "numFails", src[i].fails ); json_array_append_new( list, server ); } @@ -308,33 +359,27 @@ json_t* altservers_toJson() } /** - * Update rtt history of given server - returns the new average for that server + * Update rtt history of given server - returns the new average for that server. */ -static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const unsigned int rtt) +static uint32_t altservers_updateRtt(dnbd3_uplink_t *uplink, int index, uint32_t rtt) { - unsigned int avg = rtt; - int i; + uint32_t avg = 0, j; + dnbd3_alt_local_t *local = &uplink->altData[index]; mutex_lock( &altServersLock ); - for (i = 0; i < numAltServers; ++i) { - if ( !isSameAddressPort( host, &altServers[i].host ) ) continue; - altServers[i].rtt[++altServers[i].rttIndex % SERVER_RTT_PROBES] = rtt; -#if SERVER_RTT_PROBES == 5 - avg = (altServers[i].rtt[0] + altServers[i].rtt[1] + altServers[i].rtt[2] - + altServers[i].rtt[3] + altServers[i].rtt[4]) / SERVER_RTT_PROBES; -#else -#warning You might want to change the code in altservers_update_rtt if you changed SERVER_RTT_PROBES - avg = 0; - for (int j = 0; j < SERVER_RTT_PROBES; ++j) { - avg += altServers[i].rtt[j]; + if ( likely( local->initDone ) ) { + local->rtt[++local->rttIndex % SERVER_RTT_PROBES] = rtt; + for ( j = 0; j < SERVER_RTT_PROBES; ++j ) { + avg += local->rtt[j]; } avg /= SERVER_RTT_PROBES; -#endif - // If we got a new rtt value, server must be working - if ( altServers[i].numFails > 0 ) { - altServers[i].numFails--; + } else { // First rtt measurement -- copy to every slot + for ( j = 0; j < SERVER_RTT_PROBES; ++j ) { + local->rtt[j] = rtt; } - break; + avg = rtt; + local->initDone = true; } + altServers[index].rtt[++altServers[index].rttIndex % SERVER_RTT_PROBES] = avg; mutex_unlock( &altServersLock ); return avg; } @@ -364,250 +409,234 @@ int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2) * track of how often servers fail, and consider them disabled for some time if they * fail too many times. */ -void altservers_serverFailed(const dnbd3_host_t * const host) +void altservers_serverFailed(int server) { - int i; - int foundIndex = -1, lastOk = -1; - ticks now; - timing_get( &now ); + declare_now; mutex_lock( &altServersLock ); - for (i = 0; i < numAltServers; ++i) { - if ( foundIndex == -1 ) { - // Looking for the failed server in list - if ( isSameAddressPort( host, &altServers[i].host ) ) { - foundIndex = i; - } - } else if ( altServers[i].host.type != 0 && altServers[i].numFails == 0 ) { - lastOk = i; - } - } - // Do only increase counter if last fail was not too recent. This is - // to prevent the counter from increasing rapidly if many images use the - // same uplink. If there's a network hickup, all uplinks will call this - // function and would increase the counter too quickly, disabling the server. - if ( foundIndex != -1 && timing_diff( &altServers[foundIndex].lastFail, &now ) > SERVER_RTT_INTERVAL_INIT ) { - altServers[foundIndex].numFails += SERVER_UPLINK_FAIL_INCREASE; - altServers[foundIndex].lastFail = now; - if ( lastOk != -1 ) { - // Make sure non-working servers are put at the end of the list, so they're less likely - // to get picked when testing servers for uplink connections. - const dnbd3_alt_server_t tmp = altServers[foundIndex]; - altServers[foundIndex] = altServers[lastOk]; - altServers[lastOk] = tmp; + if ( timing_diff( &altServers[server].lastFail, &now ) > SERVER_GLOBAL_DUP_TIME ) { + altServers[server].lastFail = now; + if ( altServers[server].fails++ >= SERVER_BAD_UPLINK_MAX ) { + altServers[server].blocked = true; } } mutex_unlock( &altServersLock ); } + /** - * Mainloop of this module. It will wait for requests by uplinks to find a - * suitable uplink server for them. If found, it will tell the uplink about - * the best server found. Currently the RTT history is kept per server and - * not per uplink, so if many images use the same uplink server, the history - * will update quite quickly. Needs to be improved some time, ie. by only - * updating the rtt if the last update was at least X seconds ago. + * Called from RTT checker if connecting to a server succeeded but + * subsequently selecting the given image failed. Handle this within + * the uplink and don't increase the global fail counter. */ -static void *altservers_main(void *data UNUSED) +static void altservers_imageFailed(dnbd3_uplink_t *uplink, int server) +{ + mutex_lock( &altServersLock ); + if ( uplink->altData[server].fails++ >= SERVER_BAD_UPLINK_MAX ) { + uplink->altData[server].blocked = true; + } + mutex_unlock( &altServersLock ); +} + +static void *altservers_runCheck(void *data) +{ + dnbd3_uplink_t * const uplink = (dnbd3_uplink_t*)data; + + assert( uplink != NULL ); + setThreadName( "altserver-check" ); + altservers_findUplinkInternal( uplink ); + ref_put( &uplink->reference ); // Acquired in findUplinkAsync + return NULL; +} + +void altservers_findUplink(dnbd3_uplink_t *uplink) +{ + altservers_findUplinkInternal( uplink ); + while ( uplink->rttTestResult == RTT_INPROGRESS ) { + usleep( 5000 ); + } +} + +int altservers_hostToIndex(dnbd3_host_t *host) +{ + for ( int i = 0; i < numAltServers; ++i ) { + if ( isSameAddressPort( host, &altServers[i].host ) ) + return i; + } + return -1; +} + +const dnbd3_host_t* altservers_indexToHost(int server) +{ + return &altServers[server].host; +} + +// XXX Sync call above must block until async worker has finished XXX +static void altservers_findUplinkInternal(dnbd3_uplink_t *uplink) { const int ALTS = 4; - int ret, itLink, itAlt, numAlts; - bool found; - char buffer[DNBD3_BLOCK_SIZE ]; - dnbd3_reply_t reply; - dnbd3_host_t servers[ALTS + 1]; - serialized_buffer_t serialized; + int itAlt, numAlts, current; + bool panic; + int servers[ALTS + 1]; struct timespec start, end; - ticks nextCloseUnusedFd; - setThreadName( "altserver-check" ); - blockNoncriticalSignals(); - timing_gets( &nextCloseUnusedFd, 900 ); - // LOOP - while ( !_shutdown ) { - // Wait 5 seconds max. - ret = signal_wait( runSignal, 5000 ); - if ( _shutdown ) goto cleanup; - if ( ret == SIGNAL_ERROR ) { - if ( errno == EAGAIN || errno == EINTR ) continue; - logadd( LOG_WARNING, "Error %d on signal_clear on alservers_main! Things will break!", errno ); - usleep( 100000 ); + if ( _shutdown ) + return; + mutex_lock( &uplink->rttLock ); + // Maybe we already have a result, or check is currently running + if ( uplink->better.fd != -1 || uplink->rttTestResult == RTT_INPROGRESS ) { + mutex_unlock( &uplink->rttLock ); + return; + } + assert( uplink->rttTestResult != RTT_DOCHANGE ); + uplink->rttTestResult = RTT_INPROGRESS; + panic = ( uplink->current.fd == -1 ); + current = uplink->current.index; // Current server index (or last one in panic mode) + mutex_unlock( &uplink->rttLock ); + // First, get 4 alt servers + numAlts = altservers_getListForUplink( uplink, uplink->image->name, servers, ALTS, panic ? -1 : current ); + // If we're already connected and only got one server anyways, there isn't much to do + if ( numAlts == 0 || ( numAlts == 1 && !panic ) ) { + uplink->rttTestResult = RTT_DONTCHANGE; + return; + } + dnbd3_image_t * const image = image_lock( uplink->image ); + if ( image == NULL ) { // Check again after locking + uplink->rttTestResult = RTT_NOT_REACHABLE; + logadd( LOG_WARNING, "Image has gone away that was queued for RTT measurement" ); + return; + } + LOG( LOG_DEBUG2, "Running alt check for %s:%d", image->name, (int)image->rid ); + assert( uplink->rttTestResult == RTT_INPROGRESS ); + // Test them all + dnbd3_server_connection_t best = { .fd = -1 }; + unsigned long bestRtt = RTT_UNREACHABLE; + unsigned long currentRtt = RTT_UNREACHABLE; + for (itAlt = 0; itAlt < numAlts; ++itAlt) { + int server = servers[itAlt]; + // Connect + clock_gettime( BEST_CLOCK_SOURCE, &start ); + int sock = sock_connect( &altServers[server].host, 750, 1000 ); + if ( sock == -1 ) { // Connection failed means global error + altservers_serverFailed( server ); + continue; } - // Work your way through the queue - for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) { - mutex_lock( &pendingLockWrite ); - if ( pending[itLink] == NULL ) { - mutex_unlock( &pendingLockWrite ); - continue; // Check once before locking, as a mutex is expensive - } - mutex_unlock( &pendingLockWrite ); - mutex_lock( &pendingLockConsume ); - mutex_lock( &pendingLockWrite ); - dnbd3_connection_t * const uplink = pending[itLink]; - mutex_unlock( &pendingLockWrite ); - if ( uplink == NULL ) { // Check again after locking - mutex_unlock( &pendingLockConsume ); - continue; - } - dnbd3_image_t * const image = image_lock( uplink->image ); - if ( image == NULL ) { // Check again after locking - uplink->rttTestResult = RTT_NOT_REACHABLE; - mutex_lock( &pendingLockWrite ); - pending[itLink] = NULL; - mutex_unlock( &pendingLockWrite ); - mutex_unlock( &pendingLockConsume ); - logadd( LOG_DEBUG1, "Image has gone away that was queued for RTT measurement" ); - continue; - } - LOG( LOG_DEBUG2, "[%d] Running alt check", itLink ); - assert( uplink->rttTestResult == RTT_INPROGRESS ); - // Now get 4 alt servers - numAlts = altservers_getListForUplink( servers, ALTS, uplink->fd == -1 ); - if ( uplink->fd != -1 ) { - // Add current server if not already in list - found = false; - for (itAlt = 0; itAlt < numAlts; ++itAlt) { - if ( !isSameAddressPort( &uplink->currentServer, &servers[itAlt] ) ) continue; - found = true; - break; - } - if ( !found ) servers[numAlts++] = uplink->currentServer; - } - // Test them all - int bestSock = -1; - int bestIndex = -1; - int bestProtocolVersion = -1; - unsigned long bestRtt = RTT_UNREACHABLE; - unsigned long currentRtt = RTT_UNREACHABLE; - for (itAlt = 0; itAlt < numAlts; ++itAlt) { - usleep( 1000 ); // Wait a very short moment for the network to recover (we might be doing lots of measurements...) - // Connect - clock_gettime( BEST_CLOCK_SOURCE, &start ); - int sock = sock_connect( &servers[itAlt], 750, 1000 ); - if ( sock < 0 ) continue; - // Select image ++++++++++++++++++++++++++++++ - if ( !dnbd3_select_image( sock, image->name, image->rid, SI_SERVER_FLAGS ) ) { - goto server_failed; - } - // See if selecting the image succeeded ++++++++++++++++++++++++++++++ - uint16_t protocolVersion, rid; - uint64_t imageSize; - char *name; - if ( !dnbd3_select_image_reply( &serialized, sock, &protocolVersion, &name, &rid, &imageSize ) ) { - goto server_image_not_available; - } - if ( protocolVersion < MIN_SUPPORTED_SERVER ) goto server_failed; - if ( name == NULL || strcmp( name, image->name ) != 0 ) { - ERROR_GOTO( server_failed, "[RTT] Server offers image '%s'", name ); - } - if ( rid != image->rid ) { - ERROR_GOTO( server_failed, "[RTT] Server provides rid %d", (int)rid ); - } - if ( imageSize != image->virtualFilesize ) { - ERROR_GOTO( server_failed, "[RTT] Remote size: %" PRIu64 ", expected: %" PRIu64, imageSize, image->virtualFilesize ); - } - // Request first block (NOT random!) ++++++++++++++++++++++++++++++ - if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) { - LOG_GOTO( server_failed, LOG_DEBUG1, "[RTT%d] Could not request first block", itLink ); - } - // See if requesting the block succeeded ++++++++++++++++++++++ - if ( !dnbd3_get_reply( sock, &reply ) ) { - LOG_GOTO( server_failed, LOG_DEBUG1, "[RTT%d] Received corrupted reply header after CMD_GET_BLOCK", itLink ); - } - // check reply header - if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) { - ERROR_GOTO( server_failed, "[RTT] Reply to first block request is %" PRIu32 " bytes", reply.size ); - } - if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) { - ERROR_GOTO( server_failed, "[RTT%d] Could not read first block payload", itLink ); - } - clock_gettime( BEST_CLOCK_SOURCE, &end ); - // Measurement done - everything fine so far - mutex_lock( &uplink->rttLock ); - const bool isCurrent = isSameAddressPort( &servers[itAlt], &uplink->currentServer ); - // Penaltize rtt if this was a cycle; this will treat this server with lower priority - // in the near future too, so we prevent alternating between two servers that are both - // part of a cycle and have the lowest latency. - const unsigned int rtt = (unsigned int)((end.tv_sec - start.tv_sec) * 1000000 - + (end.tv_nsec - start.tv_nsec) / 1000 - + ( (isCurrent && uplink->cycleDetected) ? 1000000 : 0 )); // µs - unsigned int avg = altservers_updateRtt( &servers[itAlt], rtt ); - // If a cycle was detected, or we lost connection to the current (last) server, penaltize it one time - if ( ( uplink->cycleDetected || uplink->fd == -1 ) && isCurrent ) avg = (avg * 2) + 50000; - mutex_unlock( &uplink->rttLock ); - if ( uplink->fd != -1 && isCurrent ) { - // Was measuring current server - currentRtt = avg; - close( sock ); - } else if ( avg < bestRtt ) { - // Was another server, update "best" - if ( bestSock != -1 ) close( bestSock ); - bestSock = sock; - bestRtt = avg; - bestIndex = itAlt; - bestProtocolVersion = protocolVersion; - } else { - // Was too slow, ignore - close( sock ); - } - // We're done, call continue - continue; - // Jump here if anything went wrong - // This will cleanup and continue - server_failed: ; - altservers_serverFailed( &servers[itAlt] ); - server_image_not_available: ; - close( sock ); - } - // Done testing all servers. See if we should switch - if ( bestSock != -1 && (uplink->fd == -1 || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) { - // yep - if ( currentRtt > 10000000 || uplink->fd == -1 ) { - LOG( LOG_DEBUG1, "Change - best: %luµs, current: -", bestRtt ); - } else { - LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt ); - } - sock_setTimeout( bestSock, _uplinkTimeout ); - mutex_lock( &uplink->rttLock ); - uplink->betterFd = bestSock; - uplink->betterServer = servers[bestIndex]; - uplink->betterVersion = bestProtocolVersion; - uplink->rttTestResult = RTT_DOCHANGE; - mutex_unlock( &uplink->rttLock ); - signal_call( uplink->signal ); - } else if ( bestSock == -1 && currentRtt == RTT_UNREACHABLE ) { - // No server was reachable - mutex_lock( &uplink->rttLock ); - uplink->rttTestResult = RTT_NOT_REACHABLE; - mutex_unlock( &uplink->rttLock ); - } else { - // nope - if ( bestSock != -1 ) close( bestSock ); - mutex_lock( &uplink->rttLock ); - uplink->rttTestResult = RTT_DONTCHANGE; - uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away - mutex_unlock( &uplink->rttLock ); - if ( !image->working ) { - image->working = true; - LOG( LOG_DEBUG1, "[%d] No better alt server found, enabling again", itLink ); - } + // Select image ++++++++++++++++++++++++++++++ + if ( !dnbd3_select_image( sock, image->name, image->rid, SI_SERVER_FLAGS ) ) { + goto image_failed; + } + // See if selecting the image succeeded ++++++++++++++++++++++++++++++ + uint16_t protocolVersion, rid; + uint64_t imageSize; + char *name; + serialized_buffer_t serialized; + if ( !dnbd3_select_image_reply( &serialized, sock, &protocolVersion, &name, &rid, &imageSize ) ) { + goto image_failed; + } + if ( protocolVersion < MIN_SUPPORTED_SERVER ) { // Server version unsupported; global fail + goto server_failed; + } + if ( name == NULL || strcmp( name, image->name ) != 0 ) { + ERROR_GOTO( image_failed, "[RTT] Server offers image '%s' instead of '%s'", name, image->name ); + } + if ( rid != image->rid ) { + ERROR_GOTO( image_failed, "[RTT] Server provides rid %d instead of %d", (int)rid, (int)image->rid ); + } + if ( imageSize != image->virtualFilesize ) { + ERROR_GOTO( image_failed, "[RTT] Remote size: %" PRIu64 ", expected: %" PRIu64, imageSize, image->virtualFilesize ); + } + // Request first block (NOT random!) ++++++++++++++++++++++++++++++ + if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) { + LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Could not request first block", server ); + } + // See if requesting the block succeeded ++++++++++++++++++++++ + dnbd3_reply_t reply; + if ( !dnbd3_get_reply( sock, &reply ) ) { + LOG_GOTO( image_failed, LOG_DEBUG1, "[RTT%d] Received corrupted reply header after CMD_GET_BLOCK", server ); + } + // check reply header + if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) { + // Sanity check failed; count this as global error (malicious/broken server) + ERROR_GOTO( server_failed, "[RTT] Reply to first block request is %" PRIu32 " bytes", reply.size ); + } + // flush payload to include this into measurement + char buffer[DNBD3_BLOCK_SIZE]; + if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) { + ERROR_GOTO( image_failed, "[RTT%d] Could not read first block payload", server ); + } + clock_gettime( BEST_CLOCK_SOURCE, &end ); + // Measurement done - everything fine so far + mutex_lock( &uplink->rttLock ); + const bool isCurrent = ( uplink->current.index == server ); + mutex_unlock( &uplink->rttLock ); + // Penaltize rtt if this was a cycle; this will treat this server with lower priority + // in the near future too, so we prevent alternating between two servers that are both + // part of a cycle and have the lowest latency. + uint32_t rtt = (uint32_t)((end.tv_sec - start.tv_sec) * 1000000 + + (end.tv_nsec - start.tv_nsec) / 1000); // µs + uint32_t avg = altservers_updateRtt( uplink, server, rtt ); + // If a cycle was detected, or we lost connection to the current (last) server, penaltize it one time + if ( ( uplink->cycleDetected || panic ) && isCurrent ) { + avg = (avg * 2) + 50000; + } + if ( !panic && isCurrent ) { + // Was measuring current server + currentRtt = avg; + close( sock ); + } else if ( avg < bestRtt ) { + // Was another server, update "best" + if ( best.fd != -1 ) { + close( best.fd ); } - image_release( image ); - // end of loop over all pending uplinks - mutex_lock( &pendingLockWrite ); - pending[itLink] = NULL; - mutex_unlock( &pendingLockWrite ); - mutex_unlock( &pendingLockConsume ); + best.fd = sock; + bestRtt = avg; + best.index = server; + best.version = protocolVersion; + } else { + // Was too slow, ignore + close( sock ); + } + // We're done, call continue + continue; + // Jump here if anything went wrong + // This will cleanup and continue +image_failed: + altservers_imageFailed( uplink, server ); + goto failed; +server_failed: + altservers_serverFailed( server ); +failed: + close( sock ); + } + // Done testing all servers. See if we should switch + if ( best.fd != -1 && (panic || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) { + // yep + if ( currentRtt > 10000000 || panic ) { + LOG( LOG_DEBUG1, "Change - best: %luµs, current: -", bestRtt ); + } else { + LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt ); + } + sock_setTimeout( best.fd, _uplinkTimeout ); + mutex_lock( &uplink->rttLock ); + uplink->better = best; + uplink->rttTestResult = RTT_DOCHANGE; + mutex_unlock( &uplink->rttLock ); + signal_call( uplink->signal ); + } else if ( best.fd == -1 && currentRtt == RTT_UNREACHABLE ) { + // No server was reachable, including current + uplink->rttTestResult = RTT_NOT_REACHABLE; + } else { + // nope + if ( best.fd != -1 ) { + close( best.fd ); } - // Save cache maps of all images if applicable - declare_now; - // TODO: Has nothing to do with alt servers really, maybe move somewhere else? - if ( _closeUnusedFd && timing_reached( &nextCloseUnusedFd, &now ) ) { - timing_gets( &nextCloseUnusedFd, 900 ); - image_closeUnusedFd(); + if ( !image->working || uplink->cycleDetected ) { + image->working = true; + LOG( LOG_DEBUG1, "[RTT] No better alt server found, enabling '%s:%d' again... :-(", image->name, (int)image->rid ); } + uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away + mutex_lock( &uplink->rttLock ); + uplink->rttTestResult = RTT_DONTCHANGE; + mutex_unlock( &uplink->rttLock ); } - cleanup: ; - if ( runSignal != NULL ) signal_close( runSignal ); - runSignal = NULL; - return NULL ; + image_release( image ); } diff --git a/src/server/altservers.h b/src/server/altservers.h index 7b7b46d..8e29aaa 100644 --- a/src/server/altservers.h +++ b/src/server/altservers.h @@ -7,23 +7,27 @@ struct json_t; void altservers_init(); -void altservers_shutdown(); - int altservers_load(); -bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate, const int isClientOnly); +bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate, const int isClientOnly, int *index); + +void altservers_findUplinkAsync(dnbd3_uplink_t *uplink); -void altservers_findUplink(dnbd3_connection_t *uplink); +void altservers_findUplink(dnbd3_uplink_t *uplink); -void altservers_removeUplink(dnbd3_connection_t *uplink); +int altservers_getListForClient(dnbd3_client_t *client, dnbd3_server_entry_t *output, int size); -int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size); +int altservers_getHostListForReplication(const char *image, dnbd3_host_t *servers, int size); -int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency); +bool altservers_toString(int server, char *buffer, size_t len); int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2); -void altservers_serverFailed(const dnbd3_host_t * const host); +void altservers_serverFailed(int server); + +int altservers_hostToIndex(dnbd3_host_t *host); + +const dnbd3_host_t* altservers_indexToHost(int server); struct json_t* altservers_toJson(); diff --git a/src/server/globals.c b/src/server/globals.c index 69e8a6e..f8c3f66 100644 --- a/src/server/globals.c +++ b/src/server/globals.c @@ -28,6 +28,7 @@ atomic_bool _closeUnusedFd = false; atomic_bool _vmdkLegacyMode = false; // Not really needed anymore since we have '+' and '-' in alt-servers atomic_bool _proxyPrivateOnly = false; +atomic_int _autoFreeDiskSpaceDelay = 3600 * 10; // [limits] atomic_int _maxClients = SERVER_MAX_CLIENTS; atomic_int _maxImages = SERVER_MAX_IMAGES; @@ -83,6 +84,7 @@ static int ini_handler(void *custom UNUSED, const char* section, const char* key SAVE_TO_VAR_UINT( limits, maxPayload ); SAVE_TO_VAR_UINT64( limits, maxReplicationSize ); SAVE_TO_VAR_BOOL( dnbd3, pretendClient ); + SAVE_TO_VAR_INT( dnbd3, autoFreeDiskSpaceDelay ); if ( strcmp( section, "dnbd3" ) == 0 && strcmp( key, "backgroundReplication" ) == 0 ) { if ( strcmp( value, "hashblock" ) == 0 ) { _backgroundReplication = BGR_HASHBLOCK; @@ -112,7 +114,7 @@ void globals_loadConfig() asprintf( &name, "%s/%s", _configDir, CONFIG_FILENAME ); if ( name == NULL ) return; if ( initialLoad ) { - mutex_init( &loadLock ); + mutex_init( &loadLock, LOCK_LOAD_CONFIG ); } if ( mutex_trylock( &loadLock ) != 0 ) { logadd( LOG_INFO, "Ignoring config reload request due to already running reload" ); @@ -229,6 +231,15 @@ static bool parse64(const char *in, atomic_int_fast64_t *out, const char *optnam while ( *end == ' ' ) end++; if ( *end == '\0' ) { exp = 0; + } else if ( *end == 'm' ) { + exp = 1; + base = 60; + } else if ( *end == 'h' ) { + exp = 1; + base = 3600; + } else if ( *end == 'd' ) { + exp = 1; + base = 24 * 3600; } else { char *pos = strchr( units, *end > 'Z' ? (*end - 32) : *end ); if ( pos == NULL ) { @@ -318,6 +329,7 @@ size_t globals_dumpConfig(char *buffer, size_t size) PBOOL(vmdkLegacyMode); PBOOL(proxyPrivateOnly); PBOOL(pretendClient); + PINT(autoFreeDiskSpaceDelay); P_ARG("[limits]\n"); PINT(maxClients); PINT(maxImages); diff --git a/src/server/globals.h b/src/server/globals.h index b248800..df8c595 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -8,27 +8,16 @@ #include <stdatomic.h> #include <time.h> #include <pthread.h> +#include "reftypes.h" typedef struct timespec ticks; // ######### All structs/types used by the server ######## -typedef struct _dnbd3_connection dnbd3_connection_t; +typedef struct _dnbd3_uplink dnbd3_uplink_t; typedef struct _dnbd3_image dnbd3_image_t; typedef struct _dnbd3_client dnbd3_client_t; -// Slot is free, can be used. -// Must only be set in uplink_handle_receive() or uplink_remove_client() -#define ULR_FREE 0 -// Slot has been filled with a request that hasn't been sent to the upstream server yet, matching request can safely rely on reuse. -// Must only be set in uplink_request() -#define ULR_NEW 1 -// Slot is occupied, reply has not yet been received, matching request can safely rely on reuse. -// Must only be set in uplink_mainloop() or uplink_request() -#define ULR_PENDING 2 -// Slot is being processed, do not consider for hop on. -// Must only be set in uplink_handle_receive() -#define ULR_PROCESSING 3 typedef struct { uint64_t handle; // Client defined handle to pass back in reply @@ -42,60 +31,88 @@ typedef struct uint8_t hopCount; // How many hops this request has already taken across proxies } dnbd3_queued_request_t; +typedef struct _ns +{ + struct _ns *next; + char *name; + size_t len; +} dnbd3_ns_t; + +typedef struct +{ + int fails; // Hard fail: Connection failed + int rttIndex; + uint32_t rtt[SERVER_RTT_PROBES]; + bool isPrivate, isClientOnly; + bool blocked; // If true count down fails until 0 to enable again + ticks lastFail; // Last hard fail + dnbd3_host_t host; + char comment[COMMENT_LENGTH]; + _Atomic(dnbd3_ns_t *) nameSpaces; // Linked list of name spaces +} dnbd3_alt_server_t; + +typedef struct +{ + int fails; // Soft fail: Image not found + int rttIndex; + uint32_t rtt[SERVER_RTT_PROBES]; + bool blocked; // True if server is to be ignored and fails should be counted down + bool initDone; +} dnbd3_alt_local_t; + +typedef struct { + int fd; // Socket fd for this connection + int version; // Protocol version of remote server + int index; // Entry in uplinks list +} dnbd3_server_connection_t; + #define RTT_IDLE 0 // Not in progress #define RTT_INPROGRESS 1 // In progess, not finished #define RTT_DONTCHANGE 2 // Finished, but no better alternative found #define RTT_DOCHANGE 3 // Finished, better alternative written to .betterServer + .betterFd #define RTT_NOT_REACHABLE 4 // No uplink was reachable -struct _dnbd3_connection +struct _dnbd3_uplink { - int fd; // socket fd to remote server - int version; // remote server protocol version + ref reference; + dnbd3_server_connection_t current; // Currently active connection; fd == -1 means disconnected + dnbd3_server_connection_t better; // Better connection as found by altserver worker; fd == -1 means none dnbd3_signal_t* signal; // used to wake up the process pthread_t thread; // thread holding the connection pthread_mutex_t sendMutex; // For locking socket while sending pthread_mutex_t queueLock; // lock for synchronization on request queue etc. dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer - dnbd3_host_t currentServer; // Current server we're connected to pthread_mutex_t rttLock; // When accessing rttTestResult, betterFd or betterServer - int rttTestResult; // RTT_* + atomic_int rttTestResult; // RTT_* int cacheFd; // used to write to the image, in case it is relayed. ONLY USE FROM UPLINK THREAD! - int betterVersion; // protocol version of better server - int betterFd; // Active connection to better server, ready to use - dnbd3_host_t betterServer; // The better server uint8_t *recvBuffer; // Buffer for receiving payload uint32_t recvBufferLen; // Len of ^^ - volatile bool shutdown; // signal this thread to stop, must only be set from uplink_shutdown() or cleanup in uplink_mainloop() + atomic_bool shutdown; // signal this thread to stop, must only be set from uplink_shutdown() or cleanup in uplink_mainloop() bool replicatedLastBlock; // bool telling if the last block has been replicated yet bool cycleDetected; // connection cycle between proxies detected for current remote server int nextReplicationIndex; // Which index in the cache map we should start looking for incomplete blocks at // If BGR == BGR_HASHBLOCK, -1 means "currently no incomplete block" uint64_t replicationHandle; // Handle of pending replication request atomic_uint_fast64_t bytesReceived; // Number of bytes received by the uplink since startup. - int queueLen; // length of queue + atomic_int queueLen; // length of queue uint32_t idleTime; // How many seconds the uplink was idle (apart from keep-alives) dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE]; + dnbd3_alt_local_t altData[SERVER_MAX_ALTS]; }; typedef struct { - char comment[COMMENT_LENGTH]; - dnbd3_host_t host; - unsigned int rtt[SERVER_RTT_PROBES]; - unsigned int rttIndex; - bool isPrivate, isClientOnly; - ticks lastFail; - int numFails; -} dnbd3_alt_server_t; - -typedef struct -{ uint8_t host[16]; int bytes; int bitMask; int permissions; } dnbd3_access_rule_t; +typedef struct +{ + ref reference; + _Atomic uint8_t map[]; +} dnbd3_cache_map_t; + /** * Image struct. An image path could be something like * /mnt/images/rz/zfs/Windows7 ZfS.vmdk.r1 @@ -106,8 +123,8 @@ struct _dnbd3_image { char *path; // absolute path of the image char *name; // public name of the image (usually relative path minus revision ID) - dnbd3_connection_t *uplink; // pointer to a server connection - uint8_t *cache_map; // cache map telling which parts are locally cached, NULL if complete + weakref uplinkref; // pointer to a server connection + weakref ref_cacheMap; // cache map telling which parts are locally cached, NULL if complete uint64_t virtualFilesize; // virtual size of image (real size rounded up to multiple of 4k) uint64_t realFilesize; // actual file size on disk ticks atime; // last access time @@ -116,10 +133,10 @@ struct _dnbd3_image uint32_t *crc32; // list of crc32 checksums for each 16MiB block in image uint32_t masterCrc32; // CRC-32 of the crc-32 list int readFd; // used to read the image. Used from multiple threads, so use atomic operations (pread et al) - int completenessEstimate; // Completeness estimate in percent - int users; // clients currently using this image + atomic_int completenessEstimate; // Completeness estimate in percent + atomic_int users; // clients currently using this image. XXX Lock on imageListLock when modifying and checking whether the image should be freed. Reading it elsewhere is fine without the lock. int id; // Unique ID of this image. Only unique in the context of this running instance of DNBD3-Server - bool working; // true if image exists and completeness is == 100% or a working upstream proxy is connected + atomic_bool working; // true if image exists and completeness is == 100% or a working upstream proxy is connected uint16_t rid; // revision of image pthread_mutex_t lock; }; @@ -128,13 +145,14 @@ struct _dnbd3_client { #define HOSTNAMELEN (48) atomic_uint_fast64_t bytesSent; // Byte counter for this client. - dnbd3_image_t *image; // Image in use by this client, or NULL during handshake + dnbd3_image_t * _Atomic image; // Image in use by this client, or NULL during handshake int sock; bool isServer; // true if a server in proxy mode, false if real client dnbd3_host_t host; char hostName[HOSTNAMELEN]; // inet_ntop version of host pthread_mutex_t sendMutex; // Held while writing to sock if image is incomplete (since uplink uses socket too) pthread_mutex_t lock; + pthread_t thread; }; // ####################################################### @@ -273,6 +291,13 @@ extern atomic_uint_fast64_t _maxReplicationSize; extern atomic_bool _pretendClient; /** + * Minimum uptime in seconds before proxy starts deleting old + * images if running out of space. -1 disables automatic deletion. + * Only relevant in proxy mode. + */ +extern atomic_int _autoFreeDiskSpaceDelay; + +/** * Load the server configuration. */ void globals_loadConfig(); diff --git a/src/server/image.c b/src/server/image.c index bfba6cb..6259e38 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -8,6 +8,7 @@ #include "../shared/protocol.h" #include "../shared/timing.h" #include "../shared/crc32.h" +#include "reference.h" #include <assert.h> #include <fcntl.h> @@ -50,24 +51,33 @@ static bool image_clone(int sock, char *name, uint16_t revision, uint64_t imageS static bool image_calcBlockCrc32(const int fd, const size_t block, const uint64_t realFilesize, uint32_t *crc); static bool image_ensureDiskSpace(uint64_t size, bool force); -static uint8_t* image_loadCacheMap(const char * const imagePath, const int64_t fileSize); +static dnbd3_cache_map_t* image_loadCacheMap(const char * const imagePath, const int64_t fileSize); static uint32_t* image_loadCrcList(const char * const imagePath, const int64_t fileSize, uint32_t *masterCrc); -static bool image_checkRandomBlocks(const int count, int fdImage, const int64_t fileSize, uint32_t * const crc32list, uint8_t * const cache_map); +static void image_checkRandomBlocks(dnbd3_image_t *image, const int count); +static void* closeUnusedFds(void*); +static void allocCacheMap(dnbd3_image_t *image, bool complete); + +static void cmfree(ref *ref) +{ + dnbd3_cache_map_t *cache = container_of(ref, dnbd3_cache_map_t, reference); + logadd( LOG_DEBUG2, "Freeing a cache map" ); + free( cache ); +} // ########################################## void image_serverStartup() { srand( (unsigned int)time( NULL ) ); - mutex_init( &imageListLock ); - mutex_init( &remoteCloneLock ); - mutex_init( &reloadLock ); + mutex_init( &imageListLock, LOCK_IMAGE_LIST ); + mutex_init( &remoteCloneLock, LOCK_REMOTE_CLONE ); + mutex_init( &reloadLock, LOCK_RELOAD ); + server_addJob( &closeUnusedFds, NULL, 10, 900 ); } /** * Update cache-map of given image for the given byte range * start (inclusive) - end (exclusive) - * Locks on: images[].lock */ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, const bool set) { @@ -88,33 +98,59 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co if ( start >= end ) return; bool setNewBlocks = false; - uint64_t pos = start; - mutex_lock( &image->lock ); - if ( image->cache_map == NULL ) { + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache == NULL ) { // Image seems already complete if ( set ) { // This makes no sense - mutex_unlock( &image->lock ); - logadd( LOG_DEBUG1, "image_updateCachemap(true) with no cache_map: %s", image->path ); + logadd( LOG_DEBUG1, "image_updateCachemap(true) with no cache map: %s", image->path ); return; } // Recreate a cache map, set it to all 1 initially as we assume the image was complete - const int byteSize = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); - image->cache_map = malloc( byteSize ); - memset( image->cache_map, 0xff, byteSize ); - } - while ( pos < end ) { - const size_t map_y = (int)( pos >> 15 ); - const int map_x = (int)( (pos >> 12) & 7 ); // mod 8 - const int bit_mask = 1 << map_x; - if ( set ) { - if ( (image->cache_map[map_y] & bit_mask) == 0 ) setNewBlocks = true; - image->cache_map[map_y] |= (uint8_t)bit_mask; - } else { - image->cache_map[map_y] &= (uint8_t)~bit_mask; + allocCacheMap( image, true ); + cache = ref_get_cachemap( image ); + if ( cache == NULL ) { + logadd( LOG_WARNING, "WHAT!!!?!?!= No cache map right after alloc?! %s", image->path ); + return; } - pos += DNBD3_BLOCK_SIZE; } + // Set/unset + const uint64_t firstByteInMap = start >> 15; + const uint64_t lastByteInMap = (end - 1) >> 15; + uint64_t pos; + // First byte + uint8_t fb = 0, lb = 0; + for ( pos = start; firstByteInMap == (pos >> 15) && pos < end; pos += DNBD3_BLOCK_SIZE ) { + const int map_x = (pos >> 12) & 7; // mod 8 + const uint8_t bit_mask = (uint8_t)( 1 << map_x ); + fb |= bit_mask; + } + // Last byte + if ( lastByteInMap != firstByteInMap ) { + for ( pos = lastByteInMap << 15; pos < end; pos += DNBD3_BLOCK_SIZE ) { + assert( lastByteInMap == (pos >> 15) ); + const int map_x = (pos >> 12) & 7; // mod 8 + const uint8_t bit_mask = (uint8_t)( 1 << map_x ); + lb |= bit_mask; + } + } + atomic_thread_fence( memory_order_acquire ); + if ( set ) { + uint8_t fo = atomic_fetch_or_explicit( &cache->map[firstByteInMap], fb, memory_order_relaxed ); + uint8_t lo = atomic_fetch_or_explicit( &cache->map[lastByteInMap], lb, memory_order_relaxed ); + setNewBlocks = ( fo != cache->map[firstByteInMap] || lo != cache->map[lastByteInMap] ); + } else { + atomic_fetch_and_explicit( &cache->map[firstByteInMap], (uint8_t)~fb, memory_order_relaxed ); + atomic_fetch_and_explicit( &cache->map[lastByteInMap], (uint8_t)~lb, memory_order_relaxed ); + } + const uint8_t nval = set ? 0xff : 0; + // Everything in between + for ( pos = firstByteInMap + 1; pos < lastByteInMap; ++pos ) { + if ( atomic_exchange_explicit( &cache->map[pos], nval, memory_order_relaxed ) != nval && set ) { + setNewBlocks = true; + } + } + atomic_thread_fence( memory_order_release ); if ( setNewBlocks && image->crc32 != NULL ) { // If setNewBlocks is set, at least one of the blocks was not cached before, so queue all hash blocks // for checking, even though this might lead to checking some hash block again, if it was @@ -122,19 +158,14 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co // First set start and end to borders of hash blocks start &= ~(uint64_t)(HASH_BLOCK_SIZE - 1); end = (end + HASH_BLOCK_SIZE - 1) & ~(uint64_t)(HASH_BLOCK_SIZE - 1); - pos = start; - while ( pos < end ) { - if ( image->cache_map == NULL ) break; + for ( pos = start; pos < end; pos += HASH_BLOCK_SIZE ) { const int block = (int)( pos / HASH_BLOCK_SIZE ); - if ( image_isHashBlockComplete( image->cache_map, block, image->realFilesize ) ) { - mutex_unlock( &image->lock ); - integrity_check( image, block ); - mutex_lock( &image->lock ); + if ( image_isHashBlockComplete( cache->map, block, image->realFilesize ) ) { + integrity_check( image, block, false ); } - pos += HASH_BLOCK_SIZE; } } - mutex_unlock( &image->lock ); + ref_put( &cache->reference ); } /** @@ -146,20 +177,18 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co bool image_isComplete(dnbd3_image_t *image) { assert( image != NULL ); - mutex_lock( &image->lock ); if ( image->virtualFilesize == 0 ) { - mutex_unlock( &image->lock ); return false; } - if ( image->cache_map == NULL ) { - mutex_unlock( &image->lock ); + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache == NULL ) { return true; } bool complete = true; int j; const int map_len_bytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); for (j = 0; j < map_len_bytes - 1; ++j) { - if ( image->cache_map[j] != 0xFF ) { + if ( cache->map[j] != 0xFF ) { complete = false; break; } @@ -174,18 +203,27 @@ bool image_isComplete(dnbd3_image_t *image) for (j = 0; j < blocks_in_last_byte; ++j) last_byte |= (uint8_t)(1 << j); } - complete = ((image->cache_map[map_len_bytes - 1] & last_byte) == last_byte); + complete = ((cache->map[map_len_bytes - 1] & last_byte) == last_byte); } - if ( !complete ) { - mutex_unlock( &image->lock ); + ref_put( &cache->reference ); + if ( !complete ) return false; + mutex_lock( &image->lock ); + // Lock and make sure current cache map is still the one we saw complete + dnbd3_cache_map_t *current = ref_get_cachemap( image ); + if ( current == cache ) { + // Set cache map NULL as it's complete + ref_setref( &image->ref_cacheMap, NULL ); + } + if ( current != NULL ) { + ref_put( ¤t->reference ); } - char mapfile[PATHLEN] = ""; - free( image->cache_map ); - image->cache_map = NULL; - snprintf( mapfile, PATHLEN, "%s.map", image->path ); mutex_unlock( &image->lock ); - unlink( mapfile ); + if ( current == cache ) { // Successfully set cache map to NULL above + char mapfile[PATHLEN] = ""; + snprintf( mapfile, PATHLEN, "%s.map", image->path ); + unlink( mapfile ); + } return true; } @@ -203,7 +241,9 @@ bool image_ensureOpen(dnbd3_image_t *image) { if ( image->readFd != -1 ) return image; int newFd = open( image->path, O_RDONLY ); - if ( newFd != -1 ) { + if ( newFd == -1 ) { + logadd( LOG_WARNING, "Cannot open %s for reading", image->path ); + } else { // Check size const off_t flen = lseek( newFd, 0, SEEK_END ); if ( flen == -1 ) { @@ -234,6 +274,22 @@ bool image_ensureOpen(dnbd3_image_t *image) return image->readFd != -1; } +dnbd3_image_t* image_byId(int imgId) +{ + int i; + mutex_lock( &imageListLock ); + for (i = 0; i < _num_images; ++i) { + dnbd3_image_t * const image = _images[i]; + if ( image != NULL && image->id == imgId ) { + image->users++; + mutex_unlock( &imageListLock ); + return image; + } + } + mutex_unlock( &imageListLock ); + return NULL; +} + /** * Get an image by name+rid. This function increases a reference counter, * so you HAVE TO CALL image_release for every image_get() call at some @@ -267,14 +323,12 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) return NULL ; } - mutex_lock( &candidate->lock ); - mutex_unlock( &imageListLock ); candidate->users++; - mutex_unlock( &candidate->lock ); + mutex_unlock( &imageListLock ); // Found, see if it works -// TODO: Also make sure a non-working image still has old fd open but created a new one and removed itself from the list -// TODO: But remember size-changed images forever + // TODO: Also make sure a non-working image still has old fd open but created a new one and removed itself from the list + // TODO: But remember size-changed images forever if ( candidate->working || checkIfWorking ) { // Is marked working, but might not have an fd open if ( !image_ensureOpen( candidate ) ) { @@ -317,14 +371,14 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) logadd( LOG_WARNING, "lseek() on %s failed (errno=%d)%s.", candidate->path, errno, removingText ); reload = true; } else if ( (uint64_t)len != candidate->realFilesize ) { - logadd( LOG_DEBUG1, "Size of %s changed at runtime, keeping disabled! Expected: %" PRIu64 ", found: %" PRIu64 + logadd( LOG_WARNING, "Size of %s changed at runtime, keeping disabled! Expected: %" PRIu64 ", found: %" PRIu64 ". Try sending SIGHUP to server if you know what you're doing.", candidate->path, candidate->realFilesize, (uint64_t)len ); } else { // Seek worked, file size is same, now see if we can read from file char buffer[100]; if ( pread( candidate->readFd, buffer, sizeof(buffer), 0 ) == -1 ) { - logadd( LOG_DEBUG2, "Reading first %d bytes from %s failed (errno=%d)%s.", + logadd( LOG_WARNING, "Reading first %d bytes from %s failed (errno=%d)%s.", (int)sizeof(buffer), candidate->path, errno, removingText ); reload = true; } else if ( !candidate->working ) { @@ -338,6 +392,7 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) // Could not access the image with exising fd - mark for reload which will re-open the file. // make a copy of the image struct but keep the old one around. If/When it's not being used // anymore, it will be freed automatically. + logadd( LOG_DEBUG1, "Reloading image file %s", candidate->path ); dnbd3_image_t *img = calloc( sizeof(dnbd3_image_t), 1 ); img->path = strdup( candidate->path ); img->name = strdup( candidate->name ); @@ -349,19 +404,18 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) img->rid = candidate->rid; img->users = 1; img->working = false; - mutex_init( &img->lock ); + img->ref_cacheMap = NULL; + mutex_init( &img->lock, LOCK_IMAGE ); if ( candidate->crc32 != NULL ) { const size_t mb = IMGSIZE_TO_HASHBLOCKS( candidate->virtualFilesize ) * sizeof(uint32_t); img->crc32 = malloc( mb ); memcpy( img->crc32, candidate->crc32, mb ); } - mutex_lock( &candidate->lock ); - if ( candidate->cache_map != NULL ) { - const size_t mb = IMGSIZE_TO_MAPBYTES( candidate->virtualFilesize ); - img->cache_map = malloc( mb ); - memcpy( img->cache_map, candidate->cache_map, mb ); + dnbd3_cache_map_t *cache = ref_get_cachemap( candidate ); + if ( cache != NULL ) { + ref_setref( &img->ref_cacheMap, &cache->reference ); + ref_put( &cache->reference ); } - mutex_unlock( &candidate->lock ); if ( image_addToList( img ) ) { image_release( candidate ); candidate = img; @@ -369,19 +423,16 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) img->users = 0; image_free( img ); } + // Check if image is incomplete, initialize uplink + if ( candidate->ref_cacheMap != NULL ) { + uplink_init( candidate, -1, NULL, -1 ); + } // readFd == -1 and working == FALSE at this point, // this function needs some splitting up for handling as we need to run most // of the above code again. for now we know that the next call for this // name:rid will get ne newly inserted "img" and try to re-open the file. } - // Check if image is incomplete, handle - if ( candidate->cache_map != NULL ) { - if ( candidate->uplink == NULL ) { - uplink_init( candidate, -1, NULL, -1 ); - } - } - return candidate; // We did all we can, hopefully it's working } @@ -391,17 +442,15 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) * Every call to image_lock() needs to be followed by a call to image_release() at some point. * Locks on: imageListLock, _images[].lock */ -dnbd3_image_t* image_lock(dnbd3_image_t *image) // TODO: get rid, fix places that do image->users-- +dnbd3_image_t* image_lock(dnbd3_image_t *image) { if ( image == NULL ) return NULL ; int i; mutex_lock( &imageListLock ); for (i = 0; i < _num_images; ++i) { if ( _images[i] == image ) { - mutex_lock( &image->lock ); - mutex_unlock( &imageListLock ); image->users++; - mutex_unlock( &image->lock ); + mutex_unlock( &imageListLock ); return image; } } @@ -419,12 +468,9 @@ dnbd3_image_t* image_release(dnbd3_image_t *image) { if ( image == NULL ) return NULL; mutex_lock( &imageListLock ); - mutex_lock( &image->lock ); assert( image->users > 0 ); - image->users--; - bool inUse = image->users != 0; - mutex_unlock( &image->lock ); - if ( inUse ) { // Still in use, do nothing + // Decrement and check for 0 + if ( --image->users != 0 ) { // Still in use, do nothing mutex_unlock( &imageListLock ); return NULL; } @@ -439,7 +485,7 @@ dnbd3_image_t* image_release(dnbd3_image_t *image) } mutex_unlock( &imageListLock ); // So it wasn't in the images list anymore either, get rid of it - if ( !inUse ) image = image_free( image ); + image = image_free( image ); return NULL; } @@ -470,7 +516,6 @@ static dnbd3_image_t* image_remove(dnbd3_image_t *image) { bool mustFree = false; mutex_lock( &imageListLock ); - mutex_lock( &image->lock ); for ( int i = _num_images - 1; i >= 0; --i ) { if ( _images[i] == image ) { _images[i] = NULL; @@ -478,7 +523,6 @@ static dnbd3_image_t* image_remove(dnbd3_image_t *image) } if ( _images[i] == NULL && i + 1 == _num_images ) _num_images--; } - mutex_unlock( &image->lock ); mutex_unlock( &imageListLock ); if ( mustFree ) image = image_free( image ); return image; @@ -493,17 +537,7 @@ void image_killUplinks() mutex_lock( &imageListLock ); for (i = 0; i < _num_images; ++i) { if ( _images[i] == NULL ) continue; - mutex_lock( &_images[i]->lock ); - if ( _images[i]->uplink != NULL ) { - mutex_lock( &_images[i]->uplink->queueLock ); - if ( !_images[i]->uplink->shutdown ) { - thread_detach( _images[i]->uplink->thread ); - _images[i]->uplink->shutdown = true; - } - mutex_unlock( &_images[i]->uplink->queueLock ); - signal_call( _images[i]->uplink->signal ); - } - mutex_unlock( &_images[i]->lock ); + uplink_shutdown( _images[i] ); } mutex_unlock( &imageListLock ); } @@ -542,18 +576,14 @@ bool image_loadAll(char *path) // Lock again, see if image is still there, free if required mutex_lock( &imageListLock ); if ( ret || i >= _num_images || _images[i] == NULL || _images[i]->id != imgId ) continue; - // Image needs to be removed + // File not readable but still in list -- needs to be removed imgHandle = _images[i]; _images[i] = NULL; if ( i + 1 == _num_images ) _num_images--; - mutex_lock( &imgHandle->lock ); - const bool freeImg = ( imgHandle->users == 0 ); - mutex_unlock( &imgHandle->lock ); - // We unlocked, but the image has been removed from the list already, so - // there's no way the users-counter can increase at this point. - if ( freeImg ) { + if ( imgHandle->users == 0 ) { // Image is not in use anymore, free the dangling entry immediately - mutex_unlock( &imageListLock ); // image_free might do several fs operations; unlock + mutex_unlock( &imageListLock ); // image_free locks on this, and + // might do several fs operations; unlock image_free( imgHandle ); mutex_lock( &imageListLock ); } @@ -581,12 +611,10 @@ bool image_tryFreeAll() { mutex_lock( &imageListLock ); for (int i = _num_images - 1; i >= 0; --i) { - if ( _images[i] != NULL && _images[i]->users == 0 ) { // XXX Data race... + if ( _images[i] != NULL && _images[i]->users == 0 ) { dnbd3_image_t *image = _images[i]; _images[i] = NULL; - mutex_unlock( &imageListLock ); image = image_free( image ); - mutex_lock( &imageListLock ); } if ( i + 1 == _num_images && _images[i] == NULL ) _num_images--; } @@ -596,35 +624,34 @@ bool image_tryFreeAll() /** * Free image. DOES NOT check if it's in use. - * Indirectly locks on imageListLock, image.lock, uplink.queueLock + * (Indirectly) locks on image.lock, uplink.queueLock */ static dnbd3_image_t* image_free(dnbd3_image_t *image) { assert( image != NULL ); - if ( !_shutdown ) { - logadd( LOG_INFO, "Freeing image %s:%d", image->name, (int)image->rid ); - } - // - uplink_shutdown( image ); + assert( image->users == 0 ); + logadd( ( _shutdown ? LOG_DEBUG1 : LOG_INFO ), "Freeing image %s:%d", image->name, (int)image->rid ); + // uplink_shutdown might return false to tell us + // that the shutdown is in progress. Bail out since + // this will get called again when the uplink is done. + if ( !uplink_shutdown( image ) ) + return NULL; mutex_lock( &image->lock ); - free( image->cache_map ); + ref_setref( &image->ref_cacheMap, NULL ); free( image->crc32 ); free( image->path ); free( image->name ); - image->cache_map = NULL; image->crc32 = NULL; image->path = NULL; image->name = NULL; mutex_unlock( &image->lock ); if ( image->readFd != -1 ) close( image->readFd ); mutex_destroy( &image->lock ); - // - memset( image, 0, sizeof(*image) ); free( image ); return NULL ; } -bool image_isHashBlockComplete(const uint8_t * const cacheMap, const uint64_t block, const uint64_t realFilesize) +bool image_isHashBlockComplete(atomic_uint_least8_t * const cacheMap, const uint64_t block, const uint64_t realFilesize) { if ( cacheMap == NULL ) return true; const uint64_t end = (block + 1) * HASH_BLOCK_SIZE; @@ -731,7 +758,7 @@ static bool image_load(char *base, char *path, int withUplink) { int revision = -1; struct stat st; - uint8_t *cache_map = NULL; + dnbd3_cache_map_t *cache = NULL; uint32_t *crc32list = NULL; dnbd3_image_t *existing = NULL; int fdImage = -1; @@ -814,24 +841,15 @@ static bool image_load(char *base, char *path, int withUplink) } // 1. Allocate memory for the cache map if the image is incomplete - cache_map = image_loadCacheMap( path, virtualFilesize ); + cache = image_loadCacheMap( path, virtualFilesize ); // XXX: Maybe try sha-256 or 512 first if you're paranoid (to be implemented) // 2. Load CRC-32 list of image - bool doFullCheck = false; uint32_t masterCrc = 0; const int hashBlockCount = IMGSIZE_TO_HASHBLOCKS( virtualFilesize ); crc32list = image_loadCrcList( path, virtualFilesize, &masterCrc ); - // Check CRC32 - if ( crc32list != NULL ) { - if ( !image_checkRandomBlocks( 4, fdImage, realFilesize, crc32list, cache_map ) ) { - logadd( LOG_ERROR, "quick crc32 check of %s failed. Data corruption?", path ); - doFullCheck = true; - } - } - // Compare data just loaded to identical image we apparently already loaded if ( existing != NULL ) { if ( existing->realFilesize != realFilesize ) { @@ -850,7 +868,7 @@ static bool image_load(char *base, char *path, int withUplink) crc32list = NULL; function_return = true; goto load_error; // Keep existing - } else if ( existing->cache_map != NULL && cache_map == NULL ) { + } else if ( existing->ref_cacheMap != NULL && cache == NULL ) { // Just ignore that fact, if replication is really complete the cache map will be removed anyways logadd( LOG_INFO, "Image '%s:%d' has no cache map on disk!", existing->name, (int)existing->rid ); function_return = true; @@ -870,19 +888,20 @@ static bool image_load(char *base, char *path, int withUplink) dnbd3_image_t *image = calloc( 1, sizeof(dnbd3_image_t) ); image->path = strdup( path ); image->name = strdup( imgName ); - image->cache_map = cache_map; + image->ref_cacheMap = NULL; + ref_setref( &image->ref_cacheMap, &cache->reference ); image->crc32 = crc32list; image->masterCrc32 = masterCrc; - image->uplink = NULL; + image->uplinkref = NULL; image->realFilesize = realFilesize; image->virtualFilesize = virtualFilesize; image->rid = (uint16_t)revision; image->users = 0; image->readFd = -1; - image->working = (image->cache_map == NULL ); + image->working = ( cache == NULL ); timing_get( &image->nextCompletenessEstimate ); image->completenessEstimate = -1; - mutex_init( &image->lock ); + mutex_init( &image->lock, LOCK_IMAGE ); int32_t offset; if ( stat( path, &st ) == 0 ) { // Negatively offset atime by file modification time @@ -894,16 +913,16 @@ static bool image_load(char *base, char *path, int withUplink) timing_gets( &image->atime, offset ); // Prevent freeing in cleanup - cache_map = NULL; + cache = NULL; crc32list = NULL; // Get rid of cache map if image is complete - if ( image->cache_map != NULL ) { + if ( image->ref_cacheMap != NULL ) { image_isComplete( image ); } // Image is definitely incomplete, initialize uplink worker - if ( image->cache_map != NULL ) { + if ( image->ref_cacheMap != NULL ) { image->working = false; if ( withUplink ) { uplink_init( image, -1, NULL, -1 ); @@ -915,6 +934,8 @@ static bool image_load(char *base, char *path, int withUplink) if ( image_addToList( image ) ) { // Keep fd for reading fdImage = -1; + // Check CRC32 + image_checkRandomBlocks( image, 4 ); } else { logadd( LOG_ERROR, "Image list full: Could not add image %s", path ); image->readFd = -1; // Keep fdImage instead, will be closed below @@ -922,33 +943,28 @@ static bool image_load(char *base, char *path, int withUplink) goto load_error; } logadd( LOG_DEBUG1, "Loaded image '%s:%d'\n", image->name, (int)image->rid ); - // CRC errors found... - if ( doFullCheck ) { - logadd( LOG_INFO, "Queueing full CRC32 check for '%s:%d'\n", image->name, (int)image->rid ); - integrity_check( image, -1 ); - } - function_return = true; // Clean exit: load_error: ; if ( existing != NULL ) existing = image_release( existing ); if ( crc32list != NULL ) free( crc32list ); - if ( cache_map != NULL ) free( cache_map ); + if ( cache != NULL ) free( cache ); if ( fdImage != -1 ) close( fdImage ); return function_return; } -static uint8_t* image_loadCacheMap(const char * const imagePath, const int64_t fileSize) +static dnbd3_cache_map_t* image_loadCacheMap(const char * const imagePath, const int64_t fileSize) { - uint8_t *retval = NULL; + dnbd3_cache_map_t *retval = NULL; char mapFile[strlen( imagePath ) + 10 + 1]; sprintf( mapFile, "%s.map", imagePath ); int fdMap = open( mapFile, O_RDONLY ); - if ( fdMap >= 0 ) { + if ( fdMap != -1 ) { const int map_size = IMGSIZE_TO_MAPBYTES( fileSize ); - retval = calloc( 1, map_size ); - const ssize_t rd = read( fdMap, retval, map_size ); + retval = calloc( 1, sizeof(*retval) + map_size ); + ref_init( &retval->reference, cmfree, 0 ); + const ssize_t rd = read( fdMap, retval->map, map_size ); if ( map_size != rd ) { logadd( LOG_WARNING, "Could only read %d of expected %d bytes of cache map of '%s'", (int)rd, (int)map_size, imagePath ); // Could not read complete map, that means the rest of the image file will be considered incomplete @@ -1009,18 +1025,26 @@ static uint32_t* image_loadCrcList(const char * const imagePath, const int64_t f return retval; } -static bool image_checkRandomBlocks(const int count, int fdImage, const int64_t realFilesize, uint32_t * const crc32list, uint8_t * const cache_map) +static void image_checkRandomBlocks(dnbd3_image_t *image, const int count) { + if ( image->crc32 == NULL ) + return; // This checks the first block and (up to) count - 1 random blocks for corruption // via the known crc32 list. This is very sloppy and is merely supposed to detect // accidental corruption due to broken dnbd3-proxy functionality or file system - // corruption. + // corruption, or people replacing/updating images which is a very stupid thing. assert( count > 0 ); - const int hashBlocks = IMGSIZE_TO_HASHBLOCKS( realFilesize ); - int blocks[count + 1]; + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + const int hashBlocks = IMGSIZE_TO_HASHBLOCKS( image->virtualFilesize ); + int blocks[count]; int index = 0, j; int block; - if ( image_isHashBlockComplete( cache_map, 0, realFilesize ) ) blocks[index++] = 0; + if ( image_isHashBlockComplete( cache->map, 0, image->virtualFilesize ) ) { + blocks[index++] = 0; + } + if ( hashBlocks > 1 && image_isHashBlockComplete( cache->map, hashBlocks - 1, image->virtualFilesize ) ) { + blocks[index++] = hashBlocks - 1; + } int tries = count * 5; // Try only so many times to find a non-duplicate complete block while ( index + 1 < count && --tries > 0 ) { block = rand() % hashBlocks; // Random block @@ -1028,11 +1052,15 @@ static bool image_checkRandomBlocks(const int count, int fdImage, const int64_t if ( blocks[j] == block ) goto while_end; } // Block complete? If yes, add to list - if ( image_isHashBlockComplete( cache_map, block, realFilesize ) ) blocks[index++] = block; + if ( image_isHashBlockComplete( cache->map, block, image->virtualFilesize ) ) { + blocks[index++] = block; + } while_end: ; } - blocks[MIN(index, count)] = -1; // End of array has to be marked by a -1 - return image_checkBlocksCrc32( fdImage, crc32list, blocks, realFilesize ); // Return result of check + ref_put( &cache->reference ); + for ( int i = 0; i < index; ++i ) { + integrity_check( image, blocks[i], true ); + } } /** @@ -1191,7 +1219,7 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision, dnbd3_host_t servers[REP_NUM_SRV]; int uplinkSock = -1; dnbd3_host_t uplinkServer; - const int count = altservers_getListForUplink( servers, REP_NUM_SRV, false ); + const int count = altservers_getHostListForReplication( name, servers, REP_NUM_SRV ); uint16_t remoteProtocolVersion; uint16_t remoteRid = revision; uint64_t remoteImageSize; @@ -1504,9 +1532,9 @@ json_t* image_getListAsJson() json_t *imagesJson = json_array(); json_t *jsonImage; int i; - char uplinkName[100] = { 0 }; + char uplinkName[100]; uint64_t bytesReceived; - int users, completeness, idleTime; + int completeness, idleTime; declare_now; mutex_lock( &imageListLock ); @@ -1514,27 +1542,26 @@ json_t* image_getListAsJson() if ( _images[i] == NULL ) continue; dnbd3_image_t *image = _images[i]; mutex_lock( &image->lock ); - mutex_unlock( &imageListLock ); - users = image->users; idleTime = (int)timing_diff( &image->atime, &now ); completeness = image_getCompletenessEstimate( image ); - if ( image->uplink == NULL ) { + mutex_unlock( &image->lock ); + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink == NULL ) { bytesReceived = 0; uplinkName[0] = '\0'; } else { - bytesReceived = image->uplink->bytesReceived; - if ( image->uplink->fd == -1 || !host_to_string( &image->uplink->currentServer, uplinkName, sizeof(uplinkName) ) ) { + bytesReceived = uplink->bytesReceived; + if ( !uplink_getHostString( uplink, uplinkName, sizeof(uplinkName) ) ) { uplinkName[0] = '\0'; } + ref_put( &uplink->reference ); } - image->users++; // Prevent freeing after we unlock - mutex_unlock( &image->lock ); jsonImage = json_pack( "{sisssisisisisI}", "id", image->id, // id, name, rid never change, so access them without locking "name", image->name, "rid", (int) image->rid, - "users", users, + "users", image->users, "complete", completeness, "idle", idleTime, "size", (json_int_t)image->virtualFilesize ); @@ -1546,8 +1573,6 @@ json_t* image_getListAsJson() } json_array_append_new( imagesJson, jsonImage ); - image = image_release( image ); // Since we did image->users++; - mutex_lock( &imageListLock ); } mutex_unlock( &imageListLock ); return imagesJson; @@ -1556,30 +1581,37 @@ json_t* image_getListAsJson() /** * Get completeness of an image in percent. Only estimated, not exact. * Returns: 0-100 - * DOES NOT LOCK, so make sure to do so before calling */ int image_getCompletenessEstimate(dnbd3_image_t * const image) { assert( image != NULL ); - if ( image->cache_map == NULL ) return image->working ? 100 : 0; + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache == NULL ) + return image->working ? 100 : 0; + const int len = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); + if ( unlikely( len == 0 ) ) { + ref_put( &cache->reference ); + return 0; + } declare_now; if ( !timing_reached( &image->nextCompletenessEstimate, &now ) ) { // Since this operation is relatively expensive, we cache the result for a while + ref_put( &cache->reference ); return image->completenessEstimate; } int i; int percent = 0; - const int len = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); - if ( len == 0 ) return 0; for ( i = 0; i < len; ++i ) { - if ( image->cache_map[i] == 0xff ) { + const uint8_t v = atomic_load_explicit( &cache->map[i], memory_order_relaxed ); + if ( v == 0xff ) { percent += 100; - } else if ( image->cache_map[i] != 0 ) { + } else if ( v != 0 ) { percent += 50; } } + ref_put( &cache->reference ); image->completenessEstimate = percent / len; - timing_set( &image->nextCompletenessEstimate, &now, 8 + rand() % 32 ); + timing_set( &image->nextCompletenessEstimate, &now, 4 + rand() % 16 ); return image->completenessEstimate; } @@ -1611,7 +1643,7 @@ bool image_checkBlocksCrc32(const int fd, uint32_t *crc32list, const int *blocks static bool image_calcBlockCrc32(const int fd, const size_t block, const uint64_t realFilesize, uint32_t *crc) { // Make buffer 4k aligned in case fd has O_DIRECT set -#define BSIZE 262144 +#define BSIZE (512*1024) char rawBuffer[BSIZE + DNBD3_BLOCK_SIZE]; char * const buffer = (char*)( ( (uintptr_t)rawBuffer + ( DNBD3_BLOCK_SIZE - 1 ) ) & ~( DNBD3_BLOCK_SIZE - 1 ) ); // How many bytes to read from the input file @@ -1669,7 +1701,7 @@ bool image_ensureDiskSpaceLocked(uint64_t size, bool force) * TODO: Store last access time of images. Currently the * last access time is reset to the file modification time * on server restart. Thus it will - * currently only delete images if server uptime is > 10 hours. + * currently only delete images if server uptime is > 24 hours. * This can be overridden by setting force to true, in case * free space is desperately needed. * Return true iff enough space is available. false in random other cases @@ -1679,48 +1711,55 @@ static bool image_ensureDiskSpace(uint64_t size, bool force) for ( int maxtries = 0; maxtries < 20; ++maxtries ) { uint64_t available; if ( !file_freeDiskSpace( _basePath, NULL, &available ) ) { - const int e = errno; - logadd( LOG_WARNING, "Could not get free disk space (errno %d), will assume there is enough space left... ;-)\n", e ); + logadd( LOG_WARNING, "Could not get free disk space (errno %d), will assume there is enough space left... ;-)\n", errno ); return true; } - if ( available > size ) return true; - if ( !force && dnbd3_serverUptime() < 10 * 3600 ) { - logadd( LOG_INFO, "Only %dMiB free, %dMiB requested, but server uptime < 10 hours...", (int)(available / (1024ll * 1024ll)), - (int)(size / (1024 * 1024)) ); + if ( available > size ) + return true; // Yay + if ( !_isProxy || _autoFreeDiskSpaceDelay == -1 ) + return false; // If not in proxy mode at all, or explicitly disabled, never delete anything + if ( !force && dnbd3_serverUptime() < (uint32_t)_autoFreeDiskSpaceDelay ) { + logadd( LOG_INFO, "Only %dMiB free, %dMiB requested, but server uptime < %d minutes...", (int)(available / (1024ll * 1024ll)), + (int)(size / (1024 * 1024)), _autoFreeDiskSpaceDelay / 60 ); return false; } logadd( LOG_INFO, "Only %dMiB free, %dMiB requested, freeing an image...", (int)(available / (1024ll * 1024ll)), (int)(size / (1024 * 1024)) ); // Find least recently used image dnbd3_image_t *oldest = NULL; - int i; // XXX improve locking + int i; + mutex_lock( &imageListLock ); for (i = 0; i < _num_images; ++i) { - if ( _images[i] == NULL ) continue; - dnbd3_image_t *current = image_lock( _images[i] ); + dnbd3_image_t *current = _images[i]; if ( current == NULL ) continue; - if ( current->users == 1 ) { // Just from the lock above + if ( current->users == 0 ) { // Not in use :-) if ( oldest == NULL || timing_1le2( ¤t->atime, &oldest->atime ) ) { // Oldest access time so far oldest = current; } } - current = image_release( current ); + } + if ( oldest != NULL ) { + oldest->users++; + } + mutex_unlock( &imageListLock ); + if ( oldest == NULL ) { + logadd( LOG_INFO, "All images are currently in use :-(" ); + return false; } declare_now; - if ( oldest == NULL || ( !_sparseFiles && timing_diff( &oldest->atime, &now ) < 86400 ) ) { - if ( oldest == NULL ) { - logadd( LOG_INFO, "All images are currently in use :-(" ); - } else { - logadd( LOG_INFO, "Won't free any image, all have been in use in the past 24 hours :-(" ); - } + if ( !_sparseFiles && timing_diff( &oldest->atime, &now ) < 86400 ) { + logadd( LOG_INFO, "Won't free any image, all have been in use in the past 24 hours :-(" ); + image_release( oldest ); // We did users++ above; image might have to be freed entirely return false; } - oldest = image_lock( oldest ); - if ( oldest == NULL ) continue; // Image freed in the meantime? Try again logadd( LOG_INFO, "'%s:%d' has to go!", oldest->name, (int)oldest->rid ); - char *filename = strdup( oldest->path ); - oldest = image_remove( oldest ); - oldest = image_release( oldest ); + char *filename = strdup( oldest->path ); // Copy name as we remove the image first + oldest = image_remove( oldest ); // Remove from list first... + oldest = image_release( oldest ); // Decrease users counter; if it falls to 0, image will be freed + // Technically the image might have been grabbed again, but chances for + // this should be close to zero anyways since the image went unused for more than 24 hours.. + // Proper fix would be a "delete" flag in the image struct that will be checked in image_free unlink( filename ); size_t len = strlen( filename ) + 10; char buffer[len]; @@ -1735,62 +1774,52 @@ static bool image_ensureDiskSpace(uint64_t size, bool force) return false; } -void image_closeUnusedFd() +#define FDCOUNT (400) +static void* closeUnusedFds(void* nix UNUSED) { - int fd, i; + if ( !_closeUnusedFd ) + return NULL; ticks deadline; timing_gets( &deadline, -UNUSED_FD_TIMEOUT ); - char imgstr[300]; + int fds[FDCOUNT]; + int fdindex = 0; mutex_lock( &imageListLock ); - for (i = 0; i < _num_images; ++i) { + for ( int i = 0; i < _num_images; ++i ) { dnbd3_image_t * const image = _images[i]; - if ( image == NULL ) + if ( image == NULL || image->readFd == -1 ) continue; - mutex_lock( &image->lock ); - mutex_unlock( &imageListLock ); - if ( image->users == 0 && image->uplink == NULL && timing_reached( &image->atime, &deadline ) ) { - snprintf( imgstr, sizeof(imgstr), "%s:%d", image->name, (int)image->rid ); - fd = image->readFd; - image->readFd = -1; - } else { - fd = -1; - } - mutex_unlock( &image->lock ); - if ( fd != -1 ) { - close( fd ); - logadd( LOG_DEBUG1, "Inactive fd closed for %s", imgstr ); + // TODO: Also close for idle uplinks (uplink_connectionShouldShutdown) + // TODO: And close writeFd for idle uplinks.... + if ( image->users == 0 && image->uplinkref == NULL && timing_reached( &image->atime, &deadline ) ) { + logadd( LOG_DEBUG1, "Inactive fd closed for %s:%d", image->name, (int)image->rid ); + fds[fdindex++] = image->readFd; + image->readFd = -1; // Not a race; image->users is 0 and to increase it you need imageListLock + if ( fdindex == FDCOUNT ) + break; } - mutex_lock( &imageListLock ); } mutex_unlock( &imageListLock ); + // Do this after unlock since close might block + for ( int i = 0; i < fdindex; ++i ) { + close( fds[i] ); + } + return NULL; +} + +static void allocCacheMap(dnbd3_image_t *image, bool complete) +{ + const uint8_t val = complete ? 0xff : 0; + const int byteSize = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); + dnbd3_cache_map_t *cache = malloc( sizeof(*cache) + byteSize ); + ref_init( &cache->reference, cmfree, 0 ); + memset( cache->map, val, byteSize ); + mutex_lock( &image->lock ); + if ( image->ref_cacheMap != NULL ) { + logadd( LOG_WARNING, "BUG: allocCacheMap called but there already is a cache map for %s:%d", image->name, (int)image->rid ); + free( cache ); + } else { + ref_setref( &image->ref_cacheMap, &cache->reference ); + } + mutex_unlock( &image->lock ); } -/* - void image_find_latest() - { - // Not in array or most recent rid is requested, try file system - if (revision != 0) { - // Easy case - specific RID - char - } else { - // Determine base directory where the image in question has to reside. - // Eg, the _basePath is "/srv/", requested image is "rz/ubuntu/default-13.04" - // Then searchPath has to be set to "/srv/rz/ubuntu" - char searchPath[strlen(_basePath) + len + 1]; - char *lastSlash = strrchr(name, '/'); - char *baseName; // Name of the image. In the example above, it will be "default-13.04" - if ( lastSlash == NULL ) { - *searchPath = '\0'; - baseName = name; - } else { - char *from = name, *to = searchPath; - while (from < lastSlash) *to++ = *from++; - *to = '\0'; - baseName = lastSlash + 1; - } - // Now we have the search path in our real file system and the expected image name. - // The revision naming sceme is <IMAGENAME>.r<RID>, so if we're looking for revision 13, - // our example image has to be named default-13.04.r13 - } - } - */ diff --git a/src/server/image.h b/src/server/image.h index 4668eff..449e31f 100644 --- a/src/server/image.h +++ b/src/server/image.h @@ -9,7 +9,7 @@ void image_serverStartup(); bool image_isComplete(dnbd3_image_t *image); -bool image_isHashBlockComplete(const uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize); +bool image_isHashBlockComplete(atomic_uint_least8_t * const cacheMap, const uint64_t block, const uint64_t fileSize); void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, const bool set); @@ -17,6 +17,8 @@ void image_markComplete(dnbd3_image_t *image); bool image_ensureOpen(dnbd3_image_t *image); +dnbd3_image_t* image_byId(int imgId); + dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking); bool image_reopenCacheFd(dnbd3_image_t *image, const bool force); diff --git a/src/server/ini.c b/src/server/ini.c index 216543b..c796d5c 100644 --- a/src/server/ini.c +++ b/src/server/ini.c @@ -110,7 +110,17 @@ int ini_parse_file(FILE* file, int (*handler)(void*, const char*, const char*, c #endif else if ( *start == '[' ) { /* A "[section]" line */ - end = find_char_or_comment( start + 1, ']' ); + int cnt = 0; + char *f = start, *sstart = start; + while ( *++f ) { + if ( *f == '[' ) cnt++; + if ( *f == ']' ) cnt--; + if ( cnt < 0 ) { + sstart = f - 1; + break; + } + } + end = find_char_or_comment( sstart + 1, ']' ); if ( *end == ']' ) { *end = '\0'; strncpy0( section, start + 1, sizeof(section) ); diff --git a/src/server/integrity.c b/src/server/integrity.c index 8f17855..1fbd9dc 100644 --- a/src/server/integrity.c +++ b/src/server/integrity.c @@ -4,6 +4,7 @@ #include "locks.h" #include "image.h" #include "uplink.h" +#include "reference.h" #include <assert.h> #include <sys/syscall.h> @@ -12,6 +13,8 @@ #include <sys/stat.h> #include <unistd.h> #include <fcntl.h> +#include <pthread.h> +#include <signal.h> #define CHECK_QUEUE_SIZE 200 @@ -29,9 +32,10 @@ static queue_entry checkQueue[CHECK_QUEUE_SIZE]; static pthread_mutex_t integrityQueueLock; static pthread_cond_t queueSignal; static int queueLen = -1; -static volatile bool bRunning = false; +static atomic_bool bRunning = false; static void* integrity_main(void *data); +static void flushFileRange(dnbd3_image_t *image, uint64_t start, uint64_t end); /** * Initialize the integrity check thread @@ -39,7 +43,7 @@ static void* integrity_main(void *data); void integrity_init() { assert( queueLen == -1 ); - mutex_init( &integrityQueueLock ); + mutex_init( &integrityQueueLock, LOCK_INTEGRITY_QUEUE ); pthread_cond_init( &queueSignal, NULL ); mutex_lock( &integrityQueueLock ); queueLen = 0; @@ -55,13 +59,14 @@ void integrity_init() void integrity_shutdown() { assert( queueLen != -1 ); + if ( !bRunning ) + return; logadd( LOG_DEBUG1, "Shutting down integrity checker...\n" ); + pthread_kill( thread, SIGINT ); mutex_lock( &integrityQueueLock ); pthread_cond_signal( &queueSignal ); mutex_unlock( &integrityQueueLock ); thread_join( thread, NULL ); - while ( bRunning ) - usleep( 10000 ); mutex_destroy( &integrityQueueLock ); pthread_cond_destroy( &queueSignal ); logadd( LOG_DEBUG1, "Integrity checker exited normally.\n" ); @@ -73,32 +78,42 @@ void integrity_shutdown() * make sure it is before calling, otherwise it will result in falsely * detected corruption. */ -void integrity_check(dnbd3_image_t *image, int block) +void integrity_check(dnbd3_image_t *image, int block, bool blocking) { + int freeSlot; if ( !bRunning ) { logadd( LOG_MINOR, "Ignoring check request; thread not running..." ); return; } - int i, freeSlot = -1; +start_over: + freeSlot = -1; mutex_lock( &integrityQueueLock ); - for (i = 0; i < queueLen; ++i) { + for (int i = 0; i < queueLen; ++i) { if ( freeSlot == -1 && checkQueue[i].image == NULL ) { freeSlot = i; - } else if ( checkQueue[i].image == image - && checkQueue[i].block <= block && checkQueue[i].block + checkQueue[i].count >= block ) { - // Already queued check dominates this one, or at least lies directly before this block - if ( checkQueue[i].block + checkQueue[i].count == block ) { - // It's directly before this one; expand range + } else if ( checkQueue[i].image == image && checkQueue[i].block <= block ) { + if ( checkQueue[i].count == CHECK_ALL ) { + logadd( LOG_DEBUG2, "Dominated by full image scan request (%d/%d) (at %d)", i, queueLen, checkQueue[i].block ); + } else if ( checkQueue[i].block + checkQueue[i].count == block ) { checkQueue[i].count += 1; + logadd( LOG_DEBUG2, "Attaching to existing check request (%d/%d) (at %d, %d to go)", i, queueLen, checkQueue[i].block, checkQueue[i].count ); + } else if ( checkQueue[i].block + checkQueue[i].count > block ) { + logadd( LOG_DEBUG2, "Dominated by existing check request (%d/%d) (at %d, %d to go)", i, queueLen, checkQueue[i].block, checkQueue[i].count ); + } else { + continue; } - logadd( LOG_DEBUG2, "Attaching to existing check request (%d/%d) (%d +%d)", i, queueLen, checkQueue[i].block, checkQueue[i].count ); mutex_unlock( &integrityQueueLock ); return; } } if ( freeSlot == -1 ) { - if ( queueLen >= CHECK_QUEUE_SIZE ) { + if ( unlikely( queueLen >= CHECK_QUEUE_SIZE ) ) { mutex_unlock( &integrityQueueLock ); + if ( blocking ) { + logadd( LOG_INFO, "Check queue full, waiting a couple seconds...\n" ); + sleep( 3 ); + goto start_over; + } logadd( LOG_INFO, "Check queue full, discarding check request...\n" ); return; } @@ -119,11 +134,9 @@ void integrity_check(dnbd3_image_t *image, int block) static void* integrity_main(void * data UNUSED) { int i; - uint8_t *buffer = NULL; - size_t bufferSize = 0; setThreadName( "image-check" ); blockNoncriticalSignals(); -#if defined(linux) || defined(__linux) +#if defined(__linux__) // Setting nice of this thread - this is not POSIX conforming, so check if other platforms support this. // POSIX says that setpriority() should set the nice value of all threads belonging to the current process, // but on linux you can do this per thread. @@ -146,79 +159,70 @@ static void* integrity_main(void * data UNUSED) // We have the image. Call image_release() some time const int qCount = checkQueue[i].count; bool foundCorrupted = false; - mutex_lock( &image->lock ); if ( image->crc32 != NULL && image->realFilesize != 0 ) { int blocks[2] = { checkQueue[i].block, -1 }; mutex_unlock( &integrityQueueLock ); - // Make copy of crc32 list as it might go away const uint64_t fileSize = image->realFilesize; const int numHashBlocks = IMGSIZE_TO_HASHBLOCKS(fileSize); - const size_t required = numHashBlocks * sizeof(uint32_t); - if ( buffer == NULL || required > bufferSize ) { - bufferSize = required; - if ( buffer != NULL ) free( buffer ); - buffer = malloc( bufferSize ); - } - memcpy( buffer, image->crc32, required ); - mutex_unlock( &image->lock ); - // Open for direct I/O if possible; this prevents polluting the fs cache - int fd = open( image->path, O_RDONLY | O_DIRECT ); - bool direct = fd != -1; - if ( unlikely( !direct ) ) { - // Try unbuffered; flush to disk for that - logadd( LOG_DEBUG1, "O_DIRECT failed for %s", image->path ); - image_ensureOpen( image ); - fd = image->readFd; - } int checkCount = MIN( qCount, 5 ); - if ( fd != -1 ) { - while ( blocks[0] < numHashBlocks && !_shutdown ) { - const uint64_t start = blocks[0] * HASH_BLOCK_SIZE; - const uint64_t end = MIN( (uint64_t)(blocks[0] + 1) * HASH_BLOCK_SIZE, image->virtualFilesize ); - bool complete = true; - if ( qCount == CHECK_ALL ) { + int readFd = -1, directFd = -1; + while ( blocks[0] < numHashBlocks && !_shutdown ) { + const uint64_t start = blocks[0] * HASH_BLOCK_SIZE; + const uint64_t end = MIN( (uint64_t)(blocks[0] + 1) * HASH_BLOCK_SIZE, image->virtualFilesize ); + bool complete = true; + if ( qCount == CHECK_ALL ) { + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache != NULL ) { // When checking full image, skip incomplete blocks, otherwise assume block is complete - mutex_lock( &image->lock ); - complete = image_isHashBlockComplete( image->cache_map, blocks[0], fileSize ); - mutex_unlock( &image->lock ); - } -#if defined(linux) || defined(__linux) - if ( sync_file_range( fd, start, end - start, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER ) == -1 ) { -#else - if ( fsync( fd ) == -1 ) { -#endif - logadd( LOG_ERROR, "Cannot flush %s for integrity check", image->path ); - exit( 1 ); + complete = image_isHashBlockComplete( cache->map, blocks[0], fileSize ); + ref_put( &cache->reference ); } + } + // Flush to disk if there's an uplink, as that means the block might have been written recently + if ( image->uplinkref != NULL ) { + flushFileRange( image, start, end ); + } + if ( _shutdown ) + break; + // Open for direct I/O if possible; this prevents polluting the fs cache + if ( directFd == -1 && ( end % DNBD3_BLOCK_SIZE ) == 0 ) { // Use direct I/O only if read length is multiple of 4096 to be on the safe side - int tfd; - if ( direct && ( end % DNBD3_BLOCK_SIZE ) == 0 ) { - // Suitable for direct io - tfd = fd; - } else if ( !image_ensureOpen( image ) ) { - logadd( LOG_WARNING, "Cannot open %s for reading", image->path ); - break; + directFd = open( image->path, O_RDONLY | O_DIRECT ); + if ( directFd == -1 ) { + logadd( LOG_DEBUG2, "O_DIRECT failed for %s (errno=%d)", image->path, errno ); + directFd = -2; } else { - tfd = image->readFd; - // Evict from cache so we have to re-read, making sure data was properly stored - posix_fadvise( fd, start, end - start, POSIX_FADV_DONTNEED ); - } - if ( complete && !image_checkBlocksCrc32( tfd, (uint32_t*)buffer, blocks, fileSize ) ) { - logadd( LOG_WARNING, "Hash check for block %d of %s failed!", blocks[0], image->name ); - image_updateCachemap( image, start, end, false ); - // If this is not a full check, queue one - if ( qCount != CHECK_ALL ) { - logadd( LOG_INFO, "Queueing full check for %s", image->name ); - integrity_check( image, -1 ); - } - foundCorrupted = true; + readFd = directFd; } - blocks[0]++; // Increase before break, so it always points to the next block to check after loop - if ( complete && --checkCount == 0 ) break; } - if ( direct ) { - close( fd ); + if ( readFd == -1 ) { // Try buffered; flush to disk for that + image_ensureOpen( image ); + readFd = image->readFd; } + if ( readFd == -1 ) { + logadd( LOG_MINOR, "Couldn't get any valid fd for integrity check of %s... ignoring...", image->path ); + } else if ( complete && !image_checkBlocksCrc32( readFd, image->crc32, blocks, fileSize ) ) { + bool iscomplete = true; + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache != NULL ) { + iscomplete = image_isHashBlockComplete( cache->map, blocks[0], fileSize ); + ref_put( &cache->reference ); + } + logadd( LOG_WARNING, "Hash check for block %d of %s failed (complete: was: %d, is: %d)", blocks[0], image->name, (int)complete, (int)iscomplete ); + image_updateCachemap( image, start, end, false ); + // If this is not a full check, queue one + if ( qCount != CHECK_ALL ) { + logadd( LOG_INFO, "Queueing full check for %s", image->name ); + integrity_check( image, -1, false ); + } + foundCorrupted = true; + } + blocks[0]++; // Increase before break, so it always points to the next block to check after loop + if ( complete && --checkCount == 0 ) + break; + } + if ( directFd != -1 && directFd != -2 ) { + close( directFd ); } mutex_lock( &integrityQueueLock ); assert( checkQueue[i].image == image ); @@ -229,46 +233,83 @@ static void* integrity_main(void * data UNUSED) logadd( LOG_WARNING, "BUG! checkQueue counter ran negative" ); } } - if ( checkCount > 0 || checkQueue[i].count <= 0 || fd == -1 ) { - // Done with this task as nothing left, OR we don't have an fd to read from - if ( fd == -1 ) { - logadd( LOG_WARNING, "Cannot hash check %s: bad fd", image->path ); - } + if ( checkCount > 0 || checkQueue[i].count <= 0 ) { + // Done with this task as nothing left checkQueue[i].image = NULL; if ( i + 1 == queueLen ) queueLen--; // Mark as working again if applicable if ( !foundCorrupted ) { - mutex_lock( &image->lock ); - if ( image->uplink != NULL ) { // TODO: image_determineWorkingState() helper? - image->working = image->uplink->fd != -1 && image->readFd != -1; + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink != NULL ) { // TODO: image_determineWorkingState() helper? + mutex_lock( &image->lock ); + image->working = uplink->current.fd != -1 && image->readFd != -1; + mutex_unlock( &image->lock ); + ref_put( &uplink->reference ); } - mutex_unlock( &image->lock ); } } else { // Still more blocks to go... checkQueue[i].block = blocks[0]; } - } else { - mutex_unlock( &image->lock ); } - if ( foundCorrupted ) { + if ( foundCorrupted && !_shutdown ) { // Something was fishy, make sure uplink exists mutex_lock( &image->lock ); image->working = false; - bool restart = image->uplink == NULL || image->uplink->shutdown; mutex_unlock( &image->lock ); - if ( restart ) { - uplink_shutdown( image ); - uplink_init( image, -1, NULL, -1 ); - } + uplink_init( image, -1, NULL, -1 ); } // Release :-) image_release( image ); } } mutex_unlock( &integrityQueueLock ); - if ( buffer != NULL ) free( buffer ); bRunning = false; return NULL; } +static void flushFileRange(dnbd3_image_t *image, uint64_t start, uint64_t end) +{ + int flushFd; + int writableFd = -1; + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink != NULL ) { // Try to steal uplink's writable fd + if ( uplink->cacheFd != -1 ) { + writableFd = dup( uplink->cacheFd ); + } + ref_put( &uplink->reference ); + } + if ( writableFd == -1 ) { // Open file as writable + writableFd = open( image->path, O_WRONLY ); + } + if ( writableFd == -1 ) { // Fallback to readFd (should work on Linux and BSD...) + logadd( LOG_WARNING, "flushFileRange: Cannot open %s for writing. Trying readFd.", image->path ); + image_ensureOpen( image ); + flushFd = image->readFd; + } else { + flushFd = writableFd; + } + if ( flushFd == -1 ) + return; +#if defined(__linux__) + while ( sync_file_range( flushFd, start, end - start, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER ) == -1 ) +#else + while ( fsync( flushFd ) == -1 ) // TODO: fdatasync() should be available since FreeBSD 12.0 ... Might be a tad bit faster +#endif + { + if ( _shutdown ) + break; + int e = errno; + if ( e == EINTR ) + continue; + logadd( LOG_ERROR, "Cannot flush %s for integrity check (errno=%d)", image->path, e ); + if ( e == EIO ) { + exit( 1 ); + } + } + // Evict from cache too so we have to re-read, making sure data was properly stored + posix_fadvise( flushFd, start, end - start, POSIX_FADV_DONTNEED ); + if ( writableFd != -1 ) { + close( writableFd ); + } +} diff --git a/src/server/integrity.h b/src/server/integrity.h index c3c2b44..09d3785 100644 --- a/src/server/integrity.h +++ b/src/server/integrity.h @@ -7,6 +7,6 @@ void integrity_init(); void integrity_shutdown(); -void integrity_check(dnbd3_image_t *image, int block); +void integrity_check(dnbd3_image_t *image, int block, bool blocking); #endif /* INTEGRITY_H_ */ diff --git a/src/server/locks.c b/src/server/locks.c index a5b7c76..b39576b 100644 --- a/src/server/locks.c +++ b/src/server/locks.c @@ -12,47 +12,45 @@ #ifdef _DEBUG #define MAXLOCKS (SERVER_MAX_CLIENTS * 2 + SERVER_MAX_ALTS + 200 + SERVER_MAX_IMAGES) #define MAXTHREADS (SERVER_MAX_CLIENTS + 100) +#define MAXLPT 20 #define LOCKLEN 60 typedef struct { - void *lock; + void * _Atomic lock; ticks locktime; - char locked; - pthread_t thread; + bool _Atomic locked; + pthread_t _Atomic thread; int lockId; + int prio; char name[LOCKLEN]; char where[LOCKLEN]; } debug_lock_t; typedef struct { - pthread_t tid; + pthread_t _Atomic tid; ticks time; char name[LOCKLEN]; char where[LOCKLEN]; - + debug_lock_t *locks[MAXLPT]; } debug_thread_t; int debugThreadCount = 0; static debug_lock_t locks[MAXLOCKS]; static debug_thread_t threads[MAXTHREADS]; -static int init_done = 0; -static pthread_mutex_t initdestory; +static pthread_mutex_t initdestory = PTHREAD_MUTEX_INITIALIZER; static int lockId = 0; -static pthread_t watchdog = 0; -static dnbd3_signal_t* watchdogSignal = NULL; -static void *debug_thread_watchdog(void *something); +#define ULDE(...) do { \ + pthread_mutex_unlock( &initdestory ); \ + logadd( LOG_ERROR, __VA_ARGS__ ); \ + debug_dump_lock_stats(); \ + exit( 4 ); \ +} while(0) -int debug_mutex_init(const char *name, const char *file, int line, pthread_mutex_t *lock) +int debug_mutex_init(const char *name, const char *file, int line, pthread_mutex_t *lock, int priority) { - if ( !init_done ) { - memset( locks, 0, MAXLOCKS * sizeof(debug_lock_t) ); - memset( threads, 0, MAXTHREADS * sizeof(debug_thread_t) ); - pthread_mutex_init( &initdestory, NULL ); - init_done = 1; - } int first = -1; pthread_mutex_lock( &initdestory ); for (int i = 0; i < MAXLOCKS; ++i) { @@ -63,20 +61,18 @@ int debug_mutex_init(const char *name, const char *file, int line, pthread_mutex if ( first == -1 && locks[i].lock == NULL ) first = i; } if ( first == -1 ) { - logadd( LOG_ERROR, "No more free debug locks (%s:%d)\n", file, line ); - pthread_mutex_unlock( &initdestory ); - debug_dump_lock_stats(); - exit( 4 ); + ULDE( "No more free debug locks (%s:%d)\n", file, line ); } locks[first].lock = (void*)lock; - locks[first].locked = 0; + locks[first].locked = false; + locks[first].prio = priority; snprintf( locks[first].name, LOCKLEN, "%s", name ); snprintf( locks[first].where, LOCKLEN, "I %s:%d", file, line ); pthread_mutex_unlock( &initdestory ); return pthread_mutex_init( lock, NULL ); } -int debug_mutex_lock(const char *name, const char *file, int line, pthread_mutex_t *lock) +int debug_mutex_lock(const char *name, const char *file, int line, pthread_mutex_t *lock, bool try) { debug_lock_t *l = NULL; pthread_mutex_lock( &initdestory ); @@ -86,163 +82,180 @@ int debug_mutex_lock(const char *name, const char *file, int line, pthread_mutex break; } } - pthread_mutex_unlock( &initdestory ); if ( l == NULL ) { - logadd( LOG_ERROR, "Tried to lock uninitialized lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); - debug_dump_lock_stats(); - exit( 4 ); + ULDE( "Tried to lock uninitialized lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); } debug_thread_t *t = NULL; - pthread_mutex_lock( &initdestory ); + int first = -1; + const pthread_t self = pthread_self(); for (int i = 0; i < MAXTHREADS; ++i) { - if ( threads[i].tid != 0 ) continue; - threads[i].tid = pthread_self(); - timing_get( &threads[i].time ); - snprintf( threads[i].name, LOCKLEN, "%s", name ); - snprintf( threads[i].where, LOCKLEN, "%s:%d", file, line ); - t = &threads[i]; - break; - } - pthread_mutex_unlock( &initdestory ); - if ( t == NULL ) { - logadd( LOG_ERROR, "Lock sanity check: Too many waiting threads for lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); - exit( 4 ); - } - const int retval = pthread_mutex_lock( lock ); - pthread_mutex_lock( &initdestory ); - t->tid = 0; - pthread_mutex_unlock( &initdestory ); - if ( l->locked ) { - logadd( LOG_ERROR, "Lock sanity check: lock %p (%s) already locked at %s:%d\n", (void*)lock, name, file, line ); - exit( 4 ); - } - l->locked = 1; - timing_get( &l->locktime ); - l->thread = pthread_self(); - snprintf( l->where, LOCKLEN, "L %s:%d", file, line ); - pthread_mutex_lock( &initdestory ); - l->lockId = ++lockId; - pthread_mutex_unlock( &initdestory ); - return retval; -} - -int debug_mutex_trylock(const char *name, const char *file, int line, pthread_mutex_t *lock) -{ - debug_lock_t *l = NULL; - pthread_mutex_lock( &initdestory ); - for (int i = 0; i < MAXLOCKS; ++i) { - if ( locks[i].lock == lock ) { - l = &locks[i]; + if ( threads[i].tid == self ) { + t = &threads[i]; break; } + if ( first == -1 && threads[i].tid == 0 ) { + first = i; + } } - pthread_mutex_unlock( &initdestory ); - if ( l == NULL ) { - logadd( LOG_ERROR, "Tried to lock uninitialized lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); - debug_dump_lock_stats(); - exit( 4 ); - } - debug_thread_t *t = NULL; - pthread_mutex_lock( &initdestory ); - for (int i = 0; i < MAXTHREADS; ++i) { - if ( threads[i].tid != 0 ) continue; - threads[i].tid = pthread_self(); - timing_get( &threads[i].time ); - snprintf( threads[i].name, LOCKLEN, "%s", name ); - snprintf( threads[i].where, LOCKLEN, "%s:%d", file, line ); - t = &threads[i]; - break; - } - pthread_mutex_unlock( &initdestory ); + int idx; if ( t == NULL ) { - logadd( LOG_ERROR, "Lock sanity check: Too many waiting threads for %p (%s) at %s:%d\n", (void*)lock, name, file, line ); - exit( 4 ); + if ( first == -1 ) { + ULDE( "Lock sanity check: Too many waiting threads for lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); + } + t = &threads[first]; + timing_get( &t->time ); + t->tid = self; + snprintf( t->name, LOCKLEN, "%s", name ); + snprintf( t->where, LOCKLEN, "%s:%d", file, line ); + memset( t->locks, 0, sizeof(t->locks) ); + idx = 0; + } else { + // Thread already has locks, check for order violation + idx = -1; + for (int i = 0; i < MAXLPT; ++i) { + if ( t->locks[i] == NULL ) { + if ( idx == -1 ) { + idx = i; + } + continue; + } + if ( t->locks[i]->prio >= l->prio ) { + ULDE( "Lock priority violation: %s at %s:%d (%d) when already holding %s at %s (%d)", + name, file, line, l->prio, + t->locks[i]->name, t->locks[i]->where, t->locks[i]->prio ); + } + if ( t->locks[i] == l ) { + ULDE( "Tried to recusively lock %s in the same thread. Tried at %s:%d, when already locked at %s", + name, file, line, t->locks[i]->name ); + } + } + if ( idx == -1 ) { + ULDE( "Thread %d tried to lock more than %d locks.", (int)self, (int)MAXLPT ); + } } - const int retval = pthread_mutex_trylock( lock ); - pthread_mutex_lock( &initdestory ); - t->tid = 0; pthread_mutex_unlock( &initdestory ); + const int retval = try ? pthread_mutex_trylock( lock ) : pthread_mutex_lock( lock ); if ( retval == 0 ) { + timing_get( &l->locktime ); + l->thread = self; + snprintf( l->where, LOCKLEN, "L %s:%d", file, line ); + pthread_mutex_lock( &initdestory ); if ( l->locked ) { logadd( LOG_ERROR, "Lock sanity check: lock %p (%s) already locked at %s:%d\n", (void*)lock, name, file, line ); exit( 4 ); } - l->locked = 1; - timing_get( &l->locktime ); - l->thread = pthread_self(); - snprintf( l->where, LOCKLEN, "L %s:%d", file, line ); - pthread_mutex_lock( &initdestory ); + l->locked = true; + t->locks[idx] = l; l->lockId = ++lockId; pthread_mutex_unlock( &initdestory ); + } else if ( !try || retval != EBUSY ) { + logadd( LOG_ERROR, "Acquiring lock %s at %s:%d failed with error code %d", name, file, line, retval ); + debug_dump_lock_stats(); + exit( 4 ); } return retval; } int debug_mutex_unlock(const char *name, const char *file, int line, pthread_mutex_t *lock) { - debug_lock_t *l = NULL; + debug_thread_t *t = NULL; + pthread_t self = pthread_self(); pthread_mutex_lock( &initdestory ); - for (int i = 0; i < MAXLOCKS; ++i) { - if ( locks[i].lock == lock ) { - l = &locks[i]; + for (int i = 0; i < MAXTHREADS; ++i) { + if ( threads[i].tid == self ) { + t = &threads[i]; break; } } - pthread_mutex_unlock( &initdestory ); - if ( l == NULL ) { - logadd( LOG_ERROR, "Tried to unlock uninitialized lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); - exit( 4 ); + if ( t == NULL ) { + ULDE( "Unlock called from unknown thread for %s at %s:%d", name, file, line ); } - if ( !l->locked ) { - logadd( LOG_ERROR, "Unlock sanity check: lock %p (%s) not locked at %s:%d\n", (void*)lock, name, file, line ); - exit( 4 ); + int idx = -1; + int cnt = 0; + for (int i = 0; i < MAXLPT; ++i) { + if ( t->locks[i] == NULL ) + continue; + cnt++; + if ( t->locks[i]->lock == lock ) { + idx = i; + } + } + if ( idx == -1 ) { + ULDE( "Unlock: Calling thread doesn't hold lock %s at %s:%d", name, file, line ); } - l->locked = 0; + debug_lock_t *l = t->locks[idx]; + if ( l->thread != self || !l->locked ) { + ULDE( "Unlock sanity check for lock debugger failed! Lock %s is assigned to calling thread, but lock's meta data doesn't match up at %s:%d", name, file, line ); + } + l->locked = false; l->thread = 0; + t->locks[idx] = NULL; + if ( cnt == 1 ) { + t->tid = 0; // No more locks held, free up slot + } snprintf( l->where, LOCKLEN, "U %s:%d", file, line ); - int retval = pthread_mutex_unlock( lock ); + pthread_mutex_unlock( &initdestory ); + const int retval = pthread_mutex_unlock( lock ); + if ( retval != 0 ) { + logadd( LOG_ERROR, "pthread_mutex_unlock returned %d for %s at %s:%d", retval, name, file, line ); + exit( 4 ); + } return retval; } int debug_mutex_cond_wait(const char *name, const char *file, int line, pthread_cond_t *restrict cond, pthread_mutex_t *restrict lock) { debug_lock_t *l = NULL; + debug_thread_t *t = NULL; + pthread_t self = pthread_self(); pthread_mutex_lock( &initdestory ); - for (int i = 0; i < MAXLOCKS; ++i) { - if ( locks[i].lock == lock ) { - l = &locks[i]; + for (int i = 0; i < MAXTHREADS; ++i) { + if ( threads[i].tid == self ) { + t = &threads[i]; break; } } - pthread_mutex_unlock( &initdestory ); + if ( t == NULL ) { + ULDE( "Unlock called from unknown thread for %s at %s:%d", name, file, line ); + } + int mp = 0, mpi = -1; + for (int i = 0; i < MAXLPT; ++i) { + if ( t->locks[i] == NULL ) + continue; + if ( t->locks[i]->lock == lock ) { + l = t->locks[i]; + } else if ( t->locks[i]->prio > mp ) { + mp = t->locks[i]->prio; + mpi = i; + } + } if ( l == NULL ) { - logadd( LOG_ERROR, "Tried to cond_wait on uninitialized lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); - exit( 4 ); + ULDE( "cond_wait: Calling thread doesn't hold lock %s at %s:%d", name, file, line ); } - if ( !l->locked ) { - logadd( LOG_ERROR, "Cond_wait sanity check: lock %p (%s) not locked at %s:%d\n", (void*)lock, name, file, line ); - exit( 4 ); + if ( l->thread != self || !l->locked ) { + ULDE( "cond_wait: Sanity check for lock debugger failed! Lock %s is assigned to calling thread, but lock's meta data doesn't match up at %s:%d", name, file, line ); } - pthread_t self = pthread_self(); - if ( l->thread != self ) { - logadd( LOG_ERROR, "Cond_wait called from non-owning thread for %p (%s) at %s:%d\n", (void*)lock, name, file, line ); - exit( 4 ); + if ( mp >= l->prio ) { + ULDE( "cond_wait: Yielding a mutex while holding another one with higher prio: %s at %s:%d (%d) while also holding %s at %s (%d)", + name, file, line, l->prio, + t->locks[mpi]->name, t->locks[mpi]->where, mp ); } - l->locked = 0; + l->locked = false; l->thread = 0; - snprintf( l->where, LOCKLEN, "CW %s:%d", file, line ); + snprintf( l->where, LOCKLEN, "CWU %s:%d", file, line ); + pthread_mutex_unlock( &initdestory ); int retval = pthread_cond_wait( cond, lock ); if ( retval != 0 ) { logadd( LOG_ERROR, "pthread_cond_wait returned %d for lock %p (%s) at %s:%d\n", retval, (void*)lock, name, file, line ); exit( 4 ); } - if ( l->locked != 0 || l->thread != 0 ) { + if ( l->locked || l->thread != 0 ) { logadd( LOG_ERROR, "Lock is not free after returning from pthread_cond_wait for %p (%s) at %s:%d\n", (void*)lock, name, file, line ); exit( 4 ); } - l->locked = 1; l->thread = self; timing_get( &l->locktime ); + l->locked = true; pthread_mutex_lock( &initdestory ); l->lockId = ++lockId; pthread_mutex_unlock( &initdestory ); @@ -256,6 +269,7 @@ int debug_mutex_destroy(const char *name, const char *file, int line, pthread_mu if ( locks[i].lock == lock ) { if ( locks[i].locked ) { logadd( LOG_ERROR, "Tried to destroy lock %p (%s) at %s:%d when it is still locked\n", (void*)lock, name, file, line ); + logadd( LOG_ERROR, "Currently locked by: %s", locks[i].where ); exit( 4 ); } locks[i].lock = NULL; @@ -289,63 +303,21 @@ void debug_dump_lock_stats() "* Locked: %d\n", locks[i].name, locks[i].where, (int)locks[i].locked ); } } - printf( "\n **** WAITING THREADS ****\n\n" ); + printf( "\n **** ACTIVE THREADS ****\n\n" ); for (int i = 0; i < MAXTHREADS; ++i) { - if ( threads[i].tid == 0 ) continue; + if ( threads[i].tid == 0 ) + continue; printf( "* *** Thread %d ***\n" "* Lock: %s\n" "* Where: %s\n" "* How long: %d secs\n", (int)threads[i].tid, threads[i].name, threads[i].where, (int)timing_diff( &threads[i].time, &now ) ); - } - pthread_mutex_unlock( &initdestory ); -} - -static void *debug_thread_watchdog(void *something UNUSED) -{ - setThreadName( "debug-watchdog" ); - while ( !_shutdown ) { - if ( init_done ) { - declare_now; - pthread_mutex_lock( &initdestory ); - for (int i = 0; i < MAXTHREADS; ++i) { - if ( threads[i].tid == 0 ) continue; - const uint32_t diff = timing_diff( &threads[i].time, &now ); - if ( diff > 6 && diff < 100000 ) { - printf( "\n\n +++++++++ DEADLOCK ++++++++++++\n\n" ); - pthread_mutex_unlock( &initdestory ); - debug_dump_lock_stats(); - exit( 99 ); - } - } - pthread_mutex_unlock( &initdestory ); + for (int j = 0; j < MAXLPT; ++j) { + if ( threads[i].locks[j] == NULL ) + continue; + printf( " * Lock %s @ %s\n", threads[i].locks[j]->name, threads[i].locks[j]->where ); } - if ( watchdogSignal == NULL || signal_wait( watchdogSignal, 5000 ) == SIGNAL_ERROR ) sleep( 5 ); } - return NULL ; -} - -#endif - -void debug_locks_start_watchdog() -{ -#ifdef _DEBUG - watchdogSignal = signal_new(); - if ( 0 != thread_create( &watchdog, NULL, &debug_thread_watchdog, (void *)NULL ) ) { - logadd( LOG_ERROR, "Could not start debug-lock watchdog." ); - return; - } -#endif + pthread_mutex_unlock( &initdestory ); } -void debug_locks_stop_watchdog() -{ -#ifdef _DEBUG - _shutdown = true; - printf( "Killing debug watchdog...\n" ); - pthread_mutex_lock( &initdestory ); - signal_call( watchdogSignal ); - pthread_mutex_unlock( &initdestory ); - thread_join( watchdog, NULL ); - signal_close( watchdogSignal ); #endif -} diff --git a/src/server/locks.h b/src/server/locks.h index 7f72722..e5c9801 100644 --- a/src/server/locks.h +++ b/src/server/locks.h @@ -5,19 +5,38 @@ #include <errno.h> #include <stdio.h> #include <stdlib.h> +#include <stdbool.h> + +// Lock priority + +#define LOCK_RELOAD 90 +#define LOCK_LOAD_CONFIG 100 +#define LOCK_REMOTE_CLONE 110 +#define LOCK_CLIENT_LIST 120 +#define LOCK_CLIENT 130 +#define LOCK_INTEGRITY_QUEUE 140 +#define LOCK_IMAGE_LIST 150 +#define LOCK_IMAGE 160 +#define LOCK_UPLINK_QUEUE 170 +#define LOCK_ALT_SERVER_LIST 180 +#define LOCK_CLIENT_SEND 190 +#define LOCK_UPLINK_RTT 200 +#define LOCK_UPLINK_SEND 210 +#define LOCK_RPC_ACL 220 + +// #ifdef _DEBUG -#define mutex_init( lock ) debug_mutex_init( #lock, __FILE__, __LINE__, lock) -#define mutex_lock( lock ) debug_mutex_lock( #lock, __FILE__, __LINE__, lock) -#define mutex_trylock( lock ) debug_mutex_trylock( #lock, __FILE__, __LINE__, lock) +#define mutex_init( lock, prio ) debug_mutex_init( #lock, __FILE__, __LINE__, lock, prio) +#define mutex_lock( lock ) debug_mutex_lock( #lock, __FILE__, __LINE__, lock, false) +#define mutex_trylock( lock ) debug_mutex_lock( #lock, __FILE__, __LINE__, lock, true) #define mutex_unlock( lock ) debug_mutex_unlock( #lock, __FILE__, __LINE__, lock) #define mutex_cond_wait( cond, lock ) debug_mutex_cond_wait( #lock, __FILE__, __LINE__, cond, lock) #define mutex_destroy( lock ) debug_mutex_destroy( #lock, __FILE__, __LINE__, lock) -int debug_mutex_init(const char *name, const char *file, int line, pthread_mutex_t *lock); -int debug_mutex_lock(const char *name, const char *file, int line, pthread_mutex_t *lock); -int debug_mutex_trylock(const char *name, const char *file, int line, pthread_mutex_t *lock); +int debug_mutex_init(const char *name, const char *file, int line, pthread_mutex_t *lock, int priority); +int debug_mutex_lock(const char *name, const char *file, int line, pthread_mutex_t *lock, bool try); int debug_mutex_unlock(const char *name, const char *file, int line, pthread_mutex_t *lock); int debug_mutex_cond_wait(const char *name, const char *file, int line, pthread_cond_t *restrict cond, pthread_mutex_t *restrict lock); int debug_mutex_destroy(const char *name, const char *file, int line, pthread_mutex_t *lock); @@ -27,7 +46,7 @@ void debug_dump_lock_stats(); #else -#define mutex_init( lock ) pthread_mutex_init(lock, NULL) +#define mutex_init( lock, prio ) pthread_mutex_init(lock, NULL) #define mutex_lock( lock ) pthread_mutex_lock(lock) #define mutex_trylock( lock ) pthread_mutex_trylock(lock) #define mutex_unlock( lock ) pthread_mutex_unlock(lock) @@ -82,7 +101,4 @@ static inline int debug_thread_join(pthread_t thread, void **value_ptr) #endif -void debug_locks_start_watchdog(); -void debug_locks_stop_watchdog(); - #endif /* LOCKS_H_ */ diff --git a/src/server/net.c b/src/server/net.c index 9abe221..aba4e7d 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -24,6 +24,7 @@ #include "locks.h" #include "rpc.h" #include "altservers.h" +#include "reference.h" #include "../shared/sockhelper.h" #include "../shared/timing.h" @@ -43,6 +44,7 @@ #include <jansson.h> #include <inttypes.h> #include <stdatomic.h> +#include <signal.h> static dnbd3_client_t *_clients[SERVER_MAX_CLIENTS]; static int _num_clients = 0; @@ -145,13 +147,14 @@ static inline bool sendPadding( const int fd, uint32_t bytes ) void net_init() { - mutex_init( &_clients_lock ); + mutex_init( &_clients_lock, LOCK_CLIENT_LIST ); } void* net_handleNewConnection(void *clientPtr) { dnbd3_client_t * const client = (dnbd3_client_t *)clientPtr; dnbd3_request_t request; + client->thread = pthread_self(); // Await data from client. Since this is a fresh connection, we expect data right away sock_setTimeout( client->sock, _clientTimeout ); @@ -186,8 +189,8 @@ void* net_handleNewConnection(void *clientPtr) } } while (0); // Fully init client struct - mutex_init( &client->lock ); - mutex_init( &client->sendMutex ); + mutex_init( &client->lock, LOCK_CLIENT ); + mutex_init( &client->sendMutex, LOCK_CLIENT_SEND ); mutex_lock( &client->lock ); host_to_string( &client->host, client->hostName, HOSTNAMELEN ); @@ -229,7 +232,7 @@ void* net_handleNewConnection(void *clientPtr) rid = serializer_get_uint16( &payload ); const uint8_t flags = serializer_get_uint8( &payload ); client->isServer = ( flags & FLAGS8_SERVER ); - if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) { + if ( unlikely( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) ) { if ( client_version < MIN_SUPPORTED_CLIENT ) { logadd( LOG_DEBUG1, "Client %s too old", client->hostName ); } else { @@ -243,7 +246,7 @@ void* net_handleNewConnection(void *clientPtr) // We're a proxy, client is another proxy, we don't do BGR, but connecting proxy does... // Reject, as this would basically force this proxy to do BGR too. image = image_get( image_name, rid, true ); - if ( image != NULL && image->cache_map != NULL ) { + if ( image != NULL && image->ref_cacheMap != NULL ) { // Only exception is if the image is complete locally image = image_release( image ); } @@ -255,28 +258,27 @@ void* net_handleNewConnection(void *clientPtr) // No BGR mismatch, but don't lookup if image is unknown locally image = image_get( image_name, rid, true ); } - mutex_lock( &client->lock ); client->image = image; - mutex_unlock( &client->lock ); - if ( image == NULL ) { + atomic_thread_fence( memory_order_release ); + if ( unlikely( image == NULL ) ) { //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); - } else if ( !image->working ) { + } else if ( unlikely( !image->working ) ) { logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n", client->hostName, image_name, (int)rid ); } else { - bool penalty; // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable bOk = true; - if ( image->cache_map != NULL ) { - mutex_lock( &image->lock ); - if ( image->uplink == NULL || image->uplink->cacheFd == -1 || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { + if ( image->ref_cacheMap != NULL ) { + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink != NULL && ( uplink->cacheFd == -1 || uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) ) { bOk = ( rand() % 4 ) == 1; } - penalty = bOk && image->uplink != NULL && image->uplink->cacheFd == -1; - mutex_unlock( &image->lock ); - if ( penalty ) { // Wait 100ms if local caching is not working so this + if ( bOk && uplink != NULL && uplink->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this usleep( 100000 ); // server gets a penalty and is less likely to be selected } + if ( uplink != NULL ) { + ref_put( &uplink->reference ); + } } if ( bOk ) { mutex_lock( &image->lock ); @@ -301,7 +303,7 @@ void* net_handleNewConnection(void *clientPtr) } } - if ( bOk ) { + if ( likely( bOk ) ) { // add artificial delay if applicable if ( client->isServer && _serverPenalty != 0 ) { usleep( _serverPenalty ); @@ -315,7 +317,8 @@ void* net_handleNewConnection(void *clientPtr) case CMD_GET_BLOCK:; const uint64_t offset = request.offset_small; // Copy to full uint64 to prevent repeated masking - if ( offset >= image->virtualFilesize ) { + reply.handle = request.handle; + if ( unlikely( offset >= image->virtualFilesize ) ) { // Sanity check logadd( LOG_WARNING, "Client %s requested non-existent block", client->hostName ); reply.size = 0; @@ -323,7 +326,7 @@ void* net_handleNewConnection(void *clientPtr) send_reply( client->sock, &reply, NULL ); break; } - if ( offset + request.size > image->virtualFilesize ) { + if ( unlikely( offset + request.size > image->virtualFilesize ) ) { // Sanity check logadd( LOG_WARNING, "Client %s requested data block that extends beyond image size", client->hostName ); reply.size = 0; @@ -332,57 +335,52 @@ void* net_handleNewConnection(void *clientPtr) break; } - if ( request.size != 0 && image->cache_map != NULL ) { + dnbd3_cache_map_t *cache; + if ( request.size != 0 && ( cache = ref_get_cachemap( image ) ) != NULL ) { // This is a proxyed image, check if we need to relay the request... start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); bool isCached = true; - mutex_lock( &image->lock ); - // Check again as we only aquired the lock just now - if ( image->cache_map != NULL ) { - const uint64_t firstByteInMap = start >> 15; - const uint64_t lastByteInMap = (end - 1) >> 15; - uint64_t pos; - // Middle - quick checking - if ( isCached ) { - pos = firstByteInMap + 1; - while ( pos < lastByteInMap ) { - if ( image->cache_map[pos] != 0xff ) { - isCached = false; - break; - } - ++pos; + const uint64_t firstByteInMap = start >> 15; + const uint64_t lastByteInMap = (end - 1) >> 15; + uint64_t pos; + uint8_t b; + atomic_thread_fence( memory_order_acquire ); + // Middle - quick checking + if ( isCached ) { + for ( pos = firstByteInMap + 1; pos < lastByteInMap; ++pos ) { + if ( atomic_load_explicit( &cache->map[pos], memory_order_relaxed ) != 0xff ) { + isCached = false; + break; } } - // First byte - if ( isCached ) { - pos = start; - do { - const int map_x = (pos >> 12) & 7; // mod 8 - const uint8_t bit_mask = (uint8_t)( 1 << map_x ); - if ( (image->cache_map[firstByteInMap] & bit_mask) == 0 ) { - isCached = false; - break; - } - pos += DNBD3_BLOCK_SIZE; - } while ( firstByteInMap == (pos >> 15) && pos < end ); + } + // First byte + if ( isCached ) { + b = atomic_load_explicit( &cache->map[firstByteInMap], memory_order_relaxed ); + for ( pos = start; firstByteInMap == (pos >> 15) && pos < end; pos += DNBD3_BLOCK_SIZE ) { + const int map_x = (pos >> 12) & 7; // mod 8 + const uint8_t bit_mask = (uint8_t)( 1 << map_x ); + if ( (b & bit_mask) == 0 ) { + isCached = false; + break; + } } - // Last byte - only check if request spans multiple bytes in cache map - if ( isCached && firstByteInMap != lastByteInMap ) { - pos = lastByteInMap << 15; - while ( pos < end ) { - assert( lastByteInMap == (pos >> 15) ); - const int map_x = (pos >> 12) & 7; // mod 8 - const uint8_t bit_mask = (uint8_t)( 1 << map_x ); - if ( (image->cache_map[lastByteInMap] & bit_mask) == 0 ) { - isCached = false; - break; - } - pos += DNBD3_BLOCK_SIZE; + } + // Last byte - only check if request spans multiple bytes in cache map + if ( isCached && firstByteInMap != lastByteInMap ) { + b = atomic_load_explicit( &cache->map[lastByteInMap], memory_order_relaxed ); + for ( pos = lastByteInMap << 15; pos < end; pos += DNBD3_BLOCK_SIZE ) { + assert( lastByteInMap == (pos >> 15) ); + const int map_x = (pos >> 12) & 7; // mod 8 + const uint8_t bit_mask = (uint8_t)( 1 << map_x ); + if ( (b & bit_mask) == 0 ) { + isCached = false; + break; } } } - mutex_unlock( &image->lock ); + ref_put( &cache->reference ); if ( !isCached ) { if ( !uplink_request( client, request.handle, offset, request.size, request.hops ) ) { logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy, disabling image %s:%d", @@ -396,10 +394,9 @@ void* net_handleNewConnection(void *clientPtr) reply.cmd = CMD_GET_BLOCK; reply.size = request.size; - reply.handle = request.handle; fixup_reply( reply ); - const bool lock = image->uplink != NULL; + const bool lock = image->uplinkref != NULL; if ( lock ) mutex_lock( &client->sendMutex ); // Send reply header if ( send( client->sock, &reply, sizeof(dnbd3_reply_t), (request.size == 0 ? 0 : MSG_MORE) ) != sizeof(dnbd3_reply_t) ) { @@ -480,7 +477,7 @@ void* net_handleNewConnection(void *clientPtr) case CMD_GET_SERVERS: // Build list of known working alt servers - num = altservers_getListForClient( &client->host, server_list, NUMBER_SERVERS ); + num = altservers_getListForClient( client, server_list, NUMBER_SERVERS ); reply.cmd = CMD_GET_SERVERS; reply.size = (uint32_t)( num * sizeof(dnbd3_server_entry_t) ); mutex_lock( &client->sendMutex ); @@ -533,16 +530,15 @@ exit_client_cleanup: ; removeFromList( client ); totalBytesSent += client->bytesSent; // Access time, but only if client didn't just probe - if ( image != NULL ) { + if ( image != NULL && client->bytesSent > DNBD3_BLOCK_SIZE * 10 ) { mutex_lock( &image->lock ); - if ( client->bytesSent > DNBD3_BLOCK_SIZE * 10 ) { - timing_get( &image->atime ); - } + timing_get( &image->atime ); mutex_unlock( &image->lock ); } freeClientStruct( client ); // This will also call image_release on client->image return NULL ; fail_preadd: ; + // This is before we even initialized any mutex close( client->sock ); free( client ); return NULL; @@ -609,6 +605,12 @@ void net_getStats(int *clientCount, int *serverCount, uint64_t *bytesSent) } bs += client->bytesSent; } + // Do this before unlocking the list, otherwise we might + // account for a client twice if it would disconnect after + // unlocking but before we add the count here. + if ( bytesSent != NULL ) { + *bytesSent = totalBytesSent + bs; + } mutex_unlock( &_clients_lock ); if ( clientCount != NULL ) { *clientCount = cc; @@ -616,9 +618,6 @@ void net_getStats(int *clientCount, int *serverCount, uint64_t *bytesSent) if ( serverCount != NULL ) { *serverCount = sc; } - if ( bytesSent != NULL ) { - *bytesSent = totalBytesSent + bs; - } } void net_disconnectAll() @@ -626,11 +625,10 @@ void net_disconnectAll() int i; mutex_lock( &_clients_lock ); for (i = 0; i < _num_clients; ++i) { - if ( _clients[i] == NULL ) continue; - dnbd3_client_t * const client = _clients[i]; - mutex_lock( &client->lock ); - if ( client->sock >= 0 ) shutdown( client->sock, SHUT_RDWR ); - mutex_unlock( &client->lock ); + if ( _clients[i] == NULL ) + continue; + shutdown( _clients[i]->sock, SHUT_RDWR ); + pthread_kill( _clients[i]->thread, SIGINT ); } mutex_unlock( &_clients_lock ); } @@ -668,11 +666,19 @@ static void removeFromList(dnbd3_client_t *client) { int i; mutex_lock( &_clients_lock ); - for ( i = _num_clients - 1; i >= 0; --i ) { - if ( _clients[i] == client ) { - _clients[i] = NULL; + if ( _num_clients != 0 ) { + for ( i = _num_clients - 1; i >= 0; --i ) { + if ( _clients[i] == client ) { + _clients[i] = NULL; + break; + } + } + if ( i != 0 && i + 1 == _num_clients ) { + do { + i--; + } while ( _clients[i] == NULL && i > 0 ); + _num_clients = i + 1; } - if ( _clients[i] == NULL && i + 1 == _num_clients ) --_num_clients; } mutex_unlock( &_clients_lock ); } @@ -686,17 +692,21 @@ static void removeFromList(dnbd3_client_t *client) static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client) { mutex_lock( &client->lock ); + if ( client->image != NULL ) { + dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref ); + if ( uplink != NULL ) { + uplink_removeClient( uplink, client ); + ref_put( &uplink->reference ); + } + } mutex_lock( &client->sendMutex ); - if ( client->sock != -1 ) close( client->sock ); + if ( client->sock != -1 ) { + close( client->sock ); + } client->sock = -1; mutex_unlock( &client->sendMutex ); - if ( client->image != NULL ) { - mutex_lock( &client->image->lock ); - if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client ); - mutex_unlock( &client->image->lock ); - client->image = image_release( client->image ); - } mutex_unlock( &client->lock ); + client->image = image_release( client->image ); mutex_destroy( &client->lock ); mutex_destroy( &client->sendMutex ); free( client ); @@ -729,3 +739,15 @@ static bool addToList(dnbd3_client_t *client) return true; } +void net_sendReply(dnbd3_client_t *client, uint16_t cmd, uint64_t handle) +{ + dnbd3_reply_t reply; + reply.magic = dnbd3_packet_magic; + reply.cmd = cmd; + reply.handle = handle; + reply.size = 0; + mutex_lock( &client->sendMutex ); + send_reply( client->sock, &reply, NULL ); + mutex_unlock( &client->sendMutex ); +} + diff --git a/src/server/net.h b/src/server/net.h index 6813b49..7719aef 100644 --- a/src/server/net.h +++ b/src/server/net.h @@ -37,4 +37,6 @@ void net_disconnectAll(); void net_waitForAllDisconnected(); +void net_sendReply(dnbd3_client_t *client, uint16_t cmd, uint64_t handle); + #endif /* NET_H_ */ diff --git a/src/server/picohttpparser/picohttpparser.c b/src/server/picohttpparser/picohttpparser.c index cfa05ef..f077016 100644 --- a/src/server/picohttpparser/picohttpparser.c +++ b/src/server/picohttpparser/picohttpparser.c @@ -36,8 +36,6 @@ #endif #include "picohttpparser.h" -/* $Id$ */ - #if __GNUC__ >= 3 #define likely(x) __builtin_expect(!!(x), 1) #define unlikely(x) __builtin_expect(!!(x), 0) @@ -73,9 +71,9 @@ #define ADVANCE_TOKEN(tok, toklen) \ do { \ const char *tok_start = buf; \ - static const char ALIGNED(16) ranges2[] = "\000\040\177\177"; \ + static const char ALIGNED(16) ranges2[16] = "\000\040\177\177"; \ int found2; \ - buf = findchar_fast(buf, buf_end, ranges2, sizeof(ranges2) - 1, &found2); \ + buf = findchar_fast(buf, buf_end, ranges2, 4, &found2); \ if (!found2) { \ CHECK_EOF(); \ } \ @@ -138,15 +136,11 @@ static const char *get_token_to_eol(const char *buf, const char *buf_end, struct const char *token_start = buf; #ifdef __SSE4_2__ - static const char ranges1[] = "\0\010" - /* allow HT */ - "\012\037" - /* allow SP and up to but not including DEL */ - "\177\177" - /* allow chars w. MSB set */ - ; + static const char ALIGNED(16) ranges1[16] = "\0\010" /* allow HT */ + "\012\037" /* allow SP and up to but not including DEL */ + "\177\177"; /* allow chars w. MSB set */ int found; - buf = findchar_fast(buf, buf_end, ranges1, sizeof(ranges1) - 1, &found); + buf = findchar_fast(buf, buf_end, ranges1, 6, &found); if (found) goto FOUND_CTL; #else @@ -325,9 +319,21 @@ static const char *parse_headers(const char *buf, const char *buf_end, struct ph headers[*num_headers].name.s = NULL; headers[*num_headers].name.l = 0; } - if ((buf = get_token_to_eol(buf, buf_end, &headers[*num_headers].value, ret)) == NULL) { + struct string value; + // DELETE + if ((buf = get_token_to_eol(buf, buf_end, &value, ret)) == NULL) { return NULL; } + /* remove trailing SPs and HTABs */ + const char *value_end = value.s + value.l; + for (; value_end != value.s; --value_end) { + const char c = *(value_end - 1); + if (!(c == ' ' || c == '\t')) { + break; + } + } + headers[*num_headers].value.s = value.s; + headers[*num_headers].value.l = value_end - value.s; } return buf; } @@ -347,9 +353,17 @@ static const char *parse_request(const char *buf, const char *buf_end, struct st /* parse request line */ ADVANCE_TOKEN(method->s, method->l); - ++buf; + do { + ++buf; + } while (*buf == ' '); ADVANCE_TOKEN(path->s, path->l); - ++buf; + do { + ++buf; + } while (*buf == ' '); + if (method->l == 0 || path->l == 0) { + *ret = -1; + return NULL; + } if ((buf = parse_http_version(buf, buf_end, minor_version, ret)) == NULL) { return NULL; } @@ -402,10 +416,13 @@ static const char *parse_response(const char *buf, const char *buf_end, int *min return NULL; } /* skip space */ - if (*buf++ != ' ') { + if (*buf != ' ') { *ret = -1; return NULL; } + do { + ++buf; + } while (*buf == ' '); /* parse status code, we want at least [:digit:][:digit:][:digit:]<other char> to try to parse */ if (buf_end - buf < 4) { *ret = -2; @@ -413,13 +430,21 @@ static const char *parse_response(const char *buf, const char *buf_end, int *min } PARSE_INT_3(status); - /* skip space */ - if (*buf++ != ' ') { - *ret = -1; + /* get message includig preceding space */ + if ((buf = get_token_to_eol(buf, buf_end, msg, ret)) == NULL) { return NULL; } - /* get message */ - if ((buf = get_token_to_eol(buf, buf_end, msg, ret)) == NULL) { + if (msg->l == 0) { + /* ok */ + } else if (*msg->s == ' ') { + /* remove preceding space */ + do { + ++msg->s; + --msg->l; + } while (*msg->s == ' '); + } else { + /* garbage found after status code */ + *ret = -1; return NULL; } diff --git a/src/server/reference.c b/src/server/reference.c new file mode 100644 index 0000000..64109ca --- /dev/null +++ b/src/server/reference.c @@ -0,0 +1,33 @@ +#ifndef unlikely +#define unlikely(x) (x) +#endif +#include "reference.h" +#include <stdio.h> +#include <stdlib.h> + +void ref_init( ref *reference, void ( *freefun )( ref * ), long count ) +{ + reference->count = count; + reference->free = freefun; +} + +_Noreturn void _ref_error( const char *message ) +{ + fprintf( stderr, "%s\n", message ); + abort(); +} + +void ref_setref( weakref *weakref, ref *ref ) +{ + union _aligned_ref_ *new_weakref = 0; + if ( ref ) { + ( new_weakref = aligned_ref( ref->_aligned_ref ) )->ref = ref; + ref->count += sizeof( union _aligned_ref_ ) + 1; + } + char *old_weakref = (char *)atomic_exchange( weakref, new_weakref ); + if ( !old_weakref ) + return; + struct _ref_ *old_ref = aligned_ref( old_weakref )->ref; + old_ref->count += old_weakref - (char *)aligned_ref( old_weakref ) - sizeof( union _aligned_ref_ ); + ref_put( old_ref ); +} diff --git a/src/server/reference.h b/src/server/reference.h new file mode 100644 index 0000000..4eda546 --- /dev/null +++ b/src/server/reference.h @@ -0,0 +1,59 @@ +#ifndef _REFERENCE_H_ +#define _REFERENCE_H_ + +#include "reftypes.h" +#include <stddef.h> +#include <stdint.h> + +#define container_of(ptr, type, member) \ + ((type *)((char *)(ptr) - (char *)&(((type *)NULL)->member))) + +void ref_init( ref *reference, void ( *freefun )( ref * ), long count ); + +void ref_setref( weakref *weakref, ref *ref ); + +_Noreturn void _ref_error( const char *message ); + +static inline ref *ref_get( weakref *weakref ) +{ + char *old_weakref = (char *)*weakref; + do { + if ( old_weakref == NULL ) + return NULL; + if ( aligned_ref( old_weakref ) != aligned_ref( old_weakref + 1 ) ) { + old_weakref = (char *)*weakref; + continue; + } + } while ( !atomic_compare_exchange_weak( weakref, (void **)&old_weakref, old_weakref + 1 ) ); + struct _ref_ *ref = aligned_ref( old_weakref )->ref; + if ( unlikely( ++ref->count == -1 ) ) { + _ref_error( "Reference counter overflow. Aborting." ); + } + char *cur_weakref = ( char * )*weakref; + do { + if ( aligned_ref( cur_weakref ) != aligned_ref( old_weakref ) ) { + ref->count--; + break; + } + } while ( !atomic_compare_exchange_weak( weakref, (void **)&cur_weakref, cur_weakref - 1 ) ); + return ref; +} + +static inline void ref_put( ref *ref ) +{ + if ( --ref->count == 0 ) { + ref->free( ref ); + } +} + +#define ref_get_uplink(wr) __extension__({ \ + ref* ref = ref_get( wr ); \ + ref == NULL ? NULL : container_of(ref, dnbd3_uplink_t, reference); \ +}) + +#define ref_get_cachemap(image) __extension__({ \ + ref* ref = ref_get( &(image)->ref_cacheMap ); \ + ref == NULL ? NULL : container_of(ref, dnbd3_cache_map_t, reference); \ +}) + +#endif diff --git a/src/server/reftypes.h b/src/server/reftypes.h new file mode 100644 index 0000000..45c0c20 --- /dev/null +++ b/src/server/reftypes.h @@ -0,0 +1,25 @@ +#ifndef _REFTYPES_H_ +#define _REFTYPES_H_ + +#include <stdatomic.h> + +_Static_assert( sizeof( void * ) == sizeof( _Atomic( void * ) ), "Atomic pointer bad" ); + +typedef _Atomic( void * ) weakref; + +#define aligned_ref(ptr) \ + ((union _aligned_ref_ *)((ptr) - (uintptr_t)(ptr) % sizeof(union _aligned_ref_))) + +union _aligned_ref_ { + struct _ref_ *ref; + void *_padding[( 32 - 1 ) / sizeof( void * ) + 1]; +}; + +typedef struct _ref_ { + _Atomic long count; + void ( *free )( struct _ref_ * ); + char _padding[sizeof( union _aligned_ref_ )]; + char _aligned_ref[sizeof( union _aligned_ref_ )]; +} ref; + +#endif diff --git a/src/server/rpc.c b/src/server/rpc.c index 5dbcafe..5daf20c 100644 --- a/src/server/rpc.c +++ b/src/server/rpc.c @@ -9,6 +9,7 @@ #include "fileutil.h" #include "picohttpparser/picohttpparser.h" #include "urldecode.h" +#include "reference.h" #include <jansson.h> #include <sys/types.h> @@ -43,7 +44,9 @@ _Static_assert( sizeof("test") == 5 && sizeof("test2") == 6, "Stringsize messup DEFSTR(STR_CONNECTION, "connection") DEFSTR(STR_CLOSE, "close") DEFSTR(STR_QUERY, "/query") +DEFSTR(STR_CACHEMAP, "/cachemap") DEFSTR(STR_Q, "q") +DEFSTR(STR_ID, "id") static inline bool equals(struct string *s1,struct string *s2) { @@ -75,13 +78,13 @@ static json_int_t randomRunId; static pthread_mutex_t aclLock; #define MAX_CLIENTS 50 #define CUTOFF_START 40 -static pthread_mutex_t statusLock; static struct { - int count; - bool overloaded; + atomic_int count; + atomic_bool overloaded; } status; static bool handleStatus(int sock, int permissions, struct field *fields, size_t fields_num, int keepAlive); +static bool handleCacheMap(int sock, int permissions, struct field *fields, size_t fields_num, int keepAlive); static bool sendReply(int sock, const char *status, const char *ctype, const char *payload, ssize_t plen, int keepAlive); static void parsePath(struct string *path, struct string *file, struct field *getv, size_t *getc); static bool hasHeaderValue(struct phr_header *headers, size_t numHeaders, struct string *name, struct string *value); @@ -91,8 +94,7 @@ static void loadAcl(); void rpc_init() { - mutex_init( &aclLock ); - mutex_init( &statusLock ); + mutex_init( &aclLock, LOCK_RPC_ACL ); randomRunId = (((json_int_t)getpid()) << 16) | (json_int_t)time(NULL); // </guard> if ( sizeof(randomRunId) > 4 ) { @@ -123,10 +125,8 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int return; } do { - mutex_lock( &statusLock ); const int curCount = ++status.count; UPDATE_LOADSTATE( curCount ); - mutex_unlock( &statusLock ); if ( curCount > MAX_CLIENTS ) { sendReply( sock, "503 Service Temporarily Unavailable", "text/plain", "Too many HTTP clients", -1, HTTP_CLOSE ); goto func_return; @@ -141,13 +141,13 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int bool hasName = false; bool ok; int keepAlive = HTTP_KEEPALIVE; - do { + while ( !_shutdown ) { // Read request from client struct phr_header headers[100]; size_t numHeaders, prevLen = 0, consumed; struct string method, path; int minorVersion; - do { + while ( !_shutdown ) { // Parse before calling recv, there might be a complete pipelined request in the buffer already // If the request is incomplete, we allow exactly one additional recv() to complete it. // This should suffice for real world scenarios as I don't know of any @@ -192,15 +192,15 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int sendReply( sock, "400 Bad Request", "text/plain", "Server cannot understand what you're trying to say", -1, HTTP_CLOSE ); goto func_return; } - } while ( true ); + } // Loop while request header incomplete + if ( _shutdown ) + break; if ( keepAlive == HTTP_KEEPALIVE ) { // Only keep the connection alive (and indicate so) if the client seems to support this if ( minorVersion == 0 || hasHeaderValue( headers, numHeaders, &STR_CONNECTION, &STR_CLOSE ) ) { keepAlive = HTTP_CLOSE; } else { // And if there aren't too many active HTTP sessions - mutex_lock( &statusLock ); if ( status.overloaded ) keepAlive = HTTP_CLOSE; - mutex_unlock( &statusLock ); } } if ( method.s != NULL && path.s != NULL ) { @@ -216,10 +216,13 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int // Don't care if GET or POST if ( equals( &file, &STR_QUERY ) ) { ok = handleStatus( sock, permissions, getv, getc, keepAlive ); + } else if ( equals( &file, &STR_CACHEMAP ) ) { + ok = handleCacheMap( sock, permissions, getv, getc, keepAlive ); } else { ok = sendReply( sock, "404 Not found", "text/plain", "Nothing", -1, keepAlive ); } - if ( !ok ) break; + if ( !ok ) + break; } // hoff might be beyond end if the client sent another request (burst) const ssize_t extra = hoff - consumed; @@ -231,13 +234,11 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int hasName = true; setThreadName( "HTTP" ); } - } while (true); + } // Loop while more requests func_return:; do { - mutex_lock( &statusLock ); const int curCount = --status.count; UPDATE_LOADSTATE( curCount ); - mutex_unlock( &statusLock ); } while (0); } @@ -347,6 +348,44 @@ static bool handleStatus(int sock, int permissions, struct field *fields, size_t return ok; } +static bool handleCacheMap(int sock, int permissions, struct field *fields, size_t fields_num, int keepAlive) +{ + if ( !(permissions & ACL_IMAGE_LIST) ) { + return sendReply( sock, "403 Forbidden", "text/plain", "No permission to access image list", -1, keepAlive ); + } + int imgId = -1; + static const char one = (char)0xff; + for (size_t i = 0; i < fields_num; ++i) { + if ( equals( &fields[i].name, &STR_ID ) ) { + char *broken; + imgId = (int)strtol( fields[i].value.s, &broken, 10 ); + if ( broken != fields[i].value.s ) + break; + imgId = -1; + } + } + if ( imgId == -1 ) + return sendReply( sock, "400 Bad Request", "text/plain", "Missing parameter 'id'", -1, keepAlive ); + dnbd3_image_t *image = image_byId( imgId ); + if ( image == NULL ) + return sendReply( sock, "404 Not found", "text/plain", "Image not found", -1, keepAlive ); + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + image_release( image ); + int len; + const char *map; + if ( cache == NULL ) { + map = &one; + len = 1; + } else { + _Static_assert( sizeof(const char) == sizeof(_Atomic uint8_t), "Atomic assumption exploded" ); + map = (const char*)cache->map; + len = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); + } + bool ok = sendReply( sock, "200 OK", "application/octet-stream", map, len, keepAlive ); + ref_put( &cache->reference ); + return ok; +} + static bool sendReply(int sock, const char *status, const char *ctype, const char *payload, ssize_t plen, int keepAlive) { if ( plen == -1 ) plen = strlen( payload ); diff --git a/src/server/server.c b/src/server/server.c index 10ab208..0dddea7 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -37,6 +37,8 @@ #include <signal.h> #include <getopt.h> #include <assert.h> +#include <sys/types.h> +#include <unistd.h> #define LONGOPT_CRC4 1000 #define LONGOPT_ASSERT 1001 @@ -45,6 +47,26 @@ #define LONGOPT_SIZE 1004 #define LONGOPT_ERRORMSG 1005 +typedef struct _job job_t; + +struct _job { + job_t *next; + void *(*startRoutine)(void *); + void *arg; + ticks dueDate; + int intervalSecs; +}; + +static job_t *jobHead; +static _Atomic(job_t *) newJob; +static bool hasTimerThread = false; +static pthread_t timerThread; + +static pid_t mainPid; +static pthread_t mainThread; + +#define DEFAULT_TIMER_TIMEOUT (60) + static poll_list_t *listeners = NULL; /** @@ -71,6 +93,12 @@ static void dnbd3_handleSignal2(int signum, siginfo_t *info, void *data); static void* server_asyncImageListLoad(void *data); +static void* timerMainloop(void*); + +static int handlePendingJobs(void); + +static void queueJobInternal(job_t *job); + /** * Print help text for usage instructions */ @@ -105,14 +133,21 @@ void dnbd3_printVersion() /** * Clean up structs, connections, write out data, then exit */ -void dnbd3_cleanup() +_Noreturn static void dnbd3_cleanup() { int retries; _shutdown = true; logadd( LOG_INFO, "Cleanup..." ); - if ( listeners != NULL ) sock_destroyPollList( listeners ); + if ( hasTimerThread ) { + pthread_kill( timerThread, SIGINT ); + thread_join( timerThread, NULL ); + } + + if ( listeners != NULL ) { + sock_destroyPollList( listeners ); + } listeners = NULL; // Kill connection to all clients @@ -121,9 +156,6 @@ void dnbd3_cleanup() // Disable threadpool threadpool_close(); - // Terminate the altserver checking thread - altservers_shutdown(); - // Terminate all uplinks image_killUplinks(); @@ -133,8 +165,7 @@ void dnbd3_cleanup() // Wait for clients to disconnect net_waitForAllDisconnected(); - // Watchdog not needed anymore - debug_locks_stop_watchdog(); + threadpool_waitEmpty(); // Clean up images retries = 5; @@ -178,6 +209,8 @@ int main(int argc, char *argv[]) { 0, 0, 0, 0 } }; + mainPid = getpid(); + mainThread = pthread_self(); opt = getopt_long( argc, argv, optString, longOpts, &longIndex ); while ( opt != -1 ) { @@ -201,6 +234,15 @@ int main(int argc, char *argv[]) case LONGOPT_CRC4: return image_generateCrcFile( optarg ) ? 0 : EXIT_FAILURE; case LONGOPT_ASSERT: + printf( "Now leaking memory:\n" ); + char *bla = malloc( 10 ); + bla[2] = 3; + bla = NULL; + printf( "Testing use after free:\n" ); + char *test = malloc( 10 ); + test[0] = 1; + free( (void*)test ); + test[1] = 2; printf( "Testing a failing assertion:\n" ); assert( 4 == 5 ); printf( "Assertion 4 == 5 seems to hold. ;-)\n" ); @@ -303,16 +345,11 @@ int main(int argc, char *argv[]) logadd( LOG_WARNING, "Could not load alt-servers. Does the file exist in %s?", _configDir ); } -#ifdef _DEBUG - debug_locks_start_watchdog(); -#endif - // setup signal handler - struct sigaction sa; - memset( &sa, 0, sizeof(sa) ); - sa.sa_sigaction = dnbd3_handleSignal2; - sa.sa_flags = SA_SIGINFO; - //sa.sa_mask = ; + struct sigaction sa = { + .sa_sigaction = dnbd3_handleSignal2, + .sa_flags = SA_SIGINFO, + }; sigaction( SIGTERM, &sa, NULL ); sigaction( SIGINT, &sa, NULL ); sigaction( SIGUSR1, &sa, NULL ); @@ -347,6 +384,10 @@ int main(int argc, char *argv[]) logadd( LOG_INFO, "Server is ready. (%s)", VERSION_STRING ); + if ( thread_create( &timerThread, NULL, &timerMainloop, NULL ) == 0 ) { + hasTimerThread = true; + } + // +++++++++++++++++++++++++++++++++++++++++++++++++++ main loop struct sockaddr_storage client; socklen_t len; @@ -370,7 +411,7 @@ int main(int argc, char *argv[]) // len = sizeof(client); fd = sock_accept( listeners, &client, &len ); - if ( fd < 0 ) { + if ( fd == -1 ) { const int err = errno; if ( err == EINTR || err == EAGAIN ) continue; logadd( LOG_ERROR, "Client accept failure (err=%d)", err ); @@ -474,8 +515,16 @@ static void dnbd3_handleSignal(int signum) static void dnbd3_handleSignal2(int signum, siginfo_t *info, void *data UNUSED) { - memcpy( &lastSignal, info, sizeof(siginfo_t) ); - dnbd3_handleSignal( signum ); + if ( info->si_pid != mainPid ) { // Source is not this process + memcpy( &lastSignal, info, sizeof(siginfo_t) ); // Copy signal info + if ( info->si_pid != 0 && !pthread_equal( pthread_self(), mainThread ) ) { + pthread_kill( mainThread, info->si_signo ); // And relay signal if we're not the main thread + } + } + if ( pthread_equal( pthread_self(), mainThread ) ) { + // Signal received by main thread -- handle + dnbd3_handleSignal( signum ); + } } uint32_t dnbd3_serverUptime() @@ -493,3 +542,85 @@ static void* server_asyncImageListLoad(void *data UNUSED) return NULL; } +static void* timerMainloop(void* stuff UNUSED) +{ + setThreadName( "timer" ); + while ( !_shutdown ) { + // Handle jobs/timer events; returns timeout until next event + int to = handlePendingJobs(); + sleep( MIN( MAX( 1, to ), DEFAULT_TIMER_TIMEOUT ) ); + } + logadd( LOG_DEBUG1, "Timer thread done" ); + return NULL; +} + +static int handlePendingJobs(void) +{ + declare_now; + job_t *todo, **temp, *old; + int diff; + todo = jobHead; + for ( temp = &todo; *temp != NULL; temp = &(*temp)->next ) { + diff = (int)timing_diff( &now, &(*temp)->dueDate ); + if ( diff > 0 ) // Found one that's in the future + break; + } + jobHead = *temp; // Make it list head + *temp = NULL; // Split off part before that + while ( todo != NULL ) { + threadpool_run( todo->startRoutine, todo->arg ); + old = todo; + todo = todo->next; + if ( old->intervalSecs == 0 ) { + free( old ); // oneshot + } else { + timing_set( &old->dueDate, &now, old->intervalSecs ); + queueJobInternal( old ); // repeated + } + } + // See if any new jobs have been queued + while ( newJob != NULL ) { + todo = newJob; + // NULL should never happen since we're the only consumer + assert( todo != NULL ); + if ( !atomic_compare_exchange_weak( &newJob, &todo, NULL ) ) + continue; + do { + old = todo; + todo = todo->next; + queueJobInternal( old ); + } while ( todo != NULL ); + } + // Return new timeout + if ( jobHead == NULL ) + return DEFAULT_TIMER_TIMEOUT; + return (int)timing_diff( &now, &jobHead->dueDate ); +} + +static void queueJobInternal(job_t *job) +{ + assert( job != NULL ); + job_t **it; + for ( it = &jobHead; *it != NULL; it = &(*it)->next ) { + if ( timing_1le2( &job->dueDate, &(*it)->dueDate ) ) + break; + } + job->next = *it; + *it = job; +} + +void server_addJob(void *(*startRoutine)(void *), void *arg, int delaySecs, int intervalSecs) +{ + declare_now; + job_t *new = malloc( sizeof(*new) ); + new->startRoutine = startRoutine; + new->arg = arg; + new->intervalSecs = intervalSecs; + timing_set( &new->dueDate, &now, delaySecs ); + for ( ;; ) { + new->next = newJob; + if ( atomic_compare_exchange_weak( &newJob, &new->next, new ) ) + break; + } +} + diff --git a/src/server/server.h b/src/server/server.h index bab8421..a026eb6 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -24,8 +24,8 @@ #include "globals.h" #include "../types.h" -void dnbd3_cleanup(); uint32_t dnbd3_serverUptime(); +void server_addJob(void *(*startRoutine)(void *), void *arg, int delaySecs, int intervalSecs); #if !defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64 #error Please set _FILE_OFFSET_BITS to 64 in your makefile/configuration diff --git a/src/server/threadpool.c b/src/server/threadpool.c index dac0980..0b46fd6 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -4,7 +4,6 @@ #include "locks.h" typedef struct _entry_t { - struct _entry_t *next; pthread_t thread; dnbd3_signal_t* signal; void *(*startRoutine)(void *); @@ -14,16 +13,21 @@ typedef struct _entry_t { static void *threadpool_worker(void *entryPtr); static pthread_attr_t threadAttrs; - -static int maxIdleThreads = -1; -static entry_t *pool = NULL; -static pthread_mutex_t poolLock; +static atomic_int maxIdleThreads = -1; +static _Atomic(entry_t *) *pool = NULL; +static atomic_int activeThreads = 0; bool threadpool_init(int maxIdle) { - if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false; - mutex_init( &poolLock ); - maxIdleThreads = maxIdle; + if ( maxIdle < 0 ) + return false; + int exp = -1; + if ( !atomic_compare_exchange_strong( &maxIdleThreads, &exp, maxIdle ) ) + return false; + pool = malloc( maxIdle * sizeof(*pool) ); + for ( int i = 0; i < maxIdle; ++i ) { + atomic_init( &pool[i], NULL ); + } pthread_attr_init( &threadAttrs ); pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED ); return true; @@ -31,28 +35,47 @@ bool threadpool_init(int maxIdle) void threadpool_close() { - _shutdown = true; - if ( maxIdleThreads < 0 ) return; - mutex_lock( &poolLock ); - maxIdleThreads = -1; - entry_t *ptr = pool; - while ( ptr != NULL ) { - entry_t *current = ptr; - ptr = ptr->next; - signal_call( current->signal ); + int max = atomic_exchange( &maxIdleThreads, -1 ); + if ( max <= 0 ) + return; + for ( int i = 0; i < max; ++i ) { + entry_t *cur = pool[i]; + if ( cur != NULL && atomic_compare_exchange_strong( &pool[i], &cur, NULL ) ) { + signal_call( cur->signal ); + } } - mutex_unlock( &poolLock ); - mutex_destroy( &poolLock ); +} + +void threadpool_waitEmpty() +{ + if ( activeThreads == 0 ) + return; + do { + sleep( 1 ); + logadd( LOG_INFO, "Threadpool: %d threads still active", (int)activeThreads ); + } while ( activeThreads != 0 ); } bool threadpool_run(void *(*startRoutine)(void *), void *arg) { - mutex_lock( &poolLock ); - entry_t *entry = pool; - if ( entry != NULL ) pool = entry->next; - mutex_unlock( &poolLock ); - if ( entry == NULL ) { - entry = (entry_t*)malloc( sizeof(entry_t) ); + if ( unlikely( _shutdown ) ) { + logadd( LOG_MINOR, "Cannot submit work to threadpool while shutting down!" ); + return false; + } + if ( unlikely( startRoutine == NULL ) ) { + logadd( LOG_ERROR, "Trying to queue work for thread pool with NULL startRoutine" ); + return false; // Or bail out!? + } + entry_t *entry = NULL; + for ( int i = 0; i < maxIdleThreads; ++i ) { + entry_t *cur = pool[i]; + if ( cur != NULL && atomic_compare_exchange_weak( &pool[i], &cur, NULL ) ) { + entry = cur; + break; + } + } + if ( unlikely( entry == NULL ) ) { + entry = malloc( sizeof(entry_t) ); if ( entry == NULL ) { logadd( LOG_WARNING, "Could not alloc entry_t for new thread\n" ); return false; @@ -69,10 +92,11 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg) free( entry ); return false; } + activeThreads++; } - entry->next = NULL; entry->startRoutine = startRoutine; entry->arg = arg; + atomic_thread_fence( memory_order_release ); signal_call( entry->signal ); return true; } @@ -84,43 +108,44 @@ static void *threadpool_worker(void *entryPtr) { blockNoncriticalSignals(); entry_t *entry = (entry_t*)entryPtr; + int ret; for ( ;; ) { +keep_going:; // Wait for signal from outside that we have work to do - int ret = signal_clear( entry->signal ); - if ( _shutdown ) break; - if ( ret > 0 ) { - if ( entry->startRoutine == NULL ) { - logadd( LOG_DEBUG1, "Worker woke up but has no work to do!" ); - continue; - } - // Start assigned work - (*entry->startRoutine)( entry->arg ); - // Reset vars for safety - entry->startRoutine = NULL; - entry->arg = NULL; - if ( _shutdown ) break; - // Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise - int threadCount = 0; - mutex_lock( &poolLock ); - entry_t *ptr = pool; - while ( ptr != NULL ) { - threadCount++; - ptr = ptr->next; - } - if ( threadCount >= maxIdleThreads ) { - mutex_unlock( &poolLock ); - break; - } - entry->next = pool; - pool = entry; - mutex_unlock( &poolLock ); - setThreadName( "[pool]" ); - } else { + ret = signal_clear( entry->signal ); + atomic_thread_fence( memory_order_acquire ); + if ( _shutdown ) + break; + if ( ret <= 0 ) { logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret ); + continue; + } + if ( entry->startRoutine == NULL ) { + logadd( LOG_ERROR, "Worker woke up but has no work to do!" ); + exit( 1 ); + } + // Start assigned work + (*entry->startRoutine)( entry->arg ); + // Reset vars for safety + entry->startRoutine = NULL; + entry->arg = NULL; + atomic_thread_fence( memory_order_release ); + if ( _shutdown ) + break; + // Put thread back into pool + setThreadName( "[pool]" ); + for ( int i = 0; i < maxIdleThreads; ++i ) { + entry_t *exp = NULL; + if ( atomic_compare_exchange_weak( &pool[i], &exp, entry ) ) { + goto keep_going; + } } + // Reaching here means pool is full; just let the thread exit + break; } signal_close( entry->signal ); free( entry ); + activeThreads--; return NULL; } diff --git a/src/server/threadpool.h b/src/server/threadpool.h index 15dd151..ee0b3aa 100644 --- a/src/server/threadpool.h +++ b/src/server/threadpool.h @@ -18,6 +18,11 @@ bool threadpool_init(int maxIdleThreadCount); void threadpool_close(); /** + * Block until all threads spawned have exited + */ +void threadpool_waitEmpty(); + +/** * Run a thread using the thread pool. * @param startRoutine function to run in new thread * @param arg argument to pass to thead diff --git a/src/server/uplink.c b/src/server/uplink.c index 682b986..f39e633 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -3,10 +3,12 @@ #include "locks.h" #include "image.h" #include "altservers.h" +#include "net.h" #include "../shared/sockhelper.h" #include "../shared/protocol.h" #include "../shared/timing.h" #include "../shared/crc32.h" +#include "reference.h" #include <assert.h> #include <inttypes.h> @@ -21,19 +23,43 @@ #define REP_NONE ( (uint64_t)0xffffffffffffffff ) +// Status of request in queue + +// Slot is free, can be used. +// Must only be set in uplink_handle_receive() or uplink_remove_client() +#define ULR_FREE 0 +// Slot has been filled with a request that hasn't been sent to the upstream server yet, matching request can safely rely on reuse. +// Must only be set in uplink_request() +#define ULR_NEW 1 +// Slot is occupied, reply has not yet been received, matching request can safely rely on reuse. +// Must only be set in uplink_mainloop() or uplink_request() +#define ULR_PENDING 2 +// Slot is being processed, do not consider for hop on. +// Must only be set in uplink_handle_receive() +#define ULR_PROCESSING 3 + +static const char *const NAMES_ULR[4] = { + [ULR_FREE] = "ULR_FREE", + [ULR_NEW] = "ULR_NEW", + [ULR_PENDING] = "ULR_PENDING", + [ULR_PROCESSING] = "ULR_PROCESSING", +}; + static atomic_uint_fast64_t totalBytesReceived = 0; +static void cancelAllRequests(dnbd3_uplink_t *uplink); +static void uplink_free(ref *ref); static void* uplink_mainloop(void *data); -static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly); -static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const int lastBlockIndex); -static void uplink_handleReceive(dnbd3_connection_t *link); +static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly); +static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex); +static void uplink_handleReceive(dnbd3_uplink_t *uplink); static int uplink_sendKeepalive(const int fd); -static void uplink_addCrc32(dnbd3_connection_t *uplink); -static void uplink_sendReplicationRequest(dnbd3_connection_t *link); -static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force); -static bool uplink_saveCacheMap(dnbd3_connection_t *link); -static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link); -static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew); +static void uplink_addCrc32(dnbd3_uplink_t *uplink); +static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink); +static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force); +static bool uplink_saveCacheMap(dnbd3_uplink_t *uplink); +static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink); +static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew); // ############ uplink connection handling @@ -54,56 +80,67 @@ uint64_t uplink_getTotalBytesReceived() bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version) { if ( !_isProxy || _shutdown ) return false; - dnbd3_connection_t *link = NULL; assert( image != NULL ); mutex_lock( &image->lock ); - if ( image->uplink != NULL && !image->uplink->shutdown ) { + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink != NULL ) { mutex_unlock( &image->lock ); - if ( sock >= 0 ) close( sock ); - return true; // There's already an uplink, so should we consider this success or failure? + if ( sock != -1 ) { + close( sock ); + } + ref_put( &uplink->reference ); + return true; // There's already an uplink } - if ( image->cache_map == NULL ) { + if ( image->ref_cacheMap == NULL ) { logadd( LOG_WARNING, "Uplink was requested for image %s, but it is already complete", image->name ); goto failure; } - link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) ); - mutex_init( &link->queueLock ); - mutex_init( &link->rttLock ); - mutex_init( &link->sendMutex ); - link->image = image; - link->bytesReceived = 0; - link->idleTime = 0; - link->queueLen = 0; - mutex_lock( &link->sendMutex ); - link->fd = -1; - mutex_unlock( &link->sendMutex ); - link->cacheFd = -1; - link->signal = NULL; - link->replicationHandle = REP_NONE; - mutex_lock( &link->rttLock ); - link->cycleDetected = false; - if ( sock >= 0 ) { - link->betterFd = sock; - link->betterServer = *host; - link->rttTestResult = RTT_DOCHANGE; - link->betterVersion = version; + uplink = calloc( 1, sizeof(dnbd3_uplink_t) ); + // Start with one reference for the uplink thread. We'll return it when the thread finishes + ref_init( &uplink->reference, uplink_free, 1 ); + mutex_init( &uplink->queueLock, LOCK_UPLINK_QUEUE ); + mutex_init( &uplink->rttLock, LOCK_UPLINK_RTT ); + mutex_init( &uplink->sendMutex, LOCK_UPLINK_SEND ); + uplink->image = image; + uplink->bytesReceived = 0; + uplink->idleTime = 0; + uplink->queueLen = 0; + uplink->cacheFd = -1; + uplink->signal = signal_new(); + if ( uplink->signal == NULL ) { + logadd( LOG_WARNING, "Error creating signal. Uplink unavailable." ); + goto failure; + } + uplink->replicationHandle = REP_NONE; + mutex_lock( &uplink->rttLock ); + mutex_lock( &uplink->sendMutex ); + uplink->current.fd = -1; + mutex_unlock( &uplink->sendMutex ); + uplink->cycleDetected = false; + if ( sock != -1 ) { + uplink->better.fd = sock; + int index = altservers_hostToIndex( host ); + uplink->better.index = index == -1 ? 0 : index; // Prevent invalid array access + uplink->rttTestResult = RTT_DOCHANGE; + uplink->better.version = version; } else { - link->betterFd = -1; - link->rttTestResult = RTT_IDLE; + uplink->better.fd = -1; + uplink->rttTestResult = RTT_IDLE; } - mutex_unlock( &link->rttLock ); - link->recvBufferLen = 0; - link->shutdown = false; - if ( 0 != thread_create( &(link->thread), NULL, &uplink_mainloop, (void *)link ) ) { + mutex_unlock( &uplink->rttLock ); + uplink->recvBufferLen = 0; + uplink->shutdown = false; + if ( 0 != thread_create( &(uplink->thread), NULL, &uplink_mainloop, (void *)uplink ) ) { logadd( LOG_ERROR, "Could not start thread for new uplink." ); goto failure; } + ref_setref( &image->uplinkref, &uplink->reference ); mutex_unlock( &image->lock ); return true; failure: ; - if ( link != NULL ) { - free( link ); - link = image->uplink = NULL; + if ( uplink != NULL ) { + image->users++; // Expected by uplink_free() + ref_put( &uplink->reference ); // The ref for the uplink thread that never was } mutex_unlock( &image->lock ); return false; @@ -114,45 +151,89 @@ failure: ; * Calling it multiple times, even concurrently, will * not break anything. */ -void uplink_shutdown(dnbd3_image_t *image) +bool uplink_shutdown(dnbd3_image_t *image) { - bool join = false; - pthread_t thread; assert( image != NULL ); mutex_lock( &image->lock ); - if ( image->uplink == NULL ) { + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink == NULL ) { mutex_unlock( &image->lock ); - return; + return true; } - dnbd3_connection_t * const uplink = image->uplink; mutex_lock( &uplink->queueLock ); - if ( !uplink->shutdown ) { - uplink->shutdown = true; + bool exp = false; + if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) { + image->users++; // Prevent free while uplink shuts down signal_call( uplink->signal ); - thread = uplink->thread; - join = true; + } else { + logadd( LOG_ERROR, "This will never happen. '%s:%d'", image->name, (int)image->rid ); } + cancelAllRequests( uplink ); + ref_setref( &image->uplinkref, NULL ); + ref_put( &uplink->reference ); mutex_unlock( &uplink->queueLock ); - bool wait = image->uplink != NULL; + bool retval = ( exp && image->users == 0 ); mutex_unlock( &image->lock ); - if ( join ) thread_join( thread, NULL ); - while ( wait ) { - usleep( 5000 ); - mutex_lock( &image->lock ); - wait = image->uplink != NULL && image->uplink->shutdown; - mutex_unlock( &image->lock ); + return retval; +} + +/** + * Cancel all requests of this uplink. + * HOLD QUEUE LOCK WHILE CALLING + */ +static void cancelAllRequests(dnbd3_uplink_t *uplink) +{ + for ( int i = 0; i < uplink->queueLen; ++i ) { + if ( uplink->queue[i].status != ULR_FREE ) { + net_sendReply( uplink->queue[i].client, CMD_ERROR, uplink->queue[i].handle ); + uplink->queue[i].status = ULR_FREE; + } + } + uplink->queueLen = 0; +} + +static void uplink_free(ref *ref) +{ + dnbd3_uplink_t *uplink = container_of(ref, dnbd3_uplink_t, reference); + logadd( LOG_DEBUG1, "Freeing uplink for '%s:%d'", uplink->image->name, (int)uplink->image->rid ); + assert( uplink->queueLen == 0 ); + if ( uplink->signal != NULL ) { + signal_close( uplink->signal ); + } + if ( uplink->current.fd != -1 ) { + close( uplink->current.fd ); + uplink->current.fd = -1; } + if ( uplink->better.fd != -1 ) { + close( uplink->better.fd ); + uplink->better.fd = -1; + } + mutex_destroy( &uplink->queueLock ); + mutex_destroy( &uplink->rttLock ); + mutex_destroy( &uplink->sendMutex ); + free( uplink->recvBuffer ); + uplink->recvBuffer = NULL; + if ( uplink->cacheFd != -1 ) { + close( uplink->cacheFd ); + } + // Finally let go of image. It was acquired either in uplink_shutdown or in the cleanup code + // of the uplink thread, depending on who set the uplink->shutdown flag. (Or uplink_init if that failed) + image_release( uplink->image ); + free( uplink ); // !!! } /** * Remove given client from uplink request queue * Locks on: uplink.queueLock */ -void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) +void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client) { mutex_lock( &uplink->queueLock ); for (int i = uplink->queueLen - 1; i >= 0; --i) { if ( uplink->queue[i].client == client ) { + // Make sure client doesn't get destroyed while we're sending it data + mutex_lock( &client->sendMutex ); + mutex_unlock( &client->sendMutex ); uplink->queue[i].client = NULL; uplink->queue[i].status = ULR_FREE; } @@ -167,89 +248,98 @@ void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) */ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) { - if ( client == NULL || client->image == NULL ) return false; + if ( client == NULL || client->image == NULL ) + return false; if ( length > (uint32_t)_maxPayload ) { logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length ); return false; } - mutex_lock( &client->image->lock ); - if ( client->image->uplink == NULL ) { - mutex_unlock( &client->image->lock ); - logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); - return false; + dnbd3_uplink_t * uplink = ref_get_uplink( &client->image->uplinkref ); + if ( unlikely( uplink == NULL ) ) { + uplink_init( client->image, -1, NULL, -1 ); + uplink = ref_get_uplink( &client->image->uplinkref ); + if ( uplink == NULL ) { + logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); + return false; + } } - dnbd3_connection_t * const uplink = client->image->uplink; if ( uplink->shutdown ) { - mutex_unlock( &client->image->lock ); logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" ); - return false; + goto fail_ref; } // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain // This might be a false positive if there are multiple instances running on the same host (IP) - if ( hops != 0 && isSameAddress( &uplink->currentServer, &client->host ) ) { - mutex_unlock( &client->image->lock ); - logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); - mutex_lock( &uplink->rttLock ); + if ( hops != 0 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) { uplink->cycleDetected = true; - mutex_unlock( &uplink->rttLock ); signal_call( uplink->signal ); - return false; + logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); + goto fail_ref; } int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise int existingType = -1; // ULR_* type of existing request int i; int freeSlot = -1; + int firstUsedSlot = -1; bool requestLoop = false; const uint64_t end = start + length; mutex_lock( &uplink->queueLock ); - mutex_unlock( &client->image->lock ); + if ( uplink->shutdown ) { // Check again after locking to prevent lost requests + goto fail_lock; + } for (i = 0; i < uplink->queueLen; ++i) { - if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) { - freeSlot = i; + // find free slot to place this request into + if ( uplink->queue[i].status == ULR_FREE ) { + if ( freeSlot == -1 || existingType != ULR_PROCESSING ) { + freeSlot = i; + } continue; } - if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue; - if ( uplink->queue[i].from <= start && uplink->queue[i].to >= end ) { - if ( hops > uplink->queue[i].hopCount && uplink->queue[i].from == start && uplink->queue[i].to == end ) { - requestLoop = true; - break; - } - if ( foundExisting == -1 || existingType == ULR_PENDING ) { - foundExisting = i; - existingType = uplink->queue[i].status; - if ( freeSlot != -1 ) break; - } + if ( firstUsedSlot == -1 ) { + firstUsedSlot = i; + } + // find existing request to attach to + if ( uplink->queue[i].from > start || uplink->queue[i].to < end ) + continue; // Range not suitable + // Detect potential proxy cycle. New request hopcount is greater, range is same, old request has already been sent -> suspicious + if ( hops > uplink->queue[i].hopCount && uplink->queue[i].from == start && uplink->queue[i].to == end && uplink->queue[i].status == ULR_PENDING ) { + requestLoop = true; + break; + } + if ( foundExisting == -1 || existingType == ULR_PROCESSING ) { + foundExisting = i; + existingType = uplink->queue[i].status; } } - if ( requestLoop ) { - mutex_unlock( &uplink->queueLock ); - logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops ); - mutex_lock( &uplink->rttLock ); + if ( unlikely( requestLoop ) ) { uplink->cycleDetected = true; - mutex_unlock( &uplink->rttLock ); signal_call( uplink->signal ); - return false; + logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops ); + goto fail_lock; + } + if ( freeSlot < firstUsedSlot && firstUsedSlot < 10 && existingType != ULR_PROCESSING ) { + freeSlot = -1; // Not attaching to existing request, make it use a higher slot } if ( freeSlot == -1 ) { if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) { - mutex_unlock( &uplink->queueLock ); logadd( LOG_WARNING, "Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." ); - return false; + goto fail_lock; } freeSlot = uplink->queueLen++; } // Do not send request to uplink server if we have a matching pending request AND the request either has the - // status ULR_NEW OR we found a free slot with LOWER index than the one we attach to. Otherwise + // status ULR_NEW/PENDING OR we found a free slot with LOWER index than the one we attach to. Otherwise // explicitly send this request to the uplink server. The second condition mentioned here is to prevent // a race condition where the reply for the outstanding request already arrived and the uplink thread // is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might // already have passed the index of the free slot we determined, but not reached the existing request we just found above. - if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1; // -1 means "send request" + if ( foundExisting != -1 && existingType == ULR_PROCESSING && freeSlot > foundExisting ) { + foundExisting = -1; // -1 means "send request" + } #ifdef _DEBUG if ( foundExisting != -1 ) { - logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, existingType == ULR_NEW ? "ULR_NEW" : "ULR_PENDING", foundExisting, freeSlot ); + logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, NAMES_ULR[existingType], foundExisting, freeSlot ); logadd( LOG_DEBUG2, "Original %" PRIu64 "-%" PRIu64 " (%p)\n" "New %" PRIu64 "-%" PRIu64 " (%p)\n", uplink->queue[foundExisting].from, uplink->queue[foundExisting].to, (void*)uplink->queue[foundExisting].client, @@ -262,7 +352,8 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin uplink->queue[freeSlot].handle = handle; uplink->queue[freeSlot].client = client; //int old = uplink->queue[freeSlot].status; - uplink->queue[freeSlot].status = (foundExisting == -1 ? ULR_NEW : ULR_PENDING); + uplink->queue[freeSlot].status = ( foundExisting == -1 ? ULR_NEW : + ( existingType == ULR_NEW ? ULR_PENDING : existingType ) ); uplink->queue[freeSlot].hopCount = hops; #ifdef _DEBUG timing_get( &uplink->queue[freeSlot].entered ); @@ -270,45 +361,63 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin #endif mutex_unlock( &uplink->queueLock ); - if ( foundExisting != -1 ) + if ( foundExisting != -1 ) { + ref_put( &uplink->reference ); return true; // Attached to pending request, do nothing + } // See if we can fire away the request - if ( mutex_trylock( &uplink->sendMutex ) != 0 ) { + if ( unlikely( mutex_trylock( &uplink->sendMutex ) != 0 ) ) { logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" ); } else { - if ( uplink->fd == -1 ) { + if ( unlikely( uplink->current.fd == -1 ) ) { mutex_unlock( &uplink->sendMutex ); logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" ); } else { const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); if ( hops < 200 ) ++hops; - const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) ); + const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) ); mutex_unlock( &uplink->sendMutex ); - if ( !ret ) { + if ( unlikely( !ret ) ) { logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" ); } else { + // Direct send succeeded, update queue entry from NEW to PENDING, so the request won't be sent again + int state; mutex_lock( &uplink->queueLock ); - if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client && uplink->queue[freeSlot].status == ULR_NEW ) { - uplink->queue[freeSlot].status = ULR_PENDING; - logadd( LOG_DEBUG2, "Succesful direct uplink request" ); + if ( !uplink->shutdown && uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) { + state = uplink->queue[freeSlot].status; + if ( uplink->queue[freeSlot].status == ULR_NEW ) { + uplink->queue[freeSlot].status = ULR_PENDING; + } } else { - logadd( LOG_DEBUG2, "Weird queue update fail for direct uplink request" ); + state = -1; } mutex_unlock( &uplink->queueLock ); + if ( state == -1 ) { + logadd( LOG_DEBUG2, "Direct uplink request queue entry gone after sending and re-locking queue. *shrug*" ); + } else if ( state == ULR_NEW ) { + //logadd( LOG_DEBUG2, "Direct uplink request" ); + } else { + logadd( LOG_DEBUG2, "Direct uplink request queue entry changed to %s afte sending (expected ULR_NEW).", NAMES_ULR[uplink->queue[freeSlot].status] ); + } + ref_put( &uplink->reference ); return true; } // Fall through to waking up sender thread } } - if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed - if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { - logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); - } + if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { + logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); } + ref_put( &uplink->reference ); return true; +fail_lock: + mutex_unlock( &uplink->queueLock ); +fail_ref: + ref_put( &uplink->reference ); + return false; } /** @@ -321,9 +430,10 @@ static void* uplink_mainloop(void *data) #define EV_SOCKET (1) #define EV_COUNT (2) struct pollfd events[EV_COUNT]; - dnbd3_connection_t * const link = (dnbd3_connection_t*)data; + dnbd3_uplink_t * const uplink = (dnbd3_uplink_t*)data; int numSocks, i, waitTime; int altCheckInterval = SERVER_RTT_INTERVAL_INIT; + int rttTestResult; uint32_t discoverFailCount = 0; uint32_t unsavedSeconds = 0; ticks nextAltCheck, lastKeepalive; @@ -332,41 +442,36 @@ static void* uplink_mainloop(void *data) timing_get( &nextAltCheck ); lastKeepalive = nextAltCheck; // - assert( link != NULL ); + assert( uplink != NULL ); setThreadName( "idle-uplink" ); + thread_detach( uplink->thread ); blockNoncriticalSignals(); // Make sure file is open for writing - if ( !uplink_reopenCacheFd( link, false ) ) { + if ( !uplink_reopenCacheFd( uplink, false ) ) { // It might have failed - still offer proxy mode, we just can't cache - logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", link->image->path, errno ); + logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", uplink->image->path, errno ); } // - link->signal = signal_new(); - if ( link->signal == NULL ) { - logadd( LOG_WARNING, "error creating signal. Uplink unavailable." ); - goto cleanup; - } events[EV_SIGNAL].events = POLLIN; - events[EV_SIGNAL].fd = signal_getWaitFd( link->signal ); + events[EV_SIGNAL].fd = signal_getWaitFd( uplink->signal ); events[EV_SOCKET].fd = -1; - while ( !_shutdown && !link->shutdown ) { + if ( uplink->rttTestResult != RTT_DOCHANGE ) { + altservers_findUplink( uplink ); // In case we didn't kickstart + } + while ( !_shutdown && !uplink->shutdown ) { // poll() - mutex_lock( &link->rttLock ); - waitTime = link->rttTestResult == RTT_DOCHANGE ? 0 : -1; - mutex_unlock( &link->rttLock ); + waitTime = uplink->rttTestResult == RTT_DOCHANGE ? 0 : -1; if ( waitTime == 0 ) { - // Nothing - } else if ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) { - waitTime = 1000; + // 0 means poll, since we're about to change the server } else { declare_now; waitTime = (int)timing_diffMs( &now, &nextAltCheck ); if ( waitTime < 100 ) waitTime = 100; - if ( waitTime > 5000 ) waitTime = 5000; + if ( waitTime > 10000 ) waitTime = 10000; } - events[EV_SOCKET].fd = link->fd; + events[EV_SOCKET].fd = uplink->current.fd; numSocks = poll( events, EV_COUNT, waitTime ); - if ( _shutdown || link->shutdown ) goto cleanup; + if ( _shutdown || uplink->shutdown ) goto cleanup; if ( numSocks == -1 ) { // Error? if ( errno == EINTR ) continue; logadd( LOG_DEBUG1, "poll() error %d", (int)errno ); @@ -374,39 +479,36 @@ static void* uplink_mainloop(void *data) continue; } // Check if server switch is in order - mutex_lock( &link->rttLock ); - if ( link->rttTestResult != RTT_DOCHANGE ) { - mutex_unlock( &link->rttLock ); - } else { - link->rttTestResult = RTT_IDLE; + if ( unlikely( uplink->rttTestResult == RTT_DOCHANGE ) ) { + mutex_lock( &uplink->rttLock ); + assert( uplink->rttTestResult == RTT_DOCHANGE ); + uplink->rttTestResult = RTT_IDLE; // The rttTest worker thread has finished our request. // And says it's better to switch to another server - const int fd = link->fd; - mutex_lock( &link->sendMutex ); - link->fd = link->betterFd; - mutex_unlock( &link->sendMutex ); - link->betterFd = -1; - link->currentServer = link->betterServer; - link->version = link->betterVersion; - link->cycleDetected = false; - mutex_unlock( &link->rttLock ); + const int fd = uplink->current.fd; + mutex_lock( &uplink->sendMutex ); + uplink->current = uplink->better; + mutex_unlock( &uplink->sendMutex ); + uplink->better.fd = -1; + uplink->cycleDetected = false; + mutex_unlock( &uplink->rttLock ); discoverFailCount = 0; if ( fd != -1 ) close( fd ); - link->replicationHandle = REP_NONE; - link->image->working = true; - link->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received + uplink->replicationHandle = REP_NONE; + uplink->image->working = true; + uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received buffer[0] = '@'; - if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) { - logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", link->image->name, buffer + 1 ); + if ( altservers_toString( uplink->current.index, buffer + 1, sizeof(buffer) - 1 ) ) { + logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", uplink->image->name, buffer + 1 ); setThreadName( buffer ); } // If we don't have a crc32 list yet, see if the new server has one - if ( link->image->crc32 == NULL ) { - uplink_addCrc32( link ); + if ( uplink->image->crc32 == NULL ) { + uplink_addCrc32( uplink ); } // Re-send all pending requests - uplink_sendRequests( link, false ); - uplink_sendReplicationRequest( link ); + uplink_sendRequests( uplink, false ); + uplink_sendReplicationRequest( uplink ); events[EV_SOCKET].events = POLLIN | POLLRDHUP; timing_gets( &nextAltCheck, altCheckInterval ); // The rtt worker already did the handshake for our image, so there's nothing @@ -419,202 +521,169 @@ static void* uplink_mainloop(void *data) goto cleanup; } else if ( (events[EV_SIGNAL].revents & POLLIN) ) { // signal triggered -> pending requests - if ( signal_clear( link->signal ) == SIGNAL_ERROR ) { - logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", link->image->name ); + if ( signal_clear( uplink->signal ) == SIGNAL_ERROR ) { + logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", uplink->image->name ); } - if ( link->fd != -1 ) { + if ( uplink->current.fd != -1 ) { // Uplink seems fine, relay requests to it... - uplink_sendRequests( link, true ); - } else { // No uplink; maybe it was shutdown since it was idle for too long - link->idleTime = 0; + uplink_sendRequests( uplink, true ); + } else if ( uplink->queueLen != 0 ) { // No uplink; maybe it was shutdown since it was idle for too long + uplink->idleTime = 0; } } // Uplink socket if ( (events[EV_SOCKET].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) { - uplink_connectionFailed( link, true ); - logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" ); + uplink_connectionFailed( uplink, true ); + logadd( LOG_DEBUG1, "Uplink gone away, panic! (revents=%d)\n", (int)events[EV_SOCKET].revents ); setThreadName( "panic-uplink" ); } else if ( (events[EV_SOCKET].revents & POLLIN) ) { - uplink_handleReceive( link ); - if ( _shutdown || link->shutdown ) goto cleanup; + uplink_handleReceive( uplink ); + if ( _shutdown || uplink->shutdown ) goto cleanup; } declare_now; uint32_t timepassed = timing_diff( &lastKeepalive, &now ); if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) { lastKeepalive = now; - link->idleTime += timepassed; + uplink->idleTime += timepassed; unsavedSeconds += timepassed; - if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && link->idleTime >= 20 && link->idleTime <= 70 ) ) { - // fsync/save every 4 minutes, or every 60 seconds if link is idle + if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && uplink->idleTime >= 20 && uplink->idleTime <= 70 ) ) { + // fsync/save every 4 minutes, or every 60 seconds if uplink is idle unsavedSeconds = 0; - uplink_saveCacheMap( link ); + uplink_saveCacheMap( uplink ); } // Keep-alive - if ( link->fd != -1 && link->replicationHandle == REP_NONE ) { + if ( uplink->current.fd != -1 && uplink->replicationHandle == REP_NONE ) { // Send keep-alive if nothing is happening - if ( uplink_sendKeepalive( link->fd ) ) { + if ( uplink_sendKeepalive( uplink->current.fd ) ) { // Re-trigger periodically, in case it requires a minimum user count - uplink_sendReplicationRequest( link ); + uplink_sendReplicationRequest( uplink ); } else { - uplink_connectionFailed( link, true ); + uplink_connectionFailed( uplink, true ); logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" ); setThreadName( "panic-uplink" ); } } - // Don't keep link established if we're idle for too much - if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) { - mutex_lock( &link->sendMutex ); - close( link->fd ); - link->fd = events[EV_SOCKET].fd = -1; - mutex_unlock( &link->sendMutex ); - link->cycleDetected = false; - if ( link->recvBufferLen != 0 ) { - link->recvBufferLen = 0; - free( link->recvBuffer ); - link->recvBuffer = NULL; - } - logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", link->image->name, (int)link->image->rid ); - setThreadName( "idle-uplink" ); + // Don't keep uplink established if we're idle for too much + if ( uplink_connectionShouldShutdown( uplink ) ) { + logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", uplink->image->name, (int)uplink->image->rid ); + goto cleanup; } } // See if we should trigger an RTT measurement - mutex_lock( &link->rttLock ); - const int rttTestResult = link->rttTestResult; - mutex_unlock( &link->rttLock ); + rttTestResult = uplink->rttTestResult; if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) { - if ( timing_reached( &nextAltCheck, &now ) || ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) || link->cycleDetected ) { + if ( timing_reached( &nextAltCheck, &now ) || ( uplink->current.fd == -1 && discoverFailCount == 0 ) || uplink->cycleDetected ) { // It seems it's time for a check - if ( image_isComplete( link->image ) ) { + if ( image_isComplete( uplink->image ) ) { // Quit work if image is complete - logadd( LOG_INFO, "Replication of %s complete.", link->image->name ); + logadd( LOG_INFO, "Replication of %s complete.", uplink->image->name ); setThreadName( "finished-uplink" ); goto cleanup; - } else if ( !uplink_connectionShouldShutdown( link ) ) { + } else { // Not complete - do measurement - altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous) - if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) { - link->nextReplicationIndex = 0; + altservers_findUplinkAsync( uplink ); // This will set RTT_INPROGRESS (synchronous) + if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) { + uplink->nextReplicationIndex = 0; } } altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_INTERVAL_MAX); timing_set( &nextAltCheck, &now, altCheckInterval ); } } else if ( rttTestResult == RTT_NOT_REACHABLE ) { - mutex_lock( &link->rttLock ); - link->rttTestResult = RTT_IDLE; - mutex_unlock( &link->rttLock ); - discoverFailCount++; - timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) ); + if ( atomic_compare_exchange_strong( &uplink->rttTestResult, &rttTestResult, RTT_IDLE ) ) { + discoverFailCount++; + if ( uplink->image->working && uplink->current.fd == -1 && discoverFailCount > (SERVER_RTT_MAX_UNREACH / 2) ) { + logadd( LOG_DEBUG1, "Disabling %s:%d since no uplink is available", uplink->image->name, (int)uplink->image->rid ); + uplink->image->working = false; + } + if ( uplink->current.fd == -1 ) { + uplink->cycleDetected = false; + } + } + timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_MAX_UNREACH) ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED ); } #ifdef _DEBUG - if ( link->fd != -1 && !link->shutdown ) { + if ( uplink->current.fd != -1 && !uplink->shutdown ) { bool resend = false; ticks deadline; timing_set( &deadline, &now, -10 ); - mutex_lock( &link->queueLock ); - for (i = 0; i < link->queueLen; ++i) { - if ( link->queue[i].status != ULR_FREE && timing_reached( &link->queue[i].entered, &deadline ) ) { + mutex_lock( &uplink->queueLock ); + for (i = 0; i < uplink->queueLen; ++i) { + if ( uplink->queue[i].status != ULR_FREE && timing_reached( &uplink->queue[i].entered, &deadline ) ) { snprintf( buffer, sizeof(buffer), "[DEBUG %p] Starving request slot %d detected:\n" - "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)link, i, link->queue[i].client->image->name, - link->queue[i].from, link->queue[i].to, link->queue[i].status ); - link->queue[i].entered = now; + "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)uplink, i, uplink->queue[i].client->image->name, + uplink->queue[i].from, uplink->queue[i].to, uplink->queue[i].status ); + uplink->queue[i].entered = now; #ifdef _DEBUG_RESEND_STARVING - link->queue[i].status = ULR_NEW; + uplink->queue[i].status = ULR_NEW; resend = true; #endif - mutex_unlock( &link->queueLock ); + mutex_unlock( &uplink->queueLock ); logadd( LOG_WARNING, "%s", buffer ); - mutex_lock( &link->queueLock ); + mutex_lock( &uplink->queueLock ); } } - mutex_unlock( &link->queueLock ); + mutex_unlock( &uplink->queueLock ); if ( resend ) - uplink_sendRequests( link, true ); + uplink_sendRequests( uplink, true ); } #endif } cleanup: ; - altservers_removeUplink( link ); - uplink_saveCacheMap( link ); - mutex_lock( &link->image->lock ); - if ( link->image->uplink == link ) { - link->image->uplink = NULL; - } - mutex_lock( &link->queueLock ); - const int fd = link->fd; - const dnbd3_signal_t* signal = link->signal; - mutex_lock( &link->sendMutex ); - link->fd = -1; - mutex_unlock( &link->sendMutex ); - link->signal = NULL; - if ( !link->shutdown ) { - link->shutdown = true; - thread_detach( link->thread ); - } - // Do not access link->image after unlocking, since we set - // image->uplink to NULL. Acquire with image_lock first, - // like done below when checking whether to re-init uplink - mutex_unlock( &link->image->lock ); - mutex_unlock( &link->queueLock ); - if ( fd != -1 ) close( fd ); - if ( signal != NULL ) signal_close( signal ); - // Wait for the RTT check to finish/fail if it's in progress - while ( link->rttTestResult == RTT_INPROGRESS ) - usleep( 10000 ); - if ( link->betterFd != -1 ) { - close( link->betterFd ); + uplink_saveCacheMap( uplink ); + dnbd3_image_t *image = uplink->image; + mutex_lock( &image->lock ); + bool exp = false; + if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) { + image->users++; // We set the flag - hold onto image } - mutex_destroy( &link->queueLock ); - mutex_destroy( &link->rttLock ); - mutex_destroy( &link->sendMutex ); - free( link->recvBuffer ); - link->recvBuffer = NULL; - if ( link->cacheFd != -1 ) { - close( link->cacheFd ); + dnbd3_uplink_t *current = ref_get_uplink( &image->uplinkref ); + if ( current == uplink ) { // Set NULL if it's still us... + mutex_lock( &uplink->queueLock ); + cancelAllRequests( uplink ); + mutex_unlock( &uplink->queueLock ); + ref_setref( &image->uplinkref, NULL ); } - dnbd3_image_t *image = image_lock( link->image ); - free( link ); // !!! - if ( image != NULL ) { - if ( !_shutdown && image->cache_map != NULL ) { - // Ingegrity checker must have found something in the meantime - uplink_init( image, -1, NULL, 0 ); - } - image_release( image ); + if ( current != NULL ) { // Decrease ref in any case + ref_put( ¤t->reference ); } + mutex_unlock( &image->lock ); + // Finally as the thread is done, decrease our own ref that we initialized with + ref_put( &uplink->reference ); return NULL ; } -static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly) +static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly) { // Scan for new requests int j; - mutex_lock( &link->queueLock ); - for (j = 0; j < link->queueLen; ++j) { - if ( link->queue[j].status != ULR_NEW && (newOnly || link->queue[j].status != ULR_PENDING) ) continue; - link->queue[j].status = ULR_PENDING; - uint8_t hops = link->queue[j].hopCount; - const uint64_t reqStart = link->queue[j].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); - const uint32_t reqSize = (uint32_t)(((link->queue[j].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); + mutex_lock( &uplink->queueLock ); + for (j = 0; j < uplink->queueLen; ++j) { + if ( uplink->queue[j].status != ULR_NEW && (newOnly || uplink->queue[j].status != ULR_PENDING) ) continue; + uplink->queue[j].status = ULR_PENDING; + uint8_t hops = uplink->queue[j].hopCount; + const uint64_t reqStart = uplink->queue[j].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + const uint32_t reqSize = (uint32_t)(((uplink->queue[j].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); /* logadd( LOG_DEBUG2, "[%p] Sending slot %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 " (%" PRIu64 "-%" PRIu64 ")", - (void*)link, j, link->queue[j].status, link->queue[j].handle, link->queue[j].from, link->queue[j].to, reqStart, reqStart+reqSize ); + (void*)uplink, j, uplink->queue[j].status, uplink->queue[j].handle, uplink->queue[j].from, uplink->queue[j].to, reqStart, reqStart+reqSize ); */ - mutex_unlock( &link->queueLock ); + mutex_unlock( &uplink->queueLock ); if ( hops < 200 ) ++hops; - mutex_lock( &link->sendMutex ); - const bool ret = dnbd3_get_block( link->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( link->version, hops ) ); - mutex_unlock( &link->sendMutex ); + mutex_lock( &uplink->sendMutex ); + const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) ); + mutex_unlock( &uplink->sendMutex ); if ( !ret ) { // Non-critical - if the connection dropped or the server was changed // the thread will re-send this request as soon as the connection // is reestablished. logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" ); - altservers_serverFailed( &link->currentServer ); + altservers_serverFailed( uplink->current.index ); return; } - mutex_lock( &link->queueLock ); + mutex_lock( &uplink->queueLock ); } - mutex_unlock( &link->queueLock ); + mutex_unlock( &uplink->queueLock ); } /** @@ -627,89 +696,93 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly) * the code simpler. Worst case would be only one bit is zero, which means * 4kb are missing, but we will request 32kb. */ -static void uplink_sendReplicationRequest(dnbd3_connection_t *link) +static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink) { - if ( link == NULL || link->fd == -1 ) return; - if ( _backgroundReplication == BGR_DISABLED || link->cacheFd == -1 ) return; // Don't do background replication - if ( link->nextReplicationIndex == -1 || link->replicationHandle != REP_NONE ) - return; - dnbd3_image_t * const image = link->image; + if ( uplink == NULL || uplink->current.fd == -1 ) return; + if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 ) return; // Don't do background replication + if ( uplink->nextReplicationIndex == -1 || uplink->replicationHandle != REP_NONE ) + return; // Already a replication request on the wire, or no more blocks to replicate + dnbd3_image_t * const image = uplink->image; if ( image->virtualFilesize < DNBD3_BLOCK_SIZE ) return; - mutex_lock( &image->lock ); - if ( image == NULL || image->cache_map == NULL || image->users < _bgrMinClients ) { - // No cache map (=image complete), or replication pending, or not enough users, do nothing - mutex_unlock( &image->lock ); + if ( image->users < _bgrMinClients ) return; // Not enough active users + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache == NULL || image->users < _bgrMinClients ) { + // No cache map (=image complete) + ref_put( &cache->reference ); return; } const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); const int lastBlockIndex = mapBytes - 1; int endByte; if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks - endByte = link->nextReplicationIndex + mapBytes; + endByte = uplink->nextReplicationIndex + mapBytes; } else { // Hashblock based: Only look for match in current hash block - endByte = ( link->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK; + endByte = ( uplink->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK; if ( endByte > mapBytes ) { endByte = mapBytes; } } + atomic_thread_fence( memory_order_acquire ); int replicationIndex = -1; - for ( int j = link->nextReplicationIndex; j < endByte; ++j ) { + for ( int j = uplink->nextReplicationIndex; j < endByte; ++j ) { const int i = j % ( mapBytes ); // Wrap around for BGR_FULL - if ( image->cache_map[i] != 0xff && ( i != lastBlockIndex || !link->replicatedLastBlock ) ) { + if ( atomic_load_explicit( &cache->map[i], memory_order_relaxed ) != 0xff + && ( i != lastBlockIndex || !uplink->replicatedLastBlock ) ) { // Found incomplete one replicationIndex = i; break; } } - mutex_unlock( &image->lock ); + ref_put( &cache->reference ); if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) { // Nothing left in current block, find next one - replicationIndex = uplink_findNextIncompleteHashBlock( link, endByte ); + replicationIndex = uplink_findNextIncompleteHashBlock( uplink, endByte ); } if ( replicationIndex == -1 ) { // Replication might be complete, uplink_mainloop should take care.... - link->nextReplicationIndex = -1; + uplink->nextReplicationIndex = -1; return; } const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE; - link->replicationHandle = offset; + uplink->replicationHandle = offset; const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE ); - mutex_lock( &link->sendMutex ); - bool sendOk = dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) ); - mutex_unlock( &link->sendMutex ); + mutex_lock( &uplink->sendMutex ); + bool sendOk = dnbd3_get_block( uplink->current.fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->current.version, 1 ) ); + mutex_unlock( &uplink->sendMutex ); if ( !sendOk ) { logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" ); return; } if ( replicationIndex == lastBlockIndex ) { - link->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks + uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks } - link->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter + uplink->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter if ( _backgroundReplication == BGR_HASHBLOCK - && link->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) { + && uplink->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) { // Just crossed a hash block boundary, look for new candidate starting at this very index - link->nextReplicationIndex = uplink_findNextIncompleteHashBlock( link, link->nextReplicationIndex ); + uplink->nextReplicationIndex = uplink_findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex ); } } /** - * find next index into cache_map that corresponds to the beginning + * find next index into cache map that corresponds to the beginning * of a hash block which is neither completely empty nor completely * replicated yet. Returns -1 if no match. */ -static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const int startMapIndex) +static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMapIndex) { int retval = -1; - mutex_lock( &link->image->lock ); - const int mapBytes = IMGSIZE_TO_MAPBYTES( link->image->virtualFilesize ); - const uint8_t *cache_map = link->image->cache_map; - if ( cache_map != NULL ) { - int j; + dnbd3_cache_map_t *cache = ref_get_cachemap( uplink->image ); + if ( cache != NULL ) { + const int mapBytes = IMGSIZE_TO_MAPBYTES( uplink->image->virtualFilesize ); const int start = ( startMapIndex & MAP_INDEX_HASH_START_MASK ); + atomic_thread_fence( memory_order_acquire ); + int j; for (j = 0; j < mapBytes; ++j) { const int i = ( start + j ) % mapBytes; - const bool isFull = cache_map[i] == 0xff || ( i + 1 == mapBytes && link->replicatedLastBlock ); - const bool isEmpty = cache_map[i] == 0; + const uint8_t b = atomic_load_explicit( &cache->map[i], memory_order_relaxed ); + const bool isFull = b == 0xff || ( i + 1 == mapBytes && uplink->replicatedLastBlock ); + const bool isEmpty = b == 0; if ( !isEmpty && !isFull ) { // Neither full nor empty, replicate if ( retval == -1 ) { @@ -736,49 +809,49 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const in retval = -1; } } - mutex_unlock( &link->image->lock ); + ref_put( &cache->reference ); return retval; } /** * Receive data from uplink server and process/dispatch - * Locks on: link.lock, images[].lock + * Locks on: uplink.lock, images[].lock */ -static void uplink_handleReceive(dnbd3_connection_t *link) +static void uplink_handleReceive(dnbd3_uplink_t *uplink) { dnbd3_reply_t inReply, outReply; int ret, i; for (;;) { - ret = dnbd3_read_reply( link->fd, &inReply, false ); - if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !link->shutdown ) ) continue; + ret = dnbd3_read_reply( uplink->current.fd, &inReply, false ); + if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue; if ( ret == REPLY_AGAIN ) break; if ( unlikely( ret == REPLY_CLOSED ) ) { - logadd( LOG_INFO, "Uplink: Remote host hung up (%s)", link->image->path ); + logadd( LOG_INFO, "Uplink: Remote host hung up (%s)", uplink->image->path ); goto error_cleanup; } if ( unlikely( ret == REPLY_WRONGMAGIC ) ) { - logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path ); + logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s)", uplink->image->path ); goto error_cleanup; } if ( unlikely( ret != REPLY_OK ) ) { - logadd( LOG_INFO, "Uplink: Connection error %d (%s)", ret, link->image->path ); + logadd( LOG_INFO, "Uplink: Connection error %d (%s)", ret, uplink->image->path ); goto error_cleanup; } if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) { - logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s", inReply.size, link->image->path ); + logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s", inReply.size, uplink->image->path ); goto error_cleanup; } - if ( unlikely( link->recvBufferLen < inReply.size ) ) { - link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536); - link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen ); - if ( link->recvBuffer == NULL ) { + if ( unlikely( uplink->recvBufferLen < inReply.size ) ) { + uplink->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536); + uplink->recvBuffer = realloc( uplink->recvBuffer, uplink->recvBufferLen ); + if ( uplink->recvBuffer == NULL ) { logadd( LOG_ERROR, "Out of memory when trying to allocate receive buffer for uplink" ); exit( 1 ); } } - if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) { - logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path ); + if ( unlikely( (uint32_t)sock_recv( uplink->current.fd, uplink->recvBuffer, inReply.size ) != inReply.size ) ) { + logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", uplink->image->path ); goto error_cleanup; } // Payload read completely @@ -789,21 +862,21 @@ static void uplink_handleReceive(dnbd3_connection_t *link) const uint64_t start = inReply.handle; const uint64_t end = inReply.handle + inReply.size; totalBytesReceived += inReply.size; - link->bytesReceived += inReply.size; + uplink->bytesReceived += inReply.size; // 1) Write to cache file - if ( unlikely( link->cacheFd == -1 ) ) { - uplink_reopenCacheFd( link, false ); + if ( unlikely( uplink->cacheFd == -1 ) ) { + uplink_reopenCacheFd( uplink, false ); } - if ( likely( link->cacheFd != -1 ) ) { + if ( likely( uplink->cacheFd != -1 ) ) { int err = 0; bool tryAgain = true; // Allow one retry in case we run out of space or the write fd became invalid uint32_t done = 0; ret = 0; while ( done < inReply.size ) { - ret = (int)pwrite( link->cacheFd, link->recvBuffer + done, inReply.size - done, start + done ); + ret = (int)pwrite( uplink->cacheFd, uplink->recvBuffer + done, inReply.size - done, start + done ); if ( unlikely( ret == -1 ) ) { err = errno; - if ( err == EINTR ) continue; + if ( err == EINTR && !_shutdown ) continue; if ( err == ENOSPC || err == EDQUOT ) { // try to free 256MiB if ( !tryAgain || !image_ensureDiskSpaceLocked( 256ull * 1024 * 1024, true ) ) break; @@ -811,32 +884,37 @@ static void uplink_handleReceive(dnbd3_connection_t *link) continue; // Success, retry write } if ( err == EBADF || err == EINVAL || err == EIO ) { - if ( !tryAgain || !uplink_reopenCacheFd( link, true ) ) + if ( !tryAgain || !uplink_reopenCacheFd( uplink, true ) ) break; tryAgain = false; continue; // Write handle to image successfully re-opened, try again } - logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d", link->image->name, (int)link->image->rid, err ); + logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d", uplink->image->name, (int)uplink->image->rid, err ); break; } if ( unlikely( ret <= 0 || (uint32_t)ret > inReply.size - done ) ) { - logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d", ret, link->image->name, (int)link->image->rid ); + logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d", ret, uplink->image->name, (int)uplink->image->rid ); break; } done += (uint32_t)ret; } if ( likely( done > 0 ) ) { - image_updateCachemap( link->image, start, start + done, true ); + image_updateCachemap( uplink->image, start, start + done, true ); } if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) { logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.", - link->image->name, (int)link->image->rid, err ); + uplink->image->name, (int)uplink->image->rid, err ); } } // 2) Figure out which clients are interested in it - mutex_lock( &link->queueLock ); - for (i = 0; i < link->queueLen; ++i) { - dnbd3_queued_request_t * const req = &link->queue[i]; + // Mark as ULR_PROCESSING, since we unlock repeatedly in the second loop + // below; this prevents uplink_request() from attaching to this request + // by populating a slot with index greater than the highest matching + // request with ULR_PROCESSING (assuming there is no ULR_PENDING or ULR_NEW + // where it's fine if the index is greater) + mutex_lock( &uplink->queueLock ); + for (i = 0; i < uplink->queueLen; ++i) { + dnbd3_queued_request_t * const req = &uplink->queue[i]; assert( req->status != ULR_PROCESSING ); if ( req->status != ULR_PENDING && req->status != ULR_NEW ) continue; assert( req->client != NULL ); @@ -849,8 +927,8 @@ static void uplink_handleReceive(dnbd3_connection_t *link) // from 0, you also need to change the "attach to existing request"-logic in uplink_request() outReply.magic = dnbd3_packet_magic; bool served = false; - for ( i = link->queueLen - 1; i >= 0; --i ) { - dnbd3_queued_request_t * const req = &link->queue[i]; + for ( i = uplink->queueLen - 1; i >= 0; --i ) { + dnbd3_queued_request_t * const req = &uplink->queue[i]; if ( req->status == ULR_PROCESSING ) { size_t bytesSent = 0; assert( req->from >= start && req->to <= end ); @@ -860,84 +938,90 @@ static void uplink_handleReceive(dnbd3_connection_t *link) outReply.size = (uint32_t)( req->to - req->from ); iov[0].iov_base = &outReply; iov[0].iov_len = sizeof outReply; - iov[1].iov_base = link->recvBuffer + (req->from - start); + iov[1].iov_base = uplink->recvBuffer + (req->from - start); iov[1].iov_len = outReply.size; fixup_reply( outReply ); req->status = ULR_FREE; req->client = NULL; served = true; mutex_lock( &client->sendMutex ); - mutex_unlock( &link->queueLock ); + mutex_unlock( &uplink->queueLock ); if ( client->sock != -1 ) { ssize_t sent = writev( client->sock, iov, 2 ); if ( sent > (ssize_t)sizeof outReply ) { bytesSent = (size_t)sent - sizeof outReply; } } - mutex_unlock( &client->sendMutex ); if ( bytesSent != 0 ) { client->bytesSent += bytesSent; } - mutex_lock( &link->queueLock ); + mutex_unlock( &client->sendMutex ); + mutex_lock( &uplink->queueLock ); + if ( i > uplink->queueLen ) { + i = uplink->queueLen; // Might have been set to 0 by cancelAllRequests + } } - if ( req->status == ULR_FREE && i == link->queueLen - 1 ) link->queueLen--; + if ( req->status == ULR_FREE && i == uplink->queueLen - 1 ) uplink->queueLen--; } - mutex_unlock( &link->queueLock ); + mutex_unlock( &uplink->queueLock ); #ifdef _DEBUG - if ( !served && start != link->replicationHandle ) { - logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end ); + if ( !served && start != uplink->replicationHandle ) { + logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)uplink, uplink->image->name, start, end ); } #endif - if ( start == link->replicationHandle ) { + if ( start == uplink->replicationHandle ) { // Was our background replication - link->replicationHandle = REP_NONE; + uplink->replicationHandle = REP_NONE; // Try to remove from fs cache if no client was interested in this data - if ( !served && link->cacheFd != -1 ) { - posix_fadvise( link->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED ); + if ( !served && uplink->cacheFd != -1 ) { + posix_fadvise( uplink->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED ); } } if ( served ) { // Was some client -- reset idle counter - link->idleTime = 0; + uplink->idleTime = 0; // Re-enable replication if disabled - if ( link->nextReplicationIndex == -1 ) { - link->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK; + if ( uplink->nextReplicationIndex == -1 ) { + uplink->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK; } } } - if ( link->replicationHandle == REP_NONE ) { - mutex_lock( &link->queueLock ); - const bool rep = ( link->queueLen == 0 ); - mutex_unlock( &link->queueLock ); - if ( rep ) uplink_sendReplicationRequest( link ); + if ( uplink->replicationHandle == REP_NONE ) { + mutex_lock( &uplink->queueLock ); + const bool rep = ( uplink->queueLen == 0 ); + mutex_unlock( &uplink->queueLock ); + if ( rep ) uplink_sendReplicationRequest( uplink ); } return; // Error handling from failed receive or message parsing error_cleanup: ; - uplink_connectionFailed( link, true ); + uplink_connectionFailed( uplink, true ); } -static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew) +/** + * Only call from uplink thread + */ +static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew) { - if ( link->fd == -1 ) + if ( uplink->current.fd == -1 ) return; - altservers_serverFailed( &link->currentServer ); - mutex_lock( &link->sendMutex ); - close( link->fd ); - link->fd = -1; - mutex_unlock( &link->sendMutex ); - link->replicationHandle = REP_NONE; - if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) { - link->nextReplicationIndex = 0; + altservers_serverFailed( uplink->current.index ); + mutex_lock( &uplink->sendMutex ); + close( uplink->current.fd ); + uplink->current.fd = -1; + mutex_unlock( &uplink->sendMutex ); + uplink->replicationHandle = REP_NONE; + if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) { + uplink->nextReplicationIndex = 0; } if ( !findNew ) return; - mutex_lock( &link->rttLock ); - bool bail = link->rttTestResult == RTT_INPROGRESS || link->betterFd != -1; - mutex_unlock( &link->rttLock ); + mutex_lock( &uplink->rttLock ); + bool bail = uplink->rttTestResult == RTT_INPROGRESS || uplink->better.fd != -1; + mutex_unlock( &uplink->rttLock ); if ( bail ) return; - altservers_findUplink( link ); + altservers_findUplinkAsync( uplink ); } /** @@ -945,16 +1029,11 @@ static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew) */ static int uplink_sendKeepalive(const int fd) { - static dnbd3_request_t request = { 0 }; - if ( request.magic == 0 ) { - request.magic = dnbd3_packet_magic; - request.cmd = CMD_KEEPALIVE; - fixup_request( request ); - } + static const dnbd3_request_t request = { .magic = dnbd3_packet_magic, .cmd = net_order_16( CMD_KEEPALIVE ) }; return send( fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request); } -static void uplink_addCrc32(dnbd3_connection_t *uplink) +static void uplink_addCrc32(dnbd3_uplink_t *uplink) { dnbd3_image_t *image = uplink->image; if ( image == NULL || image->virtualFilesize == 0 ) return; @@ -962,7 +1041,7 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink) uint32_t masterCrc; uint32_t *buffer = malloc( bytes ); mutex_lock( &uplink->sendMutex ); - bool sendOk = dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes ); + bool sendOk = dnbd3_get_crc32( uplink->current.fd, &masterCrc, buffer, &bytes ); mutex_unlock( &uplink->sendMutex ); if ( !sendOk || bytes == 0 ) { free( buffer ); @@ -997,14 +1076,14 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink) * it will be closed first. Otherwise, nothing will happen and true will be returned * immediately. */ -static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force) +static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force) { - if ( link->cacheFd != -1 ) { + if ( uplink->cacheFd != -1 ) { if ( !force ) return true; - close( link->cacheFd ); + close( uplink->cacheFd ); } - link->cacheFd = open( link->image->path, O_WRONLY | O_CREAT, 0644 ); - return link->cacheFd != -1; + uplink->cacheFd = open( uplink->image->path, O_WRONLY | O_CREAT, 0644 ); + return uplink->cacheFd != -1; } /** @@ -1012,16 +1091,16 @@ static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force) * Return true on success. * Locks on: imageListLock, image.lock */ -static bool uplink_saveCacheMap(dnbd3_connection_t *link) +static bool uplink_saveCacheMap(dnbd3_uplink_t *uplink) { - dnbd3_image_t *image = link->image; + dnbd3_image_t *image = uplink->image; assert( image != NULL ); - if ( link->cacheFd != -1 ) { - if ( fsync( link->cacheFd ) == -1 ) { + if ( uplink->cacheFd != -1 ) { + if ( fsync( uplink->cacheFd ) == -1 ) { // A failing fsync means we have no guarantee that any data // since the last fsync (or open if none) has been saved. Apart - // from keeping the cache_map from the last successful fsync + // from keeping the cache map from the last successful fsync // around and restoring it there isn't much we can do to recover // a consistent state. Bail out. logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d", image->path, errno ); @@ -1030,21 +1109,11 @@ static bool uplink_saveCacheMap(dnbd3_connection_t *link) } } - if ( image->cache_map == NULL ) return true; - logadd( LOG_DEBUG2, "Saving cache map of %s:%d", image->name, (int)image->rid ); - mutex_lock( &image->lock ); - // Lock and get a copy of the cache map, as it could be freed by another thread that is just about to - // figure out that this image's cache copy is complete - if ( image->cache_map == NULL || image->virtualFilesize < DNBD3_BLOCK_SIZE ) { - mutex_unlock( &image->lock ); + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache == NULL ) return true; - } + logadd( LOG_DEBUG2, "Saving cache map of %s:%d", image->name, (int)image->rid ); const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize); - uint8_t *map = malloc( size ); - memcpy( map, image->cache_map, size ); - // Unlock. Use path and cacheFd without locking. path should never change after initialization of the image, - // cacheFd is owned by the uplink thread and we don't want to hold a spinlock during I/O - mutex_unlock( &image->lock ); assert( image->path != NULL ); char mapfile[strlen( image->path ) + 4 + 1]; strcpy( mapfile, image->path ); @@ -1053,14 +1122,14 @@ static bool uplink_saveCacheMap(dnbd3_connection_t *link) int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 ); if ( fd == -1 ) { const int err = errno; - free( map ); + ref_put( &cache->reference ); logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile ); return false; } size_t done = 0; while ( done < size ) { - const ssize_t ret = write( fd, map, size - done ); + const ssize_t ret = write( fd, cache->map + done, size - done ); if ( ret == -1 ) { if ( errno == EINTR ) continue; logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile ); @@ -1072,16 +1141,27 @@ static bool uplink_saveCacheMap(dnbd3_connection_t *link) } done += (size_t)ret; } + ref_put( &cache->reference ); if ( fsync( fd ) == -1 ) { logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno ); } close( fd ); - free( map ); return true; } -static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link) +static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink) { - return ( link->idleTime > SERVER_UPLINK_IDLE_TIMEOUT && _backgroundReplication != BGR_FULL ); + return ( uplink->idleTime > SERVER_UPLINK_IDLE_TIMEOUT + && ( _backgroundReplication != BGR_FULL || _bgrMinClients > uplink->image->users ) ); } +bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len) +{ + int current; + mutex_lock( &uplink->rttLock ); + current = uplink->current.fd == -1 ? -1 : uplink->current.index; + mutex_unlock( &uplink->rttLock ); + if ( current == -1 ) + return false; + return altservers_toString( current, buffer, len ); +} diff --git a/src/server/uplink.h b/src/server/uplink.h index 2b41dfc..49ff0b4 100644 --- a/src/server/uplink.h +++ b/src/server/uplink.h @@ -10,10 +10,12 @@ uint64_t uplink_getTotalBytesReceived(); bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version); -void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client); +void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client); bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hopCount); -void uplink_shutdown(dnbd3_image_t *image); +bool uplink_shutdown(dnbd3_image_t *image); + +bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len); #endif /* UPLINK_H_ */ diff --git a/src/serverconfig.h b/src/serverconfig.h index 0cbb320..239f0a2 100644 --- a/src/serverconfig.h +++ b/src/serverconfig.h @@ -6,10 +6,12 @@ // +++++ Performance/memory related #define SERVER_MAX_CLIENTS 4000 #define SERVER_MAX_IMAGES 5000 -#define SERVER_MAX_ALTS 100 +#define SERVER_MAX_ALTS 50 // +++++ Uplink handling (proxy mode) -#define SERVER_UPLINK_FAIL_INCREASE 5 // On server failure, increase numFails by this value -#define SERVER_BAD_UPLINK_THRES 40 // Thresold for numFails at which we ignore a server for the time span below +#define SERVER_GLOBAL_DUP_TIME 6 // How many seconds to wait before changing global fail counter again +#define SERVER_BAD_UPLINK_MIN 10 // Thresold for fails at which we start ignoring the server occasionally +#define SERVER_BAD_UPLINK_MAX 20 // Hard block server if it failed this many times +#define SERVER_BAD_UPLINK_LOCAL_BLOCK 10 // If a server didn't supply the requested image this many times, block it for some time #define SERVER_BAD_UPLINK_IGNORE 180 // How many seconds is a server ignored #define SERVER_MAX_UPLINK_QUEUE 1500 // Maximum number of queued requests per uplink #define SERVER_UPLINK_QUEUELEN_THRES 900 // Threshold where we start dropping incoming clients @@ -33,7 +35,7 @@ #define SERVER_RTT_PROBES 5 // How many probes to average over #define SERVER_RTT_INTERVAL_INIT 5 // Initial interval between probes #define SERVER_RTT_INTERVAL_MAX 45 // Maximum interval between probes -#define SERVER_RTT_BACKOFF_COUNT 5 // If we can't reach any uplink server this many times, consider the uplink bad +#define SERVER_RTT_MAX_UNREACH 10 // If no server was reachable this many times, stop RTT measurements for a while #define SERVER_RTT_INTERVAL_FAILED 180 // Interval to use if no uplink server is reachable for above many times #define SERVER_REMOTE_IMAGE_CHECK_CACHETIME 120 // 2 minutes diff --git a/src/shared/fdsignal.c b/src/shared/fdsignal.c index 5e5cf7f..087b6f1 100644 --- a/src/shared/fdsignal.c +++ b/src/shared/fdsignal.c @@ -1,6 +1,6 @@ #include "fdsignal.h" -#if defined(linux) || defined(__linux) || defined(__linux__) +#if defined(__linux__) //#warning "Using eventfd based signalling" #include "fdsignal.inc/eventfd.c" #elif __SIZEOF_INT__ == 4 && __SIZEOF_POINTER__ == 8 diff --git a/src/shared/protocol.h b/src/shared/protocol.h index 92dbe11..2b21c21 100644 --- a/src/shared/protocol.h +++ b/src/shared/protocol.h @@ -20,7 +20,7 @@ #define COND_HOPCOUNT(vers,hopcount) ( (vers) >= 3 ? (hopcount) : 0 ) // 2017-11-02: Macro to set flags in select image message properly if we're a server, as BG_REP depends on global var -#define SI_SERVER_FLAGS ( (_pretendClient ? 0 : FLAGS8_SERVER) | (_backgroundReplication == BGR_FULL ? FLAGS8_BG_REP : 0) ) +#define SI_SERVER_FLAGS ( (uint8_t)( (_pretendClient ? 0 : FLAGS8_SERVER) | (_backgroundReplication == BGR_FULL ? FLAGS8_BG_REP : 0) ) ) #define REPLY_OK (0) #define REPLY_ERRNO (-1) diff --git a/src/shared/sockhelper.c b/src/shared/sockhelper.c index ab34aa1..ec80659 100644 --- a/src/shared/sockhelper.c +++ b/src/shared/sockhelper.c @@ -46,6 +46,7 @@ int sock_connect(const dnbd3_host_t * const addr, const int connect_ms, const in #endif else { logadd( LOG_DEBUG1, "Unsupported address type: %d\n", (int)addr->type ); + errno = EAFNOSUPPORT; return -1; } int client_sock = socket( proto, SOCK_STREAM, IPPROTO_TCP ); @@ -56,8 +57,10 @@ int sock_connect(const dnbd3_host_t * const addr, const int connect_ms, const in } else { sock_setTimeout( client_sock, connect_ms ); } + int e2; for ( int i = 0; i < 5; ++i ) { int ret = connect( client_sock, (struct sockaddr *)&ss, addrlen ); + e2 = errno; if ( ret != -1 || errno == EINPROGRESS || errno == EISCONN ) break; if ( errno == EINTR ) { // http://www.madore.org/~david/computers/connect-intr.html @@ -67,21 +70,26 @@ int sock_connect(const dnbd3_host_t * const addr, const int connect_ms, const in struct pollfd unix_really_sucks = { .fd = client_sock, .events = POLLOUT | POLLIN }; while ( i-- > 0 ) { int pr = poll( &unix_really_sucks, 1, connect_ms == 0 ? -1 : connect_ms ); + e2 = errno; if ( pr == 1 && ( unix_really_sucks.revents & POLLOUT ) ) break; if ( pr == -1 && errno == EINTR ) continue; close( client_sock ); + errno = e2; return -1; } sockaddr_storage junk; socklen_t more_junk = sizeof(junk); if ( getpeername( client_sock, (struct sockaddr*)&junk, &more_junk ) == -1 ) { + e2 = errno; close( client_sock ); + errno = e2; return -1; } break; #endif } // EINTR close( client_sock ); + errno = e2; return -1; } if ( connect_ms != -1 && connect_ms != rw_ms ) { diff --git a/src/shared/timing.h b/src/shared/timing.h index f3d8802..f23bfeb 100644 --- a/src/shared/timing.h +++ b/src/shared/timing.h @@ -22,7 +22,7 @@ extern struct timespec basetime; /** * Assign src to dst while adding secs seconds. */ -#define timing_set(dst,src,secs) do { (dst)->tv_sec = (src)->tv_sec + secs; (dst)->tv_nsec = (src)->tv_nsec; } while (0) +#define timing_set(dst,src,secs) do { (dst)->tv_sec = (src)->tv_sec + (secs); (dst)->tv_nsec = (src)->tv_nsec; } while (0) /** * Define variable now, initialize to timing_get. diff --git a/src/types.h b/src/types.h index ec37d9b..cb0ccfd 100644 --- a/src/types.h +++ b/src/types.h @@ -77,7 +77,7 @@ #define IOCTL_REM_SRV _IO(0xab, 5) #if defined(__BIG_ENDIAN__) || (defined(__BYTE_ORDER) && defined(__BIG_ENDIAN) && __BYTE_ORDER == __BIG_ENDIAN) || (defined(__BYTE_ORDER__) && defined(__ORDER_BIG_ENDIAN__) && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__) -static const uint16_t dnbd3_packet_magic = (0x73 << 8) | (0x72); +#define dnbd3_packet_magic ((uint16_t)( (0x73 << 8) | (0x72) )) // Flip bytes around on big endian when putting stuff on the net #define net_order_64(a) ((uint64_t)((((a) & 0xFFull) << 56) | (((a) & 0xFF00ull) << 40) | (((a) & 0xFF0000ull) << 24) | (((a) & 0xFF000000ull) << 8) | (((a) & 0xFF00000000ull) >> 8) | (((a) & 0xFF0000000000ull) >> 24) | (((a) & 0xFF000000000000ull) >> 40) | (((a) & 0xFF00000000000000ull) >> 56))) #define net_order_32(a) ((uint32_t)((((a) & (uint32_t)0xFF) << 24) | (((a) & (uint32_t)0xFF00) << 8) | (((a) & (uint32_t)0xFF0000) >> 8) | (((a) & (uint32_t)0xFF000000) >> 24))) @@ -96,7 +96,7 @@ static const uint16_t dnbd3_packet_magic = (0x73 << 8) | (0x72); #define BIG_ENDIAN #endif #elif defined(__LITTLE_ENDIAN__) || (defined(__BYTE_ORDER) && defined(__LITTLE_ENDIAN) && __BYTE_ORDER == __LITTLE_ENDIAN) || (defined(__BYTE_ORDER__) && defined(__ORDER_LITTLE_ENDIAN__) && __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__) || defined(__i386__) || defined(__i386) || defined(__x86_64) -static const uint16_t dnbd3_packet_magic = (0x73) | (0x72 << 8); +#define dnbd3_packet_magic ((uint16_t)( (0x73) | (0x72 << 8) )) // Make little endian our network byte order as probably 99.999% of machines this will be used on are LE #define net_order_64(a) (a) #define net_order_32(a) (a) @@ -117,17 +117,14 @@ static const dnbd3_af HOST_NONE = (dnbd3_af)0; static const dnbd3_af HOST_IP4 = (dnbd3_af)2; static const dnbd3_af HOST_IP6 = (dnbd3_af)10; -#pragma pack(1) -typedef struct dnbd3_host_t +typedef struct __attribute__((packed)) dnbd3_host_t { uint8_t addr[16]; // 16byte (network representation, so it can be directly passed to socket functions) uint16_t port; // 2byte (network representation, so it can be directly passed to socket functions) dnbd3_af type; // 1byte (ip version. HOST_IP4 or HOST_IP6. 0 means this struct is empty and should be ignored) } dnbd3_host_t; -#pragma pack(0) -#pragma pack(1) -typedef struct +typedef struct __attribute__((packed)) { uint16_t len; dnbd3_host_t host; @@ -137,7 +134,6 @@ typedef struct int read_ahead_kb; uint8_t use_server_provided_alts; } dnbd3_ioctl_t; -#pragma pack(0) // network #define CMD_GET_BLOCK 1 @@ -150,8 +146,7 @@ typedef struct #define CMD_GET_CRC32 8 #define DNBD3_REQUEST_SIZE 24 -#pragma pack(1) -typedef struct +typedef struct __attribute__((packed)) { uint16_t magic; // 2byte uint16_t cmd; // 2byte @@ -170,27 +165,22 @@ typedef struct }; uint64_t handle; // 8byte } dnbd3_request_t; -#pragma pack(0) _Static_assert( sizeof(dnbd3_request_t) == DNBD3_REQUEST_SIZE, "dnbd3_request_t is messed up" ); #define DNBD3_REPLY_SIZE 16 -#pragma pack(1) -typedef struct +typedef struct __attribute__((packed)) { uint16_t magic; // 2byte uint16_t cmd; // 2byte uint32_t size; // 4byte uint64_t handle; // 8byte } dnbd3_reply_t; -#pragma pack(0) _Static_assert( sizeof(dnbd3_reply_t) == DNBD3_REPLY_SIZE, "dnbd3_reply_t is messed up" ); -#pragma pack(1) -typedef struct +typedef struct __attribute__((packed)) { dnbd3_host_t host; uint8_t failures; // 1byte (number of times server has been consecutively unreachable) } dnbd3_server_entry_t; -#pragma pack(0) #endif /* TYPES_H_ */ |