summaryrefslogtreecommitdiffstats
path: root/src/server/threadpool.c
diff options
context:
space:
mode:
authorSimon Rettberg2015-01-06 22:36:11 +0100
committerSimon Rettberg2015-01-06 22:36:11 +0100
commit63e040ea63755052ceca98aac3e6f9843cadbeee (patch)
tree90f5f0d3dfff42873ac05401f7f280dd13def4ee /src/server/threadpool.c
parent[SERVER] Use a thread pool for client connections (diff)
downloaddnbd3-63e040ea63755052ceca98aac3e6f9843cadbeee.tar.gz
dnbd3-63e040ea63755052ceca98aac3e6f9843cadbeee.tar.xz
dnbd3-63e040ea63755052ceca98aac3e6f9843cadbeee.zip
[SERVER] Big code cleanup, refactoring, minor bugfixing
Diffstat (limited to 'src/server/threadpool.c')
-rw-r--r--src/server/threadpool.c35
1 files changed, 29 insertions, 6 deletions
diff --git a/src/server/threadpool.c b/src/server/threadpool.c
index 04a8e86..34e996c 100644
--- a/src/server/threadpool.c
+++ b/src/server/threadpool.c
@@ -25,13 +25,29 @@ static pthread_spinlock_t poolLock;
bool threadpool_init(int maxIdle)
{
if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false;
- maxIdleThreads = maxIdle;
spin_init( &poolLock, PTHREAD_PROCESS_PRIVATE );
+ maxIdleThreads = maxIdle;
pthread_attr_init( &threadAttrs );
pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED );
return true;
}
+void threadpool_close()
+{
+ _shutdown = true;
+ if ( maxIdleThreads < 0 ) return;
+ spin_lock( &poolLock );
+ maxIdleThreads = -1;
+ entry_t *ptr = pool;
+ while ( ptr != NULL ) {
+ entry_t *current = ptr;
+ ptr = ptr->next;
+ signal_call( current->signalFd );
+ }
+ spin_unlock( &poolLock );
+ spin_destroy( &poolLock );
+}
+
bool threadpool_run(void *(*startRoutine)(void *), void *arg)
{
spin_lock( &poolLock );
@@ -40,9 +56,14 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg)
spin_unlock( &poolLock );
if ( entry == NULL ) {
entry = (entry_t*)malloc( sizeof(entry_t) );
+ if ( entry == NULL ) {
+ printf( "[WARNING] Could not alloc entry_t for new thread\n" );
+ return false;
+ }
entry->signalFd = signal_new();
if ( entry->signalFd < 0 ) {
printf( "[WARNING] Could not create signalFd for new thread pool thread\n" );
+ free( entry );
return false;
}
if ( 0 != thread_create( &(entry->thread), &threadAttrs, threadpool_worker, (void*)entry ) ) {
@@ -67,9 +88,10 @@ static void *threadpool_worker(void *entryPtr)
{
blockNoncriticalSignals();
entry_t *entry = (entry_t*)entryPtr;
- while ( !_shutdown ) {
+ for ( ;; ) {
// Wait for signal from outside that we have work to do
int ret = signal_wait( entry->signalFd, -1 );
+ if ( _shutdown ) break;
if ( ret > 0 ) {
if ( entry->startRoutine == NULL ) {
printf( "[DEBUG] Worker woke up but has no work to do!\n" );
@@ -80,6 +102,7 @@ static void *threadpool_worker(void *entryPtr)
// Reset vars for safety
entry->startRoutine = NULL;
entry->arg = NULL;
+ if ( _shutdown ) break;
// Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise
int threadCount = 0;
spin_lock( &poolLock );
@@ -90,10 +113,7 @@ static void *threadpool_worker(void *entryPtr)
}
if ( threadCount >= maxIdleThreads ) {
spin_unlock( &poolLock );
- signal_close( entry->signalFd );
- free( entry );
- printf(" [DEBUG] Thread killed!\n" );
- return NULL;
+ break;
}
entry->next = pool;
pool = entry;
@@ -103,6 +123,9 @@ static void *threadpool_worker(void *entryPtr)
printf( "[DEBUG] Unexpected return value %d for signal_wait in threadpool worker!\n", ret );
}
}
+ signal_close( entry->signalFd );
+ free( entry );
+ printf(" [DEBUG] Thread killed!\n" );
return NULL;
}