diff options
author | Simon Rettberg | 2019-01-31 10:26:04 +0100 |
---|---|---|
committer | Simon Rettberg | 2019-01-31 10:26:04 +0100 |
commit | 41a9d196062bfea0aadde87bc30ae262890334b7 (patch) | |
tree | 29c34938de13f93b4fb9b4cf7cd5499342079e5b /src/server/uplink.c | |
parent | [SERVER] uplink: Check for _maxPayload when getting client request (diff) | |
download | dnbd3-41a9d196062bfea0aadde87bc30ae262890334b7.tar.gz dnbd3-41a9d196062bfea0aadde87bc30ae262890334b7.tar.xz dnbd3-41a9d196062bfea0aadde87bc30ae262890334b7.zip |
[SERVER] Don't keep an uplink connection established forever
In case we don't use background replication a connection to an uplink
server can potentially stay around forever. This in turn would prevent
the uplink server from freeing the image as it appears to be in use.
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r-- | src/server/uplink.c | 108 |
1 files changed, 70 insertions, 38 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c index d544795..736ef11 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -32,6 +32,7 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink); static void uplink_sendReplicationRequest(dnbd3_connection_t *link); static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force); static bool uplink_saveCacheMap(dnbd3_connection_t *link); +static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link); // ############ uplink connection handling @@ -69,7 +70,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE ); link->image = image; link->bytesReceived = 0; - link->idleCount = 0; + link->idleTime = 0; link->queueLen = 0; link->fd = -1; link->cacheFd = -1; @@ -285,14 +286,14 @@ static void* uplink_mainloop(void *data) struct pollfd events[EV_COUNT]; dnbd3_connection_t *link = (dnbd3_connection_t*)data; int numSocks, i, waitTime; - int altCheckInterval = SERVER_RTT_DELAY_INIT; - int discoverFailCount = 0; - int unsavedCount = 0; - ticks nextAltCheck, nextKeepalive; + int altCheckInterval = SERVER_RTT_INTERVAL_INIT; + uint32_t discoverFailCount = 0; + uint32_t unsavedSeconds = 0; + ticks nextAltCheck, lastKeepalive; char buffer[200]; memset( events, 0, sizeof(events) ); timing_get( &nextAltCheck ); - nextKeepalive = nextAltCheck; + lastKeepalive = nextAltCheck; // assert( link != NULL ); setThreadName( "idle-uplink" ); @@ -351,12 +352,14 @@ static void* uplink_mainloop(void *data) // more to do here } // poll() - do { + if ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) { + waitTime = 500; + } else { declare_now; waitTime = (int)timing_diffMs( &now, &nextAltCheck ); - } while(0); - if ( waitTime < 100 ) waitTime = 100; - if ( waitTime > 5000 ) waitTime = 5000; + if ( waitTime < 100 ) waitTime = 100; + if ( waitTime > 5000 ) waitTime = 5000; + } numSocks = poll( events, EV_COUNT, waitTime ); if ( _shutdown || link->shutdown ) goto cleanup; if ( numSocks == -1 ) { // Error? @@ -378,6 +381,8 @@ static void* uplink_mainloop(void *data) if ( link->fd != -1 ) { // Uplink seems fine, relay requests to it... uplink_sendRequests( link, true ); + } else { // No uplink; maybe it was shutdown since it was idle for too long + link->idleTime = 0; } } // Uplink socket @@ -386,55 +391,71 @@ static void* uplink_mainloop(void *data) close( events[EV_SOCKET].fd ); events[EV_SOCKET].fd = -1; logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" ); + setThreadName( "panic-uplink" ); } else if ( (events[EV_SOCKET].revents & POLLIN) ) { uplink_handleReceive( link ); if ( link->fd == -1 ) timing_get( &nextAltCheck ); if ( _shutdown || link->shutdown ) goto cleanup; } declare_now; - if ( timing_reached( &nextKeepalive, &now ) ) { - timing_set( &nextKeepalive, &now, 10 ); - link->idleCount++; - unsavedCount++; - if ( unsavedCount > 24 || ( unsavedCount > 6 && link->idleCount >= 2 && link->idleCount <= 7 ) ) { + uint32_t timepassed = timing_diff( &lastKeepalive, &now ); + if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) { + lastKeepalive = now; + link->idleTime += timepassed; + unsavedSeconds += timepassed; + if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && link->idleTime >= 20 && link->idleTime <= 70 ) ) { // fsync/save every 4 minutes, or every 60 seconds if link is idle - unsavedCount = 0; + unsavedSeconds = 0; uplink_saveCacheMap( link ); } - if ( link->idleCount % 2 == 0 ) { - // Save cache map only if we don't seem busy handling actual client requests - if ( link->fd != -1 && link->replicationHandle == REP_NONE ) { - // Send keep alive if nothing is happening - if ( uplink_sendKeepalive( link->fd ) ) { - // Re-trigger periodically, in case it requires a minimum user count - uplink_sendReplicationRequest( link ); - } else { - const int fd = link->fd; - link->fd = -1; - close( fd ); - } + // Keep-alive + if ( link->fd != -1 && link->replicationHandle == REP_NONE ) { + // Send keep-alive if nothing is happening + if ( uplink_sendKeepalive( link->fd ) ) { + // Re-trigger periodically, in case it requires a minimum user count + uplink_sendReplicationRequest( link ); + } else { + logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" ); + const int fd = link->fd; + link->fd = -1; + close( fd ); + setThreadName( "panic-uplink" ); } } + // Don't keep link established if we're idle for too much + if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) { + close( link->fd ); + link->fd = events[EV_SOCKET].fd = -1; + link->cycleDetected = false; + if ( link->recvBufferLen != 0 ) { + link->recvBufferLen = 0; + free( link->recvBuffer ); + link->recvBuffer = NULL; + } + logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", link->image->name, (int)link->image->rid ); + setThreadName( "idle-uplink" ); + } } // See if we should trigger an RTT measurement spin_lock( &link->rttLock ); const int rttTestResult = link->rttTestResult; spin_unlock( &link->rttLock ); if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) { - if ( timing_reached( &nextAltCheck, &now ) || link->fd == -1 || link->cycleDetected ) { + if ( timing_reached( &nextAltCheck, &now ) || ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) || link->cycleDetected ) { // It seems it's time for a check if ( image_isComplete( link->image ) ) { // Quit work if image is complete logadd( LOG_INFO, "Replication of %s complete.", link->image->name ); + setThreadName( "finished-uplink" ); goto cleanup; - } else { + } else if ( !uplink_connectionShouldShutdown( link ) ) { // Not complete - do measurement altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous) if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) { link->nextReplicationIndex = 0; } } - altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX); + altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_INTERVAL_MAX); timing_set( &nextAltCheck, &now, altCheckInterval ); } } else if ( rttTestResult == RTT_NOT_REACHABLE ) { @@ -442,7 +463,7 @@ static void* uplink_mainloop(void *data) link->rttTestResult = RTT_IDLE; spin_unlock( &link->rttLock ); discoverFailCount++; - timing_set( &nextAltCheck, &now, (discoverFailCount < 5 ? altCheckInterval : SERVER_RTT_DELAY_FAILED) ); + timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) ); } #ifdef _DEBUG if ( link->fd != -1 && !link->shutdown ) { @@ -693,13 +714,17 @@ static void uplink_handleReceive(dnbd3_connection_t *link) goto error_cleanup; } if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) { - logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload for %s", link->image->path ); + logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s", inReply.size, link->image->path ); goto error_cleanup; } if ( unlikely( link->recvBufferLen < inReply.size ) ) { link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536); link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen ); + if ( link->recvBuffer == NULL ) { + logadd( LOG_ERROR, "Out of memory when trying to allocate receive buffer for uplink" ); + exit( 1 ); + } } if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) { logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path ); @@ -753,8 +778,8 @@ static void uplink_handleReceive(dnbd3_connection_t *link) image_updateCachemap( link->image, start, start + done, true ); } if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) { - logadd( LOG_WARNING, "Error writing received data for %s:%d; disabling caching.", - link->image->name, (int)link->image->rid ); + logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.", + link->image->name, (int)link->image->rid, err ); } } // 2) Figure out which clients are interested in it @@ -808,8 +833,9 @@ static void uplink_handleReceive(dnbd3_connection_t *link) } spin_unlock( &link->queueLock ); #ifdef _DEBUG - if ( !served && start != link->replicationHandle ) + if ( !served && start != link->replicationHandle ) { logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end ); + } #endif if ( start == link->replicationHandle ) { // Was our background replication @@ -818,9 +844,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link) if ( !served && link->cacheFd != -1 ) { posix_fadvise( link->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED ); } - } else { + } + if ( served ) { // Was some client -- reset idle counter - link->idleCount = 0; + link->idleTime = 0; // Re-enable replication if disabled if ( link->nextReplicationIndex == -1 ) { link->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK; @@ -981,3 +1008,8 @@ static bool uplink_saveCacheMap(dnbd3_connection_t *link) return true; } +static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link) +{ + return ( link->idleTime > SERVER_UPLINK_IDLE_TIMEOUT ); +} + |