summaryrefslogtreecommitdiffstats
path: root/src/server/threadpool.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/threadpool.c')
-rw-r--r--src/server/threadpool.c150
1 files changed, 92 insertions, 58 deletions
diff --git a/src/server/threadpool.c b/src/server/threadpool.c
index dac0980..a21bd0d 100644
--- a/src/server/threadpool.c
+++ b/src/server/threadpool.c
@@ -4,26 +4,31 @@
#include "locks.h"
typedef struct _entry_t {
- struct _entry_t *next;
pthread_t thread;
dnbd3_signal_t* signal;
void *(*startRoutine)(void *);
void * arg;
+ const char *name;
} entry_t;
static void *threadpool_worker(void *entryPtr);
static pthread_attr_t threadAttrs;
-
-static int maxIdleThreads = -1;
-static entry_t *pool = NULL;
-static pthread_mutex_t poolLock;
+static atomic_int maxIdleThreads = -1;
+static _Atomic(entry_t *) *pool = NULL;
+static atomic_int activeThreads = 0;
bool threadpool_init(int maxIdle)
{
- if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false;
- mutex_init( &poolLock );
- maxIdleThreads = maxIdle;
+ if ( maxIdle < 0 )
+ return false;
+ int exp = -1;
+ if ( !atomic_compare_exchange_strong( &maxIdleThreads, &exp, maxIdle ) )
+ return false;
+ pool = malloc( maxIdle * sizeof(*pool) );
+ for ( int i = 0; i < maxIdle; ++i ) {
+ atomic_init( &pool[i], NULL );
+ }
pthread_attr_init( &threadAttrs );
pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED );
return true;
@@ -31,28 +36,48 @@ bool threadpool_init(int maxIdle)
void threadpool_close()
{
- _shutdown = true;
- if ( maxIdleThreads < 0 ) return;
- mutex_lock( &poolLock );
- maxIdleThreads = -1;
- entry_t *ptr = pool;
- while ( ptr != NULL ) {
- entry_t *current = ptr;
- ptr = ptr->next;
- signal_call( current->signal );
+ int max = atomic_exchange( &maxIdleThreads, -1 );
+ if ( max <= 0 )
+ return;
+ for ( int i = 0; i < max; ++i ) {
+ entry_t *cur = pool[i];
+ if ( cur != NULL && atomic_compare_exchange_strong( &pool[i], &cur, NULL ) ) {
+ signal_call( cur->signal );
+ }
}
- mutex_unlock( &poolLock );
- mutex_destroy( &poolLock );
}
-bool threadpool_run(void *(*startRoutine)(void *), void *arg)
+void threadpool_waitEmpty()
{
- mutex_lock( &poolLock );
- entry_t *entry = pool;
- if ( entry != NULL ) pool = entry->next;
- mutex_unlock( &poolLock );
- if ( entry == NULL ) {
- entry = (entry_t*)malloc( sizeof(entry_t) );
+ if ( activeThreads == 0 )
+ return;
+ do {
+ sleep( 1 );
+ logadd( LOG_INFO, "Threadpool: %d threads still active", (int)activeThreads );
+ } while ( activeThreads != 0 );
+}
+
+bool threadpool_run(void *(*startRoutine)(void *), void *arg, const char *name)
+{
+ if ( unlikely( _shutdown ) ) {
+ logadd( LOG_MINOR, "Cannot submit work to threadpool while shutting down!" );
+ return false;
+ }
+#ifdef DEBUG
+ if ( unlikely( startRoutine == NULL ) ) {
+ logadd( LOG_ERROR, "Trying to queue work for thread pool with NULL startRoutine" );
+ return false; // Or bail out!?
+ }
+#endif
+ entry_t *entry = NULL;
+ for ( int i = 0; i < maxIdleThreads; ++i ) {
+ entry = atomic_exchange( &pool[i], NULL );
+ if ( entry != NULL ) {
+ break;
+ }
+ }
+ if ( unlikely( entry == NULL ) ) {
+ entry = malloc( sizeof(entry_t) );
if ( entry == NULL ) {
logadd( LOG_WARNING, "Could not alloc entry_t for new thread\n" );
return false;
@@ -64,15 +89,17 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg)
return false;
}
if ( 0 != thread_create( &(entry->thread), &threadAttrs, threadpool_worker, (void*)entry ) ) {
- logadd( LOG_WARNING, "Could not create new thread for thread pool\n" );
+ logadd( LOG_WARNING, "Could not create new thread for thread pool (%d active)\n", (int)activeThreads );
signal_close( entry->signal );
free( entry );
return false;
}
+ activeThreads++;
}
- entry->next = NULL;
entry->startRoutine = startRoutine;
entry->arg = arg;
+ entry->name = name;
+ atomic_thread_fence( memory_order_release );
signal_call( entry->signal );
return true;
}
@@ -84,43 +111,50 @@ static void *threadpool_worker(void *entryPtr)
{
blockNoncriticalSignals();
entry_t *entry = (entry_t*)entryPtr;
+ int ret;
for ( ;; ) {
+keep_going:;
// Wait for signal from outside that we have work to do
- int ret = signal_clear( entry->signal );
- if ( _shutdown ) break;
- if ( ret > 0 ) {
- if ( entry->startRoutine == NULL ) {
- logadd( LOG_DEBUG1, "Worker woke up but has no work to do!" );
- continue;
- }
- // Start assigned work
- (*entry->startRoutine)( entry->arg );
- // Reset vars for safety
- entry->startRoutine = NULL;
- entry->arg = NULL;
- if ( _shutdown ) break;
- // Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise
- int threadCount = 0;
- mutex_lock( &poolLock );
- entry_t *ptr = pool;
- while ( ptr != NULL ) {
- threadCount++;
- ptr = ptr->next;
- }
- if ( threadCount >= maxIdleThreads ) {
- mutex_unlock( &poolLock );
- break;
- }
- entry->next = pool;
- pool = entry;
- mutex_unlock( &poolLock );
- setThreadName( "[pool]" );
- } else {
+ ret = signal_clear( entry->signal );
+ atomic_thread_fence( memory_order_acquire );
+ if ( _shutdown )
+ break;
+ if ( ret <= 0 ) {
logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret );
+ continue;
+ }
+#ifdef DEBUG
+ if ( entry->startRoutine == NULL ) {
+ logadd( LOG_ERROR, "Worker woke up but has no work to do!" );
+ exit( 1 );
+ }
+ if ( entry->name != NULL ) {
+ setThreadName( entry->name );
+ }
+#endif
+ // Start assigned work
+ (*entry->startRoutine)( entry->arg );
+ // Reset vars for safety
+ entry->startRoutine = NULL;
+ entry->arg = NULL;
+ atomic_thread_fence( memory_order_release );
+ if ( _shutdown )
+ break;
+ // Put thread back into pool
+ setThreadName( "[pool]" );
+ for ( int i = 0; i < maxIdleThreads; ++i ) {
+ entry_t *exp = NULL;
+ if ( atomic_compare_exchange_weak( &pool[i], &exp, entry ) ) {
+ goto keep_going;
+ }
}
+ // Reaching here means pool is full; just let the thread exit
+ break;
}
+ setThreadName( "[dead]" );
signal_close( entry->signal );
free( entry );
+ activeThreads--;
return NULL;
}