summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorSimon Rettberg2019-01-31 10:26:04 +0100
committerSimon Rettberg2019-01-31 10:26:04 +0100
commit41a9d196062bfea0aadde87bc30ae262890334b7 (patch)
tree29c34938de13f93b4fb9b4cf7cd5499342079e5b /src/server/uplink.c
parent[SERVER] uplink: Check for _maxPayload when getting client request (diff)
downloaddnbd3-41a9d196062bfea0aadde87bc30ae262890334b7.tar.gz
dnbd3-41a9d196062bfea0aadde87bc30ae262890334b7.tar.xz
dnbd3-41a9d196062bfea0aadde87bc30ae262890334b7.zip
[SERVER] Don't keep an uplink connection established forever
In case we don't use background replication a connection to an uplink server can potentially stay around forever. This in turn would prevent the uplink server from freeing the image as it appears to be in use.
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c108
1 files changed, 70 insertions, 38 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index d544795..736ef11 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -32,6 +32,7 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink);
static void uplink_sendReplicationRequest(dnbd3_connection_t *link);
static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force);
static bool uplink_saveCacheMap(dnbd3_connection_t *link);
+static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link);
// ############ uplink connection handling
@@ -69,7 +70,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE );
link->image = image;
link->bytesReceived = 0;
- link->idleCount = 0;
+ link->idleTime = 0;
link->queueLen = 0;
link->fd = -1;
link->cacheFd = -1;
@@ -285,14 +286,14 @@ static void* uplink_mainloop(void *data)
struct pollfd events[EV_COUNT];
dnbd3_connection_t *link = (dnbd3_connection_t*)data;
int numSocks, i, waitTime;
- int altCheckInterval = SERVER_RTT_DELAY_INIT;
- int discoverFailCount = 0;
- int unsavedCount = 0;
- ticks nextAltCheck, nextKeepalive;
+ int altCheckInterval = SERVER_RTT_INTERVAL_INIT;
+ uint32_t discoverFailCount = 0;
+ uint32_t unsavedSeconds = 0;
+ ticks nextAltCheck, lastKeepalive;
char buffer[200];
memset( events, 0, sizeof(events) );
timing_get( &nextAltCheck );
- nextKeepalive = nextAltCheck;
+ lastKeepalive = nextAltCheck;
//
assert( link != NULL );
setThreadName( "idle-uplink" );
@@ -351,12 +352,14 @@ static void* uplink_mainloop(void *data)
// more to do here
}
// poll()
- do {
+ if ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) {
+ waitTime = 500;
+ } else {
declare_now;
waitTime = (int)timing_diffMs( &now, &nextAltCheck );
- } while(0);
- if ( waitTime < 100 ) waitTime = 100;
- if ( waitTime > 5000 ) waitTime = 5000;
+ if ( waitTime < 100 ) waitTime = 100;
+ if ( waitTime > 5000 ) waitTime = 5000;
+ }
numSocks = poll( events, EV_COUNT, waitTime );
if ( _shutdown || link->shutdown ) goto cleanup;
if ( numSocks == -1 ) { // Error?
@@ -378,6 +381,8 @@ static void* uplink_mainloop(void *data)
if ( link->fd != -1 ) {
// Uplink seems fine, relay requests to it...
uplink_sendRequests( link, true );
+ } else { // No uplink; maybe it was shutdown since it was idle for too long
+ link->idleTime = 0;
}
}
// Uplink socket
@@ -386,55 +391,71 @@ static void* uplink_mainloop(void *data)
close( events[EV_SOCKET].fd );
events[EV_SOCKET].fd = -1;
logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" );
+ setThreadName( "panic-uplink" );
} else if ( (events[EV_SOCKET].revents & POLLIN) ) {
uplink_handleReceive( link );
if ( link->fd == -1 ) timing_get( &nextAltCheck );
if ( _shutdown || link->shutdown ) goto cleanup;
}
declare_now;
- if ( timing_reached( &nextKeepalive, &now ) ) {
- timing_set( &nextKeepalive, &now, 10 );
- link->idleCount++;
- unsavedCount++;
- if ( unsavedCount > 24 || ( unsavedCount > 6 && link->idleCount >= 2 && link->idleCount <= 7 ) ) {
+ uint32_t timepassed = timing_diff( &lastKeepalive, &now );
+ if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) {
+ lastKeepalive = now;
+ link->idleTime += timepassed;
+ unsavedSeconds += timepassed;
+ if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && link->idleTime >= 20 && link->idleTime <= 70 ) ) {
// fsync/save every 4 minutes, or every 60 seconds if link is idle
- unsavedCount = 0;
+ unsavedSeconds = 0;
uplink_saveCacheMap( link );
}
- if ( link->idleCount % 2 == 0 ) {
- // Save cache map only if we don't seem busy handling actual client requests
- if ( link->fd != -1 && link->replicationHandle == REP_NONE ) {
- // Send keep alive if nothing is happening
- if ( uplink_sendKeepalive( link->fd ) ) {
- // Re-trigger periodically, in case it requires a minimum user count
- uplink_sendReplicationRequest( link );
- } else {
- const int fd = link->fd;
- link->fd = -1;
- close( fd );
- }
+ // Keep-alive
+ if ( link->fd != -1 && link->replicationHandle == REP_NONE ) {
+ // Send keep-alive if nothing is happening
+ if ( uplink_sendKeepalive( link->fd ) ) {
+ // Re-trigger periodically, in case it requires a minimum user count
+ uplink_sendReplicationRequest( link );
+ } else {
+ logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" );
+ const int fd = link->fd;
+ link->fd = -1;
+ close( fd );
+ setThreadName( "panic-uplink" );
}
}
+ // Don't keep link established if we're idle for too much
+ if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) {
+ close( link->fd );
+ link->fd = events[EV_SOCKET].fd = -1;
+ link->cycleDetected = false;
+ if ( link->recvBufferLen != 0 ) {
+ link->recvBufferLen = 0;
+ free( link->recvBuffer );
+ link->recvBuffer = NULL;
+ }
+ logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", link->image->name, (int)link->image->rid );
+ setThreadName( "idle-uplink" );
+ }
}
// See if we should trigger an RTT measurement
spin_lock( &link->rttLock );
const int rttTestResult = link->rttTestResult;
spin_unlock( &link->rttLock );
if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) {
- if ( timing_reached( &nextAltCheck, &now ) || link->fd == -1 || link->cycleDetected ) {
+ if ( timing_reached( &nextAltCheck, &now ) || ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) || link->cycleDetected ) {
// It seems it's time for a check
if ( image_isComplete( link->image ) ) {
// Quit work if image is complete
logadd( LOG_INFO, "Replication of %s complete.", link->image->name );
+ setThreadName( "finished-uplink" );
goto cleanup;
- } else {
+ } else if ( !uplink_connectionShouldShutdown( link ) ) {
// Not complete - do measurement
altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous)
if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) {
link->nextReplicationIndex = 0;
}
}
- altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX);
+ altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_INTERVAL_MAX);
timing_set( &nextAltCheck, &now, altCheckInterval );
}
} else if ( rttTestResult == RTT_NOT_REACHABLE ) {
@@ -442,7 +463,7 @@ static void* uplink_mainloop(void *data)
link->rttTestResult = RTT_IDLE;
spin_unlock( &link->rttLock );
discoverFailCount++;
- timing_set( &nextAltCheck, &now, (discoverFailCount < 5 ? altCheckInterval : SERVER_RTT_DELAY_FAILED) );
+ timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) );
}
#ifdef _DEBUG
if ( link->fd != -1 && !link->shutdown ) {
@@ -693,13 +714,17 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
goto error_cleanup;
}
if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) {
- logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload for %s", link->image->path );
+ logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s", inReply.size, link->image->path );
goto error_cleanup;
}
if ( unlikely( link->recvBufferLen < inReply.size ) ) {
link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536);
link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen );
+ if ( link->recvBuffer == NULL ) {
+ logadd( LOG_ERROR, "Out of memory when trying to allocate receive buffer for uplink" );
+ exit( 1 );
+ }
}
if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) {
logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path );
@@ -753,8 +778,8 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
image_updateCachemap( link->image, start, start + done, true );
}
if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) {
- logadd( LOG_WARNING, "Error writing received data for %s:%d; disabling caching.",
- link->image->name, (int)link->image->rid );
+ logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.",
+ link->image->name, (int)link->image->rid, err );
}
}
// 2) Figure out which clients are interested in it
@@ -808,8 +833,9 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
}
spin_unlock( &link->queueLock );
#ifdef _DEBUG
- if ( !served && start != link->replicationHandle )
+ if ( !served && start != link->replicationHandle ) {
logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end );
+ }
#endif
if ( start == link->replicationHandle ) {
// Was our background replication
@@ -818,9 +844,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
if ( !served && link->cacheFd != -1 ) {
posix_fadvise( link->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED );
}
- } else {
+ }
+ if ( served ) {
// Was some client -- reset idle counter
- link->idleCount = 0;
+ link->idleTime = 0;
// Re-enable replication if disabled
if ( link->nextReplicationIndex == -1 ) {
link->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK;
@@ -981,3 +1008,8 @@ static bool uplink_saveCacheMap(dnbd3_connection_t *link)
return true;
}
+static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link)
+{
+ return ( link->idleTime > SERVER_UPLINK_IDLE_TIMEOUT );
+}
+