summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/server/altservers.c26
-rw-r--r--src/server/globals.h5
-rw-r--r--src/server/image.c289
-rw-r--r--src/server/image.h6
-rw-r--r--src/server/integrity.c88
-rw-r--r--src/server/net.c6
-rw-r--r--src/server/server.c2
-rw-r--r--src/server/uplink.c217
-rw-r--r--src/shared/fdsignal.inc/eventfd.c2
9 files changed, 351 insertions, 290 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index 4a2b8d5..575b849 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -18,7 +18,6 @@ static dnbd3_signal_t* runSignal = NULL;
static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS];
static int numAltServers = 0;
static pthread_spinlock_t altServersLock;
-static bool initDone = false;
static pthread_t altThread;
@@ -31,6 +30,12 @@ void altservers_init()
// Init spinlock
spin_init( &pendingLockWrite, PTHREAD_PROCESS_PRIVATE );
spin_init( &altServersLock, PTHREAD_PROCESS_PRIVATE );
+ // Init signal
+ runSignal = signal_new();
+ if ( runSignal == NULL ) {
+ logadd( LOG_ERROR, "Error creating signal object. Uplink feature unavailable." );
+ exit( EXIT_FAILURE );
+ }
memset( altServers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) );
if ( 0 != thread_create( &altThread, NULL, &altservers_main, (void *)NULL ) ) {
logadd( LOG_ERROR, "Could not start altservers connector thread" );
@@ -44,12 +49,11 @@ void altservers_init()
pending[i] = NULL;
}
spin_unlock( &pendingLockWrite );
- initDone = true;
}
void altservers_shutdown()
{
- if ( !initDone ) return;
+ if ( runSignal == NULL ) return;
signal_call( runSignal ); // Wake altservers thread up
thread_join( altThread, NULL );
}
@@ -407,18 +411,11 @@ static void *altservers_main(void *data UNUSED)
dnbd3_host_t servers[ALTS + 1];
serialized_buffer_t serialized;
struct timespec start, end;
- ticks nextCacheMapSave, nextCloseUnusedFd;
+ ticks nextCloseUnusedFd;
setThreadName( "altserver-check" );
blockNoncriticalSignals();
- timing_gets( &nextCacheMapSave, 90 );
timing_gets( &nextCloseUnusedFd, 900 );
- // Init signal
- runSignal = signal_new();
- if ( runSignal == NULL ) {
- logadd( LOG_WARNING, "error creating signal object. Uplink feature unavailable." );
- goto cleanup;
- }
// LOOP
while ( !_shutdown ) {
// Wait 5 seconds max.
@@ -598,13 +595,8 @@ static void *altservers_main(void *data UNUSED)
pthread_mutex_unlock( &pendingLockConsume );
}
// Save cache maps of all images if applicable
- // TODO: Has nothing to do with alt servers really, maybe move somewhere else?
declare_now;
- if ( timing_reached( &nextCacheMapSave, &now ) ) {
- timing_gets( &nextCacheMapSave, SERVER_CACHE_MAP_SAVE_INTERVAL );
- image_saveAllCacheMaps();
- }
- // TODO: More random crap
+ // TODO: Has nothing to do with alt servers really, maybe move somewhere else?
if ( _closeUnusedFd && timing_reached( &nextCloseUnusedFd, &now ) ) {
timing_gets( &nextCloseUnusedFd, 900 );
image_closeUnusedFd();
diff --git a/src/server/globals.h b/src/server/globals.h
index 2e39cb8..2fd1af2 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -57,9 +57,10 @@ struct _dnbd3_connection
dnbd3_host_t currentServer; // Current server we're connected to
pthread_spinlock_t rttLock; // When accessing rttTestResult, betterFd or betterServer
int rttTestResult; // RTT_*
- dnbd3_host_t betterServer; // The better server
+ int cacheFd; // used to write to the image, in case it is relayed. ONLY USE FROM UPLINK THREAD!
int betterVersion; // protocol version of better server
int betterFd; // Active connection to better server, ready to use
+ dnbd3_host_t betterServer; // The better server
uint8_t *recvBuffer; // Buffer for receiving payload
uint32_t recvBufferLen; // Len of ^^
volatile bool shutdown; // signal this thread to stop, must only be set from uplink_shutdown() or cleanup in uplink_mainloop()
@@ -70,6 +71,7 @@ struct _dnbd3_connection
uint64_t bytesReceived; // Number of bytes received by the connection.
uint64_t lastBytesReceived; // Number of bytes received last time we updated the global counter.
int queueLen; // length of queue
+ int idleCount; // How many iterations of keepalive check connection was idle
dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE];
};
@@ -112,7 +114,6 @@ struct _dnbd3_image
uint32_t *crc32; // list of crc32 checksums for each 16MiB block in image
uint32_t masterCrc32; // CRC-32 of the crc-32 list
int readFd; // used to read the image. Used from multiple threads, so use atomic operations (pread et al)
- int cacheFd; // used to write to the image, in case it is relayed. ONLY USE FROM UPLINK THREAD!
int completenessEstimate; // Completeness estimate in percent
int users; // clients currently using this image
int id; // Unique ID of this image. Only unique in the context of this running instance of DNBD3-Server
diff --git a/src/server/image.c b/src/server/image.c
index 02a383f..673a269 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -20,12 +20,6 @@
#define PATHLEN (2000)
#define NONWORKING_RECHECK_INTERVAL_SECONDS (60)
-#ifdef HAVE_FDATASYNC
-#define dnbd3_fdatasync fdatasync
-#else
-#define dnbd3_fdatasync fsync
-#endif
-
// ##########################################
static dnbd3_image_t *_images[SERVER_MAX_IMAGES];
@@ -49,7 +43,6 @@ static imagecache remoteCloneCache[CACHELEN];
static bool isForbiddenExtension(const char* name);
static dnbd3_image_t* image_remove(dnbd3_image_t *image);
static dnbd3_image_t* image_free(dnbd3_image_t *image);
-static bool image_isHashBlockComplete(const uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize);
static bool image_load_all_internal(char *base, char *path);
static bool image_addToList(dnbd3_image_t *image);
static bool image_load(char *base, char *path, int withUplink);
@@ -70,43 +63,6 @@ void image_serverStartup()
}
/**
- * Returns true if the given image is complete.
- * DOES NOT LOCK
- */
-bool image_isComplete(dnbd3_image_t *image)
-{
- assert( image != NULL );
- if ( image->working && image->cache_map == NULL ) {
- return true;
- }
- if ( image->virtualFilesize == 0 ) {
- return false;
- }
- bool complete = true;
- int j;
- const int map_len_bytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
- for (j = 0; j < map_len_bytes - 1; ++j) {
- if ( image->cache_map[j] != 0xFF ) {
- complete = false;
- break;
- }
- }
- if ( complete ) // Every block except the last one is complete
- { // Last one might need extra treatment if it's not a full byte
- const int blocks_in_last_byte = (image->virtualFilesize >> 12) & 7;
- uint8_t last_byte = 0;
- if ( blocks_in_last_byte == 0 ) {
- last_byte = 0xFF;
- } else {
- for (j = 0; j < blocks_in_last_byte; ++j)
- last_byte |= (uint8_t)(1 << j);
- }
- complete = ((image->cache_map[map_len_bytes - 1] & last_byte) == last_byte);
- }
- return complete;
-}
-
-/**
* Update cache-map of given image for the given byte range
* start (inclusive) - end (exclusive)
* Locks on: images[].lock
@@ -125,29 +81,36 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co
start &= ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
end = (uint64_t)(end + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
}
- bool dirty = false;
+ bool setNewBlocks = false;
uint64_t pos = start;
spin_lock( &image->lock );
if ( image->cache_map == NULL ) {
// Image seems already complete
- spin_unlock( &image->lock );
- logadd( LOG_DEBUG1, "image_updateCachemap with no cache_map: %s", image->path );
- return;
+ if ( set ) {
+ // This makes no sense
+ spin_unlock( &image->lock );
+ logadd( LOG_DEBUG1, "image_updateCachemap(true) with no cache_map: %s", image->path );
+ return;
+ }
+ // Recreate a cache map, set it to all 1 initially as we assume the image was complete
+ const int byteSize = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
+ image->cache_map = malloc( byteSize );
+ memset( image->cache_map, 0xff, byteSize );
}
while ( pos < end ) {
const size_t map_y = (int)( pos >> 15 );
const int map_x = (int)( (pos >> 12) & 7 ); // mod 8
const int bit_mask = 1 << map_x;
if ( set ) {
- if ( (image->cache_map[map_y] & bit_mask) == 0 ) dirty = true;
+ if ( (image->cache_map[map_y] & bit_mask) == 0 ) setNewBlocks = true;
image->cache_map[map_y] |= (uint8_t)bit_mask;
} else {
image->cache_map[map_y] &= (uint8_t)~bit_mask;
}
pos += DNBD3_BLOCK_SIZE;
}
- if ( dirty && image->crc32 != NULL ) {
- // If dirty is set, at least one of the blocks was not cached before, so queue all hash blocks
+ if ( setNewBlocks && image->crc32 != NULL ) {
+ // If setNewBlocks is set, at least one of the blocks was not cached before, so queue all hash blocks
// for checking, even though this might lead to checking some hash block again, if it was
// already complete and the block range spanned at least two hash blocks.
// First set start and end to borders of hash blocks
@@ -157,7 +120,7 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co
while ( pos < end ) {
if ( image->cache_map == NULL ) break;
const int block = (int)( pos / HASH_BLOCK_SIZE );
- if ( image_isHashBlockComplete( image->cache_map, block, image->virtualFilesize ) ) {
+ if ( image_isHashBlockComplete( image->cache_map, block, image->realFilesize ) ) {
spin_unlock( &image->lock );
integrity_check( image, block );
spin_lock( &image->lock );
@@ -169,112 +132,54 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co
}
/**
- * Mark image as complete by freeing the cache_map and deleting the map file on disk
+ * Returns true if the given image is complete.
+ * Also frees cache_map and deletes it on disk
+ * if it hasn't been complete before
* Locks on: image.lock
*/
-void image_markComplete(dnbd3_image_t *image)
+bool image_isComplete(dnbd3_image_t *image)
{
- char mapfile[PATHLEN] = "";
assert( image != NULL );
spin_lock( &image->lock );
- if ( image->cache_map != NULL ) {
- free( image->cache_map );
- image->cache_map = NULL;
- snprintf( mapfile, PATHLEN, "%s.map", image->path );
- }
- spin_unlock( &image->lock );
- if ( mapfile[0] != '\0' ) {
- remove( mapfile );
- }
-}
-
-/**
- * Save cache map of every image
- */
-void image_saveAllCacheMaps()
-{
- spin_lock( &imageListLock );
- for (int i = 0; i < _num_images; ++i) {
- if ( _images[i] == NULL ) continue;
- dnbd3_image_t * const image = _images[i];
- spin_lock( &image->lock );
- image->users++;
- spin_unlock( &imageListLock );
- spin_unlock( &image->lock );
- image_saveCacheMap( image );
- spin_lock( &imageListLock );
- spin_lock( &image->lock );
- image->users--;
+ if ( image->virtualFilesize == 0 ) {
spin_unlock( &image->lock );
+ return false;
}
- spin_unlock( &imageListLock );
-}
-
-/**
- * Saves the cache map of the given image.
- * Return true on success.
- * Locks on: imageListLock, image.lock
- */
-bool image_saveCacheMap(dnbd3_image_t *image)
-{
- if ( image == NULL || image->cache_map == NULL ) return true;
- image = image_lock( image ); // Again after locking:
- if ( image == NULL ) return true;
- spin_lock( &image->lock );
- // Lock and get a copy of the cache map, as it could be freed by another thread that is just about to
- // figure out that this image's cache copy is complete
- if ( image->cache_map == NULL || image->virtualFilesize < DNBD3_BLOCK_SIZE ) {
+ if ( image->cache_map == NULL ) {
spin_unlock( &image->lock );
- image_release( image );
return true;
}
- const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize);
- uint8_t *map = malloc( size );
- memcpy( map, image->cache_map, size );
- // Unlock. Use path and cacheFd without locking. path should never change after initialization of the image,
- // cacheFd is written to and we don't want to hold a spinlock during I/O
- spin_unlock( &image->lock );
- assert( image->path != NULL );
- char mapfile[strlen( image->path ) + 4 + 1];
- strcpy( mapfile, image->path );
- strcat( mapfile, ".map" );
-
- int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 );
- if ( fd < 0 ) {
- const int err = errno;
- image_release( image );
- free( map );
- logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile );
- return false;
- }
-
- size_t done = 0;
- while ( done < size ) {
- const ssize_t ret = write( fd, map, size - done );
- if ( ret == -1 ) {
- if ( errno == EINTR ) continue;
- logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile );
- break;
- }
- if ( ret <= 0 ) {
- logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile );
+ bool complete = true;
+ int j;
+ const int map_len_bytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
+ for (j = 0; j < map_len_bytes - 1; ++j) {
+ if ( image->cache_map[j] != 0xFF ) {
+ complete = false;
break;
}
- done += (size_t)ret;
}
- if ( image->cacheFd != -1 ) {
- if ( dnbd3_fdatasync( image->cacheFd ) == -1 ) {
- logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d", image->path, errno );
- logadd( LOG_ERROR, "Bailing out immediately" );
- exit( 1 );
+ if ( complete ) { // Every block except the last one is complete
+ // Last one might need extra treatment if it's not a full byte
+ const int blocks_in_last_byte = (image->virtualFilesize >> 12) & 7;
+ uint8_t last_byte = 0;
+ if ( blocks_in_last_byte == 0 ) {
+ last_byte = 0xFF;
+ } else {
+ for (j = 0; j < blocks_in_last_byte; ++j)
+ last_byte |= (uint8_t)(1 << j);
}
+ complete = ((image->cache_map[map_len_bytes - 1] & last_byte) == last_byte);
}
- if ( dnbd3_fdatasync( fd ) == -1 ) {
- logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno );
+ if ( !complete ) {
+ spin_unlock( &image->lock );
+ return false;
}
- image_release( image ); // Release only after both hit the disk
- close( fd );
- free( map );
+ char mapfile[PATHLEN] = "";
+ free( image->cache_map );
+ image->cache_map = NULL;
+ snprintf( mapfile, PATHLEN, "%s.map", image->path );
+ spin_unlock( &image->lock );
+ unlink( mapfile );
return true;
}
@@ -435,7 +340,6 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
img->atime = now;
img->masterCrc32 = candidate->masterCrc32;
img->readFd = -1;
- img->cacheFd = -1;
img->rid = candidate->rid;
img->users = 1;
img->working = false;
@@ -467,15 +371,7 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
// Check if image is incomplete, handle
if ( candidate->cache_map != NULL ) {
- // -- Incomplete - rw check
- if ( candidate->cacheFd == -1 ) { // Make sure file is open for writing
- image_reopenCacheFd( candidate, false );
- // It might have failed - still offer proxy mode, we just can't cache
- if ( candidate->cacheFd == -1 ) {
- logadd( LOG_WARNING, "Cannot re-open %s for writing - replication disabled", candidate->path );
- }
- }
- if ( candidate->uplink == NULL && candidate->cacheFd != -1 ) {
+ if ( candidate->uplink == NULL ) {
uplink_init( candidate, -1, NULL, -1 );
}
}
@@ -484,49 +380,6 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
}
/**
- * Open the given image's main image file in
- * rw mode, assigning it to the cacheFd struct member.
- *
- * @param force If cacheFd was previously assigned a file descriptor (not == -1),
- * it will be closed first. Otherwise, nothing will happen and true will be returned
- * immediately.
- */
-bool image_reopenCacheFd(dnbd3_image_t *image, const bool force)
-{
- if ( image->cacheFd != -1 && !force )
- return true;
- const int nfd = open( image->path, O_RDWR );
- if ( nfd == -1 ) {
- if ( force ) {
- // Opening new one failed, force is enabled -- close old one
- spin_lock( &image->lock );
- int ofd = image->cacheFd;
- image->cacheFd = -1;
- spin_unlock( &image->lock );
- if ( ofd != -1 ) {
- close( ofd );
- }
- }
- return false;
- }
- // Open succeeded, now switch out while holding lock to make it look somewhat atomic
- spin_lock( &image->lock );
- int closeFd = -1;
- if ( force || image->cacheFd == -1 ) {
- closeFd = image->cacheFd;
- image->cacheFd = nfd;
- } else {
- closeFd = nfd;
- }
- spin_unlock( &image->lock );
- if ( closeFd != -1 ) { // Failed
- close( closeFd );
- }
- return true; // We either replaced the fd in force mode or the old one was -1, or
- // force was false and cacheFd != -1. Either way we consider that success
-}
-
-/**
* Lock the image by increasing its users count
* Returns the image on success, NULL if it is not found in the image list
* Every call to image_lock() needs to be followed by a call to image_release() at some point.
@@ -636,7 +489,12 @@ void image_killUplinks()
if ( _images[i] == NULL ) continue;
spin_lock( &_images[i]->lock );
if ( _images[i]->uplink != NULL ) {
- _images[i]->uplink->shutdown = true;
+ spin_lock( &_images[i]->uplink->queueLock );
+ if ( !_images[i]->uplink->shutdown ) {
+ thread_detach( _images[i]->uplink->thread );
+ _images[i]->uplink->shutdown = true;
+ }
+ spin_unlock( &_images[i]->uplink->queueLock );
signal_call( _images[i]->uplink->signal );
}
spin_unlock( &_images[i]->lock );
@@ -741,15 +599,17 @@ static dnbd3_image_t* image_free(dnbd3_image_t *image)
logadd( LOG_INFO, "Freeing image %s:%d", image->name, (int)image->rid );
}
//
- image_saveCacheMap( image );
uplink_shutdown( image );
spin_lock( &image->lock );
free( image->cache_map );
free( image->crc32 );
free( image->path );
free( image->name );
+ image->cache_map = NULL;
+ image->crc32 = NULL;
+ image->path = NULL;
+ image->name = NULL;
spin_unlock( &image->lock );
- if ( image->cacheFd != -1 ) close( image->cacheFd );
if ( image->readFd != -1 ) close( image->readFd );
spin_destroy( &image->lock );
//
@@ -758,7 +618,7 @@ static dnbd3_image_t* image_free(dnbd3_image_t *image)
return NULL ;
}
-static bool image_isHashBlockComplete(const uint8_t * const cacheMap, const uint64_t block, const uint64_t realFilesize)
+bool image_isHashBlockComplete(const uint8_t * const cacheMap, const uint64_t block, const uint64_t realFilesize)
{
if ( cacheMap == NULL ) return true;
const uint64_t end = (block + 1) * HASH_BLOCK_SIZE;
@@ -953,6 +813,7 @@ static bool image_load(char *base, char *path, int withUplink)
// XXX: Maybe try sha-256 or 512 first if you're paranoid (to be implemented)
// 2. Load CRC-32 list of image
+ bool doFullCheck = false;
uint32_t masterCrc = 0;
const int hashBlockCount = IMGSIZE_TO_HASHBLOCKS( virtualFilesize );
crc32list = image_loadCrcList( path, virtualFilesize, &masterCrc );
@@ -961,7 +822,7 @@ static bool image_load(char *base, char *path, int withUplink)
if ( crc32list != NULL ) {
if ( !image_checkRandomBlocks( 4, fdImage, realFilesize, crc32list, cache_map ) ) {
logadd( LOG_ERROR, "quick crc32 check of %s failed. Data corruption?", path );
- goto load_error;
+ doFullCheck = true;
}
}
@@ -1012,7 +873,6 @@ static bool image_load(char *base, char *path, int withUplink)
image->rid = (uint16_t)revision;
image->users = 0;
image->readFd = -1;
- image->cacheFd = -1;
image->working = (image->cache_map == NULL );
timing_get( &image->nextCompletenessEstimate );
image->completenessEstimate = -1;
@@ -1032,22 +892,13 @@ static bool image_load(char *base, char *path, int withUplink)
crc32list = NULL;
// Get rid of cache map if image is complete
- if ( image->cache_map != NULL && image_isComplete( image ) ) {
- image_markComplete( image );
- image->working = true;
+ if ( image->cache_map != NULL ) {
+ image_isComplete( image );
}
- // Image is definitely incomplete, open image file for writing, so we can update the cache
+ // Image is definitely incomplete, initialize uplink worker
if ( image->cache_map != NULL ) {
image->working = false;
- image->cacheFd = open( path, O_WRONLY );
- if ( image->cacheFd < 0 ) {
- // Proxy mode without disk caching is pointless, bail out
- image->cacheFd = -1;
- logadd( LOG_ERROR, "Could not open incomplete image %s for writing!", path );
- image = image_free( image );
- goto load_error;
- }
if ( withUplink ) {
uplink_init( image, -1, NULL, -1 );
}
@@ -1060,11 +911,16 @@ static bool image_load(char *base, char *path, int withUplink)
fdImage = -1;
} else {
logadd( LOG_ERROR, "Image list full: Could not add image %s", path );
- image->readFd = -1;
+ image->readFd = -1; // Keep fdImage instead, will be closed below
image = image_free( image );
goto load_error;
}
logadd( LOG_DEBUG1, "Loaded image '%s:%d'\n", image->name, (int)image->rid );
+ // CRC errors found...
+ if ( doFullCheck ) {
+ logadd( LOG_INFO, "Queueing full CRC32 check for '%s:%d'\n", image->name, (int)image->rid );
+ integrity_check( image, -1 );
+ }
function_return = true;
@@ -1407,7 +1263,7 @@ server_fail: ;
while ( !image->working && ++i < 100 )
usleep( 2000 );
}
- } else if ( uplinkSock >= 0 ) {
+ } else if ( uplinkSock != -1 ) {
close( uplinkSock );
}
return image;
@@ -1719,8 +1575,9 @@ int image_getCompletenessEstimate(dnbd3_image_t * const image)
}
/**
- * Check the CRC-32 of the given blocks. The array blocks is of variable length.
+ * Check the CRC-32 of the given blocks. The array "blocks" is of variable length.
* !! pass -1 as the last block so the function knows when to stop !!
+ * Does NOT check whether block index is within image.
* Returns true or false
*/
bool image_checkBlocksCrc32(const int fd, uint32_t *crc32list, const int *blocks, const uint64_t realFilesize)
diff --git a/src/server/image.h b/src/server/image.h
index dd8b818..4668eff 100644
--- a/src/server/image.h
+++ b/src/server/image.h
@@ -9,14 +9,12 @@ void image_serverStartup();
bool image_isComplete(dnbd3_image_t *image);
+bool image_isHashBlockComplete(const uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize);
+
void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, const bool set);
void image_markComplete(dnbd3_image_t *image);
-void image_saveAllCacheMaps();
-
-bool image_saveCacheMap(dnbd3_image_t *image);
-
bool image_ensureOpen(dnbd3_image_t *image);
dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking);
diff --git a/src/server/integrity.c b/src/server/integrity.c
index e8124f4..c9ac798 100644
--- a/src/server/integrity.c
+++ b/src/server/integrity.c
@@ -3,17 +3,19 @@
#include "helper.h"
#include "locks.h"
#include "image.h"
+#include "uplink.h"
#include <assert.h>
#include <sys/syscall.h>
#include <sys/resource.h>
-#define CHECK_QUEUE_SIZE 100
+#define CHECK_QUEUE_SIZE 500
typedef struct
{
- dnbd3_image_t *image;
- int block;
+ dnbd3_image_t *image; // Image to check
+ int block; // Block to check
+ bool full; // Check all blocks in image; .block will be increased
} queue_entry;
static pthread_t thread;
@@ -72,7 +74,8 @@ void integrity_check(dnbd3_image_t *image, int block)
for (i = 0; i < queueLen; ++i) {
if ( freeSlot == -1 && checkQueue[i].image == NULL ) {
freeSlot = i;
- } else if ( checkQueue[i].image == image && checkQueue[i].block == block ) {
+ } else if ( checkQueue[i].image == image
+ && ( checkQueue[i].block == block || checkQueue[i].full ) ) {
pthread_mutex_unlock( &integrityQueueLock );
return;
}
@@ -86,7 +89,13 @@ void integrity_check(dnbd3_image_t *image, int block)
freeSlot = queueLen++;
}
checkQueue[freeSlot].image = image;
- checkQueue[freeSlot].block = block;
+ if ( block == -1 ) {
+ checkQueue[freeSlot].block = 0;
+ checkQueue[freeSlot].full = true;
+ } else {
+ checkQueue[freeSlot].block = block;
+ checkQueue[freeSlot].full = false;
+ }
pthread_cond_signal( &queueSignal );
pthread_mutex_unlock( &integrityQueueLock );
}
@@ -113,22 +122,30 @@ static void* integrity_main(void * data UNUSED)
for (i = queueLen - 1; i >= 0; --i) {
if ( _shutdown ) break;
dnbd3_image_t * const image = image_lock( checkQueue[i].image );
- checkQueue[i].image = NULL;
- if ( i + 1 == queueLen ) queueLen--;
+ if ( !checkQueue[i].full || image == NULL ) {
+ checkQueue[i].image = NULL;
+ if ( i + 1 == queueLen ) queueLen--;
+ }
if ( image == NULL ) continue;
// We have the image. Call image_release() some time
// Make sure the image is open for reading (closeUnusedFd)
if ( !image_ensureOpen( image ) ) {
+ // TODO: Open new fd for file with O_DIRECT in case we do a full scan,
+ // so we don't thrash the whole fs cache
logadd( LOG_MINOR, "Cannot hash check block %d of %s -- no readFd", checkQueue[i].block, image->path );
image_release( image );
continue;
}
+ bool full = checkQueue[i].full;
+ bool foundCorrupted = false;
spin_lock( &image->lock );
if ( image->crc32 != NULL && image->realFilesize != 0 ) {
- int const blocks[2] = { checkQueue[i].block, -1 };
+ int blocks[2] = { checkQueue[i].block, -1 };
pthread_mutex_unlock( &integrityQueueLock );
+ // Make copy of crc32 list as it might go away
const uint64_t fileSize = image->realFilesize;
- const size_t required = IMGSIZE_TO_HASHBLOCKS(fileSize) * sizeof(uint32_t);
+ const int numHashBlocks = IMGSIZE_TO_HASHBLOCKS(fileSize);
+ const size_t required = numHashBlocks * sizeof(uint32_t);
if ( buffer == NULL || required > bufferSize ) {
bufferSize = required;
if ( buffer != NULL ) free( buffer );
@@ -136,14 +153,60 @@ static void* integrity_main(void * data UNUSED)
}
memcpy( buffer, image->crc32, required );
spin_unlock( &image->lock );
- if ( !image_checkBlocksCrc32( image->readFd, (uint32_t*)buffer, blocks, fileSize ) ) {
- logadd( LOG_WARNING, "Hash check for block %d of %s failed!", blocks[0], image->name );
- image_updateCachemap( image, blocks[0] * HASH_BLOCK_SIZE, (blocks[0] + 1) * HASH_BLOCK_SIZE, false );
+ int checkCount = full ? 5 : 1;
+ while ( blocks[0] < numHashBlocks ) {
+ bool complete = true;
+ if ( full ) {
+ // When checking full image, skip incomplete blocks, otherwise assume block is complete
+ spin_lock( &image->lock );
+ complete = image_isHashBlockComplete( image->cache_map, blocks[0], fileSize );
+ spin_unlock( &image->lock );
+ }
+ if ( complete && !image_checkBlocksCrc32( image->readFd, (uint32_t*)buffer, blocks, fileSize ) ) {
+ logadd( LOG_WARNING, "Hash check for block %d of %s failed!", blocks[0], image->name );
+ image_updateCachemap( image, blocks[0] * HASH_BLOCK_SIZE, (blocks[0] + 1) * HASH_BLOCK_SIZE, false );
+ // If this is not a full check, queue one
+ if ( !full ) {
+ logadd( LOG_INFO, "Queueing full check for %s", image->name );
+ integrity_check( image, -1 );
+ }
+ foundCorrupted = true;
+ }
+ if ( complete && --checkCount == 0 ) break;
+ blocks[0]++;
}
pthread_mutex_lock( &integrityQueueLock );
+ if ( full ) {
+ assert( checkQueue[i].image == image );
+ assert( checkQueue[i].full );
+ if ( checkCount == 0 ) {
+ // Not done yet, keep going
+ checkQueue[i].block = blocks[0] + 1;
+ } else {
+ // Didn't check as many blocks as requested, so we must be done
+ checkQueue[i].image = NULL;
+ if ( i + 1 == queueLen ) queueLen--;
+ spin_lock( &image->lock );
+ if ( image->uplink != NULL ) { // TODO: image_determineWorkingState() helper?
+ image->working = image->uplink->fd != -1 && image->readFd != -1;
+ }
+ spin_unlock( &image->lock );
+ }
+ }
} else {
spin_unlock( &image->lock );
}
+ if ( foundCorrupted ) {
+ // Something was fishy, make sure uplink exists
+ spin_lock( &image->lock );
+ image->working = false;
+ bool restart = image->uplink == NULL || image->uplink->shutdown;
+ spin_unlock( &image->lock );
+ if ( restart ) {
+ uplink_shutdown( image );
+ uplink_init( image, -1, NULL, -1 );
+ }
+ }
// Release :-)
image_release( image );
}
@@ -153,3 +216,4 @@ static void* integrity_main(void * data UNUSED)
bRunning = false;
return NULL ;
}
+
diff --git a/src/server/net.c b/src/server/net.c
index b160778..6fa536f 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -289,15 +289,17 @@ void* net_handleNewConnection(void *clientPtr)
logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n",
client->hostName, image_name, (int)rid );
} else {
+ bool penalty;
// Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable
bOk = true;
if ( image->cache_map != NULL ) {
spin_lock( &image->lock );
- if ( image->uplink == NULL || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
+ if ( image->uplink == NULL || image->uplink->cacheFd == -1 || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
bOk = ( rand() % 4 ) == 1;
}
+ penalty = bOk && image->uplink != NULL && image->uplink->cacheFd == -1;
spin_unlock( &image->lock );
- if ( image->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this
+ if ( penalty ) { // Wait 100ms if local caching is not working so this
usleep( 100000 ); // server gets a penalty and is less likely to be selected
}
}
diff --git a/src/server/server.c b/src/server/server.c
index b209451..0944c51 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -334,7 +334,7 @@ int main(int argc, char *argv[])
}
// Give other threads some time to start up before accepting connections
- sleep( 1 );
+ usleep( 100000 );
// setup network
listeners = setupNetwork( bindAddress );
diff --git a/src/server/uplink.c b/src/server/uplink.c
index c0babaa..538f388 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -12,7 +12,13 @@
#include <inttypes.h>
#include <fcntl.h>
#include <poll.h>
+#include <unistd.h>
+#ifdef HAVE_FDATASYNC
+#define dnbd3_fdatasync fdatasync
+#else
+#define dnbd3_fdatasync fsync
+#endif
static uint64_t totalBytesReceived = 0;
static pthread_spinlock_t statisticsReceivedLock;
@@ -24,6 +30,8 @@ static int uplink_sendKeepalive(const int fd);
static void uplink_addCrc32(dnbd3_connection_t *uplink);
static void uplink_sendReplicationRequest(dnbd3_connection_t *link);
static void uplink_updateGlobalReceivedCounter(dnbd3_connection_t *link);
+static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force);
+static bool uplink_saveCacheMap(dnbd3_connection_t *link);
// ############ uplink connection handling
@@ -47,7 +55,7 @@ uint64_t uplink_getTotalBytesReceived()
*/
bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version)
{
- if ( !_isProxy ) return false;
+ if ( !_isProxy || _shutdown ) return false;
dnbd3_connection_t *link = NULL;
assert( image != NULL );
spin_lock( &image->lock );
@@ -66,8 +74,10 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
link->image = image;
link->bytesReceived = 0;
link->lastBytesReceived = 0;
+ link->idleCount = 0;
link->queueLen = 0;
link->fd = -1;
+ link->cacheFd = -1;
link->signal = NULL;
link->replicationHandle = 0;
spin_lock( &link->rttLock );
@@ -123,10 +133,15 @@ void uplink_shutdown(dnbd3_image_t *image)
join = true;
}
spin_unlock( &uplink->queueLock );
+ bool wait = image->uplink != NULL;
spin_unlock( &image->lock );
if ( join ) thread_join( thread, NULL );
- while ( image->uplink != NULL )
- usleep( 10000 );
+ while ( wait ) {
+ usleep( 5000 );
+ spin_lock( &image->lock );
+ wait = image->uplink != NULL && image->uplink->shutdown;
+ spin_unlock( &image->lock );
+ }
}
/**
@@ -273,6 +288,7 @@ static void* uplink_mainloop(void *data)
int numSocks, i, waitTime;
int altCheckInterval = SERVER_RTT_DELAY_INIT;
int discoverFailCount = 0;
+ int unsavedCount = 0;
ticks nextAltCheck, nextKeepalive;
char buffer[200];
memset( events, 0, sizeof(events) );
@@ -282,6 +298,11 @@ static void* uplink_mainloop(void *data)
assert( link != NULL );
setThreadName( "idle-uplink" );
blockNoncriticalSignals();
+ // Make sure file is open for writing
+ if ( !uplink_reopenCacheFd( link, false ) ) {
+ // It might have failed - still offer proxy mode, we just can't cache
+ logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", link->image->path, errno );
+ }
//
link->signal = signal_new();
if ( link->signal == NULL ) {
@@ -335,7 +356,7 @@ static void* uplink_mainloop(void *data)
declare_now;
waitTime = (int)timing_diffMs( &now, &nextAltCheck );
} while(0);
- if ( waitTime < 1500 ) waitTime = 1500;
+ if ( waitTime < 100 ) waitTime = 100;
if ( waitTime > 5000 ) waitTime = 5000;
numSocks = poll( events, EV_COUNT, waitTime );
if ( _shutdown || link->shutdown ) goto cleanup;
@@ -371,17 +392,29 @@ static void* uplink_mainloop(void *data)
if ( link->fd == -1 ) timing_get( &nextAltCheck );
if ( _shutdown || link->shutdown ) goto cleanup;
}
- // Send keep alive if nothing is happening
declare_now;
- if ( link->fd != -1 && link->replicationHandle == 0 && timing_reached( &nextKeepalive, &now ) ) {
- timing_set( &nextKeepalive, &now, 20 );
- if ( uplink_sendKeepalive( link->fd ) ) {
- // Re-trigger periodically, in case it requires a minimum user count
- uplink_sendReplicationRequest( link );
- } else {
- const int fd = link->fd;
- link->fd = -1;
- close( fd );
+ if ( timing_reached( &nextKeepalive, &now ) ) {
+ timing_set( &nextKeepalive, &now, 10 );
+ link->idleCount++;
+ unsavedCount++;
+ if ( unsavedCount > 24 || ( unsavedCount > 6 && link->idleCount >= 2 && link->idleCount <= 7 ) ) {
+ // fsync/save every 4 minutes, or every 60 seconds if link is idle
+ unsavedCount = 0;
+ uplink_saveCacheMap( link );
+ }
+ if ( link->idleCount % 2 == 0 ) {
+ // Save cache map only if we don't seem busy handling actual client requests
+ if ( link->fd != -1 && link->replicationHandle == 0 ) {
+ // Send keep alive if nothing is happening
+ if ( uplink_sendKeepalive( link->fd ) ) {
+ // Re-trigger periodically, in case it requires a minimum user count
+ uplink_sendReplicationRequest( link );
+ } else {
+ const int fd = link->fd;
+ link->fd = -1;
+ close( fd );
+ }
+ }
}
}
// See if we should trigger an RTT measurement
@@ -394,7 +427,6 @@ static void* uplink_mainloop(void *data)
if ( image_isComplete( link->image ) ) {
// Quit work if image is complete
logadd( LOG_INFO, "Replication of %s complete.", link->image->name );
- image_markComplete( link->image );
goto cleanup;
} else {
// Not complete - do measurement
@@ -441,9 +473,10 @@ static void* uplink_mainloop(void *data)
}
cleanup: ;
altservers_removeUplink( link );
+ uplink_saveCacheMap( link );
spin_lock( &link->image->lock );
- spin_lock( &link->queueLock );
link->image->uplink = NULL;
+ spin_lock( &link->queueLock );
const int fd = link->fd;
const dnbd3_signal_t* signal = link->signal;
link->fd = -1;
@@ -452,6 +485,9 @@ static void* uplink_mainloop(void *data)
link->shutdown = true;
thread_detach( link->thread );
}
+ // Do not access link->image after unlocking, since we set
+ // image->uplink to NULL. Acquire with image_lock first,
+ // like done below when checking whether to re-init uplink
spin_unlock( &link->image->lock );
spin_unlock( &link->queueLock );
if ( fd != -1 ) close( fd );
@@ -459,13 +495,26 @@ static void* uplink_mainloop(void *data)
// Wait for the RTT check to finish/fail if it's in progress
while ( link->rttTestResult == RTT_INPROGRESS )
usleep( 10000 );
- if ( link->betterFd != -1 ) close( link->betterFd );
+ if ( link->betterFd != -1 ) {
+ close( link->betterFd );
+ }
spin_destroy( &link->queueLock );
spin_destroy( &link->rttLock );
free( link->recvBuffer );
link->recvBuffer = NULL;
uplink_updateGlobalReceivedCounter( link );
- free( link );
+ if ( link->cacheFd != -1 ) {
+ close( link->cacheFd );
+ }
+ dnbd3_image_t *image = image_lock( link->image );
+ free( link ); // !!!
+ if ( image != NULL ) {
+ if ( !_shutdown && image->cache_map != NULL ) {
+ // Ingegrity checker must have found something in the meantime
+ uplink_init( image, -1, NULL, 0 );
+ }
+ image_release( image );
+ }
return NULL ;
}
@@ -556,36 +605,36 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
int ret, i;
for (;;) {
ret = dnbd3_read_reply( link->fd, &inReply, false );
- if ( ret == REPLY_INTR && !_shutdown && !link->shutdown ) continue;
+ if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !link->shutdown ) ) continue;
if ( ret == REPLY_AGAIN ) break;
- if ( ret == REPLY_CLOSED ) {
+ if ( unlikely( ret == REPLY_CLOSED ) ) {
logadd( LOG_INFO, "Uplink: Remote host hung up (%s)", link->image->path );
goto error_cleanup;
}
- if ( ret == REPLY_WRONGMAGIC ) {
+ if ( unlikely( ret == REPLY_WRONGMAGIC ) ) {
logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path );
goto error_cleanup;
}
- if ( ret != REPLY_OK ) {
+ if ( unlikely( ret != REPLY_OK ) ) {
logadd( LOG_INFO, "Uplink: Connection error %d (%s)", ret, link->image->path );
goto error_cleanup;
}
- if ( inReply.size > (uint32_t)_maxPayload ) {
+ if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) {
logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload for %s", link->image->path );
goto error_cleanup;
}
- if ( link->recvBufferLen < inReply.size ) {
+ if ( unlikely( link->recvBufferLen < inReply.size ) ) {
link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536);
link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen );
}
- if ( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) {
+ if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) {
logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path );
goto error_cleanup;
}
// Payload read completely
// Bail out if we're not interested
- if ( inReply.cmd != CMD_GET_BLOCK ) continue;
+ if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) ) continue;
// Is a legit block reply
struct iovec iov[2];
const uint64_t start = inReply.handle;
@@ -594,16 +643,16 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
link->bytesReceived += inReply.size;
spin_unlock( &link->image->lock );
// 1) Write to cache file
- if ( unlikely( link->image->cacheFd == -1 ) ) {
- image_reopenCacheFd( link->image, false );
+ if ( unlikely( link->cacheFd == -1 ) ) {
+ uplink_reopenCacheFd( link, false );
}
- if ( likely( link->image->cacheFd != -1 ) ) {
+ if ( likely( link->cacheFd != -1 ) ) {
int err = 0;
bool tryAgain = true; // Allow one retry in case we run out of space or the write fd became invalid
uint32_t done = 0;
ret = 0;
while ( done < inReply.size ) {
- ret = (int)pwrite( link->image->cacheFd, link->recvBuffer + done, inReply.size - done, start + done );
+ ret = (int)pwrite( link->cacheFd, link->recvBuffer + done, inReply.size - done, start + done );
if ( unlikely( ret == -1 ) ) {
err = errno;
if ( err == EINTR ) continue;
@@ -614,7 +663,7 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
continue; // Success, retry write
}
if ( err == EBADF || err == EINVAL || err == EIO ) {
- if ( !tryAgain || !image_reopenCacheFd( link->image, true ) )
+ if ( !tryAgain || !uplink_reopenCacheFd( link, true ) )
break;
tryAgain = false;
continue; // Write handle to image successfully re-opened, try again
@@ -628,9 +677,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
}
done += (uint32_t)ret;
}
- if ( done > 0 ) image_updateCachemap( link->image, start, start + done, true );
- if ( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) {
- // Only disable caching if something seems severely wrong, not on ENOSPC since that might get better
+ if ( likely( done > 0 ) ) {
+ image_updateCachemap( link->image, start, start + done, true );
+ }
+ if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) {
logadd( LOG_WARNING, "Error writing received data for %s:%d; disabling caching.",
link->image->name, (int)link->image->rid );
}
@@ -691,7 +741,13 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
if ( !served && start != link->replicationHandle )
logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end );
#endif
- if ( start == link->replicationHandle ) link->replicationHandle = 0;
+ if ( start == link->replicationHandle ) {
+ // Was our background replication
+ link->replicationHandle = 0;
+ } else {
+ // Was some client -- reset idle counter
+ link->idleCount = 0;
+ }
}
spin_lock( &link->queueLock );
const bool rep = ( link->queueLen == 0 );
@@ -766,3 +822,94 @@ static void uplink_updateGlobalReceivedCounter(dnbd3_connection_t *link)
link->lastBytesReceived = link->bytesReceived;
}
+/**
+ * Open the given image's main image file in
+ * rw mode, assigning it to the cacheFd struct member.
+ *
+ * @param force If cacheFd was previously assigned a file descriptor (not == -1),
+ * it will be closed first. Otherwise, nothing will happen and true will be returned
+ * immediately.
+ */
+static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force)
+{
+ if ( link->cacheFd != -1 ) {
+ if ( !force ) return true;
+ close( link->cacheFd );
+ }
+ link->cacheFd = open( link->image->path, O_WRONLY | O_CREAT );
+ return link->cacheFd != -1;
+}
+
+/**
+ * Saves the cache map of the given image.
+ * Return true on success.
+ * Locks on: imageListLock, image.lock
+ */
+static bool uplink_saveCacheMap(dnbd3_connection_t *link)
+{
+ dnbd3_image_t *image = link->image;
+ assert( image != NULL );
+
+ if ( link->cacheFd != -1 ) {
+ if ( dnbd3_fdatasync( link->cacheFd ) == -1 ) {
+ // A failing fsync means we have no guarantee that any data
+ // since the last fsync (or open if none) has been saved. Apart
+ // from keeping the cache_map from the last successful fsync
+ // around and restoring it there isn't much we can do to recover
+ // a consistent state. Bail out.
+ logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d", image->path, errno );
+ logadd( LOG_ERROR, "Bailing out immediately" );
+ exit( 1 );
+ }
+ }
+
+ if ( image->cache_map == NULL ) return true;
+ logadd( LOG_DEBUG1, "Saving cache map of %s:%d", image->name, (int)image->rid );
+ spin_lock( &image->lock );
+ // Lock and get a copy of the cache map, as it could be freed by another thread that is just about to
+ // figure out that this image's cache copy is complete
+ if ( image->cache_map == NULL || image->virtualFilesize < DNBD3_BLOCK_SIZE ) {
+ spin_unlock( &image->lock );
+ return true;
+ }
+ const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize);
+ uint8_t *map = malloc( size );
+ memcpy( map, image->cache_map, size );
+ // Unlock. Use path and cacheFd without locking. path should never change after initialization of the image,
+ // cacheFd is owned by the uplink thread and we don't want to hold a spinlock during I/O
+ spin_unlock( &image->lock );
+ assert( image->path != NULL );
+ char mapfile[strlen( image->path ) + 4 + 1];
+ strcpy( mapfile, image->path );
+ strcat( mapfile, ".map" );
+
+ int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 );
+ if ( fd == -1 ) {
+ const int err = errno;
+ free( map );
+ logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile );
+ return false;
+ }
+
+ size_t done = 0;
+ while ( done < size ) {
+ const ssize_t ret = write( fd, map, size - done );
+ if ( ret == -1 ) {
+ if ( errno == EINTR ) continue;
+ logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile );
+ break;
+ }
+ if ( ret <= 0 ) {
+ logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile );
+ break;
+ }
+ done += (size_t)ret;
+ }
+ if ( dnbd3_fdatasync( fd ) == -1 ) {
+ logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno );
+ }
+ close( fd );
+ free( map );
+ return true;
+}
+
diff --git a/src/shared/fdsignal.inc/eventfd.c b/src/shared/fdsignal.inc/eventfd.c
index b4d4d6e..358d41c 100644
--- a/src/shared/fdsignal.inc/eventfd.c
+++ b/src/shared/fdsignal.inc/eventfd.c
@@ -27,7 +27,7 @@ dnbd3_signal_t* signal_newBlocking()
int signal_call(const dnbd3_signal_t* const signal)
{
if ( signal == NULL ) return SIGNAL_ERROR;
- static uint64_t one = 1;
+ static const uint64_t one = 1;
const int signalFd = ( (int)(intptr_t)signal ) - 1;
return write( signalFd, &one, sizeof one ) == sizeof one ? SIGNAL_OK : SIGNAL_ERROR;
}