From 5765ce49f5e1e26505fd6b162db73a732603d1a8 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Thu, 5 Sep 2019 16:52:31 +0200 Subject: [SERVER] integrity checker: Improve flushing logic --- src/server/integrity.c | 199 +++++++++++++++++++++++++++---------------------- src/server/uplink.c | 2 +- 2 files changed, 111 insertions(+), 90 deletions(-) diff --git a/src/server/integrity.c b/src/server/integrity.c index a9fbae6..fddb755 100644 --- a/src/server/integrity.c +++ b/src/server/integrity.c @@ -35,6 +35,7 @@ static int queueLen = -1; static atomic_bool bRunning = false; static void* integrity_main(void *data); +static void flushFileRange(dnbd3_image_t *image, uint64_t start, uint64_t end); /** * Initialize the integrity check thread @@ -88,14 +89,17 @@ void integrity_check(dnbd3_image_t *image, int block) for (i = 0; i < queueLen; ++i) { if ( freeSlot == -1 && checkQueue[i].image == NULL ) { freeSlot = i; - } else if ( checkQueue[i].image == image - && checkQueue[i].block <= block && checkQueue[i].block + checkQueue[i].count >= block ) { - // Already queued check dominates this one, or at least lies directly before this block - if ( checkQueue[i].block + checkQueue[i].count == block ) { - // It's directly before this one; expand range + } else if ( checkQueue[i].image == image && checkQueue[i].block <= block ) { + if ( checkQueue[i].count == CHECK_ALL ) { + logadd( LOG_DEBUG2, "Dominated by full image scan request (%d/%d) (at %d)", i, queueLen, checkQueue[i].block ); + } else if ( checkQueue[i].block + checkQueue[i].count == block ) { checkQueue[i].count += 1; + logadd( LOG_DEBUG2, "Attaching to existing check request (%d/%d) (at %d, %d to go)", i, queueLen, checkQueue[i].block, checkQueue[i].count ); + } else if ( checkQueue[i].block + checkQueue[i].count > block ) { + logadd( LOG_DEBUG2, "Dominated by existing check request (%d/%d) (at %d, %d to go)", i, queueLen, checkQueue[i].block, checkQueue[i].count ); + } else { + continue; } - logadd( LOG_DEBUG2, "Attaching to existing check request (%d/%d) (%d +%d)", i, queueLen, checkQueue[i].block, checkQueue[i].count ); mutex_unlock( &integrityQueueLock ); return; } @@ -123,8 +127,6 @@ void integrity_check(dnbd3_image_t *image, int block) static void* integrity_main(void * data UNUSED) { int i; - uint8_t *buffer = NULL; - size_t bufferSize = 0; setThreadName( "image-check" ); blockNoncriticalSignals(); #if defined(linux) || defined(__linux) @@ -150,88 +152,70 @@ static void* integrity_main(void * data UNUSED) // We have the image. Call image_release() some time const int qCount = checkQueue[i].count; bool foundCorrupted = false; - mutex_lock( &image->lock ); if ( image->crc32 != NULL && image->realFilesize != 0 ) { int blocks[2] = { checkQueue[i].block, -1 }; mutex_unlock( &integrityQueueLock ); - // Make copy of crc32 list as it might go away const uint64_t fileSize = image->realFilesize; const int numHashBlocks = IMGSIZE_TO_HASHBLOCKS(fileSize); - const size_t required = numHashBlocks * sizeof(uint32_t); - if ( buffer == NULL || required > bufferSize ) { - bufferSize = required; - if ( buffer != NULL ) free( buffer ); - buffer = malloc( bufferSize ); - } - memcpy( buffer, image->crc32, required ); - mutex_unlock( &image->lock ); - // Open for direct I/O if possible; this prevents polluting the fs cache - int fd = open( image->path, O_RDONLY | O_DIRECT ); - bool direct = fd != -1; - if ( unlikely( !direct ) ) { - // Try unbuffered; flush to disk for that - logadd( LOG_DEBUG1, "O_DIRECT failed for %s", image->path ); - image_ensureOpen( image ); - fd = image->readFd; - } int checkCount = MIN( qCount, 5 ); - if ( fd != -1 ) { - while ( blocks[0] < numHashBlocks && !_shutdown ) { - const uint64_t start = blocks[0] * HASH_BLOCK_SIZE; - const uint64_t end = MIN( (uint64_t)(blocks[0] + 1) * HASH_BLOCK_SIZE, image->virtualFilesize ); - bool complete = true; - if ( qCount == CHECK_ALL ) { - dnbd3_cache_map_t *cache = ref_get_cachemap( image ); - if ( cache != NULL ) { - // When checking full image, skip incomplete blocks, otherwise assume block is complete - complete = image_isHashBlockComplete( cache->map, blocks[0], fileSize ); - ref_put( &cache->reference ); - } - } -#if defined(linux) || defined(__linux) - while ( sync_file_range( fd, start, end - start, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER ) == -1 ) -#else - while ( fsync( fd ) == -1 ) -#endif - { - if ( _shutdown ) - break; - if ( errno == EINTR ) - continue; - logadd( LOG_ERROR, "Cannot flush %s for integrity check (errno=%d)", image->path, errno ); - exit( 1 ); + int readFd = -1, directFd = -1; + while ( blocks[0] < numHashBlocks && !_shutdown ) { + const uint64_t start = blocks[0] * HASH_BLOCK_SIZE; + const uint64_t end = MIN( (uint64_t)(blocks[0] + 1) * HASH_BLOCK_SIZE, image->virtualFilesize ); + bool complete = true; + if ( qCount == CHECK_ALL ) { + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache != NULL ) { + // When checking full image, skip incomplete blocks, otherwise assume block is complete + complete = image_isHashBlockComplete( cache->map, blocks[0], fileSize ); + ref_put( &cache->reference ); } - if ( _shutdown ) - break; + } + // Flush to disk if there's an uplink, as that means the block might have been written recently + if ( image->uplinkref != NULL ) { + flushFileRange( image, start, end ); + } + if ( _shutdown ) + break; + // Open for direct I/O if possible; this prevents polluting the fs cache + if ( directFd == -1 && ( end % DNBD3_BLOCK_SIZE ) == 0 ) { // Use direct I/O only if read length is multiple of 4096 to be on the safe side - int tfd; - if ( direct && ( end % DNBD3_BLOCK_SIZE ) == 0 ) { - // Suitable for direct io - tfd = fd; - } else if ( !image_ensureOpen( image ) ) { - logadd( LOG_WARNING, "Cannot open %s for reading", image->path ); - break; + directFd = open( image->path, O_RDONLY | O_DIRECT ); + if ( directFd == -1 ) { + logadd( LOG_DEBUG2, "O_DIRECT failed for %s (errno=%d)", image->path, errno ); + directFd = -2; } else { - tfd = image->readFd; - // Evict from cache so we have to re-read, making sure data was properly stored - posix_fadvise( fd, start, end - start, POSIX_FADV_DONTNEED ); + readFd = directFd; } - if ( complete && !image_checkBlocksCrc32( tfd, (uint32_t*)buffer, blocks, fileSize ) ) { - logadd( LOG_WARNING, "Hash check for block %d of %s failed!", blocks[0], image->name ); - image_updateCachemap( image, start, end, false ); - // If this is not a full check, queue one - if ( qCount != CHECK_ALL ) { - logadd( LOG_INFO, "Queueing full check for %s", image->name ); - integrity_check( image, -1 ); - } - foundCorrupted = true; - } - blocks[0]++; // Increase before break, so it always points to the next block to check after loop - if ( complete && --checkCount == 0 ) break; } - if ( direct ) { - close( fd ); + if ( readFd == -1 ) { // Try buffered; flush to disk for that + image_ensureOpen( image ); + readFd = image->readFd; + } + if ( readFd == -1 ) { + logadd( LOG_MINOR, "Couldn't get any valid fd for integrity check of %s... ignoring...", image->path ); + } else if ( complete && !image_checkBlocksCrc32( readFd, image->crc32, blocks, fileSize ) ) { + bool iscomplete = true; + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache != NULL ) { + iscomplete = image_isHashBlockComplete( cache->map, blocks[0], fileSize ); + ref_put( &cache->reference ); + } + logadd( LOG_WARNING, "Hash check for block %d of %s failed (complete: was: %d, is: %d)", blocks[0], image->name, (int)complete, (int)iscomplete ); + image_updateCachemap( image, start, end, false ); + // If this is not a full check, queue one + if ( qCount != CHECK_ALL ) { + logadd( LOG_INFO, "Queueing full check for %s", image->name ); + integrity_check( image, -1 ); + } + foundCorrupted = true; } + blocks[0]++; // Increase before break, so it always points to the next block to check after loop + if ( complete && --checkCount == 0 ) + break; + } + if ( directFd != -1 && directFd != -2 ) { + close( directFd ); } mutex_lock( &integrityQueueLock ); assert( checkQueue[i].image == image ); @@ -242,11 +226,8 @@ static void* integrity_main(void * data UNUSED) logadd( LOG_WARNING, "BUG! checkQueue counter ran negative" ); } } - if ( checkCount > 0 || checkQueue[i].count <= 0 || fd == -1 ) { - // Done with this task as nothing left, OR we don't have an fd to read from - if ( fd == -1 ) { - logadd( LOG_WARNING, "Cannot hash check %s: bad fd", image->path ); - } + if ( checkCount > 0 || checkQueue[i].count <= 0 ) { + // Done with this task as nothing left checkQueue[i].image = NULL; if ( i + 1 == queueLen ) queueLen--; // Mark as working again if applicable @@ -263,10 +244,8 @@ static void* integrity_main(void * data UNUSED) // Still more blocks to go... checkQueue[i].block = blocks[0]; } - } else { - mutex_unlock( &image->lock ); } - if ( foundCorrupted ) { + if ( foundCorrupted && !_shutdown ) { // Something was fishy, make sure uplink exists mutex_lock( &image->lock ); image->working = false; @@ -278,10 +257,52 @@ static void* integrity_main(void * data UNUSED) } } mutex_unlock( &integrityQueueLock ); - if ( buffer != NULL ) { - free( buffer ); - } bRunning = false; return NULL; } +static void flushFileRange(dnbd3_image_t *image, uint64_t start, uint64_t end) +{ + int flushFd; + int writableFd = -1; + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink != NULL ) { // Try to steal uplink's writable fd + if ( uplink->cacheFd != -1 ) { + writableFd = dup( uplink->cacheFd ); + } + ref_put( &uplink->reference ); + } + if ( writableFd == -1 ) { // Open file as writable + writableFd = open( image->path, O_WRONLY ); + } + if ( writableFd == -1 ) { // Fallback to readFd (should work on Linux and BSD...) + logadd( LOG_WARNING, "flushFileRange: Cannot open %s for writing. Trying readFd.", image->path ); + image_ensureOpen( image ); + flushFd = image->readFd; + } else { + flushFd = writableFd; + } + if ( flushFd == -1 ) + return; +#if defined(linux) || defined(__linux) + while ( sync_file_range( flushFd, start, end - start, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER ) == -1 ) +#else + while ( fsync( flushFd ) == -1 ) // TODO: fdatasync() should be available since FreeBSD 12.0 ... Might be a tad bit faster +#endif + { + if ( _shutdown ) + break; + int e = errno; + if ( e == EINTR ) + continue; + logadd( LOG_ERROR, "Cannot flush %s for integrity check (errno=%d)", image->path, e ); + if ( e == EIO ) { + exit( 1 ); + } + } + // Evict from cache too so we have to re-read, making sure data was properly stored + posix_fadvise( flushFd, start, end - start, POSIX_FADV_DONTNEED ); + if ( writableFd != -1 ) { + close( writableFd ); + } +} diff --git a/src/server/uplink.c b/src/server/uplink.c index 8a0b06b..dab5c27 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -876,7 +876,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) ret = (int)pwrite( uplink->cacheFd, uplink->recvBuffer + done, inReply.size - done, start + done ); if ( unlikely( ret == -1 ) ) { err = errno; - if ( err == EINTR ) continue; + if ( err == EINTR && !_shutdown ) continue; if ( err == ENOSPC || err == EDQUOT ) { // try to free 256MiB if ( !tryAgain || !image_ensureDiskSpaceLocked( 256ull * 1024 * 1024, true ) ) break; -- cgit v1.2.3-55-g7522