summary | refs | log | tree | commit | diff | stats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
author: Simon Rettberg, 2019-08-27 16:13:07 +0200
committer: Simon Rettberg, 2019-08-27 16:13:07 +0200
commit: 69f5bf408b9587a6e2008fba2224c2d506f1a895 (patch)
tree: 8fc9eda7e3a0b105007b7a85a4cc35ecc1d4431d /src/server/uplink.c
parent: [SERVER] Fix warnings, simplify locking (diff)
download: dnbd3-69f5bf408b9587a6e2008fba2224c2d506f1a895.tar.gz
dnbd3-69f5bf408b9587a6e2008fba2224c2d506f1a895.tar.xz
dnbd3-69f5bf408b9587a6e2008fba2224c2d506f1a895.zip
[SERVER] Use reference counting for uplink
First step towards less locking for proxy mode
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r-- src/server/uplink.c 214
1 file changed, 126 insertions, 88 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index abfebf0..7a39887 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -3,10 +3,12 @@
#include "locks.h"
#include "image.h"
#include "altservers.h"
+#include "net.h"
#include "../shared/sockhelper.h"
#include "../shared/protocol.h"
#include "../shared/timing.h"
#include "../shared/crc32.h"
+#include "reference.h"
#include <assert.h>
#include <inttypes.h>
@@ -45,6 +47,8 @@ static const char *const NAMES_ULR[4] = {
static atomic_uint_fast64_t totalBytesReceived = 0;
+static void cancelAllRequests(dnbd3_uplink_t *uplink);
+static void uplink_free(ref *ref);
static void* uplink_mainloop(void *data);
static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly);
static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex);
@@ -76,19 +80,24 @@ uint64_t uplink_getTotalBytesReceived()
bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version)
{
if ( !_isProxy || _shutdown ) return false;
- dnbd3_uplink_t *uplink = NULL;
assert( image != NULL );
mutex_lock( &image->lock );
- if ( image->uplink != NULL && !image->uplink->shutdown ) {
+ dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
+ if ( uplink != NULL ) {
mutex_unlock( &image->lock );
- if ( sock >= 0 ) close( sock );
+ if ( sock != -1 ) {
+ close( sock );
+ }
+ ref_put( &uplink->reference );
return true; // There's already an uplink, so should we consider this success or failure?
}
if ( image->cache_map == NULL ) {
logadd( LOG_WARNING, "Uplink was requested for image %s, but it is already complete", image->name );
goto failure;
}
- uplink = image->uplink = calloc( 1, sizeof(dnbd3_uplink_t) );
+ uplink = calloc( 1, sizeof(dnbd3_uplink_t) );
+ // Start with one reference for the uplink thread. We'll return it when the thread finishes
+ ref_init( &uplink->reference, uplink_free, 1 );
mutex_init( &uplink->queueLock, LOCK_UPLINK_QUEUE );
mutex_init( &uplink->rttLock, LOCK_UPLINK_RTT );
mutex_init( &uplink->sendMutex, LOCK_UPLINK_SEND );
@@ -121,12 +130,13 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
logadd( LOG_ERROR, "Could not start thread for new uplink." );
goto failure;
}
+ ref_setref( &image->uplinkref, &uplink->reference );
mutex_unlock( &image->lock );
return true;
failure: ;
if ( uplink != NULL ) {
free( uplink );
- uplink = image->uplink = NULL;
+ uplink = NULL;
}
mutex_unlock( &image->lock );
return false;
@@ -137,34 +147,83 @@ failure: ;
* Calling it multiple times, even concurrently, will
* not break anything.
*/
-void uplink_shutdown(dnbd3_image_t *image)
+bool uplink_shutdown(dnbd3_image_t *image)
{
- bool join = false;
- pthread_t thread;
assert( image != NULL );
mutex_lock( &image->lock );
- if ( image->uplink == NULL ) {
+ dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
+ if ( uplink == NULL ) {
mutex_unlock( &image->lock );
- return;
+ return true;
}
- dnbd3_uplink_t * const uplink = image->uplink;
mutex_lock( &uplink->queueLock );
bool exp = false;
if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) {
+ image->users++; // Prevent free while uplink shuts down
signal_call( uplink->signal );
- thread = uplink->thread;
- join = true;
+ } else {
+ logadd( LOG_ERROR, "This will never happen. '%s:%d'", image->name, (int)image->rid );
}
+ cancelAllRequests( uplink );
+ ref_setref( &image->uplinkref, NULL );
+ ref_put( &uplink->reference );
mutex_unlock( &uplink->queueLock );
- bool wait = image->uplink != NULL;
+ bool retval = ( exp && image->users == 0 );
mutex_unlock( &image->lock );
- if ( join ) thread_join( thread, NULL );
- while ( wait ) {
- usleep( 5000 );
- mutex_lock( &image->lock );
- wait = image->uplink != NULL && image->uplink->shutdown;
- mutex_unlock( &image->lock );
+ return exp;
+}
+
+/**
+ * Cancel all requests of this uplink.
+ * HOLD QUEUE LOCK WHILE CALLING
+ */
+static void cancelAllRequests(dnbd3_uplink_t *uplink)
+{
+ for ( int i = 0; i < uplink->queueLen; ++i ) {
+ if ( uplink->queue[i].status != ULR_FREE ) {
+ net_sendReply( uplink->queue[i].client, CMD_ERROR, uplink->queue[i].handle );
+ uplink->queue[i].status = ULR_FREE;
+ }
+ }
+ uplink->queueLen = 0;
+}
+
+static void uplink_free(ref *ref)
+{
+ dnbd3_uplink_t *uplink = container_of(ref, dnbd3_uplink_t, reference);
+ logadd( LOG_DEBUG1, "Freeing uplink for '%s:%d'", uplink->image->name, (int)uplink->image->rid );
+ assert( uplink->queueLen == 0 );
+ signal_close( uplink->signal );
+ if ( uplink->current.fd != -1 ) {
+ close( uplink->current.fd );
+ uplink->current.fd = -1;
+ }
+ if ( uplink->better.fd != -1 ) {
+ close( uplink->better.fd );
+ uplink->better.fd = -1;
+ }
+ mutex_destroy( &uplink->queueLock );
+ mutex_destroy( &uplink->rttLock );
+ mutex_destroy( &uplink->sendMutex );
+ free( uplink->recvBuffer );
+ uplink->recvBuffer = NULL;
+ if ( uplink->cacheFd != -1 ) {
+ close( uplink->cacheFd );
}
+ // TODO Requeue any requests
+ dnbd3_image_t *image = image_lock( uplink->image );
+ if ( image != NULL ) {
+ // != NULL means image is still in list...
+ if ( !_shutdown && image->cache_map != NULL ) {
+ // Integrity checker must have found something in the meantime
+ uplink_init( image, -1, NULL, 0 );
+ }
+ image_release( image );
+ }
+ // Finally let go of image. It was acquired either in uplink_shutdown or in the cleanup code
+ // of the uplink thread, depending on who set the uplink->shutdown flag.
+ image_release( image );
+ free( uplink ); // !!!
}
/**
@@ -193,31 +252,28 @@ void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client)
*/
bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
{
- if ( client == NULL || client->image == NULL ) return false;
+ if ( client == NULL || client->image == NULL )
+ return false;
if ( length > (uint32_t)_maxPayload ) {
logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length );
return false;
}
- mutex_lock( &client->image->lock );
- if ( client->image->uplink == NULL ) {
- mutex_unlock( &client->image->lock );
+ dnbd3_uplink_t * const uplink = ref_get_uplink( &client->image->uplinkref );
+ if ( uplink == NULL ) {
logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
return false;
}
- dnbd3_uplink_t * const uplink = client->image->uplink;
if ( uplink->shutdown ) {
- mutex_unlock( &client->image->lock );
logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" );
- return false;
+ goto fail_ref;
}
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
// This might be a false positive if there are multiple instances running on the same host (IP)
if ( hops != 0 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
uplink->cycleDetected = true;
signal_call( uplink->signal );
- mutex_unlock( &client->image->lock );
logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
- return false;
+ goto fail_ref;
}
int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise
@@ -229,7 +285,9 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
const uint64_t end = start + length;
mutex_lock( &uplink->queueLock );
- mutex_unlock( &client->image->lock );
+ if ( uplink->shutdown ) { // Check again after locking to prevent lost requests
+ goto fail_lock;
+ }
for (i = 0; i < uplink->queueLen; ++i) {
// find free slot to place this request into
if ( uplink->queue[i].status == ULR_FREE ) {
@@ -257,18 +315,16 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
if ( unlikely( requestLoop ) ) {
uplink->cycleDetected = true;
signal_call( uplink->signal );
- mutex_unlock( &uplink->queueLock );
logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops );
- return false;
+ goto fail_lock;
}
if ( freeSlot < firstUsedSlot && firstUsedSlot < 10 && existingType != ULR_PROCESSING ) {
freeSlot = -1; // Not attaching to existing request, make it use a higher slot
}
if ( freeSlot == -1 ) {
if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) {
- mutex_unlock( &uplink->queueLock );
logadd( LOG_WARNING, "Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." );
- return false;
+ goto fail_lock;
}
freeSlot = uplink->queueLen++;
}
@@ -305,16 +361,16 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
#endif
mutex_unlock( &uplink->queueLock );
- if ( foundExisting != -1 )
+ if ( foundExisting != -1 ) {
+ ref_put( &uplink->reference );
return true; // Attached to pending request, do nothing
-
- usleep( 10000 );
+ }
// See if we can fire away the request
- if ( mutex_trylock( &uplink->sendMutex ) != 0 ) {
+ if ( unlikely( mutex_trylock( &uplink->sendMutex ) != 0 ) ) {
logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" );
} else {
- if ( uplink->current.fd == -1 ) {
+ if ( unlikely( uplink->current.fd == -1 ) ) {
mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
} else {
@@ -323,13 +379,13 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
if ( hops < 200 ) ++hops;
const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) );
mutex_unlock( &uplink->sendMutex );
- if ( !ret ) {
+ if ( unlikely( !ret ) ) {
logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" );
} else {
// Direct send succeeded, update queue entry from NEW to PENDING, so the request won't be sent again
int state;
mutex_lock( &uplink->queueLock );
- if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) {
+ if ( !uplink->shutdown && uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) {
state = uplink->queue[freeSlot].status;
if ( uplink->queue[freeSlot].status == ULR_NEW ) {
uplink->queue[freeSlot].status = ULR_PENDING;
@@ -345,6 +401,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
} else {
logadd( LOG_DEBUG2, "Direct uplink request queue entry changed to %s afte sending (expected ULR_NEW).", NAMES_ULR[uplink->queue[freeSlot].status] );
}
+ ref_put( &uplink->reference );
return true;
}
// Fall through to waking up sender thread
@@ -354,7 +411,13 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
}
+ ref_put( &uplink->reference );
return true;
+fail_lock:
+ mutex_unlock( &uplink->queueLock );
+fail_ref:
+ ref_put( &uplink->reference );
+ return false;
}
/**
@@ -381,6 +444,7 @@ static void* uplink_mainloop(void *data)
//
assert( uplink != NULL );
setThreadName( "idle-uplink" );
+ thread_detach( uplink->thread );
blockNoncriticalSignals();
// Make sure file is open for writing
if ( !uplink_reopenCacheFd( uplink, false ) ) {
@@ -553,7 +617,7 @@ static void* uplink_mainloop(void *data)
for (i = 0; i < uplink->queueLen; ++i) {
if ( uplink->queue[i].status != ULR_FREE && timing_reached( &uplink->queue[i].entered, &deadline ) ) {
snprintf( buffer, sizeof(buffer), "[DEBUG %p] Starving request slot %d detected:\n"
- "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)link, i, uplink->queue[i].client->image->name,
+ "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)uplink, i, uplink->queue[i].client->image->name,
uplink->queue[i].from, uplink->queue[i].to, uplink->queue[i].status );
uplink->queue[i].entered = now;
#ifdef _DEBUG_RESEND_STARVING
@@ -572,55 +636,26 @@ static void* uplink_mainloop(void *data)
#endif
}
cleanup: ;
- // Detach depends on whether someone is joining this thread...
- bool exp = false;
- if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) {
- thread_detach( uplink->thread );
- }
uplink_saveCacheMap( uplink );
dnbd3_image_t *image = uplink->image;
mutex_lock( &image->lock );
- // in the list anymore, but we want to prevent it from being freed in either case
- if ( image->uplink == uplink ) {
- image->uplink = NULL;
- }
- mutex_unlock( &image->lock ); // Do NOT use image without locking it
- mutex_lock( &uplink->queueLock );
- // Wait for active RTT measurement to finish
- while ( uplink->rttTestResult == RTT_INPROGRESS ) {
- usleep( 10000 );
- }
- signal_close( uplink->signal );
- mutex_lock( &uplink->rttLock );
- mutex_lock( &uplink->sendMutex );
- if ( uplink->current.fd != -1 ) {
- close( uplink->current.fd );
- uplink->current.fd = -1;
- }
- if ( uplink->better.fd != -1 ) {
- close( uplink->better.fd );
- uplink->better.fd = -1;
+ bool exp = false;
+ if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) {
+ image->users++; // We set the flag - hold onto image
}
- mutex_unlock( &uplink->sendMutex );
- mutex_unlock( &uplink->rttLock );
- mutex_unlock( &uplink->queueLock );
- mutex_destroy( &uplink->queueLock );
- mutex_destroy( &uplink->rttLock );
- mutex_destroy( &uplink->sendMutex );
- free( uplink->recvBuffer );
- uplink->recvBuffer = NULL;
- if ( uplink->cacheFd != -1 ) {
- close( uplink->cacheFd );
+ dnbd3_uplink_t *current = ref_get_uplink( &image->uplinkref );
+ if ( current == uplink ) { // Set NULL if it's still us...
+ mutex_lock( &uplink->queueLock );
+ cancelAllRequests( uplink );
+ mutex_unlock( &uplink->queueLock );
+ ref_setref( &image->uplinkref, NULL );
}
- free( uplink ); // !!!
- if ( image_lock( image ) != NULL ) {
- // Image is still in list...
- if ( !_shutdown && image->cache_map != NULL ) {
- // Ingegrity checker must have found something in the meantime
- uplink_init( image, -1, NULL, 0 );
- }
- image_release( image );
+ if ( current != NULL ) { // Decrease ref in any case
+ ref_put( &current->reference );
}
+ mutex_unlock( &image->lock );
+ // Finally as the thread is done, decrease our own ref that we initialized with
+ ref_put( &uplink->reference );
return NULL ;
}
@@ -637,7 +672,7 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
const uint32_t reqSize = (uint32_t)(((uplink->queue[j].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
/*
logadd( LOG_DEBUG2, "[%p] Sending slot %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 " (%" PRIu64 "-%" PRIu64 ")",
- (void*)link, j, uplink->queue[j].status, uplink->queue[j].handle, uplink->queue[j].from, uplink->queue[j].to, reqStart, reqStart+reqSize );
+ (void*)uplink, j, uplink->queue[j].status, uplink->queue[j].handle, uplink->queue[j].from, uplink->queue[j].to, reqStart, reqStart+reqSize );
*/
mutex_unlock( &uplink->queueLock );
if ( hops < 200 ) ++hops;
@@ -782,7 +817,7 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int
/**
* Receive data from uplink server and process/dispatch
- * Locks on: link.lock, images[].lock
+ * Locks on: uplink.lock, images[].lock
*/
static void uplink_handleReceive(dnbd3_uplink_t *uplink)
{
@@ -924,13 +959,16 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
}
mutex_unlock( &client->sendMutex );
mutex_lock( &uplink->queueLock );
+ if ( i > uplink->queueLen ) {
+ uplink->queueLen = i; // Might have been set to 0 by cancelAllRequests
+ }
}
if ( req->status == ULR_FREE && i == uplink->queueLen - 1 ) uplink->queueLen--;
}
mutex_unlock( &uplink->queueLock );
#ifdef _DEBUG
if ( !served && start != uplink->replicationHandle ) {
- logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, uplink->image->name, start, end );
+ logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)uplink, uplink->image->name, start, end );
}
#endif
if ( start == uplink->replicationHandle ) {