From 121dd5eceb64be43d188670bff5bce265d57d199 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Wed, 7 Aug 2019 16:31:05 +0200 Subject: [SERVER] Lock-free queue for altservers check thread --- src/server/altservers.c | 97 +++++++++++++++++++++++++++---------------------- src/server/uplink.c | 8 ++-- 2 files changed, 57 insertions(+), 48 deletions(-) diff --git a/src/server/altservers.c b/src/server/altservers.c index bbbc584..a270bf3 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -14,10 +14,8 @@ #define LOG_GOTO(jumplabel, lvl, ...) do { LOG(lvl, __VA_ARGS__); goto jumplabel; } while (0); #define ERROR_GOTO(jumplabel, ...) LOG_GOTO(jumplabel, LOG_ERROR, __VA_ARGS__) -static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS]; -static pthread_mutex_t pendingLockWrite; // Lock for adding something to pending. (NULL -> nonNULL) -static pthread_mutex_t pendingLockConsume; // Lock for removing something (nonNULL -> NULL) -static dnbd3_signal_t* runSignal = NULL; +static dnbd3_connection_t * _Atomic pending[SERVER_MAX_PENDING_ALT_CHECKS]; +static dnbd3_signal_t * _Atomic runSignal = NULL; static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS]; static int numAltServers = 0; @@ -32,8 +30,6 @@ void altservers_init() { srand( (unsigned int)time( NULL ) ); // Init spinlock - mutex_init( &pendingLockWrite ); - mutex_init( &pendingLockConsume ); mutex_init( &altServersLock ); // Init signal runSignal = signal_new(); @@ -48,12 +44,9 @@ void altservers_init() } // Init waiting links queue -- this is currently a global static array so // it will already be zero, but in case we refactor later do it explicitly - // while also holding the write lock so thread sanitizer is happy - mutex_lock( &pendingLockWrite ); for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { pending[i] = NULL; } - mutex_unlock( &pendingLockWrite ); } void altservers_shutdown() @@ -130,52 +123,77 @@ bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate */ void altservers_findUplink(dnbd3_connection_t *uplink) { + if ( uplink->shutdown ) + return; int i; // if betterFd != -1 it means the uplink is supposed to switch to another // server. As this function here is called by the uplink thread, it can // never be that the uplink is supposed to switch, but instead calls // this function. assert( uplink->betterFd == -1 ); - mutex_lock( &pendingLockWrite ); // it is however possible that an RTT measurement is currently in progress, // so check for that case and do nothing if one is in progress + // XXX As this function is only ever called by the image's uplink thread, + // it cannot happen that the uplink ends up in this list concurrently + mutex_lock( &uplink->rttLock ); if ( uplink->rttTestResult == RTT_INPROGRESS ) { for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { if ( pending[i] != uplink ) continue; // Yep, measuring right now - mutex_unlock( &pendingLockWrite ); return; } } // Find free slot for measurement + uplink->rttTestResult = RTT_INPROGRESS; for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { if ( pending[i] != NULL ) continue; - pending[i] = uplink; - uplink->rttTestResult = RTT_INPROGRESS; - mutex_unlock( &pendingLockWrite ); - signal_call( runSignal ); // Wake altservers thread up - return; + dnbd3_connection_t *null = NULL; + if ( atomic_compare_exchange_strong( &pending[i], &null, uplink ) ) { + mutex_unlock( &uplink->rttLock ); + atomic_thread_fence( memory_order_release ); + signal_call( runSignal ); // Wake altservers thread up + return; + } } // End of loop - no free slot - mutex_unlock( &pendingLockWrite ); + uplink->rttTestResult = RTT_NOT_REACHABLE; + mutex_unlock( &uplink->rttLock ); logadd( LOG_WARNING, "No more free RTT measurement slots, ignoring a request..." ); } /** - * The given uplink is about to disappear, so remove it from any queues + * The given uplink is about to disappear, + * wait until any pending RTT check is done. */ void altservers_removeUplink(dnbd3_connection_t *uplink) { - mutex_lock( &pendingLockConsume ); - mutex_lock( &pendingLockWrite ); - for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { - if ( pending[i] == uplink ) { + assert( uplink != NULL ); + assert( uplink->shutdown ); + int i; + for ( i = 1 ;; ++i ) { + atomic_thread_fence( memory_order_acquire ); + if ( runSignal == NULL ) { + // Thread is already done, remove manually uplink->rttTestResult = RTT_NOT_REACHABLE; - pending[i] = NULL; + break; + } + // Thread still running, wait until test is done + bool found = false; + for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { + if ( pending[i] == uplink ) { + found = true; + break; + } + } + if ( !found ) // No more test running + break; + usleep( 10000 ); // 10ms + signal_call( runSignal ); // Wake altservers thread up + if ( i % 500 == 0 ) { + logadd( LOG_INFO, "Still waiting for altserver check for uplink %p...", (void*)uplink ); } } - mutex_unlock( &pendingLockWrite ); - mutex_unlock( &pendingLockConsume ); + logadd( LOG_DEBUG1, "Waited for %d iterations for altservers check when tearing down uplink", i ); } /** @@ -432,28 +450,18 @@ static void *altservers_main(void *data UNUSED) usleep( 100000 ); } // Work your way through the queue + atomic_thread_fence( memory_order_acquire ); for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) { - mutex_lock( &pendingLockWrite ); - if ( pending[itLink] == NULL ) { - mutex_unlock( &pendingLockWrite ); - continue; // Check once before locking, as a mutex is expensive - } - mutex_unlock( &pendingLockWrite ); - mutex_lock( &pendingLockConsume ); - mutex_lock( &pendingLockWrite ); dnbd3_connection_t * const uplink = pending[itLink]; - mutex_unlock( &pendingLockWrite ); - if ( uplink == NULL ) { // Check again after locking - mutex_unlock( &pendingLockConsume ); + if ( uplink == NULL ) continue; - } dnbd3_image_t * const image = image_lock( uplink->image ); if ( image == NULL ) { // Check again after locking + mutex_lock( &uplink->rttLock ); uplink->rttTestResult = RTT_NOT_REACHABLE; - mutex_lock( &pendingLockWrite ); + assert( pending[itLink] == uplink ); pending[itLink] = NULL; - mutex_unlock( &pendingLockWrite ); - mutex_unlock( &pendingLockConsume ); + mutex_unlock( &uplink->rttLock ); logadd( LOG_DEBUG1, "Image has gone away that was queued for RTT measurement" ); continue; } @@ -592,10 +600,9 @@ static void *altservers_main(void *data UNUSED) } image_release( image ); // end of loop over all pending uplinks - mutex_lock( &pendingLockWrite ); + assert( pending[itLink] == uplink ); pending[itLink] = NULL; - mutex_unlock( &pendingLockWrite ); - mutex_unlock( &pendingLockConsume ); + atomic_thread_fence( memory_order_release ); } // Save cache maps of all images if applicable declare_now; @@ -606,7 +613,9 @@ static void *altservers_main(void *data UNUSED) } } cleanup: ; - if ( runSignal != NULL ) signal_close( runSignal ); + if ( runSignal != NULL ) { + signal_close( runSignal ); + } runSignal = NULL; return NULL ; } diff --git a/src/server/uplink.c b/src/server/uplink.c index 9f99fe4..bb1ffdc 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -583,6 +583,10 @@ static void* uplink_mainloop(void *data) #endif } cleanup: ; + if ( !link->shutdown ) { + link->shutdown = true; + thread_detach( link->thread ); + } altservers_removeUplink( link ); uplink_saveCacheMap( link ); mutex_lock( &link->image->lock ); @@ -596,10 +600,6 @@ static void* uplink_mainloop(void *data) link->fd = -1; mutex_unlock( &link->sendMutex ); link->signal = NULL; - if ( !link->shutdown ) { - link->shutdown = true; - thread_detach( link->thread ); - } // Do not access link->image after unlocking, since we set // image->uplink to NULL. Acquire with image_lock first, // like done below when checking whether to re-init uplink -- cgit v1.2.3-55-g7522