From d9c2a6cf943ca08f31f61a3fada940f77e3a03d3 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 11 Jan 2016 12:09:23 +0100 Subject: [SERVER] Fix a lot of (mostly harmless) data races --- LOCKS | 5 ++-- src/server/altservers.c | 35 +++++++++++++++++----- src/server/globals.h | 1 + src/server/helper.c | 9 ++++-- src/server/helper.h | 2 +- src/server/image.c | 79 ++++++++++++++++++++++++++++--------------------- src/server/integrity.c | 4 ++- src/server/net.c | 33 +++++++++++++-------- src/server/rpc.c | 30 +++++++++---------- src/server/server.c | 13 ++++---- src/server/threadpool.c | 5 ++-- src/server/uplink.c | 38 +++++++++++++++++------- src/shared/log.c | 6 ++-- src/shared/sockhelper.c | 2 +- 14 files changed, 166 insertions(+), 96 deletions(-) diff --git a/LOCKS b/LOCKS index f2e64dd..a28e42a 100644 --- a/LOCKS +++ b/LOCKS @@ -16,14 +16,15 @@ integrityQueueLock remoteCloneLock | reloadLock _images_lock _images[].lock -uplink.queueLock -pendingLockProduce pendingLockConsume +pendingLockProduce +uplink.queueLock altServersLock client.sendMutex client.statsLock statisticsSentLock statisticsReceivedLock +uplink.rttLock If you need to lock multiple clients/images/... at once, lock the client with the lowest array index first. diff --git a/src/server/altservers.c b/src/server/altservers.c index d82b522..1a1e844 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -19,7 +19,7 @@ #include static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS]; -static pthread_spinlock_t pendingLockProduce; // Lock for adding something to pending. (NULL -> nonNULL) +static pthread_spinlock_t pendingLockWrite; // Lock for adding something to pending. (NULL -> nonNULL) static pthread_mutex_t pendingLockConsume = PTHREAD_MUTEX_INITIALIZER; // Lock for removing something (nonNULL -> NULL) static int signalFd = -1; @@ -40,6 +40,7 @@ int altservers_getCount() void altservers_init() { + spin_init( &pendingLockWrite, PTHREAD_PROCESS_PRIVATE ); spin_init( &altServersLock, PTHREAD_PROCESS_PRIVATE ); memset( altServers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) ); if ( 0 != thread_create( &altThread, NULL, &altservers_main, (void *)NULL ) ) { @@ -136,14 +137,14 @@ void altservers_findUplink(dnbd3_connection_t *uplink) // never be that the uplink is supposed to switch, but instead calls // this function. assert( uplink->betterFd == -1 ); - spin_lock( &pendingLockProduce ); + spin_lock( &pendingLockWrite ); // it is however possible that an RTT measurement is currently in progress, // so check for that case and do nothing if one is in progress if ( uplink->rttTestResult == RTT_INPROGRESS ) { for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { if ( pending[i] != uplink ) continue; // Yep, measuring right now - spin_unlock( &pendingLockProduce ); + spin_unlock( &pendingLockWrite ); return; } } @@ -152,12 +153,12 @@ void altservers_findUplink(dnbd3_connection_t *uplink) if ( pending[i] != NULL ) continue; pending[i] = uplink; uplink->rttTestResult = RTT_INPROGRESS; - spin_unlock( &pendingLockProduce ); + spin_unlock( &pendingLockWrite ); signal_call( signalFd ); // Wake altservers thread up return; } // End of loop - no free slot - spin_unlock( &pendingLockProduce ); + spin_unlock( &pendingLockWrite ); logadd( LOG_WARNING, "No more free RTT measurement slots, ignoring a request..." ); } @@ -167,12 +168,14 @@ void altservers_findUplink(dnbd3_connection_t *uplink) void altservers_removeUplink(dnbd3_connection_t *uplink) { pthread_mutex_lock( &pendingLockConsume ); + spin_lock( &pendingLockWrite ); for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { if ( pending[i] == uplink ) { uplink->rttTestResult = RTT_NOT_REACHABLE; pending[i] = NULL; } } + spin_unlock( &pendingLockWrite ); pthread_mutex_unlock( &pendingLockConsume ); } @@ -357,10 +360,11 @@ static void *altservers_main(void *data UNUSED) setThreadName( "altserver-check" ); blockNoncriticalSignals(); // Init spinlock - spin_init( &pendingLockProduce, PTHREAD_PROCESS_PRIVATE ); // Init waiting links queue + spin_lock( &pendingLockWrite ); for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) pending[i] = NULL; + spin_unlock( &pendingLockWrite ); // Init signal-pipe signalFd = signal_new(); if ( signalFd < 0 ) { @@ -379,9 +383,16 @@ static void *altservers_main(void *data UNUSED) } // Work your way through the queue for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) { - if ( pending[itLink] == NULL ) continue; // Check once before locking, as a mutex is expensive + spin_lock( &pendingLockWrite ); + if ( pending[itLink] == NULL ) { + spin_unlock( &pendingLockWrite ); + continue; // Check once before locking, as a mutex is expensive + } + spin_unlock( &pendingLockWrite ); pthread_mutex_lock( &pendingLockConsume ); + spin_lock( &pendingLockWrite ); dnbd3_connection_t * const uplink = pending[itLink]; + spin_unlock( &pendingLockWrite ); if ( uplink == NULL ) { // Check again after locking pthread_mutex_unlock( &pendingLockConsume ); continue; @@ -389,7 +400,9 @@ static void *altservers_main(void *data UNUSED) dnbd3_image_t * const image = image_lock( uplink->image ); if ( image == NULL ) { // Check again after locking uplink->rttTestResult = RTT_NOT_REACHABLE; + spin_lock( &pendingLockWrite ); pending[itLink] = NULL; + spin_unlock( &pendingLockWrite ); pthread_mutex_unlock( &pendingLockConsume ); logadd( LOG_DEBUG1, "Image has gone away that was queued for RTT measurement\n" ); continue; @@ -493,21 +506,29 @@ static void *altservers_main(void *data UNUSED) if ( bestSock != -1 && (uplink->fd == -1 || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) { // yep logadd( LOG_DEBUG1, "Change @ %s - best: %uµs, current: %uµs\n", image->lower_name, bestRtt, currentRtt ); + spin_lock( &uplink->rttLock ); uplink->betterFd = bestSock; uplink->betterServer = servers[bestIndex]; uplink->rttTestResult = RTT_DOCHANGE; + spin_unlock( &uplink->rttLock ); static uint64_t counter = 1; write( uplink->signal, &counter, sizeof(counter) ); } else if (bestSock == -1) { // No server was reachable + spin_lock( &uplink->rttLock ); uplink->rttTestResult = RTT_NOT_REACHABLE; + spin_unlock( &uplink->rttLock ); } else { // nope if ( bestSock != -1 ) close( bestSock ); + spin_lock( &uplink->rttLock ); uplink->rttTestResult = RTT_DONTCHANGE; + spin_unlock( &uplink->rttLock ); } // end of loop over all pending uplinks + spin_lock( &pendingLockWrite ); pending[itLink] = NULL; + spin_unlock( &pendingLockWrite ); pthread_mutex_unlock( &pendingLockConsume ); } // Save cache maps of all images if applicable diff --git a/src/server/globals.h b/src/server/globals.h index 9cf349a..c67b5a6 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -49,6 +49,7 @@ struct _dnbd3_connection int queueLen; // length of queue dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer dnbd3_host_t currentServer; // Current server we're connected to + pthread_spinlock_t rttLock; // When accessing rttTestResult, betterFd or betterServer int rttTestResult; // RTT_* dnbd3_host_t betterServer; // The better server int betterFd; // Active connection to better server, ready to use diff --git a/src/server/helper.c b/src/server/helper.c index 87e0b7f..5201744 100644 --- a/src/server/helper.c +++ b/src/server/helper.c @@ -125,9 +125,14 @@ void trim_right(char * const string) *end-- = '\0'; } -void setThreadName(char *name) +void setThreadName(const char *name) { - if ( strlen( name ) > 16 ) name[16] = '\0'; + char newName[16]; + if ( strlen( name ) > 15 ) { + snprintf( newName, sizeof(newName), "%s", name ); + newName[15] = '\0'; + name = newName; + } prctl( PR_SET_NAME, (unsigned long)name, 0, 0, 0 ); } diff --git a/src/server/helper.h b/src/server/helper.h index 389a98c..1fb9835 100644 --- a/src/server/helper.h +++ b/src/server/helper.h @@ -16,7 +16,7 @@ bool host_to_string(const dnbd3_host_t *host, char *target, size_t targetlen); void strtolower(char *string); void remove_trailing_slash(char *string); void trim_right(char * const string); -void setThreadName(char *name); +void setThreadName(const char *name); void blockNoncriticalSignals(); static inline bool isSameAddress(const dnbd3_host_t * const a, const dnbd3_host_t * const b) diff --git a/src/server/image.c b/src/server/image.c index da19b6a..875117b 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -188,11 +188,16 @@ void image_saveAllCacheMaps() spin_lock( &imageListLock ); for (int i = 0; i < _num_images; ++i) { if ( _images[i] == NULL ) continue; - _images[i]->users++; + dnbd3_image_t * const image = _images[i]; + spin_lock( &image->lock ); + image->users++; + spin_unlock( &image->lock ); spin_unlock( &imageListLock ); - image_saveCacheMap( _images[i] ); + image_saveCacheMap( image ); spin_lock( &imageListLock ); - _images[i]->users--; + spin_lock( &image->lock ); + image->users--; + spin_unlock( &image->lock ); } spin_unlock( &imageListLock ); } @@ -348,6 +353,7 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) } else { // Seems everything is fine again \o/ candidate->working = true; + logadd( LOG_INFO, "Changed state of %s:%d to 'working'", candidate->lower_name, candidate->rid ); } } } @@ -415,23 +421,24 @@ dnbd3_image_t* image_release(dnbd3_image_t *image) /** * Remove image from images array. Only free it if it has - * no active users + * no active users and was actually in the list. * Locks on: imageListLock, image[].lock */ static void image_remove(dnbd3_image_t *image) { - bool wasInList = false; + bool mustFree = false; spin_lock( &imageListLock ); spin_lock( &image->lock ); - for (int i = _num_images - 1; i >= 0; --i) { - if ( _images[i] != image ) continue; - _images[i] = NULL; - wasInList = true; - if ( i + 1 == _num_images ) _num_images--; + for ( int i = _num_images - 1; i >= 0; --i ) { + if ( _images[i] == image ) { + _images[i] = NULL; + mustFree = ( image->users == 0 ); + } + if ( _images[i] == NULL && i + 1 == _num_images ) _num_images--; } spin_unlock( &image->lock ); spin_unlock( &imageListLock ); - if ( wasInList && image->users == 0 ) image = image_free( image ); + if ( mustFree ) image = image_free( image ); } /** @@ -487,11 +494,15 @@ bool image_loadAll(char *path) imgHandle = _images[i]; _images[i] = NULL; if ( i + 1 == _num_images ) _num_images--; - if ( imgHandle->users != 0 ) continue; // Still in use, do not free (last releasing user will trigger) - // Image is not in use anymore, free the dangling entry immediately - spin_unlock( &imageListLock ); // image_free might do several fs operations; unlock - image_free( imgHandle ); - spin_lock( &imageListLock ); + spin_lock( &imgHandle->lock ); + const bool freeImg = ( imgHandle->users == 0 ); + spin_unlock( &imgHandle->lock ); + if ( freeImg ) { + // Image is not in use anymore, free the dangling entry immediately + spin_unlock( &imageListLock ); // image_free might do several fs operations; unlock + image_free( imgHandle ); + spin_lock( &imageListLock ); + } } spin_unlock( &imageListLock ); if ( _shutdown ) return true; @@ -514,7 +525,7 @@ bool image_tryFreeAll() { spin_lock( &imageListLock ); for (int i = _num_images - 1; i >= 0; --i) { - if ( _images[i] != NULL && _images[i]->users == 0 ) { + if ( _images[i] != NULL && _images[i]->users == 0 ) { // XXX Data race... _images[i] = image_free( _images[i] ); } if ( i + 1 == _num_images && _images[i] == NULL ) _num_images--; @@ -585,26 +596,28 @@ static bool image_load_all_internal(char *base, char *path) #define SUBDIR_LEN 120 assert( path != NULL ); assert( *path == '/' ); - struct dirent *entry; - DIR *dir = opendir( path ); + struct dirent entry, *entryPtr; + const int pathLen = strlen( path ); + char subpath[PATHLEN]; + struct stat st; + DIR * const dir = opendir( path ); + if ( dir == NULL ) { logadd( LOG_ERROR, "Could not opendir '%s' for loading", path ); return false; } - const int pathLen = strlen( path ); - const int len = pathLen + SUBDIR_LEN + 1; - char subpath[len]; - struct stat st; - while ( !_shutdown && (entry = readdir( dir )) != NULL ) { - if ( strcmp( entry->d_name, "." ) == 0 || strcmp( entry->d_name, ".." ) == 0 ) continue; - if ( strlen( entry->d_name ) > SUBDIR_LEN ) { - logadd( LOG_WARNING, "Skipping entry %s: Too long (max %d bytes)", entry->d_name, (int)SUBDIR_LEN ); + + while ( !_shutdown && (entryPtr = readdir( dir )) != NULL ) { + entry = *entryPtr; + if ( strcmp( entry.d_name, "." ) == 0 || strcmp( entry.d_name, ".." ) == 0 ) continue; + if ( strlen( entry.d_name ) > SUBDIR_LEN ) { + logadd( LOG_WARNING, "Skipping entry %s: Too long (max %d bytes)", entry.d_name, (int)SUBDIR_LEN ); continue; } - if ( entry->d_name[0] == '/' || path[pathLen - 1] == '/' ) { - snprintf( subpath, len, "%s%s", path, entry->d_name ); + if ( entry.d_name[0] == '/' || path[pathLen - 1] == '/' ) { + snprintf( subpath, PATHLEN, "%s%s", path, entry.d_name ); } else { - snprintf( subpath, len, "%s/%s", path, entry->d_name ); + snprintf( subpath, PATHLEN, "%s/%s", path, entry.d_name ); } if ( stat( subpath, &st ) < 0 ) { logadd( LOG_WARNING, "stat() for '%s' failed. Ignoring....", subpath ); @@ -820,18 +833,18 @@ static bool image_load(char *base, char *path, int withUplink) } if ( i >= _num_images ) { if ( _num_images >= SERVER_MAX_IMAGES ) { - logadd( LOG_ERROR, "Cannot load image '%s': maximum number of images reached.", path ); spin_unlock( &imageListLock ); + logadd( LOG_ERROR, "Cannot load image '%s': maximum number of images reached.", path ); image = image_free( image ); goto load_error; } _images[_num_images++] = image; - logadd( LOG_DEBUG1, "Loaded image '%s:%d'\n", image->lower_name, (int)image->rid ); } // Keep fd for reading image->readFd = fdImage; fdImage = -1; spin_unlock( &imageListLock ); + logadd( LOG_DEBUG1, "Loaded image '%s:%d'\n", image->lower_name, (int)image->rid ); function_return = true; @@ -1522,7 +1535,7 @@ static bool image_ensureDiskSpace(uint64_t size) (int)(size / (1024 * 1024)) ); // Find least recently used image dnbd3_image_t *oldest = NULL; - int i; + int i; // XXX improve locking for (i = 0; i < _num_images; ++i) { if ( _images[i] == NULL ) continue; dnbd3_image_t *current = image_lock( _images[i] ); diff --git a/src/server/integrity.c b/src/server/integrity.c index ef909aa..c697be8 100644 --- a/src/server/integrity.c +++ b/src/server/integrity.c @@ -41,13 +41,15 @@ void integrity_init() assert( queueLen == -1 ); pthread_mutex_init( &integrityQueueLock, NULL ); pthread_cond_init( &queueSignal, NULL ); + pthread_mutex_lock( &integrityQueueLock ); + queueLen = 0; + pthread_mutex_unlock( &integrityQueueLock ); bRunning = true; if ( 0 != thread_create( &thread, NULL, &integrity_main, (void *)NULL ) ) { bRunning = false; logadd( LOG_WARNING, "Could not start integrity check thread. Corrupted images will not be detected." ); return; } - queueLen = 0; } void integrity_shutdown() diff --git a/src/server/net.c b/src/server/net.c index 9dbba50..8929937 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -157,7 +157,7 @@ void net_init() void *net_client_handler(void *dnbd3_client) { - dnbd3_client_t *client = (dnbd3_client_t *)dnbd3_client; + dnbd3_client_t * const client = (dnbd3_client_t *)dnbd3_client; dnbd3_request_t request; dnbd3_reply_t reply; @@ -180,9 +180,10 @@ void *net_client_handler(void *dnbd3_client) reply.magic = dnbd3_packet_magic; sock_setTimeout( client->sock, _clientTimeout ); - if ( !host_to_string( &client->host, client->hostName, HOSTNAMELEN ) ) { - client->hostName[HOSTNAMELEN-1] = '\0'; - } + spin_lock( &client->lock ); + host_to_string( &client->host, client->hostName, HOSTNAMELEN ); + client->hostName[HOSTNAMELEN-1] = '\0'; + spin_unlock( &client->lock ); // Receive first packet. This must be CMD_SELECT_IMAGE by protocol specification if ( recv_request_header( client->sock, &request ) ) { @@ -201,7 +202,10 @@ void *net_client_handler(void *dnbd3_client) logadd( LOG_DEBUG1, "Incomplete handshake received from %s", client->hostName ); } } else { - client->image = image = image_getOrLoad( image_name, rid ); + image = image_getOrLoad( image_name, rid ); + spin_lock( &client->lock ); + client->image = image; + spin_unlock( &client->lock ); if ( image == NULL ) { //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); } else if ( !image->working ) { @@ -225,7 +229,7 @@ void *net_client_handler(void *dnbd3_client) serializer_reset_write( &payload ); serializer_put_uint16( &payload, PROTOCOL_VERSION ); serializer_put_string( &payload, image->lower_name ); - serializer_put_uint16( &payload, image->rid ); + serializer_put_uint16( &payload, (uint16_t)image->rid ); serializer_put_uint64( &payload, image->virtualFilesize ); reply.cmd = CMD_SELECT_IMAGE; reply.size = serializer_get_written_length( &payload ); @@ -326,8 +330,8 @@ void *net_client_handler(void *dnbd3_client) spin_unlock( &image->lock ); if ( !isCached ) { if ( !uplink_request( client, request.handle, request.offset, request.size ) ) { - logadd( LOG_DEBUG1, "Could not relay un-cached request from %s to upstream proxy, disabling image %s:%d", - client->hostName, image->lower_name, (int)image->rid ); + logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy, disabling image %s:%d", + client->hostName, image->lower_name, image->rid ); image->working = false; goto exit_client_cleanup; } @@ -365,11 +369,15 @@ void *net_client_handler(void *dnbd3_client) const int err = errno; if ( lock ) pthread_mutex_unlock( &client->sendMutex ); if ( ret == -1 ) { - if ( err != EPIPE && err != ECONNRESET && err != ESHUTDOWN ) { + if ( err != EPIPE && err != ECONNRESET && err != ESHUTDOWN + && err != EAGAIN && err != EWOULDBLOCK ) { logadd( LOG_DEBUG1, "sendfile to %s failed (image to net. sent %d/%d, errno=%d)", client->hostName, (int)done, (int)realBytes, err ); } - if ( err != EAGAIN && err != EWOULDBLOCK ) image->working = false; + if ( err == EBADF || err == EFAULT || err == EINVAL || err == EIO ) { + logadd( LOG_INFO, "Disabling %s:%d", image->lower_name, image->rid ); + image->working = false; + } } goto exit_client_cleanup; } @@ -411,7 +419,9 @@ void *net_client_handler(void *dnbd3_client) pthread_mutex_lock( &client->sendMutex ); send_reply( client->sock, &reply, NULL ); pthread_mutex_unlock( &client->sendMutex ); + spin_lock( &image->lock ); image->atime = time( NULL ); + spin_unlock( &image->lock ); set_name: ; if ( !hasName ) { hasName = true; @@ -420,7 +430,6 @@ set_name: ; break; case CMD_SET_CLIENT_MODE: - image->atime = time( NULL ); client->isServer = false; break; @@ -449,7 +458,7 @@ set_name: ; exit_client_cleanup: ; dnbd3_removeClient( client ); net_updateGlobalSentStatsFromClient( client ); // Don't need client's lock here as it's not active anymore - client = dnbd3_freeClient( client ); // This will also call image_release on client->image + dnbd3_freeClient( client ); // This will also call image_release on client->image return NULL ; } diff --git a/src/server/rpc.c b/src/server/rpc.c index 3ea8a9a..4cd1f6e 100644 --- a/src/server/rpc.c +++ b/src/server/rpc.c @@ -61,25 +61,24 @@ static void clientsToJson(json_t *jsonClients) if ( _clients[i] == NULL ) { continue; } - // Do not lock on client.lock here: - // 1) .image can only be set once, will never change (just like .image.id) - // 2) .hostName never changes as well - // 3) .bytesSent and .tmpBytesSent are guarded by .statsLock - // 4) the client cannot be freed, as it's still in the list and we hold the list's lock - if ( _clients[i]->image == NULL ) { - imgId = -1; - } else { - strncpy( host, _clients[i]->hostName, HOSTNAMELEN - 1 ); - imgId = _clients[i]->image->id; - spin_lock( &_clients[i]->statsLock ); - bytesSent = _clients[i]->bytesSent; - net_updateGlobalSentStatsFromClient( _clients[i] ); // Do this since we read the totalBytesSent counter later - spin_unlock( &_clients[i]->statsLock ); - } + dnbd3_client_t * const client = _clients[i]; + spin_lock( &client->lock ); spin_unlock( &_clients_lock ); // Unlock so we give other threads a chance to access the client list. // We might not get an atomic snapshot of the currently connected clients, // but that doesn't really make a difference anyways. + if ( client->image == NULL ) { + spin_unlock( &client->lock ); + imgId = -1; + } else { + strncpy( host, client->hostName, HOSTNAMELEN - 1 ); + imgId = client->image->id; + spin_lock( &client->statsLock ); + spin_unlock( &client->lock ); + bytesSent = client->bytesSent; + net_updateGlobalSentStatsFromClient( client ); // Do this since we read the totalBytesSent counter later + spin_unlock( &client->statsLock ); + } if ( imgId != -1 ) { clientStats = json_pack( "{sssisI}", "address", host, @@ -91,3 +90,4 @@ static void clientsToJson(json_t *jsonClients) } spin_unlock( &_clients_lock ); } + diff --git a/src/server/server.c b/src/server/server.c index dd71312..30b8594 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -415,11 +415,11 @@ void dnbd3_removeClient(dnbd3_client_t *client) { int i; spin_lock( &_clients_lock ); - const int cutoff = MAX(10, _num_clients / 2); - for (i = _num_clients - 1; i >= 0; --i) { - if ( _clients[i] != client ) continue; - _clients[i] = NULL; - if ( i > cutoff && i + 1 == _num_clients ) --_num_clients; + for ( i = _num_clients - 1; i >= 0; --i ) { + if ( _clients[i] == client ) { + _clients[i] = NULL; + } + if ( _clients[i] == NULL && i + 1 == _num_clients ) --_num_clients; } spin_unlock( &_clients_lock ); } @@ -443,9 +443,8 @@ dnbd3_client_t* dnbd3_freeClient(dnbd3_client_t *client) spin_lock( &client->image->lock ); if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client ); spin_unlock( &client->image->lock ); - image_release( client->image ); + client->image = image_release( client->image ); } - client->image = NULL; spin_unlock( &client->lock ); spin_destroy( &client->lock ); spin_destroy( &client->statsLock ); diff --git a/src/server/threadpool.c b/src/server/threadpool.c index 41f1f0b..b1c46a3 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -4,6 +4,7 @@ #include "../shared/signal.h" #include "locks.h" #include +#include typedef struct _entry_t { @@ -93,7 +94,7 @@ static void *threadpool_worker(void *entryPtr) if ( _shutdown ) break; if ( ret > 0 ) { if ( entry->startRoutine == NULL ) { - logadd( LOG_DEBUG1, "Worker woke up but has no work to do!\n" ); + logadd( LOG_DEBUG1, "Worker woke up but has no work to do!" ); continue; } // Start assigned work @@ -119,7 +120,7 @@ static void *threadpool_worker(void *entryPtr) spin_unlock( &poolLock ); setThreadName( "[pool]" ); } else { - logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!\n", ret ); + logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret ); } } signal_close( entry->signalFd ); diff --git a/src/server/uplink.c b/src/server/uplink.c index 726b08b..e0bdcae 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -69,12 +69,15 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) goto failure; } link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) ); + spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE ); + spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE ); link->image = image; link->bytesReceived = 0; link->queueLen = 0; link->fd = -1; link->signal = -1; link->replicationHandle = 0; + spin_lock( &link->rttLock ); if ( sock >= 0 ) { link->betterFd = sock; link->betterServer = *host; @@ -83,9 +86,9 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) link->betterFd = -1; link->rttTestResult = RTT_IDLE; } + spin_unlock( &link->rttLock ); link->recvBufferLen = 0; link->shutdown = false; - spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE ); if ( 0 != thread_create( &(link->thread), NULL, &uplink_mainloop, (void *)link ) ) { logadd( LOG_ERROR, "Could not start thread for new uplink." ); goto failure; @@ -133,7 +136,7 @@ void uplink_shutdown(dnbd3_image_t *image) /** * Remove given client from uplink request queue - * Locks on: uplink.queueLock, client.sendMutex + * Locks on: uplink.queueLock */ void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) { @@ -142,8 +145,8 @@ void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) if ( uplink->queue[i].client == client ) { uplink->queue[i].client = NULL; uplink->queue[i].status = ULR_FREE; - if ( uplink->queueLen == i + 1 ) uplink->queueLen--; } + if ( uplink->queue[i].client == NULL && uplink->queueLen == i + 1 ) uplink->queueLen--; } spin_unlock( &uplink->queueLock ); } @@ -273,16 +276,20 @@ static void* uplink_mainloop(void *data) } while ( !_shutdown && !link->shutdown ) { // Check if server switch is in order - if ( link->rttTestResult == RTT_DOCHANGE ) { + spin_lock( &link->rttLock ); + if ( link->rttTestResult != RTT_DOCHANGE ) { + spin_unlock( &link->rttLock ); + } else { link->rttTestResult = RTT_IDLE; - discoverFailCount = 0; // The rttTest worker thread has finished our request. // And says it's better to switch to another server const int fd = link->fd; link->fd = link->betterFd; - if ( fd != -1 ) close( fd ); link->betterFd = -1; link->currentServer = link->betterServer; + spin_unlock( &link->rttLock ); + discoverFailCount = 0; + if ( fd != -1 ) close( fd ); link->replicationHandle = 0; link->image->working = true; link->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received @@ -372,7 +379,10 @@ static void* uplink_mainloop(void *data) } } // See if we should trigger an RTT measurement - if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) { + spin_lock( &link->rttLock ); + const int rttTestResult = link->rttTestResult; + spin_unlock( &link->rttLock ); + if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) { if ( now + SERVER_RTT_DELAY_FAILED < nextAltCheck ) { // This probably means the system time was changed - handle this case properly by capping the timeout nextAltCheck = now + SERVER_RTT_DELAY_FAILED / 2; @@ -390,8 +400,10 @@ static void* uplink_mainloop(void *data) altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX); nextAltCheck = now + altCheckInterval; } - } else if ( link->rttTestResult == RTT_NOT_REACHABLE ) { + } else if ( rttTestResult == RTT_NOT_REACHABLE ) { + spin_lock( &link->rttLock ); link->rttTestResult = RTT_IDLE; + spin_unlock( &link->rttLock ); discoverFailCount++; nextAltCheck = now + (discoverFailCount < 5 ? altCheckInterval : SERVER_RTT_DELAY_FAILED); } @@ -441,6 +453,7 @@ static void* uplink_mainloop(void *data) usleep( 10000 ); if ( link->betterFd != -1 ) close( link->betterFd ); spin_destroy( &link->queueLock ); + spin_destroy( &link->rttLock ); free( link->recvBuffer ); link->recvBuffer = NULL; spin_lock( &statisticsReceivedLock ); @@ -524,7 +537,7 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link) /** * Receive data from uplink server and process/dispatch - * Locks on: link.lock, indirectly on images[].lock + * Locks on: link.lock, images[].lock */ static void uplink_handleReceive(dnbd3_connection_t *link) { @@ -566,7 +579,9 @@ static void uplink_handleReceive(dnbd3_connection_t *link) struct iovec iov[2]; const uint64_t start = inReply.handle; const uint64_t end = inReply.handle + inReply.size; + spin_lock( &link->image->lock ); link->bytesReceived += inReply.size; + spin_unlock( &link->image->lock ); // 1) Write to cache file if ( link->image->cacheFd != -1 ) { uint32_t done = 0; @@ -644,7 +659,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link) #endif if ( start == link->replicationHandle ) link->replicationHandle = 0; } - if ( link->queueLen == 0 ) uplink_sendReplicationRequest( link ); + spin_lock( &link->queueLock ); + const bool rep = ( link->queueLen == 0 ); + spin_unlock( &link->queueLock ); + if ( rep ) uplink_sendReplicationRequest( link ); return; // Error handling from failed receive or message parsing error_cleanup: ; diff --git a/src/shared/log.c b/src/shared/log.c index 6d77dc5..da27392 100644 --- a/src/shared/log.c +++ b/src/shared/log.c @@ -82,12 +82,12 @@ void logadd(const logmask_t mask, const char *fmt, ...) va_list ap; int ret; time_t rawtime; - struct tm *timeinfo; + struct tm timeinfo; char buffer[LINE_LEN]; time( &rawtime ); - timeinfo = localtime( &rawtime ); - size_t offset = strftime( buffer, LINE_LEN, "[%d.%m. %H:%M:%S] ", timeinfo ); + localtime_r( &rawtime, &timeinfo ); + size_t offset = strftime( buffer, LINE_LEN, "[%d.%m. %H:%M:%S] ", &timeinfo ); offset += writeLevel( buffer + offset, mask ); va_start( ap, fmt ); ret = vsnprintf( buffer + offset, LINE_LEN - offset, fmt, ap ); diff --git a/src/shared/sockhelper.c b/src/shared/sockhelper.c index e93d45c..d1dbd8c 100644 --- a/src/shared/sockhelper.c +++ b/src/shared/sockhelper.c @@ -207,7 +207,7 @@ int sock_printable(struct sockaddr *addr, socklen_t addrLen, char *output, int l bool sock_listen(poll_list_t* list, char* bind_addr, uint16_t port) { if ( list->count >= MAXLISTEN ) return false; - struct addrinfo hints, *res, *ptr; + struct addrinfo hints, *res = NULL, *ptr; char portStr[6]; const int on = 1; int openCount = 0; -- cgit v1.2.3-55-g7522