From 84f93cf7b7768cbf522c75c4e3fe204cf6fd785c Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 5 Jan 2015 14:17:49 +0100 Subject: [SERVER] Use a thread pool for client connections --- src/server/globals.h | 2 - src/server/locks.c | 2 +- src/server/locks.h | 3 +- src/server/net.c | 9 +--- src/server/server.c | 14 +++---- src/server/signal.h | 3 +- src/server/threadpool.c | 108 ++++++++++++++++++++++++++++++++++++++++++++++++ src/server/threadpool.h | 23 +++++++++++ 8 files changed, 145 insertions(+), 19 deletions(-) create mode 100644 src/server/threadpool.c create mode 100644 src/server/threadpool.h (limited to 'src/server') diff --git a/src/server/globals.h b/src/server/globals.h index 1ac1495..09a9a5b 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -115,11 +115,9 @@ struct _dnbd3_client { int sock; dnbd3_host_t host; - pthread_t thread; dnbd3_image_t *image; pthread_spinlock_t lock; pthread_mutex_t sendMutex; - bool running; bool isServer; // true if a server in proxy mode, false if real client }; diff --git a/src/server/locks.c b/src/server/locks.c index d779e6f..e464182 100644 --- a/src/server/locks.c +++ b/src/server/locks.c @@ -288,11 +288,11 @@ static void *debug_thread_watchdog(void *something) void debug_locks_start_watchdog() { #ifdef _DEBUG + watchdogSignal = signal_new(); if ( 0 != thread_create( &watchdog, NULL, &debug_thread_watchdog, (void *)NULL ) ) { memlogf( "[ERROR] Could not start debug-lock watchdog." ); return; } - watchdogSignal = signal_new(); #endif } diff --git a/src/server/locks.h b/src/server/locks.h index ab355c9..efd0a37 100644 --- a/src/server/locks.h +++ b/src/server/locks.h @@ -1,9 +1,10 @@ #ifndef _LOCKS_H_ #define _LOCKS_H_ +#include + #ifdef _DEBUG -#include #include #include #include diff --git a/src/server/net.c b/src/server/net.c index 8e5189f..9525456 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -23,7 +23,6 @@ #include #include #include -#include #include #include #include @@ -118,7 +117,7 @@ static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload) void *net_client_handler(void *dnbd3_client) { - dnbd3_client_t *client = (dnbd3_client_t *)(uintptr_t)dnbd3_client; + dnbd3_client_t *client = (dnbd3_client_t *)dnbd3_client; dnbd3_request_t request; dnbd3_reply_t reply; @@ -136,9 +135,6 @@ void *net_client_handler(void *dnbd3_client) dnbd3_server_entry_t server_list[NUMBER_SERVERS]; - // Block some signals not important to this thread - blockNoncriticalSignals(); - // Set to zero to make valgrind happy memset( &reply, 0, sizeof(reply) ); memset( &payload, 0, sizeof(payload) ); @@ -359,8 +355,7 @@ void *net_client_handler(void *dnbd3_client) exit_client_cleanup: ; if ( image_file != -1 ) close( image_file ); dnbd3_remove_client( client ); - client->running = false; client = dnbd3_free_client( client ); - pthread_exit( NULL ); return NULL ; } + diff --git a/src/server/server.c b/src/server/server.c index ee39b8f..d3c03f4 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include @@ -45,6 +44,7 @@ #include "globals.h" #include "integrity.h" #include "helper.h" +#include "threadpool.h" #define MAX_SERVER_SOCKETS 50 // Assume there will be no more than 50 sockets the server will listen on static int sockets[MAX_SERVER_SOCKETS], socket_count = 0; @@ -327,9 +327,11 @@ int main(int argc, char *argv[]) // setup rpc //pthread_t thread_rpc; //thread_create(&(thread_rpc), NULL, &dnbd3_rpc_mainloop, NULL); - pthread_attr_t threadAttrs; - pthread_attr_init( &threadAttrs ); - pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED ); + // Initialize thread pool + if ( !threadpool_init( 10 ) ) { + printf( "Could not init thread pool!\n" ); + exit( EXIT_FAILURE ); + } memlogf( "[INFO] Server is ready..." ); @@ -375,14 +377,13 @@ int main(int argc, char *argv[]) continue; } - if ( 0 != thread_create( &(dnbd3_client->thread), &threadAttrs, net_client_handler, (void *)(uintptr_t)dnbd3_client ) ) { + if ( !threadpool_run( net_client_handler, (void *)dnbd3_client ) ) { memlogf( "[ERROR] Could not start thread for new client." ); dnbd3_remove_client( dnbd3_client ); dnbd3_client = dnbd3_free_client( dnbd3_client ); continue; } } - pthread_attr_destroy( &threadAttrs ); dnbd3_cleanup(); } @@ -413,7 +414,6 @@ dnbd3_client_t* dnbd3_init_client(struct sockaddr_storage *client, int fd) free( dnbd3_client ); return NULL ; } - dnbd3_client->running = true; dnbd3_client->sock = fd; spin_init( &dnbd3_client->lock, PTHREAD_PROCESS_PRIVATE ); pthread_mutex_init( &dnbd3_client->sendMutex, NULL ); diff --git a/src/server/signal.h b/src/server/signal.h index 0504274..1bbc724 100644 --- a/src/server/signal.h +++ b/src/server/signal.h @@ -19,7 +19,8 @@ int signal_call(int signalFd); /** * Wait for given signal, with an optional timeout. - * If timeout == 0, wait forever. + * If timeout == 0, just poll once. + * If timeout < 0, wait forever. * @return > 0 telling how many times the signal was called, * SIGNAL_TIMEOUT if the timeout was reached, * SIGNAL_ERROR if some error occured diff --git a/src/server/threadpool.c b/src/server/threadpool.c new file mode 100644 index 0000000..04a8e86 --- /dev/null +++ b/src/server/threadpool.c @@ -0,0 +1,108 @@ +#include "globals.h" +#include "helper.h" +#include "threadpool.h" +#include "signal.h" +#include "locks.h" +#include + + +typedef struct _entry_t { + struct _entry_t *next; + pthread_t thread; + int signalFd; + void *(*startRoutine)(void *); + void * volatile arg; +} entry_t; + +static void *threadpool_worker(void *entryPtr); + +static pthread_attr_t threadAttrs; + +static int maxIdleThreads = -1; +static entry_t *pool = NULL; +static pthread_spinlock_t poolLock; + +bool threadpool_init(int maxIdle) +{ + if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false; + maxIdleThreads = maxIdle; + spin_init( &poolLock, PTHREAD_PROCESS_PRIVATE ); + pthread_attr_init( &threadAttrs ); + pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED ); + return true; +} + +bool threadpool_run(void *(*startRoutine)(void *), void *arg) +{ + spin_lock( &poolLock ); + entry_t *entry = pool; + if ( entry != NULL ) pool = entry->next; + spin_unlock( &poolLock ); + if ( entry == NULL ) { + entry = (entry_t*)malloc( sizeof(entry_t) ); + entry->signalFd = signal_new(); + if ( entry->signalFd < 0 ) { + printf( "[WARNING] Could not create signalFd for new thread pool thread\n" ); + return false; + } + if ( 0 != thread_create( &(entry->thread), &threadAttrs, threadpool_worker, (void*)entry ) ) { + printf( "[WARNING] Could not create new thread for thread pool\n" ); + signal_close( entry->signalFd ); + free( entry ); + return false; + } + printf( "[DEBUG] Thread created!\n" ); + } + entry->next = NULL; + entry->startRoutine = startRoutine; + entry->arg = arg; + signal_call( entry->signalFd ); + return true; +} + +/** + * This is a worker thread of our thread pool. + */ +static void *threadpool_worker(void *entryPtr) +{ + blockNoncriticalSignals(); + entry_t *entry = (entry_t*)entryPtr; + while ( !_shutdown ) { + // Wait for signal from outside that we have work to do + int ret = signal_wait( entry->signalFd, -1 ); + if ( ret > 0 ) { + if ( entry->startRoutine == NULL ) { + printf( "[DEBUG] Worker woke up but has no work to do!\n" ); + continue; + } + // Start assigned work + (*entry->startRoutine)( entry->arg ); + // Reset vars for safety + entry->startRoutine = NULL; + entry->arg = NULL; + // Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise + int threadCount = 0; + spin_lock( &poolLock ); + entry_t *ptr = pool; + while ( ptr != NULL ) { + threadCount++; + ptr = ptr->next; + } + if ( threadCount >= maxIdleThreads ) { + spin_unlock( &poolLock ); + signal_close( entry->signalFd ); + free( entry ); + printf(" [DEBUG] Thread killed!\n" ); + return NULL; + } + entry->next = pool; + pool = entry; + spin_unlock( &poolLock ); + setThreadName( "[pool]" ); + } else { + printf( "[DEBUG] Unexpected return value %d for signal_wait in threadpool worker!\n", ret ); + } + } + return NULL; +} + diff --git a/src/server/threadpool.h b/src/server/threadpool.h new file mode 100644 index 0000000..b3b0fe6 --- /dev/null +++ b/src/server/threadpool.h @@ -0,0 +1,23 @@ +#ifndef _THREADPOOL_H_ +#define _THREADPOOL_H_ + +#include "../types.h" + +/** + * Initialize the thread pool. This must be called before using + * threadpool_run, and must only be called once. + * @param maxIdleThreadCount maximum number of idle threads in the pool + * @return true if initialized successfully + */ +bool threadpool_init(int maxIdleThreadCount); + +/** + * Run a thread using the thread pool. + * @param startRoutine function to run in new thread + * @param arg argument to pass to thead + * @return true if thread was started + */ +bool threadpool_run(void *(*startRoutine)(void *), void *arg); + +#endif + -- cgit v1.2.3-55-g7522