summary | refs | log | tree | commit | diff | stats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r-- src/server/uplink.c 108
1 files changed, 70 insertions, 38 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index d544795..736ef11 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -32,6 +32,7 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink);
static void uplink_sendReplicationRequest(dnbd3_connection_t *link);
static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force);
static bool uplink_saveCacheMap(dnbd3_connection_t *link);
+static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link);
// ############ uplink connection handling
@@ -69,7 +70,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE );
link->image = image;
link->bytesReceived = 0;
- link->idleCount = 0;
+ link->idleTime = 0;
link->queueLen = 0;
link->fd = -1;
link->cacheFd = -1;
@@ -285,14 +286,14 @@ static void* uplink_mainloop(void *data)
struct pollfd events[EV_COUNT];
dnbd3_connection_t *link = (dnbd3_connection_t*)data;
int numSocks, i, waitTime;
- int altCheckInterval = SERVER_RTT_DELAY_INIT;
- int discoverFailCount = 0;
- int unsavedCount = 0;
- ticks nextAltCheck, nextKeepalive;
+ int altCheckInterval = SERVER_RTT_INTERVAL_INIT;
+ uint32_t discoverFailCount = 0;
+ uint32_t unsavedSeconds = 0;
+ ticks nextAltCheck, lastKeepalive;
char buffer[200];
memset( events, 0, sizeof(events) );
timing_get( &nextAltCheck );
- nextKeepalive = nextAltCheck;
+ lastKeepalive = nextAltCheck;
//
assert( link != NULL );
setThreadName( "idle-uplink" );
@@ -351,12 +352,14 @@ static void* uplink_mainloop(void *data)
// more to do here
}
// poll()
- do {
+ if ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) {
+ waitTime = 500;
+ } else {
declare_now;
waitTime = (int)timing_diffMs( &now, &nextAltCheck );
- } while(0);
- if ( waitTime < 100 ) waitTime = 100;
- if ( waitTime > 5000 ) waitTime = 5000;
+ if ( waitTime < 100 ) waitTime = 100;
+ if ( waitTime > 5000 ) waitTime = 5000;
+ }
numSocks = poll( events, EV_COUNT, waitTime );
if ( _shutdown || link->shutdown ) goto cleanup;
if ( numSocks == -1 ) { // Error?
@@ -378,6 +381,8 @@ static void* uplink_mainloop(void *data)
if ( link->fd != -1 ) {
// Uplink seems fine, relay requests to it...
uplink_sendRequests( link, true );
+ } else { // No uplink; maybe it was shutdown since it was idle for too long
+ link->idleTime = 0;
}
}
// Uplink socket
@@ -386,55 +391,71 @@ static void* uplink_mainloop(void *data)
close( events[EV_SOCKET].fd );
events[EV_SOCKET].fd = -1;
logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" );
+ setThreadName( "panic-uplink" );
} else if ( (events[EV_SOCKET].revents & POLLIN) ) {
uplink_handleReceive( link );
if ( link->fd == -1 ) timing_get( &nextAltCheck );
if ( _shutdown || link->shutdown ) goto cleanup;
}
declare_now;
- if ( timing_reached( &nextKeepalive, &now ) ) {
- timing_set( &nextKeepalive, &now, 10 );
- link->idleCount++;
- unsavedCount++;
- if ( unsavedCount > 24 || ( unsavedCount > 6 && link->idleCount >= 2 && link->idleCount <= 7 ) ) {
+ uint32_t timepassed = timing_diff( &lastKeepalive, &now );
+ if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) {
+ lastKeepalive = now;
+ link->idleTime += timepassed;
+ unsavedSeconds += timepassed;
+ if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && link->idleTime >= 20 && link->idleTime <= 70 ) ) {
// fsync/save every 4 minutes, or every 60 seconds if link is idle
- unsavedCount = 0;
+ unsavedSeconds = 0;
uplink_saveCacheMap( link );
}
- if ( link->idleCount % 2 == 0 ) {
- // Save cache map only if we don't seem busy handling actual client requests
- if ( link->fd != -1 && link->replicationHandle == REP_NONE ) {
- // Send keep alive if nothing is happening
- if ( uplink_sendKeepalive( link->fd ) ) {
- // Re-trigger periodically, in case it requires a minimum user count
- uplink_sendReplicationRequest( link );
- } else {
- const int fd = link->fd;
- link->fd = -1;
- close( fd );
- }
+ // Keep-alive
+ if ( link->fd != -1 && link->replicationHandle == REP_NONE ) {
+ // Send keep-alive if nothing is happening
+ if ( uplink_sendKeepalive( link->fd ) ) {
+ // Re-trigger periodically, in case it requires a minimum user count
+ uplink_sendReplicationRequest( link );
+ } else {
+ logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" );
+ const int fd = link->fd;
+ link->fd = -1;
+ close( fd );
+ setThreadName( "panic-uplink" );
}
}
+ // Don't keep link established if we've been idle for too long
+ if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) {
+ close( link->fd );
+ link->fd = events[EV_SOCKET].fd = -1;
+ link->cycleDetected = false;
+ if ( link->recvBufferLen != 0 ) {
+ link->recvBufferLen = 0;
+ free( link->recvBuffer );
+ link->recvBuffer = NULL;
+ }
+ logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", link->image->name, (int)link->image->rid );
+ setThreadName( "idle-uplink" );
+ }
}
// See if we should trigger an RTT measurement
spin_lock( &link->rttLock );
const int rttTestResult = link->rttTestResult;
spin_unlock( &link->rttLock );
if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) {
- if ( timing_reached( &nextAltCheck, &now ) || link->fd == -1 || link->cycleDetected ) {
+ if ( timing_reached( &nextAltCheck, &now ) || ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) || link->cycleDetected ) {
// It seems it's time for a check
if ( image_isComplete( link->image ) ) {
// Quit work if image is complete
logadd( LOG_INFO, "Replication of %s complete.", link->image->name );
+ setThreadName( "finished-uplink" );
goto cleanup;
- } else {
+ } else if ( !uplink_connectionShouldShutdown( link ) ) {
// Not complete - do measurement
altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous)
if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) {
link->nextReplicationIndex = 0;
}
}
- altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX);
+ altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_INTERVAL_MAX);
timing_set( &nextAltCheck, &now, altCheckInterval );
}
} else if ( rttTestResult == RTT_NOT_REACHABLE ) {
@@ -442,7 +463,7 @@ static void* uplink_mainloop(void *data)
link->rttTestResult = RTT_IDLE;
spin_unlock( &link->rttLock );
discoverFailCount++;
- timing_set( &nextAltCheck, &now, (discoverFailCount < 5 ? altCheckInterval : SERVER_RTT_DELAY_FAILED) );
+ timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) );
}
#ifdef _DEBUG
if ( link->fd != -1 && !link->shutdown ) {
@@ -693,13 +714,17 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
goto error_cleanup;
}
if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) {
- logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload for %s", link->image->path );
+ logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s", inReply.size, link->image->path );
goto error_cleanup;
}
if ( unlikely( link->recvBufferLen < inReply.size ) ) {
link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536);
link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen );
+ if ( link->recvBuffer == NULL ) {
+ logadd( LOG_ERROR, "Out of memory when trying to allocate receive buffer for uplink" );
+ exit( 1 );
+ }
}
if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) {
logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path );
@@ -753,8 +778,8 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
image_updateCachemap( link->image, start, start + done, true );
}
if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) {
- logadd( LOG_WARNING, "Error writing received data for %s:%d; disabling caching.",
- link->image->name, (int)link->image->rid );
+ logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.",
+ link->image->name, (int)link->image->rid, err );
}
}
// 2) Figure out which clients are interested in it
@@ -808,8 +833,9 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
}
spin_unlock( &link->queueLock );
#ifdef _DEBUG
- if ( !served && start != link->replicationHandle )
+ if ( !served && start != link->replicationHandle ) {
logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end );
+ }
#endif
if ( start == link->replicationHandle ) {
// Was our background replication
@@ -818,9 +844,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
if ( !served && link->cacheFd != -1 ) {
posix_fadvise( link->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED );
}
- } else {
+ }
+ if ( served ) {
// Was some client -- reset idle counter
- link->idleCount = 0;
+ link->idleTime = 0;
// Re-enable replication if disabled
if ( link->nextReplicationIndex == -1 ) {
link->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK;
@@ -981,3 +1008,8 @@ static bool uplink_saveCacheMap(dnbd3_connection_t *link)
return true;
}
+static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link)
+{ // Idle check for an uplink: the main loop uses it to close an idle connection and to skip reconnect/RTT probing while idle.
+ return ( link->idleTime > SERVER_UPLINK_IDLE_TIMEOUT ); // true once accumulated idle time strictly exceeds the timeout (presumably seconds -- confirm against timing_diff)
+}
+