summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2015-01-05 14:17:49 +0100
committerSimon Rettberg2015-01-05 14:17:49 +0100
commit84f93cf7b7768cbf522c75c4e3fe204cf6fd785c (patch)
tree6a186d58a4d00d4ec39daeb5c9acb08830f1889a
parent[SERVER] Dead code removal, minor performance tweaks, refactoring, etc. (diff)
downloaddnbd3-84f93cf7b7768cbf522c75c4e3fe204cf6fd785c.tar.gz
dnbd3-84f93cf7b7768cbf522c75c4e3fe204cf6fd785c.tar.xz
dnbd3-84f93cf7b7768cbf522c75c4e3fe204cf6fd785c.zip
[SERVER] Use a thread pool for client connections
-rw-r--r--src/server/globals.h2
-rw-r--r--src/server/locks.c2
-rw-r--r--src/server/locks.h3
-rw-r--r--src/server/net.c9
-rw-r--r--src/server/server.c14
-rw-r--r--src/server/signal.h3
-rw-r--r--src/server/threadpool.c108
-rw-r--r--src/server/threadpool.h23
8 files changed, 145 insertions, 19 deletions
diff --git a/src/server/globals.h b/src/server/globals.h
index 1ac1495..09a9a5b 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -115,11 +115,9 @@ struct _dnbd3_client
{
int sock;
dnbd3_host_t host;
- pthread_t thread;
dnbd3_image_t *image;
pthread_spinlock_t lock;
pthread_mutex_t sendMutex;
- bool running;
bool isServer; // true if a server in proxy mode, false if real client
};
diff --git a/src/server/locks.c b/src/server/locks.c
index d779e6f..e464182 100644
--- a/src/server/locks.c
+++ b/src/server/locks.c
@@ -288,11 +288,11 @@ static void *debug_thread_watchdog(void *something)
void debug_locks_start_watchdog()
{
#ifdef _DEBUG
+ watchdogSignal = signal_new();
if ( 0 != thread_create( &watchdog, NULL, &debug_thread_watchdog, (void *)NULL ) ) {
memlogf( "[ERROR] Could not start debug-lock watchdog." );
return;
}
- watchdogSignal = signal_new();
#endif
}
diff --git a/src/server/locks.h b/src/server/locks.h
index ab355c9..efd0a37 100644
--- a/src/server/locks.h
+++ b/src/server/locks.h
@@ -1,9 +1,10 @@
#ifndef _LOCKS_H_
#define _LOCKS_H_
+#include <pthread.h>
+
#ifdef _DEBUG
-#include <pthread.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
diff --git a/src/server/net.c b/src/server/net.c
index 8e5189f..9525456 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -23,7 +23,6 @@
#include <stdio.h>
#include <string.h>
#include <unistd.h>
-#include <pthread.h>
#include <fcntl.h>
#include <sys/sendfile.h>
#include <sys/types.h>
@@ -118,7 +117,7 @@ static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload)
void *net_client_handler(void *dnbd3_client)
{
- dnbd3_client_t *client = (dnbd3_client_t *)(uintptr_t)dnbd3_client;
+ dnbd3_client_t *client = (dnbd3_client_t *)dnbd3_client;
dnbd3_request_t request;
dnbd3_reply_t reply;
@@ -136,9 +135,6 @@ void *net_client_handler(void *dnbd3_client)
dnbd3_server_entry_t server_list[NUMBER_SERVERS];
- // Block some signals not important to this thread
- blockNoncriticalSignals();
-
// Set to zero to make valgrind happy
memset( &reply, 0, sizeof(reply) );
memset( &payload, 0, sizeof(payload) );
@@ -359,8 +355,7 @@ void *net_client_handler(void *dnbd3_client)
exit_client_cleanup: ;
if ( image_file != -1 ) close( image_file );
dnbd3_remove_client( client );
- client->running = false;
client = dnbd3_free_client( client );
- pthread_exit( NULL );
return NULL ;
}
+
diff --git a/src/server/server.c b/src/server/server.c
index ee39b8f..d3c03f4 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -22,7 +22,6 @@
#include <stdlib.h>
#include <signal.h>
#include <getopt.h>
-#include <pthread.h>
#include <string.h>
#include <fcntl.h>
#include <sys/ioctl.h>
@@ -45,6 +44,7 @@
#include "globals.h"
#include "integrity.h"
#include "helper.h"
+#include "threadpool.h"
#define MAX_SERVER_SOCKETS 50 // Assume there will be no more than 50 sockets the server will listen on
static int sockets[MAX_SERVER_SOCKETS], socket_count = 0;
@@ -327,9 +327,11 @@ int main(int argc, char *argv[])
// setup rpc
//pthread_t thread_rpc;
//thread_create(&(thread_rpc), NULL, &dnbd3_rpc_mainloop, NULL);
- pthread_attr_t threadAttrs;
- pthread_attr_init( &threadAttrs );
- pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED );
+ // Initialize thread pool
+ if ( !threadpool_init( 10 ) ) {
+ printf( "Could not init thread pool!\n" );
+ exit( EXIT_FAILURE );
+ }
memlogf( "[INFO] Server is ready..." );
@@ -375,14 +377,13 @@ int main(int argc, char *argv[])
continue;
}
- if ( 0 != thread_create( &(dnbd3_client->thread), &threadAttrs, net_client_handler, (void *)(uintptr_t)dnbd3_client ) ) {
+ if ( !threadpool_run( net_client_handler, (void *)dnbd3_client ) ) {
memlogf( "[ERROR] Could not start thread for new client." );
dnbd3_remove_client( dnbd3_client );
dnbd3_client = dnbd3_free_client( dnbd3_client );
continue;
}
}
- pthread_attr_destroy( &threadAttrs );
dnbd3_cleanup();
}
@@ -413,7 +414,6 @@ dnbd3_client_t* dnbd3_init_client(struct sockaddr_storage *client, int fd)
free( dnbd3_client );
return NULL ;
}
- dnbd3_client->running = true;
dnbd3_client->sock = fd;
spin_init( &dnbd3_client->lock, PTHREAD_PROCESS_PRIVATE );
pthread_mutex_init( &dnbd3_client->sendMutex, NULL );
diff --git a/src/server/signal.h b/src/server/signal.h
index 0504274..1bbc724 100644
--- a/src/server/signal.h
+++ b/src/server/signal.h
@@ -19,7 +19,8 @@ int signal_call(int signalFd);
/**
* Wait for given signal, with an optional timeout.
- * If timeout == 0, wait forever.
+ * If timeout == 0, just poll once.
+ * If timeout < 0, wait forever.
* @return > 0 telling how many times the signal was called,
* SIGNAL_TIMEOUT if the timeout was reached,
* SIGNAL_ERROR if some error occured
diff --git a/src/server/threadpool.c b/src/server/threadpool.c
new file mode 100644
index 0000000..04a8e86
--- /dev/null
+++ b/src/server/threadpool.c
@@ -0,0 +1,108 @@
+#include "globals.h"
+#include "helper.h"
+#include "threadpool.h"
+#include "signal.h"
+#include "locks.h"
+#include <pthread.h>
+
+
+typedef struct _entry_t {
+ struct _entry_t *next;
+ pthread_t thread;
+ int signalFd;
+ void *(*startRoutine)(void *);
+ void * volatile arg;
+} entry_t;
+
+static void *threadpool_worker(void *entryPtr);
+
+static pthread_attr_t threadAttrs;
+
+static int maxIdleThreads = -1;
+static entry_t *pool = NULL;
+static pthread_spinlock_t poolLock;
+
+bool threadpool_init(int maxIdle)
+{
+ if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false;
+ maxIdleThreads = maxIdle;
+ spin_init( &poolLock, PTHREAD_PROCESS_PRIVATE );
+ pthread_attr_init( &threadAttrs );
+ pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED );
+ return true;
+}
+
+bool threadpool_run(void *(*startRoutine)(void *), void *arg)
+{
+ spin_lock( &poolLock );
+ entry_t *entry = pool;
+ if ( entry != NULL ) pool = entry->next;
+ spin_unlock( &poolLock );
+ if ( entry == NULL ) {
+ entry = (entry_t*)malloc( sizeof(entry_t) );
+ entry->signalFd = signal_new();
+ if ( entry->signalFd < 0 ) {
+ printf( "[WARNING] Could not create signalFd for new thread pool thread\n" );
+ return false;
+ }
+ if ( 0 != thread_create( &(entry->thread), &threadAttrs, threadpool_worker, (void*)entry ) ) {
+ printf( "[WARNING] Could not create new thread for thread pool\n" );
+ signal_close( entry->signalFd );
+ free( entry );
+ return false;
+ }
+ printf( "[DEBUG] Thread created!\n" );
+ }
+ entry->next = NULL;
+ entry->startRoutine = startRoutine;
+ entry->arg = arg;
+ signal_call( entry->signalFd );
+ return true;
+}
+
+/**
+ * This is a worker thread of our thread pool.
+ */
+static void *threadpool_worker(void *entryPtr)
+{
+ blockNoncriticalSignals();
+ entry_t *entry = (entry_t*)entryPtr;
+ while ( !_shutdown ) {
+ // Wait for signal from outside that we have work to do
+ int ret = signal_wait( entry->signalFd, -1 );
+ if ( ret > 0 ) {
+ if ( entry->startRoutine == NULL ) {
+ printf( "[DEBUG] Worker woke up but has no work to do!\n" );
+ continue;
+ }
+ // Start assigned work
+ (*entry->startRoutine)( entry->arg );
+ // Reset vars for safety
+ entry->startRoutine = NULL;
+ entry->arg = NULL;
+ // Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise
+ int threadCount = 0;
+ spin_lock( &poolLock );
+ entry_t *ptr = pool;
+ while ( ptr != NULL ) {
+ threadCount++;
+ ptr = ptr->next;
+ }
+ if ( threadCount >= maxIdleThreads ) {
+ spin_unlock( &poolLock );
+ signal_close( entry->signalFd );
+ free( entry );
+ printf(" [DEBUG] Thread killed!\n" );
+ return NULL;
+ }
+ entry->next = pool;
+ pool = entry;
+ spin_unlock( &poolLock );
+ setThreadName( "[pool]" );
+ } else {
+ printf( "[DEBUG] Unexpected return value %d for signal_wait in threadpool worker!\n", ret );
+ }
+ }
+ return NULL;
+}
+
diff --git a/src/server/threadpool.h b/src/server/threadpool.h
new file mode 100644
index 0000000..b3b0fe6
--- /dev/null
+++ b/src/server/threadpool.h
@@ -0,0 +1,23 @@
+#ifndef _THREADPOOL_H_
+#define _THREADPOOL_H_
+
+#include "../types.h"
+
+/**
+ * Initialize the thread pool. This must be called before using
+ * threadpool_run, and must only be called once.
+ * @param maxIdleThreadCount maximum number of idle threads in the pool
+ * @return true if initialized successfully
+ */
+bool threadpool_init(int maxIdleThreadCount);
+
+/**
+ * Run a thread using the thread pool.
+ * @param startRoutine function to run in new thread
+ * @param arg argument to pass to thead
+ * @return true if thread was started
+ */
+bool threadpool_run(void *(*startRoutine)(void *), void *arg);
+
+#endif
+