From b071050dd6a99c54c5995dc0f5694edd847a2792 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Fri, 26 Jul 2019 17:22:56 +0200 Subject: [SERVER] Turn all spinlocks into mutexes Just assume sane platforms offer smart mutexes that have a fast-path with spinlocks internally for locks that have little to no congestion. In all other cases, mutexes should perform better anyways. --- src/server/altservers.c | 101 +++++++++++++------------ src/server/globals.c | 10 ++- src/server/globals.h | 8 +- src/server/image.c | 196 ++++++++++++++++++++++++------------------------ src/server/integrity.c | 48 ++++++------ src/server/locks.c | 129 ++++++++++++++++++++----------- src/server/locks.h | 35 +++++---- src/server/net.c | 110 +++++++++++++-------------- src/server/rpc.c | 34 ++++----- src/server/threadpool.c | 20 ++--- src/server/uplink.c | 188 +++++++++++++++++++++++----------------------- 11 files changed, 467 insertions(+), 412 deletions(-) diff --git a/src/server/altservers.c b/src/server/altservers.c index b91ceab..bbbc584 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -15,13 +15,13 @@ #define ERROR_GOTO(jumplabel, ...) LOG_GOTO(jumplabel, LOG_ERROR, __VA_ARGS__) static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS]; -static pthread_spinlock_t pendingLockWrite; // Lock for adding something to pending. (NULL -> nonNULL) -static pthread_mutex_t pendingLockConsume = PTHREAD_MUTEX_INITIALIZER; // Lock for removing something (nonNULL -> NULL) +static pthread_mutex_t pendingLockWrite; // Lock for adding something to pending. 
(NULL -> nonNULL) +static pthread_mutex_t pendingLockConsume; // Lock for removing something (nonNULL -> NULL) static dnbd3_signal_t* runSignal = NULL; static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS]; static int numAltServers = 0; -static pthread_spinlock_t altServersLock; +static pthread_mutex_t altServersLock; static pthread_t altThread; @@ -32,8 +32,9 @@ void altservers_init() { srand( (unsigned int)time( NULL ) ); // Init spinlock - spin_init( &pendingLockWrite, PTHREAD_PROCESS_PRIVATE ); - spin_init( &altServersLock, PTHREAD_PROCESS_PRIVATE ); + mutex_init( &pendingLockWrite ); + mutex_init( &pendingLockConsume ); + mutex_init( &altServersLock ); // Init signal runSignal = signal_new(); if ( runSignal == NULL ) { @@ -48,11 +49,11 @@ void altservers_init() // Init waiting links queue -- this is currently a global static array so // it will already be zero, but in case we refactor later do it explicitly // while also holding the write lock so thread sanitizer is happy - spin_lock( &pendingLockWrite ); + mutex_lock( &pendingLockWrite ); for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { pending[i] = NULL; } - spin_unlock( &pendingLockWrite ); + mutex_unlock( &pendingLockWrite ); } void altservers_shutdown() @@ -99,10 +100,10 @@ int altservers_load() bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate, const int isClientOnly) { int i, freeSlot = -1; - spin_lock( &altServersLock ); + mutex_lock( &altServersLock ); for (i = 0; i < numAltServers; ++i) { if ( isSameAddressPort( &altServers[i].host, host ) ) { - spin_unlock( &altServersLock ); + mutex_unlock( &altServersLock ); return false; } else if ( freeSlot == -1 && altServers[i].host.type == 0 ) { freeSlot = i; @@ -111,7 +112,7 @@ bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate if ( freeSlot == -1 ) { if ( numAltServers >= SERVER_MAX_ALTS ) { logadd( LOG_WARNING, "Cannot add another alt server, maximum of %d already reached.", 
(int)SERVER_MAX_ALTS ); - spin_unlock( &altServersLock ); + mutex_unlock( &altServersLock ); return false; } freeSlot = numAltServers++; @@ -120,7 +121,7 @@ bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate altServers[freeSlot].isPrivate = isPrivate; altServers[freeSlot].isClientOnly = isClientOnly; if ( comment != NULL ) snprintf( altServers[freeSlot].comment, COMMENT_LENGTH, "%s", comment ); - spin_unlock( &altServersLock ); + mutex_unlock( &altServersLock ); return true; } @@ -135,14 +136,14 @@ void altservers_findUplink(dnbd3_connection_t *uplink) // never be that the uplink is supposed to switch, but instead calls // this function. assert( uplink->betterFd == -1 ); - spin_lock( &pendingLockWrite ); + mutex_lock( &pendingLockWrite ); // it is however possible that an RTT measurement is currently in progress, // so check for that case and do nothing if one is in progress if ( uplink->rttTestResult == RTT_INPROGRESS ) { for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { if ( pending[i] != uplink ) continue; // Yep, measuring right now - spin_unlock( &pendingLockWrite ); + mutex_unlock( &pendingLockWrite ); return; } } @@ -151,12 +152,12 @@ void altservers_findUplink(dnbd3_connection_t *uplink) if ( pending[i] != NULL ) continue; pending[i] = uplink; uplink->rttTestResult = RTT_INPROGRESS; - spin_unlock( &pendingLockWrite ); + mutex_unlock( &pendingLockWrite ); signal_call( runSignal ); // Wake altservers thread up return; } // End of loop - no free slot - spin_unlock( &pendingLockWrite ); + mutex_unlock( &pendingLockWrite ); logadd( LOG_WARNING, "No more free RTT measurement slots, ignoring a request..." 
); } @@ -165,16 +166,16 @@ void altservers_findUplink(dnbd3_connection_t *uplink) */ void altservers_removeUplink(dnbd3_connection_t *uplink) { - pthread_mutex_lock( &pendingLockConsume ); - spin_lock( &pendingLockWrite ); + mutex_lock( &pendingLockConsume ); + mutex_lock( &pendingLockWrite ); for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { if ( pending[i] == uplink ) { uplink->rttTestResult = RTT_NOT_REACHABLE; pending[i] = NULL; } } - spin_unlock( &pendingLockWrite ); - pthread_mutex_unlock( &pendingLockConsume ); + mutex_unlock( &pendingLockWrite ); + mutex_unlock( &pendingLockConsume ); } /** @@ -190,7 +191,7 @@ int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output int count = 0; int scores[size]; int score; - spin_lock( &altServersLock ); + mutex_lock( &altServersLock ); if ( size > numAltServers ) size = numAltServers; for (i = 0; i < numAltServers; ++i) { if ( altServers[i].host.type == 0 ) continue; // Slot is empty @@ -226,7 +227,7 @@ int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output } } } - spin_unlock( &altServersLock ); + mutex_unlock( &altServersLock ); return count; } @@ -242,7 +243,7 @@ int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency) int count = 0, i; ticks now; timing_get( &now ); - spin_lock( &altServersLock ); + mutex_lock( &altServersLock ); // Flip first server in list with a random one every time this is called if ( numAltServers > 1 ) { const dnbd3_alt_server_t tmp = altServers[0]; @@ -273,7 +274,7 @@ int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency) output[count++] = srv->host; if ( count >= size ) break; } - spin_unlock( &altServersLock ); + mutex_unlock( &altServersLock ); return count; } @@ -281,12 +282,12 @@ json_t* altservers_toJson() { json_t *list = json_array(); - spin_lock( &altServersLock ); + mutex_lock( &altServersLock ); char host[100]; const int count = numAltServers; dnbd3_alt_server_t src[count]; 
memcpy( src, altServers, sizeof(src) ); - spin_unlock( &altServersLock ); + mutex_unlock( &altServersLock ); for (int i = 0; i < count; ++i) { json_t *rtts = json_array(); for (int j = 0; j < SERVER_RTT_PROBES; ++j) { @@ -313,7 +314,7 @@ static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const { unsigned int avg = rtt; int i; - spin_lock( &altServersLock ); + mutex_lock( &altServersLock ); for (i = 0; i < numAltServers; ++i) { if ( !isSameAddressPort( host, &altServers[i].host ) ) continue; altServers[i].rtt[++altServers[i].rttIndex % SERVER_RTT_PROBES] = rtt; @@ -334,7 +335,7 @@ static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const } break; } - spin_unlock( &altServersLock ); + mutex_unlock( &altServersLock ); return avg; } @@ -369,7 +370,7 @@ void altservers_serverFailed(const dnbd3_host_t * const host) int foundIndex = -1, lastOk = -1; ticks now; timing_get( &now ); - spin_lock( &altServersLock ); + mutex_lock( &altServersLock ); for (i = 0; i < numAltServers; ++i) { if ( foundIndex == -1 ) { // Looking for the failed server in list @@ -395,7 +396,7 @@ void altservers_serverFailed(const dnbd3_host_t * const host) altServers[lastOk] = tmp; } } - spin_unlock( &altServersLock ); + mutex_unlock( &altServersLock ); } /** * Mainloop of this module. 
It will wait for requests by uplinks to find a @@ -432,27 +433,27 @@ static void *altservers_main(void *data UNUSED) } // Work your way through the queue for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) { - spin_lock( &pendingLockWrite ); + mutex_lock( &pendingLockWrite ); if ( pending[itLink] == NULL ) { - spin_unlock( &pendingLockWrite ); + mutex_unlock( &pendingLockWrite ); continue; // Check once before locking, as a mutex is expensive } - spin_unlock( &pendingLockWrite ); - pthread_mutex_lock( &pendingLockConsume ); - spin_lock( &pendingLockWrite ); + mutex_unlock( &pendingLockWrite ); + mutex_lock( &pendingLockConsume ); + mutex_lock( &pendingLockWrite ); dnbd3_connection_t * const uplink = pending[itLink]; - spin_unlock( &pendingLockWrite ); + mutex_unlock( &pendingLockWrite ); if ( uplink == NULL ) { // Check again after locking - pthread_mutex_unlock( &pendingLockConsume ); + mutex_unlock( &pendingLockConsume ); continue; } dnbd3_image_t * const image = image_lock( uplink->image ); if ( image == NULL ) { // Check again after locking uplink->rttTestResult = RTT_NOT_REACHABLE; - spin_lock( &pendingLockWrite ); + mutex_lock( &pendingLockWrite ); pending[itLink] = NULL; - spin_unlock( &pendingLockWrite ); - pthread_mutex_unlock( &pendingLockConsume ); + mutex_unlock( &pendingLockWrite ); + mutex_unlock( &pendingLockConsume ); logadd( LOG_DEBUG1, "Image has gone away that was queued for RTT measurement" ); continue; } @@ -520,7 +521,7 @@ static void *altservers_main(void *data UNUSED) } clock_gettime( BEST_CLOCK_SOURCE, &end ); // Measurement done - everything fine so far - spin_lock( &uplink->rttLock ); + mutex_lock( &uplink->rttLock ); const bool isCurrent = isSameAddressPort( &servers[itAlt], &uplink->currentServer ); // Penaltize rtt if this was a cycle; this will treat this server with lower priority // in the near future too, so we prevent alternating between two servers that are both @@ -531,7 +532,7 @@ static void *altservers_main(void 
*data UNUSED) unsigned int avg = altservers_updateRtt( &servers[itAlt], rtt ); // If a cycle was detected, or we lost connection to the current (last) server, penaltize it one time if ( ( uplink->cycleDetected || uplink->fd == -1 ) && isCurrent ) avg = (avg * 2) + 50000; - spin_unlock( &uplink->rttLock ); + mutex_unlock( &uplink->rttLock ); if ( uplink->fd != -1 && isCurrent ) { // Was measuring current server currentRtt = avg; @@ -565,25 +566,25 @@ static void *altservers_main(void *data UNUSED) LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt ); } sock_setTimeout( bestSock, _uplinkTimeout ); - spin_lock( &uplink->rttLock ); + mutex_lock( &uplink->rttLock ); uplink->betterFd = bestSock; uplink->betterServer = servers[bestIndex]; uplink->betterVersion = bestProtocolVersion; uplink->rttTestResult = RTT_DOCHANGE; - spin_unlock( &uplink->rttLock ); + mutex_unlock( &uplink->rttLock ); signal_call( uplink->signal ); } else if ( bestSock == -1 && currentRtt == RTT_UNREACHABLE ) { // No server was reachable - spin_lock( &uplink->rttLock ); + mutex_lock( &uplink->rttLock ); uplink->rttTestResult = RTT_NOT_REACHABLE; - spin_unlock( &uplink->rttLock ); + mutex_unlock( &uplink->rttLock ); } else { // nope if ( bestSock != -1 ) close( bestSock ); - spin_lock( &uplink->rttLock ); + mutex_lock( &uplink->rttLock ); uplink->rttTestResult = RTT_DONTCHANGE; uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away - spin_unlock( &uplink->rttLock ); + mutex_unlock( &uplink->rttLock ); if ( !image->working ) { image->working = true; LOG( LOG_DEBUG1, "[%d] No better alt server found, enabling again", itLink ); @@ -591,10 +592,10 @@ static void *altservers_main(void *data UNUSED) } image_release( image ); // end of loop over all pending uplinks - spin_lock( &pendingLockWrite ); + mutex_lock( &pendingLockWrite ); pending[itLink] = NULL; - spin_unlock( &pendingLockWrite ); - pthread_mutex_unlock( &pendingLockConsume 
); + mutex_unlock( &pendingLockWrite ); + mutex_unlock( &pendingLockConsume ); } // Save cache maps of all images if applicable declare_now; diff --git a/src/server/globals.c b/src/server/globals.c index 010274d..d0de704 100644 --- a/src/server/globals.c +++ b/src/server/globals.c @@ -1,5 +1,6 @@ #include "globals.h" #include "ini.h" +#include "locks.h" #include "../shared/log.h" #include #include @@ -39,7 +40,7 @@ atomic_bool _pretendClient = false; * ignore certain values which cannot be changed safely at runtime. */ static atomic_bool initialLoad = true; -static pthread_mutex_t loadLock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t loadLock; #define IS_TRUE(value) (atoi(value) != 0 || strcmp(value, "true") == 0 || strcmp(value, "True") == 0 || strcmp(value, "TRUE") == 0) #define SAVE_TO_VAR_STR(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) { if (_ ## kk != NULL) free(_ ## kk); _ ## kk = strdup(value); } } while (0) @@ -110,7 +111,10 @@ void globals_loadConfig() char *name = NULL; asprintf( &name, "%s/%s", _configDir, CONFIG_FILENAME ); if ( name == NULL ) return; - if ( pthread_mutex_trylock( &loadLock ) != 0 ) { + if ( initialLoad ) { + mutex_init( &loadLock ); + } + if ( mutex_trylock( &loadLock ) != 0 ) { logadd( LOG_INFO, "Ignoring config reload request due to already running reload" ); return; } @@ -128,7 +132,7 @@ void globals_loadConfig() globals_dumpConfig( buffer, sizeof(buffer) ); logadd( LOG_DEBUG1, "Effective configuration:\n%s", buffer ); initialLoad = false; - pthread_mutex_unlock( &loadLock ); + mutex_unlock( &loadLock ); } static void sanitizeFixedConfig() diff --git a/src/server/globals.h b/src/server/globals.h index 031f565..b248800 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -54,10 +54,10 @@ struct _dnbd3_connection dnbd3_signal_t* signal; // used to wake up the process pthread_t thread; // thread holding the connection pthread_mutex_t sendMutex; // For locking socket while sending - 
pthread_spinlock_t queueLock; // lock for synchronization on request queue etc. + pthread_mutex_t queueLock; // lock for synchronization on request queue etc. dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer dnbd3_host_t currentServer; // Current server we're connected to - pthread_spinlock_t rttLock; // When accessing rttTestResult, betterFd or betterServer + pthread_mutex_t rttLock; // When accessing rttTestResult, betterFd or betterServer int rttTestResult; // RTT_* int cacheFd; // used to write to the image, in case it is relayed. ONLY USE FROM UPLINK THREAD! int betterVersion; // protocol version of better server @@ -121,7 +121,7 @@ struct _dnbd3_image int id; // Unique ID of this image. Only unique in the context of this running instance of DNBD3-Server bool working; // true if image exists and completeness is == 100% or a working upstream proxy is connected uint16_t rid; // revision of image - pthread_spinlock_t lock; + pthread_mutex_t lock; }; struct _dnbd3_client @@ -134,7 +134,7 @@ struct _dnbd3_client dnbd3_host_t host; char hostName[HOSTNAMELEN]; // inet_ntop version of host pthread_mutex_t sendMutex; // Held while writing to sock if image is incomplete (since uplink uses socket too) - pthread_spinlock_t lock; + pthread_mutex_t lock; }; // ####################################################### diff --git a/src/server/image.c b/src/server/image.c index 061f9a3..bfba6cb 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -25,9 +25,9 @@ static dnbd3_image_t *_images[SERVER_MAX_IMAGES]; static int _num_images = 0; -static pthread_spinlock_t imageListLock; -static pthread_mutex_t remoteCloneLock = PTHREAD_MUTEX_INITIALIZER; -static pthread_mutex_t reloadLock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t imageListLock; +static pthread_mutex_t remoteCloneLock; +static pthread_mutex_t reloadLock; #define NAMELEN 500 #define CACHELEN 20 typedef struct @@ -59,7 +59,9 @@ static bool 
image_checkRandomBlocks(const int count, int fdImage, const int64_t void image_serverStartup() { srand( (unsigned int)time( NULL ) ); - spin_init( &imageListLock, PTHREAD_PROCESS_PRIVATE ); + mutex_init( &imageListLock ); + mutex_init( &remoteCloneLock ); + mutex_init( &reloadLock ); } /** @@ -87,12 +89,12 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co return; bool setNewBlocks = false; uint64_t pos = start; - spin_lock( &image->lock ); + mutex_lock( &image->lock ); if ( image->cache_map == NULL ) { // Image seems already complete if ( set ) { // This makes no sense - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); logadd( LOG_DEBUG1, "image_updateCachemap(true) with no cache_map: %s", image->path ); return; } @@ -125,14 +127,14 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co if ( image->cache_map == NULL ) break; const int block = (int)( pos / HASH_BLOCK_SIZE ); if ( image_isHashBlockComplete( image->cache_map, block, image->realFilesize ) ) { - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); integrity_check( image, block ); - spin_lock( &image->lock ); + mutex_lock( &image->lock ); } pos += HASH_BLOCK_SIZE; } } - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); } /** @@ -144,13 +146,13 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co bool image_isComplete(dnbd3_image_t *image) { assert( image != NULL ); - spin_lock( &image->lock ); + mutex_lock( &image->lock ); if ( image->virtualFilesize == 0 ) { - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); return false; } if ( image->cache_map == NULL ) { - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); return true; } bool complete = true; @@ -175,14 +177,14 @@ bool image_isComplete(dnbd3_image_t *image) complete = ((image->cache_map[map_len_bytes - 1] & last_byte) == last_byte); } if ( !complete ) { - spin_unlock( &image->lock ); + mutex_unlock( 
&image->lock ); return false; } char mapfile[PATHLEN] = ""; free( image->cache_map ); image->cache_map = NULL; snprintf( mapfile, PATHLEN, "%s.map", image->path ); - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); unlink( mapfile ); return true; } @@ -215,18 +217,18 @@ bool image_ensureOpen(dnbd3_image_t *image) } } if ( newFd == -1 ) { - spin_lock( &image->lock ); + mutex_lock( &image->lock ); image->working = false; - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); return false; } - spin_lock( &image->lock ); + mutex_lock( &image->lock ); if ( image->readFd == -1 ) { image->readFd = newFd; - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); } else { // There was a race while opening the file (happens cause not locked cause blocking), we lost the race so close new fd and proceed - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); close( newFd ); } return image->readFd != -1; @@ -247,7 +249,7 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) const size_t slen = strlen( name ); if ( slen == 0 || name[slen - 1] == '/' || name[0] == '/' ) return NULL ; // Go through array - spin_lock( &imageListLock ); + mutex_lock( &imageListLock ); for (i = 0; i < _num_images; ++i) { dnbd3_image_t * const image = _images[i]; if ( image == NULL || strcmp( image->name, name ) != 0 ) continue; @@ -261,14 +263,14 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) // Not found if ( candidate == NULL ) { - spin_unlock( &imageListLock ); + mutex_unlock( &imageListLock ); return NULL ; } - spin_lock( &candidate->lock ); - spin_unlock( &imageListLock ); + mutex_lock( &candidate->lock ); + mutex_unlock( &imageListLock ); candidate->users++; - spin_unlock( &candidate->lock ); + mutex_unlock( &candidate->lock ); // Found, see if it works // TODO: Also make sure a non-working image still has old fd open but created a new one and removed itself from the list @@ -276,9 +278,9 @@ 
dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) if ( candidate->working || checkIfWorking ) { // Is marked working, but might not have an fd open if ( !image_ensureOpen( candidate ) ) { - spin_lock( &candidate->lock ); + mutex_lock( &candidate->lock ); timing_get( &candidate->lastWorkCheck ); - spin_unlock( &candidate->lock ); + mutex_unlock( &candidate->lock ); if ( _removeMissingImages ) { candidate = image_remove( candidate ); // No release here, the image is still returned and should be released by caller } @@ -291,14 +293,14 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) // ...not working... // Don't re-check too often - spin_lock( &candidate->lock ); + mutex_lock( &candidate->lock ); bool check; declare_now; check = timing_diff( &candidate->lastWorkCheck, &now ) > NONWORKING_RECHECK_INTERVAL_SECONDS; if ( check ) { candidate->lastWorkCheck = now; } - spin_unlock( &candidate->lock ); + mutex_unlock( &candidate->lock ); if ( !check ) { return candidate; } @@ -347,19 +349,19 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) img->rid = candidate->rid; img->users = 1; img->working = false; - spin_init( &img->lock, PTHREAD_PROCESS_PRIVATE ); + mutex_init( &img->lock ); if ( candidate->crc32 != NULL ) { const size_t mb = IMGSIZE_TO_HASHBLOCKS( candidate->virtualFilesize ) * sizeof(uint32_t); img->crc32 = malloc( mb ); memcpy( img->crc32, candidate->crc32, mb ); } - spin_lock( &candidate->lock ); + mutex_lock( &candidate->lock ); if ( candidate->cache_map != NULL ) { const size_t mb = IMGSIZE_TO_MAPBYTES( candidate->virtualFilesize ); img->cache_map = malloc( mb ); memcpy( img->cache_map, candidate->cache_map, mb ); } - spin_unlock( &candidate->lock ); + mutex_unlock( &candidate->lock ); if ( image_addToList( img ) ) { image_release( candidate ); candidate = img; @@ -393,17 +395,17 @@ dnbd3_image_t* image_lock(dnbd3_image_t *image) // TODO: get rid, fix places tha { if ( image 
== NULL ) return NULL ; int i; - spin_lock( &imageListLock ); + mutex_lock( &imageListLock ); for (i = 0; i < _num_images; ++i) { if ( _images[i] == image ) { - spin_lock( &image->lock ); - spin_unlock( &imageListLock ); + mutex_lock( &image->lock ); + mutex_unlock( &imageListLock ); image->users++; - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); return image; } } - spin_unlock( &imageListLock ); + mutex_unlock( &imageListLock ); return NULL ; } @@ -416,14 +418,14 @@ dnbd3_image_t* image_lock(dnbd3_image_t *image) // TODO: get rid, fix places tha dnbd3_image_t* image_release(dnbd3_image_t *image) { if ( image == NULL ) return NULL; - spin_lock( &imageListLock ); - spin_lock( &image->lock ); + mutex_lock( &imageListLock ); + mutex_lock( &image->lock ); assert( image->users > 0 ); image->users--; bool inUse = image->users != 0; - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); if ( inUse ) { // Still in use, do nothing - spin_unlock( &imageListLock ); + mutex_unlock( &imageListLock ); return NULL; } // Getting here means we decreased the usage counter to zero @@ -431,11 +433,11 @@ dnbd3_image_t* image_release(dnbd3_image_t *image) // responsible for freeing it for (int i = 0; i < _num_images; ++i) { if ( _images[i] == image ) { // Found, do nothing - spin_unlock( &imageListLock ); + mutex_unlock( &imageListLock ); return NULL; } } - spin_unlock( &imageListLock ); + mutex_unlock( &imageListLock ); // So it wasn't in the images list anymore either, get rid of it if ( !inUse ) image = image_free( image ); return NULL; @@ -467,8 +469,8 @@ static bool isForbiddenExtension(const char* name) static dnbd3_image_t* image_remove(dnbd3_image_t *image) { bool mustFree = false; - spin_lock( &imageListLock ); - spin_lock( &image->lock ); + mutex_lock( &imageListLock ); + mutex_lock( &image->lock ); for ( int i = _num_images - 1; i >= 0; --i ) { if ( _images[i] == image ) { _images[i] = NULL; @@ -476,8 +478,8 @@ static dnbd3_image_t* 
image_remove(dnbd3_image_t *image) } if ( _images[i] == NULL && i + 1 == _num_images ) _num_images--; } - spin_unlock( &image->lock ); - spin_unlock( &imageListLock ); + mutex_unlock( &image->lock ); + mutex_unlock( &imageListLock ); if ( mustFree ) image = image_free( image ); return image; } @@ -488,22 +490,22 @@ static dnbd3_image_t* image_remove(dnbd3_image_t *image) void image_killUplinks() { int i; - spin_lock( &imageListLock ); + mutex_lock( &imageListLock ); for (i = 0; i < _num_images; ++i) { if ( _images[i] == NULL ) continue; - spin_lock( &_images[i]->lock ); + mutex_lock( &_images[i]->lock ); if ( _images[i]->uplink != NULL ) { - spin_lock( &_images[i]->uplink->queueLock ); + mutex_lock( &_images[i]->uplink->queueLock ); if ( !_images[i]->uplink->shutdown ) { thread_detach( _images[i]->uplink->thread ); _images[i]->uplink->shutdown = true; } - spin_unlock( &_images[i]->uplink->queueLock ); + mutex_unlock( &_images[i]->uplink->queueLock ); signal_call( _images[i]->uplink->signal ); } - spin_unlock( &_images[i]->lock ); + mutex_unlock( &_images[i]->lock ); } - spin_unlock( &imageListLock ); + mutex_unlock( &imageListLock ); } /** @@ -518,14 +520,14 @@ bool image_loadAll(char *path) dnbd3_image_t *imgHandle; if ( path == NULL ) path = _basePath; - if ( pthread_mutex_trylock( &reloadLock ) != 0 ) { + if ( mutex_trylock( &reloadLock ) != 0 ) { logadd( LOG_MINOR, "Could not (re)load image list, already in progress." 
); return false; } if ( _removeMissingImages ) { // Check if all loaded images still exist on disk logadd( LOG_INFO, "Checking for vanished images" ); - spin_lock( &imageListLock ); + mutex_lock( &imageListLock ); for ( int i = _num_images - 1; i >= 0; --i ) { if ( _shutdown ) break; if ( _images[i] == NULL ) { @@ -534,38 +536,38 @@ bool image_loadAll(char *path) } imgId = _images[i]->id; snprintf( imgPath, PATHLEN, "%s", _images[i]->path ); - spin_unlock( &imageListLock ); // isReadable hits the fs; unlock + mutex_unlock( &imageListLock ); // isReadable hits the fs; unlock // Check if fill can still be opened for reading ret = file_isReadable( imgPath ); // Lock again, see if image is still there, free if required - spin_lock( &imageListLock ); + mutex_lock( &imageListLock ); if ( ret || i >= _num_images || _images[i] == NULL || _images[i]->id != imgId ) continue; // Image needs to be removed imgHandle = _images[i]; _images[i] = NULL; if ( i + 1 == _num_images ) _num_images--; - spin_lock( &imgHandle->lock ); + mutex_lock( &imgHandle->lock ); const bool freeImg = ( imgHandle->users == 0 ); - spin_unlock( &imgHandle->lock ); + mutex_unlock( &imgHandle->lock ); // We unlocked, but the image has been removed from the list already, so // there's no way the users-counter can increase at this point. 
if ( freeImg ) { // Image is not in use anymore, free the dangling entry immediately - spin_unlock( &imageListLock ); // image_free might do several fs operations; unlock + mutex_unlock( &imageListLock ); // image_free might do several fs operations; unlock image_free( imgHandle ); - spin_lock( &imageListLock ); + mutex_lock( &imageListLock ); } } - spin_unlock( &imageListLock ); + mutex_unlock( &imageListLock ); if ( _shutdown ) { - pthread_mutex_unlock( &reloadLock ); + mutex_unlock( &reloadLock ); return true; } } // Now scan for new images logadd( LOG_INFO, "Scanning for new or modified images" ); ret = image_load_all_internal( path, path ); - pthread_mutex_unlock( &reloadLock ); + mutex_unlock( &reloadLock ); logadd( LOG_INFO, "Finished scanning %s", path ); return ret; } @@ -577,18 +579,18 @@ bool image_loadAll(char *path) */ bool image_tryFreeAll() { - spin_lock( &imageListLock ); + mutex_lock( &imageListLock ); for (int i = _num_images - 1; i >= 0; --i) { if ( _images[i] != NULL && _images[i]->users == 0 ) { // XXX Data race... 
dnbd3_image_t *image = _images[i]; _images[i] = NULL; - spin_unlock( &imageListLock ); + mutex_unlock( &imageListLock ); image = image_free( image ); - spin_lock( &imageListLock ); + mutex_lock( &imageListLock ); } if ( i + 1 == _num_images && _images[i] == NULL ) _num_images--; } - spin_unlock( &imageListLock ); + mutex_unlock( &imageListLock ); return _num_images == 0; } @@ -604,7 +606,7 @@ static dnbd3_image_t* image_free(dnbd3_image_t *image) } // uplink_shutdown( image ); - spin_lock( &image->lock ); + mutex_lock( &image->lock ); free( image->cache_map ); free( image->crc32 ); free( image->path ); @@ -613,9 +615,9 @@ static dnbd3_image_t* image_free(dnbd3_image_t *image) image->crc32 = NULL; image->path = NULL; image->name = NULL; - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); if ( image->readFd != -1 ) close( image->readFd ); - spin_destroy( &image->lock ); + mutex_destroy( &image->lock ); // memset( image, 0, sizeof(*image) ); free( image ); @@ -700,7 +702,7 @@ static bool image_addToList(dnbd3_image_t *image) { int i; static int imgIdCounter = 0; // Used to assign unique numeric IDs to images - spin_lock( &imageListLock ); + mutex_lock( &imageListLock ); // Now we're locked, assign unique ID to image (unique for this running server instance!) 
image->id = ++imgIdCounter; for ( i = 0; i < _num_images; ++i ) { @@ -710,12 +712,12 @@ static bool image_addToList(dnbd3_image_t *image) } if ( i >= _num_images ) { if ( _num_images >= _maxImages ) { - spin_unlock( &imageListLock ); + mutex_unlock( &imageListLock ); return false; } _images[_num_images++] = image; } - spin_unlock( &imageListLock ); + mutex_unlock( &imageListLock ); return true; } @@ -880,7 +882,7 @@ static bool image_load(char *base, char *path, int withUplink) image->working = (image->cache_map == NULL ); timing_get( &image->nextCompletenessEstimate ); image->completenessEstimate = -1; - spin_init( &image->lock, PTHREAD_PROCESS_PRIVATE ); + mutex_init( &image->lock ); int32_t offset; if ( stat( path, &st ) == 0 ) { // Negatively offset atime by file modification time @@ -1152,12 +1154,12 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision, char *cmpname = name; int useIndex = -1, fallbackIndex = 0; if ( len >= NAMELEN ) cmpname += 1 + len - NAMELEN; - pthread_mutex_lock( &remoteCloneLock ); + mutex_lock( &remoteCloneLock ); for (int i = 0; i < CACHELEN; ++i) { if ( remoteCloneCache[i].rid == revision && strcmp( cmpname, remoteCloneCache[i].name ) == 0 ) { useIndex = i; if ( timing_reached( &remoteCloneCache[i].deadline, &now ) ) break; - pthread_mutex_unlock( &remoteCloneLock ); // Was recently checked... + mutex_unlock( &remoteCloneLock ); // Was recently checked... 
return image; } if ( timing_1le2( &remoteCloneCache[i].deadline, &remoteCloneCache[fallbackIndex].deadline ) ) { @@ -1169,7 +1171,7 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision, if ( revision != 0 ) { if ( image == NULL ) image = image_get( name, revision, true ); if ( image != NULL ) { - pthread_mutex_unlock( &remoteCloneLock ); + mutex_unlock( &remoteCloneLock ); return image; } } @@ -1182,7 +1184,7 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision, timing_set( &remoteCloneCache[useIndex].deadline, &now, SERVER_REMOTE_IMAGE_CHECK_CACHETIME ); snprintf( remoteCloneCache[useIndex].name, NAMELEN, "%s", cmpname ); remoteCloneCache[useIndex].rid = revision; - pthread_mutex_unlock( &remoteCloneLock ); + mutex_unlock( &remoteCloneLock ); // Get some alt servers and try to get the image from there #define REP_NUM_SRV (8) @@ -1229,7 +1231,7 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision, logadd( LOG_MINOR, "Won't proxy '%s:%d': Larger than maxReplicationSize", name, (int)revision ); goto server_fail; } - pthread_mutex_lock( &reloadLock ); + mutex_lock( &reloadLock ); // Ensure disk space entirely if not using sparse files, otherwise just make sure we have some room at least if ( _sparseFiles ) { ok = image_ensureDiskSpace( 2ull * 1024 * 1024 * 1024, false ); // 2GiB, maybe configurable one day @@ -1237,7 +1239,7 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision, ok = image_ensureDiskSpace( remoteImageSize + ( 10 * 1024 * 1024 ), false ); // some extra space for cache map etc. 
} ok = ok && image_clone( sock, name, remoteRid, remoteImageSize ); // This sets up the file+map+crc and loads the img - pthread_mutex_unlock( &reloadLock ); + mutex_unlock( &reloadLock ); if ( !ok ) goto server_fail; // Cloning worked :-) @@ -1343,18 +1345,18 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste } // Now lock on the loading mutex, then check again if the image exists (we're multi-threaded) - pthread_mutex_lock( &reloadLock ); + mutex_lock( &reloadLock ); dnbd3_image_t* image = image_get( name, detectedRid, true ); if ( image != NULL ) { // The image magically appeared in the meantime logadd( LOG_DEBUG2, "Magically appeared" ); - pthread_mutex_unlock( &reloadLock ); + mutex_unlock( &reloadLock ); return image; } // Still not loaded, let's try to do so logadd( LOG_DEBUG2, "Calling load" ); image_load( _basePath, imageFile, false ); - pthread_mutex_unlock( &reloadLock ); + mutex_unlock( &reloadLock ); // If loading succeeded, this will return the image logadd( LOG_DEBUG2, "Calling get" ); return image_get( name, requestedRid, true ); @@ -1507,12 +1509,12 @@ json_t* image_getListAsJson() int users, completeness, idleTime; declare_now; - spin_lock( &imageListLock ); + mutex_lock( &imageListLock ); for ( i = 0; i < _num_images; ++i ) { if ( _images[i] == NULL ) continue; dnbd3_image_t *image = _images[i]; - spin_lock( &image->lock ); - spin_unlock( &imageListLock ); + mutex_lock( &image->lock ); + mutex_unlock( &imageListLock ); users = image->users; idleTime = (int)timing_diff( &image->atime, &now ); completeness = image_getCompletenessEstimate( image ); @@ -1526,7 +1528,7 @@ json_t* image_getListAsJson() } } image->users++; // Prevent freeing after we unlock - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); jsonImage = json_pack( "{sisssisisisisI}", "id", image->id, // id, name, rid never change, so access them without locking @@ -1545,9 +1547,9 @@ json_t* image_getListAsJson() json_array_append_new( 
imagesJson, jsonImage ); image = image_release( image ); // Since we did image->users++; - spin_lock( &imageListLock ); + mutex_lock( &imageListLock ); } - spin_unlock( &imageListLock ); + mutex_unlock( &imageListLock ); return imagesJson; } @@ -1655,9 +1657,9 @@ static bool image_calcBlockCrc32(const int fd, const size_t block, const uint64_ bool image_ensureDiskSpaceLocked(uint64_t size, bool force) { bool ret; - pthread_mutex_lock( &reloadLock ); + mutex_lock( &reloadLock ); ret = image_ensureDiskSpace( size, force ); - pthread_mutex_unlock( &reloadLock ); + mutex_unlock( &reloadLock ); return ret; } @@ -1739,13 +1741,13 @@ void image_closeUnusedFd() ticks deadline; timing_gets( &deadline, -UNUSED_FD_TIMEOUT ); char imgstr[300]; - spin_lock( &imageListLock ); + mutex_lock( &imageListLock ); for (i = 0; i < _num_images; ++i) { dnbd3_image_t * const image = _images[i]; if ( image == NULL ) continue; - spin_lock( &image->lock ); - spin_unlock( &imageListLock ); + mutex_lock( &image->lock ); + mutex_unlock( &imageListLock ); if ( image->users == 0 && image->uplink == NULL && timing_reached( &image->atime, &deadline ) ) { snprintf( imgstr, sizeof(imgstr), "%s:%d", image->name, (int)image->rid ); fd = image->readFd; @@ -1753,14 +1755,14 @@ void image_closeUnusedFd() } else { fd = -1; } - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); if ( fd != -1 ) { close( fd ); logadd( LOG_DEBUG1, "Inactive fd closed for %s", imgstr ); } - spin_lock( &imageListLock ); + mutex_lock( &imageListLock ); } - spin_unlock( &imageListLock ); + mutex_unlock( &imageListLock ); } /* diff --git a/src/server/integrity.c b/src/server/integrity.c index 88b7487..8f17855 100644 --- a/src/server/integrity.c +++ b/src/server/integrity.c @@ -39,11 +39,11 @@ static void* integrity_main(void *data); void integrity_init() { assert( queueLen == -1 ); - pthread_mutex_init( &integrityQueueLock, NULL ); + mutex_init( &integrityQueueLock ); pthread_cond_init( &queueSignal, NULL ); - 
pthread_mutex_lock( &integrityQueueLock ); + mutex_lock( &integrityQueueLock ); queueLen = 0; - pthread_mutex_unlock( &integrityQueueLock ); + mutex_unlock( &integrityQueueLock ); bRunning = true; if ( 0 != thread_create( &thread, NULL, &integrity_main, (void *)NULL ) ) { bRunning = false; @@ -56,13 +56,13 @@ void integrity_shutdown() { assert( queueLen != -1 ); logadd( LOG_DEBUG1, "Shutting down integrity checker...\n" ); - pthread_mutex_lock( &integrityQueueLock ); + mutex_lock( &integrityQueueLock ); pthread_cond_signal( &queueSignal ); - pthread_mutex_unlock( &integrityQueueLock ); + mutex_unlock( &integrityQueueLock ); thread_join( thread, NULL ); while ( bRunning ) usleep( 10000 ); - pthread_mutex_destroy( &integrityQueueLock ); + mutex_destroy( &integrityQueueLock ); pthread_cond_destroy( &queueSignal ); logadd( LOG_DEBUG1, "Integrity checker exited normally.\n" ); } @@ -80,7 +80,7 @@ void integrity_check(dnbd3_image_t *image, int block) return; } int i, freeSlot = -1; - pthread_mutex_lock( &integrityQueueLock ); + mutex_lock( &integrityQueueLock ); for (i = 0; i < queueLen; ++i) { if ( freeSlot == -1 && checkQueue[i].image == NULL ) { freeSlot = i; @@ -92,13 +92,13 @@ void integrity_check(dnbd3_image_t *image, int block) checkQueue[i].count += 1; } logadd( LOG_DEBUG2, "Attaching to existing check request (%d/%d) (%d +%d)", i, queueLen, checkQueue[i].block, checkQueue[i].count ); - pthread_mutex_unlock( &integrityQueueLock ); + mutex_unlock( &integrityQueueLock ); return; } } if ( freeSlot == -1 ) { if ( queueLen >= CHECK_QUEUE_SIZE ) { - pthread_mutex_unlock( &integrityQueueLock ); + mutex_unlock( &integrityQueueLock ); logadd( LOG_INFO, "Check queue full, discarding check request...\n" ); return; } @@ -113,7 +113,7 @@ void integrity_check(dnbd3_image_t *image, int block) checkQueue[freeSlot].count = 1; } pthread_cond_signal( &queueSignal ); - pthread_mutex_unlock( &integrityQueueLock ); + mutex_unlock( &integrityQueueLock ); } static void* 
integrity_main(void * data UNUSED) @@ -130,10 +130,10 @@ static void* integrity_main(void * data UNUSED) pid_t tid = (pid_t)syscall( SYS_gettid ); setpriority( PRIO_PROCESS, tid, 10 ); #endif - pthread_mutex_lock( &integrityQueueLock ); + mutex_lock( &integrityQueueLock ); while ( !_shutdown ) { if ( queueLen == 0 ) { - pthread_cond_wait( &queueSignal, &integrityQueueLock ); + mutex_cond_wait( &queueSignal, &integrityQueueLock ); } for (i = queueLen - 1; i >= 0; --i) { if ( _shutdown ) break; @@ -146,10 +146,10 @@ static void* integrity_main(void * data UNUSED) // We have the image. Call image_release() some time const int qCount = checkQueue[i].count; bool foundCorrupted = false; - spin_lock( &image->lock ); + mutex_lock( &image->lock ); if ( image->crc32 != NULL && image->realFilesize != 0 ) { int blocks[2] = { checkQueue[i].block, -1 }; - pthread_mutex_unlock( &integrityQueueLock ); + mutex_unlock( &integrityQueueLock ); // Make copy of crc32 list as it might go away const uint64_t fileSize = image->realFilesize; const int numHashBlocks = IMGSIZE_TO_HASHBLOCKS(fileSize); @@ -160,7 +160,7 @@ static void* integrity_main(void * data UNUSED) buffer = malloc( bufferSize ); } memcpy( buffer, image->crc32, required ); - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); // Open for direct I/O if possible; this prevents polluting the fs cache int fd = open( image->path, O_RDONLY | O_DIRECT ); bool direct = fd != -1; @@ -178,9 +178,9 @@ static void* integrity_main(void * data UNUSED) bool complete = true; if ( qCount == CHECK_ALL ) { // When checking full image, skip incomplete blocks, otherwise assume block is complete - spin_lock( &image->lock ); + mutex_lock( &image->lock ); complete = image_isHashBlockComplete( image->cache_map, blocks[0], fileSize ); - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); } #if defined(linux) || defined(__linux) if ( sync_file_range( fd, start, end - start, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | 
SYNC_FILE_RANGE_WAIT_AFTER ) == -1 ) { @@ -220,7 +220,7 @@ static void* integrity_main(void * data UNUSED) close( fd ); } } - pthread_mutex_lock( &integrityQueueLock ); + mutex_lock( &integrityQueueLock ); assert( checkQueue[i].image == image ); if ( qCount != CHECK_ALL ) { // Not a full check; update the counter @@ -238,25 +238,25 @@ static void* integrity_main(void * data UNUSED) if ( i + 1 == queueLen ) queueLen--; // Mark as working again if applicable if ( !foundCorrupted ) { - spin_lock( &image->lock ); + mutex_lock( &image->lock ); if ( image->uplink != NULL ) { // TODO: image_determineWorkingState() helper? image->working = image->uplink->fd != -1 && image->readFd != -1; } - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); } } else { // Still more blocks to go... checkQueue[i].block = blocks[0]; } } else { - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); } if ( foundCorrupted ) { // Something was fishy, make sure uplink exists - spin_lock( &image->lock ); + mutex_lock( &image->lock ); image->working = false; bool restart = image->uplink == NULL || image->uplink->shutdown; - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); if ( restart ) { uplink_shutdown( image ); uplink_init( image, -1, NULL, -1 ); @@ -266,7 +266,7 @@ static void* integrity_main(void * data UNUSED) image_release( image ); } } - pthread_mutex_unlock( &integrityQueueLock ); + mutex_unlock( &integrityQueueLock ); if ( buffer != NULL ) free( buffer ); bRunning = false; return NULL; diff --git a/src/server/locks.c b/src/server/locks.c index 71a1845..a5b7c76 100644 --- a/src/server/locks.c +++ b/src/server/locks.c @@ -38,23 +38,23 @@ int debugThreadCount = 0; static debug_lock_t locks[MAXLOCKS]; static debug_thread_t threads[MAXTHREADS]; static int init_done = 0; -static pthread_spinlock_t initdestory; +static pthread_mutex_t initdestory; static int lockId = 0; static pthread_t watchdog = 0; static dnbd3_signal_t* watchdogSignal = NULL; static void 
*debug_thread_watchdog(void *something); -int debug_spin_init(const char *name, const char *file, int line, pthread_spinlock_t *lock, int shared) +int debug_mutex_init(const char *name, const char *file, int line, pthread_mutex_t *lock) { if ( !init_done ) { memset( locks, 0, MAXLOCKS * sizeof(debug_lock_t) ); memset( threads, 0, MAXTHREADS * sizeof(debug_thread_t) ); - pthread_spin_init( &initdestory, PTHREAD_PROCESS_PRIVATE ); + pthread_mutex_init( &initdestory, NULL ); init_done = 1; } int first = -1; - pthread_spin_lock( &initdestory ); + pthread_mutex_lock( &initdestory ); for (int i = 0; i < MAXLOCKS; ++i) { if ( locks[i].lock == lock ) { logadd( LOG_ERROR, "Lock %p (%s) already initialized (%s:%d)\n", (void*)lock, name, file, line ); @@ -64,7 +64,7 @@ int debug_spin_init(const char *name, const char *file, int line, pthread_spinlo } if ( first == -1 ) { logadd( LOG_ERROR, "No more free debug locks (%s:%d)\n", file, line ); - pthread_spin_unlock( &initdestory ); + pthread_mutex_unlock( &initdestory ); debug_dump_lock_stats(); exit( 4 ); } @@ -72,28 +72,28 @@ int debug_spin_init(const char *name, const char *file, int line, pthread_spinlo locks[first].locked = 0; snprintf( locks[first].name, LOCKLEN, "%s", name ); snprintf( locks[first].where, LOCKLEN, "I %s:%d", file, line ); - pthread_spin_unlock( &initdestory ); - return pthread_spin_init( lock, shared ); + pthread_mutex_unlock( &initdestory ); + return pthread_mutex_init( lock, NULL ); } -int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlock_t *lock) +int debug_mutex_lock(const char *name, const char *file, int line, pthread_mutex_t *lock) { debug_lock_t *l = NULL; - pthread_spin_lock( &initdestory ); + pthread_mutex_lock( &initdestory ); for (int i = 0; i < MAXLOCKS; ++i) { if ( locks[i].lock == lock ) { l = &locks[i]; break; } } - pthread_spin_unlock( &initdestory ); + pthread_mutex_unlock( &initdestory ); if ( l == NULL ) { logadd( LOG_ERROR, "Tried to lock uninitialized 
lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); debug_dump_lock_stats(); exit( 4 ); } debug_thread_t *t = NULL; - pthread_spin_lock( &initdestory ); + pthread_mutex_lock( &initdestory ); for (int i = 0; i < MAXTHREADS; ++i) { if ( threads[i].tid != 0 ) continue; threads[i].tid = pthread_self(); @@ -103,15 +103,15 @@ int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlo t = &threads[i]; break; } - pthread_spin_unlock( &initdestory ); + pthread_mutex_unlock( &initdestory ); if ( t == NULL ) { logadd( LOG_ERROR, "Lock sanity check: Too many waiting threads for lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); exit( 4 ); } - const int retval = pthread_spin_lock( lock ); - pthread_spin_lock( &initdestory ); + const int retval = pthread_mutex_lock( lock ); + pthread_mutex_lock( &initdestory ); t->tid = 0; - pthread_spin_unlock( &initdestory ); + pthread_mutex_unlock( &initdestory ); if ( l->locked ) { logadd( LOG_ERROR, "Lock sanity check: lock %p (%s) already locked at %s:%d\n", (void*)lock, name, file, line ); exit( 4 ); @@ -120,30 +120,30 @@ int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlo timing_get( &l->locktime ); l->thread = pthread_self(); snprintf( l->where, LOCKLEN, "L %s:%d", file, line ); - pthread_spin_lock( &initdestory ); + pthread_mutex_lock( &initdestory ); l->lockId = ++lockId; - pthread_spin_unlock( &initdestory ); + pthread_mutex_unlock( &initdestory ); return retval; } -int debug_spin_trylock(const char *name, const char *file, int line, pthread_spinlock_t *lock) +int debug_mutex_trylock(const char *name, const char *file, int line, pthread_mutex_t *lock) { debug_lock_t *l = NULL; - pthread_spin_lock( &initdestory ); + pthread_mutex_lock( &initdestory ); for (int i = 0; i < MAXLOCKS; ++i) { if ( locks[i].lock == lock ) { l = &locks[i]; break; } } - pthread_spin_unlock( &initdestory ); + pthread_mutex_unlock( &initdestory ); if ( l == NULL ) { logadd( LOG_ERROR, "Tried to 
lock uninitialized lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); debug_dump_lock_stats(); exit( 4 ); } debug_thread_t *t = NULL; - pthread_spin_lock( &initdestory ); + pthread_mutex_lock( &initdestory ); for (int i = 0; i < MAXTHREADS; ++i) { if ( threads[i].tid != 0 ) continue; threads[i].tid = pthread_self(); @@ -153,15 +153,15 @@ int debug_spin_trylock(const char *name, const char *file, int line, pthread_spi t = &threads[i]; break; } - pthread_spin_unlock( &initdestory ); + pthread_mutex_unlock( &initdestory ); if ( t == NULL ) { logadd( LOG_ERROR, "Lock sanity check: Too many waiting threads for %p (%s) at %s:%d\n", (void*)lock, name, file, line ); exit( 4 ); } - const int retval = pthread_spin_trylock( lock ); - pthread_spin_lock( &initdestory ); + const int retval = pthread_mutex_trylock( lock ); + pthread_mutex_lock( &initdestory ); t->tid = 0; - pthread_spin_unlock( &initdestory ); + pthread_mutex_unlock( &initdestory ); if ( retval == 0 ) { if ( l->locked ) { logadd( LOG_ERROR, "Lock sanity check: lock %p (%s) already locked at %s:%d\n", (void*)lock, name, file, line ); @@ -171,24 +171,24 @@ int debug_spin_trylock(const char *name, const char *file, int line, pthread_spi timing_get( &l->locktime ); l->thread = pthread_self(); snprintf( l->where, LOCKLEN, "L %s:%d", file, line ); - pthread_spin_lock( &initdestory ); + pthread_mutex_lock( &initdestory ); l->lockId = ++lockId; - pthread_spin_unlock( &initdestory ); + pthread_mutex_unlock( &initdestory ); } return retval; } -int debug_spin_unlock(const char *name, const char *file, int line, pthread_spinlock_t *lock) +int debug_mutex_unlock(const char *name, const char *file, int line, pthread_mutex_t *lock) { debug_lock_t *l = NULL; - pthread_spin_lock( &initdestory ); + pthread_mutex_lock( &initdestory ); for (int i = 0; i < MAXLOCKS; ++i) { if ( locks[i].lock == lock ) { l = &locks[i]; break; } } - pthread_spin_unlock( &initdestory ); + pthread_mutex_unlock( &initdestory ); if ( l == NULL ) { 
logadd( LOG_ERROR, "Tried to unlock uninitialized lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); exit( 4 ); @@ -200,13 +200,58 @@ int debug_spin_unlock(const char *name, const char *file, int line, pthread_spin l->locked = 0; l->thread = 0; snprintf( l->where, LOCKLEN, "U %s:%d", file, line ); - int retval = pthread_spin_unlock( lock ); + int retval = pthread_mutex_unlock( lock ); return retval; } -int debug_spin_destroy(const char *name, const char *file, int line, pthread_spinlock_t *lock) +int debug_mutex_cond_wait(const char *name, const char *file, int line, pthread_cond_t *restrict cond, pthread_mutex_t *restrict lock) { - pthread_spin_lock( &initdestory ); + debug_lock_t *l = NULL; + pthread_mutex_lock( &initdestory ); + for (int i = 0; i < MAXLOCKS; ++i) { + if ( locks[i].lock == lock ) { + l = &locks[i]; + break; + } + } + pthread_mutex_unlock( &initdestory ); + if ( l == NULL ) { + logadd( LOG_ERROR, "Tried to cond_wait on uninitialized lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); + exit( 4 ); + } + if ( !l->locked ) { + logadd( LOG_ERROR, "Cond_wait sanity check: lock %p (%s) not locked at %s:%d\n", (void*)lock, name, file, line ); + exit( 4 ); + } + pthread_t self = pthread_self(); + if ( l->thread != self ) { + logadd( LOG_ERROR, "Cond_wait called from non-owning thread for %p (%s) at %s:%d\n", (void*)lock, name, file, line ); + exit( 4 ); + } + l->locked = 0; + l->thread = 0; + snprintf( l->where, LOCKLEN, "CW %s:%d", file, line ); + int retval = pthread_cond_wait( cond, lock ); + if ( retval != 0 ) { + logadd( LOG_ERROR, "pthread_cond_wait returned %d for lock %p (%s) at %s:%d\n", retval, (void*)lock, name, file, line ); + exit( 4 ); + } + if ( l->locked != 0 || l->thread != 0 ) { + logadd( LOG_ERROR, "Lock is not free after returning from pthread_cond_wait for %p (%s) at %s:%d\n", (void*)lock, name, file, line ); + exit( 4 ); + } + l->locked = 1; + l->thread = self; + timing_get( &l->locktime ); + pthread_mutex_lock( 
&initdestory ); + l->lockId = ++lockId; + pthread_mutex_unlock( &initdestory ); + return retval; +} + +int debug_mutex_destroy(const char *name, const char *file, int line, pthread_mutex_t *lock) +{ + pthread_mutex_lock( &initdestory ); for (int i = 0; i < MAXLOCKS; ++i) { if ( locks[i].lock == lock ) { if ( locks[i].locked ) { @@ -215,8 +260,8 @@ int debug_spin_destroy(const char *name, const char *file, int line, pthread_spi } locks[i].lock = NULL; snprintf( locks[i].where, LOCKLEN, "D %s:%d", file, line ); - pthread_spin_unlock( &initdestory ); - return pthread_spin_destroy( lock ); + pthread_mutex_unlock( &initdestory ); + return pthread_mutex_destroy( lock ); } } logadd( LOG_ERROR, "Tried to destroy non-existent lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); @@ -226,7 +271,7 @@ int debug_spin_destroy(const char *name, const char *file, int line, pthread_spi void debug_dump_lock_stats() { declare_now; - pthread_spin_lock( &initdestory ); + pthread_mutex_lock( &initdestory ); printf( "\n **** LOCKS ****\n\n" ); for (int i = 0; i < MAXLOCKS; ++i) { if ( locks[i].lock == NULL ) continue; @@ -252,7 +297,7 @@ void debug_dump_lock_stats() "* Where: %s\n" "* How long: %d secs\n", (int)threads[i].tid, threads[i].name, threads[i].where, (int)timing_diff( &threads[i].time, &now ) ); } - pthread_spin_unlock( &initdestory ); + pthread_mutex_unlock( &initdestory ); } static void *debug_thread_watchdog(void *something UNUSED) @@ -261,18 +306,18 @@ static void *debug_thread_watchdog(void *something UNUSED) while ( !_shutdown ) { if ( init_done ) { declare_now; - pthread_spin_lock( &initdestory ); + pthread_mutex_lock( &initdestory ); for (int i = 0; i < MAXTHREADS; ++i) { if ( threads[i].tid == 0 ) continue; const uint32_t diff = timing_diff( &threads[i].time, &now ); if ( diff > 6 && diff < 100000 ) { printf( "\n\n +++++++++ DEADLOCK ++++++++++++\n\n" ); - pthread_spin_unlock( &initdestory ); + pthread_mutex_unlock( &initdestory ); debug_dump_lock_stats(); exit( 
99 ); } } - pthread_spin_unlock( &initdestory ); + pthread_mutex_unlock( &initdestory ); } if ( watchdogSignal == NULL || signal_wait( watchdogSignal, 5000 ) == SIGNAL_ERROR ) sleep( 5 ); } @@ -297,9 +342,9 @@ void debug_locks_stop_watchdog() #ifdef _DEBUG _shutdown = true; printf( "Killing debug watchdog...\n" ); - pthread_spin_lock( &initdestory ); + pthread_mutex_lock( &initdestory ); signal_call( watchdogSignal ); - pthread_spin_unlock( &initdestory ); + pthread_mutex_unlock( &initdestory ); thread_join( watchdog, NULL ); signal_close( watchdogSignal ); #endif diff --git a/src/server/locks.h b/src/server/locks.h index 16b59a7..859697c 100644 --- a/src/server/locks.h +++ b/src/server/locks.h @@ -8,28 +8,31 @@ #ifdef _DEBUG -#define spin_init( lock, type ) debug_spin_init( #lock, __FILE__, __LINE__, lock, type) -#define spin_lock( lock ) debug_spin_lock( #lock, __FILE__, __LINE__, lock) -#define spin_trylock( lock ) debug_spin_trylock( #lock, __FILE__, __LINE__, lock) -#define spin_unlock( lock ) debug_spin_unlock( #lock, __FILE__, __LINE__, lock) -#define spin_destroy( lock ) debug_spin_destroy( #lock, __FILE__, __LINE__, lock) - -int debug_spin_init(const char *name, const char *file, int line, pthread_spinlock_t *lock, int shared); -int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlock_t *lock); -int debug_spin_trylock(const char *name, const char *file, int line, pthread_spinlock_t *lock); -int debug_spin_unlock(const char *name, const char *file, int line, pthread_spinlock_t *lock); -int debug_spin_destroy(const char *name, const char *file, int line, pthread_spinlock_t *lock); +#define mutex_init( lock ) debug_mutex_init( #lock, __FILE__, __LINE__, lock) +#define mutex_lock( lock ) debug_mutex_lock( #lock, __FILE__, __LINE__, lock) +#define mutex_trylock( lock ) debug_mutex_trylock( #lock, __FILE__, __LINE__, lock) +#define mutex_unlock( lock ) debug_mutex_unlock( #lock, __FILE__, __LINE__, lock) +#define mutex_cond_wait( cond, 
lock ) debug_mutex_cond_wait( #lock, __FILE__, __LINE__, cond, lock) +#define mutex_destroy( lock ) debug_mutex_destroy( #lock, __FILE__, __LINE__, lock) + +int debug_mutex_init(const char *name, const char *file, int line, pthread_mutex_t *lock); +int debug_mutex_lock(const char *name, const char *file, int line, pthread_mutex_t *lock); +int debug_mutex_trylock(const char *name, const char *file, int line, pthread_mutex_t *lock); +int debug_mutex_unlock(const char *name, const char *file, int line, pthread_mutex_t *lock); +int debug_mutex_cond_wait(const char *name, const char *file, int line, pthread_cond_t *restrict cond, pthread_mutex_t *restrict lock); +int debug_mutex_destroy(const char *name, const char *file, int line, pthread_mutex_t *lock); void debug_dump_lock_stats(); #else -#define spin_init( lock, type ) pthread_spin_init(lock, type) -#define spin_lock( lock ) pthread_spin_lock(lock) -#define spin_trylock( lock ) pthread_spin_trylock(lock) -#define spin_unlock( lock ) pthread_spin_unlock(lock) -#define spin_destroy( lock ) pthread_spin_destroy(lock) +#define mutex_init( lock ) pthread_mutex_init(lock, NULL) +#define mutex_lock( lock ) pthread_mutex_lock(lock) +#define mutex_trylock( lock ) pthread_mutex_trylock(lock) +#define mutex_unlock( lock ) pthread_mutex_unlock(lock) +#define mutex_cond_wait( cond, lock ) pthread_cond_wait(cond, lock) +#define mutex_destroy( lock ) pthread_mutex_destroy(lock) #endif diff --git a/src/server/net.c b/src/server/net.c index 00e88e0..9abe221 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -46,7 +46,7 @@ static dnbd3_client_t *_clients[SERVER_MAX_CLIENTS]; static int _num_clients = 0; -static pthread_spinlock_t _clients_lock; +static pthread_mutex_t _clients_lock; static char nullbytes[500]; @@ -145,7 +145,7 @@ static inline bool sendPadding( const int fd, uint32_t bytes ) void net_init() { - spin_init( &_clients_lock, PTHREAD_PROCESS_PRIVATE ); + mutex_init( &_clients_lock ); } void* net_handleNewConnection(void 
*clientPtr) @@ -186,13 +186,13 @@ void* net_handleNewConnection(void *clientPtr) } } while (0); // Fully init client struct - spin_init( &client->lock, PTHREAD_PROCESS_PRIVATE ); - pthread_mutex_init( &client->sendMutex, NULL ); + mutex_init( &client->lock ); + mutex_init( &client->sendMutex ); - spin_lock( &client->lock ); + mutex_lock( &client->lock ); host_to_string( &client->host, client->hostName, HOSTNAMELEN ); client->hostName[HOSTNAMELEN-1] = '\0'; - spin_unlock( &client->lock ); + mutex_unlock( &client->lock ); client->bytesSent = 0; if ( !addToList( client ) ) { @@ -255,9 +255,9 @@ void* net_handleNewConnection(void *clientPtr) // No BGR mismatch, but don't lookup if image is unknown locally image = image_get( image_name, rid, true ); } - spin_lock( &client->lock ); + mutex_lock( &client->lock ); client->image = image; - spin_unlock( &client->lock ); + mutex_unlock( &client->lock ); if ( image == NULL ) { //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); } else if ( !image->working ) { @@ -268,24 +268,24 @@ void* net_handleNewConnection(void *clientPtr) // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable bOk = true; if ( image->cache_map != NULL ) { - spin_lock( &image->lock ); + mutex_lock( &image->lock ); if ( image->uplink == NULL || image->uplink->cacheFd == -1 || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { bOk = ( rand() % 4 ) == 1; } penalty = bOk && image->uplink != NULL && image->uplink->cacheFd == -1; - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); if ( penalty ) { // Wait 100ms if local caching is not working so this usleep( 100000 ); // server gets a penalty and is less likely to be selected } } if ( bOk ) { - spin_lock( &image->lock ); + mutex_lock( &image->lock ); image_file = image->readFd; if ( !client->isServer ) { // Only update immediately if this is a client. 
Servers are handled on disconnect. timing_get( &image->atime ); } - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); serializer_reset_write( &payload ); serializer_put_uint16( &payload, client_version < 3 ? client_version : PROTOCOL_VERSION ); // XXX: Since messed up fuse client was messed up before :( serializer_put_string( &payload, image->name ); @@ -337,7 +337,7 @@ void* net_handleNewConnection(void *clientPtr) start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); bool isCached = true; - spin_lock( &image->lock ); + mutex_lock( &image->lock ); // Check again as we only aquired the lock just now if ( image->cache_map != NULL ) { const uint64_t firstByteInMap = start >> 15; @@ -382,7 +382,7 @@ void* net_handleNewConnection(void *clientPtr) } } } - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); if ( !isCached ) { if ( !uplink_request( client, request.handle, offset, request.size, request.hops ) ) { logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy, disabling image %s:%d", @@ -400,10 +400,10 @@ void* net_handleNewConnection(void *clientPtr) fixup_reply( reply ); const bool lock = image->uplink != NULL; - if ( lock ) pthread_mutex_lock( &client->sendMutex ); + if ( lock ) mutex_lock( &client->sendMutex ); // Send reply header if ( send( client->sock, &reply, sizeof(dnbd3_reply_t), (request.size == 0 ? 
0 : MSG_MORE) ) != sizeof(dnbd3_reply_t) ) { - if ( lock ) pthread_mutex_unlock( &client->sendMutex ); + if ( lock ) mutex_unlock( &client->sendMutex ); logadd( LOG_DEBUG1, "Sending CMD_GET_BLOCK reply header to %s failed", client->hostName ); goto exit_client_cleanup; } @@ -450,7 +450,7 @@ void* net_handleNewConnection(void *clientPtr) sent = -1; } #endif - if ( lock ) pthread_mutex_unlock( &client->sendMutex ); + if ( lock ) mutex_unlock( &client->sendMutex ); if ( sent == -1 ) { if ( err != EPIPE && err != ECONNRESET && err != ESHUTDOWN && err != EAGAIN && err != EWOULDBLOCK ) { @@ -468,12 +468,12 @@ void* net_handleNewConnection(void *clientPtr) } if ( request.size > (uint32_t)realBytes ) { if ( !sendPadding( client->sock, request.size - (uint32_t)realBytes ) ) { - if ( lock ) pthread_mutex_unlock( &client->sendMutex ); + if ( lock ) mutex_unlock( &client->sendMutex ); goto exit_client_cleanup; } } } - if ( lock ) pthread_mutex_unlock( &client->sendMutex ); + if ( lock ) mutex_unlock( &client->sendMutex ); // Global per-client counter client->bytesSent += request.size; // Increase counter for statistics. 
break; @@ -483,18 +483,18 @@ void* net_handleNewConnection(void *clientPtr) num = altservers_getListForClient( &client->host, server_list, NUMBER_SERVERS ); reply.cmd = CMD_GET_SERVERS; reply.size = (uint32_t)( num * sizeof(dnbd3_server_entry_t) ); - pthread_mutex_lock( &client->sendMutex ); + mutex_lock( &client->sendMutex ); send_reply( client->sock, &reply, server_list ); - pthread_mutex_unlock( &client->sendMutex ); + mutex_unlock( &client->sendMutex ); goto set_name; break; case CMD_KEEPALIVE: reply.cmd = CMD_KEEPALIVE; reply.size = 0; - pthread_mutex_lock( &client->sendMutex ); + mutex_lock( &client->sendMutex ); send_reply( client->sock, &reply, NULL ); - pthread_mutex_unlock( &client->sendMutex ); + mutex_unlock( &client->sendMutex ); set_name: ; if ( !hasName ) { hasName = true; @@ -508,7 +508,7 @@ set_name: ; case CMD_GET_CRC32: reply.cmd = CMD_GET_CRC32; - pthread_mutex_lock( &client->sendMutex ); + mutex_lock( &client->sendMutex ); if ( image->crc32 == NULL ) { reply.size = 0; send_reply( client->sock, &reply, NULL ); @@ -518,7 +518,7 @@ set_name: ; send( client->sock, &image->masterCrc32, sizeof(uint32_t), MSG_MORE ); send( client->sock, image->crc32, size - sizeof(uint32_t), 0 ); } - pthread_mutex_unlock( &client->sendMutex ); + mutex_unlock( &client->sendMutex ); break; default: @@ -534,11 +534,11 @@ exit_client_cleanup: ; totalBytesSent += client->bytesSent; // Access time, but only if client didn't just probe if ( image != NULL ) { - spin_lock( &image->lock ); + mutex_lock( &image->lock ); if ( client->bytesSent > DNBD3_BLOCK_SIZE * 10 ) { timing_get( &image->atime ); } - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); } freeClientStruct( client ); // This will also call image_release on client->image return NULL ; @@ -560,30 +560,30 @@ struct json_t* net_getListAsJson() char host[HOSTNAMELEN]; host[HOSTNAMELEN-1] = '\0'; - spin_lock( &_clients_lock ); + mutex_lock( &_clients_lock ); for ( int i = 0; i < _num_clients; ++i ) { 
dnbd3_client_t * const client = _clients[i]; if ( client == NULL || client->image == NULL ) continue; - spin_lock( &client->lock ); + mutex_lock( &client->lock ); // Unlock so we give other threads a chance to access the client list. // We might not get an atomic snapshot of the currently connected clients, // but that doesn't really make a difference anyways. - spin_unlock( &_clients_lock ); + mutex_unlock( &_clients_lock ); strncpy( host, client->hostName, HOSTNAMELEN - 1 ); imgId = client->image->id; isServer = (int)client->isServer; bytesSent = client->bytesSent; - spin_unlock( &client->lock ); + mutex_unlock( &client->lock ); clientStats = json_pack( "{sssisisI}", "address", host, "imageId", imgId, "isServer", isServer, "bytesSent", (json_int_t)bytesSent ); json_array_append_new( jsonClients, clientStats ); - spin_lock( &_clients_lock ); + mutex_lock( &_clients_lock ); } - spin_unlock( &_clients_lock ); + mutex_unlock( &_clients_lock ); return jsonClients; } @@ -597,7 +597,7 @@ void net_getStats(int *clientCount, int *serverCount, uint64_t *bytesSent) int cc = 0, sc = 0; uint64_t bs = 0; - spin_lock( &_clients_lock ); + mutex_lock( &_clients_lock ); for ( int i = 0; i < _num_clients; ++i ) { const dnbd3_client_t * const client = _clients[i]; if ( client == NULL || client->image == NULL ) @@ -609,7 +609,7 @@ void net_getStats(int *clientCount, int *serverCount, uint64_t *bytesSent) } bs += client->bytesSent; } - spin_unlock( &_clients_lock ); + mutex_unlock( &_clients_lock ); if ( clientCount != NULL ) { *clientCount = cc; } @@ -624,15 +624,15 @@ void net_getStats(int *clientCount, int *serverCount, uint64_t *bytesSent) void net_disconnectAll() { int i; - spin_lock( &_clients_lock ); + mutex_lock( &_clients_lock ); for (i = 0; i < _num_clients; ++i) { if ( _clients[i] == NULL ) continue; dnbd3_client_t * const client = _clients[i]; - spin_lock( &client->lock ); + mutex_lock( &client->lock ); if ( client->sock >= 0 ) shutdown( client->sock, SHUT_RDWR ); - 
spin_unlock( &client->lock ); + mutex_unlock( &client->lock ); } - spin_unlock( &_clients_lock ); + mutex_unlock( &_clients_lock ); } void net_waitForAllDisconnected() @@ -640,12 +640,12 @@ void net_waitForAllDisconnected() int retries = 10, count, i; do { count = 0; - spin_lock( &_clients_lock ); + mutex_lock( &_clients_lock ); for (i = 0; i < _num_clients; ++i) { if ( _clients[i] == NULL ) continue; count++; } - spin_unlock( &_clients_lock ); + mutex_unlock( &_clients_lock ); if ( count != 0 ) { logadd( LOG_INFO, "%d clients still active...\n", count ); sleep( 1 ); @@ -667,14 +667,14 @@ void net_waitForAllDisconnected() static void removeFromList(dnbd3_client_t *client) { int i; - spin_lock( &_clients_lock ); + mutex_lock( &_clients_lock ); for ( i = _num_clients - 1; i >= 0; --i ) { if ( _clients[i] == client ) { _clients[i] = NULL; } if ( _clients[i] == NULL && i + 1 == _num_clients ) --_num_clients; } - spin_unlock( &_clients_lock ); + mutex_unlock( &_clients_lock ); } /** @@ -685,20 +685,20 @@ static void removeFromList(dnbd3_client_t *client) */ static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client) { - spin_lock( &client->lock ); - pthread_mutex_lock( &client->sendMutex ); + mutex_lock( &client->lock ); + mutex_lock( &client->sendMutex ); if ( client->sock != -1 ) close( client->sock ); client->sock = -1; - pthread_mutex_unlock( &client->sendMutex ); + mutex_unlock( &client->sendMutex ); if ( client->image != NULL ) { - spin_lock( &client->image->lock ); + mutex_lock( &client->image->lock ); if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client ); - spin_unlock( &client->image->lock ); + mutex_unlock( &client->image->lock ); client->image = image_release( client->image ); } - spin_unlock( &client->lock ); - spin_destroy( &client->lock ); - pthread_mutex_destroy( &client->sendMutex ); + mutex_unlock( &client->lock ); + mutex_destroy( &client->lock ); + mutex_destroy( &client->sendMutex ); free( client ); return NULL 
; } @@ -712,20 +712,20 @@ static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client) static bool addToList(dnbd3_client_t *client) { int i; - spin_lock( &_clients_lock ); + mutex_lock( &_clients_lock ); for (i = 0; i < _num_clients; ++i) { if ( _clients[i] != NULL ) continue; _clients[i] = client; - spin_unlock( &_clients_lock ); + mutex_unlock( &_clients_lock ); return true; } if ( _num_clients >= _maxClients ) { - spin_unlock( &_clients_lock ); + mutex_unlock( &_clients_lock ); logadd( LOG_ERROR, "Maximum number of clients reached!" ); return false; } _clients[_num_clients++] = client; - spin_unlock( &_clients_lock ); + mutex_unlock( &_clients_lock ); return true; } diff --git a/src/server/rpc.c b/src/server/rpc.c index 1ea09cb..5dbcafe 100644 --- a/src/server/rpc.c +++ b/src/server/rpc.c @@ -72,10 +72,10 @@ static inline bool iequals(struct string *cmpMixed, struct string *cmpLower) static int aclCount = 0; static dnbd3_access_rule_t aclRules[MAX_ACLS]; static json_int_t randomRunId; -static pthread_spinlock_t aclLock; +static pthread_mutex_t aclLock; #define MAX_CLIENTS 50 #define CUTOFF_START 40 -static pthread_spinlock_t statusLock; +static pthread_mutex_t statusLock; static struct { int count; bool overloaded; @@ -91,8 +91,8 @@ static void loadAcl(); void rpc_init() { - spin_init( &aclLock, PTHREAD_PROCESS_PRIVATE ); - spin_init( &statusLock, PTHREAD_PROCESS_PRIVATE ); + mutex_init( &aclLock ); + mutex_init( &statusLock ); randomRunId = (((json_int_t)getpid()) << 16) | (json_int_t)time(NULL); // if ( sizeof(randomRunId) > 4 ) { @@ -123,10 +123,10 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int return; } do { - spin_lock( &statusLock ); + mutex_lock( &statusLock ); const int curCount = ++status.count; UPDATE_LOADSTATE( curCount ); - spin_unlock( &statusLock ); + mutex_unlock( &statusLock ); if ( curCount > MAX_CLIENTS ) { sendReply( sock, "503 Service Temporarily Unavailable", "text/plain", "Too many HTTP clients", -1, 
HTTP_CLOSE ); goto func_return; @@ -198,9 +198,9 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int if ( minorVersion == 0 || hasHeaderValue( headers, numHeaders, &STR_CONNECTION, &STR_CLOSE ) ) { keepAlive = HTTP_CLOSE; } else { // And if there aren't too many active HTTP sessions - spin_lock( &statusLock ); + mutex_lock( &statusLock ); if ( status.overloaded ) keepAlive = HTTP_CLOSE; - spin_unlock( &statusLock ); + mutex_unlock( &statusLock ); } } if ( method.s != NULL && path.s != NULL ) { @@ -234,10 +234,10 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int } while (true); func_return:; do { - spin_lock( &statusLock ); + mutex_lock( &statusLock ); const int curCount = --status.count; UPDATE_LOADSTATE( curCount ); - spin_unlock( &statusLock ); + mutex_unlock( &statusLock ); } while (0); } @@ -422,7 +422,7 @@ static int getacl(dnbd3_host_t *host) static void addacl(int argc, char **argv, void *data UNUSED) { if ( argv[0][0] == '#' ) return; - spin_lock( &aclLock ); + mutex_lock( &aclLock ); if ( aclCount >= MAX_ACLS ) { logadd( LOG_WARNING, "Too many ACL rules, ignoring %s", argv[0] ); goto unlock_end; @@ -478,7 +478,7 @@ static void addacl(int argc, char **argv, void *data UNUSED) // in .bitMask, and compate it, otherwise, a simple memcmp will do. 
aclCount++; unlock_end:; - spin_unlock( &aclLock ); + mutex_unlock( &aclLock ); } static void loadAcl() @@ -486,18 +486,18 @@ static void loadAcl() static bool inProgress = false; char *fn; if ( asprintf( &fn, "%s/%s", _configDir, "rpc.acl" ) == -1 ) return; - spin_lock( &aclLock ); + mutex_lock( &aclLock ); if ( inProgress ) { - spin_unlock( &aclLock ); + mutex_unlock( &aclLock ); return; } aclCount = 0; inProgress = true; - spin_unlock( &aclLock ); + mutex_unlock( &aclLock ); file_loadLineBased( fn, 1, 20, &addacl, NULL ); - spin_lock( &aclLock ); + mutex_lock( &aclLock ); inProgress = false; - spin_unlock( &aclLock ); + mutex_unlock( &aclLock ); free( fn ); logadd( LOG_INFO, "%d HTTPRPC ACL rules loaded", (int)aclCount ); } diff --git a/src/server/threadpool.c b/src/server/threadpool.c index b55fe19..dac0980 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -17,12 +17,12 @@ static pthread_attr_t threadAttrs; static int maxIdleThreads = -1; static entry_t *pool = NULL; -static pthread_spinlock_t poolLock; +static pthread_mutex_t poolLock; bool threadpool_init(int maxIdle) { if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false; - spin_init( &poolLock, PTHREAD_PROCESS_PRIVATE ); + mutex_init( &poolLock ); maxIdleThreads = maxIdle; pthread_attr_init( &threadAttrs ); pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED ); @@ -33,7 +33,7 @@ void threadpool_close() { _shutdown = true; if ( maxIdleThreads < 0 ) return; - spin_lock( &poolLock ); + mutex_lock( &poolLock ); maxIdleThreads = -1; entry_t *ptr = pool; while ( ptr != NULL ) { @@ -41,16 +41,16 @@ void threadpool_close() ptr = ptr->next; signal_call( current->signal ); } - spin_unlock( &poolLock ); - spin_destroy( &poolLock ); + mutex_unlock( &poolLock ); + mutex_destroy( &poolLock ); } bool threadpool_run(void *(*startRoutine)(void *), void *arg) { - spin_lock( &poolLock ); + mutex_lock( &poolLock ); entry_t *entry = pool; if ( entry != NULL ) pool = entry->next; - 
spin_unlock( &poolLock ); + mutex_unlock( &poolLock ); if ( entry == NULL ) { entry = (entry_t*)malloc( sizeof(entry_t) ); if ( entry == NULL ) { @@ -101,19 +101,19 @@ static void *threadpool_worker(void *entryPtr) if ( _shutdown ) break; // Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise int threadCount = 0; - spin_lock( &poolLock ); + mutex_lock( &poolLock ); entry_t *ptr = pool; while ( ptr != NULL ) { threadCount++; ptr = ptr->next; } if ( threadCount >= maxIdleThreads ) { - spin_unlock( &poolLock ); + mutex_unlock( &poolLock ); break; } entry->next = pool; pool = entry; - spin_unlock( &poolLock ); + mutex_unlock( &poolLock ); setThreadName( "[pool]" ); } else { logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret ); diff --git a/src/server/uplink.c b/src/server/uplink.c index ccbf209..682b986 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -56,9 +56,9 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version if ( !_isProxy || _shutdown ) return false; dnbd3_connection_t *link = NULL; assert( image != NULL ); - spin_lock( &image->lock ); + mutex_lock( &image->lock ); if ( image->uplink != NULL && !image->uplink->shutdown ) { - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); if ( sock >= 0 ) close( sock ); return true; // There's already an uplink, so should we consider this success or failure? 
} @@ -67,20 +67,20 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version goto failure; } link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) ); - spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE ); - spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE ); - pthread_mutex_init( &link->sendMutex, NULL ); + mutex_init( &link->queueLock ); + mutex_init( &link->rttLock ); + mutex_init( &link->sendMutex ); link->image = image; link->bytesReceived = 0; link->idleTime = 0; link->queueLen = 0; - pthread_mutex_lock( &link->sendMutex ); + mutex_lock( &link->sendMutex ); link->fd = -1; - pthread_mutex_unlock( &link->sendMutex ); + mutex_unlock( &link->sendMutex ); link->cacheFd = -1; link->signal = NULL; link->replicationHandle = REP_NONE; - spin_lock( &link->rttLock ); + mutex_lock( &link->rttLock ); link->cycleDetected = false; if ( sock >= 0 ) { link->betterFd = sock; @@ -91,21 +91,21 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version link->betterFd = -1; link->rttTestResult = RTT_IDLE; } - spin_unlock( &link->rttLock ); + mutex_unlock( &link->rttLock ); link->recvBufferLen = 0; link->shutdown = false; if ( 0 != thread_create( &(link->thread), NULL, &uplink_mainloop, (void *)link ) ) { logadd( LOG_ERROR, "Could not start thread for new uplink." 
); goto failure; } - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); return true; failure: ; if ( link != NULL ) { free( link ); link = image->uplink = NULL; } - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); return false; } @@ -119,28 +119,28 @@ void uplink_shutdown(dnbd3_image_t *image) bool join = false; pthread_t thread; assert( image != NULL ); - spin_lock( &image->lock ); + mutex_lock( &image->lock ); if ( image->uplink == NULL ) { - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); return; } dnbd3_connection_t * const uplink = image->uplink; - spin_lock( &uplink->queueLock ); + mutex_lock( &uplink->queueLock ); if ( !uplink->shutdown ) { uplink->shutdown = true; signal_call( uplink->signal ); thread = uplink->thread; join = true; } - spin_unlock( &uplink->queueLock ); + mutex_unlock( &uplink->queueLock ); bool wait = image->uplink != NULL; - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); if ( join ) thread_join( thread, NULL ); while ( wait ) { usleep( 5000 ); - spin_lock( &image->lock ); + mutex_lock( &image->lock ); wait = image->uplink != NULL && image->uplink->shutdown; - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); } } @@ -150,7 +150,7 @@ void uplink_shutdown(dnbd3_image_t *image) */ void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) { - spin_lock( &uplink->queueLock ); + mutex_lock( &uplink->queueLock ); for (int i = uplink->queueLen - 1; i >= 0; --i) { if ( uplink->queue[i].client == client ) { uplink->queue[i].client = NULL; @@ -158,7 +158,7 @@ void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) } if ( uplink->queue[i].client == NULL && uplink->queueLen == i + 1 ) uplink->queueLen--; } - spin_unlock( &uplink->queueLock ); + mutex_unlock( &uplink->queueLock ); } /** @@ -172,26 +172,26 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin logadd( LOG_WARNING, "Cannot relay request by client; length of %" 
PRIu32 " exceeds maximum payload", length ); return false; } - spin_lock( &client->image->lock ); + mutex_lock( &client->image->lock ); if ( client->image->uplink == NULL ) { - spin_unlock( &client->image->lock ); + mutex_unlock( &client->image->lock ); logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); return false; } dnbd3_connection_t * const uplink = client->image->uplink; if ( uplink->shutdown ) { - spin_unlock( &client->image->lock ); + mutex_unlock( &client->image->lock ); logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" ); return false; } // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain // This might be a false positive if there are multiple instances running on the same host (IP) if ( hops != 0 && isSameAddress( &uplink->currentServer, &client->host ) ) { - spin_unlock( &client->image->lock ); + mutex_unlock( &client->image->lock ); logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); - spin_lock( &uplink->rttLock ); + mutex_lock( &uplink->rttLock ); uplink->cycleDetected = true; - spin_unlock( &uplink->rttLock ); + mutex_unlock( &uplink->rttLock ); signal_call( uplink->signal ); return false; } @@ -203,8 +203,8 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin bool requestLoop = false; const uint64_t end = start + length; - spin_lock( &uplink->queueLock ); - spin_unlock( &client->image->lock ); + mutex_lock( &uplink->queueLock ); + mutex_unlock( &client->image->lock ); for (i = 0; i < uplink->queueLen; ++i) { if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) { freeSlot = i; @@ -224,17 +224,17 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin } } if ( requestLoop ) { - spin_unlock( &uplink->queueLock ); + mutex_unlock( &uplink->queueLock ); logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. 
Incoming hop-count is %" PRIu8 ".", hops ); - spin_lock( &uplink->rttLock ); + mutex_lock( &uplink->rttLock ); uplink->cycleDetected = true; - spin_unlock( &uplink->rttLock ); + mutex_unlock( &uplink->rttLock ); signal_call( uplink->signal ); return false; } if ( freeSlot == -1 ) { if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) { - spin_unlock( &uplink->queueLock ); + mutex_unlock( &uplink->queueLock ); logadd( LOG_WARNING, "Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." ); return false; } @@ -268,35 +268,35 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin timing_get( &uplink->queue[freeSlot].entered ); //logadd( LOG_DEBUG2 %p] Inserting request at slot %d, was %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 "\n", (void*)uplink, freeSlot, old, uplink->queue[freeSlot].status, uplink->queue[freeSlot, ".handle, start, end ); #endif - spin_unlock( &uplink->queueLock ); + mutex_unlock( &uplink->queueLock ); if ( foundExisting != -1 ) return true; // Attached to pending request, do nothing // See if we can fire away the request - if ( pthread_mutex_trylock( &uplink->sendMutex ) != 0 ) { + if ( mutex_trylock( &uplink->sendMutex ) != 0 ) { logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" ); } else { if ( uplink->fd == -1 ) { - pthread_mutex_unlock( &uplink->sendMutex ); + mutex_unlock( &uplink->sendMutex ); logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" ); } else { const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); if ( hops < 200 ) ++hops; const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) ); - pthread_mutex_unlock( &uplink->sendMutex ); + mutex_unlock( &uplink->sendMutex ); if ( !ret ) { 
logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" ); } else { - spin_lock( &uplink->queueLock ); + mutex_lock( &uplink->queueLock ); if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client && uplink->queue[freeSlot].status == ULR_NEW ) { uplink->queue[freeSlot].status = ULR_PENDING; logadd( LOG_DEBUG2, "Succesful direct uplink request" ); } else { logadd( LOG_DEBUG2, "Weird queue update fail for direct uplink request" ); } - spin_unlock( &uplink->queueLock ); + mutex_unlock( &uplink->queueLock ); return true; } // Fall through to waking up sender thread @@ -351,9 +351,9 @@ static void* uplink_mainloop(void *data) events[EV_SOCKET].fd = -1; while ( !_shutdown && !link->shutdown ) { // poll() - spin_lock( &link->rttLock ); + mutex_lock( &link->rttLock ); waitTime = link->rttTestResult == RTT_DOCHANGE ? 0 : -1; - spin_unlock( &link->rttLock ); + mutex_unlock( &link->rttLock ); if ( waitTime == 0 ) { // Nothing } else if ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) { @@ -374,22 +374,22 @@ static void* uplink_mainloop(void *data) continue; } // Check if server switch is in order - spin_lock( &link->rttLock ); + mutex_lock( &link->rttLock ); if ( link->rttTestResult != RTT_DOCHANGE ) { - spin_unlock( &link->rttLock ); + mutex_unlock( &link->rttLock ); } else { link->rttTestResult = RTT_IDLE; // The rttTest worker thread has finished our request. 
// And says it's better to switch to another server const int fd = link->fd; - pthread_mutex_lock( &link->sendMutex ); + mutex_lock( &link->sendMutex ); link->fd = link->betterFd; - pthread_mutex_unlock( &link->sendMutex ); + mutex_unlock( &link->sendMutex ); link->betterFd = -1; link->currentServer = link->betterServer; link->version = link->betterVersion; link->cycleDetected = false; - spin_unlock( &link->rttLock ); + mutex_unlock( &link->rttLock ); discoverFailCount = 0; if ( fd != -1 ) close( fd ); link->replicationHandle = REP_NONE; @@ -463,10 +463,10 @@ static void* uplink_mainloop(void *data) } // Don't keep link established if we're idle for too much if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) { - pthread_mutex_lock( &link->sendMutex ); + mutex_lock( &link->sendMutex ); close( link->fd ); link->fd = events[EV_SOCKET].fd = -1; - pthread_mutex_unlock( &link->sendMutex ); + mutex_unlock( &link->sendMutex ); link->cycleDetected = false; if ( link->recvBufferLen != 0 ) { link->recvBufferLen = 0; @@ -478,9 +478,9 @@ static void* uplink_mainloop(void *data) } } // See if we should trigger an RTT measurement - spin_lock( &link->rttLock ); + mutex_lock( &link->rttLock ); const int rttTestResult = link->rttTestResult; - spin_unlock( &link->rttLock ); + mutex_unlock( &link->rttLock ); if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) { if ( timing_reached( &nextAltCheck, &now ) || ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) || link->cycleDetected ) { // It seems it's time for a check @@ -500,9 +500,9 @@ static void* uplink_mainloop(void *data) timing_set( &nextAltCheck, &now, altCheckInterval ); } } else if ( rttTestResult == RTT_NOT_REACHABLE ) { - spin_lock( &link->rttLock ); + mutex_lock( &link->rttLock ); link->rttTestResult = RTT_IDLE; - spin_unlock( &link->rttLock ); + mutex_unlock( &link->rttLock ); discoverFailCount++; timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? 
altCheckInterval : SERVER_RTT_INTERVAL_FAILED) ); } @@ -511,7 +511,7 @@ static void* uplink_mainloop(void *data) bool resend = false; ticks deadline; timing_set( &deadline, &now, -10 ); - spin_lock( &link->queueLock ); + mutex_lock( &link->queueLock ); for (i = 0; i < link->queueLen; ++i) { if ( link->queue[i].status != ULR_FREE && timing_reached( &link->queue[i].entered, &deadline ) ) { snprintf( buffer, sizeof(buffer), "[DEBUG %p] Starving request slot %d detected:\n" @@ -522,12 +522,12 @@ static void* uplink_mainloop(void *data) link->queue[i].status = ULR_NEW; resend = true; #endif - spin_unlock( &link->queueLock ); + mutex_unlock( &link->queueLock ); logadd( LOG_WARNING, "%s", buffer ); - spin_lock( &link->queueLock ); + mutex_lock( &link->queueLock ); } } - spin_unlock( &link->queueLock ); + mutex_unlock( &link->queueLock ); if ( resend ) uplink_sendRequests( link, true ); } @@ -536,16 +536,16 @@ static void* uplink_mainloop(void *data) cleanup: ; altservers_removeUplink( link ); uplink_saveCacheMap( link ); - spin_lock( &link->image->lock ); + mutex_lock( &link->image->lock ); if ( link->image->uplink == link ) { link->image->uplink = NULL; } - spin_lock( &link->queueLock ); + mutex_lock( &link->queueLock ); const int fd = link->fd; const dnbd3_signal_t* signal = link->signal; - pthread_mutex_lock( &link->sendMutex ); + mutex_lock( &link->sendMutex ); link->fd = -1; - pthread_mutex_unlock( &link->sendMutex ); + mutex_unlock( &link->sendMutex ); link->signal = NULL; if ( !link->shutdown ) { link->shutdown = true; @@ -554,8 +554,8 @@ static void* uplink_mainloop(void *data) // Do not access link->image after unlocking, since we set // image->uplink to NULL. 
Acquire with image_lock first, // like done below when checking whether to re-init uplink - spin_unlock( &link->image->lock ); - spin_unlock( &link->queueLock ); + mutex_unlock( &link->image->lock ); + mutex_unlock( &link->queueLock ); if ( fd != -1 ) close( fd ); if ( signal != NULL ) signal_close( signal ); // Wait for the RTT check to finish/fail if it's in progress @@ -564,9 +564,9 @@ static void* uplink_mainloop(void *data) if ( link->betterFd != -1 ) { close( link->betterFd ); } - spin_destroy( &link->queueLock ); - spin_destroy( &link->rttLock ); - pthread_mutex_destroy( &link->sendMutex ); + mutex_destroy( &link->queueLock ); + mutex_destroy( &link->rttLock ); + mutex_destroy( &link->sendMutex ); free( link->recvBuffer ); link->recvBuffer = NULL; if ( link->cacheFd != -1 ) { @@ -588,7 +588,7 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly) { // Scan for new requests int j; - spin_lock( &link->queueLock ); + mutex_lock( &link->queueLock ); for (j = 0; j < link->queueLen; ++j) { if ( link->queue[j].status != ULR_NEW && (newOnly || link->queue[j].status != ULR_PENDING) ) continue; link->queue[j].status = ULR_PENDING; @@ -599,11 +599,11 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly) logadd( LOG_DEBUG2, "[%p] Sending slot %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 " (%" PRIu64 "-%" PRIu64 ")", (void*)link, j, link->queue[j].status, link->queue[j].handle, link->queue[j].from, link->queue[j].to, reqStart, reqStart+reqSize ); */ - spin_unlock( &link->queueLock ); + mutex_unlock( &link->queueLock ); if ( hops < 200 ) ++hops; - pthread_mutex_lock( &link->sendMutex ); + mutex_lock( &link->sendMutex ); const bool ret = dnbd3_get_block( link->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( link->version, hops ) ); - pthread_mutex_unlock( &link->sendMutex ); + mutex_unlock( &link->sendMutex ); if ( !ret ) { // Non-critical - if the connection dropped or the server was changed // the thread will 
re-send this request as soon as the connection @@ -612,9 +612,9 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly) altservers_serverFailed( &link->currentServer ); return; } - spin_lock( &link->queueLock ); + mutex_lock( &link->queueLock ); } - spin_unlock( &link->queueLock ); + mutex_unlock( &link->queueLock ); } /** @@ -635,10 +635,10 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link) return; dnbd3_image_t * const image = link->image; if ( image->virtualFilesize < DNBD3_BLOCK_SIZE ) return; - spin_lock( &image->lock ); + mutex_lock( &image->lock ); if ( image == NULL || image->cache_map == NULL || image->users < _bgrMinClients ) { // No cache map (=image complete), or replication pending, or not enough users, do nothing - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); return; } const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); @@ -661,7 +661,7 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link) break; } } - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) { // Nothing left in current block, find next one replicationIndex = uplink_findNextIncompleteHashBlock( link, endByte ); @@ -674,9 +674,9 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link) const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE; link->replicationHandle = offset; const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE ); - pthread_mutex_lock( &link->sendMutex ); + mutex_lock( &link->sendMutex ); bool sendOk = dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) ); - pthread_mutex_unlock( &link->sendMutex ); + mutex_unlock( &link->sendMutex ); if ( !sendOk ) { logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" ); return; @@ -700,7 +700,7 @@ static void 
uplink_sendReplicationRequest(dnbd3_connection_t *link) static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const int startMapIndex) { int retval = -1; - spin_lock( &link->image->lock ); + mutex_lock( &link->image->lock ); const int mapBytes = IMGSIZE_TO_MAPBYTES( link->image->virtualFilesize ); const uint8_t *cache_map = link->image->cache_map; if ( cache_map != NULL ) { @@ -736,7 +736,7 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const in retval = -1; } } - spin_unlock( &link->image->lock ); + mutex_unlock( &link->image->lock ); return retval; } @@ -834,7 +834,7 @@ static void uplink_handleReceive(dnbd3_connection_t *link) } } // 2) Figure out which clients are interested in it - spin_lock( &link->queueLock ); + mutex_lock( &link->queueLock ); for (i = 0; i < link->queueLen; ++i) { dnbd3_queued_request_t * const req = &link->queue[i]; assert( req->status != ULR_PROCESSING ); @@ -866,23 +866,23 @@ static void uplink_handleReceive(dnbd3_connection_t *link) req->status = ULR_FREE; req->client = NULL; served = true; - pthread_mutex_lock( &client->sendMutex ); - spin_unlock( &link->queueLock ); + mutex_lock( &client->sendMutex ); + mutex_unlock( &link->queueLock ); if ( client->sock != -1 ) { ssize_t sent = writev( client->sock, iov, 2 ); if ( sent > (ssize_t)sizeof outReply ) { bytesSent = (size_t)sent - sizeof outReply; } } - pthread_mutex_unlock( &client->sendMutex ); + mutex_unlock( &client->sendMutex ); if ( bytesSent != 0 ) { client->bytesSent += bytesSent; } - spin_lock( &link->queueLock ); + mutex_lock( &link->queueLock ); } if ( req->status == ULR_FREE && i == link->queueLen - 1 ) link->queueLen--; } - spin_unlock( &link->queueLock ); + mutex_unlock( &link->queueLock ); #ifdef _DEBUG if ( !served && start != link->replicationHandle ) { logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end ); @@ -906,9 +906,9 @@ static void 
uplink_handleReceive(dnbd3_connection_t *link) } } if ( link->replicationHandle == REP_NONE ) { - spin_lock( &link->queueLock ); + mutex_lock( &link->queueLock ); const bool rep = ( link->queueLen == 0 ); - spin_unlock( &link->queueLock ); + mutex_unlock( &link->queueLock ); if ( rep ) uplink_sendReplicationRequest( link ); } return; @@ -922,19 +922,19 @@ static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew) if ( link->fd == -1 ) return; altservers_serverFailed( &link->currentServer ); - pthread_mutex_lock( &link->sendMutex ); + mutex_lock( &link->sendMutex ); close( link->fd ); link->fd = -1; - pthread_mutex_unlock( &link->sendMutex ); + mutex_unlock( &link->sendMutex ); link->replicationHandle = REP_NONE; if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) { link->nextReplicationIndex = 0; } if ( !findNew ) return; - spin_lock( &link->rttLock ); + mutex_lock( &link->rttLock ); bool bail = link->rttTestResult == RTT_INPROGRESS || link->betterFd != -1; - spin_unlock( &link->rttLock ); + mutex_unlock( &link->rttLock ); if ( bail ) return; altservers_findUplink( link ); @@ -961,9 +961,9 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink) size_t bytes = IMGSIZE_TO_HASHBLOCKS( image->virtualFilesize ) * sizeof(uint32_t); uint32_t masterCrc; uint32_t *buffer = malloc( bytes ); - pthread_mutex_lock( &uplink->sendMutex ); + mutex_lock( &uplink->sendMutex ); bool sendOk = dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes ); - pthread_mutex_unlock( &uplink->sendMutex ); + mutex_unlock( &uplink->sendMutex ); if ( !sendOk || bytes == 0 ) { free( buffer ); return; @@ -1032,11 +1032,11 @@ static bool uplink_saveCacheMap(dnbd3_connection_t *link) if ( image->cache_map == NULL ) return true; logadd( LOG_DEBUG2, "Saving cache map of %s:%d", image->name, (int)image->rid ); - spin_lock( &image->lock ); + mutex_lock( &image->lock ); // Lock and get a copy of the cache map, as it could be freed by another thread that is just 
about to // figure out that this image's cache copy is complete if ( image->cache_map == NULL || image->virtualFilesize < DNBD3_BLOCK_SIZE ) { - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); return true; } const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize); @@ -1044,7 +1044,7 @@ static bool uplink_saveCacheMap(dnbd3_connection_t *link) memcpy( map, image->cache_map, size ); // Unlock. Use path and cacheFd without locking. path should never change after initialization of the image, // cacheFd is owned by the uplink thread and we don't want to hold a spinlock during I/O - spin_unlock( &image->lock ); + mutex_unlock( &image->lock ); assert( image->path != NULL ); char mapfile[strlen( image->path ) + 4 + 1]; strcpy( mapfile, image->path ); -- cgit v1.2.3-55-g7522