From b335a105db3593f4d7b24ec877b2b478caafd098 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Fri, 8 Feb 2019 15:34:27 +0100 Subject: [SERVER] uplink: Dedicated function for handling link failure --- src/server/uplink.c | 81 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 50 insertions(+), 31 deletions(-) (limited to 'src/server/uplink.c') diff --git a/src/server/uplink.c b/src/server/uplink.c index 736ef11..f534994 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -33,6 +33,7 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link); static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force); static bool uplink_saveCacheMap(dnbd3_connection_t *link); static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link); +static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew); // ############ uplink connection handling @@ -56,7 +57,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version dnbd3_connection_t *link = NULL; assert( image != NULL ); spin_lock( &image->lock ); - if ( image->uplink != NULL ) { + if ( image->uplink != NULL && !image->uplink->shutdown ) { spin_unlock( &image->lock ); if ( sock >= 0 ) close( sock ); return true; // There's already an uplink, so should we consider this success or failure? @@ -284,7 +285,7 @@ static void* uplink_mainloop(void *data) #define EV_SOCKET (1) #define EV_COUNT (2) struct pollfd events[EV_COUNT]; - dnbd3_connection_t *link = (dnbd3_connection_t*)data; + dnbd3_connection_t * const link = (dnbd3_connection_t*)data; int numSocks, i, waitTime; int altCheckInterval = SERVER_RTT_INTERVAL_INIT; uint32_t discoverFailCount = 0; @@ -313,6 +314,29 @@ static void* uplink_mainloop(void *data) events[EV_SIGNAL].fd = signal_getWaitFd( link->signal ); events[EV_SOCKET].fd = -1; while ( !_shutdown && !link->shutdown ) { + // poll() + spin_lock( &link->rttLock ); + waitTime = link->rttTestResult == RTT_DOCHANGE ? 0 : -1; + spin_unlock( &link->rttLock ); + if ( waitTime == 0 ) { + // Nothing + } else if ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) { + waitTime = 1000; + } else { + declare_now; + waitTime = (int)timing_diffMs( &now, &nextAltCheck ); + if ( waitTime < 100 ) waitTime = 100; + if ( waitTime > 5000 ) waitTime = 5000; + } + events[EV_SOCKET].fd = link->fd; + numSocks = poll( events, EV_COUNT, waitTime ); + if ( _shutdown || link->shutdown ) goto cleanup; + if ( numSocks == -1 ) { // Error? + if ( errno == EINTR ) continue; + logadd( LOG_DEBUG1, "poll() error %d", (int)errno ); + usleep( 10000 ); + continue; + } // Check if server switch is in order spin_lock( &link->rttLock ); if ( link->rttTestResult != RTT_DOCHANGE ) { @@ -346,28 +370,10 @@ static void* uplink_mainloop(void *data) uplink_sendRequests( link, false ); uplink_sendReplicationRequest( link ); events[EV_SOCKET].events = POLLIN | POLLRDHUP; - events[EV_SOCKET].fd = link->fd; timing_gets( &nextAltCheck, altCheckInterval ); // The rtt worker already did the handshake for our image, so there's nothing // more to do here } - // poll() - if ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) { - waitTime = 500; - } else { - declare_now; - waitTime = (int)timing_diffMs( &now, &nextAltCheck ); - if ( waitTime < 100 ) waitTime = 100; - if ( waitTime > 5000 ) waitTime = 5000; - } - numSocks = poll( events, EV_COUNT, waitTime ); - if ( _shutdown || link->shutdown ) goto cleanup; - if ( numSocks == -1 ) { // Error? - if ( errno == EINTR ) continue; - logadd( LOG_DEBUG1, "poll() error %d", (int)errno ); - usleep( 10000 ); - continue; - } // Check events // Signal if ( (events[EV_SIGNAL].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) { @@ -387,14 +393,11 @@ static void* uplink_mainloop(void *data) } // Uplink socket if ( (events[EV_SOCKET].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) { - link->fd = -1; - close( events[EV_SOCKET].fd ); - events[EV_SOCKET].fd = -1; + uplink_connectionFailed( link, true ); logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" ); setThreadName( "panic-uplink" ); } else if ( (events[EV_SOCKET].revents & POLLIN) ) { uplink_handleReceive( link ); - if ( link->fd == -1 ) timing_get( &nextAltCheck ); if ( _shutdown || link->shutdown ) goto cleanup; } declare_now; @@ -415,10 +418,8 @@ static void* uplink_mainloop(void *data) // Re-trigger periodically, in case it requires a minimum user count uplink_sendReplicationRequest( link ); } else { + uplink_connectionFailed( link, true ); logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" ); - const int fd = link->fd; - link->fd = -1; - close( fd ); setThreadName( "panic-uplink" ); } } @@ -496,7 +497,9 @@ static void* uplink_mainloop(void *data) altservers_removeUplink( link ); uplink_saveCacheMap( link ); spin_lock( &link->image->lock ); - link->image->uplink = NULL; + if ( link->image->uplink == link ) { + link->image->uplink = NULL; + } spin_lock( &link->queueLock ); const int fd = link->fd; const dnbd3_signal_t* signal = link->signal; @@ -863,12 +866,28 @@ static void uplink_handleReceive(dnbd3_connection_t *link) return; // Error handling from failed receive or message parsing error_cleanup: ; + uplink_connectionFailed( link, true ); +} + +static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew) +{ + if ( link->fd == -1 ) + return; altservers_serverFailed( &link->currentServer ); - const int fd = link->fd; + close( link->fd ); link->fd = -1; link->replicationHandle = REP_NONE; - if ( fd != -1 ) close( fd ); - altservers_findUplink( link ); // Can we just call it here? + if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) { + link->nextReplicationIndex = 0; + } + if ( !findNew ) + return; + spin_lock( &link->rttLock ); + bool bail = link->rttTestResult == RTT_INPROGRESS || link->betterFd != -1; + spin_unlock( &link->rttLock ); + if ( bail ) + return; + altservers_findUplink( link ); } /** -- cgit v1.2.3-55-g7522