From 69f5bf408b9587a6e2008fba2224c2d506f1a895 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 27 Aug 2019 16:13:07 +0200 Subject: [SERVER] Use reference counting for uplink First step towards less locking for proxy mode --- src/server/altservers.c | 13 ++- src/server/globals.h | 4 +- src/server/image.c | 39 ++++----- src/server/integrity.c | 17 ++-- src/server/net.c | 48 +++++++---- src/server/net.h | 2 + src/server/reference.c | 33 ++++++++ src/server/reference.h | 54 ++++++++++++ src/server/reftypes.h | 25 ++++++ src/server/uplink.c | 214 ++++++++++++++++++++++++++++-------------------- src/server/uplink.h | 2 +- 11 files changed, 311 insertions(+), 140 deletions(-) create mode 100644 src/server/reference.c create mode 100644 src/server/reference.h create mode 100644 src/server/reftypes.h diff --git a/src/server/altservers.c b/src/server/altservers.c index 493ed9e..7d7fdbe 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -7,6 +7,8 @@ #include "../shared/protocol.h" #include "../shared/timing.h" #include "../serverconfig.h" +#include "reference.h" + #include #include #include @@ -104,7 +106,6 @@ void altservers_findUplinkAsync(dnbd3_uplink_t *uplink) return; if ( uplink->current.fd != -1 && numAltServers <= 1 ) return; - int i; // if betterFd != -1 it means the uplink is supposed to switch to another // server. As this function here is called by the uplink thread, it can // never be that the uplink is supposed to switch, but instead calls @@ -112,11 +113,14 @@ void altservers_findUplinkAsync(dnbd3_uplink_t *uplink) assert( uplink->better.fd == -1 ); // it is however possible that an RTT measurement is currently in progress, // so check for that case and do nothing if one is in progress - mutex_lock( &uplink->rttLock ); if ( uplink->rttTestResult != RTT_INPROGRESS ) { - threadpool_run( &altservers_runCheck, uplink ); + dnbd3_uplink_t *current = ref_get_uplink( &uplink->image->uplinkref ); + if ( current == uplink ) { + threadpool_run( &altservers_runCheck, uplink ); + } else if ( current != NULL ) { + ref_put( ¤t->reference ); + } } - mutex_unlock( &uplink->rttLock ); } /** @@ -375,6 +379,7 @@ static void *altservers_runCheck(void *data) assert( uplink != NULL ); setThreadName( "altserver-check" ); altservers_findUplinkInternal( uplink ); + ref_put( &uplink->reference ); // Acquired in findUplinkAsync // Save cache maps of all images if applicable // TODO: Has nothing to do with alt servers really, maybe move somewhere else? declare_now; diff --git a/src/server/globals.h b/src/server/globals.h index 4d97c6b..5dd205a 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -8,6 +8,7 @@ #include #include #include +#include "reftypes.h" typedef struct timespec ticks; @@ -64,6 +65,7 @@ typedef struct { #define RTT_NOT_REACHABLE 4 // No uplink was reachable struct _dnbd3_uplink { + ref reference; dnbd3_server_connection_t current; // Currently active connection; fd == -1 means disconnected dnbd3_server_connection_t better; // Better connection as found by altserver worker; fd == -1 means none dnbd3_signal_t* signal; // used to wake up the process @@ -107,7 +109,7 @@ struct _dnbd3_image { char *path; // absolute path of the image char *name; // public name of the image (usually relative path minus revision ID) - dnbd3_uplink_t *uplink; // pointer to a server connection + weakref uplinkref; // pointer to a server connection uint8_t *cache_map; // cache map telling which parts are locally cached, NULL if complete uint64_t virtualFilesize; // virtual size of image (real size rounded up to multiple of 4k) uint64_t realFilesize; // actual file size on disk diff --git a/src/server/image.c b/src/server/image.c index 1a6e0f8..5b58347 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -8,6 +8,7 @@ #include "../shared/protocol.h" #include "../shared/timing.h" #include "../shared/crc32.h" +#include "reference.h" #include #include @@ -375,9 +376,7 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) // Check if image is incomplete, handle if ( candidate->cache_map != NULL ) { - if ( candidate->uplink == NULL ) { - uplink_init( candidate, -1, NULL, -1 ); - } + uplink_init( candidate, -1, NULL, -1 ); } return candidate; // We did all we can, hopefully it's working @@ -484,17 +483,7 @@ void image_killUplinks() mutex_lock( &imageListLock ); for (i = 0; i < _num_images; ++i) { if ( _images[i] == NULL ) continue; - mutex_lock( &_images[i]->lock ); - if ( _images[i]->uplink != NULL ) { - mutex_lock( &_images[i]->uplink->queueLock ); - if ( !_images[i]->uplink->shutdown ) { - thread_detach( _images[i]->uplink->thread ); - _images[i]->uplink->shutdown = true; - } - mutex_unlock( &_images[i]->uplink->queueLock ); - signal_call( _images[i]->uplink->signal ); - } - mutex_unlock( &_images[i]->lock ); + uplink_shutdown( _images[i] ); } mutex_unlock( &imageListLock ); } @@ -588,11 +577,15 @@ bool image_tryFreeAll() static dnbd3_image_t* image_free(dnbd3_image_t *image) { assert( image != NULL ); + assert( image->users == 0 ); if ( !_shutdown ) { logadd( LOG_INFO, "Freeing image %s:%d", image->name, (int)image->rid ); } - // - uplink_shutdown( image ); + // uplink_shutdown might return false to tell us + // that the shutdown is in progress. Bail out since + // this will get called again when the uplink is done. + if ( !uplink_shutdown( image ) ) + return NULL; mutex_lock( &image->lock ); free( image->cache_map ); free( image->crc32 ); @@ -860,7 +853,7 @@ static bool image_load(char *base, char *path, int withUplink) image->cache_map = cache_map; image->crc32 = crc32list; image->masterCrc32 = masterCrc; - image->uplink = NULL; + image->uplinkref = NULL; image->realFilesize = realFilesize; image->virtualFilesize = virtualFilesize; image->rid = (uint16_t)revision; @@ -1503,16 +1496,18 @@ json_t* image_getListAsJson() mutex_lock( &image->lock ); idleTime = (int)timing_diff( &image->atime, &now ); completeness = image_getCompletenessEstimate( image ); - if ( image->uplink == NULL ) { + mutex_unlock( &image->lock ); + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink == NULL ) { bytesReceived = 0; uplinkName[0] = '\0'; } else { - bytesReceived = image->uplink->bytesReceived; - if ( !uplink_getHostString( image->uplink, uplinkName, sizeof(uplinkName) ) ) { + bytesReceived = uplink->bytesReceived; + if ( !uplink_getHostString( uplink, uplinkName, sizeof(uplinkName) ) ) { uplinkName[0] = '\0'; } + ref_put( &uplink->reference ); } - mutex_unlock( &image->lock ); jsonImage = json_pack( "{sisssisisisisI}", "id", image->id, // id, name, rid never change, so access them without locking @@ -1734,7 +1729,7 @@ void image_closeUnusedFd() if ( image == NULL ) continue; mutex_lock( &image->lock ); - if ( image->users == 0 && image->uplink == NULL && timing_reached( &image->atime, &deadline ) ) { + if ( image->users == 0 && image->uplinkref == NULL && timing_reached( &image->atime, &deadline ) ) { snprintf( imgstr, sizeof(imgstr), "%s:%d", image->name, (int)image->rid ); fd = image->readFd; image->readFd = -1; diff --git a/src/server/integrity.c b/src/server/integrity.c index 3d1ac9b..f358c46 100644 --- a/src/server/integrity.c +++ b/src/server/integrity.c @@ -4,6 +4,7 @@ #include "locks.h" #include "image.h" #include "uplink.h" +#include "reference.h" #include #include @@ -238,11 +239,13 @@ static void* integrity_main(void * data UNUSED) if ( i + 1 == queueLen ) queueLen--; // Mark as working again if applicable if ( !foundCorrupted ) { - mutex_lock( &image->lock ); - if ( image->uplink != NULL ) { // TODO: image_determineWorkingState() helper? - image->working = image->uplink->current.fd != -1 && image->readFd != -1; + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink != NULL ) { // TODO: image_determineWorkingState() helper? + mutex_lock( &image->lock ); + image->working = uplink->current.fd != -1 && image->readFd != -1; + mutex_unlock( &image->lock ); + ref_put( &uplink->reference ); } - mutex_unlock( &image->lock ); } } else { // Still more blocks to go... @@ -255,12 +258,8 @@ static void* integrity_main(void * data UNUSED) // Something was fishy, make sure uplink exists mutex_lock( &image->lock ); image->working = false; - bool restart = image->uplink == NULL || image->uplink->shutdown; mutex_unlock( &image->lock ); - if ( restart ) { - uplink_shutdown( image ); - uplink_init( image, -1, NULL, -1 ); - } + uplink_init( image, -1, NULL, -1 ); } // Release :-) image_release( image ); diff --git a/src/server/net.c b/src/server/net.c index 4976eea..e0b516e 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -24,6 +24,7 @@ #include "locks.h" #include "rpc.h" #include "altservers.h" +#include "reference.h" #include "../shared/sockhelper.h" #include "../shared/timing.h" @@ -229,7 +230,7 @@ void* net_handleNewConnection(void *clientPtr) rid = serializer_get_uint16( &payload ); const uint8_t flags = serializer_get_uint8( &payload ); client->isServer = ( flags & FLAGS8_SERVER ); - if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) { + if ( unlikely( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) ) { if ( client_version < MIN_SUPPORTED_CLIENT ) { logadd( LOG_DEBUG1, "Client %s too old", client->hostName ); } else { @@ -257,22 +258,25 @@ void* net_handleNewConnection(void *clientPtr) } client->image = image; atomic_thread_fence( memory_order_release ); - if ( image == NULL ) { + if ( unlikely( image == NULL ) ) { //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); - } else if ( !image->working ) { + } else if ( unlikely( !image->working ) ) { logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n", client->hostName, image_name, (int)rid ); } else { - bool penalty; // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable bOk = true; if ( image->cache_map != NULL ) { - mutex_lock( &image->lock ); - if ( image->uplink == NULL || image->uplink->cacheFd == -1 || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink == NULL || uplink->cacheFd == -1 || uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { bOk = ( rand() % 4 ) == 1; } - penalty = bOk && image->uplink != NULL && image->uplink->cacheFd == -1; - mutex_unlock( &image->lock ); + bool penalty = bOk && ( uplink == NULL || uplink->cacheFd == -1 ); + if ( uplink == NULL ) { + uplink_init( image, -1, NULL, 0 ); + } else { + ref_put( &uplink->reference ); + } if ( penalty ) { // Wait 100ms if local caching is not working so this usleep( 100000 ); // server gets a penalty and is less likely to be selected } @@ -300,7 +304,7 @@ void* net_handleNewConnection(void *clientPtr) } } - if ( bOk ) { + if ( likely( bOk ) ) { // add artificial delay if applicable if ( client->isServer && _serverPenalty != 0 ) { usleep( _serverPenalty ); @@ -315,7 +319,7 @@ void* net_handleNewConnection(void *clientPtr) case CMD_GET_BLOCK:; const uint64_t offset = request.offset_small; // Copy to full uint64 to prevent repeated masking reply.handle = request.handle; - if ( offset >= image->virtualFilesize ) { + if ( unlikely( offset >= image->virtualFilesize ) ) { // Sanity check logadd( LOG_WARNING, "Client %s requested non-existent block", client->hostName ); reply.size = 0; @@ -323,7 +327,7 @@ void* net_handleNewConnection(void *clientPtr) send_reply( client->sock, &reply, NULL ); break; } - if ( offset + request.size > image->virtualFilesize ) { + if ( unlikely( offset + request.size > image->virtualFilesize ) ) { // Sanity check logadd( LOG_WARNING, "Client %s requested data block that extends beyond image size", client->hostName ); reply.size = 0; @@ -398,7 +402,7 @@ void* net_handleNewConnection(void *clientPtr) reply.size = request.size; fixup_reply( reply ); - const bool lock = image->uplink != NULL; + const bool lock = image->uplinkref != NULL; if ( lock ) mutex_lock( &client->sendMutex ); // Send reply header if ( send( client->sock, &reply, sizeof(dnbd3_reply_t), (request.size == 0 ? 0 : MSG_MORE) ) != sizeof(dnbd3_reply_t) ) { @@ -696,9 +700,11 @@ static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client) { mutex_lock( &client->lock ); if ( client->image != NULL ) { - mutex_lock( &client->image->lock ); - if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client ); - mutex_unlock( &client->image->lock ); + dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref ); + if ( uplink != NULL ) { + uplink_removeClient( uplink, client ); + ref_put( &uplink->reference ); + } } mutex_lock( &client->sendMutex ); if ( client->sock != -1 ) { @@ -740,3 +746,15 @@ static bool addToList(dnbd3_client_t *client) return true; } +void net_sendReply(dnbd3_client_t *client, uint16_t cmd, uint64_t handle) +{ + dnbd3_reply_t reply; + reply.magic = dnbd3_packet_magic; + reply.cmd = cmd; + reply.handle = handle; + reply.size = 0; + mutex_lock( &client->sendMutex ); + send_reply( client->sock, &reply, NULL ); + mutex_unlock( &client->sendMutex ); +} + diff --git a/src/server/net.h b/src/server/net.h index 6813b49..7719aef 100644 --- a/src/server/net.h +++ b/src/server/net.h @@ -37,4 +37,6 @@ void net_disconnectAll(); void net_waitForAllDisconnected(); +void net_sendReply(dnbd3_client_t *client, uint16_t cmd, uint64_t handle); + #endif /* NET_H_ */ diff --git a/src/server/reference.c b/src/server/reference.c new file mode 100644 index 0000000..468e00b --- /dev/null +++ b/src/server/reference.c @@ -0,0 +1,33 @@ +#ifndef unlikely +#define unlikely(x) (x) +#endif +#include "reference.h" +#include +#include + +void ref_init( ref *reference, void ( *freefun )( ref * ), long count ) +{ + reference->count = count; + reference->free = freefun; +} + +_Noreturn void _ref_error( const char *message ) +{ + fprintf( stderr, "Reference counter overflow\n" ); + abort(); +} + +void ref_setref( weakref *weakref, ref *ref ) +{ + union _aligned_ref_ *new_weakref = 0; + if ( ref ) { + ( new_weakref = aligned_ref( ref->_aligned_ref ) )->ref = ref; + ref->count += sizeof( union _aligned_ref_ ) + 1; + } + char *old_weakref = (char *)atomic_exchange( weakref, new_weakref ); + if ( !old_weakref ) + return; + struct _ref_ *old_ref = aligned_ref( old_weakref )->ref; + old_ref->count += old_weakref - (char *)aligned_ref( old_weakref ) - sizeof( union _aligned_ref_ ); + ref_put( old_ref ); +} diff --git a/src/server/reference.h b/src/server/reference.h new file mode 100644 index 0000000..0bc081a --- /dev/null +++ b/src/server/reference.h @@ -0,0 +1,54 @@ +#ifndef _REFERENCE_H_ +#define _REFERENCE_H_ + +#include "reftypes.h" +#include +#include + +#define container_of(ptr, type, member) \ + ((type *)((char *)(ptr) - (char *)&(((type *)NULL)->member))) + +void ref_init( ref *reference, void ( *freefun )( ref * ), long count ); + +void ref_setref( weakref *weakref, ref *ref ); + +_Noreturn void _ref_error( const char *message ); + +static inline ref *ref_get( weakref *weakref ) +{ + char *old_weakref = (char *)*weakref; + do { + if ( old_weakref == NULL ) + return NULL; + if ( aligned_ref( old_weakref ) != aligned_ref( old_weakref + 1 ) ) { + old_weakref = (char *)*weakref; + continue; + } + } while ( !atomic_compare_exchange_weak( weakref, (void **)&old_weakref, old_weakref + 1 ) ); + struct _ref_ *ref = aligned_ref( old_weakref )->ref; + if ( unlikely( ++ref->count == -1 ) ) { + _ref_error( "Reference counter overflow. Aborting.\n" ); + } + char *cur_weakref = ( char * )*weakref; + do { + if ( aligned_ref( cur_weakref ) != aligned_ref( old_weakref ) ) { + ref->count--; + break; + } + } while ( !atomic_compare_exchange_weak( weakref, (void **)&cur_weakref, cur_weakref - 1 ) ); + return ref; +} + +static inline void ref_put( ref *ref ) +{ + if ( --ref->count == 0 ) { + ref->free( ref ); + } +} + +#define ref_get_uplink(wr) ({ \ + ref* ref = ref_get( wr ); \ + ref == NULL ? NULL : container_of(ref, dnbd3_uplink_t, reference); \ +}) + +#endif diff --git a/src/server/reftypes.h b/src/server/reftypes.h new file mode 100644 index 0000000..45c0c20 --- /dev/null +++ b/src/server/reftypes.h @@ -0,0 +1,25 @@ +#ifndef _REFTYPES_H_ +#define _REFTYPES_H_ + +#include + +_Static_assert( sizeof( void * ) == sizeof( _Atomic( void * ) ), "Atomic pointer bad" ); + +typedef _Atomic( void * ) weakref; + +#define aligned_ref(ptr) \ + ((union _aligned_ref_ *)((ptr) - (uintptr_t)(ptr) % sizeof(union _aligned_ref_))) + +union _aligned_ref_ { + struct _ref_ *ref; + void *_padding[( 32 - 1 ) / sizeof( void * ) + 1]; +}; + +typedef struct _ref_ { + _Atomic long count; + void ( *free )( struct _ref_ * ); + char _padding[sizeof( union _aligned_ref_ )]; + char _aligned_ref[sizeof( union _aligned_ref_ )]; +} ref; + +#endif diff --git a/src/server/uplink.c b/src/server/uplink.c index abfebf0..7a39887 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -3,10 +3,12 @@ #include "locks.h" #include "image.h" #include "altservers.h" +#include "net.h" #include "../shared/sockhelper.h" #include "../shared/protocol.h" #include "../shared/timing.h" #include "../shared/crc32.h" +#include "reference.h" #include #include @@ -45,6 +47,8 @@ static const char *const NAMES_ULR[4] = { static atomic_uint_fast64_t totalBytesReceived = 0; +static void cancelAllRequests(dnbd3_uplink_t *uplink); +static void uplink_free(ref *ref); static void* uplink_mainloop(void *data); static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly); static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex); @@ -76,19 +80,24 @@ uint64_t uplink_getTotalBytesReceived() bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version) { if ( !_isProxy || _shutdown ) return false; - dnbd3_uplink_t *uplink = NULL; assert( image != NULL ); mutex_lock( &image->lock ); - if ( image->uplink != NULL && !image->uplink->shutdown ) { + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink != NULL ) { mutex_unlock( &image->lock ); - if ( sock >= 0 ) close( sock ); + if ( sock != -1 ) { + close( sock ); + } + ref_put( &uplink->reference ); return true; // There's already an uplink, so should we consider this success or failure? } if ( image->cache_map == NULL ) { logadd( LOG_WARNING, "Uplink was requested for image %s, but it is already complete", image->name ); goto failure; } - uplink = image->uplink = calloc( 1, sizeof(dnbd3_uplink_t) ); + uplink = calloc( 1, sizeof(dnbd3_uplink_t) ); + // Start with one reference for the uplink thread. We'll return it when the thread finishes + ref_init( &uplink->reference, uplink_free, 1 ); mutex_init( &uplink->queueLock, LOCK_UPLINK_QUEUE ); mutex_init( &uplink->rttLock, LOCK_UPLINK_RTT ); mutex_init( &uplink->sendMutex, LOCK_UPLINK_SEND ); @@ -121,12 +130,13 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version logadd( LOG_ERROR, "Could not start thread for new uplink." ); goto failure; } + ref_setref( &image->uplinkref, &uplink->reference ); mutex_unlock( &image->lock ); return true; failure: ; if ( uplink != NULL ) { free( uplink ); - uplink = image->uplink = NULL; + uplink = NULL; } mutex_unlock( &image->lock ); return false; @@ -137,34 +147,83 @@ failure: ; * Calling it multiple times, even concurrently, will * not break anything. */ -void uplink_shutdown(dnbd3_image_t *image) +bool uplink_shutdown(dnbd3_image_t *image) { - bool join = false; - pthread_t thread; assert( image != NULL ); mutex_lock( &image->lock ); - if ( image->uplink == NULL ) { + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink == NULL ) { mutex_unlock( &image->lock ); - return; + return true; } - dnbd3_uplink_t * const uplink = image->uplink; mutex_lock( &uplink->queueLock ); bool exp = false; if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) { + image->users++; // Prevent free while uplink shuts down signal_call( uplink->signal ); - thread = uplink->thread; - join = true; + } else { + logadd( LOG_ERROR, "This will never happen. '%s:%d'", image->name, (int)image->rid ); } + cancelAllRequests( uplink ); + ref_setref( &image->uplinkref, NULL ); + ref_put( &uplink->reference ); mutex_unlock( &uplink->queueLock ); - bool wait = image->uplink != NULL; + bool retval = ( exp && image->users == 0 ); mutex_unlock( &image->lock ); - if ( join ) thread_join( thread, NULL ); - while ( wait ) { - usleep( 5000 ); - mutex_lock( &image->lock ); - wait = image->uplink != NULL && image->uplink->shutdown; - mutex_unlock( &image->lock ); + return exp; +} + +/** + * Cancel all requests of this uplink. + * HOLD QUEUE LOCK WHILE CALLING + */ +static void cancelAllRequests(dnbd3_uplink_t *uplink) +{ + for ( int i = 0; i < uplink->queueLen; ++i ) { + if ( uplink->queue[i].status != ULR_FREE ) { + net_sendReply( uplink->queue[i].client, CMD_ERROR, uplink->queue[i].handle ); + uplink->queue[i].status = ULR_FREE; + } + } + uplink->queueLen = 0; +} + +static void uplink_free(ref *ref) +{ + dnbd3_uplink_t *uplink = container_of(ref, dnbd3_uplink_t, reference); + logadd( LOG_DEBUG1, "Freeing uplink for '%s:%d'", uplink->image->name, (int)uplink->image->rid ); + assert( uplink->queueLen == 0 ); + signal_close( uplink->signal ); + if ( uplink->current.fd != -1 ) { + close( uplink->current.fd ); + uplink->current.fd = -1; + } + if ( uplink->better.fd != -1 ) { + close( uplink->better.fd ); + uplink->better.fd = -1; + } + mutex_destroy( &uplink->queueLock ); + mutex_destroy( &uplink->rttLock ); + mutex_destroy( &uplink->sendMutex ); + free( uplink->recvBuffer ); + uplink->recvBuffer = NULL; + if ( uplink->cacheFd != -1 ) { + close( uplink->cacheFd ); } + // TODO Requeue any requests + dnbd3_image_t *image = image_lock( uplink->image ); + if ( image != NULL ) { + // != NULL means image is still in list... + if ( !_shutdown && image->cache_map != NULL ) { + // Ingegrity checker must have found something in the meantime + uplink_init( image, -1, NULL, 0 ); + } + image_release( image ); + } + // Finally let go of image. It was acquired either in uplink_shutdown or in the cleanup code + // of the uplink thread, depending on who set the uplink->shutdown flag. + image_release( image ); + free( uplink ); // !!! } /** @@ -193,31 +252,28 @@ void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client) */ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) { - if ( client == NULL || client->image == NULL ) return false; + if ( client == NULL || client->image == NULL ) + return false; if ( length > (uint32_t)_maxPayload ) { logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length ); return false; } - mutex_lock( &client->image->lock ); - if ( client->image->uplink == NULL ) { - mutex_unlock( &client->image->lock ); + dnbd3_uplink_t * const uplink = ref_get_uplink( &client->image->uplinkref ); + if ( uplink == NULL ) { logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); return false; } - dnbd3_uplink_t * const uplink = client->image->uplink; if ( uplink->shutdown ) { - mutex_unlock( &client->image->lock ); logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" ); - return false; + goto fail_ref; } // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain // This might be a false positive if there are multiple instances running on the same host (IP) if ( hops != 0 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) { uplink->cycleDetected = true; signal_call( uplink->signal ); - mutex_unlock( &client->image->lock ); logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); - return false; + goto fail_ref; } int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise @@ -229,7 +285,9 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin const uint64_t end = start + length; mutex_lock( &uplink->queueLock ); - mutex_unlock( &client->image->lock ); + if ( uplink->shutdown ) { // Check again after locking to prevent lost requests + goto fail_lock; + } for (i = 0; i < uplink->queueLen; ++i) { // find free slot to place this request into if ( uplink->queue[i].status == ULR_FREE ) { @@ -257,18 +315,16 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin if ( unlikely( requestLoop ) ) { uplink->cycleDetected = true; signal_call( uplink->signal ); - mutex_unlock( &uplink->queueLock ); logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops ); - return false; + goto fail_lock; } if ( freeSlot < firstUsedSlot && firstUsedSlot < 10 && existingType != ULR_PROCESSING ) { freeSlot = -1; // Not attaching to existing request, make it use a higher slot } if ( freeSlot == -1 ) { if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) { - mutex_unlock( &uplink->queueLock ); logadd( LOG_WARNING, "Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." ); - return false; + goto fail_lock; } freeSlot = uplink->queueLen++; } @@ -305,16 +361,16 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin #endif mutex_unlock( &uplink->queueLock ); - if ( foundExisting != -1 ) + if ( foundExisting != -1 ) { + ref_put( &uplink->reference ); return true; // Attached to pending request, do nothing - - usleep( 10000 ); + } // See if we can fire away the request - if ( mutex_trylock( &uplink->sendMutex ) != 0 ) { + if ( unlikely( mutex_trylock( &uplink->sendMutex ) != 0 ) ) { logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" ); } else { - if ( uplink->current.fd == -1 ) { + if ( unlikely( uplink->current.fd == -1 ) ) { mutex_unlock( &uplink->sendMutex ); logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" ); } else { @@ -323,13 +379,13 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin if ( hops < 200 ) ++hops; const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) ); mutex_unlock( &uplink->sendMutex ); - if ( !ret ) { + if ( unlikely( !ret ) ) { logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" ); } else { // Direct send succeeded, update queue entry from NEW to PENDING, so the request won't be sent again int state; mutex_lock( &uplink->queueLock ); - if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) { + if ( !uplink->shutdown && uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) { state = uplink->queue[freeSlot].status; if ( uplink->queue[freeSlot].status == ULR_NEW ) { uplink->queue[freeSlot].status = ULR_PENDING; @@ -345,6 +401,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin } else { logadd( LOG_DEBUG2, "Direct uplink request queue entry changed to %s afte sending (expected ULR_NEW).", NAMES_ULR[uplink->queue[freeSlot].status] ); } + ref_put( &uplink->reference ); return true; } // Fall through to waking up sender thread @@ -354,7 +411,13 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); } + ref_put( &uplink->reference ); return true; +fail_lock: + mutex_unlock( &uplink->queueLock ); +fail_ref: + ref_put( &uplink->reference ); + return false; } /** @@ -381,6 +444,7 @@ static void* uplink_mainloop(void *data) // assert( uplink != NULL ); setThreadName( "idle-uplink" ); + thread_detach( uplink->thread ); blockNoncriticalSignals(); // Make sure file is open for writing if ( !uplink_reopenCacheFd( uplink, false ) ) { @@ -553,7 +617,7 @@ static void* uplink_mainloop(void *data) for (i = 0; i < uplink->queueLen; ++i) { if ( uplink->queue[i].status != ULR_FREE && timing_reached( &uplink->queue[i].entered, &deadline ) ) { snprintf( buffer, sizeof(buffer), "[DEBUG %p] Starving request slot %d detected:\n" - "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)link, i, uplink->queue[i].client->image->name, + "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)uplink, i, uplink->queue[i].client->image->name, uplink->queue[i].from, uplink->queue[i].to, uplink->queue[i].status ); uplink->queue[i].entered = now; #ifdef _DEBUG_RESEND_STARVING @@ -572,55 +636,26 @@ static void* uplink_mainloop(void *data) #endif } cleanup: ; - // Detach depends on whether someone is joining this thread... - bool exp = false; - if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) { - thread_detach( uplink->thread ); - } uplink_saveCacheMap( uplink ); dnbd3_image_t *image = uplink->image; mutex_lock( &image->lock ); - // in the list anymore, but we want to prevent it from being freed in either case - if ( image->uplink == uplink ) { - image->uplink = NULL; - } - mutex_unlock( &image->lock ); // Do NOT use image without locking it - mutex_lock( &uplink->queueLock ); - // Wait for active RTT measurement to finish - while ( uplink->rttTestResult == RTT_INPROGRESS ) { - usleep( 10000 ); - } - signal_close( uplink->signal ); - mutex_lock( &uplink->rttLock ); - mutex_lock( &uplink->sendMutex ); - if ( uplink->current.fd != -1 ) { - close( uplink->current.fd ); - uplink->current.fd = -1; - } - if ( uplink->better.fd != -1 ) { - close( uplink->better.fd ); - uplink->better.fd = -1; + bool exp = false; + if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) { + image->users++; // We set the flag - hold onto image } - mutex_unlock( &uplink->sendMutex ); - mutex_unlock( &uplink->rttLock ); - mutex_unlock( &uplink->queueLock ); - mutex_destroy( &uplink->queueLock ); - mutex_destroy( &uplink->rttLock ); - mutex_destroy( &uplink->sendMutex ); - free( uplink->recvBuffer ); - uplink->recvBuffer = NULL; - if ( uplink->cacheFd != -1 ) { - close( uplink->cacheFd ); + dnbd3_uplink_t *current = ref_get_uplink( &image->uplinkref ); + if ( current == uplink ) { // Set NULL if it's still us... + mutex_lock( &uplink->queueLock ); + cancelAllRequests( uplink ); + mutex_unlock( &uplink->queueLock ); + ref_setref( &image->uplinkref, NULL ); } - free( uplink ); // !!! - if ( image_lock( image ) != NULL ) { - // Image is still in list... - if ( !_shutdown && image->cache_map != NULL ) { - // Ingegrity checker must have found something in the meantime - uplink_init( image, -1, NULL, 0 ); - } - image_release( image ); + if ( current != NULL ) { // Decrease ref in any case + ref_put( ¤t->reference ); } + mutex_unlock( &image->lock ); + // Finally as the thread is done, decrease our own ref that we initialized with + ref_put( &uplink->reference ); return NULL ; } @@ -637,7 +672,7 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly) const uint32_t reqSize = (uint32_t)(((uplink->queue[j].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); /* logadd( LOG_DEBUG2, "[%p] Sending slot %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 " (%" PRIu64 "-%" PRIu64 ")", - (void*)link, j, uplink->queue[j].status, uplink->queue[j].handle, uplink->queue[j].from, uplink->queue[j].to, reqStart, reqStart+reqSize ); + (void*)uplink, j, uplink->queue[j].status, uplink->queue[j].handle, uplink->queue[j].from, uplink->queue[j].to, reqStart, reqStart+reqSize ); */ mutex_unlock( &uplink->queueLock ); if ( hops < 200 ) ++hops; @@ -782,7 +817,7 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int /** * Receive data from uplink server and process/dispatch - * Locks on: link.lock, images[].lock + * Locks on: uplink.lock, images[].lock */ static void uplink_handleReceive(dnbd3_uplink_t *uplink) { @@ -924,13 +959,16 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) } mutex_unlock( &client->sendMutex ); mutex_lock( &uplink->queueLock ); + if ( i > uplink->queueLen ) { + uplink->queueLen = i; // Might have been set to 0 by cancelAllRequests + } } if ( req->status == ULR_FREE && i == uplink->queueLen - 1 ) uplink->queueLen--; } mutex_unlock( &uplink->queueLock ); #ifdef _DEBUG if ( !served && start != uplink->replicationHandle ) { - logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, uplink->image->name, start, end ); + logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)uplink, uplink->image->name, start, end ); } #endif if ( start == uplink->replicationHandle ) { diff --git a/src/server/uplink.h b/src/server/uplink.h index acc8e11..49ff0b4 100644 --- a/src/server/uplink.h +++ b/src/server/uplink.h @@ -14,7 +14,7 @@ void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client); bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hopCount); -void uplink_shutdown(dnbd3_image_t *image); +bool uplink_shutdown(dnbd3_image_t *image); bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len); -- cgit v1.2.3-55-g7522