summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c117
1 files changed, 61 insertions, 56 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index e21e28c..6c85580 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -96,17 +96,18 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
uplink->bytesReceived = 0;
uplink->idleTime = 0;
uplink->queueLen = 0;
- mutex_lock( &uplink->sendMutex );
- uplink->current.fd = -1;
- mutex_unlock( &uplink->sendMutex );
uplink->cacheFd = -1;
uplink->signal = NULL;
uplink->replicationHandle = REP_NONE;
mutex_lock( &uplink->rttLock );
+ mutex_lock( &uplink->sendMutex );
+ uplink->current.fd = -1;
+ mutex_unlock( &uplink->sendMutex );
uplink->cycleDetected = false;
- if ( sock >= 0 ) {
+ if ( sock != -1 ) {
uplink->better.fd = sock;
- uplink->better.host = *host;
+ int index = altservers_hostToIndex( host );
+ uplink->better.index = index == -1 ? 0 : index; // Prevent invalid array access
uplink->rttTestResult = RTT_DOCHANGE;
uplink->better.version = version;
} else {
@@ -116,7 +117,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
mutex_unlock( &uplink->rttLock );
uplink->recvBufferLen = 0;
uplink->shutdown = false;
- if ( 0 != thread_create( &(uplink->thread), NULL, &uplink_mainloop, (void *)link ) ) {
+ if ( 0 != thread_create( &(uplink->thread), NULL, &uplink_mainloop, (void *)uplink ) ) {
logadd( LOG_ERROR, "Could not start thread for new uplink." );
goto failure;
}
@@ -148,8 +149,8 @@ void uplink_shutdown(dnbd3_image_t *image)
}
dnbd3_uplink_t * const uplink = image->uplink;
mutex_lock( &uplink->queueLock );
- if ( !uplink->shutdown ) {
- uplink->shutdown = true;
+ bool exp = false;
+ if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) {
signal_call( uplink->signal );
thread = uplink->thread;
join = true;
@@ -211,13 +212,11 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
}
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
// This might be a false positive if there are multiple instances running on the same host (IP)
- if ( hops != 0 && isSameAddress( &uplink->current.host, &client->host ) ) {
- mutex_unlock( &client->image->lock );
- logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
- mutex_lock( &uplink->rttLock );
+ if ( hops != 0 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
uplink->cycleDetected = true;
- mutex_unlock( &uplink->rttLock );
signal_call( uplink->signal );
+ mutex_unlock( &client->image->lock );
+ logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
return false;
}
@@ -256,12 +255,10 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
}
}
if ( unlikely( requestLoop ) ) {
- mutex_unlock( &uplink->queueLock );
- logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops );
- mutex_lock( &uplink->rttLock );
uplink->cycleDetected = true;
- mutex_unlock( &uplink->rttLock );
signal_call( uplink->signal );
+ mutex_unlock( &uplink->queueLock );
+ logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops );
return false;
}
if ( freeSlot < firstUsedSlot && firstUsedSlot < 10 && existingType != ULR_PROCESSING ) {
@@ -311,6 +308,8 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
if ( foundExisting != -1 )
return true; // Attached to pending request, do nothing
+ usleep( 10000 );
+
// See if we can fire away the request
if ( mutex_trylock( &uplink->sendMutex ) != 0 ) {
logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" );
@@ -342,7 +341,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
if ( state == -1 ) {
logadd( LOG_DEBUG2, "Direct uplink request queue entry gone after sending and re-locking queue. *shrug*" );
} else if ( state == ULR_NEW ) {
- logadd( LOG_DEBUG2, "Succesful direct uplink request" );
+ //logadd( LOG_DEBUG2, "Direct uplink request" );
} else {
logadd( LOG_DEBUG2, "Direct uplink request queue entry changed to %s afte sending (expected ULR_NEW).", NAMES_ULR[uplink->queue[freeSlot].status] );
}
@@ -352,10 +351,8 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
}
}
- if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed
- if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
- logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
- }
+ if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
+ logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
}
return true;
}
@@ -443,7 +440,7 @@ static void* uplink_mainloop(void *data)
uplink->image->working = true;
uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
buffer[0] = '@';
- if ( host_to_string( &uplink->current.host, buffer + 1, sizeof(buffer) - 1 ) ) {
+ if ( altservers_toString( uplink->current.index, buffer + 1, sizeof(buffer) - 1 ) ) {
logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", uplink->image->name, buffer + 1 );
setThreadName( buffer );
}
@@ -525,9 +522,7 @@ static void* uplink_mainloop(void *data)
}
}
// See if we should trigger an RTT measurement
- mutex_lock( &uplink->rttLock );
- const int rttTestResult = uplink->rttTestResult;
- mutex_unlock( &uplink->rttLock );
+ int rttTestResult = uplink->rttTestResult;
if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) {
if ( timing_reached( &nextAltCheck, &now ) || ( uplink->current.fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) || uplink->cycleDetected ) {
// It seems it's time for a check
@@ -538,7 +533,7 @@ static void* uplink_mainloop(void *data)
goto cleanup;
} else if ( !uplink_connectionShouldShutdown( uplink ) ) {
// Not complete - do measurement
- altservers_findUplink( uplink ); // This will set RTT_INPROGRESS (synchronous)
+ altservers_findUplinkAsync( uplink ); // This will set RTT_INPROGRESS (synchronous)
if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) {
uplink->nextReplicationIndex = 0;
}
@@ -547,11 +542,9 @@ static void* uplink_mainloop(void *data)
timing_set( &nextAltCheck, &now, altCheckInterval );
}
} else if ( rttTestResult == RTT_NOT_REACHABLE ) {
- mutex_lock( &uplink->rttLock );
- uplink->rttTestResult = RTT_IDLE;
- mutex_unlock( &uplink->rttLock );
+ atomic_compare_exchange_strong( &uplink->rttTestResult, &rttTestResult, RTT_IDLE );
discoverFailCount++;
- timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) );
+ timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_MAX_UNREACH ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) );
}
#ifdef _DEBUG
if ( uplink->current.fd != -1 && !uplink->shutdown ) {
@@ -581,36 +574,38 @@ static void* uplink_mainloop(void *data)
#endif
}
cleanup: ;
- if ( !uplink->shutdown ) {
- uplink->shutdown = true;
+ // Detach depends on whether someone is joining this thread...
+ bool exp = false;
+ if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) {
thread_detach( uplink->thread );
}
- altservers_removeUplink( uplink );
uplink_saveCacheMap( uplink );
- mutex_lock( &uplink->image->lock );
- if ( uplink->image->uplink == uplink ) {
- uplink->image->uplink = NULL;
+ dnbd3_image_t *image = uplink->image;
+ mutex_lock( &image->lock );
+ // in the list anymore, but we want to prevent it from being freed in either case
+ if ( image->uplink == uplink ) {
+ image->uplink = NULL;
}
+ mutex_unlock( &image->lock ); // Do NOT use image without locking it
mutex_lock( &uplink->queueLock );
- const int fd = uplink->current.fd;
- const dnbd3_signal_t* signal = uplink->signal;
- mutex_lock( &uplink->sendMutex );
- uplink->current.fd = -1;
- mutex_unlock( &uplink->sendMutex );
- uplink->signal = NULL;
- // Do not access uplink->image after unlocking, since we set
- // image->uplink to NULL. Acquire with image_lock first,
- // like done below when checking whether to re-init uplink
- mutex_unlock( &uplink->image->lock );
- mutex_unlock( &uplink->queueLock );
- if ( fd != -1 ) close( fd );
- if ( signal != NULL ) signal_close( signal );
- // Wait for the RTT check to finish/fail if it's in progress
- while ( uplink->rttTestResult == RTT_INPROGRESS )
+ // Wait for active RTT measurement to finish
+ while ( uplink->rttTestResult == RTT_INPROGRESS ) {
usleep( 10000 );
+ }
+ signal_close( uplink->signal );
+ mutex_lock( &uplink->rttLock );
+ mutex_lock( &uplink->sendMutex );
+ if ( uplink->current.fd != -1 ) {
+ close( uplink->current.fd );
+ uplink->current.fd = -1;
+ }
if ( uplink->better.fd != -1 ) {
close( uplink->better.fd );
+ uplink->better.fd = -1;
}
+ mutex_unlock( &uplink->sendMutex );
+ mutex_unlock( &uplink->rttLock );
+ mutex_unlock( &uplink->queueLock );
mutex_destroy( &uplink->queueLock );
mutex_destroy( &uplink->rttLock );
mutex_destroy( &uplink->sendMutex );
@@ -619,9 +614,9 @@ static void* uplink_mainloop(void *data)
if ( uplink->cacheFd != -1 ) {
close( uplink->cacheFd );
}
- dnbd3_image_t *image = image_lock( uplink->image );
free( uplink ); // !!!
- if ( image != NULL ) {
+ if ( image_lock( image ) != NULL ) {
+ // Image is still in list...
if ( !_shutdown && image->cache_map != NULL ) {
// Ingegrity checker must have found something in the meantime
uplink_init( image, -1, NULL, 0 );
@@ -656,7 +651,7 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
// the thread will re-send this request as soon as the connection
// is reestablished.
logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" );
- altservers_serverFailed( &uplink->current.host );
+ altservers_serverFailed( uplink->current.index );
return;
}
mutex_lock( &uplink->queueLock );
@@ -973,7 +968,7 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
{
if ( uplink->current.fd == -1 )
return;
- altservers_serverFailed( &uplink->current.host );
+ altservers_serverFailed( uplink->current.index );
mutex_lock( &uplink->sendMutex );
close( uplink->current.fd );
uplink->current.fd = -1;
@@ -1138,3 +1133,13 @@ static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink)
&& ( _backgroundReplication != BGR_FULL || _bgrMinClients > uplink->image->users ) );
}
+bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len)
+{
+ int current;
+ mutex_lock( &uplink->rttLock );
+ current = uplink->current.fd == -1 ? -1 : uplink->current.index;
+ mutex_unlock( &uplink->rttLock );
+ if ( current == -1 )
+ return false;
+ return altservers_toString( current, buffer, len );
+}