From 43e57ce5e11e9052f5a7db66f2e8613f1784f919 Mon Sep 17 00:00:00 2001 From: Frederic Robra Date: Tue, 25 Jun 2019 17:03:28 +0200 Subject: first version of dnbd3-ng --- src/server/threadpool.c | 126 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 src/server/threadpool.c (limited to 'src/server/threadpool.c') diff --git a/src/server/threadpool.c b/src/server/threadpool.c new file mode 100644 index 0000000..b55fe19 --- /dev/null +++ b/src/server/threadpool.c @@ -0,0 +1,126 @@ +#include "threadpool.h" +#include "globals.h" +#include "helper.h" +#include "locks.h" + +typedef struct _entry_t { + struct _entry_t *next; + pthread_t thread; + dnbd3_signal_t* signal; + void *(*startRoutine)(void *); + void * arg; +} entry_t; + +static void *threadpool_worker(void *entryPtr); + +static pthread_attr_t threadAttrs; + +static int maxIdleThreads = -1; +static entry_t *pool = NULL; +static pthread_spinlock_t poolLock; + +bool threadpool_init(int maxIdle) +{ + if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false; + spin_init( &poolLock, PTHREAD_PROCESS_PRIVATE ); + maxIdleThreads = maxIdle; + pthread_attr_init( &threadAttrs ); + pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED ); + return true; +} + +void threadpool_close() +{ + _shutdown = true; + if ( maxIdleThreads < 0 ) return; + spin_lock( &poolLock ); + maxIdleThreads = -1; + entry_t *ptr = pool; + while ( ptr != NULL ) { + entry_t *current = ptr; + ptr = ptr->next; + signal_call( current->signal ); + } + spin_unlock( &poolLock ); + spin_destroy( &poolLock ); +} + +bool threadpool_run(void *(*startRoutine)(void *), void *arg) +{ + spin_lock( &poolLock ); + entry_t *entry = pool; + if ( entry != NULL ) pool = entry->next; + spin_unlock( &poolLock ); + if ( entry == NULL ) { + entry = (entry_t*)malloc( sizeof(entry_t) ); + if ( entry == NULL ) { + logadd( LOG_WARNING, "Could not alloc entry_t for new thread\n" ); + return false; + } + entry->signal = signal_newBlocking(); + if ( entry->signal == NULL ) { + logadd( LOG_WARNING, "Could not create signal for new thread pool thread\n" ); + free( entry ); + return false; + } + if ( 0 != thread_create( &(entry->thread), &threadAttrs, threadpool_worker, (void*)entry ) ) { + logadd( LOG_WARNING, "Could not create new thread for thread pool\n" ); + signal_close( entry->signal ); + free( entry ); + return false; + } + } + entry->next = NULL; + entry->startRoutine = startRoutine; + entry->arg = arg; + signal_call( entry->signal ); + return true; +} + +/** + * This is a worker thread of our thread pool. + */ +static void *threadpool_worker(void *entryPtr) +{ + blockNoncriticalSignals(); + entry_t *entry = (entry_t*)entryPtr; + for ( ;; ) { + // Wait for signal from outside that we have work to do + int ret = signal_clear( entry->signal ); + if ( _shutdown ) break; + if ( ret > 0 ) { + if ( entry->startRoutine == NULL ) { + logadd( LOG_DEBUG1, "Worker woke up but has no work to do!" ); + continue; + } + // Start assigned work + (*entry->startRoutine)( entry->arg ); + // Reset vars for safety + entry->startRoutine = NULL; + entry->arg = NULL; + if ( _shutdown ) break; + // Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise + int threadCount = 0; + spin_lock( &poolLock ); + entry_t *ptr = pool; + while ( ptr != NULL ) { + threadCount++; + ptr = ptr->next; + } + if ( threadCount >= maxIdleThreads ) { + spin_unlock( &poolLock ); + break; + } + entry->next = pool; + pool = entry; + spin_unlock( &poolLock ); + setThreadName( "[pool]" ); + } else { + logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret ); + } + } + signal_close( entry->signal ); + free( entry ); + return NULL; +} + -- cgit v1.2.3-55-g7522