summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Simon Rettberg, 2019-09-05 16:52:31 +0200
committer: Simon Rettberg, 2019-09-05 16:52:31 +0200
commit: 5765ce49f5e1e26505fd6b162db73a732603d1a8 (patch)
tree: b95ec005d24b4fead79f86b54a7dc8735c589626
parent: [SERVER] Update sample config (diff)
download: dnbd3-5765ce49f5e1e26505fd6b162db73a732603d1a8.tar.gz
dnbd3-5765ce49f5e1e26505fd6b162db73a732603d1a8.tar.xz
dnbd3-5765ce49f5e1e26505fd6b162db73a732603d1a8.zip
[SERVER] integrity checker: Improve flushing logic
-rw-r--r-- src/server/integrity.c 199
-rw-r--r-- src/server/uplink.c 2
2 files changed, 111 insertions, 90 deletions
diff --git a/src/server/integrity.c b/src/server/integrity.c
index a9fbae6..fddb755 100644
--- a/src/server/integrity.c
+++ b/src/server/integrity.c
@@ -35,6 +35,7 @@ static int queueLen = -1;
static atomic_bool bRunning = false;
static void* integrity_main(void *data);
+static void flushFileRange(dnbd3_image_t *image, uint64_t start, uint64_t end);
/**
* Initialize the integrity check thread
@@ -88,14 +89,17 @@ void integrity_check(dnbd3_image_t *image, int block)
for (i = 0; i < queueLen; ++i) {
if ( freeSlot == -1 && checkQueue[i].image == NULL ) {
freeSlot = i;
- } else if ( checkQueue[i].image == image
- && checkQueue[i].block <= block && checkQueue[i].block + checkQueue[i].count >= block ) {
- // Already queued check dominates this one, or at least lies directly before this block
- if ( checkQueue[i].block + checkQueue[i].count == block ) {
- // It's directly before this one; expand range
+ } else if ( checkQueue[i].image == image && checkQueue[i].block <= block ) {
+ if ( checkQueue[i].count == CHECK_ALL ) {
+ logadd( LOG_DEBUG2, "Dominated by full image scan request (%d/%d) (at %d)", i, queueLen, checkQueue[i].block );
+ } else if ( checkQueue[i].block + checkQueue[i].count == block ) {
checkQueue[i].count += 1;
+ logadd( LOG_DEBUG2, "Attaching to existing check request (%d/%d) (at %d, %d to go)", i, queueLen, checkQueue[i].block, checkQueue[i].count );
+ } else if ( checkQueue[i].block + checkQueue[i].count > block ) {
+ logadd( LOG_DEBUG2, "Dominated by existing check request (%d/%d) (at %d, %d to go)", i, queueLen, checkQueue[i].block, checkQueue[i].count );
+ } else {
+ continue;
}
- logadd( LOG_DEBUG2, "Attaching to existing check request (%d/%d) (%d +%d)", i, queueLen, checkQueue[i].block, checkQueue[i].count );
mutex_unlock( &integrityQueueLock );
return;
}
@@ -123,8 +127,6 @@ void integrity_check(dnbd3_image_t *image, int block)
static void* integrity_main(void * data UNUSED)
{
int i;
- uint8_t *buffer = NULL;
- size_t bufferSize = 0;
setThreadName( "image-check" );
blockNoncriticalSignals();
#if defined(linux) || defined(__linux)
@@ -150,88 +152,70 @@ static void* integrity_main(void * data UNUSED)
// We have the image. Call image_release() some time
const int qCount = checkQueue[i].count;
bool foundCorrupted = false;
- mutex_lock( &image->lock );
if ( image->crc32 != NULL && image->realFilesize != 0 ) {
int blocks[2] = { checkQueue[i].block, -1 };
mutex_unlock( &integrityQueueLock );
- // Make copy of crc32 list as it might go away
const uint64_t fileSize = image->realFilesize;
const int numHashBlocks = IMGSIZE_TO_HASHBLOCKS(fileSize);
- const size_t required = numHashBlocks * sizeof(uint32_t);
- if ( buffer == NULL || required > bufferSize ) {
- bufferSize = required;
- if ( buffer != NULL ) free( buffer );
- buffer = malloc( bufferSize );
- }
- memcpy( buffer, image->crc32, required );
- mutex_unlock( &image->lock );
- // Open for direct I/O if possible; this prevents polluting the fs cache
- int fd = open( image->path, O_RDONLY | O_DIRECT );
- bool direct = fd != -1;
- if ( unlikely( !direct ) ) {
- // Try unbuffered; flush to disk for that
- logadd( LOG_DEBUG1, "O_DIRECT failed for %s", image->path );
- image_ensureOpen( image );
- fd = image->readFd;
- }
int checkCount = MIN( qCount, 5 );
- if ( fd != -1 ) {
- while ( blocks[0] < numHashBlocks && !_shutdown ) {
- const uint64_t start = blocks[0] * HASH_BLOCK_SIZE;
- const uint64_t end = MIN( (uint64_t)(blocks[0] + 1) * HASH_BLOCK_SIZE, image->virtualFilesize );
- bool complete = true;
- if ( qCount == CHECK_ALL ) {
- dnbd3_cache_map_t *cache = ref_get_cachemap( image );
- if ( cache != NULL ) {
- // When checking full image, skip incomplete blocks, otherwise assume block is complete
- complete = image_isHashBlockComplete( cache->map, blocks[0], fileSize );
- ref_put( &cache->reference );
- }
- }
-#if defined(linux) || defined(__linux)
- while ( sync_file_range( fd, start, end - start, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER ) == -1 )
-#else
- while ( fsync( fd ) == -1 )
-#endif
- {
- if ( _shutdown )
- break;
- if ( errno == EINTR )
- continue;
- logadd( LOG_ERROR, "Cannot flush %s for integrity check (errno=%d)", image->path, errno );
- exit( 1 );
+ int readFd = -1, directFd = -1;
+ while ( blocks[0] < numHashBlocks && !_shutdown ) {
+ const uint64_t start = blocks[0] * HASH_BLOCK_SIZE;
+ const uint64_t end = MIN( (uint64_t)(blocks[0] + 1) * HASH_BLOCK_SIZE, image->virtualFilesize );
+ bool complete = true;
+ if ( qCount == CHECK_ALL ) {
+ dnbd3_cache_map_t *cache = ref_get_cachemap( image );
+ if ( cache != NULL ) {
+ // When checking full image, skip incomplete blocks, otherwise assume block is complete
+ complete = image_isHashBlockComplete( cache->map, blocks[0], fileSize );
+ ref_put( &cache->reference );
}
- if ( _shutdown )
- break;
+ }
+ // Flush to disk if there's an uplink, as that means the block might have been written recently
+ if ( image->uplinkref != NULL ) {
+ flushFileRange( image, start, end );
+ }
+ if ( _shutdown )
+ break;
+ // Open for direct I/O if possible; this prevents polluting the fs cache
+ if ( directFd == -1 && ( end % DNBD3_BLOCK_SIZE ) == 0 ) {
// Use direct I/O only if read length is multiple of 4096 to be on the safe side
- int tfd;
- if ( direct && ( end % DNBD3_BLOCK_SIZE ) == 0 ) {
- // Suitable for direct io
- tfd = fd;
- } else if ( !image_ensureOpen( image ) ) {
- logadd( LOG_WARNING, "Cannot open %s for reading", image->path );
- break;
+ directFd = open( image->path, O_RDONLY | O_DIRECT );
+ if ( directFd == -1 ) {
+ logadd( LOG_DEBUG2, "O_DIRECT failed for %s (errno=%d)", image->path, errno );
+ directFd = -2;
} else {
- tfd = image->readFd;
- // Evict from cache so we have to re-read, making sure data was properly stored
- posix_fadvise( fd, start, end - start, POSIX_FADV_DONTNEED );
+ readFd = directFd;
}
- if ( complete && !image_checkBlocksCrc32( tfd, (uint32_t*)buffer, blocks, fileSize ) ) {
- logadd( LOG_WARNING, "Hash check for block %d of %s failed!", blocks[0], image->name );
- image_updateCachemap( image, start, end, false );
- // If this is not a full check, queue one
- if ( qCount != CHECK_ALL ) {
- logadd( LOG_INFO, "Queueing full check for %s", image->name );
- integrity_check( image, -1 );
- }
- foundCorrupted = true;
- }
- blocks[0]++; // Increase before break, so it always points to the next block to check after loop
- if ( complete && --checkCount == 0 ) break;
}
- if ( direct ) {
- close( fd );
+ if ( readFd == -1 ) { // Try buffered; flush to disk for that
+ image_ensureOpen( image );
+ readFd = image->readFd;
+ }
+ if ( readFd == -1 ) {
+ logadd( LOG_MINOR, "Couldn't get any valid fd for integrity check of %s... ignoring...", image->path );
+ } else if ( complete && !image_checkBlocksCrc32( readFd, image->crc32, blocks, fileSize ) ) {
+ bool iscomplete = true;
+ dnbd3_cache_map_t *cache = ref_get_cachemap( image );
+ if ( cache != NULL ) {
+ iscomplete = image_isHashBlockComplete( cache->map, blocks[0], fileSize );
+ ref_put( &cache->reference );
+ }
+ logadd( LOG_WARNING, "Hash check for block %d of %s failed (complete: was: %d, is: %d)", blocks[0], image->name, (int)complete, (int)iscomplete );
+ image_updateCachemap( image, start, end, false );
+ // If this is not a full check, queue one
+ if ( qCount != CHECK_ALL ) {
+ logadd( LOG_INFO, "Queueing full check for %s", image->name );
+ integrity_check( image, -1 );
+ }
+ foundCorrupted = true;
}
+ blocks[0]++; // Increase before break, so it always points to the next block to check after loop
+ if ( complete && --checkCount == 0 )
+ break;
+ }
+ if ( directFd != -1 && directFd != -2 ) {
+ close( directFd );
}
mutex_lock( &integrityQueueLock );
assert( checkQueue[i].image == image );
@@ -242,11 +226,8 @@ static void* integrity_main(void * data UNUSED)
logadd( LOG_WARNING, "BUG! checkQueue counter ran negative" );
}
}
- if ( checkCount > 0 || checkQueue[i].count <= 0 || fd == -1 ) {
- // Done with this task as nothing left, OR we don't have an fd to read from
- if ( fd == -1 ) {
- logadd( LOG_WARNING, "Cannot hash check %s: bad fd", image->path );
- }
+ if ( checkCount > 0 || checkQueue[i].count <= 0 ) {
+ // Done with this task as nothing left
checkQueue[i].image = NULL;
if ( i + 1 == queueLen ) queueLen--;
// Mark as working again if applicable
@@ -263,10 +244,8 @@ static void* integrity_main(void * data UNUSED)
// Still more blocks to go...
checkQueue[i].block = blocks[0];
}
- } else {
- mutex_unlock( &image->lock );
}
- if ( foundCorrupted ) {
+ if ( foundCorrupted && !_shutdown ) {
// Something was fishy, make sure uplink exists
mutex_lock( &image->lock );
image->working = false;
@@ -278,10 +257,52 @@ static void* integrity_main(void * data UNUSED)
}
}
mutex_unlock( &integrityQueueLock );
- if ( buffer != NULL ) {
- free( buffer );
- }
bRunning = false;
return NULL;
}
+static void flushFileRange(dnbd3_image_t *image, uint64_t start, uint64_t end)
+{
+ int flushFd;
+ int writableFd = -1;
+ dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
+ if ( uplink != NULL ) { // Try to steal uplink's writable fd
+ if ( uplink->cacheFd != -1 ) {
+ writableFd = dup( uplink->cacheFd );
+ }
+ ref_put( &uplink->reference );
+ }
+ if ( writableFd == -1 ) { // Open file as writable
+ writableFd = open( image->path, O_WRONLY );
+ }
+ if ( writableFd == -1 ) { // Fallback to readFd (should work on Linux and BSD...)
+ logadd( LOG_WARNING, "flushFileRange: Cannot open %s for writing. Trying readFd.", image->path );
+ image_ensureOpen( image );
+ flushFd = image->readFd;
+ } else {
+ flushFd = writableFd;
+ }
+ if ( flushFd == -1 )
+ return;
+#if defined(linux) || defined(__linux)
+ while ( sync_file_range( flushFd, start, end - start, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER ) == -1 )
+#else
+ while ( fsync( flushFd ) == -1 ) // TODO: fdatasync() should be available since FreeBSD 12.0 ... Might be a tad bit faster
+#endif
+ {
+ if ( _shutdown )
+ break;
+ int e = errno;
+ if ( e == EINTR )
+ continue;
+ logadd( LOG_ERROR, "Cannot flush %s for integrity check (errno=%d)", image->path, e );
+ if ( e == EIO ) {
+ exit( 1 );
+ }
+ }
+ // Evict from cache too so we have to re-read, making sure data was properly stored
+ posix_fadvise( flushFd, start, end - start, POSIX_FADV_DONTNEED );
+ if ( writableFd != -1 ) {
+ close( writableFd );
+ }
+}
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 8a0b06b..dab5c27 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -876,7 +876,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
ret = (int)pwrite( uplink->cacheFd, uplink->recvBuffer + done, inReply.size - done, start + done );
if ( unlikely( ret == -1 ) ) {
err = errno;
- if ( err == EINTR ) continue;
+ if ( err == EINTR && !_shutdown ) continue;
if ( err == ENOSPC || err == EDQUOT ) {
// try to free 256MiB
if ( !tryAgain || !image_ensureDiskSpaceLocked( 256ull * 1024 * 1024, true ) ) break;