summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/server/globals.h1
-rw-r--r--src/server/image.c10
-rw-r--r--src/server/integrity.c17
-rw-r--r--src/server/net.c11
-rw-r--r--src/server/rpc.c13
-rw-r--r--src/server/server.c22
-rw-r--r--src/server/threadpool.c28
-rw-r--r--src/server/threadpool.h5
8 files changed, 74 insertions, 33 deletions
diff --git a/src/server/globals.h b/src/server/globals.h
index 5dd205a..f940666 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -138,6 +138,7 @@ struct _dnbd3_client
char hostName[HOSTNAMELEN]; // inet_ntop version of host
pthread_mutex_t sendMutex; // Held while writing to sock if image is incomplete (since uplink uses socket too)
pthread_mutex_t lock;
+ pthread_t thread;
};
// #######################################################
diff --git a/src/server/image.c b/src/server/image.c
index de93cd4..248c12c 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -562,9 +562,7 @@ bool image_tryFreeAll()
if ( _images[i] != NULL && _images[i]->users == 0 ) {
dnbd3_image_t *image = _images[i];
_images[i] = NULL;
- mutex_unlock( &imageListLock );
image = image_free( image );
- mutex_lock( &imageListLock );
}
if ( i + 1 == _num_images && _images[i] == NULL ) _num_images--;
}
@@ -574,15 +572,13 @@ bool image_tryFreeAll()
/**
* Free image. DOES NOT check if it's in use.
- * Indirectly locks on imageListLock, image.lock, uplink.queueLock
+ * (Indirectly) locks on image.lock, uplink.queueLock
*/
static dnbd3_image_t* image_free(dnbd3_image_t *image)
{
assert( image != NULL );
assert( image->users == 0 );
- if ( !_shutdown ) {
- logadd( LOG_INFO, "Freeing image %s:%d", image->name, (int)image->rid );
- }
+ logadd( ( _shutdown ? LOG_DEBUG1 : LOG_INFO ), "Freeing image %s:%d", image->name, (int)image->rid );
// uplink_shutdown might return false to tell us
// that the shutdown is in progress. Bail out since
// this will get called again when the uplink is done.
@@ -600,8 +596,6 @@ static dnbd3_image_t* image_free(dnbd3_image_t *image)
mutex_unlock( &image->lock );
if ( image->readFd != -1 ) close( image->readFd );
mutex_destroy( &image->lock );
- //
- memset( image, 0, sizeof(*image) );
free( image );
return NULL ;
}
diff --git a/src/server/integrity.c b/src/server/integrity.c
index f358c46..e7ebeb2 100644
--- a/src/server/integrity.c
+++ b/src/server/integrity.c
@@ -184,13 +184,20 @@ static void* integrity_main(void * data UNUSED)
mutex_unlock( &image->lock );
}
#if defined(linux) || defined(__linux)
- if ( sync_file_range( fd, start, end - start, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER ) == -1 ) {
+ while ( sync_file_range( fd, start, end - start, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER ) == -1 )
#else
- if ( fsync( fd ) == -1 ) {
+ while ( fsync( fd ) == -1 )
#endif
- logadd( LOG_ERROR, "Cannot flush %s for integrity check", image->path );
+ {
+ if ( _shutdown )
+ break;
+ if ( errno == EINTR )
+ continue;
+ logadd( LOG_ERROR, "Cannot flush %s for integrity check (errno=%d)", image->path, errno );
exit( 1 );
}
+ if ( _shutdown )
+ break;
// Use direct I/O only if read length is multiple of 4096 to be on the safe side
int tfd;
if ( direct && ( end % DNBD3_BLOCK_SIZE ) == 0 ) {
@@ -266,7 +273,9 @@ static void* integrity_main(void * data UNUSED)
}
}
mutex_unlock( &integrityQueueLock );
- if ( buffer != NULL ) free( buffer );
+ if ( buffer != NULL ) {
+ free( buffer );
+ }
bRunning = false;
return NULL;
}
diff --git a/src/server/net.c b/src/server/net.c
index e0b516e..9c855e4 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -44,6 +44,7 @@
#include <jansson.h>
#include <inttypes.h>
#include <stdatomic.h>
+#include <signal.h>
static dnbd3_client_t *_clients[SERVER_MAX_CLIENTS];
static int _num_clients = 0;
@@ -153,6 +154,7 @@ void* net_handleNewConnection(void *clientPtr)
{
dnbd3_client_t * const client = (dnbd3_client_t *)clientPtr;
dnbd3_request_t request;
+ client->thread = pthread_self();
// Await data from client. Since this is a fresh connection, we expect data right away
sock_setTimeout( client->sock, _clientTimeout );
@@ -631,11 +633,10 @@ void net_disconnectAll()
int i;
mutex_lock( &_clients_lock );
for (i = 0; i < _num_clients; ++i) {
- if ( _clients[i] == NULL ) continue;
- dnbd3_client_t * const client = _clients[i];
- mutex_lock( &client->lock );
- if ( client->sock >= 0 ) shutdown( client->sock, SHUT_RDWR );
- mutex_unlock( &client->lock );
+ if ( _clients[i] == NULL )
+ continue;
+ shutdown( _clients[i]->sock, SHUT_RDWR );
+ pthread_kill( _clients[i]->thread, SIGINT );
}
mutex_unlock( &_clients_lock );
}
diff --git a/src/server/rpc.c b/src/server/rpc.c
index 261c6c0..662263e 100644
--- a/src/server/rpc.c
+++ b/src/server/rpc.c
@@ -137,13 +137,13 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int
bool hasName = false;
bool ok;
int keepAlive = HTTP_KEEPALIVE;
- do {
+ while ( !_shutdown ) {
// Read request from client
struct phr_header headers[100];
size_t numHeaders, prevLen = 0, consumed;
struct string method, path;
int minorVersion;
- do {
+ while ( !_shutdown ) {
// Parse before calling recv, there might be a complete pipelined request in the buffer already
// If the request is incomplete, we allow exactly one additional recv() to complete it.
// This should suffice for real world scenarios as I don't know of any
@@ -188,7 +188,9 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int
sendReply( sock, "400 Bad Request", "text/plain", "Server cannot understand what you're trying to say", -1, HTTP_CLOSE );
goto func_return;
}
- } while ( true );
+ } // Loop while request header incomplete
+ if ( _shutdown )
+ break;
if ( keepAlive == HTTP_KEEPALIVE ) {
// Only keep the connection alive (and indicate so) if the client seems to support this
if ( minorVersion == 0 || hasHeaderValue( headers, numHeaders, &STR_CONNECTION, &STR_CLOSE ) ) {
@@ -213,7 +215,8 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int
} else {
ok = sendReply( sock, "404 Not found", "text/plain", "Nothing", -1, keepAlive );
}
- if ( !ok ) break;
+ if ( !ok )
+ break;
}
// hoff might be beyond end if the client sent another request (burst)
const ssize_t extra = hoff - consumed;
@@ -225,7 +228,7 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int
hasName = true;
setThreadName( "HTTP" );
}
- } while (true);
+ } // Loop while more requests
func_return:;
do {
const int curCount = --status.count;
diff --git a/src/server/server.c b/src/server/server.c
index 1cdd2ab..0dddea7 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -37,6 +37,8 @@
#include <signal.h>
#include <getopt.h>
#include <assert.h>
+#include <sys/types.h>
+#include <unistd.h>
#define LONGOPT_CRC4 1000
#define LONGOPT_ASSERT 1001
@@ -60,6 +62,7 @@ static _Atomic(job_t *) newJob;
static bool hasTimerThread = false;
static pthread_t timerThread;
+static pid_t mainPid;
static pthread_t mainThread;
#define DEFAULT_TIMER_TIMEOUT (60)
@@ -138,7 +141,7 @@ _Noreturn static void dnbd3_cleanup()
logadd( LOG_INFO, "Cleanup..." );
if ( hasTimerThread ) {
- pthread_kill( timerThread, SIGHUP );
+ pthread_kill( timerThread, SIGINT );
thread_join( timerThread, NULL );
}
@@ -162,6 +165,8 @@ _Noreturn static void dnbd3_cleanup()
// Wait for clients to disconnect
net_waitForAllDisconnected();
+ threadpool_waitEmpty();
+
// Clean up images
retries = 5;
while ( !image_tryFreeAll() && --retries > 0 ) {
@@ -204,6 +209,7 @@ int main(int argc, char *argv[])
{ 0, 0, 0, 0 }
};
+ mainPid = getpid();
mainThread = pthread_self();
opt = getopt_long( argc, argv, optString, longOpts, &longIndex );
@@ -509,10 +515,16 @@ static void dnbd3_handleSignal(int signum)
static void dnbd3_handleSignal2(int signum, siginfo_t *info, void *data UNUSED)
{
- if ( !pthread_equal( pthread_self(), mainThread ) )
- return;
- memcpy( &lastSignal, info, sizeof(siginfo_t) );
- dnbd3_handleSignal( signum );
+ if ( info->si_pid != mainPid ) { // Source is not this process
+ memcpy( &lastSignal, info, sizeof(siginfo_t) ); // Copy signal info
+ if ( info->si_pid != 0 && !pthread_equal( pthread_self(), mainThread ) ) {
+ pthread_kill( mainThread, info->si_signo ); // And relay signal if we're not the main thread
+ }
+ }
+ if ( pthread_equal( pthread_self(), mainThread ) ) {
+ // Signal received by main thread -- handle
+ dnbd3_handleSignal( signum );
+ }
}
uint32_t dnbd3_serverUptime()
diff --git a/src/server/threadpool.c b/src/server/threadpool.c
index 3947677..0b46fd6 100644
--- a/src/server/threadpool.c
+++ b/src/server/threadpool.c
@@ -15,6 +15,7 @@ static void *threadpool_worker(void *entryPtr);
static pthread_attr_t threadAttrs;
static atomic_int maxIdleThreads = -1;
static _Atomic(entry_t *) *pool = NULL;
+static atomic_int activeThreads = 0;
bool threadpool_init(int maxIdle)
{
@@ -34,10 +35,9 @@ bool threadpool_init(int maxIdle)
void threadpool_close()
{
- _shutdown = true;
- int max = maxIdleThreads;
- maxIdleThreads = -1;
- if ( max <= 0 ) return;
+ int max = atomic_exchange( &maxIdleThreads, -1 );
+ if ( max <= 0 )
+ return;
for ( int i = 0; i < max; ++i ) {
entry_t *cur = pool[i];
if ( cur != NULL && atomic_compare_exchange_strong( &pool[i], &cur, NULL ) ) {
@@ -46,9 +46,23 @@ void threadpool_close()
}
}
+void threadpool_waitEmpty()
+{
+ if ( activeThreads == 0 )
+ return;
+ do {
+ sleep( 1 );
+ logadd( LOG_INFO, "Threadpool: %d threads still active", (int)activeThreads );
+ } while ( activeThreads != 0 );
+}
+
bool threadpool_run(void *(*startRoutine)(void *), void *arg)
{
- if ( startRoutine == NULL ) {
+ if ( unlikely( _shutdown ) ) {
+ logadd( LOG_MINOR, "Cannot submit work to threadpool while shutting down!" );
+ return false;
+ }
+ if ( unlikely( startRoutine == NULL ) ) {
logadd( LOG_ERROR, "Trying to queue work for thread pool with NULL startRoutine" );
return false; // Or bail out!?
}
@@ -60,7 +74,7 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg)
break;
}
}
- if ( entry == NULL ) {
+ if ( unlikely( entry == NULL ) ) {
entry = malloc( sizeof(entry_t) );
if ( entry == NULL ) {
logadd( LOG_WARNING, "Could not alloc entry_t for new thread\n" );
@@ -78,6 +92,7 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg)
free( entry );
return false;
}
+ activeThreads++;
}
entry->startRoutine = startRoutine;
entry->arg = arg;
@@ -130,6 +145,7 @@ keep_going:;
}
signal_close( entry->signal );
free( entry );
+ activeThreads--;
return NULL;
}
diff --git a/src/server/threadpool.h b/src/server/threadpool.h
index 15dd151..ee0b3aa 100644
--- a/src/server/threadpool.h
+++ b/src/server/threadpool.h
@@ -18,6 +18,11 @@ bool threadpool_init(int maxIdleThreadCount);
void threadpool_close();
/**
+ * Block until all threads spawned have exited
+ */
+void threadpool_waitEmpty();
+
+/**
* Run a thread using the thread pool.
* @param startRoutine function to run in new thread
* @param arg argument to pass to thead