summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2019-08-27 16:13:07 +0200
committerSimon Rettberg2019-08-27 16:13:07 +0200
commit69f5bf408b9587a6e2008fba2224c2d506f1a895 (patch)
tree8fc9eda7e3a0b105007b7a85a4cc35ecc1d4431d
parent[SERVER] Fix warnings, simplify locking (diff)
downloaddnbd3-69f5bf408b9587a6e2008fba2224c2d506f1a895.tar.gz
dnbd3-69f5bf408b9587a6e2008fba2224c2d506f1a895.tar.xz
dnbd3-69f5bf408b9587a6e2008fba2224c2d506f1a895.zip
[SERVER] Use reference counting for uplink
First step towards less locking for proxy mode
-rw-r--r--src/server/altservers.c13
-rw-r--r--src/server/globals.h4
-rw-r--r--src/server/image.c39
-rw-r--r--src/server/integrity.c17
-rw-r--r--src/server/net.c48
-rw-r--r--src/server/net.h2
-rw-r--r--src/server/reference.c33
-rw-r--r--src/server/reference.h54
-rw-r--r--src/server/reftypes.h25
-rw-r--r--src/server/uplink.c214
-rw-r--r--src/server/uplink.h2
11 files changed, 311 insertions, 140 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index 493ed9e..7d7fdbe 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -7,6 +7,8 @@
#include "../shared/protocol.h"
#include "../shared/timing.h"
#include "../serverconfig.h"
+#include "reference.h"
+
#include <assert.h>
#include <inttypes.h>
#include <jansson.h>
@@ -104,7 +106,6 @@ void altservers_findUplinkAsync(dnbd3_uplink_t *uplink)
return;
if ( uplink->current.fd != -1 && numAltServers <= 1 )
return;
- int i;
// if betterFd != -1 it means the uplink is supposed to switch to another
// server. As this function here is called by the uplink thread, it can
// never be that the uplink is supposed to switch, but instead calls
@@ -112,11 +113,14 @@ void altservers_findUplinkAsync(dnbd3_uplink_t *uplink)
assert( uplink->better.fd == -1 );
// it is however possible that an RTT measurement is currently in progress,
// so check for that case and do nothing if one is in progress
- mutex_lock( &uplink->rttLock );
if ( uplink->rttTestResult != RTT_INPROGRESS ) {
- threadpool_run( &altservers_runCheck, uplink );
+ dnbd3_uplink_t *current = ref_get_uplink( &uplink->image->uplinkref );
+ if ( current == uplink ) {
+ threadpool_run( &altservers_runCheck, uplink );
+ } else if ( current != NULL ) {
+ ref_put( &current->reference );
+ }
}
- mutex_unlock( &uplink->rttLock );
}
/**
@@ -375,6 +379,7 @@ static void *altservers_runCheck(void *data)
assert( uplink != NULL );
setThreadName( "altserver-check" );
altservers_findUplinkInternal( uplink );
+ ref_put( &uplink->reference ); // Acquired in findUplinkAsync
// Save cache maps of all images if applicable
// TODO: Has nothing to do with alt servers really, maybe move somewhere else?
declare_now;
diff --git a/src/server/globals.h b/src/server/globals.h
index 4d97c6b..5dd205a 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -8,6 +8,7 @@
#include <stdatomic.h>
#include <time.h>
#include <pthread.h>
+#include "reftypes.h"
typedef struct timespec ticks;
@@ -64,6 +65,7 @@ typedef struct {
#define RTT_NOT_REACHABLE 4 // No uplink was reachable
struct _dnbd3_uplink
{
+ ref reference;
dnbd3_server_connection_t current; // Currently active connection; fd == -1 means disconnected
dnbd3_server_connection_t better; // Better connection as found by altserver worker; fd == -1 means none
dnbd3_signal_t* signal; // used to wake up the process
@@ -107,7 +109,7 @@ struct _dnbd3_image
{
char *path; // absolute path of the image
char *name; // public name of the image (usually relative path minus revision ID)
- dnbd3_uplink_t *uplink; // pointer to a server connection
+ weakref uplinkref; // pointer to a server connection
uint8_t *cache_map; // cache map telling which parts are locally cached, NULL if complete
uint64_t virtualFilesize; // virtual size of image (real size rounded up to multiple of 4k)
uint64_t realFilesize; // actual file size on disk
diff --git a/src/server/image.c b/src/server/image.c
index 1a6e0f8..5b58347 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -8,6 +8,7 @@
#include "../shared/protocol.h"
#include "../shared/timing.h"
#include "../shared/crc32.h"
+#include "reference.h"
#include <assert.h>
#include <fcntl.h>
@@ -375,9 +376,7 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
// Check if image is incomplete, handle
if ( candidate->cache_map != NULL ) {
- if ( candidate->uplink == NULL ) {
- uplink_init( candidate, -1, NULL, -1 );
- }
+ uplink_init( candidate, -1, NULL, -1 );
}
return candidate; // We did all we can, hopefully it's working
@@ -484,17 +483,7 @@ void image_killUplinks()
mutex_lock( &imageListLock );
for (i = 0; i < _num_images; ++i) {
if ( _images[i] == NULL ) continue;
- mutex_lock( &_images[i]->lock );
- if ( _images[i]->uplink != NULL ) {
- mutex_lock( &_images[i]->uplink->queueLock );
- if ( !_images[i]->uplink->shutdown ) {
- thread_detach( _images[i]->uplink->thread );
- _images[i]->uplink->shutdown = true;
- }
- mutex_unlock( &_images[i]->uplink->queueLock );
- signal_call( _images[i]->uplink->signal );
- }
- mutex_unlock( &_images[i]->lock );
+ uplink_shutdown( _images[i] );
}
mutex_unlock( &imageListLock );
}
@@ -588,11 +577,15 @@ bool image_tryFreeAll()
static dnbd3_image_t* image_free(dnbd3_image_t *image)
{
assert( image != NULL );
+ assert( image->users == 0 );
if ( !_shutdown ) {
logadd( LOG_INFO, "Freeing image %s:%d", image->name, (int)image->rid );
}
- //
- uplink_shutdown( image );
+ // uplink_shutdown might return false to tell us
+ // that the shutdown is in progress. Bail out since
+ // this will get called again when the uplink is done.
+ if ( !uplink_shutdown( image ) )
+ return NULL;
mutex_lock( &image->lock );
free( image->cache_map );
free( image->crc32 );
@@ -860,7 +853,7 @@ static bool image_load(char *base, char *path, int withUplink)
image->cache_map = cache_map;
image->crc32 = crc32list;
image->masterCrc32 = masterCrc;
- image->uplink = NULL;
+ image->uplinkref = NULL;
image->realFilesize = realFilesize;
image->virtualFilesize = virtualFilesize;
image->rid = (uint16_t)revision;
@@ -1503,16 +1496,18 @@ json_t* image_getListAsJson()
mutex_lock( &image->lock );
idleTime = (int)timing_diff( &image->atime, &now );
completeness = image_getCompletenessEstimate( image );
- if ( image->uplink == NULL ) {
+ mutex_unlock( &image->lock );
+ dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
+ if ( uplink == NULL ) {
bytesReceived = 0;
uplinkName[0] = '\0';
} else {
- bytesReceived = image->uplink->bytesReceived;
- if ( !uplink_getHostString( image->uplink, uplinkName, sizeof(uplinkName) ) ) {
+ bytesReceived = uplink->bytesReceived;
+ if ( !uplink_getHostString( uplink, uplinkName, sizeof(uplinkName) ) ) {
uplinkName[0] = '\0';
}
+ ref_put( &uplink->reference );
}
- mutex_unlock( &image->lock );
jsonImage = json_pack( "{sisssisisisisI}",
"id", image->id, // id, name, rid never change, so access them without locking
@@ -1734,7 +1729,7 @@ void image_closeUnusedFd()
if ( image == NULL )
continue;
mutex_lock( &image->lock );
- if ( image->users == 0 && image->uplink == NULL && timing_reached( &image->atime, &deadline ) ) {
+ if ( image->users == 0 && image->uplinkref == NULL && timing_reached( &image->atime, &deadline ) ) {
snprintf( imgstr, sizeof(imgstr), "%s:%d", image->name, (int)image->rid );
fd = image->readFd;
image->readFd = -1;
diff --git a/src/server/integrity.c b/src/server/integrity.c
index 3d1ac9b..f358c46 100644
--- a/src/server/integrity.c
+++ b/src/server/integrity.c
@@ -4,6 +4,7 @@
#include "locks.h"
#include "image.h"
#include "uplink.h"
+#include "reference.h"
#include <assert.h>
#include <sys/syscall.h>
@@ -238,11 +239,13 @@ static void* integrity_main(void * data UNUSED)
if ( i + 1 == queueLen ) queueLen--;
// Mark as working again if applicable
if ( !foundCorrupted ) {
- mutex_lock( &image->lock );
- if ( image->uplink != NULL ) { // TODO: image_determineWorkingState() helper?
- image->working = image->uplink->current.fd != -1 && image->readFd != -1;
+ dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
+ if ( uplink != NULL ) { // TODO: image_determineWorkingState() helper?
+ mutex_lock( &image->lock );
+ image->working = uplink->current.fd != -1 && image->readFd != -1;
+ mutex_unlock( &image->lock );
+ ref_put( &uplink->reference );
}
- mutex_unlock( &image->lock );
}
} else {
// Still more blocks to go...
@@ -255,12 +258,8 @@ static void* integrity_main(void * data UNUSED)
// Something was fishy, make sure uplink exists
mutex_lock( &image->lock );
image->working = false;
- bool restart = image->uplink == NULL || image->uplink->shutdown;
mutex_unlock( &image->lock );
- if ( restart ) {
- uplink_shutdown( image );
- uplink_init( image, -1, NULL, -1 );
- }
+ uplink_init( image, -1, NULL, -1 );
}
// Release :-)
image_release( image );
diff --git a/src/server/net.c b/src/server/net.c
index 4976eea..e0b516e 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -24,6 +24,7 @@
#include "locks.h"
#include "rpc.h"
#include "altservers.h"
+#include "reference.h"
#include "../shared/sockhelper.h"
#include "../shared/timing.h"
@@ -229,7 +230,7 @@ void* net_handleNewConnection(void *clientPtr)
rid = serializer_get_uint16( &payload );
const uint8_t flags = serializer_get_uint8( &payload );
client->isServer = ( flags & FLAGS8_SERVER );
- if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) {
+ if ( unlikely( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) ) {
if ( client_version < MIN_SUPPORTED_CLIENT ) {
logadd( LOG_DEBUG1, "Client %s too old", client->hostName );
} else {
@@ -257,22 +258,25 @@ void* net_handleNewConnection(void *clientPtr)
}
client->image = image;
atomic_thread_fence( memory_order_release );
- if ( image == NULL ) {
+ if ( unlikely( image == NULL ) ) {
//logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
- } else if ( !image->working ) {
+ } else if ( unlikely( !image->working ) ) {
logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n",
client->hostName, image_name, (int)rid );
} else {
- bool penalty;
// Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable
bOk = true;
if ( image->cache_map != NULL ) {
- mutex_lock( &image->lock );
- if ( image->uplink == NULL || image->uplink->cacheFd == -1 || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
+ dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
+ if ( uplink == NULL || uplink->cacheFd == -1 || uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
bOk = ( rand() % 4 ) == 1;
}
- penalty = bOk && image->uplink != NULL && image->uplink->cacheFd == -1;
- mutex_unlock( &image->lock );
+ bool penalty = bOk && ( uplink == NULL || uplink->cacheFd == -1 );
+ if ( uplink == NULL ) {
+ uplink_init( image, -1, NULL, 0 );
+ } else {
+ ref_put( &uplink->reference );
+ }
if ( penalty ) { // Wait 100ms if local caching is not working so this
usleep( 100000 ); // server gets a penalty and is less likely to be selected
}
@@ -300,7 +304,7 @@ void* net_handleNewConnection(void *clientPtr)
}
}
- if ( bOk ) {
+ if ( likely( bOk ) ) {
// add artificial delay if applicable
if ( client->isServer && _serverPenalty != 0 ) {
usleep( _serverPenalty );
@@ -315,7 +319,7 @@ void* net_handleNewConnection(void *clientPtr)
case CMD_GET_BLOCK:;
const uint64_t offset = request.offset_small; // Copy to full uint64 to prevent repeated masking
reply.handle = request.handle;
- if ( offset >= image->virtualFilesize ) {
+ if ( unlikely( offset >= image->virtualFilesize ) ) {
// Sanity check
logadd( LOG_WARNING, "Client %s requested non-existent block", client->hostName );
reply.size = 0;
@@ -323,7 +327,7 @@ void* net_handleNewConnection(void *clientPtr)
send_reply( client->sock, &reply, NULL );
break;
}
- if ( offset + request.size > image->virtualFilesize ) {
+ if ( unlikely( offset + request.size > image->virtualFilesize ) ) {
// Sanity check
logadd( LOG_WARNING, "Client %s requested data block that extends beyond image size", client->hostName );
reply.size = 0;
@@ -398,7 +402,7 @@ void* net_handleNewConnection(void *clientPtr)
reply.size = request.size;
fixup_reply( reply );
- const bool lock = image->uplink != NULL;
+ const bool lock = image->uplinkref != NULL;
if ( lock ) mutex_lock( &client->sendMutex );
// Send reply header
if ( send( client->sock, &reply, sizeof(dnbd3_reply_t), (request.size == 0 ? 0 : MSG_MORE) ) != sizeof(dnbd3_reply_t) ) {
@@ -696,9 +700,11 @@ static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client)
{
mutex_lock( &client->lock );
if ( client->image != NULL ) {
- mutex_lock( &client->image->lock );
- if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client );
- mutex_unlock( &client->image->lock );
+ dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref );
+ if ( uplink != NULL ) {
+ uplink_removeClient( uplink, client );
+ ref_put( &uplink->reference );
+ }
}
mutex_lock( &client->sendMutex );
if ( client->sock != -1 ) {
@@ -740,3 +746,15 @@ static bool addToList(dnbd3_client_t *client)
return true;
}
+void net_sendReply(dnbd3_client_t *client, uint16_t cmd, uint64_t handle)
+{
+ dnbd3_reply_t reply;
+ reply.magic = dnbd3_packet_magic;
+ reply.cmd = cmd;
+ reply.handle = handle;
+ reply.size = 0;
+ mutex_lock( &client->sendMutex );
+ send_reply( client->sock, &reply, NULL );
+ mutex_unlock( &client->sendMutex );
+}
+
diff --git a/src/server/net.h b/src/server/net.h
index 6813b49..7719aef 100644
--- a/src/server/net.h
+++ b/src/server/net.h
@@ -37,4 +37,6 @@ void net_disconnectAll();
void net_waitForAllDisconnected();
+void net_sendReply(dnbd3_client_t *client, uint16_t cmd, uint64_t handle);
+
#endif /* NET_H_ */
diff --git a/src/server/reference.c b/src/server/reference.c
new file mode 100644
index 0000000..468e00b
--- /dev/null
+++ b/src/server/reference.c
@@ -0,0 +1,33 @@
+#ifndef unlikely
+#define unlikely(x) (x)
+#endif
+#include "reference.h"
+#include <stdio.h>
+#include <stdlib.h>
+
+void ref_init( ref *reference, void ( *freefun )( ref * ), long count )
+{
+ reference->count = count;
+ reference->free = freefun;
+}
+
+_Noreturn void _ref_error( const char *message )
+{
+ fprintf( stderr, "Reference counter overflow\n" );
+ abort();
+}
+
+void ref_setref( weakref *weakref, ref *ref )
+{
+ union _aligned_ref_ *new_weakref = 0;
+ if ( ref ) {
+ ( new_weakref = aligned_ref( ref->_aligned_ref ) )->ref = ref;
+ ref->count += sizeof( union _aligned_ref_ ) + 1;
+ }
+ char *old_weakref = (char *)atomic_exchange( weakref, new_weakref );
+ if ( !old_weakref )
+ return;
+ struct _ref_ *old_ref = aligned_ref( old_weakref )->ref;
+ old_ref->count += old_weakref - (char *)aligned_ref( old_weakref ) - sizeof( union _aligned_ref_ );
+ ref_put( old_ref );
+}
diff --git a/src/server/reference.h b/src/server/reference.h
new file mode 100644
index 0000000..0bc081a
--- /dev/null
+++ b/src/server/reference.h
@@ -0,0 +1,54 @@
+#ifndef _REFERENCE_H_
+#define _REFERENCE_H_
+
+#include "reftypes.h"
+#include <stddef.h>
+#include <stdint.h>
+
+#define container_of(ptr, type, member) \
+ ((type *)((char *)(ptr) - (char *)&(((type *)NULL)->member)))
+
+void ref_init( ref *reference, void ( *freefun )( ref * ), long count );
+
+void ref_setref( weakref *weakref, ref *ref );
+
+_Noreturn void _ref_error( const char *message );
+
+static inline ref *ref_get( weakref *weakref )
+{
+ char *old_weakref = (char *)*weakref;
+ do {
+ if ( old_weakref == NULL )
+ return NULL;
+ if ( aligned_ref( old_weakref ) != aligned_ref( old_weakref + 1 ) ) {
+ old_weakref = (char *)*weakref;
+ continue;
+ }
+ } while ( !atomic_compare_exchange_weak( weakref, (void **)&old_weakref, old_weakref + 1 ) );
+ struct _ref_ *ref = aligned_ref( old_weakref )->ref;
+ if ( unlikely( ++ref->count == -1 ) ) {
+ _ref_error( "Reference counter overflow. Aborting.\n" );
+ }
+ char *cur_weakref = ( char * )*weakref;
+ do {
+ if ( aligned_ref( cur_weakref ) != aligned_ref( old_weakref ) ) {
+ ref->count--;
+ break;
+ }
+ } while ( !atomic_compare_exchange_weak( weakref, (void **)&cur_weakref, cur_weakref - 1 ) );
+ return ref;
+}
+
+static inline void ref_put( ref *ref )
+{
+ if ( --ref->count == 0 ) {
+ ref->free( ref );
+ }
+}
+
+#define ref_get_uplink(wr) ({ \
+ ref* ref = ref_get( wr ); \
+ ref == NULL ? NULL : container_of(ref, dnbd3_uplink_t, reference); \
+})
+
+#endif
diff --git a/src/server/reftypes.h b/src/server/reftypes.h
new file mode 100644
index 0000000..45c0c20
--- /dev/null
+++ b/src/server/reftypes.h
@@ -0,0 +1,25 @@
+#ifndef _REFTYPES_H_
+#define _REFTYPES_H_
+
+#include <stdatomic.h>
+
+_Static_assert( sizeof( void * ) == sizeof( _Atomic( void * ) ), "Atomic pointer bad" );
+
+typedef _Atomic( void * ) weakref;
+
+#define aligned_ref(ptr) \
+ ((union _aligned_ref_ *)((ptr) - (uintptr_t)(ptr) % sizeof(union _aligned_ref_)))
+
+union _aligned_ref_ {
+ struct _ref_ *ref;
+ void *_padding[( 32 - 1 ) / sizeof( void * ) + 1];
+};
+
+typedef struct _ref_ {
+ _Atomic long count;
+ void ( *free )( struct _ref_ * );
+ char _padding[sizeof( union _aligned_ref_ )];
+ char _aligned_ref[sizeof( union _aligned_ref_ )];
+} ref;
+
+#endif
diff --git a/src/server/uplink.c b/src/server/uplink.c
index abfebf0..7a39887 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -3,10 +3,12 @@
#include "locks.h"
#include "image.h"
#include "altservers.h"
+#include "net.h"
#include "../shared/sockhelper.h"
#include "../shared/protocol.h"
#include "../shared/timing.h"
#include "../shared/crc32.h"
+#include "reference.h"
#include <assert.h>
#include <inttypes.h>
@@ -45,6 +47,8 @@ static const char *const NAMES_ULR[4] = {
static atomic_uint_fast64_t totalBytesReceived = 0;
+static void cancelAllRequests(dnbd3_uplink_t *uplink);
+static void uplink_free(ref *ref);
static void* uplink_mainloop(void *data);
static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly);
static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex);
@@ -76,19 +80,24 @@ uint64_t uplink_getTotalBytesReceived()
bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version)
{
if ( !_isProxy || _shutdown ) return false;
- dnbd3_uplink_t *uplink = NULL;
assert( image != NULL );
mutex_lock( &image->lock );
- if ( image->uplink != NULL && !image->uplink->shutdown ) {
+ dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
+ if ( uplink != NULL ) {
mutex_unlock( &image->lock );
- if ( sock >= 0 ) close( sock );
+ if ( sock != -1 ) {
+ close( sock );
+ }
+ ref_put( &uplink->reference );
return true; // There's already an uplink, so should we consider this success or failure?
}
if ( image->cache_map == NULL ) {
logadd( LOG_WARNING, "Uplink was requested for image %s, but it is already complete", image->name );
goto failure;
}
- uplink = image->uplink = calloc( 1, sizeof(dnbd3_uplink_t) );
+ uplink = calloc( 1, sizeof(dnbd3_uplink_t) );
+ // Start with one reference for the uplink thread. We'll return it when the thread finishes
+ ref_init( &uplink->reference, uplink_free, 1 );
mutex_init( &uplink->queueLock, LOCK_UPLINK_QUEUE );
mutex_init( &uplink->rttLock, LOCK_UPLINK_RTT );
mutex_init( &uplink->sendMutex, LOCK_UPLINK_SEND );
@@ -121,12 +130,13 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
logadd( LOG_ERROR, "Could not start thread for new uplink." );
goto failure;
}
+ ref_setref( &image->uplinkref, &uplink->reference );
mutex_unlock( &image->lock );
return true;
failure: ;
if ( uplink != NULL ) {
free( uplink );
- uplink = image->uplink = NULL;
+ uplink = NULL;
}
mutex_unlock( &image->lock );
return false;
@@ -137,34 +147,83 @@ failure: ;
* Calling it multiple times, even concurrently, will
* not break anything.
*/
-void uplink_shutdown(dnbd3_image_t *image)
+bool uplink_shutdown(dnbd3_image_t *image)
{
- bool join = false;
- pthread_t thread;
assert( image != NULL );
mutex_lock( &image->lock );
- if ( image->uplink == NULL ) {
+ dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
+ if ( uplink == NULL ) {
mutex_unlock( &image->lock );
- return;
+ return true;
}
- dnbd3_uplink_t * const uplink = image->uplink;
mutex_lock( &uplink->queueLock );
bool exp = false;
if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) {
+ image->users++; // Prevent free while uplink shuts down
signal_call( uplink->signal );
- thread = uplink->thread;
- join = true;
+ } else {
+ logadd( LOG_ERROR, "This will never happen. '%s:%d'", image->name, (int)image->rid );
}
+ cancelAllRequests( uplink );
+ ref_setref( &image->uplinkref, NULL );
+ ref_put( &uplink->reference );
mutex_unlock( &uplink->queueLock );
- bool wait = image->uplink != NULL;
+ bool retval = ( exp && image->users == 0 );
mutex_unlock( &image->lock );
- if ( join ) thread_join( thread, NULL );
- while ( wait ) {
- usleep( 5000 );
- mutex_lock( &image->lock );
- wait = image->uplink != NULL && image->uplink->shutdown;
- mutex_unlock( &image->lock );
+ return exp;
+}
+
+/**
+ * Cancel all requests of this uplink.
+ * HOLD QUEUE LOCK WHILE CALLING
+ */
+static void cancelAllRequests(dnbd3_uplink_t *uplink)
+{
+ for ( int i = 0; i < uplink->queueLen; ++i ) {
+ if ( uplink->queue[i].status != ULR_FREE ) {
+ net_sendReply( uplink->queue[i].client, CMD_ERROR, uplink->queue[i].handle );
+ uplink->queue[i].status = ULR_FREE;
+ }
+ }
+ uplink->queueLen = 0;
+}
+
+static void uplink_free(ref *ref)
+{
+ dnbd3_uplink_t *uplink = container_of(ref, dnbd3_uplink_t, reference);
+ logadd( LOG_DEBUG1, "Freeing uplink for '%s:%d'", uplink->image->name, (int)uplink->image->rid );
+ assert( uplink->queueLen == 0 );
+ signal_close( uplink->signal );
+ if ( uplink->current.fd != -1 ) {
+ close( uplink->current.fd );
+ uplink->current.fd = -1;
+ }
+ if ( uplink->better.fd != -1 ) {
+ close( uplink->better.fd );
+ uplink->better.fd = -1;
+ }
+ mutex_destroy( &uplink->queueLock );
+ mutex_destroy( &uplink->rttLock );
+ mutex_destroy( &uplink->sendMutex );
+ free( uplink->recvBuffer );
+ uplink->recvBuffer = NULL;
+ if ( uplink->cacheFd != -1 ) {
+ close( uplink->cacheFd );
}
+ // TODO Requeue any requests
+ dnbd3_image_t *image = image_lock( uplink->image );
+ if ( image != NULL ) {
+ // != NULL means image is still in list...
+ if ( !_shutdown && image->cache_map != NULL ) {
+ // Ingegrity checker must have found something in the meantime
+ uplink_init( image, -1, NULL, 0 );
+ }
+ image_release( image );
+ }
+ // Finally let go of image. It was acquired either in uplink_shutdown or in the cleanup code
+ // of the uplink thread, depending on who set the uplink->shutdown flag.
+ image_release( image );
+ free( uplink ); // !!!
}
/**
@@ -193,31 +252,28 @@ void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client)
*/
bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
{
- if ( client == NULL || client->image == NULL ) return false;
+ if ( client == NULL || client->image == NULL )
+ return false;
if ( length > (uint32_t)_maxPayload ) {
logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length );
return false;
}
- mutex_lock( &client->image->lock );
- if ( client->image->uplink == NULL ) {
- mutex_unlock( &client->image->lock );
+ dnbd3_uplink_t * const uplink = ref_get_uplink( &client->image->uplinkref );
+ if ( uplink == NULL ) {
logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
return false;
}
- dnbd3_uplink_t * const uplink = client->image->uplink;
if ( uplink->shutdown ) {
- mutex_unlock( &client->image->lock );
logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" );
- return false;
+ goto fail_ref;
}
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
// This might be a false positive if there are multiple instances running on the same host (IP)
if ( hops != 0 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
uplink->cycleDetected = true;
signal_call( uplink->signal );
- mutex_unlock( &client->image->lock );
logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
- return false;
+ goto fail_ref;
}
int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise
@@ -229,7 +285,9 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
const uint64_t end = start + length;
mutex_lock( &uplink->queueLock );
- mutex_unlock( &client->image->lock );
+ if ( uplink->shutdown ) { // Check again after locking to prevent lost requests
+ goto fail_lock;
+ }
for (i = 0; i < uplink->queueLen; ++i) {
// find free slot to place this request into
if ( uplink->queue[i].status == ULR_FREE ) {
@@ -257,18 +315,16 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
if ( unlikely( requestLoop ) ) {
uplink->cycleDetected = true;
signal_call( uplink->signal );
- mutex_unlock( &uplink->queueLock );
logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops );
- return false;
+ goto fail_lock;
}
if ( freeSlot < firstUsedSlot && firstUsedSlot < 10 && existingType != ULR_PROCESSING ) {
freeSlot = -1; // Not attaching to existing request, make it use a higher slot
}
if ( freeSlot == -1 ) {
if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) {
- mutex_unlock( &uplink->queueLock );
logadd( LOG_WARNING, "Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." );
- return false;
+ goto fail_lock;
}
freeSlot = uplink->queueLen++;
}
@@ -305,16 +361,16 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
#endif
mutex_unlock( &uplink->queueLock );
- if ( foundExisting != -1 )
+ if ( foundExisting != -1 ) {
+ ref_put( &uplink->reference );
return true; // Attached to pending request, do nothing
-
- usleep( 10000 );
+ }
// See if we can fire away the request
- if ( mutex_trylock( &uplink->sendMutex ) != 0 ) {
+ if ( unlikely( mutex_trylock( &uplink->sendMutex ) != 0 ) ) {
logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" );
} else {
- if ( uplink->current.fd == -1 ) {
+ if ( unlikely( uplink->current.fd == -1 ) ) {
mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
} else {
@@ -323,13 +379,13 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
if ( hops < 200 ) ++hops;
const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) );
mutex_unlock( &uplink->sendMutex );
- if ( !ret ) {
+ if ( unlikely( !ret ) ) {
logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" );
} else {
// Direct send succeeded, update queue entry from NEW to PENDING, so the request won't be sent again
int state;
mutex_lock( &uplink->queueLock );
- if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) {
+ if ( !uplink->shutdown && uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) {
state = uplink->queue[freeSlot].status;
if ( uplink->queue[freeSlot].status == ULR_NEW ) {
uplink->queue[freeSlot].status = ULR_PENDING;
@@ -345,6 +401,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
} else {
logadd( LOG_DEBUG2, "Direct uplink request queue entry changed to %s afte sending (expected ULR_NEW).", NAMES_ULR[uplink->queue[freeSlot].status] );
}
+ ref_put( &uplink->reference );
return true;
}
// Fall through to waking up sender thread
@@ -354,7 +411,13 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
}
+ ref_put( &uplink->reference );
return true;
+fail_lock:
+ mutex_unlock( &uplink->queueLock );
+fail_ref:
+ ref_put( &uplink->reference );
+ return false;
}
/**
@@ -381,6 +444,7 @@ static void* uplink_mainloop(void *data)
//
assert( uplink != NULL );
setThreadName( "idle-uplink" );
+ thread_detach( uplink->thread );
blockNoncriticalSignals();
// Make sure file is open for writing
if ( !uplink_reopenCacheFd( uplink, false ) ) {
@@ -553,7 +617,7 @@ static void* uplink_mainloop(void *data)
for (i = 0; i < uplink->queueLen; ++i) {
if ( uplink->queue[i].status != ULR_FREE && timing_reached( &uplink->queue[i].entered, &deadline ) ) {
snprintf( buffer, sizeof(buffer), "[DEBUG %p] Starving request slot %d detected:\n"
- "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)link, i, uplink->queue[i].client->image->name,
+ "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)uplink, i, uplink->queue[i].client->image->name,
uplink->queue[i].from, uplink->queue[i].to, uplink->queue[i].status );
uplink->queue[i].entered = now;
#ifdef _DEBUG_RESEND_STARVING
@@ -572,55 +636,26 @@ static void* uplink_mainloop(void *data)
#endif
}
cleanup: ;
- // Detach depends on whether someone is joining this thread...
- bool exp = false;
- if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) {
- thread_detach( uplink->thread );
- }
uplink_saveCacheMap( uplink );
dnbd3_image_t *image = uplink->image;
mutex_lock( &image->lock );
- // in the list anymore, but we want to prevent it from being freed in either case
- if ( image->uplink == uplink ) {
- image->uplink = NULL;
- }
- mutex_unlock( &image->lock ); // Do NOT use image without locking it
- mutex_lock( &uplink->queueLock );
- // Wait for active RTT measurement to finish
- while ( uplink->rttTestResult == RTT_INPROGRESS ) {
- usleep( 10000 );
- }
- signal_close( uplink->signal );
- mutex_lock( &uplink->rttLock );
- mutex_lock( &uplink->sendMutex );
- if ( uplink->current.fd != -1 ) {
- close( uplink->current.fd );
- uplink->current.fd = -1;
- }
- if ( uplink->better.fd != -1 ) {
- close( uplink->better.fd );
- uplink->better.fd = -1;
+ bool exp = false;
+ if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) {
+ image->users++; // We set the flag - hold onto image
}
- mutex_unlock( &uplink->sendMutex );
- mutex_unlock( &uplink->rttLock );
- mutex_unlock( &uplink->queueLock );
- mutex_destroy( &uplink->queueLock );
- mutex_destroy( &uplink->rttLock );
- mutex_destroy( &uplink->sendMutex );
- free( uplink->recvBuffer );
- uplink->recvBuffer = NULL;
- if ( uplink->cacheFd != -1 ) {
- close( uplink->cacheFd );
+ dnbd3_uplink_t *current = ref_get_uplink( &image->uplinkref );
+ if ( current == uplink ) { // Set NULL if it's still us...
+ mutex_lock( &uplink->queueLock );
+ cancelAllRequests( uplink );
+ mutex_unlock( &uplink->queueLock );
+ ref_setref( &image->uplinkref, NULL );
}
- free( uplink ); // !!!
- if ( image_lock( image ) != NULL ) {
- // Image is still in list...
- if ( !_shutdown && image->cache_map != NULL ) {
- // Ingegrity checker must have found something in the meantime
- uplink_init( image, -1, NULL, 0 );
- }
- image_release( image );
+ if ( current != NULL ) { // Decrease ref in any case
+ ref_put( &current->reference );
}
+ mutex_unlock( &image->lock );
+ // Finally as the thread is done, decrease our own ref that we initialized with
+ ref_put( &uplink->reference );
return NULL ;
}
@@ -637,7 +672,7 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
const uint32_t reqSize = (uint32_t)(((uplink->queue[j].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
/*
logadd( LOG_DEBUG2, "[%p] Sending slot %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 " (%" PRIu64 "-%" PRIu64 ")",
- (void*)link, j, uplink->queue[j].status, uplink->queue[j].handle, uplink->queue[j].from, uplink->queue[j].to, reqStart, reqStart+reqSize );
+ (void*)uplink, j, uplink->queue[j].status, uplink->queue[j].handle, uplink->queue[j].from, uplink->queue[j].to, reqStart, reqStart+reqSize );
*/
mutex_unlock( &uplink->queueLock );
if ( hops < 200 ) ++hops;
@@ -782,7 +817,7 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int
/**
* Receive data from uplink server and process/dispatch
- * Locks on: link.lock, images[].lock
+ * Locks on: uplink.lock, images[].lock
*/
static void uplink_handleReceive(dnbd3_uplink_t *uplink)
{
@@ -924,13 +959,16 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
}
mutex_unlock( &client->sendMutex );
mutex_lock( &uplink->queueLock );
+ if ( i > uplink->queueLen ) {
+ uplink->queueLen = i; // Might have been set to 0 by cancelAllRequests
+ }
}
if ( req->status == ULR_FREE && i == uplink->queueLen - 1 ) uplink->queueLen--;
}
mutex_unlock( &uplink->queueLock );
#ifdef _DEBUG
if ( !served && start != uplink->replicationHandle ) {
- logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, uplink->image->name, start, end );
+ logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)uplink, uplink->image->name, start, end );
}
#endif
if ( start == uplink->replicationHandle ) {
diff --git a/src/server/uplink.h b/src/server/uplink.h
index acc8e11..49ff0b4 100644
--- a/src/server/uplink.h
+++ b/src/server/uplink.h
@@ -14,7 +14,7 @@ void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client);
bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hopCount);
-void uplink_shutdown(dnbd3_image_t *image);
+bool uplink_shutdown(dnbd3_image_t *image);
bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len);