diff options
-rw-r--r-- | src/server/altservers.c | 26 | ||||
-rw-r--r-- | src/server/globals.h | 5 | ||||
-rw-r--r-- | src/server/image.c | 289 | ||||
-rw-r--r-- | src/server/image.h | 6 | ||||
-rw-r--r-- | src/server/integrity.c | 88 | ||||
-rw-r--r-- | src/server/net.c | 6 | ||||
-rw-r--r-- | src/server/server.c | 2 | ||||
-rw-r--r-- | src/server/uplink.c | 217 | ||||
-rw-r--r-- | src/shared/fdsignal.inc/eventfd.c | 2 |
9 files changed, 351 insertions, 290 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c index 4a2b8d5..575b849 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -18,7 +18,6 @@ static dnbd3_signal_t* runSignal = NULL; static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS]; static int numAltServers = 0; static pthread_spinlock_t altServersLock; -static bool initDone = false; static pthread_t altThread; @@ -31,6 +30,12 @@ void altservers_init() // Init spinlock spin_init( &pendingLockWrite, PTHREAD_PROCESS_PRIVATE ); spin_init( &altServersLock, PTHREAD_PROCESS_PRIVATE ); + // Init signal + runSignal = signal_new(); + if ( runSignal == NULL ) { + logadd( LOG_ERROR, "Error creating signal object. Uplink feature unavailable." ); + exit( EXIT_FAILURE ); + } memset( altServers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) ); if ( 0 != thread_create( &altThread, NULL, &altservers_main, (void *)NULL ) ) { logadd( LOG_ERROR, "Could not start altservers connector thread" ); @@ -44,12 +49,11 @@ void altservers_init() pending[i] = NULL; } spin_unlock( &pendingLockWrite ); - initDone = true; } void altservers_shutdown() { - if ( !initDone ) return; + if ( runSignal == NULL ) return; signal_call( runSignal ); // Wake altservers thread up thread_join( altThread, NULL ); } @@ -407,18 +411,11 @@ static void *altservers_main(void *data UNUSED) dnbd3_host_t servers[ALTS + 1]; serialized_buffer_t serialized; struct timespec start, end; - ticks nextCacheMapSave, nextCloseUnusedFd; + ticks nextCloseUnusedFd; setThreadName( "altserver-check" ); blockNoncriticalSignals(); - timing_gets( &nextCacheMapSave, 90 ); timing_gets( &nextCloseUnusedFd, 900 ); - // Init signal - runSignal = signal_new(); - if ( runSignal == NULL ) { - logadd( LOG_WARNING, "error creating signal object. Uplink feature unavailable." ); - goto cleanup; - } // LOOP while ( !_shutdown ) { // Wait 5 seconds max. @@ -598,13 +595,8 @@ static void *altservers_main(void *data UNUSED) pthread_mutex_unlock( &pendingLockConsume ); } // Save cache maps of all images if applicable - // TODO: Has nothing to do with alt servers really, maybe move somewhere else? declare_now; - if ( timing_reached( &nextCacheMapSave, &now ) ) { - timing_gets( &nextCacheMapSave, SERVER_CACHE_MAP_SAVE_INTERVAL ); - image_saveAllCacheMaps(); - } - // TODO: More random crap + // TODO: Has nothing to do with alt servers really, maybe move somewhere else? if ( _closeUnusedFd && timing_reached( &nextCloseUnusedFd, &now ) ) { timing_gets( &nextCloseUnusedFd, 900 ); image_closeUnusedFd(); diff --git a/src/server/globals.h b/src/server/globals.h index 2e39cb8..2fd1af2 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -57,9 +57,10 @@ struct _dnbd3_connection dnbd3_host_t currentServer; // Current server we're connected to pthread_spinlock_t rttLock; // When accessing rttTestResult, betterFd or betterServer int rttTestResult; // RTT_* - dnbd3_host_t betterServer; // The better server + int cacheFd; // used to write to the image, in case it is relayed. ONLY USE FROM UPLINK THREAD! int betterVersion; // protocol version of better server int betterFd; // Active connection to better server, ready to use + dnbd3_host_t betterServer; // The better server uint8_t *recvBuffer; // Buffer for receiving payload uint32_t recvBufferLen; // Len of ^^ volatile bool shutdown; // signal this thread to stop, must only be set from uplink_shutdown() or cleanup in uplink_mainloop() @@ -70,6 +71,7 @@ struct _dnbd3_connection uint64_t bytesReceived; // Number of bytes received by the connection. uint64_t lastBytesReceived; // Number of bytes received last time we updated the global counter. int queueLen; // length of queue + int idleCount; // How many iterations of keepalive check connection was idle dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE]; }; @@ -112,7 +114,6 @@ struct _dnbd3_image uint32_t *crc32; // list of crc32 checksums for each 16MiB block in image uint32_t masterCrc32; // CRC-32 of the crc-32 list int readFd; // used to read the image. Used from multiple threads, so use atomic operations (pread et al) - int cacheFd; // used to write to the image, in case it is relayed. ONLY USE FROM UPLINK THREAD! int completenessEstimate; // Completeness estimate in percent int users; // clients currently using this image int id; // Unique ID of this image. Only unique in the context of this running instance of DNBD3-Server diff --git a/src/server/image.c b/src/server/image.c index 02a383f..673a269 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -20,12 +20,6 @@ #define PATHLEN (2000) #define NONWORKING_RECHECK_INTERVAL_SECONDS (60) -#ifdef HAVE_FDATASYNC -#define dnbd3_fdatasync fdatasync -#else -#define dnbd3_fdatasync fsync -#endif - // ########################################## static dnbd3_image_t *_images[SERVER_MAX_IMAGES]; @@ -49,7 +43,6 @@ static imagecache remoteCloneCache[CACHELEN]; static bool isForbiddenExtension(const char* name); static dnbd3_image_t* image_remove(dnbd3_image_t *image); static dnbd3_image_t* image_free(dnbd3_image_t *image); -static bool image_isHashBlockComplete(const uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize); static bool image_load_all_internal(char *base, char *path); static bool image_addToList(dnbd3_image_t *image); static bool image_load(char *base, char *path, int withUplink); @@ -70,43 +63,6 @@ void image_serverStartup() } /** - * Returns true if the given image is complete. - * DOES NOT LOCK - */ -bool image_isComplete(dnbd3_image_t *image) -{ - assert( image != NULL ); - if ( image->working && image->cache_map == NULL ) { - return true; - } - if ( image->virtualFilesize == 0 ) { - return false; - } - bool complete = true; - int j; - const int map_len_bytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); - for (j = 0; j < map_len_bytes - 1; ++j) { - if ( image->cache_map[j] != 0xFF ) { - complete = false; - break; - } - } - if ( complete ) // Every block except the last one is complete - { // Last one might need extra treatment if it's not a full byte - const int blocks_in_last_byte = (image->virtualFilesize >> 12) & 7; - uint8_t last_byte = 0; - if ( blocks_in_last_byte == 0 ) { - last_byte = 0xFF; - } else { - for (j = 0; j < blocks_in_last_byte; ++j) - last_byte |= (uint8_t)(1 << j); - } - complete = ((image->cache_map[map_len_bytes - 1] & last_byte) == last_byte); - } - return complete; -} - -/** * Update cache-map of given image for the given byte range * start (inclusive) - end (exclusive) * Locks on: images[].lock @@ -125,29 +81,36 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co start &= ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); end = (uint64_t)(end + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); } - bool dirty = false; + bool setNewBlocks = false; uint64_t pos = start; spin_lock( &image->lock ); if ( image->cache_map == NULL ) { // Image seems already complete - spin_unlock( &image->lock ); - logadd( LOG_DEBUG1, "image_updateCachemap with no cache_map: %s", image->path ); - return; + if ( set ) { + // This makes no sense + spin_unlock( &image->lock ); + logadd( LOG_DEBUG1, "image_updateCachemap(true) with no cache_map: %s", image->path ); + return; + } + // Recreate a cache map, set it to all 1 initially as we assume the image was complete + const int byteSize = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); + image->cache_map = malloc( byteSize ); + memset( image->cache_map, 0xff, byteSize ); } while ( pos < end ) { const size_t map_y = (int)( pos >> 15 ); const int map_x = (int)( (pos >> 12) & 7 ); // mod 8 const int bit_mask = 1 << map_x; if ( set ) { - if ( (image->cache_map[map_y] & bit_mask) == 0 ) dirty = true; + if ( (image->cache_map[map_y] & bit_mask) == 0 ) setNewBlocks = true; image->cache_map[map_y] |= (uint8_t)bit_mask; } else { image->cache_map[map_y] &= (uint8_t)~bit_mask; } pos += DNBD3_BLOCK_SIZE; } - if ( dirty && image->crc32 != NULL ) { - // If dirty is set, at least one of the blocks was not cached before, so queue all hash blocks + if ( setNewBlocks && image->crc32 != NULL ) { + // If setNewBlocks is set, at least one of the blocks was not cached before, so queue all hash blocks // for checking, even though this might lead to checking some hash block again, if it was // already complete and the block range spanned at least two hash blocks. // First set start and end to borders of hash blocks @@ -157,7 +120,7 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co while ( pos < end ) { if ( image->cache_map == NULL ) break; const int block = (int)( pos / HASH_BLOCK_SIZE ); - if ( image_isHashBlockComplete( image->cache_map, block, image->virtualFilesize ) ) { + if ( image_isHashBlockComplete( image->cache_map, block, image->realFilesize ) ) { spin_unlock( &image->lock ); integrity_check( image, block ); spin_lock( &image->lock ); @@ -169,112 +132,54 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co } /** - * Mark image as complete by freeing the cache_map and deleting the map file on disk + * Returns true if the given image is complete. + * Also frees cache_map and deletes it on disk + * if it hasn't been complete before * Locks on: image.lock */ -void image_markComplete(dnbd3_image_t *image) +bool image_isComplete(dnbd3_image_t *image) { - char mapfile[PATHLEN] = ""; assert( image != NULL ); spin_lock( &image->lock ); - if ( image->cache_map != NULL ) { - free( image->cache_map ); - image->cache_map = NULL; - snprintf( mapfile, PATHLEN, "%s.map", image->path ); - } - spin_unlock( &image->lock ); - if ( mapfile[0] != '\0' ) { - remove( mapfile ); - } -} - -/** - * Save cache map of every image - */ -void image_saveAllCacheMaps() -{ - spin_lock( &imageListLock ); - for (int i = 0; i < _num_images; ++i) { - if ( _images[i] == NULL ) continue; - dnbd3_image_t * const image = _images[i]; - spin_lock( &image->lock ); - image->users++; - spin_unlock( &imageListLock ); - spin_unlock( &image->lock ); - image_saveCacheMap( image ); - spin_lock( &imageListLock ); - spin_lock( &image->lock ); - image->users--; + if ( image->virtualFilesize == 0 ) { spin_unlock( &image->lock ); + return false; } - spin_unlock( &imageListLock ); -} - -/** - * Saves the cache map of the given image. - * Return true on success. - * Locks on: imageListLock, image.lock - */ -bool image_saveCacheMap(dnbd3_image_t *image) -{ - if ( image == NULL || image->cache_map == NULL ) return true; - image = image_lock( image ); // Again after locking: - if ( image == NULL ) return true; - spin_lock( &image->lock ); - // Lock and get a copy of the cache map, as it could be freed by another thread that is just about to - // figure out that this image's cache copy is complete - if ( image->cache_map == NULL || image->virtualFilesize < DNBD3_BLOCK_SIZE ) { + if ( image->cache_map == NULL ) { spin_unlock( &image->lock ); - image_release( image ); return true; } - const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize); - uint8_t *map = malloc( size ); - memcpy( map, image->cache_map, size ); - // Unlock. Use path and cacheFd without locking. path should never change after initialization of the image, - // cacheFd is written to and we don't want to hold a spinlock during I/O - spin_unlock( &image->lock ); - assert( image->path != NULL ); - char mapfile[strlen( image->path ) + 4 + 1]; - strcpy( mapfile, image->path ); - strcat( mapfile, ".map" ); - - int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 ); - if ( fd < 0 ) { - const int err = errno; - image_release( image ); - free( map ); - logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile ); - return false; - } - - size_t done = 0; - while ( done < size ) { - const ssize_t ret = write( fd, map, size - done ); - if ( ret == -1 ) { - if ( errno == EINTR ) continue; - logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile ); - break; - } - if ( ret <= 0 ) { - logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile ); + bool complete = true; + int j; + const int map_len_bytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); + for (j = 0; j < map_len_bytes - 1; ++j) { + if ( image->cache_map[j] != 0xFF ) { + complete = false; break; } - done += (size_t)ret; } - if ( image->cacheFd != -1 ) { - if ( dnbd3_fdatasync( image->cacheFd ) == -1 ) { - logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d", image->path, errno ); - logadd( LOG_ERROR, "Bailing out immediately" ); - exit( 1 ); + if ( complete ) { // Every block except the last one is complete + // Last one might need extra treatment if it's not a full byte + const int blocks_in_last_byte = (image->virtualFilesize >> 12) & 7; + uint8_t last_byte = 0; + if ( blocks_in_last_byte == 0 ) { + last_byte = 0xFF; + } else { + for (j = 0; j < blocks_in_last_byte; ++j) + last_byte |= (uint8_t)(1 << j); } + complete = ((image->cache_map[map_len_bytes - 1] & last_byte) == last_byte); } - if ( dnbd3_fdatasync( fd ) == -1 ) { - logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno ); + if ( !complete ) { + spin_unlock( &image->lock ); + return false; } - image_release( image ); // Release only after both hit the disk - close( fd ); - free( map ); + char mapfile[PATHLEN] = ""; + free( image->cache_map ); + image->cache_map = NULL; + snprintf( mapfile, PATHLEN, "%s.map", image->path ); + spin_unlock( &image->lock ); + unlink( mapfile ); return true; } @@ -435,7 +340,6 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) img->atime = now; img->masterCrc32 = candidate->masterCrc32; img->readFd = -1; - img->cacheFd = -1; img->rid = candidate->rid; img->users = 1; img->working = false; @@ -467,15 +371,7 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) // Check if image is incomplete, handle if ( candidate->cache_map != NULL ) { - // -- Incomplete - rw check - if ( candidate->cacheFd == -1 ) { // Make sure file is open for writing - image_reopenCacheFd( candidate, false ); - // It might have failed - still offer proxy mode, we just can't cache - if ( candidate->cacheFd == -1 ) { - logadd( LOG_WARNING, "Cannot re-open %s for writing - replication disabled", candidate->path ); - } - } - if ( candidate->uplink == NULL && candidate->cacheFd != -1 ) { + if ( candidate->uplink == NULL ) { uplink_init( candidate, -1, NULL, -1 ); } } @@ -484,49 +380,6 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) } /** - * Open the given image's main image file in - * rw mode, assigning it to the cacheFd struct member. - * - * @param force If cacheFd was previously assigned a file descriptor (not == -1), - * it will be closed first. Otherwise, nothing will happen and true will be returned - * immediately. - */ -bool image_reopenCacheFd(dnbd3_image_t *image, const bool force) -{ - if ( image->cacheFd != -1 && !force ) - return true; - const int nfd = open( image->path, O_RDWR ); - if ( nfd == -1 ) { - if ( force ) { - // Opening new one failed, force is enabled -- close old one - spin_lock( &image->lock ); - int ofd = image->cacheFd; - image->cacheFd = -1; - spin_unlock( &image->lock ); - if ( ofd != -1 ) { - close( ofd ); - } - } - return false; - } - // Open succeeded, now switch out while holding lock to make it look somewhat atomic - spin_lock( &image->lock ); - int closeFd = -1; - if ( force || image->cacheFd == -1 ) { - closeFd = image->cacheFd; - image->cacheFd = nfd; - } else { - closeFd = nfd; - } - spin_unlock( &image->lock ); - if ( closeFd != -1 ) { // Failed - close( closeFd ); - } - return true; // We either replaced the fd in force mode or the old one was -1, or - // force was false and cacheFd != -1. Either way we consider that success -} - -/** * Lock the image by increasing its users count * Returns the image on success, NULL if it is not found in the image list * Every call to image_lock() needs to be followed by a call to image_release() at some point. @@ -636,7 +489,12 @@ void image_killUplinks() if ( _images[i] == NULL ) continue; spin_lock( &_images[i]->lock ); if ( _images[i]->uplink != NULL ) { - _images[i]->uplink->shutdown = true; + spin_lock( &_images[i]->uplink->queueLock ); + if ( !_images[i]->uplink->shutdown ) { + thread_detach( _images[i]->uplink->thread ); + _images[i]->uplink->shutdown = true; + } + spin_unlock( &_images[i]->uplink->queueLock ); signal_call( _images[i]->uplink->signal ); } spin_unlock( &_images[i]->lock ); @@ -741,15 +599,17 @@ static dnbd3_image_t* image_free(dnbd3_image_t *image) logadd( LOG_INFO, "Freeing image %s:%d", image->name, (int)image->rid ); } // - image_saveCacheMap( image ); uplink_shutdown( image ); spin_lock( &image->lock ); free( image->cache_map ); free( image->crc32 ); free( image->path ); free( image->name ); + image->cache_map = NULL; + image->crc32 = NULL; + image->path = NULL; + image->name = NULL; spin_unlock( &image->lock ); - if ( image->cacheFd != -1 ) close( image->cacheFd ); if ( image->readFd != -1 ) close( image->readFd ); spin_destroy( &image->lock ); // @@ -758,7 +618,7 @@ static dnbd3_image_t* image_free(dnbd3_image_t *image) return NULL ; } -static bool image_isHashBlockComplete(const uint8_t * const cacheMap, const uint64_t block, const uint64_t realFilesize) +bool image_isHashBlockComplete(const uint8_t * const cacheMap, const uint64_t block, const uint64_t realFilesize) { if ( cacheMap == NULL ) return true; const uint64_t end = (block + 1) * HASH_BLOCK_SIZE; @@ -953,6 +813,7 @@ static bool image_load(char *base, char *path, int withUplink) // XXX: Maybe try sha-256 or 512 first if you're paranoid (to be implemented) // 2. Load CRC-32 list of image + bool doFullCheck = false; uint32_t masterCrc = 0; const int hashBlockCount = IMGSIZE_TO_HASHBLOCKS( virtualFilesize ); crc32list = image_loadCrcList( path, virtualFilesize, &masterCrc ); @@ -961,7 +822,7 @@ static bool image_load(char *base, char *path, int withUplink) if ( crc32list != NULL ) { if ( !image_checkRandomBlocks( 4, fdImage, realFilesize, crc32list, cache_map ) ) { logadd( LOG_ERROR, "quick crc32 check of %s failed. Data corruption?", path ); - goto load_error; + doFullCheck = true; } } @@ -1012,7 +873,6 @@ static bool image_load(char *base, char *path, int withUplink) image->rid = (uint16_t)revision; image->users = 0; image->readFd = -1; - image->cacheFd = -1; image->working = (image->cache_map == NULL ); timing_get( &image->nextCompletenessEstimate ); image->completenessEstimate = -1; @@ -1032,22 +892,13 @@ static bool image_load(char *base, char *path, int withUplink) crc32list = NULL; // Get rid of cache map if image is complete - if ( image->cache_map != NULL && image_isComplete( image ) ) { - image_markComplete( image ); - image->working = true; + if ( image->cache_map != NULL ) { + image_isComplete( image ); } - // Image is definitely incomplete, open image file for writing, so we can update the cache + // Image is definitely incomplete, initialize uplink worker if ( image->cache_map != NULL ) { image->working = false; - image->cacheFd = open( path, O_WRONLY ); - if ( image->cacheFd < 0 ) { - // Proxy mode without disk caching is pointless, bail out - image->cacheFd = -1; - logadd( LOG_ERROR, "Could not open incomplete image %s for writing!", path ); - image = image_free( image ); - goto load_error; - } if ( withUplink ) { uplink_init( image, -1, NULL, -1 ); } @@ -1060,11 +911,16 @@ static bool image_load(char *base, char *path, int withUplink) fdImage = -1; } else { logadd( LOG_ERROR, "Image list full: Could not add image %s", path ); - image->readFd = -1; + image->readFd = -1; // Keep fdImage instead, will be closed below image = image_free( image ); goto load_error; } logadd( LOG_DEBUG1, "Loaded image '%s:%d'\n", image->name, (int)image->rid ); + // CRC errors found... + if ( doFullCheck ) { + logadd( LOG_INFO, "Queueing full CRC32 check for '%s:%d'\n", image->name, (int)image->rid ); + integrity_check( image, -1 ); + } function_return = true; @@ -1407,7 +1263,7 @@ server_fail: ; while ( !image->working && ++i < 100 ) usleep( 2000 ); } - } else if ( uplinkSock >= 0 ) { + } else if ( uplinkSock != -1 ) { close( uplinkSock ); } return image; @@ -1719,8 +1575,9 @@ int image_getCompletenessEstimate(dnbd3_image_t * const image) } /** - * Check the CRC-32 of the given blocks. The array blocks is of variable length. + * Check the CRC-32 of the given blocks. The array "blocks" is of variable length. * !! pass -1 as the last block so the function knows when to stop !! + * Does NOT check whether block index is within image. * Returns true or false */ bool image_checkBlocksCrc32(const int fd, uint32_t *crc32list, const int *blocks, const uint64_t realFilesize) diff --git a/src/server/image.h b/src/server/image.h index dd8b818..4668eff 100644 --- a/src/server/image.h +++ b/src/server/image.h @@ -9,14 +9,12 @@ void image_serverStartup(); bool image_isComplete(dnbd3_image_t *image); +bool image_isHashBlockComplete(const uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize); + void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, const bool set); void image_markComplete(dnbd3_image_t *image); -void image_saveAllCacheMaps(); - -bool image_saveCacheMap(dnbd3_image_t *image); - bool image_ensureOpen(dnbd3_image_t *image); dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking); diff --git a/src/server/integrity.c b/src/server/integrity.c index e8124f4..c9ac798 100644 --- a/src/server/integrity.c +++ b/src/server/integrity.c @@ -3,17 +3,19 @@ #include "helper.h" #include "locks.h" #include "image.h" +#include "uplink.h" #include <assert.h> #include <sys/syscall.h> #include <sys/resource.h> -#define CHECK_QUEUE_SIZE 100 +#define CHECK_QUEUE_SIZE 500 typedef struct { - dnbd3_image_t *image; - int block; + dnbd3_image_t *image; // Image to check + int block; // Block to check + bool full; // Check all blocks in image; .block will be increased } queue_entry; static pthread_t thread; @@ -72,7 +74,8 @@ void integrity_check(dnbd3_image_t *image, int block) for (i = 0; i < queueLen; ++i) { if ( freeSlot == -1 && checkQueue[i].image == NULL ) { freeSlot = i; - } else if ( checkQueue[i].image == image && checkQueue[i].block == block ) { + } else if ( checkQueue[i].image == image + && ( checkQueue[i].block == block || checkQueue[i].full ) ) { pthread_mutex_unlock( &integrityQueueLock ); return; } @@ -86,7 +89,13 @@ void integrity_check(dnbd3_image_t *image, int block) freeSlot = queueLen++; } checkQueue[freeSlot].image = image; - checkQueue[freeSlot].block = block; + if ( block == -1 ) { + checkQueue[freeSlot].block = 0; + checkQueue[freeSlot].full = true; + } else { + checkQueue[freeSlot].block = block; + checkQueue[freeSlot].full = false; + } pthread_cond_signal( &queueSignal ); pthread_mutex_unlock( &integrityQueueLock ); } @@ -113,22 +122,30 @@ static void* integrity_main(void * data UNUSED) for (i = queueLen - 1; i >= 0; --i) { if ( _shutdown ) break; dnbd3_image_t * const image = image_lock( checkQueue[i].image ); - checkQueue[i].image = NULL; - if ( i + 1 == queueLen ) queueLen--; + if ( !checkQueue[i].full || image == NULL ) { + checkQueue[i].image = NULL; + if ( i + 1 == queueLen ) queueLen--; + } if ( image == NULL ) continue; // We have the image. Call image_release() some time // Make sure the image is open for reading (closeUnusedFd) if ( !image_ensureOpen( image ) ) { + // TODO: Open new fd for file with O_DIRECT in case we do a full scan, + // so we don't thrash the whole fs cache logadd( LOG_MINOR, "Cannot hash check block %d of %s -- no readFd", checkQueue[i].block, image->path ); image_release( image ); continue; } + bool full = checkQueue[i].full; + bool foundCorrupted = false; spin_lock( &image->lock ); if ( image->crc32 != NULL && image->realFilesize != 0 ) { - int const blocks[2] = { checkQueue[i].block, -1 }; + int blocks[2] = { checkQueue[i].block, -1 }; pthread_mutex_unlock( &integrityQueueLock ); + // Make copy of crc32 list as it might go away const uint64_t fileSize = image->realFilesize; - const size_t required = IMGSIZE_TO_HASHBLOCKS(fileSize) * sizeof(uint32_t); + const int numHashBlocks = IMGSIZE_TO_HASHBLOCKS(fileSize); + const size_t required = numHashBlocks * sizeof(uint32_t); if ( buffer == NULL || required > bufferSize ) { bufferSize = required; if ( buffer != NULL ) free( buffer ); @@ -136,14 +153,60 @@ static void* integrity_main(void * data UNUSED) } memcpy( buffer, image->crc32, required ); spin_unlock( &image->lock ); - if ( !image_checkBlocksCrc32( image->readFd, (uint32_t*)buffer, blocks, fileSize ) ) { - logadd( LOG_WARNING, "Hash check for block %d of %s failed!", blocks[0], image->name ); - image_updateCachemap( image, blocks[0] * HASH_BLOCK_SIZE, (blocks[0] + 1) * HASH_BLOCK_SIZE, false ); + int checkCount = full ? 5 : 1; + while ( blocks[0] < numHashBlocks ) { + bool complete = true; + if ( full ) { + // When checking full image, skip incomplete blocks, otherwise assume block is complete + spin_lock( &image->lock ); + complete = image_isHashBlockComplete( image->cache_map, blocks[0], fileSize ); + spin_unlock( &image->lock ); + } + if ( complete && !image_checkBlocksCrc32( image->readFd, (uint32_t*)buffer, blocks, fileSize ) ) { + logadd( LOG_WARNING, "Hash check for block %d of %s failed!", blocks[0], image->name ); + image_updateCachemap( image, blocks[0] * HASH_BLOCK_SIZE, (blocks[0] + 1) * HASH_BLOCK_SIZE, false ); + // If this is not a full check, queue one + if ( !full ) { + logadd( LOG_INFO, "Queueing full check for %s", image->name ); + integrity_check( image, -1 ); + } + foundCorrupted = true; + } + if ( complete && --checkCount == 0 ) break; + blocks[0]++; } pthread_mutex_lock( &integrityQueueLock ); + if ( full ) { + assert( checkQueue[i].image == image ); + assert( checkQueue[i].full ); + if ( checkCount == 0 ) { + // Not done yet, keep going + checkQueue[i].block = blocks[0] + 1; + } else { + // Didn't check as many blocks as requested, so we must be done + checkQueue[i].image = NULL; + if ( i + 1 == queueLen ) queueLen--; + spin_lock( &image->lock ); + if ( image->uplink != NULL ) { // TODO: image_determineWorkingState() helper? + image->working = image->uplink->fd != -1 && image->readFd != -1; + } + spin_unlock( &image->lock ); + } + } } else { spin_unlock( &image->lock ); } + if ( foundCorrupted ) { + // Something was fishy, make sure uplink exists + spin_lock( &image->lock ); + image->working = false; + bool restart = image->uplink == NULL || image->uplink->shutdown; + spin_unlock( &image->lock ); + if ( restart ) { + uplink_shutdown( image ); + uplink_init( image, -1, NULL, -1 ); + } + } // Release :-) image_release( image ); } @@ -153,3 +216,4 @@ static void* integrity_main(void * data UNUSED) bRunning = false; return NULL ; } + diff --git a/src/server/net.c b/src/server/net.c index b160778..6fa536f 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -289,15 +289,17 @@ void* net_handleNewConnection(void *clientPtr) logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n", client->hostName, image_name, (int)rid ); } else { + bool penalty; // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable bOk = true; if ( image->cache_map != NULL ) { spin_lock( &image->lock ); - if ( image->uplink == NULL || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { + if ( image->uplink == NULL || image->uplink->cacheFd == -1 || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { bOk = ( rand() % 4 ) == 1; } + penalty = bOk && image->uplink != NULL && image->uplink->cacheFd == -1; spin_unlock( &image->lock ); - if ( image->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this + if ( penalty ) { // Wait 100ms if local caching is not working so this usleep( 100000 ); // server gets a penalty and is less likely to be selected } } diff --git a/src/server/server.c b/src/server/server.c index b209451..0944c51 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -334,7 +334,7 @@ int main(int argc, char *argv[]) } // Give other threads some time to start up before accepting connections - sleep( 1 ); + usleep( 100000 ); // setup network listeners = setupNetwork( bindAddress ); diff --git a/src/server/uplink.c b/src/server/uplink.c index c0babaa..538f388 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -12,7 +12,13 @@ #include <inttypes.h> #include <fcntl.h> #include <poll.h> +#include <unistd.h> +#ifdef HAVE_FDATASYNC +#define dnbd3_fdatasync fdatasync +#else +#define dnbd3_fdatasync fsync +#endif static uint64_t totalBytesReceived = 0; static pthread_spinlock_t statisticsReceivedLock; @@ -24,6 +30,8 @@ static int uplink_sendKeepalive(const int fd); static void uplink_addCrc32(dnbd3_connection_t *uplink); static void uplink_sendReplicationRequest(dnbd3_connection_t *link); static void uplink_updateGlobalReceivedCounter(dnbd3_connection_t *link); +static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force); +static bool uplink_saveCacheMap(dnbd3_connection_t *link); // ############ uplink connection handling @@ -47,7 +55,7 @@ uint64_t uplink_getTotalBytesReceived() */ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version) { - if ( !_isProxy ) return false; + if ( !_isProxy || _shutdown ) return false; dnbd3_connection_t *link = NULL; assert( image != NULL ); spin_lock( &image->lock ); @@ -66,8 +74,10 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version link->image = image; link->bytesReceived = 0; link->lastBytesReceived = 0; + link->idleCount = 0; link->queueLen = 0; link->fd = -1; + link->cacheFd = -1; link->signal = NULL; link->replicationHandle = 0; spin_lock( &link->rttLock ); @@ -123,10 +133,15 @@ void uplink_shutdown(dnbd3_image_t *image) join = true; } spin_unlock( &uplink->queueLock ); + bool wait = image->uplink != NULL; spin_unlock( &image->lock ); if ( join ) thread_join( thread, NULL ); - while ( image->uplink != NULL ) - usleep( 10000 ); + while ( wait ) { + usleep( 5000 ); + spin_lock( &image->lock ); + wait = image->uplink != NULL && image->uplink->shutdown; + spin_unlock( &image->lock ); + } } /** @@ -273,6 +288,7 @@ static void* uplink_mainloop(void *data) int numSocks, i, waitTime; int altCheckInterval = SERVER_RTT_DELAY_INIT; int discoverFailCount = 0; + int unsavedCount = 0; ticks nextAltCheck, nextKeepalive; char buffer[200]; memset( events, 0, sizeof(events) ); @@ -282,6 +298,11 @@ static void* uplink_mainloop(void *data) assert( link != NULL ); setThreadName( "idle-uplink" ); blockNoncriticalSignals(); + // Make sure file is open for writing + if ( !uplink_reopenCacheFd( link, false ) ) { + // It might have failed - still offer proxy mode, we just can't cache + logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", link->image->path, errno ); + } // link->signal = signal_new(); if ( link->signal == NULL ) { @@ -335,7 +356,7 @@ static void* uplink_mainloop(void *data) declare_now; waitTime = (int)timing_diffMs( &now, &nextAltCheck ); } while(0); - if ( waitTime < 1500 ) waitTime = 1500; + if ( waitTime < 100 ) waitTime = 100; if ( waitTime > 5000 ) waitTime = 5000; numSocks = poll( events, EV_COUNT, waitTime ); if ( _shutdown || link->shutdown ) goto cleanup; @@ -371,17 +392,29 @@ static void* uplink_mainloop(void *data) if ( link->fd == -1 ) timing_get( &nextAltCheck ); if ( _shutdown || link->shutdown ) goto cleanup; } - // Send keep alive if nothing is happening declare_now; - if ( link->fd != -1 && link->replicationHandle == 0 && timing_reached( &nextKeepalive, &now ) ) { - timing_set( &nextKeepalive, &now, 20 ); - if ( uplink_sendKeepalive( link->fd ) ) { - // Re-trigger periodically, in case it requires a minimum user count - uplink_sendReplicationRequest( link ); - } else { - const int fd = link->fd; - link->fd = -1; - close( fd ); + if ( timing_reached( &nextKeepalive, &now ) ) { + timing_set( &nextKeepalive, &now, 10 ); + link->idleCount++; + unsavedCount++; + if ( unsavedCount > 24 || ( unsavedCount > 6 && link->idleCount >= 2 && link->idleCount <= 7 ) ) { + // fsync/save every 4 minutes, or every 60 seconds if link is idle + unsavedCount = 0; + uplink_saveCacheMap( link ); + } + if ( link->idleCount % 2 == 0 ) { + // Save cache map only if we don't seem busy handling actual client requests + if ( link->fd != -1 && link->replicationHandle == 0 ) { + // Send keep alive if nothing is happening + if ( uplink_sendKeepalive( link->fd ) ) { + // Re-trigger periodically, in case it requires a minimum user count + uplink_sendReplicationRequest( link ); + } else { + const int fd = link->fd; + link->fd = -1; + close( fd ); + } + } } } // See if we should trigger an RTT measurement @@ -394,7 +427,6 @@ static void* uplink_mainloop(void *data) if ( image_isComplete( link->image ) ) { // Quit work if image is complete logadd( LOG_INFO, "Replication of %s complete.", link->image->name ); - image_markComplete( link->image ); goto cleanup; } else { // Not complete - do measurement @@ -441,9 +473,10 @@ static void* uplink_mainloop(void *data) } cleanup: ; altservers_removeUplink( link ); + uplink_saveCacheMap( link ); spin_lock( &link->image->lock ); - spin_lock( &link->queueLock ); link->image->uplink = NULL; + spin_lock( &link->queueLock ); const int fd = link->fd; const dnbd3_signal_t* signal = link->signal; link->fd = -1; @@ -452,6 +485,9 @@ static void* uplink_mainloop(void *data) link->shutdown = true; thread_detach( link->thread ); } + // Do not access link->image after unlocking, since we set + // image->uplink to NULL. Acquire with image_lock first, + // like done below when checking whether to re-init uplink spin_unlock( &link->image->lock ); spin_unlock( &link->queueLock ); if ( fd != -1 ) close( fd ); @@ -459,13 +495,26 @@ static void* uplink_mainloop(void *data) // Wait for the RTT check to finish/fail if it's in progress while ( link->rttTestResult == RTT_INPROGRESS ) usleep( 10000 ); - if ( link->betterFd != -1 ) close( link->betterFd ); + if ( link->betterFd != -1 ) { + close( link->betterFd ); + } spin_destroy( &link->queueLock ); spin_destroy( &link->rttLock ); free( link->recvBuffer ); link->recvBuffer = NULL; uplink_updateGlobalReceivedCounter( link ); - free( link ); + if ( link->cacheFd != -1 ) { + close( link->cacheFd ); + } + dnbd3_image_t *image = image_lock( link->image ); + free( link ); // !!! + if ( image != NULL ) { + if ( !_shutdown && image->cache_map != NULL ) { + // Ingegrity checker must have found something in the meantime + uplink_init( image, -1, NULL, 0 ); + } + image_release( image ); + } return NULL ; } @@ -556,36 +605,36 @@ static void uplink_handleReceive(dnbd3_connection_t *link) int ret, i; for (;;) { ret = dnbd3_read_reply( link->fd, &inReply, false ); - if ( ret == REPLY_INTR && !_shutdown && !link->shutdown ) continue; + if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !link->shutdown ) ) continue; if ( ret == REPLY_AGAIN ) break; - if ( ret == REPLY_CLOSED ) { + if ( unlikely( ret == REPLY_CLOSED ) ) { logadd( LOG_INFO, "Uplink: Remote host hung up (%s)", link->image->path ); goto error_cleanup; } - if ( ret == REPLY_WRONGMAGIC ) { + if ( unlikely( ret == REPLY_WRONGMAGIC ) ) { logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path ); goto error_cleanup; } - if ( ret != REPLY_OK ) { + if ( unlikely( ret != REPLY_OK ) ) { logadd( LOG_INFO, "Uplink: Connection error %d (%s)", ret, link->image->path ); goto error_cleanup; } - if ( inReply.size > (uint32_t)_maxPayload ) { + if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) { logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload for %s", link->image->path ); goto error_cleanup; } - if ( link->recvBufferLen < inReply.size ) { + if ( unlikely( link->recvBufferLen < inReply.size ) ) { link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536); link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen ); } - if ( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) { + if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) { logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path ); goto error_cleanup; } // Payload read completely // Bail out if we're not interested - if ( inReply.cmd != CMD_GET_BLOCK ) continue; + if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) ) continue; // Is a legit block reply struct iovec iov[2]; const uint64_t start = inReply.handle; @@ -594,16 +643,16 @@ static void uplink_handleReceive(dnbd3_connection_t *link) link->bytesReceived += inReply.size; spin_unlock( &link->image->lock ); // 1) Write to cache file - if ( unlikely( link->image->cacheFd == -1 ) ) { - image_reopenCacheFd( link->image, false ); + if ( unlikely( link->cacheFd == -1 ) ) { + uplink_reopenCacheFd( link, false ); } - if ( likely( link->image->cacheFd != -1 ) ) { + if ( likely( link->cacheFd != -1 ) ) { int err = 0; bool tryAgain = true; // Allow one retry in case we run out of space or the write fd became invalid uint32_t done = 0; ret = 0; while ( done < inReply.size ) { - ret = (int)pwrite( link->image->cacheFd, link->recvBuffer + done, inReply.size - done, start + done ); + ret = (int)pwrite( link->cacheFd, link->recvBuffer + done, inReply.size - done, start + done ); if ( unlikely( ret == -1 ) ) { err = errno; if ( err == EINTR ) continue; @@ -614,7 +663,7 @@ static void uplink_handleReceive(dnbd3_connection_t *link) continue; // Success, retry write } if ( err == EBADF || err == EINVAL || err == EIO ) { - if ( !tryAgain || !image_reopenCacheFd( link->image, true ) ) + if ( !tryAgain || !uplink_reopenCacheFd( link, true ) ) break; tryAgain = false; continue; // Write handle to image successfully re-opened, try again @@ -628,9 +677,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link) } done += (uint32_t)ret; } - if ( done > 0 ) image_updateCachemap( link->image, start, start + done, true ); - if ( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) { - // Only disable caching if something seems severely wrong, not on ENOSPC since that might get better + if ( likely( done > 0 ) ) { + image_updateCachemap( link->image, start, start + done, true ); + } + if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) { logadd( LOG_WARNING, "Error writing received data for %s:%d; disabling caching.", link->image->name, (int)link->image->rid ); } @@ -691,7 +741,13 @@ static void uplink_handleReceive(dnbd3_connection_t *link) if ( !served && start != link->replicationHandle ) logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end ); #endif - if ( start == link->replicationHandle ) link->replicationHandle = 0; + if ( start == link->replicationHandle ) { + // Was our background replication + link->replicationHandle = 0; + } else { + // Was some client -- reset idle counter + link->idleCount = 0; + } } spin_lock( &link->queueLock ); const bool rep = ( link->queueLen == 0 ); @@ -766,3 +822,94 @@ static void uplink_updateGlobalReceivedCounter(dnbd3_connection_t *link) link->lastBytesReceived = link->bytesReceived; } +/** + * Open the given image's main image file in + * rw mode, assigning it to the cacheFd struct member. + * + * @param force If cacheFd was previously assigned a file descriptor (not == -1), + * it will be closed first. Otherwise, nothing will happen and true will be returned + * immediately. + */ +static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force) +{ + if ( link->cacheFd != -1 ) { + if ( !force ) return true; + close( link->cacheFd ); + } + link->cacheFd = open( link->image->path, O_WRONLY | O_CREAT ); + return link->cacheFd != -1; +} + +/** + * Saves the cache map of the given image. + * Return true on success. + * Locks on: imageListLock, image.lock + */ +static bool uplink_saveCacheMap(dnbd3_connection_t *link) +{ + dnbd3_image_t *image = link->image; + assert( image != NULL ); + + if ( link->cacheFd != -1 ) { + if ( dnbd3_fdatasync( link->cacheFd ) == -1 ) { + // A failing fsync means we have no guarantee that any data + // since the last fsync (or open if none) has been saved. Apart + // from keeping the cache_map from the last successful fsync + // around and restoring it there isn't much we can do to recover + // a consistent state. Bail out. + logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d", image->path, errno ); + logadd( LOG_ERROR, "Bailing out immediately" ); + exit( 1 ); + } + } + + if ( image->cache_map == NULL ) return true; + logadd( LOG_DEBUG1, "Saving cache map of %s:%d", image->name, (int)image->rid ); + spin_lock( &image->lock ); + // Lock and get a copy of the cache map, as it could be freed by another thread that is just about to + // figure out that this image's cache copy is complete + if ( image->cache_map == NULL || image->virtualFilesize < DNBD3_BLOCK_SIZE ) { + spin_unlock( &image->lock ); + return true; + } + const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize); + uint8_t *map = malloc( size ); + memcpy( map, image->cache_map, size ); + // Unlock. Use path and cacheFd without locking. path should never change after initialization of the image, + // cacheFd is owned by the uplink thread and we don't want to hold a spinlock during I/O + spin_unlock( &image->lock ); + assert( image->path != NULL ); + char mapfile[strlen( image->path ) + 4 + 1]; + strcpy( mapfile, image->path ); + strcat( mapfile, ".map" ); + + int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 ); + if ( fd == -1 ) { + const int err = errno; + free( map ); + logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile ); + return false; + } + + size_t done = 0; + while ( done < size ) { + const ssize_t ret = write( fd, map, size - done ); + if ( ret == -1 ) { + if ( errno == EINTR ) continue; + logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile ); + break; + } + if ( ret <= 0 ) { + logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile ); + break; + } + done += (size_t)ret; + } + if ( dnbd3_fdatasync( fd ) == -1 ) { + logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno ); + } + close( fd ); + free( map ); + return true; +} + diff --git a/src/shared/fdsignal.inc/eventfd.c b/src/shared/fdsignal.inc/eventfd.c index b4d4d6e..358d41c 100644 --- a/src/shared/fdsignal.inc/eventfd.c +++ b/src/shared/fdsignal.inc/eventfd.c @@ -27,7 +27,7 @@ dnbd3_signal_t* signal_newBlocking() int signal_call(const dnbd3_signal_t* const signal) { if ( signal == NULL ) return SIGNAL_ERROR; - static uint64_t one = 1; + static const uint64_t one = 1; const int signalFd = ( (int)(intptr_t)signal ) - 1; return write( signalFd, &one, sizeof one ) == sizeof one ? SIGNAL_OK : SIGNAL_ERROR; } |