summaryrefslogtreecommitdiffstats
path: root/src/server/altservers.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/altservers.c')
-rw-r--r--src/server/altservers.c101
1 files changed, 51 insertions, 50 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index b91ceab..bbbc584 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -15,13 +15,13 @@
#define ERROR_GOTO(jumplabel, ...) LOG_GOTO(jumplabel, LOG_ERROR, __VA_ARGS__)
static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS];
-static pthread_spinlock_t pendingLockWrite; // Lock for adding something to pending. (NULL -> nonNULL)
-static pthread_mutex_t pendingLockConsume = PTHREAD_MUTEX_INITIALIZER; // Lock for removing something (nonNULL -> NULL)
+static pthread_mutex_t pendingLockWrite; // Lock for adding something to pending. (NULL -> nonNULL)
+static pthread_mutex_t pendingLockConsume; // Lock for removing something (nonNULL -> NULL)
static dnbd3_signal_t* runSignal = NULL;
static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS];
static int numAltServers = 0;
-static pthread_spinlock_t altServersLock;
+static pthread_mutex_t altServersLock;
static pthread_t altThread;
@@ -32,8 +32,9 @@ void altservers_init()
{
srand( (unsigned int)time( NULL ) );
// Init spinlock
- spin_init( &pendingLockWrite, PTHREAD_PROCESS_PRIVATE );
- spin_init( &altServersLock, PTHREAD_PROCESS_PRIVATE );
+ mutex_init( &pendingLockWrite );
+ mutex_init( &pendingLockConsume );
+ mutex_init( &altServersLock );
// Init signal
runSignal = signal_new();
if ( runSignal == NULL ) {
@@ -48,11 +49,11 @@ void altservers_init()
// Init waiting links queue -- this is currently a global static array so
// it will already be zero, but in case we refactor later do it explicitly
// while also holding the write lock so thread sanitizer is happy
- spin_lock( &pendingLockWrite );
+ mutex_lock( &pendingLockWrite );
for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
pending[i] = NULL;
}
- spin_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockWrite );
}
void altservers_shutdown()
@@ -99,10 +100,10 @@ int altservers_load()
bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate, const int isClientOnly)
{
int i, freeSlot = -1;
- spin_lock( &altServersLock );
+ mutex_lock( &altServersLock );
for (i = 0; i < numAltServers; ++i) {
if ( isSameAddressPort( &altServers[i].host, host ) ) {
- spin_unlock( &altServersLock );
+ mutex_unlock( &altServersLock );
return false;
} else if ( freeSlot == -1 && altServers[i].host.type == 0 ) {
freeSlot = i;
@@ -111,7 +112,7 @@ bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate
if ( freeSlot == -1 ) {
if ( numAltServers >= SERVER_MAX_ALTS ) {
logadd( LOG_WARNING, "Cannot add another alt server, maximum of %d already reached.", (int)SERVER_MAX_ALTS );
- spin_unlock( &altServersLock );
+ mutex_unlock( &altServersLock );
return false;
}
freeSlot = numAltServers++;
@@ -120,7 +121,7 @@ bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate
altServers[freeSlot].isPrivate = isPrivate;
altServers[freeSlot].isClientOnly = isClientOnly;
if ( comment != NULL ) snprintf( altServers[freeSlot].comment, COMMENT_LENGTH, "%s", comment );
- spin_unlock( &altServersLock );
+ mutex_unlock( &altServersLock );
return true;
}
@@ -135,14 +136,14 @@ void altservers_findUplink(dnbd3_connection_t *uplink)
// never be that the uplink is supposed to switch, but instead calls
// this function.
assert( uplink->betterFd == -1 );
- spin_lock( &pendingLockWrite );
+ mutex_lock( &pendingLockWrite );
// it is however possible that an RTT measurement is currently in progress,
// so check for that case and do nothing if one is in progress
if ( uplink->rttTestResult == RTT_INPROGRESS ) {
for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
if ( pending[i] != uplink ) continue;
// Yep, measuring right now
- spin_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockWrite );
return;
}
}
@@ -151,12 +152,12 @@ void altservers_findUplink(dnbd3_connection_t *uplink)
if ( pending[i] != NULL ) continue;
pending[i] = uplink;
uplink->rttTestResult = RTT_INPROGRESS;
- spin_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockWrite );
signal_call( runSignal ); // Wake altservers thread up
return;
}
// End of loop - no free slot
- spin_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockWrite );
logadd( LOG_WARNING, "No more free RTT measurement slots, ignoring a request..." );
}
@@ -165,16 +166,16 @@ void altservers_findUplink(dnbd3_connection_t *uplink)
*/
void altservers_removeUplink(dnbd3_connection_t *uplink)
{
- pthread_mutex_lock( &pendingLockConsume );
- spin_lock( &pendingLockWrite );
+ mutex_lock( &pendingLockConsume );
+ mutex_lock( &pendingLockWrite );
for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
if ( pending[i] == uplink ) {
uplink->rttTestResult = RTT_NOT_REACHABLE;
pending[i] = NULL;
}
}
- spin_unlock( &pendingLockWrite );
- pthread_mutex_unlock( &pendingLockConsume );
+ mutex_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockConsume );
}
/**
@@ -190,7 +191,7 @@ int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output
int count = 0;
int scores[size];
int score;
- spin_lock( &altServersLock );
+ mutex_lock( &altServersLock );
if ( size > numAltServers ) size = numAltServers;
for (i = 0; i < numAltServers; ++i) {
if ( altServers[i].host.type == 0 ) continue; // Slot is empty
@@ -226,7 +227,7 @@ int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output
}
}
}
- spin_unlock( &altServersLock );
+ mutex_unlock( &altServersLock );
return count;
}
@@ -242,7 +243,7 @@ int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency)
int count = 0, i;
ticks now;
timing_get( &now );
- spin_lock( &altServersLock );
+ mutex_lock( &altServersLock );
// Flip first server in list with a random one every time this is called
if ( numAltServers > 1 ) {
const dnbd3_alt_server_t tmp = altServers[0];
@@ -273,7 +274,7 @@ int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency)
output[count++] = srv->host;
if ( count >= size ) break;
}
- spin_unlock( &altServersLock );
+ mutex_unlock( &altServersLock );
return count;
}
@@ -281,12 +282,12 @@ json_t* altservers_toJson()
{
json_t *list = json_array();
- spin_lock( &altServersLock );
+ mutex_lock( &altServersLock );
char host[100];
const int count = numAltServers;
dnbd3_alt_server_t src[count];
memcpy( src, altServers, sizeof(src) );
- spin_unlock( &altServersLock );
+ mutex_unlock( &altServersLock );
for (int i = 0; i < count; ++i) {
json_t *rtts = json_array();
for (int j = 0; j < SERVER_RTT_PROBES; ++j) {
@@ -313,7 +314,7 @@ static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const
{
unsigned int avg = rtt;
int i;
- spin_lock( &altServersLock );
+ mutex_lock( &altServersLock );
for (i = 0; i < numAltServers; ++i) {
if ( !isSameAddressPort( host, &altServers[i].host ) ) continue;
altServers[i].rtt[++altServers[i].rttIndex % SERVER_RTT_PROBES] = rtt;
@@ -334,7 +335,7 @@ static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const
}
break;
}
- spin_unlock( &altServersLock );
+ mutex_unlock( &altServersLock );
return avg;
}
@@ -369,7 +370,7 @@ void altservers_serverFailed(const dnbd3_host_t * const host)
int foundIndex = -1, lastOk = -1;
ticks now;
timing_get( &now );
- spin_lock( &altServersLock );
+ mutex_lock( &altServersLock );
for (i = 0; i < numAltServers; ++i) {
if ( foundIndex == -1 ) {
// Looking for the failed server in list
@@ -395,7 +396,7 @@ void altservers_serverFailed(const dnbd3_host_t * const host)
altServers[lastOk] = tmp;
}
}
- spin_unlock( &altServersLock );
+ mutex_unlock( &altServersLock );
}
/**
* Mainloop of this module. It will wait for requests by uplinks to find a
@@ -432,27 +433,27 @@ static void *altservers_main(void *data UNUSED)
}
// Work your way through the queue
for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) {
- spin_lock( &pendingLockWrite );
+ mutex_lock( &pendingLockWrite );
if ( pending[itLink] == NULL ) {
- spin_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockWrite );
continue; // Check once before locking, as a mutex is expensive
}
- spin_unlock( &pendingLockWrite );
- pthread_mutex_lock( &pendingLockConsume );
- spin_lock( &pendingLockWrite );
+ mutex_unlock( &pendingLockWrite );
+ mutex_lock( &pendingLockConsume );
+ mutex_lock( &pendingLockWrite );
dnbd3_connection_t * const uplink = pending[itLink];
- spin_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockWrite );
if ( uplink == NULL ) { // Check again after locking
- pthread_mutex_unlock( &pendingLockConsume );
+ mutex_unlock( &pendingLockConsume );
continue;
}
dnbd3_image_t * const image = image_lock( uplink->image );
if ( image == NULL ) { // Check again after locking
uplink->rttTestResult = RTT_NOT_REACHABLE;
- spin_lock( &pendingLockWrite );
+ mutex_lock( &pendingLockWrite );
pending[itLink] = NULL;
- spin_unlock( &pendingLockWrite );
- pthread_mutex_unlock( &pendingLockConsume );
+ mutex_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockConsume );
logadd( LOG_DEBUG1, "Image has gone away that was queued for RTT measurement" );
continue;
}
@@ -520,7 +521,7 @@ static void *altservers_main(void *data UNUSED)
}
clock_gettime( BEST_CLOCK_SOURCE, &end );
// Measurement done - everything fine so far
- spin_lock( &uplink->rttLock );
+ mutex_lock( &uplink->rttLock );
const bool isCurrent = isSameAddressPort( &servers[itAlt], &uplink->currentServer );
// Penaltize rtt if this was a cycle; this will treat this server with lower priority
// in the near future too, so we prevent alternating between two servers that are both
@@ -531,7 +532,7 @@ static void *altservers_main(void *data UNUSED)
unsigned int avg = altservers_updateRtt( &servers[itAlt], rtt );
// If a cycle was detected, or we lost connection to the current (last) server, penaltize it one time
if ( ( uplink->cycleDetected || uplink->fd == -1 ) && isCurrent ) avg = (avg * 2) + 50000;
- spin_unlock( &uplink->rttLock );
+ mutex_unlock( &uplink->rttLock );
if ( uplink->fd != -1 && isCurrent ) {
// Was measuring current server
currentRtt = avg;
@@ -565,25 +566,25 @@ static void *altservers_main(void *data UNUSED)
LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt );
}
sock_setTimeout( bestSock, _uplinkTimeout );
- spin_lock( &uplink->rttLock );
+ mutex_lock( &uplink->rttLock );
uplink->betterFd = bestSock;
uplink->betterServer = servers[bestIndex];
uplink->betterVersion = bestProtocolVersion;
uplink->rttTestResult = RTT_DOCHANGE;
- spin_unlock( &uplink->rttLock );
+ mutex_unlock( &uplink->rttLock );
signal_call( uplink->signal );
} else if ( bestSock == -1 && currentRtt == RTT_UNREACHABLE ) {
// No server was reachable
- spin_lock( &uplink->rttLock );
+ mutex_lock( &uplink->rttLock );
uplink->rttTestResult = RTT_NOT_REACHABLE;
- spin_unlock( &uplink->rttLock );
+ mutex_unlock( &uplink->rttLock );
} else {
// nope
if ( bestSock != -1 ) close( bestSock );
- spin_lock( &uplink->rttLock );
+ mutex_lock( &uplink->rttLock );
uplink->rttTestResult = RTT_DONTCHANGE;
uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away
- spin_unlock( &uplink->rttLock );
+ mutex_unlock( &uplink->rttLock );
if ( !image->working ) {
image->working = true;
LOG( LOG_DEBUG1, "[%d] No better alt server found, enabling again", itLink );
@@ -591,10 +592,10 @@ static void *altservers_main(void *data UNUSED)
}
image_release( image );
// end of loop over all pending uplinks
- spin_lock( &pendingLockWrite );
+ mutex_lock( &pendingLockWrite );
pending[itLink] = NULL;
- spin_unlock( &pendingLockWrite );
- pthread_mutex_unlock( &pendingLockConsume );
+ mutex_unlock( &pendingLockWrite );
+ mutex_unlock( &pendingLockConsume );
}
// Save cache maps of all images if applicable
declare_now;