summaryrefslogtreecommitdiffstats
path: root/src/server
diff options
context:
space:
mode:
authorSimon Rettberg2019-01-31 10:26:04 +0100
committerSimon Rettberg2019-01-31 10:26:04 +0100
commit41a9d196062bfea0aadde87bc30ae262890334b7 (patch)
tree29c34938de13f93b4fb9b4cf7cd5499342079e5b /src/server
parent[SERVER] uplink: Check for _maxPayload when getting client request (diff)
downloaddnbd3-41a9d196062bfea0aadde87bc30ae262890334b7.tar.gz
dnbd3-41a9d196062bfea0aadde87bc30ae262890334b7.tar.xz
dnbd3-41a9d196062bfea0aadde87bc30ae262890334b7.zip
[SERVER] Don't keep an uplink connection established forever
If we don't use background replication, a connection to an uplink server can potentially stay around forever. This, in turn, would prevent the uplink server from freeing the image, as it appears to be in use.
Diffstat (limited to 'src/server')
-rw-r--r--src/server/altservers.c2
-rw-r--r--src/server/globals.h2
-rw-r--r--src/server/net.c18
-rw-r--r--src/server/uplink.c108
4 files changed, 85 insertions, 45 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index 0054f53..fe4624f 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -380,7 +380,7 @@ void altservers_serverFailed(const dnbd3_host_t * const host)
// to prevent the counter from increasing rapidly if many images use the
// same uplink. If there's a network hickup, all uplinks will call this
// function and would increase the counter too quickly, disabling the server.
- if ( foundIndex != -1 && timing_diff( &altServers[foundIndex].lastFail, &now ) > SERVER_RTT_DELAY_INIT ) {
+ if ( foundIndex != -1 && timing_diff( &altServers[foundIndex].lastFail, &now ) > SERVER_RTT_INTERVAL_INIT ) {
altServers[foundIndex].numFails += SERVER_UPLINK_FAIL_INCREASE;
altServers[foundIndex].lastFail = now;
if ( lastOk != -1 ) {
diff --git a/src/server/globals.h b/src/server/globals.h
index 097df83..2b30bc2 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -72,7 +72,7 @@ struct _dnbd3_connection
uint64_t replicationHandle; // Handle of pending replication request
atomic_uint_fast64_t bytesReceived; // Number of bytes received by the uplink since startup.
int queueLen; // length of queue
- int idleCount; // How many iterations of keepalive check connection was idle
+ uint32_t idleTime; // How many seconds the uplink was idle (apart from keep-alives)
dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE];
};
diff --git a/src/server/net.c b/src/server/net.c
index dcdbaea..a0a9ee3 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -281,7 +281,10 @@ void* net_handleNewConnection(void *clientPtr)
if ( bOk ) {
spin_lock( &image->lock );
image_file = image->readFd;
- timing_get( &image->atime );
+ if ( !client->isServer ) {
+ // Only update immediately if this is a client. Servers are handled on disconnect.
+ timing_get( &image->atime );
+ }
spin_unlock( &image->lock );
serializer_reset_write( &payload );
serializer_put_uint16( &payload, client_version < 3 ? client_version : PROTOCOL_VERSION ); // XXX: Since messed up fuse client was messed up before :(
@@ -492,9 +495,6 @@ void* net_handleNewConnection(void *clientPtr)
pthread_mutex_lock( &client->sendMutex );
send_reply( client->sock, &reply, NULL );
pthread_mutex_unlock( &client->sendMutex );
- spin_lock( &image->lock );
- timing_get( &image->atime );
- spin_unlock( &image->lock );
set_name: ;
if ( !hasName ) {
hasName = true;
@@ -529,9 +529,17 @@ set_name: ;
}
}
exit_client_cleanup: ;
- removeFromList( client );
// First remove from list, then add to counter to prevent race condition
+ removeFromList( client );
totalBytesSent += client->bytesSent;
+ // Access time, but only if client didn't just probe
+ if ( image != NULL ) {
+ spin_lock( &image->lock );
+ if ( client->bytesSent > DNBD3_BLOCK_SIZE * 10 ) {
+ timing_get( &image->atime );
+ }
+ spin_unlock( &image->lock );
+ }
freeClientStruct( client ); // This will also call image_release on client->image
return NULL ;
fail_preadd: ;
diff --git a/src/server/uplink.c b/src/server/uplink.c
index d544795..736ef11 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -32,6 +32,7 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink);
static void uplink_sendReplicationRequest(dnbd3_connection_t *link);
static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force);
static bool uplink_saveCacheMap(dnbd3_connection_t *link);
+static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link);
// ############ uplink connection handling
@@ -69,7 +70,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE );
link->image = image;
link->bytesReceived = 0;
- link->idleCount = 0;
+ link->idleTime = 0;
link->queueLen = 0;
link->fd = -1;
link->cacheFd = -1;
@@ -285,14 +286,14 @@ static void* uplink_mainloop(void *data)
struct pollfd events[EV_COUNT];
dnbd3_connection_t *link = (dnbd3_connection_t*)data;
int numSocks, i, waitTime;
- int altCheckInterval = SERVER_RTT_DELAY_INIT;
- int discoverFailCount = 0;
- int unsavedCount = 0;
- ticks nextAltCheck, nextKeepalive;
+ int altCheckInterval = SERVER_RTT_INTERVAL_INIT;
+ uint32_t discoverFailCount = 0;
+ uint32_t unsavedSeconds = 0;
+ ticks nextAltCheck, lastKeepalive;
char buffer[200];
memset( events, 0, sizeof(events) );
timing_get( &nextAltCheck );
- nextKeepalive = nextAltCheck;
+ lastKeepalive = nextAltCheck;
//
assert( link != NULL );
setThreadName( "idle-uplink" );
@@ -351,12 +352,14 @@ static void* uplink_mainloop(void *data)
// more to do here
}
// poll()
- do {
+ if ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) {
+ waitTime = 500;
+ } else {
declare_now;
waitTime = (int)timing_diffMs( &now, &nextAltCheck );
- } while(0);
- if ( waitTime < 100 ) waitTime = 100;
- if ( waitTime > 5000 ) waitTime = 5000;
+ if ( waitTime < 100 ) waitTime = 100;
+ if ( waitTime > 5000 ) waitTime = 5000;
+ }
numSocks = poll( events, EV_COUNT, waitTime );
if ( _shutdown || link->shutdown ) goto cleanup;
if ( numSocks == -1 ) { // Error?
@@ -378,6 +381,8 @@ static void* uplink_mainloop(void *data)
if ( link->fd != -1 ) {
// Uplink seems fine, relay requests to it...
uplink_sendRequests( link, true );
+ } else { // No uplink; maybe it was shutdown since it was idle for too long
+ link->idleTime = 0;
}
}
// Uplink socket
@@ -386,55 +391,71 @@ static void* uplink_mainloop(void *data)
close( events[EV_SOCKET].fd );
events[EV_SOCKET].fd = -1;
logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" );
+ setThreadName( "panic-uplink" );
} else if ( (events[EV_SOCKET].revents & POLLIN) ) {
uplink_handleReceive( link );
if ( link->fd == -1 ) timing_get( &nextAltCheck );
if ( _shutdown || link->shutdown ) goto cleanup;
}
declare_now;
- if ( timing_reached( &nextKeepalive, &now ) ) {
- timing_set( &nextKeepalive, &now, 10 );
- link->idleCount++;
- unsavedCount++;
- if ( unsavedCount > 24 || ( unsavedCount > 6 && link->idleCount >= 2 && link->idleCount <= 7 ) ) {
+ uint32_t timepassed = timing_diff( &lastKeepalive, &now );
+ if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) {
+ lastKeepalive = now;
+ link->idleTime += timepassed;
+ unsavedSeconds += timepassed;
+ if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && link->idleTime >= 20 && link->idleTime <= 70 ) ) {
// fsync/save every 4 minutes, or every 60 seconds if link is idle
- unsavedCount = 0;
+ unsavedSeconds = 0;
uplink_saveCacheMap( link );
}
- if ( link->idleCount % 2 == 0 ) {
- // Save cache map only if we don't seem busy handling actual client requests
- if ( link->fd != -1 && link->replicationHandle == REP_NONE ) {
- // Send keep alive if nothing is happening
- if ( uplink_sendKeepalive( link->fd ) ) {
- // Re-trigger periodically, in case it requires a minimum user count
- uplink_sendReplicationRequest( link );
- } else {
- const int fd = link->fd;
- link->fd = -1;
- close( fd );
- }
+ // Keep-alive
+ if ( link->fd != -1 && link->replicationHandle == REP_NONE ) {
+ // Send keep-alive if nothing is happening
+ if ( uplink_sendKeepalive( link->fd ) ) {
+ // Re-trigger periodically, in case it requires a minimum user count
+ uplink_sendReplicationRequest( link );
+ } else {
+ logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" );
+ const int fd = link->fd;
+ link->fd = -1;
+ close( fd );
+ setThreadName( "panic-uplink" );
}
}
+ // Don't keep link established if we're idle for too much
+ if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) {
+ close( link->fd );
+ link->fd = events[EV_SOCKET].fd = -1;
+ link->cycleDetected = false;
+ if ( link->recvBufferLen != 0 ) {
+ link->recvBufferLen = 0;
+ free( link->recvBuffer );
+ link->recvBuffer = NULL;
+ }
+ logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", link->image->name, (int)link->image->rid );
+ setThreadName( "idle-uplink" );
+ }
}
// See if we should trigger an RTT measurement
spin_lock( &link->rttLock );
const int rttTestResult = link->rttTestResult;
spin_unlock( &link->rttLock );
if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) {
- if ( timing_reached( &nextAltCheck, &now ) || link->fd == -1 || link->cycleDetected ) {
+ if ( timing_reached( &nextAltCheck, &now ) || ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) || link->cycleDetected ) {
// It seems it's time for a check
if ( image_isComplete( link->image ) ) {
// Quit work if image is complete
logadd( LOG_INFO, "Replication of %s complete.", link->image->name );
+ setThreadName( "finished-uplink" );
goto cleanup;
- } else {
+ } else if ( !uplink_connectionShouldShutdown( link ) ) {
// Not complete - do measurement
altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous)
if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) {
link->nextReplicationIndex = 0;
}
}
- altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX);
+ altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_INTERVAL_MAX);
timing_set( &nextAltCheck, &now, altCheckInterval );
}
} else if ( rttTestResult == RTT_NOT_REACHABLE ) {
@@ -442,7 +463,7 @@ static void* uplink_mainloop(void *data)
link->rttTestResult = RTT_IDLE;
spin_unlock( &link->rttLock );
discoverFailCount++;
- timing_set( &nextAltCheck, &now, (discoverFailCount < 5 ? altCheckInterval : SERVER_RTT_DELAY_FAILED) );
+ timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) );
}
#ifdef _DEBUG
if ( link->fd != -1 && !link->shutdown ) {
@@ -693,13 +714,17 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
goto error_cleanup;
}
if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) {
- logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload for %s", link->image->path );
+ logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s", inReply.size, link->image->path );
goto error_cleanup;
}
if ( unlikely( link->recvBufferLen < inReply.size ) ) {
link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536);
link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen );
+ if ( link->recvBuffer == NULL ) {
+ logadd( LOG_ERROR, "Out of memory when trying to allocate receive buffer for uplink" );
+ exit( 1 );
+ }
}
if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) {
logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path );
@@ -753,8 +778,8 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
image_updateCachemap( link->image, start, start + done, true );
}
if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) {
- logadd( LOG_WARNING, "Error writing received data for %s:%d; disabling caching.",
- link->image->name, (int)link->image->rid );
+ logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.",
+ link->image->name, (int)link->image->rid, err );
}
}
// 2) Figure out which clients are interested in it
@@ -808,8 +833,9 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
}
spin_unlock( &link->queueLock );
#ifdef _DEBUG
- if ( !served && start != link->replicationHandle )
+ if ( !served && start != link->replicationHandle ) {
logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end );
+ }
#endif
if ( start == link->replicationHandle ) {
// Was our background replication
@@ -818,9 +844,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
if ( !served && link->cacheFd != -1 ) {
posix_fadvise( link->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED );
}
- } else {
+ }
+ if ( served ) {
// Was some client -- reset idle counter
- link->idleCount = 0;
+ link->idleTime = 0;
// Re-enable replication if disabled
if ( link->nextReplicationIndex == -1 ) {
link->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK;
@@ -981,3 +1008,8 @@ static bool uplink_saveCacheMap(dnbd3_connection_t *link)
return true;
}
+static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link)
+{
+ return ( link->idleTime > SERVER_UPLINK_IDLE_TIMEOUT );
+}
+