summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorSimon Rettberg2019-08-16 15:02:47 +0200
committerSimon Rettberg2019-08-16 15:02:47 +0200
commit0aca693bede4fe7e7e8098cbe33a96a88bc0ec85 (patch)
tree4dd879f05f81a603958fec0fc139ba02c00c6e31 /src
parent[SHARED] Better errno handling in connect() helper (diff)
downloaddnbd3-0aca693bede4fe7e7e8098cbe33a96a88bc0ec85.tar.gz
dnbd3-0aca693bede4fe7e7e8098cbe33a96a88bc0ec85.tar.xz
dnbd3-0aca693bede4fe7e7e8098cbe33a96a88bc0ec85.zip
[SERVER] Lock free thread pool
Diffstat (limited to 'src')
-rw-r--r--src/server/threadpool.c110
1 files changed, 57 insertions, 53 deletions
diff --git a/src/server/threadpool.c b/src/server/threadpool.c
index 340a98d..3947677 100644
--- a/src/server/threadpool.c
+++ b/src/server/threadpool.c
@@ -4,7 +4,6 @@
#include "locks.h"
typedef struct _entry_t {
- struct _entry_t *next;
pthread_t thread;
dnbd3_signal_t* signal;
void *(*startRoutine)(void *);
@@ -14,17 +13,20 @@ typedef struct _entry_t {
static void *threadpool_worker(void *entryPtr);
static pthread_attr_t threadAttrs;
-
-static int maxIdleThreads = -1;
-static atomic_int currentIdleThreads = 0;
-static entry_t *pool = NULL;
-static pthread_spinlock_t poolLock;
+static atomic_int maxIdleThreads = -1;
+static _Atomic(entry_t *) *pool = NULL;
bool threadpool_init(int maxIdle)
{
- if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false;
- pthread_spin_init( &poolLock, PTHREAD_PROCESS_PRIVATE );
- maxIdleThreads = maxIdle;
+ if ( maxIdle < 0 )
+ return false;
+ int exp = -1;
+ if ( !atomic_compare_exchange_strong( &maxIdleThreads, &exp, maxIdle ) )
+ return false;
+ pool = malloc( maxIdle * sizeof(*pool) );
+ for ( int i = 0; i < maxIdle; ++i ) {
+ atomic_init( &pool[i], NULL );
+ }
pthread_attr_init( &threadAttrs );
pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED );
return true;
@@ -33,19 +35,15 @@ bool threadpool_init(int maxIdle)
void threadpool_close()
{
_shutdown = true;
- if ( maxIdleThreads < 0 ) return;
- pthread_spin_lock( &poolLock );
+ int max = maxIdleThreads;
maxIdleThreads = -1;
- entry_t *ptr = pool;
- pool = NULL;
- currentIdleThreads = 0;
- pthread_spin_unlock( &poolLock );
- while ( ptr != NULL ) {
- entry_t *current = ptr;
- ptr = ptr->next;
- signal_call( current->signal );
+ if ( max <= 0 ) return;
+ for ( int i = 0; i < max; ++i ) {
+ entry_t *cur = pool[i];
+ if ( cur != NULL && atomic_compare_exchange_strong( &pool[i], &cur, NULL ) ) {
+ signal_call( cur->signal );
+ }
}
- pthread_spin_destroy( &poolLock );
}
bool threadpool_run(void *(*startRoutine)(void *), void *arg)
@@ -54,15 +52,16 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg)
logadd( LOG_ERROR, "Trying to queue work for thread pool with NULL startRoutine" );
return false; // Or bail out!?
}
- pthread_spin_lock( &poolLock );
- entry_t *entry = pool;
- if ( entry != NULL ) {
- pool = entry->next;
- currentIdleThreads--;
+ entry_t *entry = NULL;
+ for ( int i = 0; i < maxIdleThreads; ++i ) {
+ entry_t *cur = pool[i];
+ if ( cur != NULL && atomic_compare_exchange_weak( &pool[i], &cur, NULL ) ) {
+ entry = cur;
+ break;
+ }
}
- pthread_spin_unlock( &poolLock );
if ( entry == NULL ) {
- entry = (entry_t*)malloc( sizeof(entry_t) );
+ entry = malloc( sizeof(entry_t) );
if ( entry == NULL ) {
logadd( LOG_WARNING, "Could not alloc entry_t for new thread\n" );
return false;
@@ -80,9 +79,9 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg)
return false;
}
}
- entry->next = NULL;
entry->startRoutine = startRoutine;
entry->arg = arg;
+ atomic_thread_fence( memory_order_release );
signal_call( entry->signal );
return true;
}
@@ -94,35 +93,40 @@ static void *threadpool_worker(void *entryPtr)
{
blockNoncriticalSignals();
entry_t *entry = (entry_t*)entryPtr;
+ int ret;
for ( ;; ) {
+keep_going:;
// Wait for signal from outside that we have work to do
- int ret = signal_clear( entry->signal );
- if ( _shutdown ) break;
- if ( ret > 0 ) {
- if ( entry->startRoutine == NULL ) {
- logadd( LOG_ERROR, "Worker woke up but has no work to do!" );
- exit( 1 );
- }
- // Start assigned work
- (*entry->startRoutine)( entry->arg );
- // Reset vars for safety
- entry->startRoutine = NULL;
- entry->arg = NULL;
- if ( _shutdown ) break;
- // Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise
- if ( currentIdleThreads >= maxIdleThreads )
- break;
- // Race condition as we checked before locking, but worst case we have a couple
- // too many threads idling around. At least the count stays accurate.
- setThreadName( "[pool]" );
- pthread_spin_lock( &poolLock );
- currentIdleThreads++;
- entry->next = pool;
- pool = entry;
- pthread_spin_unlock( &poolLock );
- } else {
+ ret = signal_clear( entry->signal );
+ atomic_thread_fence( memory_order_acquire );
+ if ( _shutdown )
+ break;
+ if ( ret <= 0 ) {
logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret );
+ continue;
+ }
+ if ( entry->startRoutine == NULL ) {
+ logadd( LOG_ERROR, "Worker woke up but has no work to do!" );
+ exit( 1 );
+ }
+ // Start assigned work
+ (*entry->startRoutine)( entry->arg );
+ // Reset vars for safety
+ entry->startRoutine = NULL;
+ entry->arg = NULL;
+ atomic_thread_fence( memory_order_release );
+ if ( _shutdown )
+ break;
+ // Put thread back into pool
+ setThreadName( "[pool]" );
+ for ( int i = 0; i < maxIdleThreads; ++i ) {
+ entry_t *exp = NULL;
+ if ( atomic_compare_exchange_weak( &pool[i], &exp, entry ) ) {
+ goto keep_going;
+ }
}
+ // Reaching here means pool is full; just let the thread exit
+ break;
}
signal_close( entry->signal );
free( entry );