author     Simon Rettberg    2019-01-31 10:26:04 +0100
committer  Simon Rettberg    2019-01-31 10:26:04 +0100
commit     41a9d196062bfea0aadde87bc30ae262890334b7 (patch)
tree       29c34938de13f93b4fb9b4cf7cd5499342079e5b
parent     [SERVER] uplink: Check for _maxPayload when getting client request (diff)
[SERVER] Don't keep an uplink connection established forever
In case we don't use background replication, a connection to an uplink server can potentially stay around forever. This in turn would prevent the uplink server from freeing the image, as it appears to still be in use.
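
For context, the idea behind the change is simple: every keep-alive tick adds the elapsed seconds to the uplink's idle counter, any request served for an actual client resets it, and once the counter exceeds SERVER_UPLINK_IDLE_TIMEOUT the connection is torn down so the uplink server can release the image. The snippet below is a minimal, self-contained sketch of that logic; the constants and the uplink_connectionShouldShutdown() / idleTime names mirror the patch, while the uplink_sketch_t struct and the uplink_idleTick() helper are simplified stand-ins and not the actual dnbd3 code.

#include <stdbool.h>
#include <stdint.h>

#define SERVER_UPLINK_KEEPALIVE_INTERVAL  10   /* seconds between keep-alive ticks */
#define SERVER_UPLINK_IDLE_TIMEOUT      1800   /* tear down the uplink after 30 idle minutes */

/* Hypothetical, reduced stand-in for dnbd3_connection_t */
typedef struct {
	int fd;             /* uplink socket, or -1 if not connected */
	uint32_t idleTime;  /* seconds without any client-triggered traffic */
} uplink_sketch_t;

bool uplink_connectionShouldShutdown(uplink_sketch_t *link)
{
	return link->idleTime > SERVER_UPLINK_IDLE_TIMEOUT;
}

/* Hypothetical helper: called from the main loop once per keep-alive interval. */
void uplink_idleTick(uplink_sketch_t *link, uint32_t secondsPassed, void (*closeFd)(int))
{
	link->idleTime += secondsPassed;
	if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) {
		closeFd( link->fd );  /* drop the idle connection so the uplink server */
		link->fd = -1;        /* no longer sees the image as being in use      */
	}
	/* Serving a request for an actual client would reset link->idleTime to 0. */
}

In the actual patch this check runs in uplink_mainloop(), which ticks every SERVER_UPLINK_KEEPALIVE_INTERVAL seconds, and uplink_handleReceive() resets idleTime whenever a reply was served to a client, so only truly idle uplinks get closed.
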
-rw-r--r--  src/server/altservers.c    2
-rw-r--r--  src/server/globals.h       2
-rw-r--r--  src/server/net.c          18
-rw-r--r--  src/server/uplink.c      108
-rw-r--r--  src/serverconfig.h        12
5 files changed, 93 insertions, 49 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index 0054f53..fe4624f 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -380,7 +380,7 @@ void altservers_serverFailed(const dnbd3_host_t * const host)
// to prevent the counter from increasing rapidly if many images use the
// same uplink. If there's a network hiccup, all uplinks will call this
// function and would increase the counter too quickly, disabling the server.
- if ( foundIndex != -1 && timing_diff( &altServers[foundIndex].lastFail, &now ) > SERVER_RTT_DELAY_INIT ) {
+ if ( foundIndex != -1 && timing_diff( &altServers[foundIndex].lastFail, &now ) > SERVER_RTT_INTERVAL_INIT ) {
altServers[foundIndex].numFails += SERVER_UPLINK_FAIL_INCREASE;
altServers[foundIndex].lastFail = now;
if ( lastOk != -1 ) {
diff --git a/src/server/globals.h b/src/server/globals.h
index 097df83..2b30bc2 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -72,7 +72,7 @@ struct _dnbd3_connection
uint64_t replicationHandle; // Handle of pending replication request
atomic_uint_fast64_t bytesReceived; // Number of bytes received by the uplink since startup.
int queueLen; // length of queue
- int idleCount; // How many iterations of keepalive check connection was idle
+ uint32_t idleTime; // How many seconds the uplink was idle (apart from keep-alives)
dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE];
};
diff --git a/src/server/net.c b/src/server/net.c
index dcdbaea..a0a9ee3 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -281,7 +281,10 @@ void* net_handleNewConnection(void *clientPtr)
if ( bOk ) {
spin_lock( &image->lock );
image_file = image->readFd;
- timing_get( &image->atime );
+ if ( !client->isServer ) {
+ // Only update immediately if this is a client. Servers are handled on disconnect.
+ timing_get( &image->atime );
+ }
spin_unlock( &image->lock );
serializer_reset_write( &payload );
serializer_put_uint16( &payload, client_version < 3 ? client_version : PROTOCOL_VERSION ); // XXX: Since messed up fuse client was messed up before :(
@@ -492,9 +495,6 @@ void* net_handleNewConnection(void *clientPtr)
pthread_mutex_lock( &client->sendMutex );
send_reply( client->sock, &reply, NULL );
pthread_mutex_unlock( &client->sendMutex );
- spin_lock( &image->lock );
- timing_get( &image->atime );
- spin_unlock( &image->lock );
set_name: ;
if ( !hasName ) {
hasName = true;
@@ -529,9 +529,17 @@ set_name: ;
}
}
exit_client_cleanup: ;
- removeFromList( client );
// First remove from list, then add to counter to prevent race condition
+ removeFromList( client );
totalBytesSent += client->bytesSent;
+ // Access time, but only if client didn't just probe
+ if ( image != NULL ) {
+ spin_lock( &image->lock );
+ if ( client->bytesSent > DNBD3_BLOCK_SIZE * 10 ) {
+ timing_get( &image->atime );
+ }
+ spin_unlock( &image->lock );
+ }
freeClientStruct( client ); // This will also call image_release on client->image
return NULL ;
fail_preadd: ;
diff --git a/src/server/uplink.c b/src/server/uplink.c
index d544795..736ef11 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -32,6 +32,7 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink);
static void uplink_sendReplicationRequest(dnbd3_connection_t *link);
static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force);
static bool uplink_saveCacheMap(dnbd3_connection_t *link);
+static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link);
// ############ uplink connection handling
@@ -69,7 +70,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE );
link->image = image;
link->bytesReceived = 0;
- link->idleCount = 0;
+ link->idleTime = 0;
link->queueLen = 0;
link->fd = -1;
link->cacheFd = -1;
@@ -285,14 +286,14 @@ static void* uplink_mainloop(void *data)
struct pollfd events[EV_COUNT];
dnbd3_connection_t *link = (dnbd3_connection_t*)data;
int numSocks, i, waitTime;
- int altCheckInterval = SERVER_RTT_DELAY_INIT;
- int discoverFailCount = 0;
- int unsavedCount = 0;
- ticks nextAltCheck, nextKeepalive;
+ int altCheckInterval = SERVER_RTT_INTERVAL_INIT;
+ uint32_t discoverFailCount = 0;
+ uint32_t unsavedSeconds = 0;
+ ticks nextAltCheck, lastKeepalive;
char buffer[200];
memset( events, 0, sizeof(events) );
timing_get( &nextAltCheck );
- nextKeepalive = nextAltCheck;
+ lastKeepalive = nextAltCheck;
//
assert( link != NULL );
setThreadName( "idle-uplink" );
@@ -351,12 +352,14 @@ static void* uplink_mainloop(void *data)
// more to do here
}
// poll()
- do {
+ if ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) {
+ waitTime = 500;
+ } else {
declare_now;
waitTime = (int)timing_diffMs( &now, &nextAltCheck );
- } while(0);
- if ( waitTime < 100 ) waitTime = 100;
- if ( waitTime > 5000 ) waitTime = 5000;
+ if ( waitTime < 100 ) waitTime = 100;
+ if ( waitTime > 5000 ) waitTime = 5000;
+ }
numSocks = poll( events, EV_COUNT, waitTime );
if ( _shutdown || link->shutdown ) goto cleanup;
if ( numSocks == -1 ) { // Error?
@@ -378,6 +381,8 @@ static void* uplink_mainloop(void *data)
if ( link->fd != -1 ) {
// Uplink seems fine, relay requests to it...
uplink_sendRequests( link, true );
+ } else { // No uplink; maybe it was shutdown since it was idle for too long
+ link->idleTime = 0;
}
}
// Uplink socket
@@ -386,55 +391,71 @@ static void* uplink_mainloop(void *data)
close( events[EV_SOCKET].fd );
events[EV_SOCKET].fd = -1;
logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" );
+ setThreadName( "panic-uplink" );
} else if ( (events[EV_SOCKET].revents & POLLIN) ) {
uplink_handleReceive( link );
if ( link->fd == -1 ) timing_get( &nextAltCheck );
if ( _shutdown || link->shutdown ) goto cleanup;
}
declare_now;
- if ( timing_reached( &nextKeepalive, &now ) ) {
- timing_set( &nextKeepalive, &now, 10 );
- link->idleCount++;
- unsavedCount++;
- if ( unsavedCount > 24 || ( unsavedCount > 6 && link->idleCount >= 2 && link->idleCount <= 7 ) ) {
+ uint32_t timepassed = timing_diff( &lastKeepalive, &now );
+ if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) {
+ lastKeepalive = now;
+ link->idleTime += timepassed;
+ unsavedSeconds += timepassed;
+ if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && link->idleTime >= 20 && link->idleTime <= 70 ) ) {
// fsync/save every 4 minutes, or every 60 seconds if link is idle
- unsavedCount = 0;
+ unsavedSeconds = 0;
uplink_saveCacheMap( link );
}
- if ( link->idleCount % 2 == 0 ) {
- // Save cache map only if we don't seem busy handling actual client requests
- if ( link->fd != -1 && link->replicationHandle == REP_NONE ) {
- // Send keep alive if nothing is happening
- if ( uplink_sendKeepalive( link->fd ) ) {
- // Re-trigger periodically, in case it requires a minimum user count
- uplink_sendReplicationRequest( link );
- } else {
- const int fd = link->fd;
- link->fd = -1;
- close( fd );
- }
+ // Keep-alive
+ if ( link->fd != -1 && link->replicationHandle == REP_NONE ) {
+ // Send keep-alive if nothing is happening
+ if ( uplink_sendKeepalive( link->fd ) ) {
+ // Re-trigger periodically, in case it requires a minimum user count
+ uplink_sendReplicationRequest( link );
+ } else {
+ logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" );
+ const int fd = link->fd;
+ link->fd = -1;
+ close( fd );
+ setThreadName( "panic-uplink" );
}
}
+ // Don't keep the link established if it has been idle for too long
+ if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) {
+ close( link->fd );
+ link->fd = events[EV_SOCKET].fd = -1;
+ link->cycleDetected = false;
+ if ( link->recvBufferLen != 0 ) {
+ link->recvBufferLen = 0;
+ free( link->recvBuffer );
+ link->recvBuffer = NULL;
+ }
+ logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", link->image->name, (int)link->image->rid );
+ setThreadName( "idle-uplink" );
+ }
}
// See if we should trigger an RTT measurement
spin_lock( &link->rttLock );
const int rttTestResult = link->rttTestResult;
spin_unlock( &link->rttLock );
if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) {
- if ( timing_reached( &nextAltCheck, &now ) || link->fd == -1 || link->cycleDetected ) {
+ if ( timing_reached( &nextAltCheck, &now ) || ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) || link->cycleDetected ) {
// It seems it's time for a check
if ( image_isComplete( link->image ) ) {
// Quit work if image is complete
logadd( LOG_INFO, "Replication of %s complete.", link->image->name );
+ setThreadName( "finished-uplink" );
goto cleanup;
- } else {
+ } else if ( !uplink_connectionShouldShutdown( link ) ) {
// Not complete - do measurement
altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous)
if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) {
link->nextReplicationIndex = 0;
}
}
- altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX);
+ altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_INTERVAL_MAX);
timing_set( &nextAltCheck, &now, altCheckInterval );
}
} else if ( rttTestResult == RTT_NOT_REACHABLE ) {
@@ -442,7 +463,7 @@ static void* uplink_mainloop(void *data)
link->rttTestResult = RTT_IDLE;
spin_unlock( &link->rttLock );
discoverFailCount++;
- timing_set( &nextAltCheck, &now, (discoverFailCount < 5 ? altCheckInterval : SERVER_RTT_DELAY_FAILED) );
+ timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) );
}
#ifdef _DEBUG
if ( link->fd != -1 && !link->shutdown ) {
@@ -693,13 +714,17 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
goto error_cleanup;
}
if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) {
- logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload for %s", link->image->path );
+ logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s", inReply.size, link->image->path );
goto error_cleanup;
}
if ( unlikely( link->recvBufferLen < inReply.size ) ) {
link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536);
link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen );
+ if ( link->recvBuffer == NULL ) {
+ logadd( LOG_ERROR, "Out of memory when trying to allocate receive buffer for uplink" );
+ exit( 1 );
+ }
}
if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) {
logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path );
@@ -753,8 +778,8 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
image_updateCachemap( link->image, start, start + done, true );
}
if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) {
- logadd( LOG_WARNING, "Error writing received data for %s:%d; disabling caching.",
- link->image->name, (int)link->image->rid );
+ logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.",
+ link->image->name, (int)link->image->rid, err );
}
}
// 2) Figure out which clients are interested in it
@@ -808,8 +833,9 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
}
spin_unlock( &link->queueLock );
#ifdef _DEBUG
- if ( !served && start != link->replicationHandle )
+ if ( !served && start != link->replicationHandle ) {
logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end );
+ }
#endif
if ( start == link->replicationHandle ) {
// Was our background replication
@@ -818,9 +844,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
if ( !served && link->cacheFd != -1 ) {
posix_fadvise( link->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED );
}
- } else {
+ }
+ if ( served ) {
// Was some client -- reset idle counter
- link->idleCount = 0;
+ link->idleTime = 0;
// Re-enable replication if disabled
if ( link->nextReplicationIndex == -1 ) {
link->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK;
@@ -981,3 +1008,8 @@ static bool uplink_saveCacheMap(dnbd3_connection_t *link)
return true;
}
+static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link)
+{
+ return ( link->idleTime > SERVER_UPLINK_IDLE_TIMEOUT );
+}
+
diff --git a/src/serverconfig.h b/src/serverconfig.h
index 6fcda58..0cbb320 100644
--- a/src/serverconfig.h
+++ b/src/serverconfig.h
@@ -26,11 +26,15 @@
// the timeout for cases where we wait for additional data or are actively sending a reply
#define SOCKET_TIMEOUT_CLIENT_RETRIES 3
+#define SERVER_UPLINK_KEEPALIVE_INTERVAL 10 // (Seconds) Send keep-alive if nothing else is happening on the uplink
+#define SERVER_UPLINK_IDLE_TIMEOUT 1800 // (Seconds) Timeout after which we tear down an uplink connection if no blocks needed to be fetched
+
// +++++ Other magic constants
-#define SERVER_RTT_PROBES 5
-#define SERVER_RTT_DELAY_INIT 5
-#define SERVER_RTT_DELAY_MAX 45
-#define SERVER_RTT_DELAY_FAILED 180
+#define SERVER_RTT_PROBES 5 // How many probes to average over
+#define SERVER_RTT_INTERVAL_INIT 5 // Initial interval between probes
+#define SERVER_RTT_INTERVAL_MAX 45 // Maximum interval between probes
+#define SERVER_RTT_BACKOFF_COUNT 5 // If we can't reach any uplink server this many times, consider the uplink bad
+#define SERVER_RTT_INTERVAL_FAILED 180 // Interval to use if no uplink server was reachable for the above number of attempts
#define SERVER_REMOTE_IMAGE_CHECK_CACHETIME 120 // 2 minutes