summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2019-08-28 13:07:13 +0200
committerSimon Rettberg2019-08-28 13:07:13 +0200
commitac1bf45ebdd630fbc9ad2c1fa3c0ea99f5206799 (patch)
tree951388f8267c0194a142bf13d99b947ee7f820e6
parent[SERVER] Remove old comments (diff)
downloaddnbd3-ac1bf45ebdd630fbc9ad2c1fa3c0ea99f5206799.tar.gz
dnbd3-ac1bf45ebdd630fbc9ad2c1fa3c0ea99f5206799.tar.xz
dnbd3-ac1bf45ebdd630fbc9ad2c1fa3c0ea99f5206799.zip
[SERVER] Make signal handling more POSIX
According to POSIX, a signal sent to a PID can be delivered to an arbitrary thread of that process that hasn't the signal blocked. This seens to never happen on Linux, but would mess things up since the code expected the main signal handler to only be executed by the main thread. This should now be fixed by examining the destination PID of the signal as well as the ID of the thread currently running the signal handler. If we notice the signal wasn't sent by our own PID and the handler is not currently run by the main thread, we re-send the signal to the main thread. Otherwise, if the signal was sent by our own PID but the handler is not run in the main thread, do nothing. This way we can use pthread_kill() to wake up threads that might be stuck in a blocking syscall when it's time to shut down.
-rw-r--r--src/server/globals.h1
-rw-r--r--src/server/image.c10
-rw-r--r--src/server/integrity.c17
-rw-r--r--src/server/net.c11
-rw-r--r--src/server/rpc.c13
-rw-r--r--src/server/server.c22
-rw-r--r--src/server/threadpool.c28
-rw-r--r--src/server/threadpool.h5
8 files changed, 74 insertions, 33 deletions
diff --git a/src/server/globals.h b/src/server/globals.h
index 5dd205a..f940666 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -138,6 +138,7 @@ struct _dnbd3_client
char hostName[HOSTNAMELEN]; // inet_ntop version of host
pthread_mutex_t sendMutex; // Held while writing to sock if image is incomplete (since uplink uses socket too)
pthread_mutex_t lock;
+ pthread_t thread;
};
// #######################################################
diff --git a/src/server/image.c b/src/server/image.c
index de93cd4..248c12c 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -562,9 +562,7 @@ bool image_tryFreeAll()
if ( _images[i] != NULL && _images[i]->users == 0 ) {
dnbd3_image_t *image = _images[i];
_images[i] = NULL;
- mutex_unlock( &imageListLock );
image = image_free( image );
- mutex_lock( &imageListLock );
}
if ( i + 1 == _num_images && _images[i] == NULL ) _num_images--;
}
@@ -574,15 +572,13 @@ bool image_tryFreeAll()
/**
* Free image. DOES NOT check if it's in use.
- * Indirectly locks on imageListLock, image.lock, uplink.queueLock
+ * (Indirectly) locks on image.lock, uplink.queueLock
*/
static dnbd3_image_t* image_free(dnbd3_image_t *image)
{
assert( image != NULL );
assert( image->users == 0 );
- if ( !_shutdown ) {
- logadd( LOG_INFO, "Freeing image %s:%d", image->name, (int)image->rid );
- }
+ logadd( ( _shutdown ? LOG_DEBUG1 : LOG_INFO ), "Freeing image %s:%d", image->name, (int)image->rid );
// uplink_shutdown might return false to tell us
// that the shutdown is in progress. Bail out since
// this will get called again when the uplink is done.
@@ -600,8 +596,6 @@ static dnbd3_image_t* image_free(dnbd3_image_t *image)
mutex_unlock( &image->lock );
if ( image->readFd != -1 ) close( image->readFd );
mutex_destroy( &image->lock );
- //
- memset( image, 0, sizeof(*image) );
free( image );
return NULL ;
}
diff --git a/src/server/integrity.c b/src/server/integrity.c
index f358c46..e7ebeb2 100644
--- a/src/server/integrity.c
+++ b/src/server/integrity.c
@@ -184,13 +184,20 @@ static void* integrity_main(void * data UNUSED)
mutex_unlock( &image->lock );
}
#if defined(linux) || defined(__linux)
- if ( sync_file_range( fd, start, end - start, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER ) == -1 ) {
+ while ( sync_file_range( fd, start, end - start, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER ) == -1 )
#else
- if ( fsync( fd ) == -1 ) {
+ while ( fsync( fd ) == -1 )
#endif
- logadd( LOG_ERROR, "Cannot flush %s for integrity check", image->path );
+ {
+ if ( _shutdown )
+ break;
+ if ( errno == EINTR )
+ continue;
+ logadd( LOG_ERROR, "Cannot flush %s for integrity check (errno=%d)", image->path, errno );
exit( 1 );
}
+ if ( _shutdown )
+ break;
// Use direct I/O only if read length is multiple of 4096 to be on the safe side
int tfd;
if ( direct && ( end % DNBD3_BLOCK_SIZE ) == 0 ) {
@@ -266,7 +273,9 @@ static void* integrity_main(void * data UNUSED)
}
}
mutex_unlock( &integrityQueueLock );
- if ( buffer != NULL ) free( buffer );
+ if ( buffer != NULL ) {
+ free( buffer );
+ }
bRunning = false;
return NULL;
}
diff --git a/src/server/net.c b/src/server/net.c
index e0b516e..9c855e4 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -44,6 +44,7 @@
#include <jansson.h>
#include <inttypes.h>
#include <stdatomic.h>
+#include <signal.h>
static dnbd3_client_t *_clients[SERVER_MAX_CLIENTS];
static int _num_clients = 0;
@@ -153,6 +154,7 @@ void* net_handleNewConnection(void *clientPtr)
{
dnbd3_client_t * const client = (dnbd3_client_t *)clientPtr;
dnbd3_request_t request;
+ client->thread = pthread_self();
// Await data from client. Since this is a fresh connection, we expect data right away
sock_setTimeout( client->sock, _clientTimeout );
@@ -631,11 +633,10 @@ void net_disconnectAll()
int i;
mutex_lock( &_clients_lock );
for (i = 0; i < _num_clients; ++i) {
- if ( _clients[i] == NULL ) continue;
- dnbd3_client_t * const client = _clients[i];
- mutex_lock( &client->lock );
- if ( client->sock >= 0 ) shutdown( client->sock, SHUT_RDWR );
- mutex_unlock( &client->lock );
+ if ( _clients[i] == NULL )
+ continue;
+ shutdown( _clients[i]->sock, SHUT_RDWR );
+ pthread_kill( _clients[i]->thread, SIGINT );
}
mutex_unlock( &_clients_lock );
}
diff --git a/src/server/rpc.c b/src/server/rpc.c
index 261c6c0..662263e 100644
--- a/src/server/rpc.c
+++ b/src/server/rpc.c
@@ -137,13 +137,13 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int
bool hasName = false;
bool ok;
int keepAlive = HTTP_KEEPALIVE;
- do {
+ while ( !_shutdown ) {
// Read request from client
struct phr_header headers[100];
size_t numHeaders, prevLen = 0, consumed;
struct string method, path;
int minorVersion;
- do {
+ while ( !_shutdown ) {
// Parse before calling recv, there might be a complete pipelined request in the buffer already
// If the request is incomplete, we allow exactly one additional recv() to complete it.
// This should suffice for real world scenarios as I don't know of any
@@ -188,7 +188,9 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int
sendReply( sock, "400 Bad Request", "text/plain", "Server cannot understand what you're trying to say", -1, HTTP_CLOSE );
goto func_return;
}
- } while ( true );
+ } // Loop while request header incomplete
+ if ( _shutdown )
+ break;
if ( keepAlive == HTTP_KEEPALIVE ) {
// Only keep the connection alive (and indicate so) if the client seems to support this
if ( minorVersion == 0 || hasHeaderValue( headers, numHeaders, &STR_CONNECTION, &STR_CLOSE ) ) {
@@ -213,7 +215,8 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int
} else {
ok = sendReply( sock, "404 Not found", "text/plain", "Nothing", -1, keepAlive );
}
- if ( !ok ) break;
+ if ( !ok )
+ break;
}
// hoff might be beyond end if the client sent another request (burst)
const ssize_t extra = hoff - consumed;
@@ -225,7 +228,7 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int
hasName = true;
setThreadName( "HTTP" );
}
- } while (true);
+ } // Loop while more requests
func_return:;
do {
const int curCount = --status.count;
diff --git a/src/server/server.c b/src/server/server.c
index 1cdd2ab..0dddea7 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -37,6 +37,8 @@
#include <signal.h>
#include <getopt.h>
#include <assert.h>
+#include <sys/types.h>
+#include <unistd.h>
#define LONGOPT_CRC4 1000
#define LONGOPT_ASSERT 1001
@@ -60,6 +62,7 @@ static _Atomic(job_t *) newJob;
static bool hasTimerThread = false;
static pthread_t timerThread;
+static pid_t mainPid;
static pthread_t mainThread;
#define DEFAULT_TIMER_TIMEOUT (60)
@@ -138,7 +141,7 @@ _Noreturn static void dnbd3_cleanup()
logadd( LOG_INFO, "Cleanup..." );
if ( hasTimerThread ) {
- pthread_kill( timerThread, SIGHUP );
+ pthread_kill( timerThread, SIGINT );
thread_join( timerThread, NULL );
}
@@ -162,6 +165,8 @@ _Noreturn static void dnbd3_cleanup()
// Wait for clients to disconnect
net_waitForAllDisconnected();
+ threadpool_waitEmpty();
+
// Clean up images
retries = 5;
while ( !image_tryFreeAll() && --retries > 0 ) {
@@ -204,6 +209,7 @@ int main(int argc, char *argv[])
{ 0, 0, 0, 0 }
};
+ mainPid = getpid();
mainThread = pthread_self();
opt = getopt_long( argc, argv, optString, longOpts, &longIndex );
@@ -509,10 +515,16 @@ static void dnbd3_handleSignal(int signum)
static void dnbd3_handleSignal2(int signum, siginfo_t *info, void *data UNUSED)
{
- if ( !pthread_equal( pthread_self(), mainThread ) )
- return;
- memcpy( &lastSignal, info, sizeof(siginfo_t) );
- dnbd3_handleSignal( signum );
+ if ( info->si_pid != mainPid ) { // Source is not this process
+ memcpy( &lastSignal, info, sizeof(siginfo_t) ); // Copy signal info
+ if ( info->si_pid != 0 && !pthread_equal( pthread_self(), mainThread ) ) {
+ pthread_kill( mainThread, info->si_signo ); // And relay signal if we're not the main thread
+ }
+ }
+ if ( pthread_equal( pthread_self(), mainThread ) ) {
+ // Signal received by main thread -- handle
+ dnbd3_handleSignal( signum );
+ }
}
uint32_t dnbd3_serverUptime()
diff --git a/src/server/threadpool.c b/src/server/threadpool.c
index 3947677..0b46fd6 100644
--- a/src/server/threadpool.c
+++ b/src/server/threadpool.c
@@ -15,6 +15,7 @@ static void *threadpool_worker(void *entryPtr);
static pthread_attr_t threadAttrs;
static atomic_int maxIdleThreads = -1;
static _Atomic(entry_t *) *pool = NULL;
+static atomic_int activeThreads = 0;
bool threadpool_init(int maxIdle)
{
@@ -34,10 +35,9 @@ bool threadpool_init(int maxIdle)
void threadpool_close()
{
- _shutdown = true;
- int max = maxIdleThreads;
- maxIdleThreads = -1;
- if ( max <= 0 ) return;
+ int max = atomic_exchange( &maxIdleThreads, -1 );
+ if ( max <= 0 )
+ return;
for ( int i = 0; i < max; ++i ) {
entry_t *cur = pool[i];
if ( cur != NULL && atomic_compare_exchange_strong( &pool[i], &cur, NULL ) ) {
@@ -46,9 +46,23 @@ void threadpool_close()
}
}
+void threadpool_waitEmpty()
+{
+ if ( activeThreads == 0 )
+ return;
+ do {
+ sleep( 1 );
+ logadd( LOG_INFO, "Threadpool: %d threads still active", (int)activeThreads );
+ } while ( activeThreads != 0 );
+}
+
bool threadpool_run(void *(*startRoutine)(void *), void *arg)
{
- if ( startRoutine == NULL ) {
+ if ( unlikely( _shutdown ) ) {
+ logadd( LOG_MINOR, "Cannot submit work to threadpool while shutting down!" );
+ return false;
+ }
+ if ( unlikely( startRoutine == NULL ) ) {
logadd( LOG_ERROR, "Trying to queue work for thread pool with NULL startRoutine" );
return false; // Or bail out!?
}
@@ -60,7 +74,7 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg)
break;
}
}
- if ( entry == NULL ) {
+ if ( unlikely( entry == NULL ) ) {
entry = malloc( sizeof(entry_t) );
if ( entry == NULL ) {
logadd( LOG_WARNING, "Could not alloc entry_t for new thread\n" );
@@ -78,6 +92,7 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg)
free( entry );
return false;
}
+ activeThreads++;
}
entry->startRoutine = startRoutine;
entry->arg = arg;
@@ -130,6 +145,7 @@ keep_going:;
}
signal_close( entry->signal );
free( entry );
+ activeThreads--;
return NULL;
}
diff --git a/src/server/threadpool.h b/src/server/threadpool.h
index 15dd151..ee0b3aa 100644
--- a/src/server/threadpool.h
+++ b/src/server/threadpool.h
@@ -18,6 +18,11 @@ bool threadpool_init(int maxIdleThreadCount);
void threadpool_close();
/**
+ * Block until all threads spawned have exited
+ */
+void threadpool_waitEmpty();
+
+/**
* Run a thread using the thread pool.
* @param startRoutine function to run in new thread
* @param arg argument to pass to thead