diff options
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r-- | src/server/uplink.c | 117 |
1 files changed, 61 insertions, 56 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c index e21e28c..6c85580 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -96,17 +96,18 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version uplink->bytesReceived = 0; uplink->idleTime = 0; uplink->queueLen = 0; - mutex_lock( &uplink->sendMutex ); - uplink->current.fd = -1; - mutex_unlock( &uplink->sendMutex ); uplink->cacheFd = -1; uplink->signal = NULL; uplink->replicationHandle = REP_NONE; mutex_lock( &uplink->rttLock ); + mutex_lock( &uplink->sendMutex ); + uplink->current.fd = -1; + mutex_unlock( &uplink->sendMutex ); uplink->cycleDetected = false; - if ( sock >= 0 ) { + if ( sock != -1 ) { uplink->better.fd = sock; - uplink->better.host = *host; + int index = altservers_hostToIndex( host ); + uplink->better.index = index == -1 ? 0 : index; // Prevent invalid array access uplink->rttTestResult = RTT_DOCHANGE; uplink->better.version = version; } else { @@ -116,7 +117,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version mutex_unlock( &uplink->rttLock ); uplink->recvBufferLen = 0; uplink->shutdown = false; - if ( 0 != thread_create( &(uplink->thread), NULL, &uplink_mainloop, (void *)link ) ) { + if ( 0 != thread_create( &(uplink->thread), NULL, &uplink_mainloop, (void *)uplink ) ) { logadd( LOG_ERROR, "Could not start thread for new uplink." ); goto failure; } @@ -148,8 +149,8 @@ void uplink_shutdown(dnbd3_image_t *image) } dnbd3_uplink_t * const uplink = image->uplink; mutex_lock( &uplink->queueLock ); - if ( !uplink->shutdown ) { - uplink->shutdown = true; + bool exp = false; + if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) { signal_call( uplink->signal ); thread = uplink->thread; join = true; @@ -211,13 +212,11 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin } // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain // This might be a false positive if there are multiple instances running on the same host (IP) - if ( hops != 0 && isSameAddress( &uplink->current.host, &client->host ) ) { - mutex_unlock( &client->image->lock ); - logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); - mutex_lock( &uplink->rttLock ); + if ( hops != 0 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) { uplink->cycleDetected = true; - mutex_unlock( &uplink->rttLock ); signal_call( uplink->signal ); + mutex_unlock( &client->image->lock ); + logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); return false; } @@ -256,12 +255,10 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin } } if ( unlikely( requestLoop ) ) { - mutex_unlock( &uplink->queueLock ); - logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops ); - mutex_lock( &uplink->rttLock ); uplink->cycleDetected = true; - mutex_unlock( &uplink->rttLock ); signal_call( uplink->signal ); + mutex_unlock( &uplink->queueLock ); + logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops ); return false; } if ( freeSlot < firstUsedSlot && firstUsedSlot < 10 && existingType != ULR_PROCESSING ) { @@ -311,6 +308,8 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin if ( foundExisting != -1 ) return true; // Attached to pending request, do nothing + usleep( 10000 ); + // See if we can fire away the request if ( mutex_trylock( &uplink->sendMutex ) != 0 ) { logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" ); @@ -342,7 +341,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin if ( state == -1 ) { logadd( LOG_DEBUG2, "Direct uplink request queue entry gone after sending and re-locking queue. *shrug*" ); } else if ( state == ULR_NEW ) { - logadd( LOG_DEBUG2, "Succesful direct uplink request" ); + //logadd( LOG_DEBUG2, "Direct uplink request" ); } else { logadd( LOG_DEBUG2, "Direct uplink request queue entry changed to %s afte sending (expected ULR_NEW).", NAMES_ULR[uplink->queue[freeSlot].status] ); } @@ -352,10 +351,8 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin } } - if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed - if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { - logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); - } + if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { + logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); } return true; } @@ -443,7 +440,7 @@ static void* uplink_mainloop(void *data) uplink->image->working = true; uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received buffer[0] = '@'; - if ( host_to_string( &uplink->current.host, buffer + 1, sizeof(buffer) - 1 ) ) { + if ( altservers_toString( uplink->current.index, buffer + 1, sizeof(buffer) - 1 ) ) { logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", uplink->image->name, buffer + 1 ); setThreadName( buffer ); } @@ -525,9 +522,7 @@ static void* uplink_mainloop(void *data) } } // See if we should trigger an RTT measurement - mutex_lock( &uplink->rttLock ); - const int rttTestResult = uplink->rttTestResult; - mutex_unlock( &uplink->rttLock ); + int rttTestResult = uplink->rttTestResult; if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) { if ( timing_reached( &nextAltCheck, &now ) || ( uplink->current.fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) || uplink->cycleDetected ) { // It seems it's time for a check @@ -538,7 +533,7 @@ static void* uplink_mainloop(void *data) goto cleanup; } else if ( !uplink_connectionShouldShutdown( uplink ) ) { // Not complete - do measurement - altservers_findUplink( uplink ); // This will set RTT_INPROGRESS (synchronous) + altservers_findUplinkAsync( uplink ); // This will set RTT_INPROGRESS (synchronous) if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) { uplink->nextReplicationIndex = 0; } @@ -547,11 +542,9 @@ static void* uplink_mainloop(void *data) timing_set( &nextAltCheck, &now, altCheckInterval ); } } else if ( rttTestResult == RTT_NOT_REACHABLE ) { - mutex_lock( &uplink->rttLock ); - uplink->rttTestResult = RTT_IDLE; - mutex_unlock( &uplink->rttLock ); + atomic_compare_exchange_strong( &uplink->rttTestResult, &rttTestResult, RTT_IDLE ); discoverFailCount++; - timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) ); + timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_MAX_UNREACH ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) ); } #ifdef _DEBUG if ( uplink->current.fd != -1 && !uplink->shutdown ) { @@ -581,36 +574,38 @@ static void* uplink_mainloop(void *data) #endif } cleanup: ; - if ( !uplink->shutdown ) { - uplink->shutdown = true; + // Detach depends on whether someone is joining this thread... + bool exp = false; + if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) { thread_detach( uplink->thread ); } - altservers_removeUplink( uplink ); uplink_saveCacheMap( uplink ); - mutex_lock( &uplink->image->lock ); - if ( uplink->image->uplink == uplink ) { - uplink->image->uplink = NULL; + dnbd3_image_t *image = uplink->image; + mutex_lock( &image->lock ); + // in the list anymore, but we want to prevent it from being freed in either case + if ( image->uplink == uplink ) { + image->uplink = NULL; } + mutex_unlock( &image->lock ); // Do NOT use image without locking it mutex_lock( &uplink->queueLock ); - const int fd = uplink->current.fd; - const dnbd3_signal_t* signal = uplink->signal; - mutex_lock( &uplink->sendMutex ); - uplink->current.fd = -1; - mutex_unlock( &uplink->sendMutex ); - uplink->signal = NULL; - // Do not access uplink->image after unlocking, since we set - // image->uplink to NULL. Acquire with image_lock first, - // like done below when checking whether to re-init uplink - mutex_unlock( &uplink->image->lock ); - mutex_unlock( &uplink->queueLock ); - if ( fd != -1 ) close( fd ); - if ( signal != NULL ) signal_close( signal ); - // Wait for the RTT check to finish/fail if it's in progress - while ( uplink->rttTestResult == RTT_INPROGRESS ) + // Wait for active RTT measurement to finish + while ( uplink->rttTestResult == RTT_INPROGRESS ) { usleep( 10000 ); + } + signal_close( uplink->signal ); + mutex_lock( &uplink->rttLock ); + mutex_lock( &uplink->sendMutex ); + if ( uplink->current.fd != -1 ) { + close( uplink->current.fd ); + uplink->current.fd = -1; + } if ( uplink->better.fd != -1 ) { close( uplink->better.fd ); + uplink->better.fd = -1; } + mutex_unlock( &uplink->sendMutex ); + mutex_unlock( &uplink->rttLock ); + mutex_unlock( &uplink->queueLock ); mutex_destroy( &uplink->queueLock ); mutex_destroy( &uplink->rttLock ); mutex_destroy( &uplink->sendMutex ); @@ -619,9 +614,9 @@ static void* uplink_mainloop(void *data) if ( uplink->cacheFd != -1 ) { close( uplink->cacheFd ); } - dnbd3_image_t *image = image_lock( uplink->image ); free( uplink ); // !!! - if ( image != NULL ) { + if ( image_lock( image ) != NULL ) { + // Image is still in list... if ( !_shutdown && image->cache_map != NULL ) { // Ingegrity checker must have found something in the meantime uplink_init( image, -1, NULL, 0 ); @@ -656,7 +651,7 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly) // the thread will re-send this request as soon as the connection // is reestablished. logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" ); - altservers_serverFailed( &uplink->current.host ); + altservers_serverFailed( uplink->current.index ); return; } mutex_lock( &uplink->queueLock ); @@ -973,7 +968,7 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew) { if ( uplink->current.fd == -1 ) return; - altservers_serverFailed( &uplink->current.host ); + altservers_serverFailed( uplink->current.index ); mutex_lock( &uplink->sendMutex ); close( uplink->current.fd ); uplink->current.fd = -1; @@ -1138,3 +1133,13 @@ static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink) && ( _backgroundReplication != BGR_FULL || _bgrMinClients > uplink->image->users ) ); } +bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len) +{ + int current; + mutex_lock( &uplink->rttLock ); + current = uplink->current.fd == -1 ? -1 : uplink->current.index; + mutex_unlock( &uplink->rttLock ); + if ( current == -1 ) + return false; + return altservers_toString( current, buffer, len ); +} |