summaryrefslogtreecommitdiffstats
path: root/src/server/threadpool.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/threadpool.c')
-rw-r--r--src/server/threadpool.c28
1 files changed, 22 insertions, 6 deletions
diff --git a/src/server/threadpool.c b/src/server/threadpool.c
index 3947677..0b46fd6 100644
--- a/src/server/threadpool.c
+++ b/src/server/threadpool.c
@@ -15,6 +15,7 @@ static void *threadpool_worker(void *entryPtr);
static pthread_attr_t threadAttrs;
static atomic_int maxIdleThreads = -1;
static _Atomic(entry_t *) *pool = NULL;
+static atomic_int activeThreads = 0;
bool threadpool_init(int maxIdle)
{
@@ -34,10 +35,9 @@ bool threadpool_init(int maxIdle)
void threadpool_close()
{
- _shutdown = true;
- int max = maxIdleThreads;
- maxIdleThreads = -1;
- if ( max <= 0 ) return;
+ int max = atomic_exchange( &maxIdleThreads, -1 );
+ if ( max <= 0 )
+ return;
for ( int i = 0; i < max; ++i ) {
entry_t *cur = pool[i];
if ( cur != NULL && atomic_compare_exchange_strong( &pool[i], &cur, NULL ) ) {
@@ -46,9 +46,23 @@ void threadpool_close()
}
}
+void threadpool_waitEmpty()
+{
+ if ( activeThreads == 0 )
+ return;
+ do {
+ sleep( 1 );
+ logadd( LOG_INFO, "Threadpool: %d threads still active", (int)activeThreads );
+ } while ( activeThreads != 0 );
+}
+
bool threadpool_run(void *(*startRoutine)(void *), void *arg)
{
- if ( startRoutine == NULL ) {
+ if ( unlikely( _shutdown ) ) {
+ logadd( LOG_MINOR, "Cannot submit work to threadpool while shutting down!" );
+ return false;
+ }
+ if ( unlikely( startRoutine == NULL ) ) {
logadd( LOG_ERROR, "Trying to queue work for thread pool with NULL startRoutine" );
return false; // Or bail out!?
}
@@ -60,7 +74,7 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg)
break;
}
}
- if ( entry == NULL ) {
+ if ( unlikely( entry == NULL ) ) {
entry = malloc( sizeof(entry_t) );
if ( entry == NULL ) {
logadd( LOG_WARNING, "Could not alloc entry_t for new thread\n" );
@@ -78,6 +92,7 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg)
free( entry );
return false;
}
+ activeThreads++;
}
entry->startRoutine = startRoutine;
entry->arg = arg;
@@ -130,6 +145,7 @@ keep_going:;
}
signal_close( entry->signal );
free( entry );
+ activeThreads--;
return NULL;
}