summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2020-03-03 12:21:01 +0100
committerSimon Rettberg2020-03-03 12:21:01 +0100
commit26c1ad7af0f5749c5343a5823b9c8cece885ce84 (patch)
tree0fe45f629560edb47bd86c7dc78b69715348b600
parent[SERVER] altservers: Fix missing index mapping (replication) (diff)
downloaddnbd3-26c1ad7af0f5749c5343a5823b9c8cece885ce84.tar.gz
dnbd3-26c1ad7af0f5749c5343a5823b9c8cece885ce84.tar.xz
dnbd3-26c1ad7af0f5749c5343a5823b9c8cece885ce84.zip
[SERVER] Remove "working" flag, introduce fine-grained flags
Tracking the "working" state of images using one boolean is insufficient regarding the different ways in which providing an image can fail. Introduce separate flags for different conditions, like "file not readable", "file not writable", "no uplink server available", "file content has changed".
-rw-r--r--src/server/altservers.c4
-rw-r--r--src/server/globals.h7
-rw-r--r--src/server/image.c193
-rw-r--r--src/server/integrity.c20
-rw-r--r--src/server/net.c17
-rw-r--r--src/server/uplink.c114
6 files changed, 197 insertions, 158 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index 3fdbe0d..a6ad235 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -628,10 +628,6 @@ failed:
if ( best.fd != -1 ) {
close( best.fd );
}
- if ( !image->working || uplink->cycleDetected ) {
- image->working = true;
- LOG( LOG_DEBUG1, "[RTT] No better alt server found, enabling '%s:%d' again... :-(", image->name, (int)image->rid );
- }
uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away
mutex_lock( &uplink->rttLock );
uplink->rttTestResult = RTT_DONTCHANGE;
diff --git a/src/server/globals.h b/src/server/globals.h
index b1336dc..31fbce5 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -136,7 +136,12 @@ struct _dnbd3_image
atomic_int completenessEstimate; // Completeness estimate in percent
atomic_int users; // clients currently using this image. XXX Lock on imageListLock when modifying and checking whether the image should be freed. Reading it elsewhere is fine without the lock.
int id; // Unique ID of this image. Only unique in the context of this running instance of DNBD3-Server
- atomic_bool working; // true if image exists and completeness is == 100% or a working upstream proxy is connected
+ struct {
+ atomic_bool uplink; // No uplink connected
+ atomic_bool write; // Error writing to file
+ atomic_bool read; // Error reading from file
+ atomic_bool changed; // File disappeared or changed, thorough check required if it seems to be back
+ } problem;
uint16_t rid; // revision of image
pthread_mutex_t lock;
};
diff --git a/src/server/image.c b/src/server/image.c
index 6017e59..1ce1574 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -53,7 +53,7 @@ static bool image_ensureDiskSpace(uint64_t size, bool force);
static dnbd3_cache_map_t* image_loadCacheMap(const char * const imagePath, const int64_t fileSize);
static uint32_t* image_loadCrcList(const char * const imagePath, const int64_t fileSize, uint32_t *masterCrc);
-static void image_checkRandomBlocks(dnbd3_image_t *image, const int count);
+static bool image_checkRandomBlocks(dnbd3_image_t *image, const int count, int fromFd);
static void* closeUnusedFds(void*);
static void allocCacheMap(dnbd3_image_t *image, bool complete);
@@ -239,35 +239,76 @@ bool image_isComplete(dnbd3_image_t *image)
*/
bool image_ensureOpen(dnbd3_image_t *image)
{
- if ( image->readFd != -1 ) return image;
- int newFd = open( image->path, O_RDONLY );
+ bool sizeChanged = false;
+ if ( image->readFd != -1 && !image->problem.changed )
+ return true;
+ int newFd = image->readFd == -1 ? open( image->path, O_RDONLY ) : dup( image->readFd );
if ( newFd == -1 ) {
- logadd( LOG_WARNING, "Cannot open %s for reading", image->path );
+ if ( !image->problem.read ) {
+ logadd( LOG_WARNING, "Cannot open %s for reading", image->path );
+ image->problem.read = true;
+ }
} else {
- // Check size
+ // Check size + read access
+ char buffer[100];
const off_t flen = lseek( newFd, 0, SEEK_END );
if ( flen == -1 ) {
- logadd( LOG_WARNING, "Could not seek to end of %s (errno %d)", image->path, errno );
+ if ( !image->problem.read ) {
+ logadd( LOG_WARNING, "Could not seek to end of %s (errno %d)", image->path, errno );
+ image->problem.read = true;
+ }
close( newFd );
newFd = -1;
} else if ( (uint64_t)flen != image->realFilesize ) {
- logadd( LOG_WARNING, "Size of active image with closed fd changed from %" PRIu64 " to %" PRIu64, image->realFilesize, (uint64_t)flen );
+ if ( !image->problem.changed ) {
+ logadd( LOG_WARNING, "Size of active image with closed fd changed from %" PRIu64 " to %" PRIu64,
+ image->realFilesize, (uint64_t)flen );
+ }
+ sizeChanged = true;
+ } else if ( pread( newFd, buffer, sizeof(buffer), 0 ) == -1 ) {
+ if ( !image->problem.read ) {
+ logadd( LOG_WARNING, "Reading first %d bytes from %s failed (errno=%d)",
+ (int)sizeof(buffer), image->path, errno );
+ image->problem.read = true;
+ }
close( newFd );
newFd = -1;
}
}
if ( newFd == -1 ) {
- mutex_lock( &image->lock );
- image->working = false;
- mutex_unlock( &image->lock );
+ if ( sizeChanged ) {
+ image->problem.changed = true;
+ }
return false;
}
+
+ // Re-opened. Check if the "size/content changed" flag was set before and if so, check crc32,
+ // but only if the size we just got above is correct.
+ if ( image->problem.changed && !sizeChanged ) {
+ if ( image->crc32 == NULL ) {
+ // Cannot verify further, hope for the best
+ image->problem.changed = false;
+ logadd( LOG_DEBUG1, "Size of image %s:%d changed back to expected value",
+ image->name, (int)image->rid );
+ } else if ( image_checkRandomBlocks( image, 1, newFd ) ) {
+ // This should have checked the first block (if complete) -> All is well again
+ image->problem.changed = false;
+ logadd( LOG_DEBUG1, "Size and CRC of image %s:%d changed back to expected value",
+ image->name, (int)image->rid );
+ }
+ } else {
+ image->problem.changed = sizeChanged;
+ }
+
mutex_lock( &image->lock );
if ( image->readFd == -1 ) {
image->readFd = newFd;
+ image->problem.read = false;
mutex_unlock( &image->lock );
} else {
- // There was a race while opening the file (happens cause not locked cause blocking), we lost the race so close new fd and proceed
+ // There was a race while opening the file (happens cause not locked cause blocking),
+ // we lost the race so close new fd and proceed.
+ // *OR* we dup()'ed above for cheating when the image changed before.
mutex_unlock( &image->lock );
close( newFd );
}
@@ -296,7 +337,7 @@ dnbd3_image_t* image_byId(int imgId)
* point...
* Locks on: imageListLock, _images[].lock
*/
-dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
+dnbd3_image_t* image_get(char *name, uint16_t revision, bool ensureFdOpen)
{
int i;
const char *removingText = _removeMissingImages ? ", removing from list" : "";
@@ -326,84 +367,36 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
candidate->users++;
mutex_unlock( &imageListLock );
- // Found, see if it works
- // TODO: Also make sure a non-working image still has old fd open but created a new one and removed itself from the list
- // TODO: But remember size-changed images forever
- if ( candidate->working || checkIfWorking ) {
- // Is marked working, but might not have an fd open
- if ( !image_ensureOpen( candidate ) ) {
- mutex_lock( &candidate->lock );
- timing_get( &candidate->lastWorkCheck );
- mutex_unlock( &candidate->lock );
- if ( _removeMissingImages ) {
- candidate = image_remove( candidate ); // No release here, the image is still returned and should be released by caller
- }
- return candidate;
- }
- }
-
- if ( !checkIfWorking ) return candidate; // Not interested in re-cechking working state
-
- // ...not working...
-
- // Don't re-check too often
- mutex_lock( &candidate->lock );
- bool check;
- declare_now;
- check = timing_diff( &candidate->lastWorkCheck, &now ) > NONWORKING_RECHECK_INTERVAL_SECONDS;
- if ( check ) {
- candidate->lastWorkCheck = now;
- }
- mutex_unlock( &candidate->lock );
- if ( !check ) {
+ if ( !ensureFdOpen ) // Don't want to re-check
return candidate;
- }
- // reaching this point means:
- // 1) We should check if the image is working, it might or might not be in working state right now
- // 2) The image is open for reading (or at least was at some point, the fd might be stale if images lie on an NFS share etc.)
- // 3) We made sure not to re-check this image too often
-
- // Common for ro and rw images: Size check, read check
- const off_t len = lseek( candidate->readFd, 0, SEEK_END );
- bool reload = false;
- if ( len == -1 ) {
- logadd( LOG_WARNING, "lseek() on %s failed (errno=%d)%s.", candidate->path, errno, removingText );
- reload = true;
- } else if ( (uint64_t)len != candidate->realFilesize ) {
- logadd( LOG_WARNING, "Size of %s changed at runtime, keeping disabled! Expected: %" PRIu64 ", found: %" PRIu64
- ". Try sending SIGHUP to server if you know what you're doing.",
- candidate->path, candidate->realFilesize, (uint64_t)len );
- } else {
- // Seek worked, file size is same, now see if we can read from file
- char buffer[100];
- if ( pread( candidate->readFd, buffer, sizeof(buffer), 0 ) == -1 ) {
- logadd( LOG_WARNING, "Reading first %d bytes from %s failed (errno=%d)%s.",
- (int)sizeof(buffer), candidate->path, errno, removingText );
- reload = true;
- } else if ( !candidate->working ) {
- // Seems everything is fine again \o/
- candidate->working = true;
- logadd( LOG_INFO, "Changed state of %s:%d to 'working'", candidate->name, candidate->rid );
- }
- }
+ if ( image_ensureOpen( candidate ) && !candidate->problem.read )
+ return candidate; // We have a read fd and no read or changed problems
- if ( reload ) {
+ // -- image could not be opened again, or is open but has problem --
+
+ if ( _removeMissingImages && !file_isReadable( candidate->path ) ) {
+ candidate = image_remove( candidate );
+ // No image_release here, the image is still returned and should be released by caller
+ } else if ( candidate->readFd != -1 ) {
+ // We cannot just close the fd as it might be in use. Make a copy and remove old entry.
+ candidate = image_remove( candidate );
// Could not access the image with exising fd - mark for reload which will re-open the file.
// make a copy of the image struct but keep the old one around. If/When it's not being used
// anymore, it will be freed automatically.
- logadd( LOG_DEBUG1, "Reloading image file %s", candidate->path );
+ logadd( LOG_DEBUG1, "Reloading image file %s because of read problem/changed", candidate->path );
dnbd3_image_t *img = calloc( sizeof(dnbd3_image_t), 1 );
img->path = strdup( candidate->path );
img->name = strdup( candidate->name );
img->virtualFilesize = candidate->virtualFilesize;
img->realFilesize = candidate->realFilesize;
- img->atime = now;
+ timing_get( &img->atime );
img->masterCrc32 = candidate->masterCrc32;
img->readFd = -1;
img->rid = candidate->rid;
img->users = 1;
- img->working = false;
+ img->problem.read = true;
+ img->problem.changed = candidate->problem.changed;
img->ref_cacheMap = NULL;
mutex_init( &img->lock, LOCK_IMAGE );
if ( candidate->crc32 != NULL ) {
@@ -419,18 +412,17 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
if ( image_addToList( img ) ) {
image_release( candidate );
candidate = img;
+ // Check if image is incomplete, initialize uplink
+ if ( candidate->ref_cacheMap != NULL ) {
+ uplink_init( candidate, -1, NULL, -1 );
+ }
+ // Try again with new instance
+ image_ensureOpen( candidate );
} else {
img->users = 0;
image_free( img );
}
- // Check if image is incomplete, initialize uplink
- if ( candidate->ref_cacheMap != NULL ) {
- uplink_init( candidate, -1, NULL, -1 );
- }
- // readFd == -1 and working == FALSE at this point,
- // this function needs some splitting up for handling as we need to run most
- // of the above code again. for now we know that the next call for this
- // name:rid will get ne newly inserted "img" and try to re-open the file.
+ // readFd == -1 and problem.read == true
}
return candidate; // We did all we can, hopefully it's working
@@ -900,7 +892,6 @@ static bool image_load(char *base, char *path, int withUplink)
image->rid = (uint16_t)revision;
image->users = 0;
image->readFd = -1;
- image->working = ( cache == NULL );
timing_get( &image->nextCompletenessEstimate );
image->completenessEstimate = -1;
mutex_init( &image->lock, LOCK_IMAGE );
@@ -925,7 +916,7 @@ static bool image_load(char *base, char *path, int withUplink)
// Image is definitely incomplete, initialize uplink worker
if ( image->ref_cacheMap != NULL ) {
- image->working = false;
+ image->problem.uplink = true;
if ( withUplink ) {
uplink_init( image, -1, NULL, -1 );
}
@@ -937,7 +928,7 @@ static bool image_load(char *base, char *path, int withUplink)
// Keep fd for reading
fdImage = -1;
// Check CRC32
- image_checkRandomBlocks( image, 4 );
+ image_checkRandomBlocks( image, 4, -1 );
} else {
logadd( LOG_ERROR, "Image list full: Could not add image %s", path );
image->readFd = -1; // Keep fdImage instead, will be closed below
@@ -1027,10 +1018,19 @@ static uint32_t* image_loadCrcList(const char * const imagePath, const int64_t f
return retval;
}
-static void image_checkRandomBlocks(dnbd3_image_t *image, const int count)
+/**
+ * Check up to count random blocks from given image. If fromFd is -1, the check will
+ * be run asynchronously using the integrity checker. Otherwise, the check will
+ * happen in the function and return the result of the check.
+ * @param image image to check
+ * @param count number of blocks to check (max)
+ * @param fromFd, check synchronously and use this fd for reading, -1 = async
+ * @return true = OK, false = error. Meaningless if fromFd == -1
+ */
+static bool image_checkRandomBlocks(dnbd3_image_t *image, const int count, int fromFd)
{
if ( image->crc32 == NULL )
- return;
+ return true;
// This checks the first block and (up to) count - 1 random blocks for corruption
// via the known crc32 list. This is very sloppy and is merely supposed to detect
// accidental corruption due to broken dnbd3-proxy functionality or file system
@@ -1038,7 +1038,7 @@ static void image_checkRandomBlocks(dnbd3_image_t *image, const int count)
assert( count > 0 );
dnbd3_cache_map_t *cache = ref_get_cachemap( image );
const int hashBlocks = IMGSIZE_TO_HASHBLOCKS( image->virtualFilesize );
- int blocks[count];
+ int blocks[count+1]; // +1 for "-1" in sync case
int index = 0, j;
int block;
if ( image_isHashBlockComplete( cache, 0, image->virtualFilesize ) ) {
@@ -1062,9 +1062,16 @@ while_end: ;
if ( cache != NULL ) {
ref_put( &cache->reference );
}
- for ( int i = 0; i < index; ++i ) {
- integrity_check( image, blocks[i], true );
+ if ( fromFd == -1 ) {
+ // Async
+ for ( int i = 0; i < index; ++i ) {
+ integrity_check( image, blocks[i], true );
+ }
+ return true;
}
+ // Sync
+ blocks[index] = -1;
+ return image_checkBlocksCrc32( fromFd, image->crc32, blocks, image->realFilesize );
}
/**
@@ -1306,7 +1313,7 @@ server_fail: ;
} else {
// Clumsy busy wait, but this should only take as long as it takes to start a thread, so is it really worth using a signalling mechanism?
int i = 0;
- while ( !image->working && ++i < 100 )
+ while ( image->problem.uplink && ++i < 100 )
usleep( 2000 );
}
} else if ( uplinkSock != -1 ) {
@@ -1599,7 +1606,7 @@ int image_getCompletenessEstimate(dnbd3_image_t * const image)
assert( image != NULL );
dnbd3_cache_map_t *cache = ref_get_cachemap( image );
if ( cache == NULL )
- return image->working ? 100 : 0;
+ return 100;
const int len = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
if ( unlikely( len == 0 ) ) {
ref_put( &cache->reference );
diff --git a/src/server/integrity.c b/src/server/integrity.c
index 4006dfc..91e53b8 100644
--- a/src/server/integrity.c
+++ b/src/server/integrity.c
@@ -195,9 +195,10 @@ static void* integrity_main(void * data UNUSED)
readFd = directFd;
}
}
- if ( readFd == -1 ) { // Try buffered; flush to disk for that
- image_ensureOpen( image );
- readFd = image->readFd;
+ if ( readFd == -1 ) { // Try buffered as fallback
+ if ( image_ensureOpen( image ) && !image->problem.read ) {
+ readFd = image->readFd;
+ }
}
if ( readFd == -1 ) {
logadd( LOG_MINOR, "Couldn't get any valid fd for integrity check of %s... ignoring...", image->path );
@@ -237,16 +238,6 @@ static void* integrity_main(void * data UNUSED)
// Done with this task as nothing left
checkQueue[i].image = NULL;
if ( i + 1 == queueLen ) queueLen--;
- // Mark as working again if applicable
- if ( !foundCorrupted ) {
- dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
- if ( uplink != NULL ) { // TODO: image_determineWorkingState() helper?
- mutex_lock( &image->lock );
- image->working = uplink->current.fd != -1 && image->readFd != -1;
- mutex_unlock( &image->lock );
- ref_put( &uplink->reference );
- }
- }
} else {
// Still more blocks to go...
checkQueue[i].block = blocks[0];
@@ -254,9 +245,6 @@ static void* integrity_main(void * data UNUSED)
}
if ( foundCorrupted && !_shutdown ) {
// Something was fishy, make sure uplink exists
- mutex_lock( &image->lock );
- image->working = false;
- mutex_unlock( &image->lock );
uplink_init( image, -1, NULL, -1 );
}
// Release :-)
diff --git a/src/server/net.c b/src/server/net.c
index aba4e7d..29147be 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -262,7 +262,7 @@ void* net_handleNewConnection(void *clientPtr)
atomic_thread_fence( memory_order_release );
if ( unlikely( image == NULL ) ) {
//logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
- } else if ( unlikely( !image->working ) ) {
+ } else if ( unlikely( image->problem.read || image->problem.changed ) ) {
logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n",
client->hostName, image_name, (int)rid );
} else {
@@ -273,8 +273,14 @@ void* net_handleNewConnection(void *clientPtr)
if ( uplink != NULL && ( uplink->cacheFd == -1 || uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) ) {
bOk = ( rand() % 4 ) == 1;
}
- if ( bOk && uplink != NULL && uplink->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this
- usleep( 100000 ); // server gets a penalty and is less likely to be selected
+ if ( bOk && uplink != NULL ) {
+ if ( uplink->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this
+ usleep( 100000 ); // server gets a penalty and is less likely to be selected
+ }
+ if ( image->problem.uplink ) {
+ // Penaltize depending on completeness, if no uplink is available
+ usleep( ( 100 - image->completenessEstimate ) * 100 );
+ }
}
if ( uplink != NULL ) {
ref_put( &uplink->reference );
@@ -383,9 +389,8 @@ void* net_handleNewConnection(void *clientPtr)
ref_put( &cache->reference );
if ( !isCached ) {
if ( !uplink_request( client, request.handle, offset, request.size, request.hops ) ) {
- logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy, disabling image %s:%d",
+ logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy for image %s:%d",
client->hostName, image->name, image->rid );
- image->working = false;
goto exit_client_cleanup;
}
break; // DONE, exit request.cmd switch
@@ -456,7 +461,7 @@ void* net_handleNewConnection(void *clientPtr)
}
if ( err == EBADF || err == EFAULT || err == EINVAL || err == EIO ) {
logadd( LOG_INFO, "Disabling %s:%d", image->name, image->rid );
- image->working = false;
+ image->problem.read = true;
}
}
goto exit_client_cleanup;
diff --git a/src/server/uplink.c b/src/server/uplink.c
index f39e633..aba53ba 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -53,9 +53,9 @@ static void* uplink_mainloop(void *data);
static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly);
static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex);
static void uplink_handleReceive(dnbd3_uplink_t *uplink);
-static int uplink_sendKeepalive(const int fd);
+static bool uplink_sendKeepalive(dnbd3_uplink_t *uplink);
static void uplink_addCrc32(dnbd3_uplink_t *uplink);
-static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink);
+static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink);
static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force);
static bool uplink_saveCacheMap(dnbd3_uplink_t *uplink);
static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink);
@@ -117,6 +117,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
uplink->current.fd = -1;
mutex_unlock( &uplink->sendMutex );
uplink->cycleDetected = false;
+ image->problem.uplink = true;
if ( sock != -1 ) {
uplink->better.fd = sock;
int index = altservers_hostToIndex( host );
@@ -371,6 +372,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" );
} else {
if ( unlikely( uplink->current.fd == -1 ) ) {
+ uplink->image->problem.uplink = true;
mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
} else {
@@ -378,12 +380,14 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
if ( hops < 200 ) ++hops;
const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) );
- mutex_unlock( &uplink->sendMutex );
if ( unlikely( !ret ) ) {
+ uplink->image->problem.uplink = true;
+ mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" );
} else {
// Direct send succeeded, update queue entry from NEW to PENDING, so the request won't be sent again
int state;
+ mutex_unlock( &uplink->sendMutex );
mutex_lock( &uplink->queueLock );
if ( !uplink->shutdown && uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) {
state = uplink->queue[freeSlot].status;
@@ -460,9 +464,9 @@ static void* uplink_mainloop(void *data)
}
while ( !_shutdown && !uplink->shutdown ) {
// poll()
- waitTime = uplink->rttTestResult == RTT_DOCHANGE ? 0 : -1;
- if ( waitTime == 0 ) {
+ if ( uplink->rttTestResult == RTT_DOCHANGE ) {
// 0 means poll, since we're about to change the server
+ waitTime = 0;
} else {
declare_now;
waitTime = (int)timing_diffMs( &now, &nextAltCheck );
@@ -495,7 +499,7 @@ static void* uplink_mainloop(void *data)
discoverFailCount = 0;
if ( fd != -1 ) close( fd );
uplink->replicationHandle = REP_NONE;
- uplink->image->working = true;
+ uplink->image->problem.uplink = false;
uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
buffer[0] = '@';
if ( altservers_toString( uplink->current.index, buffer + 1, sizeof(buffer) - 1 ) ) {
@@ -510,6 +514,11 @@ static void* uplink_mainloop(void *data)
uplink_sendRequests( uplink, false );
uplink_sendReplicationRequest( uplink );
events[EV_SOCKET].events = POLLIN | POLLRDHUP;
+ if ( uplink->image->problem.uplink ) {
+ // Some of the requests above must have failed again already :-(
+ logadd( LOG_DEBUG1, "Newly established uplink connection failed during getCRC or sendRequests" );
+ uplink_connectionFailed( uplink, true );
+ }
timing_gets( &nextAltCheck, altCheckInterval );
// The rtt worker already did the handshake for our image, so there's nothing
// more to do here
@@ -517,6 +526,7 @@ static void* uplink_mainloop(void *data)
// Check events
// Signal
if ( (events[EV_SIGNAL].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) {
+ uplink->image->problem.uplink = true;
logadd( LOG_WARNING, "poll error on signal in uplink_mainloop!" );
goto cleanup;
} else if ( (events[EV_SIGNAL].revents & POLLIN) ) {
@@ -553,14 +563,10 @@ static void* uplink_mainloop(void *data)
}
// Keep-alive
if ( uplink->current.fd != -1 && uplink->replicationHandle == REP_NONE ) {
- // Send keep-alive if nothing is happening
- if ( uplink_sendKeepalive( uplink->current.fd ) ) {
- // Re-trigger periodically, in case it requires a minimum user count
- uplink_sendReplicationRequest( uplink );
- } else {
+ // Send keep-alive if nothing is happening, and try to trigger background rep.
+ if ( !uplink_sendKeepalive( uplink ) || !uplink_sendReplicationRequest( uplink ) ) {
uplink_connectionFailed( uplink, true );
- logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" );
- setThreadName( "panic-uplink" );
+ logadd( LOG_DEBUG1, "Error sending keep-alive/BGR, panic!\n" );
}
}
// Don't keep uplink established if we're idle for too much
@@ -578,6 +584,7 @@ static void* uplink_mainloop(void *data)
// Quit work if image is complete
logadd( LOG_INFO, "Replication of %s complete.", uplink->image->name );
setThreadName( "finished-uplink" );
+ uplink->image->problem.uplink = false;
goto cleanup;
} else {
// Not complete - do measurement
@@ -592,10 +599,6 @@ static void* uplink_mainloop(void *data)
} else if ( rttTestResult == RTT_NOT_REACHABLE ) {
if ( atomic_compare_exchange_strong( &uplink->rttTestResult, &rttTestResult, RTT_IDLE ) ) {
discoverFailCount++;
- if ( uplink->image->working && uplink->current.fd == -1 && discoverFailCount > (SERVER_RTT_MAX_UNREACH / 2) ) {
- logadd( LOG_DEBUG1, "Disabling %s:%d since no uplink is available", uplink->image->name, (int)uplink->image->rid );
- uplink->image->working = false;
- }
if ( uplink->current.fd == -1 ) {
uplink->cycleDetected = false;
}
@@ -624,8 +627,9 @@ static void* uplink_mainloop(void *data)
}
}
mutex_unlock( &uplink->queueLock );
- if ( resend )
+ if ( resend ) {
uplink_sendRequests( uplink, true );
+ }
}
#endif
}
@@ -653,6 +657,9 @@ static void* uplink_mainloop(void *data)
return NULL ;
}
+/**
+ * Only called from uplink thread.
+ */
static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
{
// Scan for new requests
@@ -672,13 +679,15 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
if ( hops < 200 ) ++hops;
mutex_lock( &uplink->sendMutex );
const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) );
- mutex_unlock( &uplink->sendMutex );
- if ( !ret ) {
+ if ( likely( ret ) ) {
+ mutex_unlock( &uplink->sendMutex );
+ } else {
// Non-critical - if the connection dropped or the server was changed
// the thread will re-send this request as soon as the connection
// is reestablished.
+ uplink->image->problem.uplink = true;
+ mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" );
- altservers_serverFailed( uplink->current.index );
return;
}
mutex_lock( &uplink->queueLock );
@@ -695,21 +704,27 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
* server. This means we might request data we already have, but it makes
* the code simpler. Worst case would be only one bit is zero, which means
* 4kb are missing, but we will request 32kb.
+ *
+ * Only called form uplink thread, so current.fd is assumed to be valid.
+ *
+ * @return false if sending request failed, true otherwise (i.e. not necessary/disabled)
*/
-static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
+static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
{
- if ( uplink == NULL || uplink->current.fd == -1 ) return;
- if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 ) return; // Don't do background replication
+ if ( uplink->current.fd == -1 )
+ return false; // Should never be called in this state, consider send error
+ if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 )
+ return true; // Don't do background replication
if ( uplink->nextReplicationIndex == -1 || uplink->replicationHandle != REP_NONE )
- return; // Already a replication request on the wire, or no more blocks to replicate
+ return true; // Already a replication request on the wire, or no more blocks to replicate
dnbd3_image_t * const image = uplink->image;
- if ( image->virtualFilesize < DNBD3_BLOCK_SIZE ) return;
- if ( image->users < _bgrMinClients ) return; // Not enough active users
+ if ( image->users < _bgrMinClients )
+ return true; // Not enough active users
dnbd3_cache_map_t *cache = ref_get_cachemap( image );
- if ( cache == NULL || image->users < _bgrMinClients ) {
+ if ( cache == NULL || image->users ) {
// No cache map (=image complete)
ref_put( &cache->reference );
- return;
+ return true;
}
const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
const int lastBlockIndex = mapBytes - 1;
@@ -741,17 +756,20 @@ static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
if ( replicationIndex == -1 ) {
// Replication might be complete, uplink_mainloop should take care....
uplink->nextReplicationIndex = -1;
- return;
+ return true;
}
const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
uplink->replicationHandle = offset;
const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
mutex_lock( &uplink->sendMutex );
bool sendOk = dnbd3_get_block( uplink->current.fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->current.version, 1 ) );
- mutex_unlock( &uplink->sendMutex );
- if ( !sendOk ) {
+ if ( likely( sendOk ) ) {
+ mutex_unlock( &uplink->sendMutex );
+ } else {
+ uplink->image->problem.uplink = true;
+ mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" );
- return;
+ return false;
}
if ( replicationIndex == lastBlockIndex ) {
uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks
@@ -762,6 +780,7 @@ static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
// Just crossed a hash block boundary, look for new candidate starting at this very index
uplink->nextReplicationIndex = uplink_findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex );
}
+ return true;
}
/**
@@ -816,6 +835,7 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int
/**
* Receive data from uplink server and process/dispatch
* Locks on: uplink.lock, images[].lock
+ * Only called from uplink thread, so current.fd is assumed to be valid.
*/
static void uplink_handleReceive(dnbd3_uplink_t *uplink)
{
@@ -990,11 +1010,14 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
mutex_lock( &uplink->queueLock );
const bool rep = ( uplink->queueLen == 0 );
mutex_unlock( &uplink->queueLock );
- if ( rep ) uplink_sendReplicationRequest( uplink );
+ if ( rep ) {
+ if ( !uplink_sendReplicationRequest( uplink ) )
+ goto error_cleanup;
+ }
}
return;
// Error handling from failed receive or message parsing
- error_cleanup: ;
+error_cleanup: ;
uplink_connectionFailed( uplink, true );
}
@@ -1005,8 +1028,10 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
{
if ( uplink->current.fd == -1 )
return;
+ setThreadName( "panic-uplink" );
altservers_serverFailed( uplink->current.index );
mutex_lock( &uplink->sendMutex );
+ uplink->image->problem.uplink = true;
close( uplink->current.fd );
uplink->current.fd = -1;
mutex_unlock( &uplink->sendMutex );
@@ -1025,14 +1050,24 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
}
/**
- * Send keep alive request to server
+ * Send keep alive request to server.
+ * Called from uplink thread, current.fd must be valid.
*/
-static int uplink_sendKeepalive(const int fd)
+static bool uplink_sendKeepalive(dnbd3_uplink_t *uplink)
{
static const dnbd3_request_t request = { .magic = dnbd3_packet_magic, .cmd = net_order_16( CMD_KEEPALIVE ) };
- return send( fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request);
+ mutex_lock( &uplink->sendMutex );
+ bool sendOk = send( uplink->current.fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request);
+ mutex_unlock( &uplink->sendMutex );
+ return sendOk;
}
+/**
+ * Request crclist from uplink.
+ * Called from uplink thread, current.fd must be valid.
+ * FIXME This is broken as it could happen that another message arrives after sending
+ * the request. Refactor, split and move receive into general receive handler.
+ */
static void uplink_addCrc32(dnbd3_uplink_t *uplink)
{
dnbd3_image_t *image = uplink->image;
@@ -1042,6 +1077,9 @@ static void uplink_addCrc32(dnbd3_uplink_t *uplink)
uint32_t *buffer = malloc( bytes );
mutex_lock( &uplink->sendMutex );
bool sendOk = dnbd3_get_crc32( uplink->current.fd, &masterCrc, buffer, &bytes );
+ if ( !sendOk ) {
+ uplink->image->problem.uplink = true;
+ }
mutex_unlock( &uplink->sendMutex );
if ( !sendOk || bytes == 0 ) {
free( buffer );