From 41a9d196062bfea0aadde87bc30ae262890334b7 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Thu, 31 Jan 2019 10:26:04 +0100 Subject: [SERVER] Don't keep an uplink connection established forever In case we don't use background replication a connection to an uplink server can potentially stay around forever. This in turn would prevent the uplink server from freeing the image as it appears to be in use. --- src/server/uplink.c | 108 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 70 insertions(+), 38 deletions(-) (limited to 'src/server/uplink.c') diff --git a/src/server/uplink.c b/src/server/uplink.c index d544795..736ef11 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -32,6 +32,7 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink); static void uplink_sendReplicationRequest(dnbd3_connection_t *link); static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force); static bool uplink_saveCacheMap(dnbd3_connection_t *link); +static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link); // ############ uplink connection handling @@ -69,7 +70,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE ); link->image = image; link->bytesReceived = 0; - link->idleCount = 0; + link->idleTime = 0; link->queueLen = 0; link->fd = -1; link->cacheFd = -1; @@ -285,14 +286,14 @@ static void* uplink_mainloop(void *data) struct pollfd events[EV_COUNT]; dnbd3_connection_t *link = (dnbd3_connection_t*)data; int numSocks, i, waitTime; - int altCheckInterval = SERVER_RTT_DELAY_INIT; - int discoverFailCount = 0; - int unsavedCount = 0; - ticks nextAltCheck, nextKeepalive; + int altCheckInterval = SERVER_RTT_INTERVAL_INIT; + uint32_t discoverFailCount = 0; + uint32_t unsavedSeconds = 0; + ticks nextAltCheck, lastKeepalive; char buffer[200]; memset( events, 0, sizeof(events) ); timing_get( &nextAltCheck ); - nextKeepalive = nextAltCheck; + lastKeepalive = nextAltCheck; // assert( link != NULL ); setThreadName( "idle-uplink" ); @@ -351,12 +352,14 @@ static void* uplink_mainloop(void *data) // more to do here } // poll() - do { + if ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) { + waitTime = 500; + } else { declare_now; waitTime = (int)timing_diffMs( &now, &nextAltCheck ); - } while(0); - if ( waitTime < 100 ) waitTime = 100; - if ( waitTime > 5000 ) waitTime = 5000; + if ( waitTime < 100 ) waitTime = 100; + if ( waitTime > 5000 ) waitTime = 5000; + } numSocks = poll( events, EV_COUNT, waitTime ); if ( _shutdown || link->shutdown ) goto cleanup; if ( numSocks == -1 ) { // Error? @@ -378,6 +381,8 @@ static void* uplink_mainloop(void *data) if ( link->fd != -1 ) { // Uplink seems fine, relay requests to it... uplink_sendRequests( link, true ); + } else { // No uplink; maybe it was shutdown since it was idle for too long + link->idleTime = 0; } } // Uplink socket @@ -386,55 +391,71 @@ static void* uplink_mainloop(void *data) close( events[EV_SOCKET].fd ); events[EV_SOCKET].fd = -1; logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" ); + setThreadName( "panic-uplink" ); } else if ( (events[EV_SOCKET].revents & POLLIN) ) { uplink_handleReceive( link ); if ( link->fd == -1 ) timing_get( &nextAltCheck ); if ( _shutdown || link->shutdown ) goto cleanup; } declare_now; - if ( timing_reached( &nextKeepalive, &now ) ) { - timing_set( &nextKeepalive, &now, 10 ); - link->idleCount++; - unsavedCount++; - if ( unsavedCount > 24 || ( unsavedCount > 6 && link->idleCount >= 2 && link->idleCount <= 7 ) ) { + uint32_t timepassed = timing_diff( &lastKeepalive, &now ); + if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) { + lastKeepalive = now; + link->idleTime += timepassed; + unsavedSeconds += timepassed; + if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && link->idleTime >= 20 && link->idleTime <= 70 ) ) { // fsync/save every 4 minutes, or every 60 seconds if link is idle - unsavedCount = 0; + unsavedSeconds = 0; uplink_saveCacheMap( link ); } - if ( link->idleCount % 2 == 0 ) { - // Save cache map only if we don't seem busy handling actual client requests - if ( link->fd != -1 && link->replicationHandle == REP_NONE ) { - // Send keep alive if nothing is happening - if ( uplink_sendKeepalive( link->fd ) ) { - // Re-trigger periodically, in case it requires a minimum user count - uplink_sendReplicationRequest( link ); - } else { - const int fd = link->fd; - link->fd = -1; - close( fd ); - } + // Keep-alive + if ( link->fd != -1 && link->replicationHandle == REP_NONE ) { + // Send keep-alive if nothing is happening + if ( uplink_sendKeepalive( link->fd ) ) { + // Re-trigger periodically, in case it requires a minimum user count + uplink_sendReplicationRequest( link ); + } else { + logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" ); + const int fd = link->fd; + link->fd = -1; + close( fd ); + setThreadName( "panic-uplink" ); } } + // Don't keep link established if we're idle for too much + if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) { + close( link->fd ); + link->fd = events[EV_SOCKET].fd = -1; + link->cycleDetected = false; + if ( link->recvBufferLen != 0 ) { + link->recvBufferLen = 0; + free( link->recvBuffer ); + link->recvBuffer = NULL; + } + logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", link->image->name, (int)link->image->rid ); + setThreadName( "idle-uplink" ); + } } // See if we should trigger an RTT measurement spin_lock( &link->rttLock ); const int rttTestResult = link->rttTestResult; spin_unlock( &link->rttLock ); if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) { - if ( timing_reached( &nextAltCheck, &now ) || link->fd == -1 || link->cycleDetected ) { + if ( timing_reached( &nextAltCheck, &now ) || ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) || link->cycleDetected ) { // It seems it's time for a check if ( image_isComplete( link->image ) ) { // Quit work if image is complete logadd( LOG_INFO, "Replication of %s complete.", link->image->name ); + setThreadName( "finished-uplink" ); goto cleanup; - } else { + } else if ( !uplink_connectionShouldShutdown( link ) ) { // Not complete - do measurement altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous) if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) { link->nextReplicationIndex = 0; } } - altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX); + altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_INTERVAL_MAX); timing_set( &nextAltCheck, &now, altCheckInterval ); } } else if ( rttTestResult == RTT_NOT_REACHABLE ) { @@ -442,7 +463,7 @@ static void* uplink_mainloop(void *data) link->rttTestResult = RTT_IDLE; spin_unlock( &link->rttLock ); discoverFailCount++; - timing_set( &nextAltCheck, &now, (discoverFailCount < 5 ? altCheckInterval : SERVER_RTT_DELAY_FAILED) ); + timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) ); } #ifdef _DEBUG if ( link->fd != -1 && !link->shutdown ) { @@ -693,13 +714,17 @@ static void uplink_handleReceive(dnbd3_connection_t *link) goto error_cleanup; } if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) { - logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload for %s", link->image->path ); + logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s", inReply.size, link->image->path ); goto error_cleanup; } if ( unlikely( link->recvBufferLen < inReply.size ) ) { link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536); link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen ); + if ( link->recvBuffer == NULL ) { + logadd( LOG_ERROR, "Out of memory when trying to allocate receive buffer for uplink" ); + exit( 1 ); + } } if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) { logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path ); @@ -753,8 +778,8 @@ static void uplink_handleReceive(dnbd3_connection_t *link) image_updateCachemap( link->image, start, start + done, true ); } if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) { - logadd( LOG_WARNING, "Error writing received data for %s:%d; disabling caching.", - link->image->name, (int)link->image->rid ); + logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.", + link->image->name, (int)link->image->rid, err ); } } // 2) Figure out which clients are interested in it @@ -808,8 +833,9 @@ static void uplink_handleReceive(dnbd3_connection_t *link) } spin_unlock( &link->queueLock ); #ifdef _DEBUG - if ( !served && start != link->replicationHandle ) + if ( !served && start != link->replicationHandle ) { logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end ); + } #endif if ( start == link->replicationHandle ) { // Was our background replication @@ -818,9 +844,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link) if ( !served && link->cacheFd != -1 ) { posix_fadvise( link->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED ); } - } else { + } + if ( served ) { // Was some client -- reset idle counter - link->idleCount = 0; + link->idleTime = 0; // Re-enable replication if disabled if ( link->nextReplicationIndex == -1 ) { link->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK; @@ -981,3 +1008,8 @@ static bool uplink_saveCacheMap(dnbd3_connection_t *link) return true; } +static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link) +{ + return ( link->idleTime > SERVER_UPLINK_IDLE_TIMEOUT ); +} + -- cgit v1.2.3-55-g7522