From 69f5bf408b9587a6e2008fba2224c2d506f1a895 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 27 Aug 2019 16:13:07 +0200 Subject: [SERVER] Use reference counting for uplink First step towards less locking for proxy mode --- src/server/uplink.c | 214 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 126 insertions(+), 88 deletions(-) (limited to 'src/server/uplink.c') diff --git a/src/server/uplink.c b/src/server/uplink.c index abfebf0..7a39887 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -3,10 +3,12 @@ #include "locks.h" #include "image.h" #include "altservers.h" +#include "net.h" #include "../shared/sockhelper.h" #include "../shared/protocol.h" #include "../shared/timing.h" #include "../shared/crc32.h" +#include "reference.h" #include #include @@ -45,6 +47,8 @@ static const char *const NAMES_ULR[4] = { static atomic_uint_fast64_t totalBytesReceived = 0; +static void cancelAllRequests(dnbd3_uplink_t *uplink); +static void uplink_free(ref *ref); static void* uplink_mainloop(void *data); static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly); static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex); @@ -76,19 +80,24 @@ uint64_t uplink_getTotalBytesReceived() bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version) { if ( !_isProxy || _shutdown ) return false; - dnbd3_uplink_t *uplink = NULL; assert( image != NULL ); mutex_lock( &image->lock ); - if ( image->uplink != NULL && !image->uplink->shutdown ) { + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink != NULL ) { mutex_unlock( &image->lock ); - if ( sock >= 0 ) close( sock ); + if ( sock != -1 ) { + close( sock ); + } + ref_put( &uplink->reference ); return true; // There's already an uplink, so should we consider this success or failure? 
} if ( image->cache_map == NULL ) { logadd( LOG_WARNING, "Uplink was requested for image %s, but it is already complete", image->name ); goto failure; } - uplink = image->uplink = calloc( 1, sizeof(dnbd3_uplink_t) ); + uplink = calloc( 1, sizeof(dnbd3_uplink_t) ); + // Start with one reference for the uplink thread. We'll return it when the thread finishes + ref_init( &uplink->reference, uplink_free, 1 ); mutex_init( &uplink->queueLock, LOCK_UPLINK_QUEUE ); mutex_init( &uplink->rttLock, LOCK_UPLINK_RTT ); mutex_init( &uplink->sendMutex, LOCK_UPLINK_SEND ); @@ -121,12 +130,13 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version logadd( LOG_ERROR, "Could not start thread for new uplink." ); goto failure; } + ref_setref( &image->uplinkref, &uplink->reference ); mutex_unlock( &image->lock ); return true; failure: ; if ( uplink != NULL ) { free( uplink ); - uplink = image->uplink = NULL; + uplink = NULL; } mutex_unlock( &image->lock ); return false; @@ -137,34 +147,83 @@ failure: ; * Calling it multiple times, even concurrently, will * not break anything. */ -void uplink_shutdown(dnbd3_image_t *image) +bool uplink_shutdown(dnbd3_image_t *image) { - bool join = false; - pthread_t thread; assert( image != NULL ); mutex_lock( &image->lock ); - if ( image->uplink == NULL ) { + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink == NULL ) { mutex_unlock( &image->lock ); - return; + return true; } - dnbd3_uplink_t * const uplink = image->uplink; mutex_lock( &uplink->queueLock ); bool exp = false; if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) { + image->users++; // Prevent free while uplink shuts down signal_call( uplink->signal ); - thread = uplink->thread; - join = true; + } else { + logadd( LOG_ERROR, "This will never happen. 
'%s:%d'", image->name, (int)image->rid ); } + cancelAllRequests( uplink ); + ref_setref( &image->uplinkref, NULL ); + ref_put( &uplink->reference ); mutex_unlock( &uplink->queueLock ); - bool wait = image->uplink != NULL; + bool retval = ( exp && image->users == 0 ); mutex_unlock( &image->lock ); - if ( join ) thread_join( thread, NULL ); - while ( wait ) { - usleep( 5000 ); - mutex_lock( &image->lock ); - wait = image->uplink != NULL && image->uplink->shutdown; - mutex_unlock( &image->lock ); + return exp; +} + +/** + * Cancel all requests of this uplink. + * HOLD QUEUE LOCK WHILE CALLING + */ +static void cancelAllRequests(dnbd3_uplink_t *uplink) +{ + for ( int i = 0; i < uplink->queueLen; ++i ) { + if ( uplink->queue[i].status != ULR_FREE ) { + net_sendReply( uplink->queue[i].client, CMD_ERROR, uplink->queue[i].handle ); + uplink->queue[i].status = ULR_FREE; + } + } + uplink->queueLen = 0; +} + +static void uplink_free(ref *ref) +{ + dnbd3_uplink_t *uplink = container_of(ref, dnbd3_uplink_t, reference); + logadd( LOG_DEBUG1, "Freeing uplink for '%s:%d'", uplink->image->name, (int)uplink->image->rid ); + assert( uplink->queueLen == 0 ); + signal_close( uplink->signal ); + if ( uplink->current.fd != -1 ) { + close( uplink->current.fd ); + uplink->current.fd = -1; + } + if ( uplink->better.fd != -1 ) { + close( uplink->better.fd ); + uplink->better.fd = -1; + } + mutex_destroy( &uplink->queueLock ); + mutex_destroy( &uplink->rttLock ); + mutex_destroy( &uplink->sendMutex ); + free( uplink->recvBuffer ); + uplink->recvBuffer = NULL; + if ( uplink->cacheFd != -1 ) { + close( uplink->cacheFd ); } + // TODO Requeue any requests + dnbd3_image_t *image = image_lock( uplink->image ); + if ( image != NULL ) { + // != NULL means image is still in list... + if ( !_shutdown && image->cache_map != NULL ) { + // Ingegrity checker must have found something in the meantime + uplink_init( image, -1, NULL, 0 ); + } + image_release( image ); + } + // Finally let go of image. 
It was acquired either in uplink_shutdown or in the cleanup code + // of the uplink thread, depending on who set the uplink->shutdown flag. + image_release( image ); + free( uplink ); // !!! } /** @@ -193,31 +252,28 @@ void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client) */ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) { - if ( client == NULL || client->image == NULL ) return false; + if ( client == NULL || client->image == NULL ) + return false; if ( length > (uint32_t)_maxPayload ) { logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length ); return false; } - mutex_lock( &client->image->lock ); - if ( client->image->uplink == NULL ) { - mutex_unlock( &client->image->lock ); + dnbd3_uplink_t * const uplink = ref_get_uplink( &client->image->uplinkref ); + if ( uplink == NULL ) { logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); return false; } - dnbd3_uplink_t * const uplink = client->image->uplink; if ( uplink->shutdown ) { - mutex_unlock( &client->image->lock ); logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" ); - return false; + goto fail_ref; } // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain // This might be a false positive if there are multiple instances running on the same host (IP) if ( hops != 0 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) { uplink->cycleDetected = true; signal_call( uplink->signal ); - mutex_unlock( &client->image->lock ); logadd( LOG_WARNING, "Proxy cycle detected (same host)." 
); - return false; + goto fail_ref; } int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise @@ -229,7 +285,9 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin const uint64_t end = start + length; mutex_lock( &uplink->queueLock ); - mutex_unlock( &client->image->lock ); + if ( uplink->shutdown ) { // Check again after locking to prevent lost requests + goto fail_lock; + } for (i = 0; i < uplink->queueLen; ++i) { // find free slot to place this request into if ( uplink->queue[i].status == ULR_FREE ) { @@ -257,18 +315,16 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin if ( unlikely( requestLoop ) ) { uplink->cycleDetected = true; signal_call( uplink->signal ); - mutex_unlock( &uplink->queueLock ); logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops ); - return false; + goto fail_lock; } if ( freeSlot < firstUsedSlot && firstUsedSlot < 10 && existingType != ULR_PROCESSING ) { freeSlot = -1; // Not attaching to existing request, make it use a higher slot } if ( freeSlot == -1 ) { if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) { - mutex_unlock( &uplink->queueLock ); logadd( LOG_WARNING, "Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." 
); - return false; + goto fail_lock; } freeSlot = uplink->queueLen++; } @@ -305,16 +361,16 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin #endif mutex_unlock( &uplink->queueLock ); - if ( foundExisting != -1 ) + if ( foundExisting != -1 ) { + ref_put( &uplink->reference ); return true; // Attached to pending request, do nothing - - usleep( 10000 ); + } // See if we can fire away the request - if ( mutex_trylock( &uplink->sendMutex ) != 0 ) { + if ( unlikely( mutex_trylock( &uplink->sendMutex ) != 0 ) ) { logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" ); } else { - if ( uplink->current.fd == -1 ) { + if ( unlikely( uplink->current.fd == -1 ) ) { mutex_unlock( &uplink->sendMutex ); logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" ); } else { @@ -323,13 +379,13 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin if ( hops < 200 ) ++hops; const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) ); mutex_unlock( &uplink->sendMutex ); - if ( !ret ) { + if ( unlikely( !ret ) ) { logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" ); } else { // Direct send succeeded, update queue entry from NEW to PENDING, so the request won't be sent again int state; mutex_lock( &uplink->queueLock ); - if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) { + if ( !uplink->shutdown && uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) { state = uplink->queue[freeSlot].status; if ( uplink->queue[freeSlot].status == ULR_NEW ) { uplink->queue[freeSlot].status = ULR_PENDING; @@ -345,6 +401,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin } else { logadd( LOG_DEBUG2, "Direct uplink request queue entry changed to %s afte sending (expected ULR_NEW).", 
NAMES_ULR[uplink->queue[freeSlot].status] ); } + ref_put( &uplink->reference ); return true; } // Fall through to waking up sender thread @@ -354,7 +411,13 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); } + ref_put( &uplink->reference ); return true; +fail_lock: + mutex_unlock( &uplink->queueLock ); +fail_ref: + ref_put( &uplink->reference ); + return false; } /** @@ -381,6 +444,7 @@ static void* uplink_mainloop(void *data) // assert( uplink != NULL ); setThreadName( "idle-uplink" ); + thread_detach( uplink->thread ); blockNoncriticalSignals(); // Make sure file is open for writing if ( !uplink_reopenCacheFd( uplink, false ) ) { @@ -553,7 +617,7 @@ static void* uplink_mainloop(void *data) for (i = 0; i < uplink->queueLen; ++i) { if ( uplink->queue[i].status != ULR_FREE && timing_reached( &uplink->queue[i].entered, &deadline ) ) { snprintf( buffer, sizeof(buffer), "[DEBUG %p] Starving request slot %d detected:\n" - "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)link, i, uplink->queue[i].client->image->name, + "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)uplink, i, uplink->queue[i].client->image->name, uplink->queue[i].from, uplink->queue[i].to, uplink->queue[i].status ); uplink->queue[i].entered = now; #ifdef _DEBUG_RESEND_STARVING @@ -572,55 +636,26 @@ static void* uplink_mainloop(void *data) #endif } cleanup: ; - // Detach depends on whether someone is joining this thread... 
- bool exp = false; - if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) { - thread_detach( uplink->thread ); - } uplink_saveCacheMap( uplink ); dnbd3_image_t *image = uplink->image; mutex_lock( &image->lock ); - // in the list anymore, but we want to prevent it from being freed in either case - if ( image->uplink == uplink ) { - image->uplink = NULL; - } - mutex_unlock( &image->lock ); // Do NOT use image without locking it - mutex_lock( &uplink->queueLock ); - // Wait for active RTT measurement to finish - while ( uplink->rttTestResult == RTT_INPROGRESS ) { - usleep( 10000 ); - } - signal_close( uplink->signal ); - mutex_lock( &uplink->rttLock ); - mutex_lock( &uplink->sendMutex ); - if ( uplink->current.fd != -1 ) { - close( uplink->current.fd ); - uplink->current.fd = -1; - } - if ( uplink->better.fd != -1 ) { - close( uplink->better.fd ); - uplink->better.fd = -1; + bool exp = false; + if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) { + image->users++; // We set the flag - hold onto image } - mutex_unlock( &uplink->sendMutex ); - mutex_unlock( &uplink->rttLock ); - mutex_unlock( &uplink->queueLock ); - mutex_destroy( &uplink->queueLock ); - mutex_destroy( &uplink->rttLock ); - mutex_destroy( &uplink->sendMutex ); - free( uplink->recvBuffer ); - uplink->recvBuffer = NULL; - if ( uplink->cacheFd != -1 ) { - close( uplink->cacheFd ); + dnbd3_uplink_t *current = ref_get_uplink( &image->uplinkref ); + if ( current == uplink ) { // Set NULL if it's still us... + mutex_lock( &uplink->queueLock ); + cancelAllRequests( uplink ); + mutex_unlock( &uplink->queueLock ); + ref_setref( &image->uplinkref, NULL ); } - free( uplink ); // !!! - if ( image_lock( image ) != NULL ) { - // Image is still in list... 
- if ( !_shutdown && image->cache_map != NULL ) { - // Ingegrity checker must have found something in the meantime - uplink_init( image, -1, NULL, 0 ); - } - image_release( image ); + if ( current != NULL ) { // Decrease ref in any case + ref_put( &current->reference ); } + mutex_unlock( &image->lock ); + // Finally as the thread is done, decrease our own ref that we initialized with + ref_put( &uplink->reference ); return NULL ; } @@ -637,7 +672,7 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly) const uint32_t reqSize = (uint32_t)(((uplink->queue[j].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); /* logadd( LOG_DEBUG2, "[%p] Sending slot %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 " (%" PRIu64 "-%" PRIu64 ")", - (void*)link, j, uplink->queue[j].status, uplink->queue[j].handle, uplink->queue[j].from, uplink->queue[j].to, reqStart, reqStart+reqSize ); + (void*)uplink, j, uplink->queue[j].status, uplink->queue[j].handle, uplink->queue[j].from, uplink->queue[j].to, reqStart, reqStart+reqSize ); */ mutex_unlock( &uplink->queueLock ); if ( hops < 200 ) ++hops; @@ -782,7 +817,7 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int /** * Receive data from uplink server and process/dispatch - * Locks on: link.lock, images[].lock + * Locks on: uplink.lock, images[].lock */ static void uplink_handleReceive(dnbd3_uplink_t *uplink) { @@ -924,13 +959,16 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) } mutex_unlock( &client->sendMutex ); mutex_lock( &uplink->queueLock ); + if ( i > uplink->queueLen ) { + uplink->queueLen = i; // Might have been set to 0 by cancelAllRequests + } } if ( req->status == ULR_FREE && i == uplink->queueLen - 1 ) uplink->queueLen--; } mutex_unlock( &uplink->queueLock ); #ifdef _DEBUG if ( !served && start != uplink->replicationHandle ) { - logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, uplink->image->name, 
start, end ); + logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)uplink, uplink->image->name, start, end ); } #endif if ( start == uplink->replicationHandle ) { -- cgit v1.2.3-55-g7522