summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2016-01-11 12:09:23 +0100
committerSimon Rettberg2016-01-11 12:09:23 +0100
commitd9c2a6cf943ca08f31f61a3fada940f77e3a03d3 (patch)
tree31f627a3d52ff838b046f41516a0fbef0b58b9ee
parent[KERNEL/CLIENT] Several minor tweaks and changes (diff)
downloaddnbd3-d9c2a6cf943ca08f31f61a3fada940f77e3a03d3.tar.gz
dnbd3-d9c2a6cf943ca08f31f61a3fada940f77e3a03d3.tar.xz
dnbd3-d9c2a6cf943ca08f31f61a3fada940f77e3a03d3.zip
[SERVER] Fix a lot of (mostly harmless) data races
-rw-r--r--LOCKS5
-rw-r--r--src/server/altservers.c35
-rw-r--r--src/server/globals.h1
-rw-r--r--src/server/helper.c9
-rw-r--r--src/server/helper.h2
-rw-r--r--src/server/image.c79
-rw-r--r--src/server/integrity.c4
-rw-r--r--src/server/net.c33
-rw-r--r--src/server/rpc.c30
-rw-r--r--src/server/server.c13
-rw-r--r--src/server/threadpool.c5
-rw-r--r--src/server/uplink.c38
-rw-r--r--src/shared/log.c6
-rw-r--r--src/shared/sockhelper.c2
14 files changed, 166 insertions, 96 deletions
diff --git a/LOCKS b/LOCKS
index f2e64dd..a28e42a 100644
--- a/LOCKS
+++ b/LOCKS
@@ -16,14 +16,15 @@ integrityQueueLock
remoteCloneLock | reloadLock
_images_lock
_images[].lock
-uplink.queueLock
-pendingLockProduce
pendingLockConsume
+pendingLockProduce
+uplink.queueLock
altServersLock
client.sendMutex
client.statsLock
statisticsSentLock
statisticsReceivedLock
+uplink.rttLock
If you need to lock multiple clients/images/... at once,
lock the client with the lowest array index first.
diff --git a/src/server/altservers.c b/src/server/altservers.c
index d82b522..1a1e844 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -19,7 +19,7 @@
#include <stdio.h>
static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS];
-static pthread_spinlock_t pendingLockProduce; // Lock for adding something to pending. (NULL -> nonNULL)
+static pthread_spinlock_t pendingLockWrite; // Lock for adding something to pending. (NULL -> nonNULL)
static pthread_mutex_t pendingLockConsume = PTHREAD_MUTEX_INITIALIZER; // Lock for removing something (nonNULL -> NULL)
static int signalFd = -1;
@@ -40,6 +40,7 @@ int altservers_getCount()
void altservers_init()
{
+ spin_init( &pendingLockWrite, PTHREAD_PROCESS_PRIVATE );
spin_init( &altServersLock, PTHREAD_PROCESS_PRIVATE );
memset( altServers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) );
if ( 0 != thread_create( &altThread, NULL, &altservers_main, (void *)NULL ) ) {
@@ -136,14 +137,14 @@ void altservers_findUplink(dnbd3_connection_t *uplink)
// never be that the uplink is supposed to switch, but instead calls
// this function.
assert( uplink->betterFd == -1 );
- spin_lock( &pendingLockProduce );
+ spin_lock( &pendingLockWrite );
// it is however possible that an RTT measurement is currently in progress,
// so check for that case and do nothing if one is in progress
if ( uplink->rttTestResult == RTT_INPROGRESS ) {
for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
if ( pending[i] != uplink ) continue;
// Yep, measuring right now
- spin_unlock( &pendingLockProduce );
+ spin_unlock( &pendingLockWrite );
return;
}
}
@@ -152,12 +153,12 @@ void altservers_findUplink(dnbd3_connection_t *uplink)
if ( pending[i] != NULL ) continue;
pending[i] = uplink;
uplink->rttTestResult = RTT_INPROGRESS;
- spin_unlock( &pendingLockProduce );
+ spin_unlock( &pendingLockWrite );
signal_call( signalFd ); // Wake altservers thread up
return;
}
// End of loop - no free slot
- spin_unlock( &pendingLockProduce );
+ spin_unlock( &pendingLockWrite );
logadd( LOG_WARNING, "No more free RTT measurement slots, ignoring a request..." );
}
@@ -167,12 +168,14 @@ void altservers_findUplink(dnbd3_connection_t *uplink)
void altservers_removeUplink(dnbd3_connection_t *uplink)
{
pthread_mutex_lock( &pendingLockConsume );
+ spin_lock( &pendingLockWrite );
for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
if ( pending[i] == uplink ) {
uplink->rttTestResult = RTT_NOT_REACHABLE;
pending[i] = NULL;
}
}
+ spin_unlock( &pendingLockWrite );
pthread_mutex_unlock( &pendingLockConsume );
}
@@ -357,10 +360,11 @@ static void *altservers_main(void *data UNUSED)
setThreadName( "altserver-check" );
blockNoncriticalSignals();
// Init spinlock
- spin_init( &pendingLockProduce, PTHREAD_PROCESS_PRIVATE );
// Init waiting links queue
+ spin_lock( &pendingLockWrite );
for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i)
pending[i] = NULL;
+ spin_unlock( &pendingLockWrite );
// Init signal-pipe
signalFd = signal_new();
if ( signalFd < 0 ) {
@@ -379,9 +383,16 @@ static void *altservers_main(void *data UNUSED)
}
// Work your way through the queue
for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) {
- if ( pending[itLink] == NULL ) continue; // Check once before locking, as a mutex is expensive
+ spin_lock( &pendingLockWrite );
+ if ( pending[itLink] == NULL ) {
+ spin_unlock( &pendingLockWrite );
+ continue; // Check once before locking, as a mutex is expensive
+ }
+ spin_unlock( &pendingLockWrite );
pthread_mutex_lock( &pendingLockConsume );
+ spin_lock( &pendingLockWrite );
dnbd3_connection_t * const uplink = pending[itLink];
+ spin_unlock( &pendingLockWrite );
if ( uplink == NULL ) { // Check again after locking
pthread_mutex_unlock( &pendingLockConsume );
continue;
@@ -389,7 +400,9 @@ static void *altservers_main(void *data UNUSED)
dnbd3_image_t * const image = image_lock( uplink->image );
if ( image == NULL ) { // Check again after locking
uplink->rttTestResult = RTT_NOT_REACHABLE;
+ spin_lock( &pendingLockWrite );
pending[itLink] = NULL;
+ spin_unlock( &pendingLockWrite );
pthread_mutex_unlock( &pendingLockConsume );
logadd( LOG_DEBUG1, "Image has gone away that was queued for RTT measurement\n" );
continue;
@@ -493,21 +506,29 @@ static void *altservers_main(void *data UNUSED)
if ( bestSock != -1 && (uplink->fd == -1 || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) {
// yep
logadd( LOG_DEBUG1, "Change @ %s - best: %uµs, current: %uµs\n", image->lower_name, bestRtt, currentRtt );
+ spin_lock( &uplink->rttLock );
uplink->betterFd = bestSock;
uplink->betterServer = servers[bestIndex];
uplink->rttTestResult = RTT_DOCHANGE;
+ spin_unlock( &uplink->rttLock );
static uint64_t counter = 1;
write( uplink->signal, &counter, sizeof(counter) );
} else if (bestSock == -1) {
// No server was reachable
+ spin_lock( &uplink->rttLock );
uplink->rttTestResult = RTT_NOT_REACHABLE;
+ spin_unlock( &uplink->rttLock );
} else {
// nope
if ( bestSock != -1 ) close( bestSock );
+ spin_lock( &uplink->rttLock );
uplink->rttTestResult = RTT_DONTCHANGE;
+ spin_unlock( &uplink->rttLock );
}
// end of loop over all pending uplinks
+ spin_lock( &pendingLockWrite );
pending[itLink] = NULL;
+ spin_unlock( &pendingLockWrite );
pthread_mutex_unlock( &pendingLockConsume );
}
// Save cache maps of all images if applicable
diff --git a/src/server/globals.h b/src/server/globals.h
index 9cf349a..c67b5a6 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -49,6 +49,7 @@ struct _dnbd3_connection
int queueLen; // length of queue
dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer
dnbd3_host_t currentServer; // Current server we're connected to
+ pthread_spinlock_t rttLock; // When accessing rttTestResult, betterFd or betterServer
int rttTestResult; // RTT_*
dnbd3_host_t betterServer; // The better server
int betterFd; // Active connection to better server, ready to use
diff --git a/src/server/helper.c b/src/server/helper.c
index 87e0b7f..5201744 100644
--- a/src/server/helper.c
+++ b/src/server/helper.c
@@ -125,9 +125,14 @@ void trim_right(char * const string)
*end-- = '\0';
}
-void setThreadName(char *name)
+void setThreadName(const char *name)
{
- if ( strlen( name ) > 16 ) name[16] = '\0';
+ char newName[16];
+ if ( strlen( name ) > 15 ) {
+ snprintf( newName, sizeof(newName), "%s", name );
+ newName[15] = '\0';
+ name = newName;
+ }
prctl( PR_SET_NAME, (unsigned long)name, 0, 0, 0 );
}
diff --git a/src/server/helper.h b/src/server/helper.h
index 389a98c..1fb9835 100644
--- a/src/server/helper.h
+++ b/src/server/helper.h
@@ -16,7 +16,7 @@ bool host_to_string(const dnbd3_host_t *host, char *target, size_t targetlen);
void strtolower(char *string);
void remove_trailing_slash(char *string);
void trim_right(char * const string);
-void setThreadName(char *name);
+void setThreadName(const char *name);
void blockNoncriticalSignals();
static inline bool isSameAddress(const dnbd3_host_t * const a, const dnbd3_host_t * const b)
diff --git a/src/server/image.c b/src/server/image.c
index da19b6a..875117b 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -188,11 +188,16 @@ void image_saveAllCacheMaps()
spin_lock( &imageListLock );
for (int i = 0; i < _num_images; ++i) {
if ( _images[i] == NULL ) continue;
- _images[i]->users++;
+ dnbd3_image_t * const image = _images[i];
+ spin_lock( &image->lock );
+ image->users++;
+ spin_unlock( &image->lock );
spin_unlock( &imageListLock );
- image_saveCacheMap( _images[i] );
+ image_saveCacheMap( image );
spin_lock( &imageListLock );
- _images[i]->users--;
+ spin_lock( &image->lock );
+ image->users--;
+ spin_unlock( &image->lock );
}
spin_unlock( &imageListLock );
}
@@ -348,6 +353,7 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
} else {
// Seems everything is fine again \o/
candidate->working = true;
+ logadd( LOG_INFO, "Changed state of %s:%d to 'working'", candidate->lower_name, candidate->rid );
}
}
}
@@ -415,23 +421,24 @@ dnbd3_image_t* image_release(dnbd3_image_t *image)
/**
* Remove image from images array. Only free it if it has
- * no active users
+ * no active users and was actually in the list.
* Locks on: imageListLock, image[].lock
*/
static void image_remove(dnbd3_image_t *image)
{
- bool wasInList = false;
+ bool mustFree = false;
spin_lock( &imageListLock );
spin_lock( &image->lock );
- for (int i = _num_images - 1; i >= 0; --i) {
- if ( _images[i] != image ) continue;
- _images[i] = NULL;
- wasInList = true;
- if ( i + 1 == _num_images ) _num_images--;
+ for ( int i = _num_images - 1; i >= 0; --i ) {
+ if ( _images[i] == image ) {
+ _images[i] = NULL;
+ mustFree = ( image->users == 0 );
+ }
+ if ( _images[i] == NULL && i + 1 == _num_images ) _num_images--;
}
spin_unlock( &image->lock );
spin_unlock( &imageListLock );
- if ( wasInList && image->users == 0 ) image = image_free( image );
+ if ( mustFree ) image = image_free( image );
}
/**
@@ -487,11 +494,15 @@ bool image_loadAll(char *path)
imgHandle = _images[i];
_images[i] = NULL;
if ( i + 1 == _num_images ) _num_images--;
- if ( imgHandle->users != 0 ) continue; // Still in use, do not free (last releasing user will trigger)
- // Image is not in use anymore, free the dangling entry immediately
- spin_unlock( &imageListLock ); // image_free might do several fs operations; unlock
- image_free( imgHandle );
- spin_lock( &imageListLock );
+ spin_lock( &imgHandle->lock );
+ const bool freeImg = ( imgHandle->users == 0 );
+ spin_unlock( &imgHandle->lock );
+ if ( freeImg ) {
+ // Image is not in use anymore, free the dangling entry immediately
+ spin_unlock( &imageListLock ); // image_free might do several fs operations; unlock
+ image_free( imgHandle );
+ spin_lock( &imageListLock );
+ }
}
spin_unlock( &imageListLock );
if ( _shutdown ) return true;
@@ -514,7 +525,7 @@ bool image_tryFreeAll()
{
spin_lock( &imageListLock );
for (int i = _num_images - 1; i >= 0; --i) {
- if ( _images[i] != NULL && _images[i]->users == 0 ) {
+ if ( _images[i] != NULL && _images[i]->users == 0 ) { // XXX Data race...
_images[i] = image_free( _images[i] );
}
if ( i + 1 == _num_images && _images[i] == NULL ) _num_images--;
@@ -585,26 +596,28 @@ static bool image_load_all_internal(char *base, char *path)
#define SUBDIR_LEN 120
assert( path != NULL );
assert( *path == '/' );
- struct dirent *entry;
- DIR *dir = opendir( path );
+ struct dirent entry, *entryPtr;
+ const int pathLen = strlen( path );
+ char subpath[PATHLEN];
+ struct stat st;
+ DIR * const dir = opendir( path );
+
if ( dir == NULL ) {
logadd( LOG_ERROR, "Could not opendir '%s' for loading", path );
return false;
}
- const int pathLen = strlen( path );
- const int len = pathLen + SUBDIR_LEN + 1;
- char subpath[len];
- struct stat st;
- while ( !_shutdown && (entry = readdir( dir )) != NULL ) {
- if ( strcmp( entry->d_name, "." ) == 0 || strcmp( entry->d_name, ".." ) == 0 ) continue;
- if ( strlen( entry->d_name ) > SUBDIR_LEN ) {
- logadd( LOG_WARNING, "Skipping entry %s: Too long (max %d bytes)", entry->d_name, (int)SUBDIR_LEN );
+
+ while ( !_shutdown && (entryPtr = readdir( dir )) != NULL ) {
+ entry = *entryPtr;
+ if ( strcmp( entry.d_name, "." ) == 0 || strcmp( entry.d_name, ".." ) == 0 ) continue;
+ if ( strlen( entry.d_name ) > SUBDIR_LEN ) {
+ logadd( LOG_WARNING, "Skipping entry %s: Too long (max %d bytes)", entry.d_name, (int)SUBDIR_LEN );
continue;
}
- if ( entry->d_name[0] == '/' || path[pathLen - 1] == '/' ) {
- snprintf( subpath, len, "%s%s", path, entry->d_name );
+ if ( entry.d_name[0] == '/' || path[pathLen - 1] == '/' ) {
+ snprintf( subpath, PATHLEN, "%s%s", path, entry.d_name );
} else {
- snprintf( subpath, len, "%s/%s", path, entry->d_name );
+ snprintf( subpath, PATHLEN, "%s/%s", path, entry.d_name );
}
if ( stat( subpath, &st ) < 0 ) {
logadd( LOG_WARNING, "stat() for '%s' failed. Ignoring....", subpath );
@@ -820,18 +833,18 @@ static bool image_load(char *base, char *path, int withUplink)
}
if ( i >= _num_images ) {
if ( _num_images >= SERVER_MAX_IMAGES ) {
- logadd( LOG_ERROR, "Cannot load image '%s': maximum number of images reached.", path );
spin_unlock( &imageListLock );
+ logadd( LOG_ERROR, "Cannot load image '%s': maximum number of images reached.", path );
image = image_free( image );
goto load_error;
}
_images[_num_images++] = image;
- logadd( LOG_DEBUG1, "Loaded image '%s:%d'\n", image->lower_name, (int)image->rid );
}
// Keep fd for reading
image->readFd = fdImage;
fdImage = -1;
spin_unlock( &imageListLock );
+ logadd( LOG_DEBUG1, "Loaded image '%s:%d'\n", image->lower_name, (int)image->rid );
function_return = true;
@@ -1522,7 +1535,7 @@ static bool image_ensureDiskSpace(uint64_t size)
(int)(size / (1024 * 1024)) );
// Find least recently used image
dnbd3_image_t *oldest = NULL;
- int i;
+ int i; // XXX improve locking
for (i = 0; i < _num_images; ++i) {
if ( _images[i] == NULL ) continue;
dnbd3_image_t *current = image_lock( _images[i] );
diff --git a/src/server/integrity.c b/src/server/integrity.c
index ef909aa..c697be8 100644
--- a/src/server/integrity.c
+++ b/src/server/integrity.c
@@ -41,13 +41,15 @@ void integrity_init()
assert( queueLen == -1 );
pthread_mutex_init( &integrityQueueLock, NULL );
pthread_cond_init( &queueSignal, NULL );
+ pthread_mutex_lock( &integrityQueueLock );
+ queueLen = 0;
+ pthread_mutex_unlock( &integrityQueueLock );
bRunning = true;
if ( 0 != thread_create( &thread, NULL, &integrity_main, (void *)NULL ) ) {
bRunning = false;
logadd( LOG_WARNING, "Could not start integrity check thread. Corrupted images will not be detected." );
return;
}
- queueLen = 0;
}
void integrity_shutdown()
diff --git a/src/server/net.c b/src/server/net.c
index 9dbba50..8929937 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -157,7 +157,7 @@ void net_init()
void *net_client_handler(void *dnbd3_client)
{
- dnbd3_client_t *client = (dnbd3_client_t *)dnbd3_client;
+ dnbd3_client_t * const client = (dnbd3_client_t *)dnbd3_client;
dnbd3_request_t request;
dnbd3_reply_t reply;
@@ -180,9 +180,10 @@ void *net_client_handler(void *dnbd3_client)
reply.magic = dnbd3_packet_magic;
sock_setTimeout( client->sock, _clientTimeout );
- if ( !host_to_string( &client->host, client->hostName, HOSTNAMELEN ) ) {
- client->hostName[HOSTNAMELEN-1] = '\0';
- }
+ spin_lock( &client->lock );
+ host_to_string( &client->host, client->hostName, HOSTNAMELEN );
+ client->hostName[HOSTNAMELEN-1] = '\0';
+ spin_unlock( &client->lock );
// Receive first packet. This must be CMD_SELECT_IMAGE by protocol specification
if ( recv_request_header( client->sock, &request ) ) {
@@ -201,7 +202,10 @@ void *net_client_handler(void *dnbd3_client)
logadd( LOG_DEBUG1, "Incomplete handshake received from %s", client->hostName );
}
} else {
- client->image = image = image_getOrLoad( image_name, rid );
+ image = image_getOrLoad( image_name, rid );
+ spin_lock( &client->lock );
+ client->image = image;
+ spin_unlock( &client->lock );
if ( image == NULL ) {
//logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
} else if ( !image->working ) {
@@ -225,7 +229,7 @@ void *net_client_handler(void *dnbd3_client)
serializer_reset_write( &payload );
serializer_put_uint16( &payload, PROTOCOL_VERSION );
serializer_put_string( &payload, image->lower_name );
- serializer_put_uint16( &payload, image->rid );
+ serializer_put_uint16( &payload, (uint16_t)image->rid );
serializer_put_uint64( &payload, image->virtualFilesize );
reply.cmd = CMD_SELECT_IMAGE;
reply.size = serializer_get_written_length( &payload );
@@ -326,8 +330,8 @@ void *net_client_handler(void *dnbd3_client)
spin_unlock( &image->lock );
if ( !isCached ) {
if ( !uplink_request( client, request.handle, request.offset, request.size ) ) {
- logadd( LOG_DEBUG1, "Could not relay un-cached request from %s to upstream proxy, disabling image %s:%d",
- client->hostName, image->lower_name, (int)image->rid );
+ logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy, disabling image %s:%d",
+ client->hostName, image->lower_name, image->rid );
image->working = false;
goto exit_client_cleanup;
}
@@ -365,11 +369,15 @@ void *net_client_handler(void *dnbd3_client)
const int err = errno;
if ( lock ) pthread_mutex_unlock( &client->sendMutex );
if ( ret == -1 ) {
- if ( err != EPIPE && err != ECONNRESET && err != ESHUTDOWN ) {
+ if ( err != EPIPE && err != ECONNRESET && err != ESHUTDOWN
+ && err != EAGAIN && err != EWOULDBLOCK ) {
logadd( LOG_DEBUG1, "sendfile to %s failed (image to net. sent %d/%d, errno=%d)",
client->hostName, (int)done, (int)realBytes, err );
}
- if ( err != EAGAIN && err != EWOULDBLOCK ) image->working = false;
+ if ( err == EBADF || err == EFAULT || err == EINVAL || err == EIO ) {
+ logadd( LOG_INFO, "Disabling %s:%d", image->lower_name, image->rid );
+ image->working = false;
+ }
}
goto exit_client_cleanup;
}
@@ -411,7 +419,9 @@ void *net_client_handler(void *dnbd3_client)
pthread_mutex_lock( &client->sendMutex );
send_reply( client->sock, &reply, NULL );
pthread_mutex_unlock( &client->sendMutex );
+ spin_lock( &image->lock );
image->atime = time( NULL );
+ spin_unlock( &image->lock );
set_name: ;
if ( !hasName ) {
hasName = true;
@@ -420,7 +430,6 @@ set_name: ;
break;
case CMD_SET_CLIENT_MODE:
- image->atime = time( NULL );
client->isServer = false;
break;
@@ -449,7 +458,7 @@ set_name: ;
exit_client_cleanup: ;
dnbd3_removeClient( client );
net_updateGlobalSentStatsFromClient( client ); // Don't need client's lock here as it's not active anymore
- client = dnbd3_freeClient( client ); // This will also call image_release on client->image
+ dnbd3_freeClient( client ); // This will also call image_release on client->image
return NULL ;
}
diff --git a/src/server/rpc.c b/src/server/rpc.c
index 3ea8a9a..4cd1f6e 100644
--- a/src/server/rpc.c
+++ b/src/server/rpc.c
@@ -61,25 +61,24 @@ static void clientsToJson(json_t *jsonClients)
if ( _clients[i] == NULL ) {
continue;
}
- // Do not lock on client.lock here:
- // 1) .image can only be set once, will never change (just like .image.id)
- // 2) .hostName never changes as well
- // 3) .bytesSent and .tmpBytesSent are guarded by .statsLock
- // 4) the client cannot be freed, as it's still in the list and we hold the list's lock
- if ( _clients[i]->image == NULL ) {
- imgId = -1;
- } else {
- strncpy( host, _clients[i]->hostName, HOSTNAMELEN - 1 );
- imgId = _clients[i]->image->id;
- spin_lock( &_clients[i]->statsLock );
- bytesSent = _clients[i]->bytesSent;
- net_updateGlobalSentStatsFromClient( _clients[i] ); // Do this since we read the totalBytesSent counter later
- spin_unlock( &_clients[i]->statsLock );
- }
+ dnbd3_client_t * const client = _clients[i];
+ spin_lock( &client->lock );
spin_unlock( &_clients_lock );
// Unlock so we give other threads a chance to access the client list.
// We might not get an atomic snapshot of the currently connected clients,
// but that doesn't really make a difference anyways.
+ if ( client->image == NULL ) {
+ spin_unlock( &client->lock );
+ imgId = -1;
+ } else {
+ strncpy( host, client->hostName, HOSTNAMELEN - 1 );
+ imgId = client->image->id;
+ spin_lock( &client->statsLock );
+ spin_unlock( &client->lock );
+ bytesSent = client->bytesSent;
+ net_updateGlobalSentStatsFromClient( client ); // Do this since we read the totalBytesSent counter later
+ spin_unlock( &client->statsLock );
+ }
if ( imgId != -1 ) {
clientStats = json_pack( "{sssisI}",
"address", host,
@@ -91,3 +90,4 @@ static void clientsToJson(json_t *jsonClients)
}
spin_unlock( &_clients_lock );
}
+
diff --git a/src/server/server.c b/src/server/server.c
index dd71312..30b8594 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -415,11 +415,11 @@ void dnbd3_removeClient(dnbd3_client_t *client)
{
int i;
spin_lock( &_clients_lock );
- const int cutoff = MAX(10, _num_clients / 2);
- for (i = _num_clients - 1; i >= 0; --i) {
- if ( _clients[i] != client ) continue;
- _clients[i] = NULL;
- if ( i > cutoff && i + 1 == _num_clients ) --_num_clients;
+ for ( i = _num_clients - 1; i >= 0; --i ) {
+ if ( _clients[i] == client ) {
+ _clients[i] = NULL;
+ }
+ if ( _clients[i] == NULL && i + 1 == _num_clients ) --_num_clients;
}
spin_unlock( &_clients_lock );
}
@@ -443,9 +443,8 @@ dnbd3_client_t* dnbd3_freeClient(dnbd3_client_t *client)
spin_lock( &client->image->lock );
if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client );
spin_unlock( &client->image->lock );
- image_release( client->image );
+ client->image = image_release( client->image );
}
- client->image = NULL;
spin_unlock( &client->lock );
spin_destroy( &client->lock );
spin_destroy( &client->statsLock );
diff --git a/src/server/threadpool.c b/src/server/threadpool.c
index 41f1f0b..b1c46a3 100644
--- a/src/server/threadpool.c
+++ b/src/server/threadpool.c
@@ -4,6 +4,7 @@
#include "../shared/signal.h"
#include "locks.h"
#include <pthread.h>
+#include <stdlib.h>
typedef struct _entry_t {
@@ -93,7 +94,7 @@ static void *threadpool_worker(void *entryPtr)
if ( _shutdown ) break;
if ( ret > 0 ) {
if ( entry->startRoutine == NULL ) {
- logadd( LOG_DEBUG1, "Worker woke up but has no work to do!\n" );
+ logadd( LOG_DEBUG1, "Worker woke up but has no work to do!" );
continue;
}
// Start assigned work
@@ -119,7 +120,7 @@ static void *threadpool_worker(void *entryPtr)
spin_unlock( &poolLock );
setThreadName( "[pool]" );
} else {
- logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!\n", ret );
+ logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret );
}
}
signal_close( entry->signalFd );
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 726b08b..e0bdcae 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -69,12 +69,15 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host)
goto failure;
}
link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) );
+ spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE );
+ spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE );
link->image = image;
link->bytesReceived = 0;
link->queueLen = 0;
link->fd = -1;
link->signal = -1;
link->replicationHandle = 0;
+ spin_lock( &link->rttLock );
if ( sock >= 0 ) {
link->betterFd = sock;
link->betterServer = *host;
@@ -83,9 +86,9 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host)
link->betterFd = -1;
link->rttTestResult = RTT_IDLE;
}
+ spin_unlock( &link->rttLock );
link->recvBufferLen = 0;
link->shutdown = false;
- spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE );
if ( 0 != thread_create( &(link->thread), NULL, &uplink_mainloop, (void *)link ) ) {
logadd( LOG_ERROR, "Could not start thread for new uplink." );
goto failure;
@@ -133,7 +136,7 @@ void uplink_shutdown(dnbd3_image_t *image)
/**
* Remove given client from uplink request queue
- * Locks on: uplink.queueLock, client.sendMutex
+ * Locks on: uplink.queueLock
*/
void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client)
{
@@ -142,8 +145,8 @@ void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client)
if ( uplink->queue[i].client == client ) {
uplink->queue[i].client = NULL;
uplink->queue[i].status = ULR_FREE;
- if ( uplink->queueLen == i + 1 ) uplink->queueLen--;
}
+ if ( uplink->queue[i].client == NULL && uplink->queueLen == i + 1 ) uplink->queueLen--;
}
spin_unlock( &uplink->queueLock );
}
@@ -273,16 +276,20 @@ static void* uplink_mainloop(void *data)
}
while ( !_shutdown && !link->shutdown ) {
// Check if server switch is in order
- if ( link->rttTestResult == RTT_DOCHANGE ) {
+ spin_lock( &link->rttLock );
+ if ( link->rttTestResult != RTT_DOCHANGE ) {
+ spin_unlock( &link->rttLock );
+ } else {
link->rttTestResult = RTT_IDLE;
- discoverFailCount = 0;
// The rttTest worker thread has finished our request.
// And says it's better to switch to another server
const int fd = link->fd;
link->fd = link->betterFd;
- if ( fd != -1 ) close( fd );
link->betterFd = -1;
link->currentServer = link->betterServer;
+ spin_unlock( &link->rttLock );
+ discoverFailCount = 0;
+ if ( fd != -1 ) close( fd );
link->replicationHandle = 0;
link->image->working = true;
link->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
@@ -372,7 +379,10 @@ static void* uplink_mainloop(void *data)
}
}
// See if we should trigger an RTT measurement
- if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) {
+ spin_lock( &link->rttLock );
+ const int rttTestResult = link->rttTestResult;
+ spin_unlock( &link->rttLock );
+ if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) {
if ( now + SERVER_RTT_DELAY_FAILED < nextAltCheck ) {
// This probably means the system time was changed - handle this case properly by capping the timeout
nextAltCheck = now + SERVER_RTT_DELAY_FAILED / 2;
@@ -390,8 +400,10 @@ static void* uplink_mainloop(void *data)
altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX);
nextAltCheck = now + altCheckInterval;
}
- } else if ( link->rttTestResult == RTT_NOT_REACHABLE ) {
+ } else if ( rttTestResult == RTT_NOT_REACHABLE ) {
+ spin_lock( &link->rttLock );
link->rttTestResult = RTT_IDLE;
+ spin_unlock( &link->rttLock );
discoverFailCount++;
nextAltCheck = now + (discoverFailCount < 5 ? altCheckInterval : SERVER_RTT_DELAY_FAILED);
}
@@ -441,6 +453,7 @@ static void* uplink_mainloop(void *data)
usleep( 10000 );
if ( link->betterFd != -1 ) close( link->betterFd );
spin_destroy( &link->queueLock );
+ spin_destroy( &link->rttLock );
free( link->recvBuffer );
link->recvBuffer = NULL;
spin_lock( &statisticsReceivedLock );
@@ -524,7 +537,7 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
/**
* Receive data from uplink server and process/dispatch
- * Locks on: link.lock, indirectly on images[].lock
+ * Locks on: link.lock, images[].lock
*/
static void uplink_handleReceive(dnbd3_connection_t *link)
{
@@ -566,7 +579,9 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
struct iovec iov[2];
const uint64_t start = inReply.handle;
const uint64_t end = inReply.handle + inReply.size;
+ spin_lock( &link->image->lock );
link->bytesReceived += inReply.size;
+ spin_unlock( &link->image->lock );
// 1) Write to cache file
if ( link->image->cacheFd != -1 ) {
uint32_t done = 0;
@@ -644,7 +659,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
#endif
if ( start == link->replicationHandle ) link->replicationHandle = 0;
}
- if ( link->queueLen == 0 ) uplink_sendReplicationRequest( link );
+ spin_lock( &link->queueLock );
+ const bool rep = ( link->queueLen == 0 );
+ spin_unlock( &link->queueLock );
+ if ( rep ) uplink_sendReplicationRequest( link );
return;
// Error handling from failed receive or message parsing
error_cleanup: ;
diff --git a/src/shared/log.c b/src/shared/log.c
index 6d77dc5..da27392 100644
--- a/src/shared/log.c
+++ b/src/shared/log.c
@@ -82,12 +82,12 @@ void logadd(const logmask_t mask, const char *fmt, ...)
va_list ap;
int ret;
time_t rawtime;
- struct tm *timeinfo;
+ struct tm timeinfo;
char buffer[LINE_LEN];
time( &rawtime );
- timeinfo = localtime( &rawtime );
- size_t offset = strftime( buffer, LINE_LEN, "[%d.%m. %H:%M:%S] ", timeinfo );
+ localtime_r( &rawtime, &timeinfo );
+ size_t offset = strftime( buffer, LINE_LEN, "[%d.%m. %H:%M:%S] ", &timeinfo );
offset += writeLevel( buffer + offset, mask );
va_start( ap, fmt );
ret = vsnprintf( buffer + offset, LINE_LEN - offset, fmt, ap );
diff --git a/src/shared/sockhelper.c b/src/shared/sockhelper.c
index e93d45c..d1dbd8c 100644
--- a/src/shared/sockhelper.c
+++ b/src/shared/sockhelper.c
@@ -207,7 +207,7 @@ int sock_printable(struct sockaddr *addr, socklen_t addrLen, char *output, int l
bool sock_listen(poll_list_t* list, char* bind_addr, uint16_t port)
{
if ( list->count >= MAXLISTEN ) return false;
- struct addrinfo hints, *res, *ptr;
+ struct addrinfo hints, *res = NULL, *ptr;
char portStr[6];
const int on = 1;
int openCount = 0;