summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/server/altservers.c4
-rw-r--r--src/server/globals.h7
-rw-r--r--src/server/image.c193
-rw-r--r--src/server/integrity.c20
-rw-r--r--src/server/net.c17
-rw-r--r--src/server/uplink.c114
6 files changed, 197 insertions, 158 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index 3fdbe0d..a6ad235 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -628,10 +628,6 @@ failed:
if ( best.fd != -1 ) {
close( best.fd );
}
- if ( !image->working || uplink->cycleDetected ) {
- image->working = true;
- LOG( LOG_DEBUG1, "[RTT] No better alt server found, enabling '%s:%d' again... :-(", image->name, (int)image->rid );
- }
uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away
mutex_lock( &uplink->rttLock );
uplink->rttTestResult = RTT_DONTCHANGE;
diff --git a/src/server/globals.h b/src/server/globals.h
index b1336dc..31fbce5 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -136,7 +136,12 @@ struct _dnbd3_image
atomic_int completenessEstimate; // Completeness estimate in percent
atomic_int users; // clients currently using this image. XXX Lock on imageListLock when modifying and checking whether the image should be freed. Reading it elsewhere is fine without the lock.
int id; // Unique ID of this image. Only unique in the context of this running instance of DNBD3-Server
- atomic_bool working; // true if image exists and completeness is == 100% or a working upstream proxy is connected
+ struct {
+ atomic_bool uplink; // No uplink connected
+ atomic_bool write; // Error writing to file
+ atomic_bool read; // Error reading from file
+ atomic_bool changed; // File disappeared or changed, thorough check required if it seems to be back
+ } problem;
uint16_t rid; // revision of image
pthread_mutex_t lock;
};
diff --git a/src/server/image.c b/src/server/image.c
index 6017e59..1ce1574 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -53,7 +53,7 @@ static bool image_ensureDiskSpace(uint64_t size, bool force);
static dnbd3_cache_map_t* image_loadCacheMap(const char * const imagePath, const int64_t fileSize);
static uint32_t* image_loadCrcList(const char * const imagePath, const int64_t fileSize, uint32_t *masterCrc);
-static void image_checkRandomBlocks(dnbd3_image_t *image, const int count);
+static bool image_checkRandomBlocks(dnbd3_image_t *image, const int count, int fromFd);
static void* closeUnusedFds(void*);
static void allocCacheMap(dnbd3_image_t *image, bool complete);
@@ -239,35 +239,76 @@ bool image_isComplete(dnbd3_image_t *image)
*/
bool image_ensureOpen(dnbd3_image_t *image)
{
- if ( image->readFd != -1 ) return image;
- int newFd = open( image->path, O_RDONLY );
+ bool sizeChanged = false;
+ if ( image->readFd != -1 && !image->problem.changed )
+ return true;
+ int newFd = image->readFd == -1 ? open( image->path, O_RDONLY ) : dup( image->readFd );
if ( newFd == -1 ) {
- logadd( LOG_WARNING, "Cannot open %s for reading", image->path );
+ if ( !image->problem.read ) {
+ logadd( LOG_WARNING, "Cannot open %s for reading", image->path );
+ image->problem.read = true;
+ }
} else {
- // Check size
+ // Check size + read access
+ char buffer[100];
const off_t flen = lseek( newFd, 0, SEEK_END );
if ( flen == -1 ) {
- logadd( LOG_WARNING, "Could not seek to end of %s (errno %d)", image->path, errno );
+ if ( !image->problem.read ) {
+ logadd( LOG_WARNING, "Could not seek to end of %s (errno %d)", image->path, errno );
+ image->problem.read = true;
+ }
close( newFd );
newFd = -1;
} else if ( (uint64_t)flen != image->realFilesize ) {
- logadd( LOG_WARNING, "Size of active image with closed fd changed from %" PRIu64 " to %" PRIu64, image->realFilesize, (uint64_t)flen );
+ if ( !image->problem.changed ) {
+ logadd( LOG_WARNING, "Size of active image with closed fd changed from %" PRIu64 " to %" PRIu64,
+ image->realFilesize, (uint64_t)flen );
+ }
+ sizeChanged = true;
+ } else if ( pread( newFd, buffer, sizeof(buffer), 0 ) == -1 ) {
+ if ( !image->problem.read ) {
+ logadd( LOG_WARNING, "Reading first %d bytes from %s failed (errno=%d)",
+ (int)sizeof(buffer), image->path, errno );
+ image->problem.read = true;
+ }
close( newFd );
newFd = -1;
}
}
if ( newFd == -1 ) {
- mutex_lock( &image->lock );
- image->working = false;
- mutex_unlock( &image->lock );
+ if ( sizeChanged ) {
+ image->problem.changed = true;
+ }
return false;
}
+
+ // Re-opened. Check if the "size/content changed" flag was set before and if so, check crc32,
+ // but only if the size we just got above is correct.
+ if ( image->problem.changed && !sizeChanged ) {
+ if ( image->crc32 == NULL ) {
+ // Cannot verify further, hope for the best
+ image->problem.changed = false;
+ logadd( LOG_DEBUG1, "Size of image %s:%d changed back to expected value",
+ image->name, (int)image->rid );
+ } else if ( image_checkRandomBlocks( image, 1, newFd ) ) {
+ // This should have checked the first block (if complete) -> All is well again
+ image->problem.changed = false;
+ logadd( LOG_DEBUG1, "Size and CRC of image %s:%d changed back to expected value",
+ image->name, (int)image->rid );
+ }
+ } else {
+ image->problem.changed = sizeChanged;
+ }
+
mutex_lock( &image->lock );
if ( image->readFd == -1 ) {
image->readFd = newFd;
+ image->problem.read = false;
mutex_unlock( &image->lock );
} else {
- // There was a race while opening the file (happens cause not locked cause blocking), we lost the race so close new fd and proceed
+ // There was a race while opening the file (happens cause not locked cause blocking),
+ // we lost the race so close new fd and proceed.
+ // *OR* we dup()'ed above for cheating when the image changed before.
mutex_unlock( &image->lock );
close( newFd );
}
@@ -296,7 +337,7 @@ dnbd3_image_t* image_byId(int imgId)
* point...
* Locks on: imageListLock, _images[].lock
*/
-dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
+dnbd3_image_t* image_get(char *name, uint16_t revision, bool ensureFdOpen)
{
int i;
const char *removingText = _removeMissingImages ? ", removing from list" : "";
@@ -326,84 +367,36 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
candidate->users++;
mutex_unlock( &imageListLock );
- // Found, see if it works
- // TODO: Also make sure a non-working image still has old fd open but created a new one and removed itself from the list
- // TODO: But remember size-changed images forever
- if ( candidate->working || checkIfWorking ) {
- // Is marked working, but might not have an fd open
- if ( !image_ensureOpen( candidate ) ) {
- mutex_lock( &candidate->lock );
- timing_get( &candidate->lastWorkCheck );
- mutex_unlock( &candidate->lock );
- if ( _removeMissingImages ) {
- candidate = image_remove( candidate ); // No release here, the image is still returned and should be released by caller
- }
- return candidate;
- }
- }
-
- if ( !checkIfWorking ) return candidate; // Not interested in re-cechking working state
-
- // ...not working...
-
- // Don't re-check too often
- mutex_lock( &candidate->lock );
- bool check;
- declare_now;
- check = timing_diff( &candidate->lastWorkCheck, &now ) > NONWORKING_RECHECK_INTERVAL_SECONDS;
- if ( check ) {
- candidate->lastWorkCheck = now;
- }
- mutex_unlock( &candidate->lock );
- if ( !check ) {
+ if ( !ensureFdOpen ) // Don't want to re-check
return candidate;
- }
- // reaching this point means:
- // 1) We should check if the image is working, it might or might not be in working state right now
- // 2) The image is open for reading (or at least was at some point, the fd might be stale if images lie on an NFS share etc.)
- // 3) We made sure not to re-check this image too often
-
- // Common for ro and rw images: Size check, read check
- const off_t len = lseek( candidate->readFd, 0, SEEK_END );
- bool reload = false;
- if ( len == -1 ) {
- logadd( LOG_WARNING, "lseek() on %s failed (errno=%d)%s.", candidate->path, errno, removingText );
- reload = true;
- } else if ( (uint64_t)len != candidate->realFilesize ) {
- logadd( LOG_WARNING, "Size of %s changed at runtime, keeping disabled! Expected: %" PRIu64 ", found: %" PRIu64
- ". Try sending SIGHUP to server if you know what you're doing.",
- candidate->path, candidate->realFilesize, (uint64_t)len );
- } else {
- // Seek worked, file size is same, now see if we can read from file
- char buffer[100];
- if ( pread( candidate->readFd, buffer, sizeof(buffer), 0 ) == -1 ) {
- logadd( LOG_WARNING, "Reading first %d bytes from %s failed (errno=%d)%s.",
- (int)sizeof(buffer), candidate->path, errno, removingText );
- reload = true;
- } else if ( !candidate->working ) {
- // Seems everything is fine again \o/
- candidate->working = true;
- logadd( LOG_INFO, "Changed state of %s:%d to 'working'", candidate->name, candidate->rid );
- }
- }
+ if ( image_ensureOpen( candidate ) && !candidate->problem.read )
+ return candidate; // We have a read fd and no read or changed problems
- if ( reload ) {
+ // -- image could not be opened again, or is open but has problem --
+
+ if ( _removeMissingImages && !file_isReadable( candidate->path ) ) {
+ candidate = image_remove( candidate );
+ // No image_release here, the image is still returned and should be released by caller
+ } else if ( candidate->readFd != -1 ) {
+ // We cannot just close the fd as it might be in use. Make a copy and remove old entry.
+ candidate = image_remove( candidate );
// Could not access the image with exising fd - mark for reload which will re-open the file.
// make a copy of the image struct but keep the old one around. If/When it's not being used
// anymore, it will be freed automatically.
- logadd( LOG_DEBUG1, "Reloading image file %s", candidate->path );
+ logadd( LOG_DEBUG1, "Reloading image file %s because of read problem/changed", candidate->path );
dnbd3_image_t *img = calloc( sizeof(dnbd3_image_t), 1 );
img->path = strdup( candidate->path );
img->name = strdup( candidate->name );
img->virtualFilesize = candidate->virtualFilesize;
img->realFilesize = candidate->realFilesize;
- img->atime = now;
+ timing_get( &img->atime );
img->masterCrc32 = candidate->masterCrc32;
img->readFd = -1;
img->rid = candidate->rid;
img->users = 1;
- img->working = false;
+ img->problem.read = true;
+ img->problem.changed = candidate->problem.changed;
img->ref_cacheMap = NULL;
mutex_init( &img->lock, LOCK_IMAGE );
if ( candidate->crc32 != NULL ) {
@@ -419,18 +412,17 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
if ( image_addToList( img ) ) {
image_release( candidate );
candidate = img;
+ // Check if image is incomplete, initialize uplink
+ if ( candidate->ref_cacheMap != NULL ) {
+ uplink_init( candidate, -1, NULL, -1 );
+ }
+ // Try again with new instance
+ image_ensureOpen( candidate );
} else {
img->users = 0;
image_free( img );
}
- // Check if image is incomplete, initialize uplink
- if ( candidate->ref_cacheMap != NULL ) {
- uplink_init( candidate, -1, NULL, -1 );
- }
- // readFd == -1 and working == FALSE at this point,
- // this function needs some splitting up for handling as we need to run most
- // of the above code again. for now we know that the next call for this
- // name:rid will get ne newly inserted "img" and try to re-open the file.
+ // readFd == -1 and problem.read == true
}
return candidate; // We did all we can, hopefully it's working
@@ -900,7 +892,6 @@ static bool image_load(char *base, char *path, int withUplink)
image->rid = (uint16_t)revision;
image->users = 0;
image->readFd = -1;
- image->working = ( cache == NULL );
timing_get( &image->nextCompletenessEstimate );
image->completenessEstimate = -1;
mutex_init( &image->lock, LOCK_IMAGE );
@@ -925,7 +916,7 @@ static bool image_load(char *base, char *path, int withUplink)
// Image is definitely incomplete, initialize uplink worker
if ( image->ref_cacheMap != NULL ) {
- image->working = false;
+ image->problem.uplink = true;
if ( withUplink ) {
uplink_init( image, -1, NULL, -1 );
}
@@ -937,7 +928,7 @@ static bool image_load(char *base, char *path, int withUplink)
// Keep fd for reading
fdImage = -1;
// Check CRC32
- image_checkRandomBlocks( image, 4 );
+ image_checkRandomBlocks( image, 4, -1 );
} else {
logadd( LOG_ERROR, "Image list full: Could not add image %s", path );
image->readFd = -1; // Keep fdImage instead, will be closed below
@@ -1027,10 +1018,19 @@ static uint32_t* image_loadCrcList(const char * const imagePath, const int64_t f
return retval;
}
-static void image_checkRandomBlocks(dnbd3_image_t *image, const int count)
+/**
+ * Check up to count random blocks from given image. If fromFd is -1, the check will
+ * be run asynchronously using the integrity checker. Otherwise, the check will
+ * happen in the function and return the result of the check.
+ * @param image image to check
+ * @param count number of blocks to check (max)
+ * @param fromFd, check synchronously and use this fd for reading, -1 = async
+ * @return true = OK, false = error. Meaningless if fromFd == -1
+ */
+static bool image_checkRandomBlocks(dnbd3_image_t *image, const int count, int fromFd)
{
if ( image->crc32 == NULL )
- return;
+ return true;
// This checks the first block and (up to) count - 1 random blocks for corruption
// via the known crc32 list. This is very sloppy and is merely supposed to detect
// accidental corruption due to broken dnbd3-proxy functionality or file system
@@ -1038,7 +1038,7 @@ static void image_checkRandomBlocks(dnbd3_image_t *image, const int count)
assert( count > 0 );
dnbd3_cache_map_t *cache = ref_get_cachemap( image );
const int hashBlocks = IMGSIZE_TO_HASHBLOCKS( image->virtualFilesize );
- int blocks[count];
+ int blocks[count+1]; // +1 for "-1" in sync case
int index = 0, j;
int block;
if ( image_isHashBlockComplete( cache, 0, image->virtualFilesize ) ) {
@@ -1062,9 +1062,16 @@ while_end: ;
if ( cache != NULL ) {
ref_put( &cache->reference );
}
- for ( int i = 0; i < index; ++i ) {
- integrity_check( image, blocks[i], true );
+ if ( fromFd == -1 ) {
+ // Async
+ for ( int i = 0; i < index; ++i ) {
+ integrity_check( image, blocks[i], true );
+ }
+ return true;
}
+ // Sync
+ blocks[index] = -1;
+ return image_checkBlocksCrc32( fromFd, image->crc32, blocks, image->realFilesize );
}
/**
@@ -1306,7 +1313,7 @@ server_fail: ;
} else {
// Clumsy busy wait, but this should only take as long as it takes to start a thread, so is it really worth using a signalling mechanism?
int i = 0;
- while ( !image->working && ++i < 100 )
+ while ( image->problem.uplink && ++i < 100 )
usleep( 2000 );
}
} else if ( uplinkSock != -1 ) {
@@ -1599,7 +1606,7 @@ int image_getCompletenessEstimate(dnbd3_image_t * const image)
assert( image != NULL );
dnbd3_cache_map_t *cache = ref_get_cachemap( image );
if ( cache == NULL )
- return image->working ? 100 : 0;
+ return 100;
const int len = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
if ( unlikely( len == 0 ) ) {
ref_put( &cache->reference );
diff --git a/src/server/integrity.c b/src/server/integrity.c
index 4006dfc..91e53b8 100644
--- a/src/server/integrity.c
+++ b/src/server/integrity.c
@@ -195,9 +195,10 @@ static void* integrity_main(void * data UNUSED)
readFd = directFd;
}
}
- if ( readFd == -1 ) { // Try buffered; flush to disk for that
- image_ensureOpen( image );
- readFd = image->readFd;
+ if ( readFd == -1 ) { // Try buffered as fallback
+ if ( image_ensureOpen( image ) && !image->problem.read ) {
+ readFd = image->readFd;
+ }
}
if ( readFd == -1 ) {
logadd( LOG_MINOR, "Couldn't get any valid fd for integrity check of %s... ignoring...", image->path );
@@ -237,16 +238,6 @@ static void* integrity_main(void * data UNUSED)
// Done with this task as nothing left
checkQueue[i].image = NULL;
if ( i + 1 == queueLen ) queueLen--;
- // Mark as working again if applicable
- if ( !foundCorrupted ) {
- dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
- if ( uplink != NULL ) { // TODO: image_determineWorkingState() helper?
- mutex_lock( &image->lock );
- image->working = uplink->current.fd != -1 && image->readFd != -1;
- mutex_unlock( &image->lock );
- ref_put( &uplink->reference );
- }
- }
} else {
// Still more blocks to go...
checkQueue[i].block = blocks[0];
@@ -254,9 +245,6 @@ static void* integrity_main(void * data UNUSED)
}
if ( foundCorrupted && !_shutdown ) {
// Something was fishy, make sure uplink exists
- mutex_lock( &image->lock );
- image->working = false;
- mutex_unlock( &image->lock );
uplink_init( image, -1, NULL, -1 );
}
// Release :-)
diff --git a/src/server/net.c b/src/server/net.c
index aba4e7d..29147be 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -262,7 +262,7 @@ void* net_handleNewConnection(void *clientPtr)
atomic_thread_fence( memory_order_release );
if ( unlikely( image == NULL ) ) {
//logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
- } else if ( unlikely( !image->working ) ) {
+ } else if ( unlikely( image->problem.read || image->problem.changed ) ) {
logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n",
client->hostName, image_name, (int)rid );
} else {
@@ -273,8 +273,14 @@ void* net_handleNewConnection(void *clientPtr)
if ( uplink != NULL && ( uplink->cacheFd == -1 || uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) ) {
bOk = ( rand() % 4 ) == 1;
}
- if ( bOk && uplink != NULL && uplink->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this
- usleep( 100000 ); // server gets a penalty and is less likely to be selected
+ if ( bOk && uplink != NULL ) {
+ if ( uplink->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this
+ usleep( 100000 ); // server gets a penalty and is less likely to be selected
+ }
+ if ( image->problem.uplink ) {
+ // Penaltize depending on completeness, if no uplink is available
+ usleep( ( 100 - image->completenessEstimate ) * 100 );
+ }
}
if ( uplink != NULL ) {
ref_put( &uplink->reference );
@@ -383,9 +389,8 @@ void* net_handleNewConnection(void *clientPtr)
ref_put( &cache->reference );
if ( !isCached ) {
if ( !uplink_request( client, request.handle, offset, request.size, request.hops ) ) {
- logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy, disabling image %s:%d",
+ logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy for image %s:%d",
client->hostName, image->name, image->rid );
- image->working = false;
goto exit_client_cleanup;
}
break; // DONE, exit request.cmd switch
@@ -456,7 +461,7 @@ void* net_handleNewConnection(void *clientPtr)
}
if ( err == EBADF || err == EFAULT || err == EINVAL || err == EIO ) {
logadd( LOG_INFO, "Disabling %s:%d", image->name, image->rid );
- image->working = false;
+ image->problem.read = true;
}
}
goto exit_client_cleanup;
diff --git a/src/server/uplink.c b/src/server/uplink.c
index f39e633..aba53ba 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -53,9 +53,9 @@ static void* uplink_mainloop(void *data);
static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly);
static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex);
static void uplink_handleReceive(dnbd3_uplink_t *uplink);
-static int uplink_sendKeepalive(const int fd);
+static bool uplink_sendKeepalive(dnbd3_uplink_t *uplink);
static void uplink_addCrc32(dnbd3_uplink_t *uplink);
-static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink);
+static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink);
static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force);
static bool uplink_saveCacheMap(dnbd3_uplink_t *uplink);
static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink);
@@ -117,6 +117,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
uplink->current.fd = -1;
mutex_unlock( &uplink->sendMutex );
uplink->cycleDetected = false;
+ image->problem.uplink = true;
if ( sock != -1 ) {
uplink->better.fd = sock;
int index = altservers_hostToIndex( host );
@@ -371,6 +372,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" );
} else {
if ( unlikely( uplink->current.fd == -1 ) ) {
+ uplink->image->problem.uplink = true;
mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
} else {
@@ -378,12 +380,14 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
if ( hops < 200 ) ++hops;
const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) );
- mutex_unlock( &uplink->sendMutex );
if ( unlikely( !ret ) ) {
+ uplink->image->problem.uplink = true;
+ mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" );
} else {
// Direct send succeeded, update queue entry from NEW to PENDING, so the request won't be sent again
int state;
+ mutex_unlock( &uplink->sendMutex );
mutex_lock( &uplink->queueLock );
if ( !uplink->shutdown && uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) {
state = uplink->queue[freeSlot].status;
@@ -460,9 +464,9 @@ static void* uplink_mainloop(void *data)
}
while ( !_shutdown && !uplink->shutdown ) {
// poll()
- waitTime = uplink->rttTestResult == RTT_DOCHANGE ? 0 : -1;
- if ( waitTime == 0 ) {
+ if ( uplink->rttTestResult == RTT_DOCHANGE ) {
// 0 means poll, since we're about to change the server
+ waitTime = 0;
} else {
declare_now;
waitTime = (int)timing_diffMs( &now, &nextAltCheck );
@@ -495,7 +499,7 @@ static void* uplink_mainloop(void *data)
discoverFailCount = 0;
if ( fd != -1 ) close( fd );
uplink->replicationHandle = REP_NONE;
- uplink->image->working = true;
+ uplink->image->problem.uplink = false;
uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
buffer[0] = '@';
if ( altservers_toString( uplink->current.index, buffer + 1, sizeof(buffer) - 1 ) ) {
@@ -510,6 +514,11 @@ static void* uplink_mainloop(void *data)
uplink_sendRequests( uplink, false );
uplink_sendReplicationRequest( uplink );
events[EV_SOCKET].events = POLLIN | POLLRDHUP;
+ if ( uplink->image->problem.uplink ) {
+ // Some of the requests above must have failed again already :-(
+ logadd( LOG_DEBUG1, "Newly established uplink connection failed during getCRC or sendRequests" );
+ uplink_connectionFailed( uplink, true );
+ }
timing_gets( &nextAltCheck, altCheckInterval );
// The rtt worker already did the handshake for our image, so there's nothing
// more to do here
@@ -517,6 +526,7 @@ static void* uplink_mainloop(void *data)
// Check events
// Signal
if ( (events[EV_SIGNAL].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) {
+ uplink->image->problem.uplink = true;
logadd( LOG_WARNING, "poll error on signal in uplink_mainloop!" );
goto cleanup;
} else if ( (events[EV_SIGNAL].revents & POLLIN) ) {
@@ -553,14 +563,10 @@ static void* uplink_mainloop(void *data)
}
// Keep-alive
if ( uplink->current.fd != -1 && uplink->replicationHandle == REP_NONE ) {
- // Send keep-alive if nothing is happening
- if ( uplink_sendKeepalive( uplink->current.fd ) ) {
- // Re-trigger periodically, in case it requires a minimum user count
- uplink_sendReplicationRequest( uplink );
- } else {
+ // Send keep-alive if nothing is happening, and try to trigger background rep.
+ if ( !uplink_sendKeepalive( uplink ) || !uplink_sendReplicationRequest( uplink ) ) {
uplink_connectionFailed( uplink, true );
- logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" );
- setThreadName( "panic-uplink" );
+ logadd( LOG_DEBUG1, "Error sending keep-alive/BGR, panic!\n" );
}
}
// Don't keep uplink established if we're idle for too much
@@ -578,6 +584,7 @@ static void* uplink_mainloop(void *data)
// Quit work if image is complete
logadd( LOG_INFO, "Replication of %s complete.", uplink->image->name );
setThreadName( "finished-uplink" );
+ uplink->image->problem.uplink = false;
goto cleanup;
} else {
// Not complete - do measurement
@@ -592,10 +599,6 @@ static void* uplink_mainloop(void *data)
} else if ( rttTestResult == RTT_NOT_REACHABLE ) {
if ( atomic_compare_exchange_strong( &uplink->rttTestResult, &rttTestResult, RTT_IDLE ) ) {
discoverFailCount++;
- if ( uplink->image->working && uplink->current.fd == -1 && discoverFailCount > (SERVER_RTT_MAX_UNREACH / 2) ) {
- logadd( LOG_DEBUG1, "Disabling %s:%d since no uplink is available", uplink->image->name, (int)uplink->image->rid );
- uplink->image->working = false;
- }
if ( uplink->current.fd == -1 ) {
uplink->cycleDetected = false;
}
@@ -624,8 +627,9 @@ static void* uplink_mainloop(void *data)
}
}
mutex_unlock( &uplink->queueLock );
- if ( resend )
+ if ( resend ) {
uplink_sendRequests( uplink, true );
+ }
}
#endif
}
@@ -653,6 +657,9 @@ static void* uplink_mainloop(void *data)
return NULL ;
}
+/**
+ * Only called from uplink thread.
+ */
static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
{
// Scan for new requests
@@ -672,13 +679,15 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
if ( hops < 200 ) ++hops;
mutex_lock( &uplink->sendMutex );
const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) );
- mutex_unlock( &uplink->sendMutex );
- if ( !ret ) {
+ if ( likely( ret ) ) {
+ mutex_unlock( &uplink->sendMutex );
+ } else {
// Non-critical - if the connection dropped or the server was changed
// the thread will re-send this request as soon as the connection
// is reestablished.
+ uplink->image->problem.uplink = true;
+ mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" );
- altservers_serverFailed( uplink->current.index );
return;
}
mutex_lock( &uplink->queueLock );
@@ -695,21 +704,27 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
* server. This means we might request data we already have, but it makes
* the code simpler. Worst case would be only one bit is zero, which means
* 4kb are missing, but we will request 32kb.
+ *
+ * Only called form uplink thread, so current.fd is assumed to be valid.
+ *
+ * @return false if sending request failed, true otherwise (i.e. not necessary/disabled)
*/
-static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
+static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
{
- if ( uplink == NULL || uplink->current.fd == -1 ) return;
- if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 ) return; // Don't do background replication
+ if ( uplink->current.fd == -1 )
+ return false; // Should never be called in this state, consider send error
+ if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 )
+ return true; // Don't do background replication
if ( uplink->nextReplicationIndex == -1 || uplink->replicationHandle != REP_NONE )
- return; // Already a replication request on the wire, or no more blocks to replicate
+ return true; // Already a replication request on the wire, or no more blocks to replicate
dnbd3_image_t * const image = uplink->image;
- if ( image->virtualFilesize < DNBD3_BLOCK_SIZE ) return;
- if ( image->users < _bgrMinClients ) return; // Not enough active users
+ if ( image->users < _bgrMinClients )
+ return true; // Not enough active users
dnbd3_cache_map_t *cache = ref_get_cachemap( image );
- if ( cache == NULL || image->users < _bgrMinClients ) {
+ if ( cache == NULL || image->users ) {
// No cache map (=image complete)
ref_put( &cache->reference );
- return;
+ return true;
}
const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
const int lastBlockIndex = mapBytes - 1;
@@ -741,17 +756,20 @@ static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
if ( replicationIndex == -1 ) {
// Replication might be complete, uplink_mainloop should take care....
uplink->nextReplicationIndex = -1;
- return;
+ return true;
}
const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
uplink->replicationHandle = offset;
const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
mutex_lock( &uplink->sendMutex );
bool sendOk = dnbd3_get_block( uplink->current.fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->current.version, 1 ) );
- mutex_unlock( &uplink->sendMutex );
- if ( !sendOk ) {
+ if ( likely( sendOk ) ) {
+ mutex_unlock( &uplink->sendMutex );
+ } else {
+ uplink->image->problem.uplink = true;
+ mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" );
- return;
+ return false;
}
if ( replicationIndex == lastBlockIndex ) {
uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks
@@ -762,6 +780,7 @@ static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
// Just crossed a hash block boundary, look for new candidate starting at this very index
uplink->nextReplicationIndex = uplink_findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex );
}
+ return true;
}
/**
@@ -816,6 +835,7 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int
/**
* Receive data from uplink server and process/dispatch
* Locks on: uplink.lock, images[].lock
+ * Only called from uplink thread, so current.fd is assumed to be valid.
*/
static void uplink_handleReceive(dnbd3_uplink_t *uplink)
{
@@ -990,11 +1010,14 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
mutex_lock( &uplink->queueLock );
const bool rep = ( uplink->queueLen == 0 );
mutex_unlock( &uplink->queueLock );
- if ( rep ) uplink_sendReplicationRequest( uplink );
+ if ( rep ) {
+ if ( !uplink_sendReplicationRequest( uplink ) )
+ goto error_cleanup;
+ }
}
return;
// Error handling from failed receive or message parsing
- error_cleanup: ;
+error_cleanup: ;
uplink_connectionFailed( uplink, true );
}
@@ -1005,8 +1028,10 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
{
if ( uplink->current.fd == -1 )
return;
+ setThreadName( "panic-uplink" );
altservers_serverFailed( uplink->current.index );
mutex_lock( &uplink->sendMutex );
+ uplink->image->problem.uplink = true;
close( uplink->current.fd );
uplink->current.fd = -1;
mutex_unlock( &uplink->sendMutex );
@@ -1025,14 +1050,24 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
}
/**
- * Send keep alive request to server
+ * Send keep alive request to server.
+ * Called from uplink thread, current.fd must be valid.
*/
-static int uplink_sendKeepalive(const int fd)
+static bool uplink_sendKeepalive(dnbd3_uplink_t *uplink)
{
static const dnbd3_request_t request = { .magic = dnbd3_packet_magic, .cmd = net_order_16( CMD_KEEPALIVE ) };
- return send( fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request);
+ mutex_lock( &uplink->sendMutex );
+ bool sendOk = send( uplink->current.fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request);
+ mutex_unlock( &uplink->sendMutex );
+ return sendOk;
}
+/**
+ * Request crclist from uplink.
+ * Called from uplink thread, current.fd must be valid.
+ * FIXME This is broken as it could happen that another message arrives after sending
+ * the request. Refactor, split and move receive into general receive handler.
+ */
static void uplink_addCrc32(dnbd3_uplink_t *uplink)
{
dnbd3_image_t *image = uplink->image;
@@ -1042,6 +1077,9 @@ static void uplink_addCrc32(dnbd3_uplink_t *uplink)
uint32_t *buffer = malloc( bytes );
mutex_lock( &uplink->sendMutex );
bool sendOk = dnbd3_get_crc32( uplink->current.fd, &masterCrc, buffer, &bytes );
+ if ( !sendOk ) {
+ uplink->image->problem.uplink = true;
+ }
mutex_unlock( &uplink->sendMutex );
if ( !sendOk || bytes == 0 ) {
free( buffer );