summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2019-08-07 16:31:05 +0200
committerSimon Rettberg2019-08-07 16:31:05 +0200
commit121dd5eceb64be43d188670bff5bce265d57d199 (patch)
tree7e88e3668e9c1376b3ebee93f9eafbf7bfb8ee64
parent[BENCH] Allow specifying request block size (diff)
downloaddnbd3-121dd5eceb64be43d188670bff5bce265d57d199.tar.gz
dnbd3-121dd5eceb64be43d188670bff5bce265d57d199.tar.xz
dnbd3-121dd5eceb64be43d188670bff5bce265d57d199.zip
[SERVER] Lock-free queue for altservers check thread
-rw-r--r--src/server/altservers.c97
-rw-r--r--src/server/uplink.c8
2 files changed, 57 insertions, 48 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index bbbc584..a270bf3 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -14,10 +14,8 @@
#define LOG_GOTO(jumplabel, lvl, ...) do { LOG(lvl, __VA_ARGS__); goto jumplabel; } while (0);
#define ERROR_GOTO(jumplabel, ...) LOG_GOTO(jumplabel, LOG_ERROR, __VA_ARGS__)
-static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS];
-static pthread_mutex_t pendingLockWrite; // Lock for adding something to pending. (NULL -> nonNULL)
-static pthread_mutex_t pendingLockConsume; // Lock for removing something (nonNULL -> NULL)
-static dnbd3_signal_t* runSignal = NULL;
+static dnbd3_connection_t * _Atomic pending[SERVER_MAX_PENDING_ALT_CHECKS];
+static dnbd3_signal_t * _Atomic runSignal = NULL;
static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS];
static int numAltServers = 0;
@@ -32,8 +30,6 @@ void altservers_init()
{
srand( (unsigned int)time( NULL ) );
// Init spinlock
- mutex_init( &pendingLockWrite );
- mutex_init( &pendingLockConsume );
mutex_init( &altServersLock );
// Init signal
runSignal = signal_new();
@@ -48,12 +44,9 @@ void altservers_init()
}
// Init waiting links queue -- this is currently a global static array so
// it will already be zero, but in case we refactor later do it explicitly
- // while also holding the write lock so thread sanitizer is happy
- mutex_lock( &pendingLockWrite );
for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
pending[i] = NULL;
}
- mutex_unlock( &pendingLockWrite );
}
void altservers_shutdown()
@@ -130,52 +123,77 @@ bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate
*/
void altservers_findUplink(dnbd3_connection_t *uplink)
{
+ if ( uplink->shutdown )
+ return;
int i;
// if betterFd != -1 it means the uplink is supposed to switch to another
// server. As this function here is called by the uplink thread, it can
// never be that the uplink is supposed to switch, but instead calls
// this function.
assert( uplink->betterFd == -1 );
- mutex_lock( &pendingLockWrite );
// it is however possible that an RTT measurement is currently in progress,
// so check for that case and do nothing if one is in progress
+ // XXX As this function is only ever called by the image's uplink thread,
+ // it cannot happen that the uplink ends up in this list concurrently
+ mutex_lock( &uplink->rttLock );
if ( uplink->rttTestResult == RTT_INPROGRESS ) {
for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
if ( pending[i] != uplink ) continue;
// Yep, measuring right now
- mutex_unlock( &pendingLockWrite );
+ // Release rttLock before bailing out; all other exit paths below unlock it too
+ mutex_unlock( &uplink->rttLock );
return;
}
}
// Find free slot for measurement
+ uplink->rttTestResult = RTT_INPROGRESS;
for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
if ( pending[i] != NULL ) continue;
- pending[i] = uplink;
- uplink->rttTestResult = RTT_INPROGRESS;
- mutex_unlock( &pendingLockWrite );
- signal_call( runSignal ); // Wake altservers thread up
- return;
+ // Claim the slot atomically; if the exchange fails the slot is no longer
+ // NULL, so keep scanning for another free one
+ dnbd3_connection_t *null = NULL;
+ if ( atomic_compare_exchange_strong( &pending[i], &null, uplink ) ) {
+ mutex_unlock( &uplink->rttLock );
+ atomic_thread_fence( memory_order_release );
+ signal_call( runSignal ); // Wake altservers thread up
+ return;
+ }
}
// End of loop - no free slot
- mutex_unlock( &pendingLockWrite );
+ uplink->rttTestResult = RTT_NOT_REACHABLE;
+ mutex_unlock( &uplink->rttLock );
logadd( LOG_WARNING, "No more free RTT measurement slots, ignoring a request..." );
}
/**
- * The given uplink is about to disappear, so remove it from any queues
+ * The given uplink is about to disappear,
+ * wait until any pending RTT check is done.
*/
void altservers_removeUplink(dnbd3_connection_t *uplink)
{
- mutex_lock( &pendingLockConsume );
- mutex_lock( &pendingLockWrite );
- for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
- if ( pending[i] == uplink ) {
+ assert( uplink != NULL );
+ assert( uplink->shutdown );
+ int i;
+ for ( i = 1 ;; ++i ) {
+ atomic_thread_fence( memory_order_acquire );
+ if ( runSignal == NULL ) {
+ // Thread is already done, remove manually
uplink->rttTestResult = RTT_NOT_REACHABLE;
- pending[i] = NULL;
+ break;
+ }
+ // Thread still running, wait until test is done
+ bool found = false;
+ // Separate index so the outer iteration counter (reported in the log below) is not shadowed
+ for (int j = 0; j < SERVER_MAX_PENDING_ALT_CHECKS; ++j) {
+ if ( pending[j] == uplink ) {
+ found = true;
+ break;
+ }
+ }
+ if ( !found ) // No more test running
+ break;
+ usleep( 10000 ); // 10ms
+ // NOTE(review): runSignal is re-read here after the sleep, while the altservers
+ // thread closes it and sets it to NULL on exit -- confirm signal_call() copes with that
+ signal_call( runSignal ); // Wake altservers thread up
+ if ( i % 500 == 0 ) {
+ logadd( LOG_INFO, "Still waiting for altserver check for uplink %p...", (void*)uplink );
}
}
- mutex_unlock( &pendingLockWrite );
- mutex_unlock( &pendingLockConsume );
+ logadd( LOG_DEBUG1, "Waited for %d iterations for altservers check when tearing down uplink", i );
}
/**
@@ -432,28 +450,18 @@ static void *altservers_main(void *data UNUSED)
usleep( 100000 );
}
// Work your way through the queue
+ atomic_thread_fence( memory_order_acquire );
for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) {
- mutex_lock( &pendingLockWrite );
- if ( pending[itLink] == NULL ) {
- mutex_unlock( &pendingLockWrite );
- continue; // Check once before locking, as a mutex is expensive
- }
- mutex_unlock( &pendingLockWrite );
- mutex_lock( &pendingLockConsume );
- mutex_lock( &pendingLockWrite );
dnbd3_connection_t * const uplink = pending[itLink];
- mutex_unlock( &pendingLockWrite );
- if ( uplink == NULL ) { // Check again after locking
- mutex_unlock( &pendingLockConsume );
+ if ( uplink == NULL )
continue;
- }
dnbd3_image_t * const image = image_lock( uplink->image );
if ( image == NULL ) { // Check again after locking
+ mutex_lock( &uplink->rttLock );
uplink->rttTestResult = RTT_NOT_REACHABLE;
- mutex_lock( &pendingLockWrite );
+ assert( pending[itLink] == uplink );
pending[itLink] = NULL;
- mutex_unlock( &pendingLockWrite );
- mutex_unlock( &pendingLockConsume );
+ mutex_unlock( &uplink->rttLock );
logadd( LOG_DEBUG1, "Image has gone away that was queued for RTT measurement" );
continue;
}
@@ -592,10 +600,9 @@ static void *altservers_main(void *data UNUSED)
}
image_release( image );
// end of loop over all pending uplinks
- mutex_lock( &pendingLockWrite );
+ assert( pending[itLink] == uplink );
pending[itLink] = NULL;
- mutex_unlock( &pendingLockWrite );
- mutex_unlock( &pendingLockConsume );
+ atomic_thread_fence( memory_order_release );
}
// Save cache maps of all images if applicable
declare_now;
@@ -606,7 +613,9 @@ static void *altservers_main(void *data UNUSED)
}
}
cleanup: ;
- if ( runSignal != NULL ) signal_close( runSignal );
+ if ( runSignal != NULL ) {
+ signal_close( runSignal );
+ }
runSignal = NULL;
return NULL ;
}
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 9f99fe4..bb1ffdc 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -583,6 +583,10 @@ static void* uplink_mainloop(void *data)
#endif
}
cleanup: ;
+ if ( !link->shutdown ) {
+ link->shutdown = true;
+ thread_detach( link->thread );
+ }
altservers_removeUplink( link );
uplink_saveCacheMap( link );
mutex_lock( &link->image->lock );
@@ -596,10 +600,6 @@ static void* uplink_mainloop(void *data)
link->fd = -1;
mutex_unlock( &link->sendMutex );
link->signal = NULL;
- if ( !link->shutdown ) {
- link->shutdown = true;
- thread_detach( link->thread );
- }
// Do not access link->image after unlocking, since we set
// image->uplink to NULL. Acquire with image_lock first,
// like done below when checking whether to re-init uplink