From 0aca693bede4fe7e7e8098cbe33a96a88bc0ec85 Mon Sep 17 00:00:00 2001
From: Simon Rettberg
Date: Fri, 16 Aug 2019 15:02:47 +0200
Subject: [SERVER] Lock free thread pool

---
 src/server/threadpool.c | 110 +++++++++++++++++++++++++-----------------------
 1 file changed, 57 insertions(+), 53 deletions(-)

diff --git a/src/server/threadpool.c b/src/server/threadpool.c
index 340a98d..3947677 100644
--- a/src/server/threadpool.c
+++ b/src/server/threadpool.c
@@ -4,7 +4,6 @@
 #include "locks.h"
 
 typedef struct _entry_t {
-	struct _entry_t *next;
 	pthread_t thread;
 	dnbd3_signal_t* signal;
 	void *(*startRoutine)(void *);
@@ -14,17 +13,20 @@ typedef struct _entry_t {
 static void *threadpool_worker(void *entryPtr);
 
 static pthread_attr_t threadAttrs;
-
-static int maxIdleThreads = -1;
-static atomic_int currentIdleThreads = 0;
-static entry_t *pool = NULL;
-static pthread_spinlock_t poolLock;
+static atomic_int maxIdleThreads = -1;
+static _Atomic(entry_t *) *pool = NULL;
 
 bool threadpool_init(int maxIdle)
 {
-	if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false;
-	pthread_spin_init( &poolLock, PTHREAD_PROCESS_PRIVATE );
-	maxIdleThreads = maxIdle;
+	if ( maxIdle < 0 )
+		return false;
+	int exp = -1;
+	if ( !atomic_compare_exchange_strong( &maxIdleThreads, &exp, maxIdle ) )
+		return false;
+	pool = malloc( maxIdle * sizeof(*pool) );
+	for ( int i = 0; i < maxIdle; ++i ) {
+		atomic_init( &pool[i], NULL );
+	}
 	pthread_attr_init( &threadAttrs );
 	pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED );
 	return true;
@@ -33,19 +35,15 @@ bool threadpool_init(int maxIdle)
 void threadpool_close()
 {
 	_shutdown = true;
-	if ( maxIdleThreads < 0 ) return;
-	pthread_spin_lock( &poolLock );
+	int max = maxIdleThreads;
 	maxIdleThreads = -1;
-	entry_t *ptr = pool;
-	pool = NULL;
-	currentIdleThreads = 0;
-	pthread_spin_unlock( &poolLock );
-	while ( ptr != NULL ) {
-		entry_t *current = ptr;
-		ptr = ptr->next;
-		signal_call( current->signal );
+	if ( max <= 0 ) return;
+	for ( int i = 0; i < max; ++i ) {
+		entry_t *cur = pool[i];
+		if ( cur != NULL && atomic_compare_exchange_strong( &pool[i], &cur, NULL ) ) {
+			signal_call( cur->signal );
+		}
 	}
-	pthread_spin_destroy( &poolLock );
 }
 
 bool threadpool_run(void *(*startRoutine)(void *), void *arg)
@@ -54,15 +52,16 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg)
 		logadd( LOG_ERROR, "Trying to queue work for thread pool with NULL startRoutine" );
 		return false; // Or bail out!?
 	}
-	pthread_spin_lock( &poolLock );
-	entry_t *entry = pool;
-	if ( entry != NULL ) {
-		pool = entry->next;
-		currentIdleThreads--;
+	entry_t *entry = NULL;
+	for ( int i = 0; i < maxIdleThreads; ++i ) {
+		entry_t *cur = pool[i];
+		if ( cur != NULL && atomic_compare_exchange_weak( &pool[i], &cur, NULL ) ) {
+			entry = cur;
+			break;
+		}
 	}
-	pthread_spin_unlock( &poolLock );
 	if ( entry == NULL ) {
-		entry = (entry_t*)malloc( sizeof(entry_t) );
+		entry = malloc( sizeof(entry_t) );
 		if ( entry == NULL ) {
 			logadd( LOG_WARNING, "Could not alloc entry_t for new thread\n" );
 			return false;
@@ -80,9 +79,9 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg)
 			return false;
 		}
 	}
-	entry->next = NULL;
 	entry->startRoutine = startRoutine;
 	entry->arg = arg;
+	atomic_thread_fence( memory_order_release );
 	signal_call( entry->signal );
 	return true;
 }
@@ -94,35 +93,40 @@ static void *threadpool_worker(void *entryPtr)
 {
 	blockNoncriticalSignals();
 	entry_t *entry = (entry_t*)entryPtr;
+	int ret;
 	for ( ;; ) {
+keep_going:;
 		// Wait for signal from outside that we have work to do
-		int ret = signal_clear( entry->signal );
-		if ( _shutdown ) break;
-		if ( ret > 0 ) {
-			if ( entry->startRoutine == NULL ) {
-				logadd( LOG_ERROR, "Worker woke up but has no work to do!" );
-				exit( 1 );
-			}
-			// Start assigned work
-			(*entry->startRoutine)( entry->arg );
-			// Reset vars for safety
-			entry->startRoutine = NULL;
-			entry->arg = NULL;
-			if ( _shutdown ) break;
-			// Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise
-			if ( currentIdleThreads >= maxIdleThreads )
-				break;
-			// Race condition as we checked before locking, but worst case we have a couple
-			// too many threads idling around. At least the count stays accurate.
-			setThreadName( "[pool]" );
-			pthread_spin_lock( &poolLock );
-			currentIdleThreads++;
-			entry->next = pool;
-			pool = entry;
-			pthread_spin_unlock( &poolLock );
-		} else {
+		ret = signal_clear( entry->signal );
+		atomic_thread_fence( memory_order_acquire );
+		if ( _shutdown )
+			break;
+		if ( ret <= 0 ) {
 			logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret );
+			continue;
+		}
+		if ( entry->startRoutine == NULL ) {
+			logadd( LOG_ERROR, "Worker woke up but has no work to do!" );
+			exit( 1 );
+		}
+		// Start assigned work
+		(*entry->startRoutine)( entry->arg );
+		// Reset vars for safety
+		entry->startRoutine = NULL;
+		entry->arg = NULL;
+		atomic_thread_fence( memory_order_release );
+		if ( _shutdown )
+			break;
+		// Put thread back into pool
+		setThreadName( "[pool]" );
+		for ( int i = 0; i < maxIdleThreads; ++i ) {
+			entry_t *exp = NULL;
+			if ( atomic_compare_exchange_weak( &pool[i], &exp, entry ) ) {
+				goto keep_going;
+			}
 		}
+		// Reaching here means pool is full; just let the thread exit
+		break;
 	}
 	signal_close( entry->signal );
 	free( entry );
--
cgit v1.2.3-55-g7522
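
Note on the pattern: the patch replaces the spinlock-protected linked list of idle workers with a fixed-size array of _Atomic(entry_t *) slots. A NULL slot is free; an idle worker parks itself by CAS-ing a NULL slot to its own entry, and threadpool_run claims an idle worker by CAS-ing a non-NULL slot back to NULL. Below is a minimal, standalone sketch of that slot-claiming idea using only C11 atomics; the names slot_pool, pool_put and pool_get are invented for illustration and are not part of dnbd3.

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

#define POOL_SLOTS 4

typedef struct item {
	int id;
} item_t;

/* One atomic pointer per slot; NULL marks a free slot.
 * Static storage, so all slots start out as NULL. */
static _Atomic(item_t *) slot_pool[POOL_SLOTS];

/* Park an idle item: CAS some NULL slot to point at it. */
static bool pool_put(item_t *item)
{
	for (int i = 0; i < POOL_SLOTS; ++i) {
		item_t *expected = NULL;
		if (atomic_compare_exchange_weak(&slot_pool[i], &expected, item))
			return true;  /* slot claimed, item is now parked */
	}
	return false;  /* pool full; in the patch the worker thread simply exits */
}

/* Claim any parked item: CAS a non-NULL slot back to NULL. */
static item_t *pool_get(void)
{
	for (int i = 0; i < POOL_SLOTS; ++i) {
		item_t *cur = atomic_load(&slot_pool[i]);
		if (cur != NULL && atomic_compare_exchange_weak(&slot_pool[i], &cur, NULL))
			return cur;  /* exclusive ownership passes to the caller */
	}
	return NULL;  /* nothing idle; in the patch a fresh thread is created */
}

int main(void)
{
	item_t a = { .id = 1 }, b = { .id = 2 };
	pool_put(&a);
	pool_put(&b);
	item_t *got = pool_get();
	printf("claimed item %d\n", got ? got->id : -1);
	return 0;
}

A failed (or spuriously failing) weak CAS just moves on to the next slot, mirroring the patch: a worker that finds no free slot exits, and threadpool_run that finds no idle worker spawns a new thread, so correctness never depends on a particular CAS succeeding. The release fence added before signal_call and the acquire fence after signal_clear appear to stand in for the old spinlock when publishing startRoutine and arg to the woken worker.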