summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/server/altservers.c101
-rw-r--r--src/server/globals.c10
-rw-r--r--src/server/globals.h8
-rw-r--r--src/server/image.c196
-rw-r--r--src/server/integrity.c48
-rw-r--r--src/server/locks.c129
-rw-r--r--src/server/locks.h35
-rw-r--r--src/server/net.c110
-rw-r--r--src/server/rpc.c34
-rw-r--r--src/server/threadpool.c20
-rw-r--r--src/server/uplink.c188
11 files changed, 467 insertions, 412 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index b91ceab..bbbc584 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -15,13 +15,13 @@
#define ERROR_GOTO(jumplabel, ...) LOG_GOTO(jumplabel, LOG_ERROR, __VA_ARGS__)
static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS];
-static pthread_spinlock_t pendingLockWrite; // Lock for adding something to pending. (NULL -> nonNULL)
-static pthread_mutex_t pendingLockConsume = PTHREAD_MUTEX_INITIALIZER; // Lock for removing something (nonNULL -> NULL)
+static pthread_mutex_t pendingLockWrite; // Lock for adding something to pending. (NULL -> nonNULL)
+static pthread_mutex_t pendingLockConsume; // Lock for removing something (nonNULL -> NULL)
static dnbd3_signal_t* runSignal = NULL;
static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS];
static int numAltServers = 0;
-static pthread_spinlock_t altServersLock;
+static pthread_mutex_t altServersLock;
static pthread_t altThread;
@@ -32,8 +32,9 @@ void altservers_init()
{
srand( (unsigned int)time( NULL ) );
// Init spinlock
- spin_init( &pendingLockWrite, PTHREAD_PROCESS_PRIVATE );
- spin_init( &altServersLock, PTHREAD_PROCESS_PRIVATE );
+ mutex_init( &pendingLockWrite );
+ mutex_init( &pendingLockConsume );
+ mutex_init( &altServersLock );
// Init signal
runSignal = signal_new();
if ( runSignal == NULL ) {
@@ -48,11 +49,11 @@ void altservers_init()
// Init waiting links queue -- this is currently a global static array so
// it will already be zero, but in case we refactor later do it explicitly
// while also holding the write lock so thread sanitizer is happy
- spin_lock( &pendingLockWrite );
+ mutex_lock( &pendingLockWrite );
for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
pending[i] = NULL;
}
- spin_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockWrite );
}
void altservers_shutdown()
@@ -99,10 +100,10 @@ int altservers_load()
bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate, const int isClientOnly)
{
int i, freeSlot = -1;
- spin_lock( &altServersLock );
+ mutex_lock( &altServersLock );
for (i = 0; i < numAltServers; ++i) {
if ( isSameAddressPort( &altServers[i].host, host ) ) {
- spin_unlock( &altServersLock );
+ mutex_unlock( &altServersLock );
return false;
} else if ( freeSlot == -1 && altServers[i].host.type == 0 ) {
freeSlot = i;
@@ -111,7 +112,7 @@ bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate
if ( freeSlot == -1 ) {
if ( numAltServers >= SERVER_MAX_ALTS ) {
logadd( LOG_WARNING, "Cannot add another alt server, maximum of %d already reached.", (int)SERVER_MAX_ALTS );
- spin_unlock( &altServersLock );
+ mutex_unlock( &altServersLock );
return false;
}
freeSlot = numAltServers++;
@@ -120,7 +121,7 @@ bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate
altServers[freeSlot].isPrivate = isPrivate;
altServers[freeSlot].isClientOnly = isClientOnly;
if ( comment != NULL ) snprintf( altServers[freeSlot].comment, COMMENT_LENGTH, "%s", comment );
- spin_unlock( &altServersLock );
+ mutex_unlock( &altServersLock );
return true;
}
@@ -135,14 +136,14 @@ void altservers_findUplink(dnbd3_connection_t *uplink)
// never be that the uplink is supposed to switch, but instead calls
// this function.
assert( uplink->betterFd == -1 );
- spin_lock( &pendingLockWrite );
+ mutex_lock( &pendingLockWrite );
// it is however possible that an RTT measurement is currently in progress,
// so check for that case and do nothing if one is in progress
if ( uplink->rttTestResult == RTT_INPROGRESS ) {
for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
if ( pending[i] != uplink ) continue;
// Yep, measuring right now
- spin_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockWrite );
return;
}
}
@@ -151,12 +152,12 @@ void altservers_findUplink(dnbd3_connection_t *uplink)
if ( pending[i] != NULL ) continue;
pending[i] = uplink;
uplink->rttTestResult = RTT_INPROGRESS;
- spin_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockWrite );
signal_call( runSignal ); // Wake altservers thread up
return;
}
// End of loop - no free slot
- spin_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockWrite );
logadd( LOG_WARNING, "No more free RTT measurement slots, ignoring a request..." );
}
@@ -165,16 +166,16 @@ void altservers_findUplink(dnbd3_connection_t *uplink)
*/
void altservers_removeUplink(dnbd3_connection_t *uplink)
{
- pthread_mutex_lock( &pendingLockConsume );
- spin_lock( &pendingLockWrite );
+ mutex_lock( &pendingLockConsume );
+ mutex_lock( &pendingLockWrite );
for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
if ( pending[i] == uplink ) {
uplink->rttTestResult = RTT_NOT_REACHABLE;
pending[i] = NULL;
}
}
- spin_unlock( &pendingLockWrite );
- pthread_mutex_unlock( &pendingLockConsume );
+ mutex_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockConsume );
}
/**
@@ -190,7 +191,7 @@ int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output
int count = 0;
int scores[size];
int score;
- spin_lock( &altServersLock );
+ mutex_lock( &altServersLock );
if ( size > numAltServers ) size = numAltServers;
for (i = 0; i < numAltServers; ++i) {
if ( altServers[i].host.type == 0 ) continue; // Slot is empty
@@ -226,7 +227,7 @@ int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output
}
}
}
- spin_unlock( &altServersLock );
+ mutex_unlock( &altServersLock );
return count;
}
@@ -242,7 +243,7 @@ int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency)
int count = 0, i;
ticks now;
timing_get( &now );
- spin_lock( &altServersLock );
+ mutex_lock( &altServersLock );
// Flip first server in list with a random one every time this is called
if ( numAltServers > 1 ) {
const dnbd3_alt_server_t tmp = altServers[0];
@@ -273,7 +274,7 @@ int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency)
output[count++] = srv->host;
if ( count >= size ) break;
}
- spin_unlock( &altServersLock );
+ mutex_unlock( &altServersLock );
return count;
}
@@ -281,12 +282,12 @@ json_t* altservers_toJson()
{
json_t *list = json_array();
- spin_lock( &altServersLock );
+ mutex_lock( &altServersLock );
char host[100];
const int count = numAltServers;
dnbd3_alt_server_t src[count];
memcpy( src, altServers, sizeof(src) );
- spin_unlock( &altServersLock );
+ mutex_unlock( &altServersLock );
for (int i = 0; i < count; ++i) {
json_t *rtts = json_array();
for (int j = 0; j < SERVER_RTT_PROBES; ++j) {
@@ -313,7 +314,7 @@ static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const
{
unsigned int avg = rtt;
int i;
- spin_lock( &altServersLock );
+ mutex_lock( &altServersLock );
for (i = 0; i < numAltServers; ++i) {
if ( !isSameAddressPort( host, &altServers[i].host ) ) continue;
altServers[i].rtt[++altServers[i].rttIndex % SERVER_RTT_PROBES] = rtt;
@@ -334,7 +335,7 @@ static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const
}
break;
}
- spin_unlock( &altServersLock );
+ mutex_unlock( &altServersLock );
return avg;
}
@@ -369,7 +370,7 @@ void altservers_serverFailed(const dnbd3_host_t * const host)
int foundIndex = -1, lastOk = -1;
ticks now;
timing_get( &now );
- spin_lock( &altServersLock );
+ mutex_lock( &altServersLock );
for (i = 0; i < numAltServers; ++i) {
if ( foundIndex == -1 ) {
// Looking for the failed server in list
@@ -395,7 +396,7 @@ void altservers_serverFailed(const dnbd3_host_t * const host)
altServers[lastOk] = tmp;
}
}
- spin_unlock( &altServersLock );
+ mutex_unlock( &altServersLock );
}
/**
* Mainloop of this module. It will wait for requests by uplinks to find a
@@ -432,27 +433,27 @@ static void *altservers_main(void *data UNUSED)
}
// Work your way through the queue
for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) {
- spin_lock( &pendingLockWrite );
+ mutex_lock( &pendingLockWrite );
if ( pending[itLink] == NULL ) {
- spin_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockWrite );
continue; // Check once before locking, as a mutex is expensive
}
- spin_unlock( &pendingLockWrite );
- pthread_mutex_lock( &pendingLockConsume );
- spin_lock( &pendingLockWrite );
+ mutex_unlock( &pendingLockWrite );
+ mutex_lock( &pendingLockConsume );
+ mutex_lock( &pendingLockWrite );
dnbd3_connection_t * const uplink = pending[itLink];
- spin_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockWrite );
if ( uplink == NULL ) { // Check again after locking
- pthread_mutex_unlock( &pendingLockConsume );
+ mutex_unlock( &pendingLockConsume );
continue;
}
dnbd3_image_t * const image = image_lock( uplink->image );
if ( image == NULL ) { // Check again after locking
uplink->rttTestResult = RTT_NOT_REACHABLE;
- spin_lock( &pendingLockWrite );
+ mutex_lock( &pendingLockWrite );
pending[itLink] = NULL;
- spin_unlock( &pendingLockWrite );
- pthread_mutex_unlock( &pendingLockConsume );
+ mutex_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockConsume );
logadd( LOG_DEBUG1, "Image has gone away that was queued for RTT measurement" );
continue;
}
@@ -520,7 +521,7 @@ static void *altservers_main(void *data UNUSED)
}
clock_gettime( BEST_CLOCK_SOURCE, &end );
// Measurement done - everything fine so far
- spin_lock( &uplink->rttLock );
+ mutex_lock( &uplink->rttLock );
const bool isCurrent = isSameAddressPort( &servers[itAlt], &uplink->currentServer );
// Penaltize rtt if this was a cycle; this will treat this server with lower priority
// in the near future too, so we prevent alternating between two servers that are both
@@ -531,7 +532,7 @@ static void *altservers_main(void *data UNUSED)
unsigned int avg = altservers_updateRtt( &servers[itAlt], rtt );
// If a cycle was detected, or we lost connection to the current (last) server, penaltize it one time
if ( ( uplink->cycleDetected || uplink->fd == -1 ) && isCurrent ) avg = (avg * 2) + 50000;
- spin_unlock( &uplink->rttLock );
+ mutex_unlock( &uplink->rttLock );
if ( uplink->fd != -1 && isCurrent ) {
// Was measuring current server
currentRtt = avg;
@@ -565,25 +566,25 @@ static void *altservers_main(void *data UNUSED)
LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt );
}
sock_setTimeout( bestSock, _uplinkTimeout );
- spin_lock( &uplink->rttLock );
+ mutex_lock( &uplink->rttLock );
uplink->betterFd = bestSock;
uplink->betterServer = servers[bestIndex];
uplink->betterVersion = bestProtocolVersion;
uplink->rttTestResult = RTT_DOCHANGE;
- spin_unlock( &uplink->rttLock );
+ mutex_unlock( &uplink->rttLock );
signal_call( uplink->signal );
} else if ( bestSock == -1 && currentRtt == RTT_UNREACHABLE ) {
// No server was reachable
- spin_lock( &uplink->rttLock );
+ mutex_lock( &uplink->rttLock );
uplink->rttTestResult = RTT_NOT_REACHABLE;
- spin_unlock( &uplink->rttLock );
+ mutex_unlock( &uplink->rttLock );
} else {
// nope
if ( bestSock != -1 ) close( bestSock );
- spin_lock( &uplink->rttLock );
+ mutex_lock( &uplink->rttLock );
uplink->rttTestResult = RTT_DONTCHANGE;
uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away
- spin_unlock( &uplink->rttLock );
+ mutex_unlock( &uplink->rttLock );
if ( !image->working ) {
image->working = true;
LOG( LOG_DEBUG1, "[%d] No better alt server found, enabling again", itLink );
@@ -591,10 +592,10 @@ static void *altservers_main(void *data UNUSED)
}
image_release( image );
// end of loop over all pending uplinks
- spin_lock( &pendingLockWrite );
+ mutex_lock( &pendingLockWrite );
pending[itLink] = NULL;
- spin_unlock( &pendingLockWrite );
- pthread_mutex_unlock( &pendingLockConsume );
+ mutex_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockConsume );
}
// Save cache maps of all images if applicable
declare_now;
diff --git a/src/server/globals.c b/src/server/globals.c
index 010274d..d0de704 100644
--- a/src/server/globals.c
+++ b/src/server/globals.c
@@ -1,5 +1,6 @@
#include "globals.h"
#include "ini.h"
+#include "locks.h"
#include "../shared/log.h"
#include <string.h>
#include <stdlib.h>
@@ -39,7 +40,7 @@ atomic_bool _pretendClient = false;
* ignore certain values which cannot be changed safely at runtime.
*/
static atomic_bool initialLoad = true;
-static pthread_mutex_t loadLock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t loadLock;
#define IS_TRUE(value) (atoi(value) != 0 || strcmp(value, "true") == 0 || strcmp(value, "True") == 0 || strcmp(value, "TRUE") == 0)
#define SAVE_TO_VAR_STR(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) { if (_ ## kk != NULL) free(_ ## kk); _ ## kk = strdup(value); } } while (0)
@@ -110,7 +111,10 @@ void globals_loadConfig()
char *name = NULL;
asprintf( &name, "%s/%s", _configDir, CONFIG_FILENAME );
if ( name == NULL ) return;
- if ( pthread_mutex_trylock( &loadLock ) != 0 ) {
+ if ( initialLoad ) {
+ mutex_init( &loadLock );
+ }
+ if ( mutex_trylock( &loadLock ) != 0 ) {
logadd( LOG_INFO, "Ignoring config reload request due to already running reload" );
return;
}
@@ -128,7 +132,7 @@ void globals_loadConfig()
globals_dumpConfig( buffer, sizeof(buffer) );
logadd( LOG_DEBUG1, "Effective configuration:\n%s", buffer );
initialLoad = false;
- pthread_mutex_unlock( &loadLock );
+ mutex_unlock( &loadLock );
}
static void sanitizeFixedConfig()
diff --git a/src/server/globals.h b/src/server/globals.h
index 031f565..b248800 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -54,10 +54,10 @@ struct _dnbd3_connection
dnbd3_signal_t* signal; // used to wake up the process
pthread_t thread; // thread holding the connection
pthread_mutex_t sendMutex; // For locking socket while sending
- pthread_spinlock_t queueLock; // lock for synchronization on request queue etc.
+ pthread_mutex_t queueLock; // lock for synchronization on request queue etc.
dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer
dnbd3_host_t currentServer; // Current server we're connected to
- pthread_spinlock_t rttLock; // When accessing rttTestResult, betterFd or betterServer
+ pthread_mutex_t rttLock; // When accessing rttTestResult, betterFd or betterServer
int rttTestResult; // RTT_*
int cacheFd; // used to write to the image, in case it is relayed. ONLY USE FROM UPLINK THREAD!
int betterVersion; // protocol version of better server
@@ -121,7 +121,7 @@ struct _dnbd3_image
int id; // Unique ID of this image. Only unique in the context of this running instance of DNBD3-Server
bool working; // true if image exists and completeness is == 100% or a working upstream proxy is connected
uint16_t rid; // revision of image
- pthread_spinlock_t lock;
+ pthread_mutex_t lock;
};
struct _dnbd3_client
@@ -134,7 +134,7 @@ struct _dnbd3_client
dnbd3_host_t host;
char hostName[HOSTNAMELEN]; // inet_ntop version of host
pthread_mutex_t sendMutex; // Held while writing to sock if image is incomplete (since uplink uses socket too)
- pthread_spinlock_t lock;
+ pthread_mutex_t lock;
};
// #######################################################
diff --git a/src/server/image.c b/src/server/image.c
index 061f9a3..bfba6cb 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -25,9 +25,9 @@
static dnbd3_image_t *_images[SERVER_MAX_IMAGES];
static int _num_images = 0;
-static pthread_spinlock_t imageListLock;
-static pthread_mutex_t remoteCloneLock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_mutex_t reloadLock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t imageListLock;
+static pthread_mutex_t remoteCloneLock;
+static pthread_mutex_t reloadLock;
#define NAMELEN 500
#define CACHELEN 20
typedef struct
@@ -59,7 +59,9 @@ static bool image_checkRandomBlocks(const int count, int fdImage, const int64_t
void image_serverStartup()
{
srand( (unsigned int)time( NULL ) );
- spin_init( &imageListLock, PTHREAD_PROCESS_PRIVATE );
+ mutex_init( &imageListLock );
+ mutex_init( &remoteCloneLock );
+ mutex_init( &reloadLock );
}
/**
@@ -87,12 +89,12 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co
return;
bool setNewBlocks = false;
uint64_t pos = start;
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
if ( image->cache_map == NULL ) {
// Image seems already complete
if ( set ) {
// This makes no sense
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
logadd( LOG_DEBUG1, "image_updateCachemap(true) with no cache_map: %s", image->path );
return;
}
@@ -125,14 +127,14 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co
if ( image->cache_map == NULL ) break;
const int block = (int)( pos / HASH_BLOCK_SIZE );
if ( image_isHashBlockComplete( image->cache_map, block, image->realFilesize ) ) {
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
integrity_check( image, block );
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
}
pos += HASH_BLOCK_SIZE;
}
}
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
}
/**
@@ -144,13 +146,13 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co
bool image_isComplete(dnbd3_image_t *image)
{
assert( image != NULL );
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
if ( image->virtualFilesize == 0 ) {
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
return false;
}
if ( image->cache_map == NULL ) {
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
return true;
}
bool complete = true;
@@ -175,14 +177,14 @@ bool image_isComplete(dnbd3_image_t *image)
complete = ((image->cache_map[map_len_bytes - 1] & last_byte) == last_byte);
}
if ( !complete ) {
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
return false;
}
char mapfile[PATHLEN] = "";
free( image->cache_map );
image->cache_map = NULL;
snprintf( mapfile, PATHLEN, "%s.map", image->path );
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
unlink( mapfile );
return true;
}
@@ -215,18 +217,18 @@ bool image_ensureOpen(dnbd3_image_t *image)
}
}
if ( newFd == -1 ) {
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
image->working = false;
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
return false;
}
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
if ( image->readFd == -1 ) {
image->readFd = newFd;
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
} else {
// There was a race while opening the file (happens cause not locked cause blocking), we lost the race so close new fd and proceed
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
close( newFd );
}
return image->readFd != -1;
@@ -247,7 +249,7 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
const size_t slen = strlen( name );
if ( slen == 0 || name[slen - 1] == '/' || name[0] == '/' ) return NULL ;
// Go through array
- spin_lock( &imageListLock );
+ mutex_lock( &imageListLock );
for (i = 0; i < _num_images; ++i) {
dnbd3_image_t * const image = _images[i];
if ( image == NULL || strcmp( image->name, name ) != 0 ) continue;
@@ -261,14 +263,14 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
// Not found
if ( candidate == NULL ) {
- spin_unlock( &imageListLock );
+ mutex_unlock( &imageListLock );
return NULL ;
}
- spin_lock( &candidate->lock );
- spin_unlock( &imageListLock );
+ mutex_lock( &candidate->lock );
+ mutex_unlock( &imageListLock );
candidate->users++;
- spin_unlock( &candidate->lock );
+ mutex_unlock( &candidate->lock );
// Found, see if it works
// TODO: Also make sure a non-working image still has old fd open but created a new one and removed itself from the list
@@ -276,9 +278,9 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
if ( candidate->working || checkIfWorking ) {
// Is marked working, but might not have an fd open
if ( !image_ensureOpen( candidate ) ) {
- spin_lock( &candidate->lock );
+ mutex_lock( &candidate->lock );
timing_get( &candidate->lastWorkCheck );
- spin_unlock( &candidate->lock );
+ mutex_unlock( &candidate->lock );
if ( _removeMissingImages ) {
candidate = image_remove( candidate ); // No release here, the image is still returned and should be released by caller
}
@@ -291,14 +293,14 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
// ...not working...
// Don't re-check too often
- spin_lock( &candidate->lock );
+ mutex_lock( &candidate->lock );
bool check;
declare_now;
check = timing_diff( &candidate->lastWorkCheck, &now ) > NONWORKING_RECHECK_INTERVAL_SECONDS;
if ( check ) {
candidate->lastWorkCheck = now;
}
- spin_unlock( &candidate->lock );
+ mutex_unlock( &candidate->lock );
if ( !check ) {
return candidate;
}
@@ -347,19 +349,19 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
img->rid = candidate->rid;
img->users = 1;
img->working = false;
- spin_init( &img->lock, PTHREAD_PROCESS_PRIVATE );
+ mutex_init( &img->lock );
if ( candidate->crc32 != NULL ) {
const size_t mb = IMGSIZE_TO_HASHBLOCKS( candidate->virtualFilesize ) * sizeof(uint32_t);
img->crc32 = malloc( mb );
memcpy( img->crc32, candidate->crc32, mb );
}
- spin_lock( &candidate->lock );
+ mutex_lock( &candidate->lock );
if ( candidate->cache_map != NULL ) {
const size_t mb = IMGSIZE_TO_MAPBYTES( candidate->virtualFilesize );
img->cache_map = malloc( mb );
memcpy( img->cache_map, candidate->cache_map, mb );
}
- spin_unlock( &candidate->lock );
+ mutex_unlock( &candidate->lock );
if ( image_addToList( img ) ) {
image_release( candidate );
candidate = img;
@@ -393,17 +395,17 @@ dnbd3_image_t* image_lock(dnbd3_image_t *image) // TODO: get rid, fix places tha
{
if ( image == NULL ) return NULL ;
int i;
- spin_lock( &imageListLock );
+ mutex_lock( &imageListLock );
for (i = 0; i < _num_images; ++i) {
if ( _images[i] == image ) {
- spin_lock( &image->lock );
- spin_unlock( &imageListLock );
+ mutex_lock( &image->lock );
+ mutex_unlock( &imageListLock );
image->users++;
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
return image;
}
}
- spin_unlock( &imageListLock );
+ mutex_unlock( &imageListLock );
return NULL ;
}
@@ -416,14 +418,14 @@ dnbd3_image_t* image_lock(dnbd3_image_t *image) // TODO: get rid, fix places tha
dnbd3_image_t* image_release(dnbd3_image_t *image)
{
if ( image == NULL ) return NULL;
- spin_lock( &imageListLock );
- spin_lock( &image->lock );
+ mutex_lock( &imageListLock );
+ mutex_lock( &image->lock );
assert( image->users > 0 );
image->users--;
bool inUse = image->users != 0;
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
if ( inUse ) { // Still in use, do nothing
- spin_unlock( &imageListLock );
+ mutex_unlock( &imageListLock );
return NULL;
}
// Getting here means we decreased the usage counter to zero
@@ -431,11 +433,11 @@ dnbd3_image_t* image_release(dnbd3_image_t *image)
// responsible for freeing it
for (int i = 0; i < _num_images; ++i) {
if ( _images[i] == image ) { // Found, do nothing
- spin_unlock( &imageListLock );
+ mutex_unlock( &imageListLock );
return NULL;
}
}
- spin_unlock( &imageListLock );
+ mutex_unlock( &imageListLock );
// So it wasn't in the images list anymore either, get rid of it
if ( !inUse ) image = image_free( image );
return NULL;
@@ -467,8 +469,8 @@ static bool isForbiddenExtension(const char* name)
static dnbd3_image_t* image_remove(dnbd3_image_t *image)
{
bool mustFree = false;
- spin_lock( &imageListLock );
- spin_lock( &image->lock );
+ mutex_lock( &imageListLock );
+ mutex_lock( &image->lock );
for ( int i = _num_images - 1; i >= 0; --i ) {
if ( _images[i] == image ) {
_images[i] = NULL;
@@ -476,8 +478,8 @@ static dnbd3_image_t* image_remove(dnbd3_image_t *image)
}
if ( _images[i] == NULL && i + 1 == _num_images ) _num_images--;
}
- spin_unlock( &image->lock );
- spin_unlock( &imageListLock );
+ mutex_unlock( &image->lock );
+ mutex_unlock( &imageListLock );
if ( mustFree ) image = image_free( image );
return image;
}
@@ -488,22 +490,22 @@ static dnbd3_image_t* image_remove(dnbd3_image_t *image)
void image_killUplinks()
{
int i;
- spin_lock( &imageListLock );
+ mutex_lock( &imageListLock );
for (i = 0; i < _num_images; ++i) {
if ( _images[i] == NULL ) continue;
- spin_lock( &_images[i]->lock );
+ mutex_lock( &_images[i]->lock );
if ( _images[i]->uplink != NULL ) {
- spin_lock( &_images[i]->uplink->queueLock );
+ mutex_lock( &_images[i]->uplink->queueLock );
if ( !_images[i]->uplink->shutdown ) {
thread_detach( _images[i]->uplink->thread );
_images[i]->uplink->shutdown = true;
}
- spin_unlock( &_images[i]->uplink->queueLock );
+ mutex_unlock( &_images[i]->uplink->queueLock );
signal_call( _images[i]->uplink->signal );
}
- spin_unlock( &_images[i]->lock );
+ mutex_unlock( &_images[i]->lock );
}
- spin_unlock( &imageListLock );
+ mutex_unlock( &imageListLock );
}
/**
@@ -518,14 +520,14 @@ bool image_loadAll(char *path)
dnbd3_image_t *imgHandle;
if ( path == NULL ) path = _basePath;
- if ( pthread_mutex_trylock( &reloadLock ) != 0 ) {
+ if ( mutex_trylock( &reloadLock ) != 0 ) {
logadd( LOG_MINOR, "Could not (re)load image list, already in progress." );
return false;
}
if ( _removeMissingImages ) {
// Check if all loaded images still exist on disk
logadd( LOG_INFO, "Checking for vanished images" );
- spin_lock( &imageListLock );
+ mutex_lock( &imageListLock );
for ( int i = _num_images - 1; i >= 0; --i ) {
if ( _shutdown ) break;
if ( _images[i] == NULL ) {
@@ -534,38 +536,38 @@ bool image_loadAll(char *path)
}
imgId = _images[i]->id;
snprintf( imgPath, PATHLEN, "%s", _images[i]->path );
- spin_unlock( &imageListLock ); // isReadable hits the fs; unlock
+ mutex_unlock( &imageListLock ); // isReadable hits the fs; unlock
// Check if fill can still be opened for reading
ret = file_isReadable( imgPath );
// Lock again, see if image is still there, free if required
- spin_lock( &imageListLock );
+ mutex_lock( &imageListLock );
if ( ret || i >= _num_images || _images[i] == NULL || _images[i]->id != imgId ) continue;
// Image needs to be removed
imgHandle = _images[i];
_images[i] = NULL;
if ( i + 1 == _num_images ) _num_images--;
- spin_lock( &imgHandle->lock );
+ mutex_lock( &imgHandle->lock );
const bool freeImg = ( imgHandle->users == 0 );
- spin_unlock( &imgHandle->lock );
+ mutex_unlock( &imgHandle->lock );
// We unlocked, but the image has been removed from the list already, so
// there's no way the users-counter can increase at this point.
if ( freeImg ) {
// Image is not in use anymore, free the dangling entry immediately
- spin_unlock( &imageListLock ); // image_free might do several fs operations; unlock
+ mutex_unlock( &imageListLock ); // image_free might do several fs operations; unlock
image_free( imgHandle );
- spin_lock( &imageListLock );
+ mutex_lock( &imageListLock );
}
}
- spin_unlock( &imageListLock );
+ mutex_unlock( &imageListLock );
if ( _shutdown ) {
- pthread_mutex_unlock( &reloadLock );
+ mutex_unlock( &reloadLock );
return true;
}
}
// Now scan for new images
logadd( LOG_INFO, "Scanning for new or modified images" );
ret = image_load_all_internal( path, path );
- pthread_mutex_unlock( &reloadLock );
+ mutex_unlock( &reloadLock );
logadd( LOG_INFO, "Finished scanning %s", path );
return ret;
}
@@ -577,18 +579,18 @@ bool image_loadAll(char *path)
*/
bool image_tryFreeAll()
{
- spin_lock( &imageListLock );
+ mutex_lock( &imageListLock );
for (int i = _num_images - 1; i >= 0; --i) {
if ( _images[i] != NULL && _images[i]->users == 0 ) { // XXX Data race...
dnbd3_image_t *image = _images[i];
_images[i] = NULL;
- spin_unlock( &imageListLock );
+ mutex_unlock( &imageListLock );
image = image_free( image );
- spin_lock( &imageListLock );
+ mutex_lock( &imageListLock );
}
if ( i + 1 == _num_images && _images[i] == NULL ) _num_images--;
}
- spin_unlock( &imageListLock );
+ mutex_unlock( &imageListLock );
return _num_images == 0;
}
@@ -604,7 +606,7 @@ static dnbd3_image_t* image_free(dnbd3_image_t *image)
}
//
uplink_shutdown( image );
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
free( image->cache_map );
free( image->crc32 );
free( image->path );
@@ -613,9 +615,9 @@ static dnbd3_image_t* image_free(dnbd3_image_t *image)
image->crc32 = NULL;
image->path = NULL;
image->name = NULL;
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
if ( image->readFd != -1 ) close( image->readFd );
- spin_destroy( &image->lock );
+ mutex_destroy( &image->lock );
//
memset( image, 0, sizeof(*image) );
free( image );
@@ -700,7 +702,7 @@ static bool image_addToList(dnbd3_image_t *image)
{
int i;
static int imgIdCounter = 0; // Used to assign unique numeric IDs to images
- spin_lock( &imageListLock );
+ mutex_lock( &imageListLock );
// Now we're locked, assign unique ID to image (unique for this running server instance!)
image->id = ++imgIdCounter;
for ( i = 0; i < _num_images; ++i ) {
@@ -710,12 +712,12 @@ static bool image_addToList(dnbd3_image_t *image)
}
if ( i >= _num_images ) {
if ( _num_images >= _maxImages ) {
- spin_unlock( &imageListLock );
+ mutex_unlock( &imageListLock );
return false;
}
_images[_num_images++] = image;
}
- spin_unlock( &imageListLock );
+ mutex_unlock( &imageListLock );
return true;
}
@@ -880,7 +882,7 @@ static bool image_load(char *base, char *path, int withUplink)
image->working = (image->cache_map == NULL );
timing_get( &image->nextCompletenessEstimate );
image->completenessEstimate = -1;
- spin_init( &image->lock, PTHREAD_PROCESS_PRIVATE );
+ mutex_init( &image->lock );
int32_t offset;
if ( stat( path, &st ) == 0 ) {
// Negatively offset atime by file modification time
@@ -1152,12 +1154,12 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
char *cmpname = name;
int useIndex = -1, fallbackIndex = 0;
if ( len >= NAMELEN ) cmpname += 1 + len - NAMELEN;
- pthread_mutex_lock( &remoteCloneLock );
+ mutex_lock( &remoteCloneLock );
for (int i = 0; i < CACHELEN; ++i) {
if ( remoteCloneCache[i].rid == revision && strcmp( cmpname, remoteCloneCache[i].name ) == 0 ) {
useIndex = i;
if ( timing_reached( &remoteCloneCache[i].deadline, &now ) ) break;
- pthread_mutex_unlock( &remoteCloneLock ); // Was recently checked...
+ mutex_unlock( &remoteCloneLock ); // Was recently checked...
return image;
}
if ( timing_1le2( &remoteCloneCache[i].deadline, &remoteCloneCache[fallbackIndex].deadline ) ) {
@@ -1169,7 +1171,7 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
if ( revision != 0 ) {
if ( image == NULL ) image = image_get( name, revision, true );
if ( image != NULL ) {
- pthread_mutex_unlock( &remoteCloneLock );
+ mutex_unlock( &remoteCloneLock );
return image;
}
}
@@ -1182,7 +1184,7 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
timing_set( &remoteCloneCache[useIndex].deadline, &now, SERVER_REMOTE_IMAGE_CHECK_CACHETIME );
snprintf( remoteCloneCache[useIndex].name, NAMELEN, "%s", cmpname );
remoteCloneCache[useIndex].rid = revision;
- pthread_mutex_unlock( &remoteCloneLock );
+ mutex_unlock( &remoteCloneLock );
// Get some alt servers and try to get the image from there
#define REP_NUM_SRV (8)
@@ -1229,7 +1231,7 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
logadd( LOG_MINOR, "Won't proxy '%s:%d': Larger than maxReplicationSize", name, (int)revision );
goto server_fail;
}
- pthread_mutex_lock( &reloadLock );
+ mutex_lock( &reloadLock );
// Ensure disk space entirely if not using sparse files, otherwise just make sure we have some room at least
if ( _sparseFiles ) {
ok = image_ensureDiskSpace( 2ull * 1024 * 1024 * 1024, false ); // 2GiB, maybe configurable one day
@@ -1237,7 +1239,7 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
ok = image_ensureDiskSpace( remoteImageSize + ( 10 * 1024 * 1024 ), false ); // some extra space for cache map etc.
}
ok = ok && image_clone( sock, name, remoteRid, remoteImageSize ); // This sets up the file+map+crc and loads the img
- pthread_mutex_unlock( &reloadLock );
+ mutex_unlock( &reloadLock );
if ( !ok ) goto server_fail;
// Cloning worked :-)
@@ -1343,18 +1345,18 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste
}
// Now lock on the loading mutex, then check again if the image exists (we're multi-threaded)
- pthread_mutex_lock( &reloadLock );
+ mutex_lock( &reloadLock );
dnbd3_image_t* image = image_get( name, detectedRid, true );
if ( image != NULL ) {
// The image magically appeared in the meantime
logadd( LOG_DEBUG2, "Magically appeared" );
- pthread_mutex_unlock( &reloadLock );
+ mutex_unlock( &reloadLock );
return image;
}
// Still not loaded, let's try to do so
logadd( LOG_DEBUG2, "Calling load" );
image_load( _basePath, imageFile, false );
- pthread_mutex_unlock( &reloadLock );
+ mutex_unlock( &reloadLock );
// If loading succeeded, this will return the image
logadd( LOG_DEBUG2, "Calling get" );
return image_get( name, requestedRid, true );
@@ -1507,12 +1509,12 @@ json_t* image_getListAsJson()
int users, completeness, idleTime;
declare_now;
- spin_lock( &imageListLock );
+ mutex_lock( &imageListLock );
for ( i = 0; i < _num_images; ++i ) {
if ( _images[i] == NULL ) continue;
dnbd3_image_t *image = _images[i];
- spin_lock( &image->lock );
- spin_unlock( &imageListLock );
+ mutex_lock( &image->lock );
+ mutex_unlock( &imageListLock );
users = image->users;
idleTime = (int)timing_diff( &image->atime, &now );
completeness = image_getCompletenessEstimate( image );
@@ -1526,7 +1528,7 @@ json_t* image_getListAsJson()
}
}
image->users++; // Prevent freeing after we unlock
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
jsonImage = json_pack( "{sisssisisisisI}",
"id", image->id, // id, name, rid never change, so access them without locking
@@ -1545,9 +1547,9 @@ json_t* image_getListAsJson()
json_array_append_new( imagesJson, jsonImage );
image = image_release( image ); // Since we did image->users++;
- spin_lock( &imageListLock );
+ mutex_lock( &imageListLock );
}
- spin_unlock( &imageListLock );
+ mutex_unlock( &imageListLock );
return imagesJson;
}
@@ -1655,9 +1657,9 @@ static bool image_calcBlockCrc32(const int fd, const size_t block, const uint64_
bool image_ensureDiskSpaceLocked(uint64_t size, bool force)
{
bool ret;
- pthread_mutex_lock( &reloadLock );
+ mutex_lock( &reloadLock );
ret = image_ensureDiskSpace( size, force );
- pthread_mutex_unlock( &reloadLock );
+ mutex_unlock( &reloadLock );
return ret;
}
@@ -1739,13 +1741,13 @@ void image_closeUnusedFd()
ticks deadline;
timing_gets( &deadline, -UNUSED_FD_TIMEOUT );
char imgstr[300];
- spin_lock( &imageListLock );
+ mutex_lock( &imageListLock );
for (i = 0; i < _num_images; ++i) {
dnbd3_image_t * const image = _images[i];
if ( image == NULL )
continue;
- spin_lock( &image->lock );
- spin_unlock( &imageListLock );
+ mutex_lock( &image->lock );
+ mutex_unlock( &imageListLock );
if ( image->users == 0 && image->uplink == NULL && timing_reached( &image->atime, &deadline ) ) {
snprintf( imgstr, sizeof(imgstr), "%s:%d", image->name, (int)image->rid );
fd = image->readFd;
@@ -1753,14 +1755,14 @@ void image_closeUnusedFd()
} else {
fd = -1;
}
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
if ( fd != -1 ) {
close( fd );
logadd( LOG_DEBUG1, "Inactive fd closed for %s", imgstr );
}
- spin_lock( &imageListLock );
+ mutex_lock( &imageListLock );
}
- spin_unlock( &imageListLock );
+ mutex_unlock( &imageListLock );
}
/*
diff --git a/src/server/integrity.c b/src/server/integrity.c
index 88b7487..8f17855 100644
--- a/src/server/integrity.c
+++ b/src/server/integrity.c
@@ -39,11 +39,11 @@ static void* integrity_main(void *data);
void integrity_init()
{
assert( queueLen == -1 );
- pthread_mutex_init( &integrityQueueLock, NULL );
+ mutex_init( &integrityQueueLock );
pthread_cond_init( &queueSignal, NULL );
- pthread_mutex_lock( &integrityQueueLock );
+ mutex_lock( &integrityQueueLock );
queueLen = 0;
- pthread_mutex_unlock( &integrityQueueLock );
+ mutex_unlock( &integrityQueueLock );
bRunning = true;
if ( 0 != thread_create( &thread, NULL, &integrity_main, (void *)NULL ) ) {
bRunning = false;
@@ -56,13 +56,13 @@ void integrity_shutdown()
{
assert( queueLen != -1 );
logadd( LOG_DEBUG1, "Shutting down integrity checker...\n" );
- pthread_mutex_lock( &integrityQueueLock );
+ mutex_lock( &integrityQueueLock );
pthread_cond_signal( &queueSignal );
- pthread_mutex_unlock( &integrityQueueLock );
+ mutex_unlock( &integrityQueueLock );
thread_join( thread, NULL );
while ( bRunning )
usleep( 10000 );
- pthread_mutex_destroy( &integrityQueueLock );
+ mutex_destroy( &integrityQueueLock );
pthread_cond_destroy( &queueSignal );
logadd( LOG_DEBUG1, "Integrity checker exited normally.\n" );
}
@@ -80,7 +80,7 @@ void integrity_check(dnbd3_image_t *image, int block)
return;
}
int i, freeSlot = -1;
- pthread_mutex_lock( &integrityQueueLock );
+ mutex_lock( &integrityQueueLock );
for (i = 0; i < queueLen; ++i) {
if ( freeSlot == -1 && checkQueue[i].image == NULL ) {
freeSlot = i;
@@ -92,13 +92,13 @@ void integrity_check(dnbd3_image_t *image, int block)
checkQueue[i].count += 1;
}
logadd( LOG_DEBUG2, "Attaching to existing check request (%d/%d) (%d +%d)", i, queueLen, checkQueue[i].block, checkQueue[i].count );
- pthread_mutex_unlock( &integrityQueueLock );
+ mutex_unlock( &integrityQueueLock );
return;
}
}
if ( freeSlot == -1 ) {
if ( queueLen >= CHECK_QUEUE_SIZE ) {
- pthread_mutex_unlock( &integrityQueueLock );
+ mutex_unlock( &integrityQueueLock );
logadd( LOG_INFO, "Check queue full, discarding check request...\n" );
return;
}
@@ -113,7 +113,7 @@ void integrity_check(dnbd3_image_t *image, int block)
checkQueue[freeSlot].count = 1;
}
pthread_cond_signal( &queueSignal );
- pthread_mutex_unlock( &integrityQueueLock );
+ mutex_unlock( &integrityQueueLock );
}
static void* integrity_main(void * data UNUSED)
@@ -130,10 +130,10 @@ static void* integrity_main(void * data UNUSED)
pid_t tid = (pid_t)syscall( SYS_gettid );
setpriority( PRIO_PROCESS, tid, 10 );
#endif
- pthread_mutex_lock( &integrityQueueLock );
+ mutex_lock( &integrityQueueLock );
while ( !_shutdown ) {
if ( queueLen == 0 ) {
- pthread_cond_wait( &queueSignal, &integrityQueueLock );
+ mutex_cond_wait( &queueSignal, &integrityQueueLock );
}
for (i = queueLen - 1; i >= 0; --i) {
if ( _shutdown ) break;
@@ -146,10 +146,10 @@ static void* integrity_main(void * data UNUSED)
// We have the image. Call image_release() some time
const int qCount = checkQueue[i].count;
bool foundCorrupted = false;
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
if ( image->crc32 != NULL && image->realFilesize != 0 ) {
int blocks[2] = { checkQueue[i].block, -1 };
- pthread_mutex_unlock( &integrityQueueLock );
+ mutex_unlock( &integrityQueueLock );
// Make copy of crc32 list as it might go away
const uint64_t fileSize = image->realFilesize;
const int numHashBlocks = IMGSIZE_TO_HASHBLOCKS(fileSize);
@@ -160,7 +160,7 @@ static void* integrity_main(void * data UNUSED)
buffer = malloc( bufferSize );
}
memcpy( buffer, image->crc32, required );
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
// Open for direct I/O if possible; this prevents polluting the fs cache
int fd = open( image->path, O_RDONLY | O_DIRECT );
bool direct = fd != -1;
@@ -178,9 +178,9 @@ static void* integrity_main(void * data UNUSED)
bool complete = true;
if ( qCount == CHECK_ALL ) {
// When checking full image, skip incomplete blocks, otherwise assume block is complete
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
complete = image_isHashBlockComplete( image->cache_map, blocks[0], fileSize );
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
}
#if defined(linux) || defined(__linux)
if ( sync_file_range( fd, start, end - start, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER ) == -1 ) {
@@ -220,7 +220,7 @@ static void* integrity_main(void * data UNUSED)
close( fd );
}
}
- pthread_mutex_lock( &integrityQueueLock );
+ mutex_lock( &integrityQueueLock );
assert( checkQueue[i].image == image );
if ( qCount != CHECK_ALL ) {
// Not a full check; update the counter
@@ -238,25 +238,25 @@ static void* integrity_main(void * data UNUSED)
if ( i + 1 == queueLen ) queueLen--;
// Mark as working again if applicable
if ( !foundCorrupted ) {
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
if ( image->uplink != NULL ) { // TODO: image_determineWorkingState() helper?
image->working = image->uplink->fd != -1 && image->readFd != -1;
}
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
}
} else {
// Still more blocks to go...
checkQueue[i].block = blocks[0];
}
} else {
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
}
if ( foundCorrupted ) {
// Something was fishy, make sure uplink exists
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
image->working = false;
bool restart = image->uplink == NULL || image->uplink->shutdown;
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
if ( restart ) {
uplink_shutdown( image );
uplink_init( image, -1, NULL, -1 );
@@ -266,7 +266,7 @@ static void* integrity_main(void * data UNUSED)
image_release( image );
}
}
- pthread_mutex_unlock( &integrityQueueLock );
+ mutex_unlock( &integrityQueueLock );
if ( buffer != NULL ) free( buffer );
bRunning = false;
return NULL;
diff --git a/src/server/locks.c b/src/server/locks.c
index 71a1845..a5b7c76 100644
--- a/src/server/locks.c
+++ b/src/server/locks.c
@@ -38,23 +38,23 @@ int debugThreadCount = 0;
static debug_lock_t locks[MAXLOCKS];
static debug_thread_t threads[MAXTHREADS];
static int init_done = 0;
-static pthread_spinlock_t initdestory;
+static pthread_mutex_t initdestory;
static int lockId = 0;
static pthread_t watchdog = 0;
static dnbd3_signal_t* watchdogSignal = NULL;
static void *debug_thread_watchdog(void *something);
-int debug_spin_init(const char *name, const char *file, int line, pthread_spinlock_t *lock, int shared)
+int debug_mutex_init(const char *name, const char *file, int line, pthread_mutex_t *lock)
{
if ( !init_done ) {
memset( locks, 0, MAXLOCKS * sizeof(debug_lock_t) );
memset( threads, 0, MAXTHREADS * sizeof(debug_thread_t) );
- pthread_spin_init( &initdestory, PTHREAD_PROCESS_PRIVATE );
+ pthread_mutex_init( &initdestory, NULL );
init_done = 1;
}
int first = -1;
- pthread_spin_lock( &initdestory );
+ pthread_mutex_lock( &initdestory );
for (int i = 0; i < MAXLOCKS; ++i) {
if ( locks[i].lock == lock ) {
logadd( LOG_ERROR, "Lock %p (%s) already initialized (%s:%d)\n", (void*)lock, name, file, line );
@@ -64,7 +64,7 @@ int debug_spin_init(const char *name, const char *file, int line, pthread_spinlo
}
if ( first == -1 ) {
logadd( LOG_ERROR, "No more free debug locks (%s:%d)\n", file, line );
- pthread_spin_unlock( &initdestory );
+ pthread_mutex_unlock( &initdestory );
debug_dump_lock_stats();
exit( 4 );
}
@@ -72,28 +72,28 @@ int debug_spin_init(const char *name, const char *file, int line, pthread_spinlo
locks[first].locked = 0;
snprintf( locks[first].name, LOCKLEN, "%s", name );
snprintf( locks[first].where, LOCKLEN, "I %s:%d", file, line );
- pthread_spin_unlock( &initdestory );
- return pthread_spin_init( lock, shared );
+ pthread_mutex_unlock( &initdestory );
+ return pthread_mutex_init( lock, NULL );
}
-int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlock_t *lock)
+int debug_mutex_lock(const char *name, const char *file, int line, pthread_mutex_t *lock)
{
debug_lock_t *l = NULL;
- pthread_spin_lock( &initdestory );
+ pthread_mutex_lock( &initdestory );
for (int i = 0; i < MAXLOCKS; ++i) {
if ( locks[i].lock == lock ) {
l = &locks[i];
break;
}
}
- pthread_spin_unlock( &initdestory );
+ pthread_mutex_unlock( &initdestory );
if ( l == NULL ) {
logadd( LOG_ERROR, "Tried to lock uninitialized lock %p (%s) at %s:%d\n", (void*)lock, name, file, line );
debug_dump_lock_stats();
exit( 4 );
}
debug_thread_t *t = NULL;
- pthread_spin_lock( &initdestory );
+ pthread_mutex_lock( &initdestory );
for (int i = 0; i < MAXTHREADS; ++i) {
if ( threads[i].tid != 0 ) continue;
threads[i].tid = pthread_self();
@@ -103,15 +103,15 @@ int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlo
t = &threads[i];
break;
}
- pthread_spin_unlock( &initdestory );
+ pthread_mutex_unlock( &initdestory );
if ( t == NULL ) {
logadd( LOG_ERROR, "Lock sanity check: Too many waiting threads for lock %p (%s) at %s:%d\n", (void*)lock, name, file, line );
exit( 4 );
}
- const int retval = pthread_spin_lock( lock );
- pthread_spin_lock( &initdestory );
+ const int retval = pthread_mutex_lock( lock );
+ pthread_mutex_lock( &initdestory );
t->tid = 0;
- pthread_spin_unlock( &initdestory );
+ pthread_mutex_unlock( &initdestory );
if ( l->locked ) {
logadd( LOG_ERROR, "Lock sanity check: lock %p (%s) already locked at %s:%d\n", (void*)lock, name, file, line );
exit( 4 );
@@ -120,30 +120,30 @@ int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlo
timing_get( &l->locktime );
l->thread = pthread_self();
snprintf( l->where, LOCKLEN, "L %s:%d", file, line );
- pthread_spin_lock( &initdestory );
+ pthread_mutex_lock( &initdestory );
l->lockId = ++lockId;
- pthread_spin_unlock( &initdestory );
+ pthread_mutex_unlock( &initdestory );
return retval;
}
-int debug_spin_trylock(const char *name, const char *file, int line, pthread_spinlock_t *lock)
+int debug_mutex_trylock(const char *name, const char *file, int line, pthread_mutex_t *lock)
{
debug_lock_t *l = NULL;
- pthread_spin_lock( &initdestory );
+ pthread_mutex_lock( &initdestory );
for (int i = 0; i < MAXLOCKS; ++i) {
if ( locks[i].lock == lock ) {
l = &locks[i];
break;
}
}
- pthread_spin_unlock( &initdestory );
+ pthread_mutex_unlock( &initdestory );
if ( l == NULL ) {
logadd( LOG_ERROR, "Tried to lock uninitialized lock %p (%s) at %s:%d\n", (void*)lock, name, file, line );
debug_dump_lock_stats();
exit( 4 );
}
debug_thread_t *t = NULL;
- pthread_spin_lock( &initdestory );
+ pthread_mutex_lock( &initdestory );
for (int i = 0; i < MAXTHREADS; ++i) {
if ( threads[i].tid != 0 ) continue;
threads[i].tid = pthread_self();
@@ -153,15 +153,15 @@ int debug_spin_trylock(const char *name, const char *file, int line, pthread_spi
t = &threads[i];
break;
}
- pthread_spin_unlock( &initdestory );
+ pthread_mutex_unlock( &initdestory );
if ( t == NULL ) {
logadd( LOG_ERROR, "Lock sanity check: Too many waiting threads for %p (%s) at %s:%d\n", (void*)lock, name, file, line );
exit( 4 );
}
- const int retval = pthread_spin_trylock( lock );
- pthread_spin_lock( &initdestory );
+ const int retval = pthread_mutex_trylock( lock );
+ pthread_mutex_lock( &initdestory );
t->tid = 0;
- pthread_spin_unlock( &initdestory );
+ pthread_mutex_unlock( &initdestory );
if ( retval == 0 ) {
if ( l->locked ) {
logadd( LOG_ERROR, "Lock sanity check: lock %p (%s) already locked at %s:%d\n", (void*)lock, name, file, line );
@@ -171,24 +171,24 @@ int debug_spin_trylock(const char *name, const char *file, int line, pthread_spi
timing_get( &l->locktime );
l->thread = pthread_self();
snprintf( l->where, LOCKLEN, "L %s:%d", file, line );
- pthread_spin_lock( &initdestory );
+ pthread_mutex_lock( &initdestory );
l->lockId = ++lockId;
- pthread_spin_unlock( &initdestory );
+ pthread_mutex_unlock( &initdestory );
}
return retval;
}
-int debug_spin_unlock(const char *name, const char *file, int line, pthread_spinlock_t *lock)
+int debug_mutex_unlock(const char *name, const char *file, int line, pthread_mutex_t *lock)
{
debug_lock_t *l = NULL;
- pthread_spin_lock( &initdestory );
+ pthread_mutex_lock( &initdestory );
for (int i = 0; i < MAXLOCKS; ++i) {
if ( locks[i].lock == lock ) {
l = &locks[i];
break;
}
}
- pthread_spin_unlock( &initdestory );
+ pthread_mutex_unlock( &initdestory );
if ( l == NULL ) {
logadd( LOG_ERROR, "Tried to unlock uninitialized lock %p (%s) at %s:%d\n", (void*)lock, name, file, line );
exit( 4 );
@@ -200,13 +200,58 @@ int debug_spin_unlock(const char *name, const char *file, int line, pthread_spin
l->locked = 0;
l->thread = 0;
snprintf( l->where, LOCKLEN, "U %s:%d", file, line );
- int retval = pthread_spin_unlock( lock );
+ int retval = pthread_mutex_unlock( lock );
return retval;
}
-int debug_spin_destroy(const char *name, const char *file, int line, pthread_spinlock_t *lock)
+int debug_mutex_cond_wait(const char *name, const char *file, int line, pthread_cond_t *restrict cond, pthread_mutex_t *restrict lock)
{
- pthread_spin_lock( &initdestory );
+ debug_lock_t *l = NULL;
+ pthread_mutex_lock( &initdestory );
+ for (int i = 0; i < MAXLOCKS; ++i) {
+ if ( locks[i].lock == lock ) {
+ l = &locks[i];
+ break;
+ }
+ }
+ pthread_mutex_unlock( &initdestory );
+ if ( l == NULL ) {
+ logadd( LOG_ERROR, "Tried to cond_wait on uninitialized lock %p (%s) at %s:%d\n", (void*)lock, name, file, line );
+ exit( 4 );
+ }
+ if ( !l->locked ) {
+ logadd( LOG_ERROR, "Cond_wait sanity check: lock %p (%s) not locked at %s:%d\n", (void*)lock, name, file, line );
+ exit( 4 );
+ }
+ pthread_t self = pthread_self();
+ if ( l->thread != self ) {
+ logadd( LOG_ERROR, "Cond_wait called from non-owning thread for %p (%s) at %s:%d\n", (void*)lock, name, file, line );
+ exit( 4 );
+ }
+ l->locked = 0;
+ l->thread = 0;
+ snprintf( l->where, LOCKLEN, "CW %s:%d", file, line );
+ int retval = pthread_cond_wait( cond, lock );
+ if ( retval != 0 ) {
+ logadd( LOG_ERROR, "pthread_cond_wait returned %d for lock %p (%s) at %s:%d\n", retval, (void*)lock, name, file, line );
+ exit( 4 );
+ }
+ if ( l->locked != 0 || l->thread != 0 ) {
+ logadd( LOG_ERROR, "Lock is not free after returning from pthread_cond_wait for %p (%s) at %s:%d\n", (void*)lock, name, file, line );
+ exit( 4 );
+ }
+ l->locked = 1;
+ l->thread = self;
+ timing_get( &l->locktime );
+ pthread_mutex_lock( &initdestory );
+ l->lockId = ++lockId;
+ pthread_mutex_unlock( &initdestory );
+ return retval;
+}
+
+int debug_mutex_destroy(const char *name, const char *file, int line, pthread_mutex_t *lock)
+{
+ pthread_mutex_lock( &initdestory );
for (int i = 0; i < MAXLOCKS; ++i) {
if ( locks[i].lock == lock ) {
if ( locks[i].locked ) {
@@ -215,8 +260,8 @@ int debug_spin_destroy(const char *name, const char *file, int line, pthread_spi
}
locks[i].lock = NULL;
snprintf( locks[i].where, LOCKLEN, "D %s:%d", file, line );
- pthread_spin_unlock( &initdestory );
- return pthread_spin_destroy( lock );
+ pthread_mutex_unlock( &initdestory );
+ return pthread_mutex_destroy( lock );
}
}
logadd( LOG_ERROR, "Tried to destroy non-existent lock %p (%s) at %s:%d\n", (void*)lock, name, file, line );
@@ -226,7 +271,7 @@ int debug_spin_destroy(const char *name, const char *file, int line, pthread_spi
void debug_dump_lock_stats()
{
declare_now;
- pthread_spin_lock( &initdestory );
+ pthread_mutex_lock( &initdestory );
printf( "\n **** LOCKS ****\n\n" );
for (int i = 0; i < MAXLOCKS; ++i) {
if ( locks[i].lock == NULL ) continue;
@@ -252,7 +297,7 @@ void debug_dump_lock_stats()
"* Where: %s\n"
"* How long: %d secs\n", (int)threads[i].tid, threads[i].name, threads[i].where, (int)timing_diff( &threads[i].time, &now ) );
}
- pthread_spin_unlock( &initdestory );
+ pthread_mutex_unlock( &initdestory );
}
static void *debug_thread_watchdog(void *something UNUSED)
@@ -261,18 +306,18 @@ static void *debug_thread_watchdog(void *something UNUSED)
while ( !_shutdown ) {
if ( init_done ) {
declare_now;
- pthread_spin_lock( &initdestory );
+ pthread_mutex_lock( &initdestory );
for (int i = 0; i < MAXTHREADS; ++i) {
if ( threads[i].tid == 0 ) continue;
const uint32_t diff = timing_diff( &threads[i].time, &now );
if ( diff > 6 && diff < 100000 ) {
printf( "\n\n +++++++++ DEADLOCK ++++++++++++\n\n" );
- pthread_spin_unlock( &initdestory );
+ pthread_mutex_unlock( &initdestory );
debug_dump_lock_stats();
exit( 99 );
}
}
- pthread_spin_unlock( &initdestory );
+ pthread_mutex_unlock( &initdestory );
}
if ( watchdogSignal == NULL || signal_wait( watchdogSignal, 5000 ) == SIGNAL_ERROR ) sleep( 5 );
}
@@ -297,9 +342,9 @@ void debug_locks_stop_watchdog()
#ifdef _DEBUG
_shutdown = true;
printf( "Killing debug watchdog...\n" );
- pthread_spin_lock( &initdestory );
+ pthread_mutex_lock( &initdestory );
signal_call( watchdogSignal );
- pthread_spin_unlock( &initdestory );
+ pthread_mutex_unlock( &initdestory );
thread_join( watchdog, NULL );
signal_close( watchdogSignal );
#endif
diff --git a/src/server/locks.h b/src/server/locks.h
index 16b59a7..859697c 100644
--- a/src/server/locks.h
+++ b/src/server/locks.h
@@ -8,28 +8,31 @@
#ifdef _DEBUG
-#define spin_init( lock, type ) debug_spin_init( #lock, __FILE__, __LINE__, lock, type)
-#define spin_lock( lock ) debug_spin_lock( #lock, __FILE__, __LINE__, lock)
-#define spin_trylock( lock ) debug_spin_trylock( #lock, __FILE__, __LINE__, lock)
-#define spin_unlock( lock ) debug_spin_unlock( #lock, __FILE__, __LINE__, lock)
-#define spin_destroy( lock ) debug_spin_destroy( #lock, __FILE__, __LINE__, lock)
-
-int debug_spin_init(const char *name, const char *file, int line, pthread_spinlock_t *lock, int shared);
-int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlock_t *lock);
-int debug_spin_trylock(const char *name, const char *file, int line, pthread_spinlock_t *lock);
-int debug_spin_unlock(const char *name, const char *file, int line, pthread_spinlock_t *lock);
-int debug_spin_destroy(const char *name, const char *file, int line, pthread_spinlock_t *lock);
+#define mutex_init( lock ) debug_mutex_init( #lock, __FILE__, __LINE__, lock)
+#define mutex_lock( lock ) debug_mutex_lock( #lock, __FILE__, __LINE__, lock)
+#define mutex_trylock( lock ) debug_mutex_trylock( #lock, __FILE__, __LINE__, lock)
+#define mutex_unlock( lock ) debug_mutex_unlock( #lock, __FILE__, __LINE__, lock)
+#define mutex_cond_wait( cond, lock ) debug_mutex_cond_wait( #lock, __FILE__, __LINE__, cond, lock)
+#define mutex_destroy( lock ) debug_mutex_destroy( #lock, __FILE__, __LINE__, lock)
+
+int debug_mutex_init(const char *name, const char *file, int line, pthread_mutex_t *lock);
+int debug_mutex_lock(const char *name, const char *file, int line, pthread_mutex_t *lock);
+int debug_mutex_trylock(const char *name, const char *file, int line, pthread_mutex_t *lock);
+int debug_mutex_unlock(const char *name, const char *file, int line, pthread_mutex_t *lock);
+int debug_mutex_cond_wait(const char *name, const char *file, int line, pthread_cond_t *restrict cond, pthread_mutex_t *restrict lock);
+int debug_mutex_destroy(const char *name, const char *file, int line, pthread_mutex_t *lock);
void debug_dump_lock_stats();
#else
-#define spin_init( lock, type ) pthread_spin_init(lock, type)
-#define spin_lock( lock ) pthread_spin_lock(lock)
-#define spin_trylock( lock ) pthread_spin_trylock(lock)
-#define spin_unlock( lock ) pthread_spin_unlock(lock)
-#define spin_destroy( lock ) pthread_spin_destroy(lock)
+#define mutex_init( lock ) pthread_mutex_init(lock, NULL)
+#define mutex_lock( lock ) pthread_mutex_lock(lock)
+#define mutex_trylock( lock ) pthread_mutex_trylock(lock)
+#define mutex_unlock( lock ) pthread_mutex_unlock(lock)
+#define mutex_cond_wait( lock ) pthread_cond_wait(cond, lock)
+#define mutex_destroy( lock ) pthread_mutex_destroy(lock)
#endif
diff --git a/src/server/net.c b/src/server/net.c
index 00e88e0..9abe221 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -46,7 +46,7 @@
static dnbd3_client_t *_clients[SERVER_MAX_CLIENTS];
static int _num_clients = 0;
-static pthread_spinlock_t _clients_lock;
+static pthread_mutex_t _clients_lock;
static char nullbytes[500];
@@ -145,7 +145,7 @@ static inline bool sendPadding( const int fd, uint32_t bytes )
void net_init()
{
- spin_init( &_clients_lock, PTHREAD_PROCESS_PRIVATE );
+ mutex_init( &_clients_lock );
}
void* net_handleNewConnection(void *clientPtr)
@@ -186,13 +186,13 @@ void* net_handleNewConnection(void *clientPtr)
}
} while (0);
// Fully init client struct
- spin_init( &client->lock, PTHREAD_PROCESS_PRIVATE );
- pthread_mutex_init( &client->sendMutex, NULL );
+ mutex_init( &client->lock );
+ mutex_init( &client->sendMutex );
- spin_lock( &client->lock );
+ mutex_lock( &client->lock );
host_to_string( &client->host, client->hostName, HOSTNAMELEN );
client->hostName[HOSTNAMELEN-1] = '\0';
- spin_unlock( &client->lock );
+ mutex_unlock( &client->lock );
client->bytesSent = 0;
if ( !addToList( client ) ) {
@@ -255,9 +255,9 @@ void* net_handleNewConnection(void *clientPtr)
// No BGR mismatch, but don't lookup if image is unknown locally
image = image_get( image_name, rid, true );
}
- spin_lock( &client->lock );
+ mutex_lock( &client->lock );
client->image = image;
- spin_unlock( &client->lock );
+ mutex_unlock( &client->lock );
if ( image == NULL ) {
//logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
} else if ( !image->working ) {
@@ -268,24 +268,24 @@ void* net_handleNewConnection(void *clientPtr)
// Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable
bOk = true;
if ( image->cache_map != NULL ) {
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
if ( image->uplink == NULL || image->uplink->cacheFd == -1 || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
bOk = ( rand() % 4 ) == 1;
}
penalty = bOk && image->uplink != NULL && image->uplink->cacheFd == -1;
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
if ( penalty ) { // Wait 100ms if local caching is not working so this
usleep( 100000 ); // server gets a penalty and is less likely to be selected
}
}
if ( bOk ) {
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
image_file = image->readFd;
if ( !client->isServer ) {
// Only update immediately if this is a client. Servers are handled on disconnect.
timing_get( &image->atime );
}
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
serializer_reset_write( &payload );
serializer_put_uint16( &payload, client_version < 3 ? client_version : PROTOCOL_VERSION ); // XXX: Since messed up fuse client was messed up before :(
serializer_put_string( &payload, image->name );
@@ -337,7 +337,7 @@ void* net_handleNewConnection(void *clientPtr)
start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
bool isCached = true;
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
// Check again as we only aquired the lock just now
if ( image->cache_map != NULL ) {
const uint64_t firstByteInMap = start >> 15;
@@ -382,7 +382,7 @@ void* net_handleNewConnection(void *clientPtr)
}
}
}
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
if ( !isCached ) {
if ( !uplink_request( client, request.handle, offset, request.size, request.hops ) ) {
logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy, disabling image %s:%d",
@@ -400,10 +400,10 @@ void* net_handleNewConnection(void *clientPtr)
fixup_reply( reply );
const bool lock = image->uplink != NULL;
- if ( lock ) pthread_mutex_lock( &client->sendMutex );
+ if ( lock ) mutex_lock( &client->sendMutex );
// Send reply header
if ( send( client->sock, &reply, sizeof(dnbd3_reply_t), (request.size == 0 ? 0 : MSG_MORE) ) != sizeof(dnbd3_reply_t) ) {
- if ( lock ) pthread_mutex_unlock( &client->sendMutex );
+ if ( lock ) mutex_unlock( &client->sendMutex );
logadd( LOG_DEBUG1, "Sending CMD_GET_BLOCK reply header to %s failed", client->hostName );
goto exit_client_cleanup;
}
@@ -450,7 +450,7 @@ void* net_handleNewConnection(void *clientPtr)
sent = -1;
}
#endif
- if ( lock ) pthread_mutex_unlock( &client->sendMutex );
+ if ( lock ) mutex_unlock( &client->sendMutex );
if ( sent == -1 ) {
if ( err != EPIPE && err != ECONNRESET && err != ESHUTDOWN
&& err != EAGAIN && err != EWOULDBLOCK ) {
@@ -468,12 +468,12 @@ void* net_handleNewConnection(void *clientPtr)
}
if ( request.size > (uint32_t)realBytes ) {
if ( !sendPadding( client->sock, request.size - (uint32_t)realBytes ) ) {
- if ( lock ) pthread_mutex_unlock( &client->sendMutex );
+ if ( lock ) mutex_unlock( &client->sendMutex );
goto exit_client_cleanup;
}
}
}
- if ( lock ) pthread_mutex_unlock( &client->sendMutex );
+ if ( lock ) mutex_unlock( &client->sendMutex );
// Global per-client counter
client->bytesSent += request.size; // Increase counter for statistics.
break;
@@ -483,18 +483,18 @@ void* net_handleNewConnection(void *clientPtr)
num = altservers_getListForClient( &client->host, server_list, NUMBER_SERVERS );
reply.cmd = CMD_GET_SERVERS;
reply.size = (uint32_t)( num * sizeof(dnbd3_server_entry_t) );
- pthread_mutex_lock( &client->sendMutex );
+ mutex_lock( &client->sendMutex );
send_reply( client->sock, &reply, server_list );
- pthread_mutex_unlock( &client->sendMutex );
+ mutex_unlock( &client->sendMutex );
goto set_name;
break;
case CMD_KEEPALIVE:
reply.cmd = CMD_KEEPALIVE;
reply.size = 0;
- pthread_mutex_lock( &client->sendMutex );
+ mutex_lock( &client->sendMutex );
send_reply( client->sock, &reply, NULL );
- pthread_mutex_unlock( &client->sendMutex );
+ mutex_unlock( &client->sendMutex );
set_name: ;
if ( !hasName ) {
hasName = true;
@@ -508,7 +508,7 @@ set_name: ;
case CMD_GET_CRC32:
reply.cmd = CMD_GET_CRC32;
- pthread_mutex_lock( &client->sendMutex );
+ mutex_lock( &client->sendMutex );
if ( image->crc32 == NULL ) {
reply.size = 0;
send_reply( client->sock, &reply, NULL );
@@ -518,7 +518,7 @@ set_name: ;
send( client->sock, &image->masterCrc32, sizeof(uint32_t), MSG_MORE );
send( client->sock, image->crc32, size - sizeof(uint32_t), 0 );
}
- pthread_mutex_unlock( &client->sendMutex );
+ mutex_unlock( &client->sendMutex );
break;
default:
@@ -534,11 +534,11 @@ exit_client_cleanup: ;
totalBytesSent += client->bytesSent;
// Access time, but only if client didn't just probe
if ( image != NULL ) {
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
if ( client->bytesSent > DNBD3_BLOCK_SIZE * 10 ) {
timing_get( &image->atime );
}
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
}
freeClientStruct( client ); // This will also call image_release on client->image
return NULL ;
@@ -560,30 +560,30 @@ struct json_t* net_getListAsJson()
char host[HOSTNAMELEN];
host[HOSTNAMELEN-1] = '\0';
- spin_lock( &_clients_lock );
+ mutex_lock( &_clients_lock );
for ( int i = 0; i < _num_clients; ++i ) {
dnbd3_client_t * const client = _clients[i];
if ( client == NULL || client->image == NULL )
continue;
- spin_lock( &client->lock );
+ mutex_lock( &client->lock );
// Unlock so we give other threads a chance to access the client list.
// We might not get an atomic snapshot of the currently connected clients,
// but that doesn't really make a difference anyways.
- spin_unlock( &_clients_lock );
+ mutex_unlock( &_clients_lock );
strncpy( host, client->hostName, HOSTNAMELEN - 1 );
imgId = client->image->id;
isServer = (int)client->isServer;
bytesSent = client->bytesSent;
- spin_unlock( &client->lock );
+ mutex_unlock( &client->lock );
clientStats = json_pack( "{sssisisI}",
"address", host,
"imageId", imgId,
"isServer", isServer,
"bytesSent", (json_int_t)bytesSent );
json_array_append_new( jsonClients, clientStats );
- spin_lock( &_clients_lock );
+ mutex_lock( &_clients_lock );
}
- spin_unlock( &_clients_lock );
+ mutex_unlock( &_clients_lock );
return jsonClients;
}
@@ -597,7 +597,7 @@ void net_getStats(int *clientCount, int *serverCount, uint64_t *bytesSent)
int cc = 0, sc = 0;
uint64_t bs = 0;
- spin_lock( &_clients_lock );
+ mutex_lock( &_clients_lock );
for ( int i = 0; i < _num_clients; ++i ) {
const dnbd3_client_t * const client = _clients[i];
if ( client == NULL || client->image == NULL )
@@ -609,7 +609,7 @@ void net_getStats(int *clientCount, int *serverCount, uint64_t *bytesSent)
}
bs += client->bytesSent;
}
- spin_unlock( &_clients_lock );
+ mutex_unlock( &_clients_lock );
if ( clientCount != NULL ) {
*clientCount = cc;
}
@@ -624,15 +624,15 @@ void net_getStats(int *clientCount, int *serverCount, uint64_t *bytesSent)
void net_disconnectAll()
{
int i;
- spin_lock( &_clients_lock );
+ mutex_lock( &_clients_lock );
for (i = 0; i < _num_clients; ++i) {
if ( _clients[i] == NULL ) continue;
dnbd3_client_t * const client = _clients[i];
- spin_lock( &client->lock );
+ mutex_lock( &client->lock );
if ( client->sock >= 0 ) shutdown( client->sock, SHUT_RDWR );
- spin_unlock( &client->lock );
+ mutex_unlock( &client->lock );
}
- spin_unlock( &_clients_lock );
+ mutex_unlock( &_clients_lock );
}
void net_waitForAllDisconnected()
@@ -640,12 +640,12 @@ void net_waitForAllDisconnected()
int retries = 10, count, i;
do {
count = 0;
- spin_lock( &_clients_lock );
+ mutex_lock( &_clients_lock );
for (i = 0; i < _num_clients; ++i) {
if ( _clients[i] == NULL ) continue;
count++;
}
- spin_unlock( &_clients_lock );
+ mutex_unlock( &_clients_lock );
if ( count != 0 ) {
logadd( LOG_INFO, "%d clients still active...\n", count );
sleep( 1 );
@@ -667,14 +667,14 @@ void net_waitForAllDisconnected()
static void removeFromList(dnbd3_client_t *client)
{
int i;
- spin_lock( &_clients_lock );
+ mutex_lock( &_clients_lock );
for ( i = _num_clients - 1; i >= 0; --i ) {
if ( _clients[i] == client ) {
_clients[i] = NULL;
}
if ( _clients[i] == NULL && i + 1 == _num_clients ) --_num_clients;
}
- spin_unlock( &_clients_lock );
+ mutex_unlock( &_clients_lock );
}
/**
@@ -685,20 +685,20 @@ static void removeFromList(dnbd3_client_t *client)
*/
static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client)
{
- spin_lock( &client->lock );
- pthread_mutex_lock( &client->sendMutex );
+ mutex_lock( &client->lock );
+ mutex_lock( &client->sendMutex );
if ( client->sock != -1 ) close( client->sock );
client->sock = -1;
- pthread_mutex_unlock( &client->sendMutex );
+ mutex_unlock( &client->sendMutex );
if ( client->image != NULL ) {
- spin_lock( &client->image->lock );
+ mutex_lock( &client->image->lock );
if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client );
- spin_unlock( &client->image->lock );
+ mutex_unlock( &client->image->lock );
client->image = image_release( client->image );
}
- spin_unlock( &client->lock );
- spin_destroy( &client->lock );
- pthread_mutex_destroy( &client->sendMutex );
+ mutex_unlock( &client->lock );
+ mutex_destroy( &client->lock );
+ mutex_destroy( &client->sendMutex );
free( client );
return NULL ;
}
@@ -712,20 +712,20 @@ static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client)
static bool addToList(dnbd3_client_t *client)
{
int i;
- spin_lock( &_clients_lock );
+ mutex_lock( &_clients_lock );
for (i = 0; i < _num_clients; ++i) {
if ( _clients[i] != NULL ) continue;
_clients[i] = client;
- spin_unlock( &_clients_lock );
+ mutex_unlock( &_clients_lock );
return true;
}
if ( _num_clients >= _maxClients ) {
- spin_unlock( &_clients_lock );
+ mutex_unlock( &_clients_lock );
logadd( LOG_ERROR, "Maximum number of clients reached!" );
return false;
}
_clients[_num_clients++] = client;
- spin_unlock( &_clients_lock );
+ mutex_unlock( &_clients_lock );
return true;
}
diff --git a/src/server/rpc.c b/src/server/rpc.c
index 1ea09cb..5dbcafe 100644
--- a/src/server/rpc.c
+++ b/src/server/rpc.c
@@ -72,10 +72,10 @@ static inline bool iequals(struct string *cmpMixed, struct string *cmpLower)
static int aclCount = 0;
static dnbd3_access_rule_t aclRules[MAX_ACLS];
static json_int_t randomRunId;
-static pthread_spinlock_t aclLock;
+static pthread_mutex_t aclLock;
#define MAX_CLIENTS 50
#define CUTOFF_START 40
-static pthread_spinlock_t statusLock;
+static pthread_mutex_t statusLock;
static struct {
int count;
bool overloaded;
@@ -91,8 +91,8 @@ static void loadAcl();
void rpc_init()
{
- spin_init( &aclLock, PTHREAD_PROCESS_PRIVATE );
- spin_init( &statusLock, PTHREAD_PROCESS_PRIVATE );
+ mutex_init( &aclLock );
+ mutex_init( &statusLock );
randomRunId = (((json_int_t)getpid()) << 16) | (json_int_t)time(NULL);
// </guard>
if ( sizeof(randomRunId) > 4 ) {
@@ -123,10 +123,10 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int
return;
}
do {
- spin_lock( &statusLock );
+ mutex_lock( &statusLock );
const int curCount = ++status.count;
UPDATE_LOADSTATE( curCount );
- spin_unlock( &statusLock );
+ mutex_unlock( &statusLock );
if ( curCount > MAX_CLIENTS ) {
sendReply( sock, "503 Service Temporarily Unavailable", "text/plain", "Too many HTTP clients", -1, HTTP_CLOSE );
goto func_return;
@@ -198,9 +198,9 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int
if ( minorVersion == 0 || hasHeaderValue( headers, numHeaders, &STR_CONNECTION, &STR_CLOSE ) ) {
keepAlive = HTTP_CLOSE;
} else { // And if there aren't too many active HTTP sessions
- spin_lock( &statusLock );
+ mutex_lock( &statusLock );
if ( status.overloaded ) keepAlive = HTTP_CLOSE;
- spin_unlock( &statusLock );
+ mutex_unlock( &statusLock );
}
}
if ( method.s != NULL && path.s != NULL ) {
@@ -234,10 +234,10 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int
} while (true);
func_return:;
do {
- spin_lock( &statusLock );
+ mutex_lock( &statusLock );
const int curCount = --status.count;
UPDATE_LOADSTATE( curCount );
- spin_unlock( &statusLock );
+ mutex_unlock( &statusLock );
} while (0);
}
@@ -422,7 +422,7 @@ static int getacl(dnbd3_host_t *host)
static void addacl(int argc, char **argv, void *data UNUSED)
{
if ( argv[0][0] == '#' ) return;
- spin_lock( &aclLock );
+ mutex_lock( &aclLock );
if ( aclCount >= MAX_ACLS ) {
logadd( LOG_WARNING, "Too many ACL rules, ignoring %s", argv[0] );
goto unlock_end;
@@ -478,7 +478,7 @@ static void addacl(int argc, char **argv, void *data UNUSED)
// in .bitMask, and compate it, otherwise, a simple memcmp will do.
aclCount++;
unlock_end:;
- spin_unlock( &aclLock );
+ mutex_unlock( &aclLock );
}
static void loadAcl()
@@ -486,18 +486,18 @@ static void loadAcl()
static bool inProgress = false;
char *fn;
if ( asprintf( &fn, "%s/%s", _configDir, "rpc.acl" ) == -1 ) return;
- spin_lock( &aclLock );
+ mutex_lock( &aclLock );
if ( inProgress ) {
- spin_unlock( &aclLock );
+ mutex_unlock( &aclLock );
return;
}
aclCount = 0;
inProgress = true;
- spin_unlock( &aclLock );
+ mutex_unlock( &aclLock );
file_loadLineBased( fn, 1, 20, &addacl, NULL );
- spin_lock( &aclLock );
+ mutex_lock( &aclLock );
inProgress = false;
- spin_unlock( &aclLock );
+ mutex_unlock( &aclLock );
free( fn );
logadd( LOG_INFO, "%d HTTPRPC ACL rules loaded", (int)aclCount );
}
diff --git a/src/server/threadpool.c b/src/server/threadpool.c
index b55fe19..dac0980 100644
--- a/src/server/threadpool.c
+++ b/src/server/threadpool.c
@@ -17,12 +17,12 @@ static pthread_attr_t threadAttrs;
static int maxIdleThreads = -1;
static entry_t *pool = NULL;
-static pthread_spinlock_t poolLock;
+static pthread_mutex_t poolLock;
bool threadpool_init(int maxIdle)
{
if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false;
- spin_init( &poolLock, PTHREAD_PROCESS_PRIVATE );
+ mutex_init( &poolLock );
maxIdleThreads = maxIdle;
pthread_attr_init( &threadAttrs );
pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED );
@@ -33,7 +33,7 @@ void threadpool_close()
{
_shutdown = true;
if ( maxIdleThreads < 0 ) return;
- spin_lock( &poolLock );
+ mutex_lock( &poolLock );
maxIdleThreads = -1;
entry_t *ptr = pool;
while ( ptr != NULL ) {
@@ -41,16 +41,16 @@ void threadpool_close()
ptr = ptr->next;
signal_call( current->signal );
}
- spin_unlock( &poolLock );
- spin_destroy( &poolLock );
+ mutex_unlock( &poolLock );
+ mutex_destroy( &poolLock );
}
bool threadpool_run(void *(*startRoutine)(void *), void *arg)
{
- spin_lock( &poolLock );
+ mutex_lock( &poolLock );
entry_t *entry = pool;
if ( entry != NULL ) pool = entry->next;
- spin_unlock( &poolLock );
+ mutex_unlock( &poolLock );
if ( entry == NULL ) {
entry = (entry_t*)malloc( sizeof(entry_t) );
if ( entry == NULL ) {
@@ -101,19 +101,19 @@ static void *threadpool_worker(void *entryPtr)
if ( _shutdown ) break;
// Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise
int threadCount = 0;
- spin_lock( &poolLock );
+ mutex_lock( &poolLock );
entry_t *ptr = pool;
while ( ptr != NULL ) {
threadCount++;
ptr = ptr->next;
}
if ( threadCount >= maxIdleThreads ) {
- spin_unlock( &poolLock );
+ mutex_unlock( &poolLock );
break;
}
entry->next = pool;
pool = entry;
- spin_unlock( &poolLock );
+ mutex_unlock( &poolLock );
setThreadName( "[pool]" );
} else {
logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret );
diff --git a/src/server/uplink.c b/src/server/uplink.c
index ccbf209..682b986 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -56,9 +56,9 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
if ( !_isProxy || _shutdown ) return false;
dnbd3_connection_t *link = NULL;
assert( image != NULL );
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
if ( image->uplink != NULL && !image->uplink->shutdown ) {
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
if ( sock >= 0 ) close( sock );
return true; // There's already an uplink, so should we consider this success or failure?
}
@@ -67,20 +67,20 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
goto failure;
}
link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) );
- spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE );
- spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE );
- pthread_mutex_init( &link->sendMutex, NULL );
+ mutex_init( &link->queueLock );
+ mutex_init( &link->rttLock );
+ mutex_init( &link->sendMutex );
link->image = image;
link->bytesReceived = 0;
link->idleTime = 0;
link->queueLen = 0;
- pthread_mutex_lock( &link->sendMutex );
+ mutex_lock( &link->sendMutex );
link->fd = -1;
- pthread_mutex_unlock( &link->sendMutex );
+ mutex_unlock( &link->sendMutex );
link->cacheFd = -1;
link->signal = NULL;
link->replicationHandle = REP_NONE;
- spin_lock( &link->rttLock );
+ mutex_lock( &link->rttLock );
link->cycleDetected = false;
if ( sock >= 0 ) {
link->betterFd = sock;
@@ -91,21 +91,21 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
link->betterFd = -1;
link->rttTestResult = RTT_IDLE;
}
- spin_unlock( &link->rttLock );
+ mutex_unlock( &link->rttLock );
link->recvBufferLen = 0;
link->shutdown = false;
if ( 0 != thread_create( &(link->thread), NULL, &uplink_mainloop, (void *)link ) ) {
logadd( LOG_ERROR, "Could not start thread for new uplink." );
goto failure;
}
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
return true;
failure: ;
if ( link != NULL ) {
free( link );
link = image->uplink = NULL;
}
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
return false;
}
@@ -119,28 +119,28 @@ void uplink_shutdown(dnbd3_image_t *image)
bool join = false;
pthread_t thread;
assert( image != NULL );
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
if ( image->uplink == NULL ) {
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
return;
}
dnbd3_connection_t * const uplink = image->uplink;
- spin_lock( &uplink->queueLock );
+ mutex_lock( &uplink->queueLock );
if ( !uplink->shutdown ) {
uplink->shutdown = true;
signal_call( uplink->signal );
thread = uplink->thread;
join = true;
}
- spin_unlock( &uplink->queueLock );
+ mutex_unlock( &uplink->queueLock );
bool wait = image->uplink != NULL;
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
if ( join ) thread_join( thread, NULL );
while ( wait ) {
usleep( 5000 );
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
wait = image->uplink != NULL && image->uplink->shutdown;
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
}
}
@@ -150,7 +150,7 @@ void uplink_shutdown(dnbd3_image_t *image)
*/
void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client)
{
- spin_lock( &uplink->queueLock );
+ mutex_lock( &uplink->queueLock );
for (int i = uplink->queueLen - 1; i >= 0; --i) {
if ( uplink->queue[i].client == client ) {
uplink->queue[i].client = NULL;
@@ -158,7 +158,7 @@ void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client)
}
if ( uplink->queue[i].client == NULL && uplink->queueLen == i + 1 ) uplink->queueLen--;
}
- spin_unlock( &uplink->queueLock );
+ mutex_unlock( &uplink->queueLock );
}
/**
@@ -172,26 +172,26 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length );
return false;
}
- spin_lock( &client->image->lock );
+ mutex_lock( &client->image->lock );
if ( client->image->uplink == NULL ) {
- spin_unlock( &client->image->lock );
+ mutex_unlock( &client->image->lock );
logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
return false;
}
dnbd3_connection_t * const uplink = client->image->uplink;
if ( uplink->shutdown ) {
- spin_unlock( &client->image->lock );
+ mutex_unlock( &client->image->lock );
logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" );
return false;
}
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
// This might be a false positive if there are multiple instances running on the same host (IP)
if ( hops != 0 && isSameAddress( &uplink->currentServer, &client->host ) ) {
- spin_unlock( &client->image->lock );
+ mutex_unlock( &client->image->lock );
logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
- spin_lock( &uplink->rttLock );
+ mutex_lock( &uplink->rttLock );
uplink->cycleDetected = true;
- spin_unlock( &uplink->rttLock );
+ mutex_unlock( &uplink->rttLock );
signal_call( uplink->signal );
return false;
}
@@ -203,8 +203,8 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
bool requestLoop = false;
const uint64_t end = start + length;
- spin_lock( &uplink->queueLock );
- spin_unlock( &client->image->lock );
+ mutex_lock( &uplink->queueLock );
+ mutex_unlock( &client->image->lock );
for (i = 0; i < uplink->queueLen; ++i) {
if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) {
freeSlot = i;
@@ -224,17 +224,17 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
}
}
if ( requestLoop ) {
- spin_unlock( &uplink->queueLock );
+ mutex_unlock( &uplink->queueLock );
logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops );
- spin_lock( &uplink->rttLock );
+ mutex_lock( &uplink->rttLock );
uplink->cycleDetected = true;
- spin_unlock( &uplink->rttLock );
+ mutex_unlock( &uplink->rttLock );
signal_call( uplink->signal );
return false;
}
if ( freeSlot == -1 ) {
if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) {
- spin_unlock( &uplink->queueLock );
+ mutex_unlock( &uplink->queueLock );
logadd( LOG_WARNING, "Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." );
return false;
}
@@ -268,35 +268,35 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
timing_get( &uplink->queue[freeSlot].entered );
//logadd( LOG_DEBUG2 %p] Inserting request at slot %d, was %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 "\n", (void*)uplink, freeSlot, old, uplink->queue[freeSlot].status, uplink->queue[freeSlot, ".handle, start, end );
#endif
- spin_unlock( &uplink->queueLock );
+ mutex_unlock( &uplink->queueLock );
if ( foundExisting != -1 )
return true; // Attached to pending request, do nothing
// See if we can fire away the request
- if ( pthread_mutex_trylock( &uplink->sendMutex ) != 0 ) {
+ if ( mutex_trylock( &uplink->sendMutex ) != 0 ) {
logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" );
} else {
if ( uplink->fd == -1 ) {
- pthread_mutex_unlock( &uplink->sendMutex );
+ mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
} else {
const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
if ( hops < 200 ) ++hops;
const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) );
- pthread_mutex_unlock( &uplink->sendMutex );
+ mutex_unlock( &uplink->sendMutex );
if ( !ret ) {
logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" );
} else {
- spin_lock( &uplink->queueLock );
+ mutex_lock( &uplink->queueLock );
if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client && uplink->queue[freeSlot].status == ULR_NEW ) {
uplink->queue[freeSlot].status = ULR_PENDING;
logadd( LOG_DEBUG2, "Succesful direct uplink request" );
} else {
logadd( LOG_DEBUG2, "Weird queue update fail for direct uplink request" );
}
- spin_unlock( &uplink->queueLock );
+ mutex_unlock( &uplink->queueLock );
return true;
}
// Fall through to waking up sender thread
@@ -351,9 +351,9 @@ static void* uplink_mainloop(void *data)
events[EV_SOCKET].fd = -1;
while ( !_shutdown && !link->shutdown ) {
// poll()
- spin_lock( &link->rttLock );
+ mutex_lock( &link->rttLock );
waitTime = link->rttTestResult == RTT_DOCHANGE ? 0 : -1;
- spin_unlock( &link->rttLock );
+ mutex_unlock( &link->rttLock );
if ( waitTime == 0 ) {
// Nothing
} else if ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) {
@@ -374,22 +374,22 @@ static void* uplink_mainloop(void *data)
continue;
}
// Check if server switch is in order
- spin_lock( &link->rttLock );
+ mutex_lock( &link->rttLock );
if ( link->rttTestResult != RTT_DOCHANGE ) {
- spin_unlock( &link->rttLock );
+ mutex_unlock( &link->rttLock );
} else {
link->rttTestResult = RTT_IDLE;
// The rttTest worker thread has finished our request.
// And says it's better to switch to another server
const int fd = link->fd;
- pthread_mutex_lock( &link->sendMutex );
+ mutex_lock( &link->sendMutex );
link->fd = link->betterFd;
- pthread_mutex_unlock( &link->sendMutex );
+ mutex_unlock( &link->sendMutex );
link->betterFd = -1;
link->currentServer = link->betterServer;
link->version = link->betterVersion;
link->cycleDetected = false;
- spin_unlock( &link->rttLock );
+ mutex_unlock( &link->rttLock );
discoverFailCount = 0;
if ( fd != -1 ) close( fd );
link->replicationHandle = REP_NONE;
@@ -463,10 +463,10 @@ static void* uplink_mainloop(void *data)
}
// Don't keep link established if we're idle for too much
if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) {
- pthread_mutex_lock( &link->sendMutex );
+ mutex_lock( &link->sendMutex );
close( link->fd );
link->fd = events[EV_SOCKET].fd = -1;
- pthread_mutex_unlock( &link->sendMutex );
+ mutex_unlock( &link->sendMutex );
link->cycleDetected = false;
if ( link->recvBufferLen != 0 ) {
link->recvBufferLen = 0;
@@ -478,9 +478,9 @@ static void* uplink_mainloop(void *data)
}
}
// See if we should trigger an RTT measurement
- spin_lock( &link->rttLock );
+ mutex_lock( &link->rttLock );
const int rttTestResult = link->rttTestResult;
- spin_unlock( &link->rttLock );
+ mutex_unlock( &link->rttLock );
if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) {
if ( timing_reached( &nextAltCheck, &now ) || ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) || link->cycleDetected ) {
// It seems it's time for a check
@@ -500,9 +500,9 @@ static void* uplink_mainloop(void *data)
timing_set( &nextAltCheck, &now, altCheckInterval );
}
} else if ( rttTestResult == RTT_NOT_REACHABLE ) {
- spin_lock( &link->rttLock );
+ mutex_lock( &link->rttLock );
link->rttTestResult = RTT_IDLE;
- spin_unlock( &link->rttLock );
+ mutex_unlock( &link->rttLock );
discoverFailCount++;
timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) );
}
@@ -511,7 +511,7 @@ static void* uplink_mainloop(void *data)
bool resend = false;
ticks deadline;
timing_set( &deadline, &now, -10 );
- spin_lock( &link->queueLock );
+ mutex_lock( &link->queueLock );
for (i = 0; i < link->queueLen; ++i) {
if ( link->queue[i].status != ULR_FREE && timing_reached( &link->queue[i].entered, &deadline ) ) {
snprintf( buffer, sizeof(buffer), "[DEBUG %p] Starving request slot %d detected:\n"
@@ -522,12 +522,12 @@ static void* uplink_mainloop(void *data)
link->queue[i].status = ULR_NEW;
resend = true;
#endif
- spin_unlock( &link->queueLock );
+ mutex_unlock( &link->queueLock );
logadd( LOG_WARNING, "%s", buffer );
- spin_lock( &link->queueLock );
+ mutex_lock( &link->queueLock );
}
}
- spin_unlock( &link->queueLock );
+ mutex_unlock( &link->queueLock );
if ( resend )
uplink_sendRequests( link, true );
}
@@ -536,16 +536,16 @@ static void* uplink_mainloop(void *data)
cleanup: ;
altservers_removeUplink( link );
uplink_saveCacheMap( link );
- spin_lock( &link->image->lock );
+ mutex_lock( &link->image->lock );
if ( link->image->uplink == link ) {
link->image->uplink = NULL;
}
- spin_lock( &link->queueLock );
+ mutex_lock( &link->queueLock );
const int fd = link->fd;
const dnbd3_signal_t* signal = link->signal;
- pthread_mutex_lock( &link->sendMutex );
+ mutex_lock( &link->sendMutex );
link->fd = -1;
- pthread_mutex_unlock( &link->sendMutex );
+ mutex_unlock( &link->sendMutex );
link->signal = NULL;
if ( !link->shutdown ) {
link->shutdown = true;
@@ -554,8 +554,8 @@ static void* uplink_mainloop(void *data)
// Do not access link->image after unlocking, since we set
// image->uplink to NULL. Acquire with image_lock first,
// like done below when checking whether to re-init uplink
- spin_unlock( &link->image->lock );
- spin_unlock( &link->queueLock );
+ mutex_unlock( &link->image->lock );
+ mutex_unlock( &link->queueLock );
if ( fd != -1 ) close( fd );
if ( signal != NULL ) signal_close( signal );
// Wait for the RTT check to finish/fail if it's in progress
@@ -564,9 +564,9 @@ static void* uplink_mainloop(void *data)
if ( link->betterFd != -1 ) {
close( link->betterFd );
}
- spin_destroy( &link->queueLock );
- spin_destroy( &link->rttLock );
- pthread_mutex_destroy( &link->sendMutex );
+ mutex_destroy( &link->queueLock );
+ mutex_destroy( &link->rttLock );
+ mutex_destroy( &link->sendMutex );
free( link->recvBuffer );
link->recvBuffer = NULL;
if ( link->cacheFd != -1 ) {
@@ -588,7 +588,7 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly)
{
// Scan for new requests
int j;
- spin_lock( &link->queueLock );
+ mutex_lock( &link->queueLock );
for (j = 0; j < link->queueLen; ++j) {
if ( link->queue[j].status != ULR_NEW && (newOnly || link->queue[j].status != ULR_PENDING) ) continue;
link->queue[j].status = ULR_PENDING;
@@ -599,11 +599,11 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly)
logadd( LOG_DEBUG2, "[%p] Sending slot %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 " (%" PRIu64 "-%" PRIu64 ")",
(void*)link, j, link->queue[j].status, link->queue[j].handle, link->queue[j].from, link->queue[j].to, reqStart, reqStart+reqSize );
*/
- spin_unlock( &link->queueLock );
+ mutex_unlock( &link->queueLock );
if ( hops < 200 ) ++hops;
- pthread_mutex_lock( &link->sendMutex );
+ mutex_lock( &link->sendMutex );
const bool ret = dnbd3_get_block( link->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( link->version, hops ) );
- pthread_mutex_unlock( &link->sendMutex );
+ mutex_unlock( &link->sendMutex );
if ( !ret ) {
// Non-critical - if the connection dropped or the server was changed
// the thread will re-send this request as soon as the connection
@@ -612,9 +612,9 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly)
altservers_serverFailed( &link->currentServer );
return;
}
- spin_lock( &link->queueLock );
+ mutex_lock( &link->queueLock );
}
- spin_unlock( &link->queueLock );
+ mutex_unlock( &link->queueLock );
}
/**
@@ -635,10 +635,10 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
return;
dnbd3_image_t * const image = link->image;
if ( image->virtualFilesize < DNBD3_BLOCK_SIZE ) return;
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
if ( image == NULL || image->cache_map == NULL || image->users < _bgrMinClients ) {
// No cache map (=image complete), or replication pending, or not enough users, do nothing
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
return;
}
const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
@@ -661,7 +661,7 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
break;
}
}
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) {
// Nothing left in current block, find next one
replicationIndex = uplink_findNextIncompleteHashBlock( link, endByte );
@@ -674,9 +674,9 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
link->replicationHandle = offset;
const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
- pthread_mutex_lock( &link->sendMutex );
+ mutex_lock( &link->sendMutex );
bool sendOk = dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) );
- pthread_mutex_unlock( &link->sendMutex );
+ mutex_unlock( &link->sendMutex );
if ( !sendOk ) {
logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" );
return;
@@ -700,7 +700,7 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const int startMapIndex)
{
int retval = -1;
- spin_lock( &link->image->lock );
+ mutex_lock( &link->image->lock );
const int mapBytes = IMGSIZE_TO_MAPBYTES( link->image->virtualFilesize );
const uint8_t *cache_map = link->image->cache_map;
if ( cache_map != NULL ) {
@@ -736,7 +736,7 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const in
retval = -1;
}
}
- spin_unlock( &link->image->lock );
+ mutex_unlock( &link->image->lock );
return retval;
}
@@ -834,7 +834,7 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
}
}
// 2) Figure out which clients are interested in it
- spin_lock( &link->queueLock );
+ mutex_lock( &link->queueLock );
for (i = 0; i < link->queueLen; ++i) {
dnbd3_queued_request_t * const req = &link->queue[i];
assert( req->status != ULR_PROCESSING );
@@ -866,23 +866,23 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
req->status = ULR_FREE;
req->client = NULL;
served = true;
- pthread_mutex_lock( &client->sendMutex );
- spin_unlock( &link->queueLock );
+ mutex_lock( &client->sendMutex );
+ mutex_unlock( &link->queueLock );
if ( client->sock != -1 ) {
ssize_t sent = writev( client->sock, iov, 2 );
if ( sent > (ssize_t)sizeof outReply ) {
bytesSent = (size_t)sent - sizeof outReply;
}
}
- pthread_mutex_unlock( &client->sendMutex );
+ mutex_unlock( &client->sendMutex );
if ( bytesSent != 0 ) {
client->bytesSent += bytesSent;
}
- spin_lock( &link->queueLock );
+ mutex_lock( &link->queueLock );
}
if ( req->status == ULR_FREE && i == link->queueLen - 1 ) link->queueLen--;
}
- spin_unlock( &link->queueLock );
+ mutex_unlock( &link->queueLock );
#ifdef _DEBUG
if ( !served && start != link->replicationHandle ) {
logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end );
@@ -906,9 +906,9 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
}
}
if ( link->replicationHandle == REP_NONE ) {
- spin_lock( &link->queueLock );
+ mutex_lock( &link->queueLock );
const bool rep = ( link->queueLen == 0 );
- spin_unlock( &link->queueLock );
+ mutex_unlock( &link->queueLock );
if ( rep ) uplink_sendReplicationRequest( link );
}
return;
@@ -922,19 +922,19 @@ static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew)
if ( link->fd == -1 )
return;
altservers_serverFailed( &link->currentServer );
- pthread_mutex_lock( &link->sendMutex );
+ mutex_lock( &link->sendMutex );
close( link->fd );
link->fd = -1;
- pthread_mutex_unlock( &link->sendMutex );
+ mutex_unlock( &link->sendMutex );
link->replicationHandle = REP_NONE;
if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) {
link->nextReplicationIndex = 0;
}
if ( !findNew )
return;
- spin_lock( &link->rttLock );
+ mutex_lock( &link->rttLock );
bool bail = link->rttTestResult == RTT_INPROGRESS || link->betterFd != -1;
- spin_unlock( &link->rttLock );
+ mutex_unlock( &link->rttLock );
if ( bail )
return;
altservers_findUplink( link );
@@ -961,9 +961,9 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink)
size_t bytes = IMGSIZE_TO_HASHBLOCKS( image->virtualFilesize ) * sizeof(uint32_t);
uint32_t masterCrc;
uint32_t *buffer = malloc( bytes );
- pthread_mutex_lock( &uplink->sendMutex );
+ mutex_lock( &uplink->sendMutex );
bool sendOk = dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes );
- pthread_mutex_unlock( &uplink->sendMutex );
+ mutex_unlock( &uplink->sendMutex );
if ( !sendOk || bytes == 0 ) {
free( buffer );
return;
@@ -1032,11 +1032,11 @@ static bool uplink_saveCacheMap(dnbd3_connection_t *link)
if ( image->cache_map == NULL ) return true;
logadd( LOG_DEBUG2, "Saving cache map of %s:%d", image->name, (int)image->rid );
- spin_lock( &image->lock );
+ mutex_lock( &image->lock );
// Lock and get a copy of the cache map, as it could be freed by another thread that is just about to
// figure out that this image's cache copy is complete
if ( image->cache_map == NULL || image->virtualFilesize < DNBD3_BLOCK_SIZE ) {
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
return true;
}
const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize);
@@ -1044,7 +1044,7 @@ static bool uplink_saveCacheMap(dnbd3_connection_t *link)
memcpy( map, image->cache_map, size );
// Unlock. Use path and cacheFd without locking. path should never change after initialization of the image,
// cacheFd is owned by the uplink thread and we don't want to hold a spinlock during I/O
- spin_unlock( &image->lock );
+ mutex_unlock( &image->lock );
assert( image->path != NULL );
char mapfile[strlen( image->path ) + 4 + 1];
strcpy( mapfile, image->path );