From c5795aa1f76a35a9b02ce07f145d650a92cfeb86 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 5 Aug 2019 12:46:22 +0200 Subject: [SERVER] Switch threadpool back to spinlock, add idle thread counter --- src/server/threadpool.c | 45 +++++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 22 deletions(-) (limited to 'src/server/threadpool.c') diff --git a/src/server/threadpool.c b/src/server/threadpool.c index dac0980..c01ae7a 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -16,13 +16,14 @@ static void *threadpool_worker(void *entryPtr); static pthread_attr_t threadAttrs; static int maxIdleThreads = -1; +static atomic_int currentIdleThreads = 0; static entry_t *pool = NULL; -static pthread_mutex_t poolLock; +static pthread_spinlock_t poolLock; bool threadpool_init(int maxIdle) { if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false; - mutex_init( &poolLock ); + pthread_spin_init( &poolLock, PTHREAD_PROCESS_PRIVATE ); maxIdleThreads = maxIdle; pthread_attr_init( &threadAttrs ); pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED ); @@ -33,24 +34,29 @@ void threadpool_close() { _shutdown = true; if ( maxIdleThreads < 0 ) return; - mutex_lock( &poolLock ); + pthread_spin_lock( &poolLock ); maxIdleThreads = -1; entry_t *ptr = pool; + pool = NULL; + currentIdleThreads = 0; + pthread_spin_unlock( &poolLock ); while ( ptr != NULL ) { entry_t *current = ptr; ptr = ptr->next; signal_call( current->signal ); } - mutex_unlock( &poolLock ); - mutex_destroy( &poolLock ); + pthread_spin_destroy( &poolLock ); } bool threadpool_run(void *(*startRoutine)(void *), void *arg) { - mutex_lock( &poolLock ); + pthread_spin_lock( &poolLock ); entry_t *entry = pool; - if ( entry != NULL ) pool = entry->next; - mutex_unlock( &poolLock ); + if ( entry != NULL ) { + pool = entry->next; + currentIdleThreads--; + } + pthread_spin_unlock( &poolLock ); if ( entry == NULL ) { entry = (entry_t*)malloc( sizeof(entry_t) ); if ( entry == NULL ) { @@ -90,8 +96,8 @@ static void *threadpool_worker(void *entryPtr) if ( _shutdown ) break; if ( ret > 0 ) { if ( entry->startRoutine == NULL ) { - logadd( LOG_DEBUG1, "Worker woke up but has no work to do!" ); - continue; + logadd( LOG_ERROR, "Worker woke up but has no work to do!" ); + exit( 1 ); } // Start assigned work (*entry->startRoutine)( entry->arg ); @@ -100,21 +106,16 @@ static void *threadpool_worker(void *entryPtr) entry->arg = NULL; if ( _shutdown ) break; // Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise - int threadCount = 0; - mutex_lock( &poolLock ); - entry_t *ptr = pool; - while ( ptr != NULL ) { - threadCount++; - ptr = ptr->next; - } - if ( threadCount >= maxIdleThreads ) { - mutex_unlock( &poolLock ); + if ( currentIdleThreads >= maxIdleThreads ) break; - } + // Race condition as we checked before locking, but worst case we have a couple + // too many threads idling around. At least the count stays accurate. + setThreadName( "[pool]" ); + pthread_spin_lock( &poolLock ); + currentIdleThreads++; entry->next = pool; pool = entry; - mutex_unlock( &poolLock ); - setThreadName( "[pool]" ); + pthread_spin_unlock( &poolLock ); } else { logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret ); } -- cgit v1.2.3-55-g7522 From 71c707da4e5405c986399c3f4505fa0a554548ba Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 5 Aug 2019 12:47:15 +0200 Subject: [SERVER] Add sanity check to threadpool_run for NULL routine --- src/server/threadpool.c | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'src/server/threadpool.c') diff --git a/src/server/threadpool.c b/src/server/threadpool.c index c01ae7a..340a98d 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -50,6 +50,10 @@ void threadpool_close() bool threadpool_run(void *(*startRoutine)(void *), void *arg) { + if ( startRoutine == NULL ) { + logadd( LOG_ERROR, "Trying to queue work for thread pool with NULL startRoutine" ); + return false; // Or bail out!? + } pthread_spin_lock( &poolLock ); entry_t *entry = pool; if ( entry != NULL ) { -- cgit v1.2.3-55-g7522 From 0aca693bede4fe7e7e8098cbe33a96a88bc0ec85 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Fri, 16 Aug 2019 15:02:47 +0200 Subject: [SERVER] Lock free thread pool --- src/server/threadpool.c | 110 +++++++++++++++++++++++++----------------------- 1 file changed, 57 insertions(+), 53 deletions(-) (limited to 'src/server/threadpool.c') diff --git a/src/server/threadpool.c b/src/server/threadpool.c index 340a98d..3947677 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -4,7 +4,6 @@ #include "locks.h" typedef struct _entry_t { - struct _entry_t *next; pthread_t thread; dnbd3_signal_t* signal; void *(*startRoutine)(void *); @@ -14,17 +13,20 @@ typedef struct _entry_t { static void *threadpool_worker(void *entryPtr); static pthread_attr_t threadAttrs; - -static int maxIdleThreads = -1; -static atomic_int currentIdleThreads = 0; -static entry_t *pool = NULL; -static pthread_spinlock_t poolLock; +static atomic_int maxIdleThreads = -1; +static _Atomic(entry_t *) *pool = NULL; bool threadpool_init(int maxIdle) { - if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false; - pthread_spin_init( &poolLock, PTHREAD_PROCESS_PRIVATE ); - maxIdleThreads = maxIdle; + if ( maxIdle < 0 ) + return false; + int exp = -1; + if ( !atomic_compare_exchange_strong( &maxIdleThreads, &exp, maxIdle ) ) + return false; + pool = malloc( maxIdle * sizeof(*pool) ); + for ( int i = 0; i < maxIdle; ++i ) { + atomic_init( &pool[i], NULL ); + } pthread_attr_init( &threadAttrs ); pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED ); return true; @@ -33,19 +35,15 @@ bool threadpool_init(int maxIdle) void threadpool_close() { _shutdown = true; - if ( maxIdleThreads < 0 ) return; - pthread_spin_lock( &poolLock ); + int max = maxIdleThreads; maxIdleThreads = -1; - entry_t *ptr = pool; - pool = NULL; - currentIdleThreads = 0; - pthread_spin_unlock( &poolLock ); - while ( ptr != NULL ) { - entry_t *current = ptr; - ptr = ptr->next; - signal_call( current->signal ); + if ( max <= 0 ) return; + for ( int i = 0; i < max; ++i ) { + entry_t *cur = pool[i]; + if ( cur != NULL && atomic_compare_exchange_strong( &pool[i], &cur, NULL ) ) { + signal_call( cur->signal ); + } } - pthread_spin_destroy( &poolLock ); } bool threadpool_run(void *(*startRoutine)(void *), void *arg) @@ -54,15 +52,16 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg) logadd( LOG_ERROR, "Trying to queue work for thread pool with NULL startRoutine" ); return false; // Or bail out!? } - pthread_spin_lock( &poolLock ); - entry_t *entry = pool; - if ( entry != NULL ) { - pool = entry->next; - currentIdleThreads--; + entry_t *entry = NULL; + for ( int i = 0; i < maxIdleThreads; ++i ) { + entry_t *cur = pool[i]; + if ( cur != NULL && atomic_compare_exchange_weak( &pool[i], &cur, NULL ) ) { + entry = cur; + break; + } } - pthread_spin_unlock( &poolLock ); if ( entry == NULL ) { - entry = (entry_t*)malloc( sizeof(entry_t) ); + entry = malloc( sizeof(entry_t) ); if ( entry == NULL ) { logadd( LOG_WARNING, "Could not alloc entry_t for new thread\n" ); return false; @@ -80,9 +79,9 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg) return false; } } - entry->next = NULL; entry->startRoutine = startRoutine; entry->arg = arg; + atomic_thread_fence( memory_order_release ); signal_call( entry->signal ); return true; } @@ -94,35 +93,40 @@ static void *threadpool_worker(void *entryPtr) { blockNoncriticalSignals(); entry_t *entry = (entry_t*)entryPtr; + int ret; for ( ;; ) { +keep_going:; // Wait for signal from outside that we have work to do - int ret = signal_clear( entry->signal ); - if ( _shutdown ) break; - if ( ret > 0 ) { - if ( entry->startRoutine == NULL ) { - logadd( LOG_ERROR, "Worker woke up but has no work to do!" ); - exit( 1 ); - } - // Start assigned work - (*entry->startRoutine)( entry->arg ); - // Reset vars for safety - entry->startRoutine = NULL; - entry->arg = NULL; - if ( _shutdown ) break; - // Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise - if ( currentIdleThreads >= maxIdleThreads ) - break; - // Race condition as we checked before locking, but worst case we have a couple - // too many threads idling around. At least the count stays accurate. - setThreadName( "[pool]" ); - pthread_spin_lock( &poolLock ); - currentIdleThreads++; - entry->next = pool; - pool = entry; - pthread_spin_unlock( &poolLock ); - } else { + ret = signal_clear( entry->signal ); + atomic_thread_fence( memory_order_acquire ); + if ( _shutdown ) + break; + if ( ret <= 0 ) { logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret ); + continue; + } + if ( entry->startRoutine == NULL ) { + logadd( LOG_ERROR, "Worker woke up but has no work to do!" ); + exit( 1 ); + } + // Start assigned work + (*entry->startRoutine)( entry->arg ); + // Reset vars for safety + entry->startRoutine = NULL; + entry->arg = NULL; + atomic_thread_fence( memory_order_release ); + if ( _shutdown ) + break; + // Put thread back into pool + setThreadName( "[pool]" ); + for ( int i = 0; i < maxIdleThreads; ++i ) { + entry_t *exp = NULL; + if ( atomic_compare_exchange_weak( &pool[i], &exp, entry ) ) { + goto keep_going; + } } + // Reaching here means pool is full; just let the thread exit + break; } signal_close( entry->signal ); free( entry ); -- cgit v1.2.3-55-g7522 From ac1bf45ebdd630fbc9ad2c1fa3c0ea99f5206799 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Wed, 28 Aug 2019 13:07:13 +0200 Subject: [SERVER] Make signal handling more POSIX According to POSIX, a signal sent to a PID can be delivered to an arbitrary thread of that process that hasn't the signal blocked. This seens to never happen on Linux, but would mess things up since the code expected the main signal handler to only be executed by the main thread. This should now be fixed by examining the destination PID of the signal as well as the ID of the thread currently running the signal handler. If we notice the signal wasn't sent by our own PID and the handler is not currently run by the main thread, we re-send the signal to the main thread. Otherwise, if the signal was sent by our own PID but the handler is not run in the main thread, do nothing. This way we can use pthread_kill() to wake up threads that might be stuck in a blocking syscall when it's time to shut down. --- src/server/globals.h | 1 + src/server/image.c | 10 ++-------- src/server/integrity.c | 17 +++++++++++++---- src/server/net.c | 11 ++++++----- src/server/rpc.c | 13 ++++++++----- src/server/server.c | 22 +++++++++++++++++----- src/server/threadpool.c | 28 ++++++++++++++++++++++------ src/server/threadpool.h | 5 +++++ 8 files changed, 74 insertions(+), 33 deletions(-) (limited to 'src/server/threadpool.c') diff --git a/src/server/globals.h b/src/server/globals.h index 5dd205a..f940666 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -138,6 +138,7 @@ struct _dnbd3_client char hostName[HOSTNAMELEN]; // inet_ntop version of host pthread_mutex_t sendMutex; // Held while writing to sock if image is incomplete (since uplink uses socket too) pthread_mutex_t lock; + pthread_t thread; }; // ####################################################### diff --git a/src/server/image.c b/src/server/image.c index de93cd4..248c12c 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -562,9 +562,7 @@ bool image_tryFreeAll() if ( _images[i] != NULL && _images[i]->users == 0 ) { dnbd3_image_t *image = _images[i]; _images[i] = NULL; - mutex_unlock( &imageListLock ); image = image_free( image ); - mutex_lock( &imageListLock ); } if ( i + 1 == _num_images && _images[i] == NULL ) _num_images--; } @@ -574,15 +572,13 @@ bool image_tryFreeAll() /** * Free image. DOES NOT check if it's in use. - * Indirectly locks on imageListLock, image.lock, uplink.queueLock + * (Indirectly) locks on image.lock, uplink.queueLock */ static dnbd3_image_t* image_free(dnbd3_image_t *image) { assert( image != NULL ); assert( image->users == 0 ); - if ( !_shutdown ) { - logadd( LOG_INFO, "Freeing image %s:%d", image->name, (int)image->rid ); - } + logadd( ( _shutdown ? LOG_DEBUG1 : LOG_INFO ), "Freeing image %s:%d", image->name, (int)image->rid ); // uplink_shutdown might return false to tell us // that the shutdown is in progress. Bail out since // this will get called again when the uplink is done. @@ -600,8 +596,6 @@ static dnbd3_image_t* image_free(dnbd3_image_t *image) mutex_unlock( &image->lock ); if ( image->readFd != -1 ) close( image->readFd ); mutex_destroy( &image->lock ); - // - memset( image, 0, sizeof(*image) ); free( image ); return NULL ; } diff --git a/src/server/integrity.c b/src/server/integrity.c index f358c46..e7ebeb2 100644 --- a/src/server/integrity.c +++ b/src/server/integrity.c @@ -184,13 +184,20 @@ static void* integrity_main(void * data UNUSED) mutex_unlock( &image->lock ); } #if defined(linux) || defined(__linux) - if ( sync_file_range( fd, start, end - start, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER ) == -1 ) { + while ( sync_file_range( fd, start, end - start, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER ) == -1 ) #else - if ( fsync( fd ) == -1 ) { + while ( fsync( fd ) == -1 ) #endif - logadd( LOG_ERROR, "Cannot flush %s for integrity check", image->path ); + { + if ( _shutdown ) + break; + if ( errno == EINTR ) + continue; + logadd( LOG_ERROR, "Cannot flush %s for integrity check (errno=%d)", image->path, errno ); exit( 1 ); } + if ( _shutdown ) + break; // Use direct I/O only if read length is multiple of 4096 to be on the safe side int tfd; if ( direct && ( end % DNBD3_BLOCK_SIZE ) == 0 ) { @@ -266,7 +273,9 @@ static void* integrity_main(void * data UNUSED) } } mutex_unlock( &integrityQueueLock ); - if ( buffer != NULL ) free( buffer ); + if ( buffer != NULL ) { + free( buffer ); + } bRunning = false; return NULL; } diff --git a/src/server/net.c b/src/server/net.c index e0b516e..9c855e4 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -44,6 +44,7 @@ #include #include #include +#include static dnbd3_client_t *_clients[SERVER_MAX_CLIENTS]; static int _num_clients = 0; @@ -153,6 +154,7 @@ void* net_handleNewConnection(void *clientPtr) { dnbd3_client_t * const client = (dnbd3_client_t *)clientPtr; dnbd3_request_t request; + client->thread = pthread_self(); // Await data from client. Since this is a fresh connection, we expect data right away sock_setTimeout( client->sock, _clientTimeout ); @@ -631,11 +633,10 @@ void net_disconnectAll() int i; mutex_lock( &_clients_lock ); for (i = 0; i < _num_clients; ++i) { - if ( _clients[i] == NULL ) continue; - dnbd3_client_t * const client = _clients[i]; - mutex_lock( &client->lock ); - if ( client->sock >= 0 ) shutdown( client->sock, SHUT_RDWR ); - mutex_unlock( &client->lock ); + if ( _clients[i] == NULL ) + continue; + shutdown( _clients[i]->sock, SHUT_RDWR ); + pthread_kill( _clients[i]->thread, SIGINT ); } mutex_unlock( &_clients_lock ); } diff --git a/src/server/rpc.c b/src/server/rpc.c index 261c6c0..662263e 100644 --- a/src/server/rpc.c +++ b/src/server/rpc.c @@ -137,13 +137,13 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int bool hasName = false; bool ok; int keepAlive = HTTP_KEEPALIVE; - do { + while ( !_shutdown ) { // Read request from client struct phr_header headers[100]; size_t numHeaders, prevLen = 0, consumed; struct string method, path; int minorVersion; - do { + while ( !_shutdown ) { // Parse before calling recv, there might be a complete pipelined request in the buffer already // If the request is incomplete, we allow exactly one additional recv() to complete it. // This should suffice for real world scenarios as I don't know of any @@ -188,7 +188,9 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int sendReply( sock, "400 Bad Request", "text/plain", "Server cannot understand what you're trying to say", -1, HTTP_CLOSE ); goto func_return; } - } while ( true ); + } // Loop while request header incomplete + if ( _shutdown ) + break; if ( keepAlive == HTTP_KEEPALIVE ) { // Only keep the connection alive (and indicate so) if the client seems to support this if ( minorVersion == 0 || hasHeaderValue( headers, numHeaders, &STR_CONNECTION, &STR_CLOSE ) ) { @@ -213,7 +215,8 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int } else { ok = sendReply( sock, "404 Not found", "text/plain", "Nothing", -1, keepAlive ); } - if ( !ok ) break; + if ( !ok ) + break; } // hoff might be beyond end if the client sent another request (burst) const ssize_t extra = hoff - consumed; @@ -225,7 +228,7 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int hasName = true; setThreadName( "HTTP" ); } - } while (true); + } // Loop while more requests func_return:; do { const int curCount = --status.count; diff --git a/src/server/server.c b/src/server/server.c index 1cdd2ab..0dddea7 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -37,6 +37,8 @@ #include #include #include +#include +#include #define LONGOPT_CRC4 1000 #define LONGOPT_ASSERT 1001 @@ -60,6 +62,7 @@ static _Atomic(job_t *) newJob; static bool hasTimerThread = false; static pthread_t timerThread; +static pid_t mainPid; static pthread_t mainThread; #define DEFAULT_TIMER_TIMEOUT (60) @@ -138,7 +141,7 @@ _Noreturn static void dnbd3_cleanup() logadd( LOG_INFO, "Cleanup..." ); if ( hasTimerThread ) { - pthread_kill( timerThread, SIGHUP ); + pthread_kill( timerThread, SIGINT ); thread_join( timerThread, NULL ); } @@ -162,6 +165,8 @@ _Noreturn static void dnbd3_cleanup() // Wait for clients to disconnect net_waitForAllDisconnected(); + threadpool_waitEmpty(); + // Clean up images retries = 5; while ( !image_tryFreeAll() && --retries > 0 ) { @@ -204,6 +209,7 @@ int main(int argc, char *argv[]) { 0, 0, 0, 0 } }; + mainPid = getpid(); mainThread = pthread_self(); opt = getopt_long( argc, argv, optString, longOpts, &longIndex ); @@ -509,10 +515,16 @@ static void dnbd3_handleSignal(int signum) static void dnbd3_handleSignal2(int signum, siginfo_t *info, void *data UNUSED) { - if ( !pthread_equal( pthread_self(), mainThread ) ) - return; - memcpy( &lastSignal, info, sizeof(siginfo_t) ); - dnbd3_handleSignal( signum ); + if ( info->si_pid != mainPid ) { // Source is not this process + memcpy( &lastSignal, info, sizeof(siginfo_t) ); // Copy signal info + if ( info->si_pid != 0 && !pthread_equal( pthread_self(), mainThread ) ) { + pthread_kill( mainThread, info->si_signo ); // And relay signal if we're not the main thread + } + } + if ( pthread_equal( pthread_self(), mainThread ) ) { + // Signal received by main thread -- handle + dnbd3_handleSignal( signum ); + } } uint32_t dnbd3_serverUptime() diff --git a/src/server/threadpool.c b/src/server/threadpool.c index 3947677..0b46fd6 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -15,6 +15,7 @@ static void *threadpool_worker(void *entryPtr); static pthread_attr_t threadAttrs; static atomic_int maxIdleThreads = -1; static _Atomic(entry_t *) *pool = NULL; +static atomic_int activeThreads = 0; bool threadpool_init(int maxIdle) { @@ -34,10 +35,9 @@ bool threadpool_init(int maxIdle) void threadpool_close() { - _shutdown = true; - int max = maxIdleThreads; - maxIdleThreads = -1; - if ( max <= 0 ) return; + int max = atomic_exchange( &maxIdleThreads, -1 ); + if ( max <= 0 ) + return; for ( int i = 0; i < max; ++i ) { entry_t *cur = pool[i]; if ( cur != NULL && atomic_compare_exchange_strong( &pool[i], &cur, NULL ) ) { @@ -46,9 +46,23 @@ void threadpool_close() } } +void threadpool_waitEmpty() +{ + if ( activeThreads == 0 ) + return; + do { + sleep( 1 ); + logadd( LOG_INFO, "Threadpool: %d threads still active", (int)activeThreads ); + } while ( activeThreads != 0 ); +} + bool threadpool_run(void *(*startRoutine)(void *), void *arg) { - if ( startRoutine == NULL ) { + if ( unlikely( _shutdown ) ) { + logadd( LOG_MINOR, "Cannot submit work to threadpool while shutting down!" ); + return false; + } + if ( unlikely( startRoutine == NULL ) ) { logadd( LOG_ERROR, "Trying to queue work for thread pool with NULL startRoutine" ); return false; // Or bail out!? } @@ -60,7 +74,7 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg) break; } } - if ( entry == NULL ) { + if ( unlikely( entry == NULL ) ) { entry = malloc( sizeof(entry_t) ); if ( entry == NULL ) { logadd( LOG_WARNING, "Could not alloc entry_t for new thread\n" ); @@ -78,6 +92,7 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg) free( entry ); return false; } + activeThreads++; } entry->startRoutine = startRoutine; entry->arg = arg; @@ -130,6 +145,7 @@ keep_going:; } signal_close( entry->signal ); free( entry ); + activeThreads--; return NULL; } diff --git a/src/server/threadpool.h b/src/server/threadpool.h index 15dd151..ee0b3aa 100644 --- a/src/server/threadpool.h +++ b/src/server/threadpool.h @@ -17,6 +17,11 @@ bool threadpool_init(int maxIdleThreadCount); */ void threadpool_close(); +/** + * Block until all threads spawned have exited + */ +void threadpool_waitEmpty(); + /** * Run a thread using the thread pool. * @param startRoutine function to run in new thread -- cgit v1.2.3-55-g7522 From d3df3ba3005977629b8847b507df1fdae40ffbd5 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Sat, 14 Mar 2020 17:27:13 +0100 Subject: [SERVER] threadpool: Simplify get code, make debug code _DEBUG only --- src/server/threadpool.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'src/server/threadpool.c') diff --git a/src/server/threadpool.c b/src/server/threadpool.c index 0b46fd6..96162a6 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -62,15 +62,16 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg) logadd( LOG_MINOR, "Cannot submit work to threadpool while shutting down!" ); return false; } +#ifdef _DEBUG if ( unlikely( startRoutine == NULL ) ) { logadd( LOG_ERROR, "Trying to queue work for thread pool with NULL startRoutine" ); return false; // Or bail out!? } - entry_t *entry = NULL; +#endif + entry_t *entry; for ( int i = 0; i < maxIdleThreads; ++i ) { - entry_t *cur = pool[i]; - if ( cur != NULL && atomic_compare_exchange_weak( &pool[i], &cur, NULL ) ) { - entry = cur; + entry = atomic_exchange( &pool[i], NULL ); + if ( entry != NULL ) { break; } } @@ -120,10 +121,12 @@ keep_going:; logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret ); continue; } +#ifdef _DEBUG if ( entry->startRoutine == NULL ) { logadd( LOG_ERROR, "Worker woke up but has no work to do!" ); exit( 1 ); } +#endif // Start assigned work (*entry->startRoutine)( entry->arg ); // Reset vars for safety -- cgit v1.2.3-55-g7522 From ba617b55eb606ab487f154b124750e121518d5e5 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Thu, 19 Mar 2020 11:26:12 +0100 Subject: [SERVER] Add name param to threadpool_run --- src/server/altservers.c | 2 +- src/server/image.c | 2 ++ src/server/server.c | 6 +++--- src/server/threadpool.c | 8 +++++++- src/server/threadpool.h | 3 ++- src/server/uplink.c | 2 +- 6 files changed, 16 insertions(+), 7 deletions(-) (limited to 'src/server/threadpool.c') diff --git a/src/server/altservers.c b/src/server/altservers.c index 1ba75f4..5076a05 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -172,7 +172,7 @@ void altservers_findUplinkAsync(dnbd3_uplink_t *uplink) if ( uplink->rttTestResult != RTT_INPROGRESS ) { dnbd3_uplink_t *current = ref_get_uplink( &uplink->image->uplinkref ); if ( current == uplink ) { - threadpool_run( &altservers_runCheck, uplink ); + threadpool_run( &altservers_runCheck, uplink, "UPLINK" ); } else if ( current != NULL ) { ref_put( ¤t->reference ); } diff --git a/src/server/image.c b/src/server/image.c index 81ec479..0ec1d58 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -1817,6 +1817,7 @@ static void* closeUnusedFds(void* nix UNUSED) timing_gets( &deadline, -UNUSED_FD_TIMEOUT ); int fds[FDCOUNT]; int fdindex = 0; + setThreadName( "unused-fd-close" ); mutex_lock( &imageListLock ); for ( int i = 0; i < _num_images; ++i ) { dnbd3_image_t * const image = _images[i]; @@ -1857,6 +1858,7 @@ static void* saveLoadAllCacheMaps(void* nix UNUSED) static ticks nextSave; declare_now; bool full = timing_reached( &nextSave, &now ); + setThreadName( "cache-mapper" ); mutex_lock( &imageListLock ); for ( int i = 0; i < _num_images; ++i ) { dnbd3_image_t * const image = _images[i]; diff --git a/src/server/server.c b/src/server/server.c index 71a49b9..fa7bcda 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -404,7 +404,7 @@ int main(int argc, char *argv[]) if ( sigReload ) { sigReload = false; logadd( LOG_INFO, "SIGHUP received, re-scanning image directory" ); - threadpool_run( &server_asyncImageListLoad, NULL ); + threadpool_run( &server_asyncImageListLoad, NULL, "IMAGE_RELOAD" ); } if ( sigLogCycle ) { sigLogCycle = false; @@ -431,7 +431,7 @@ int main(int argc, char *argv[]) continue; } - if ( !threadpool_run( &net_handleNewConnection, (void *)dnbd3_client ) ) { + if ( !threadpool_run( &net_handleNewConnection, (void *)dnbd3_client, "CLIENT" ) ) { logadd( LOG_ERROR, "Could not start thread for new connection." ); free( dnbd3_client ); continue; @@ -574,7 +574,7 @@ static int handlePendingJobs(void) jobHead = *temp; // Make it list head *temp = NULL; // Split off part before that while ( todo != NULL ) { - threadpool_run( todo->startRoutine, todo->arg ); + threadpool_run( todo->startRoutine, todo->arg, "TIMER_TASK" ); old = todo; todo = todo->next; if ( old->intervalSecs == 0 ) { diff --git a/src/server/threadpool.c b/src/server/threadpool.c index 96162a6..63ae19f 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -8,6 +8,7 @@ typedef struct _entry_t { dnbd3_signal_t* signal; void *(*startRoutine)(void *); void * arg; + const char *name; } entry_t; static void *threadpool_worker(void *entryPtr); @@ -56,7 +57,7 @@ void threadpool_waitEmpty() } while ( activeThreads != 0 ); } -bool threadpool_run(void *(*startRoutine)(void *), void *arg) +bool threadpool_run(void *(*startRoutine)(void *), void *arg, const char *name) { if ( unlikely( _shutdown ) ) { logadd( LOG_MINOR, "Cannot submit work to threadpool while shutting down!" ); @@ -97,6 +98,7 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg) } entry->startRoutine = startRoutine; entry->arg = arg; + entry->name = name; atomic_thread_fence( memory_order_release ); signal_call( entry->signal ); return true; @@ -126,6 +128,9 @@ keep_going:; logadd( LOG_ERROR, "Worker woke up but has no work to do!" ); exit( 1 ); } + if ( entry->name != NULL ) { + setThreadName( entry->name ); + } #endif // Start assigned work (*entry->startRoutine)( entry->arg ); @@ -146,6 +151,7 @@ keep_going:; // Reaching here means pool is full; just let the thread exit break; } + setThreadName( "[dead]" ); signal_close( entry->signal ); free( entry ); activeThreads--; diff --git a/src/server/threadpool.h b/src/server/threadpool.h index ee0b3aa..d8a526e 100644 --- a/src/server/threadpool.h +++ b/src/server/threadpool.h @@ -26,9 +26,10 @@ void threadpool_waitEmpty(); * Run a thread using the thread pool. * @param startRoutine function to run in new thread * @param arg argument to pass to thead + * @param name STRING CONSTANT (literal) for debugging purposes * @return true if thread was started */ -bool threadpool_run(void *(*startRoutine)(void *), void *arg); +bool threadpool_run(void *(*startRoutine)(void *), void *arg, const char *name); #endif diff --git a/src/server/uplink.c b/src/server/uplink.c index af854d6..a7f140f 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -435,7 +435,7 @@ success_ref: job->length = len; job->uplink = uplink; ref_inc( &uplink->reference ); // Hold one for the thread, thread will return it - threadpool_run( &prefetchForClient, (void*)job ); + threadpool_run( &prefetchForClient, (void*)job, "PREFETCH" ); } } if ( getUplink ) { -- cgit v1.2.3-55-g7522 From abe55c2bf2b93e9431ee2c22afd7f3f6611d71d8 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Wed, 10 Jun 2020 15:28:41 +0200 Subject: [SERVER] fix uninitialized variable --- src/server/threadpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/server/threadpool.c') diff --git a/src/server/threadpool.c b/src/server/threadpool.c index 63ae19f..4ebefcb 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -69,7 +69,7 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg, const char *name) return false; // Or bail out!? } #endif - entry_t *entry; + entry_t *entry = NULL; for ( int i = 0; i < maxIdleThreads; ++i ) { entry = atomic_exchange( &pool[i], NULL ); if ( entry != NULL ) { -- cgit v1.2.3-55-g7522