summaryrefslogtreecommitdiffstats
path: root/src/server/threadpool.c
diff options
context:
space:
mode:
authorSimon Rettberg2015-01-05 14:17:49 +0100
committerSimon Rettberg2015-01-05 14:17:49 +0100
commit84f93cf7b7768cbf522c75c4e3fe204cf6fd785c (patch)
tree6a186d58a4d00d4ec39daeb5c9acb08830f1889a /src/server/threadpool.c
parent[SERVER] Dead code removal, minor performance tweaks, refactoring, etc. (diff)
downloaddnbd3-84f93cf7b7768cbf522c75c4e3fe204cf6fd785c.tar.gz
dnbd3-84f93cf7b7768cbf522c75c4e3fe204cf6fd785c.tar.xz
dnbd3-84f93cf7b7768cbf522c75c4e3fe204cf6fd785c.zip
[SERVER] Use a thread pool for client connections
Diffstat (limited to 'src/server/threadpool.c')
-rw-r--r--src/server/threadpool.c108
1 files changed, 108 insertions, 0 deletions
diff --git a/src/server/threadpool.c b/src/server/threadpool.c
new file mode 100644
index 0000000..04a8e86
--- /dev/null
+++ b/src/server/threadpool.c
@@ -0,0 +1,108 @@
+#include "globals.h"
+#include "helper.h"
+#include "threadpool.h"
+#include "signal.h"
+#include "locks.h"
+#include <pthread.h>
+
+
+typedef struct _entry_t {
+ struct _entry_t *next;
+ pthread_t thread;
+ int signalFd;
+ void *(*startRoutine)(void *);
+ void * volatile arg;
+} entry_t;
+
+static void *threadpool_worker(void *entryPtr);
+
+static pthread_attr_t threadAttrs;
+
+static int maxIdleThreads = -1;
+static entry_t *pool = NULL;
+static pthread_spinlock_t poolLock;
+
+bool threadpool_init(int maxIdle)
+{
+ if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false;
+ maxIdleThreads = maxIdle;
+ spin_init( &poolLock, PTHREAD_PROCESS_PRIVATE );
+ pthread_attr_init( &threadAttrs );
+ pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED );
+ return true;
+}
+
+bool threadpool_run(void *(*startRoutine)(void *), void *arg)
+{
+ spin_lock( &poolLock );
+ entry_t *entry = pool;
+ if ( entry != NULL ) pool = entry->next;
+ spin_unlock( &poolLock );
+ if ( entry == NULL ) {
+ entry = (entry_t*)malloc( sizeof(entry_t) );
+ entry->signalFd = signal_new();
+ if ( entry->signalFd < 0 ) {
+ printf( "[WARNING] Could not create signalFd for new thread pool thread\n" );
+ return false;
+ }
+ if ( 0 != thread_create( &(entry->thread), &threadAttrs, threadpool_worker, (void*)entry ) ) {
+ printf( "[WARNING] Could not create new thread for thread pool\n" );
+ signal_close( entry->signalFd );
+ free( entry );
+ return false;
+ }
+ printf( "[DEBUG] Thread created!\n" );
+ }
+ entry->next = NULL;
+ entry->startRoutine = startRoutine;
+ entry->arg = arg;
+ signal_call( entry->signalFd );
+ return true;
+}
+
+/**
+ * This is a worker thread of our thread pool.
+ */
+static void *threadpool_worker(void *entryPtr)
+{
+ blockNoncriticalSignals();
+ entry_t *entry = (entry_t*)entryPtr;
+ while ( !_shutdown ) {
+ // Wait for signal from outside that we have work to do
+ int ret = signal_wait( entry->signalFd, -1 );
+ if ( ret > 0 ) {
+ if ( entry->startRoutine == NULL ) {
+ printf( "[DEBUG] Worker woke up but has no work to do!\n" );
+ continue;
+ }
+ // Start assigned work
+ (*entry->startRoutine)( entry->arg );
+ // Reset vars for safety
+ entry->startRoutine = NULL;
+ entry->arg = NULL;
+ // Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise
+ int threadCount = 0;
+ spin_lock( &poolLock );
+ entry_t *ptr = pool;
+ while ( ptr != NULL ) {
+ threadCount++;
+ ptr = ptr->next;
+ }
+ if ( threadCount >= maxIdleThreads ) {
+ spin_unlock( &poolLock );
+ signal_close( entry->signalFd );
+ free( entry );
+ printf(" [DEBUG] Thread killed!\n" );
+ return NULL;
+ }
+ entry->next = pool;
+ pool = entry;
+ spin_unlock( &poolLock );
+ setThreadName( "[pool]" );
+ } else {
+ printf( "[DEBUG] Unexpected return value %d for signal_wait in threadpool worker!\n", ret );
+ }
+ }
+ return NULL;
+}
+