diff options
Diffstat (limited to 'src/server/threadpool.c')
-rw-r--r-- | src/server/threadpool.c | 150 |
1 files changed, 92 insertions, 58 deletions
diff --git a/src/server/threadpool.c b/src/server/threadpool.c index dac0980..a21bd0d 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -4,26 +4,31 @@ #include "locks.h" typedef struct _entry_t { - struct _entry_t *next; pthread_t thread; dnbd3_signal_t* signal; void *(*startRoutine)(void *); void * arg; + const char *name; } entry_t; static void *threadpool_worker(void *entryPtr); static pthread_attr_t threadAttrs; - -static int maxIdleThreads = -1; -static entry_t *pool = NULL; -static pthread_mutex_t poolLock; +static atomic_int maxIdleThreads = -1; +static _Atomic(entry_t *) *pool = NULL; +static atomic_int activeThreads = 0; bool threadpool_init(int maxIdle) { - if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false; - mutex_init( &poolLock ); - maxIdleThreads = maxIdle; + if ( maxIdle < 0 ) + return false; + int exp = -1; + if ( !atomic_compare_exchange_strong( &maxIdleThreads, &exp, maxIdle ) ) + return false; + pool = malloc( maxIdle * sizeof(*pool) ); + for ( int i = 0; i < maxIdle; ++i ) { + atomic_init( &pool[i], NULL ); + } pthread_attr_init( &threadAttrs ); pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED ); return true; @@ -31,28 +36,48 @@ bool threadpool_init(int maxIdle) void threadpool_close() { - _shutdown = true; - if ( maxIdleThreads < 0 ) return; - mutex_lock( &poolLock ); - maxIdleThreads = -1; - entry_t *ptr = pool; - while ( ptr != NULL ) { - entry_t *current = ptr; - ptr = ptr->next; - signal_call( current->signal ); + int max = atomic_exchange( &maxIdleThreads, -1 ); + if ( max <= 0 ) + return; + for ( int i = 0; i < max; ++i ) { + entry_t *cur = pool[i]; + if ( cur != NULL && atomic_compare_exchange_strong( &pool[i], &cur, NULL ) ) { + signal_call( cur->signal ); + } } - mutex_unlock( &poolLock ); - mutex_destroy( &poolLock ); } -bool threadpool_run(void *(*startRoutine)(void *), void *arg) +void threadpool_waitEmpty() { - mutex_lock( &poolLock ); - entry_t *entry = pool; - if ( entry != NULL ) pool = entry->next; - mutex_unlock( &poolLock ); - if ( entry == NULL ) { - entry = (entry_t*)malloc( sizeof(entry_t) ); + if ( activeThreads == 0 ) + return; + do { + sleep( 1 ); + logadd( LOG_INFO, "Threadpool: %d threads still active", (int)activeThreads ); + } while ( activeThreads != 0 ); +} + +bool threadpool_run(void *(*startRoutine)(void *), void *arg, const char *name) +{ + if ( unlikely( _shutdown ) ) { + logadd( LOG_MINOR, "Cannot submit work to threadpool while shutting down!" ); + return false; + } +#ifdef DEBUG + if ( unlikely( startRoutine == NULL ) ) { + logadd( LOG_ERROR, "Trying to queue work for thread pool with NULL startRoutine" ); + return false; // Or bail out!? + } +#endif + entry_t *entry = NULL; + for ( int i = 0; i < maxIdleThreads; ++i ) { + entry = atomic_exchange( &pool[i], NULL ); + if ( entry != NULL ) { + break; + } + } + if ( unlikely( entry == NULL ) ) { + entry = malloc( sizeof(entry_t) ); if ( entry == NULL ) { logadd( LOG_WARNING, "Could not alloc entry_t for new thread\n" ); return false; @@ -64,15 +89,17 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg) return false; } if ( 0 != thread_create( &(entry->thread), &threadAttrs, threadpool_worker, (void*)entry ) ) { - logadd( LOG_WARNING, "Could not create new thread for thread pool\n" ); + logadd( LOG_WARNING, "Could not create new thread for thread pool (%d active)\n", (int)activeThreads ); signal_close( entry->signal ); free( entry ); return false; } + activeThreads++; } - entry->next = NULL; entry->startRoutine = startRoutine; entry->arg = arg; + entry->name = name; + atomic_thread_fence( memory_order_release ); signal_call( entry->signal ); return true; } @@ -84,43 +111,50 @@ static void *threadpool_worker(void *entryPtr) { blockNoncriticalSignals(); entry_t *entry = (entry_t*)entryPtr; + int ret; for ( ;; ) { +keep_going:; // Wait for signal from outside that we have work to do - int ret = signal_clear( entry->signal ); - if ( _shutdown ) break; - if ( ret > 0 ) { - if ( entry->startRoutine == NULL ) { - logadd( LOG_DEBUG1, "Worker woke up but has no work to do!" ); - continue; - } - // Start assigned work - (*entry->startRoutine)( entry->arg ); - // Reset vars for safety - entry->startRoutine = NULL; - entry->arg = NULL; - if ( _shutdown ) break; - // Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise - int threadCount = 0; - mutex_lock( &poolLock ); - entry_t *ptr = pool; - while ( ptr != NULL ) { - threadCount++; - ptr = ptr->next; - } - if ( threadCount >= maxIdleThreads ) { - mutex_unlock( &poolLock ); - break; - } - entry->next = pool; - pool = entry; - mutex_unlock( &poolLock ); - setThreadName( "[pool]" ); - } else { + ret = signal_clear( entry->signal ); + atomic_thread_fence( memory_order_acquire ); + if ( _shutdown ) + break; + if ( ret <= 0 ) { logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret ); + continue; + } +#ifdef DEBUG + if ( entry->startRoutine == NULL ) { + logadd( LOG_ERROR, "Worker woke up but has no work to do!" ); + exit( 1 ); + } + if ( entry->name != NULL ) { + setThreadName( entry->name ); + } +#endif + // Start assigned work + (*entry->startRoutine)( entry->arg ); + // Reset vars for safety + entry->startRoutine = NULL; + entry->arg = NULL; + atomic_thread_fence( memory_order_release ); + if ( _shutdown ) + break; + // Put thread back into pool + setThreadName( "[pool]" ); + for ( int i = 0; i < maxIdleThreads; ++i ) { + entry_t *exp = NULL; + if ( atomic_compare_exchange_weak( &pool[i], &exp, entry ) ) { + goto keep_going; + } } + // Reaching here means pool is full; just let the thread exit + break; } + setThreadName( "[dead]" ); signal_close( entry->signal ); free( entry ); + activeThreads--; return NULL; } |