Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--  src/server/uplink.c  1529
1 file changed, 881 insertions(+), 648 deletions(-)
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 682b986..8a83124 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -3,10 +3,13 @@
#include "locks.h"
#include "image.h"
#include "altservers.h"
-#include "../shared/sockhelper.h"
-#include "../shared/protocol.h"
-#include "../shared/timing.h"
-#include "../shared/crc32.h"
+#include "net.h"
+#include <dnbd3/shared/sockhelper.h>
+#include <dnbd3/shared/protocol.h>
+#include <dnbd3/shared/timing.h>
+#include <dnbd3/shared/crc32.h>
+#include "threadpool.h"
+#include "reference.h"
#include <assert.h>
#include <inttypes.h>
@@ -15,25 +18,35 @@
#include <unistd.h>
#include <stdatomic.h>
+static const uint8_t HOP_FLAG_BGR = 0x80;
+static const uint8_t HOP_FLAG_PREFETCH = 0x40;
#define FILE_BYTES_PER_MAP_BYTE ( DNBD3_BLOCK_SIZE * 8 )
#define MAP_BYTES_PER_HASH_BLOCK (int)( HASH_BLOCK_SIZE / FILE_BYTES_PER_MAP_BYTE )
#define MAP_INDEX_HASH_START_MASK ( ~(int)( MAP_BYTES_PER_HASH_BLOCK - 1 ) )
-#define REP_NONE ( (uint64_t)0xffffffffffffffff )
-
static atomic_uint_fast64_t totalBytesReceived = 0;
+typedef struct {
+ uint64_t start, end, handle;
+} req_t;
+
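The two flag constants above share one byte with a 6-bit hop counter (the code below masks with 0x3f and refuses counts above 60). A minimal sketch of that layout; the helper names are illustrative and not part of this patch:

// Illustrative helpers only: upper two bits are flags, lower six the hop count
static inline uint8_t hopCountOf( uint8_t hops ) { return (uint8_t)( hops & 0x3f ); }
static inline bool hopIsBgr( uint8_t hops ) { return ( hops & HOP_FLAG_BGR ) != 0; }
static inline bool hopIsPrefetch( uint8_t hops ) { return ( hops & HOP_FLAG_PREFETCH ) != 0; }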
+static void cancelAllRequests(dnbd3_uplink_t *uplink);
+static void freeUplinkStruct(ref *ref);
static void* uplink_mainloop(void *data);
-static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly);
-static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const int lastBlockIndex);
-static void uplink_handleReceive(dnbd3_connection_t *link);
-static int uplink_sendKeepalive(const int fd);
-static void uplink_addCrc32(dnbd3_connection_t *uplink);
-static void uplink_sendReplicationRequest(dnbd3_connection_t *link);
-static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force);
-static bool uplink_saveCacheMap(dnbd3_connection_t *link);
-static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link);
-static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew);
+static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly);
+static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex);
+static void handleReceive(dnbd3_uplink_t *uplink);
+static bool sendKeepalive(dnbd3_uplink_t *uplink);
+static void requestCrc32List(dnbd3_uplink_t *uplink);
+static bool sendReplicationRequest(dnbd3_uplink_t *uplink);
+static bool reopenCacheFd(dnbd3_uplink_t *uplink, const bool force);
+static bool connectionShouldShutdown(dnbd3_uplink_t *uplink);
+static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew);
+static int numWantedReplicationRequests(dnbd3_uplink_t *uplink);
+static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle);
+static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops);
+
+#define assert_uplink_thread() assert( pthread_equal( uplink->thread, pthread_self() ) )
// ############ uplink connection handling
@@ -54,56 +67,73 @@ uint64_t uplink_getTotalBytesReceived()
bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version)
{
if ( !_isProxy || _shutdown ) return false;
- dnbd3_connection_t *link = NULL;
assert( image != NULL );
+ if ( sock == -1 && !altservers_imageHasAltServers( image->name ) )
+ return false; // Nothing to do
mutex_lock( &image->lock );
- if ( image->uplink != NULL && !image->uplink->shutdown ) {
+ dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
+ if ( uplink != NULL ) {
mutex_unlock( &image->lock );
- if ( sock >= 0 ) close( sock );
- return true; // There's already an uplink, so should we consider this success or failure?
+ if ( sock != -1 ) {
+ close( sock );
+ }
+ ref_put( &uplink->reference );
+ return true; // There's already an uplink
}
- if ( image->cache_map == NULL ) {
+ if ( image->ref_cacheMap == NULL ) {
logadd( LOG_WARNING, "Uplink was requested for image %s, but it is already complete", image->name );
goto failure;
}
- link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) );
- mutex_init( &link->queueLock );
- mutex_init( &link->rttLock );
- mutex_init( &link->sendMutex );
- link->image = image;
- link->bytesReceived = 0;
- link->idleTime = 0;
- link->queueLen = 0;
- mutex_lock( &link->sendMutex );
- link->fd = -1;
- mutex_unlock( &link->sendMutex );
- link->cacheFd = -1;
- link->signal = NULL;
- link->replicationHandle = REP_NONE;
- mutex_lock( &link->rttLock );
- link->cycleDetected = false;
- if ( sock >= 0 ) {
- link->betterFd = sock;
- link->betterServer = *host;
- link->rttTestResult = RTT_DOCHANGE;
- link->betterVersion = version;
+ uplink = calloc( 1, sizeof(dnbd3_uplink_t) );
+ // Start with one reference for the uplink thread. We'll return it when the thread finishes
+ ref_init( &uplink->reference, freeUplinkStruct, 1 );
+ mutex_init( &uplink->queueLock, LOCK_UPLINK_QUEUE );
+ mutex_init( &uplink->rttLock, LOCK_UPLINK_RTT );
+ mutex_init( &uplink->sendMutex, LOCK_UPLINK_SEND );
+ uplink->image = image;
+ uplink->bytesReceived = 0;
+ uplink->bytesReceivedLastSave = 0;
+ uplink->idleTime = SERVER_UPLINK_IDLE_TIMEOUT - 90;
+ uplink->queue = NULL;
+ uplink->queueLen = 0;
+ uplink->cacheFd = -1;
+ uplink->signal = signal_new();
+ if ( uplink->signal == NULL ) {
+ logadd( LOG_WARNING, "Error creating signal. Uplink unavailable." );
+ goto failure;
+ }
+ mutex_lock( &uplink->rttLock );
+ mutex_lock( &uplink->sendMutex );
+ uplink->current.fd = -1;
+ mutex_unlock( &uplink->sendMutex );
+ uplink->cycleDetected = false;
+ image->problem.uplink = true;
+ image->problem.write = true;
+ image->problem.queue = false;
+ if ( sock != -1 ) {
+ uplink->better.fd = sock;
+ int index = altservers_hostToIndex( host );
+ uplink->better.index = index == -1 ? 0 : index; // Prevent invalid array access
+ uplink->rttTestResult = RTT_DOCHANGE;
+ uplink->better.version = version;
} else {
- link->betterFd = -1;
- link->rttTestResult = RTT_IDLE;
+ uplink->better.fd = -1;
+ uplink->rttTestResult = RTT_IDLE;
}
- mutex_unlock( &link->rttLock );
- link->recvBufferLen = 0;
- link->shutdown = false;
- if ( 0 != thread_create( &(link->thread), NULL, &uplink_mainloop, (void *)link ) ) {
+ mutex_unlock( &uplink->rttLock );
+ uplink->recvBufferLen = 0;
+ uplink->shutdown = false;
+ if ( 0 != thread_create( &(uplink->thread), NULL, &uplink_mainloop, (void *)uplink ) ) {
logadd( LOG_ERROR, "Could not start thread for new uplink." );
goto failure;
}
+ ref_setref( &image->uplinkref, &uplink->reference );
mutex_unlock( &image->lock );
return true;
failure: ;
- if ( link != NULL ) {
- free( link );
- link = image->uplink = NULL;
+ if ( uplink != NULL ) {
+ image->users++; // Expected by freeUplinkStruct()
+ ref_put( &uplink->reference ); // The ref for the uplink thread that never was
}
mutex_unlock( &image->lock );
return false;
@@ -114,201 +144,398 @@ failure: ;
* Calling it multiple times, even concurrently, will
* not break anything.
*/
-void uplink_shutdown(dnbd3_image_t *image)
+bool uplink_shutdown(dnbd3_image_t *image)
{
- bool join = false;
- pthread_t thread;
assert( image != NULL );
mutex_lock( &image->lock );
- if ( image->uplink == NULL ) {
+ dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
+ if ( uplink == NULL ) {
mutex_unlock( &image->lock );
- return;
+ return true;
}
- dnbd3_connection_t * const uplink = image->uplink;
mutex_lock( &uplink->queueLock );
- if ( !uplink->shutdown ) {
- uplink->shutdown = true;
+ bool exp = false;
+ if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) {
+ image->users++; // Prevent free while uplink shuts down
signal_call( uplink->signal );
- thread = uplink->thread;
- join = true;
+ } else {
+ logadd( LOG_ERROR, "This will never happen. '%s:%d'", PIMG(image) );
}
+ cancelAllRequests( uplink );
+ ref_setref( &image->uplinkref, NULL );
mutex_unlock( &uplink->queueLock );
- bool wait = image->uplink != NULL;
+ bool retval = ( exp && image->users == 0 );
+ ref_put( &uplink->reference );
mutex_unlock( &image->lock );
- if ( join ) thread_join( thread, NULL );
- while ( wait ) {
- usleep( 5000 );
- mutex_lock( &image->lock );
- wait = image->uplink != NULL && image->uplink->shutdown;
- mutex_unlock( &image->lock );
+ return retval;
+}
+
+/**
+ * Cancel all requests of this uplink.
+ * HOLD QUEUE LOCK WHILE CALLING
+ */
+static void cancelAllRequests(dnbd3_uplink_t *uplink)
+{
+ dnbd3_queue_entry_t *it = uplink->queue;
+ while ( it != NULL ) {
+ dnbd3_queue_client_t *cit = it->clients;
+ while ( cit != NULL ) {
+ (*cit->callback)( cit->data, cit->handle, 0, 0, NULL );
+ dnbd3_queue_client_t *next = cit->next;
+ free( cit );
+ cit = next;
+ }
+ dnbd3_queue_entry_t *next = it->next;
+ free( it );
+ it = next;
}
+ uplink->queue = NULL;
+ uplink->queueLen = 0;
+ uplink->image->problem.queue = false;
+}
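Cancelled waiters are notified by invoking their callback with a zero length and a NULL buffer. A hypothetical consumer, with the callback signature inferred from the calls in this file:

// Hypothetical consumer: zero length / NULL buffer means the request was cancelled
static void exampleCallback( void *data, uint64_t handle, uint64_t start, uint32_t length, const char *buffer )
{
	if ( buffer == NULL || length == 0 ) {
		// Uplink is shutting down - fail whoever waits on 'handle'
		return;
	}
	// Otherwise forward 'length' bytes of image data starting at 'start'
}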
+
+static void freeUplinkStruct(ref *ref)
+{
+ dnbd3_uplink_t *uplink = container_of(ref, dnbd3_uplink_t, reference);
+ logadd( LOG_DEBUG1, "Freeing uplink for '%s:%d'", PIMG(uplink->image) );
+ assert( uplink->queueLen == 0 );
+ if ( uplink->signal != NULL ) {
+ signal_close( uplink->signal );
+ }
+ if ( uplink->current.fd != -1 ) {
+ close( uplink->current.fd );
+ uplink->current.fd = -1;
+ }
+ if ( uplink->better.fd != -1 ) {
+ close( uplink->better.fd );
+ uplink->better.fd = -1;
+ }
+ mutex_destroy( &uplink->queueLock );
+ mutex_destroy( &uplink->rttLock );
+ mutex_destroy( &uplink->sendMutex );
+ free( uplink->recvBuffer );
+ uplink->recvBuffer = NULL;
+ if ( uplink->cacheFd != -1 ) {
+ close( uplink->cacheFd );
+ }
+ // Finally let go of image. It was acquired either in uplink_shutdown or in the cleanup code
+ // of the uplink thread, depending on who set the uplink->shutdown flag. (Or uplink_init if that failed)
+ image_release( uplink->image );
+ free( uplink ); // !!!
}
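freeUplinkStruct() recovers the enclosing uplink from its embedded reference member via container_of, which reference.h presumably provides; the classic formulation is:

#include <stddef.h>
#define container_of(ptr, type, member) \
	( (type *)( (char *)(ptr) - offsetof(type, member) ) )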
/**
* Remove given client from uplink request queue
* Locks on: uplink.queueLock
*/
-void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client)
+void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback callback)
{
mutex_lock( &uplink->queueLock );
- for (int i = uplink->queueLen - 1; i >= 0; --i) {
- if ( uplink->queue[i].client == client ) {
- uplink->queue[i].client = NULL;
- uplink->queue[i].status = ULR_FREE;
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ for ( dnbd3_queue_client_t **cit = &it->clients; *cit != NULL; ) {
+ if ( (**cit).data == data && (**cit).callback == callback ) {
+ (*(**cit).callback)( (**cit).data, (**cit).handle, 0, 0, NULL );
+ dnbd3_queue_client_t *entry = *cit;
+ *cit = (**cit).next;
+ free( entry );
+ } else {
+ cit = &(**cit).next;
+ }
}
- if ( uplink->queue[i].client == NULL && uplink->queueLen == i + 1 ) uplink->queueLen--;
}
mutex_unlock( &uplink->queueLock );
}
/**
- * Request a chunk of data through an uplink server
- * Locks on: image.lock, uplink.queueLock
+ * Called from a client (proxy) connection to request a missing part of the image.
+ * The caller has made sure that the range is actually missing.
*/
-bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
+bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
{
- if ( client == NULL || client->image == NULL ) return false;
- if ( length > (uint32_t)_maxPayload ) {
- logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length );
+ assert( client != NULL && callback != NULL );
+ if ( ( hops & 0x3f ) > 60 ) { // This is just silly
+ logadd( LOG_WARNING, "Refusing to relay a request that has > 60 hops" );
return false;
}
- mutex_lock( &client->image->lock );
- if ( client->image->uplink == NULL ) {
- mutex_unlock( &client->image->lock );
- logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
+ dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref );
+ if ( unlikely( uplink == NULL ) ) {
+ uplink_init( client->image, -1, NULL, -1 );
+ uplink = ref_get_uplink( &client->image->uplinkref );
+ if ( uplink == NULL ) {
+ logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
+ return false;
+ }
+ }
+ // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
+ // This might be a false positive if there are multiple instances running on the same host (IP)
+ bool ret;
+ if ( hops > 1 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
+ uplink->cycleDetected = true;
+ signal_call( uplink->signal );
+ logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
+ ret = false;
+ } else {
+ ret = uplink_requestInternal( uplink, (void*)client, callback, handle, start, length, hops );
+ }
+ ref_put( &uplink->reference );
+ return ret;
+}
+
+/**
+ * Called by integrated fuse module
+ */
+bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback,
+ uint64_t handle, uint64_t start, uint32_t length)
+{
+ dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
+ if ( unlikely( uplink == NULL ) ) {
+ uplink_init( image, -1, NULL, -1 );
+ uplink = ref_get_uplink( &image->uplinkref );
+ if ( uplink == NULL ) {
+ logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
+ return false;
+ }
+ }
+ bool ret = uplink_requestInternal( uplink, data, callback, handle, start, length, 0 );
+ ref_put( &uplink->reference );
+ return ret;
+}
+
+static void extendRequest(uint64_t start, uint64_t *end, const dnbd3_image_t *image, uint32_t wanted)
+{
+ uint32_t length = (uint32_t)( *end - start );
+ if ( length >= wanted )
+ return;
+ length = wanted;
+ if ( unlikely( _backgroundReplication == BGR_HASHBLOCK
+ && *end / HASH_BLOCK_SIZE != (start + length) / HASH_BLOCK_SIZE ) ) {
+ // Don't extend across hash-block border in this mode
+ *end = ( start + length ) & ~( HASH_BLOCK_SIZE - 1 );
+ } else {
+ *end = start + length;
+ }
+ if ( unlikely( *end > image->virtualFilesize ) ) {
+ *end = image->virtualFilesize;
+ }
+ *end = ( *end + DNBD3_BLOCK_SIZE - 1 ) & ~( DNBD3_BLOCK_SIZE - 1 );
+ //logadd( LOG_DEBUG2, "Extended %"PRIx64" from %"PRIx64" to %"PRIx64, start, end, req.end );
+}
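A worked example of the widening, assuming 4 KiB DNBD3 blocks and a 64 KiB _minRequestSize (both values assumed for illustration):

// Caller block-aligns the start first: 0x2001 & ~(4096 - 1) = 0x2000
uint64_t start = 0x2000, end = 0x2002;      // tiny 2-byte request
extendRequest( start, &end, image, 65536 ); // -> end == 0x12000 (start + 64 KiB)
// In BGR_HASHBLOCK mode, end would be clamped at the next hash-block border instead.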
+
+static bool requestBlock(dnbd3_uplink_t *uplink, req_t *req, uint8_t hops)
+{
+ if ( uplink->current.fd == -1 )
+ return false;
+ return dnbd3_get_block( uplink->current.fd, req->start,
+ (uint32_t)( req->end - req->start ), req->handle,
+ COND_HOPCOUNT( uplink->current.version, hops ) );
+}
+
+/**
+ * Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL.
+ * If callback is NULL, this is assumed to be a background replication request.
+ * Locks on: uplink.queueLock, uplink.sendMutex
+ */
+static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback,
+ uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
+{
+ assert( uplink != NULL );
+ assert( data == NULL || callback != NULL );
+ if ( ( hops & HOP_FLAG_BGR ) // This is a background replication request
+ && _backgroundReplication != BGR_FULL ) { // Deny if we're not doing BGR
+ // TODO: Allow BGR_HASHBLOCK too, but only if hash block isn't completely empty
+ logadd( LOG_DEBUG2, "Dropping client because of BGR policy" );
return false;
}
- dnbd3_connection_t * const uplink = client->image->uplink;
if ( uplink->shutdown ) {
- mutex_unlock( &client->image->lock );
logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" );
return false;
}
- // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
- // This might be a false positive if there are multiple instances running on the same host (IP)
- if ( hops != 0 && isSameAddress( &uplink->currentServer, &client->host ) ) {
- mutex_unlock( &client->image->lock );
- logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
- mutex_lock( &uplink->rttLock );
- uplink->cycleDetected = true;
- mutex_unlock( &uplink->rttLock );
- signal_call( uplink->signal );
+ if ( length > (uint32_t)_maxPayload ) {
+ logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload",
+ length );
return false;
}
- int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise
- int existingType = -1; // ULR_* type of existing request
- int i;
- int freeSlot = -1;
- bool requestLoop = false;
- const uint64_t end = start + length;
+ hops++;
+ if ( callback == NULL ) {
+ // Set upper-most bit for replication requests that we fire
+ // In client mode, at least set prefetch flag to prevent prefetch cascading
+ hops |= (uint8_t)( _pretendClient ? HOP_FLAG_PREFETCH : HOP_FLAG_BGR );
+ }
+ req_t req, preReq;
+ dnbd3_queue_entry_t *request = NULL, *last = NULL, *pre = NULL;
+ bool isNew;
+ const uint64_t end = start + length;
+ req.start = start & ~(DNBD3_BLOCK_SIZE - 1);
+ req.end = end;
+ /* Don't do this -- this breaks matching of prefetch jobs, since they'd
+ * be misaligned, and the next client request wouldn't match anything.
+ * To improve this, we need to be able to attach a queue_client to multiple queue_entries
+ * and then serve it once all the queue_entries are done (atomic_int in queue_client).
+ * But currently we directly send the receive buffer's content to the queue_client after
+ * receiving the payload, as this will also work when the local cache is borked (we just
+ * tunnel though the traffic). One could argue that this mode of operation is nonsense,
+ * and we should just drop all affected clients. Then as a next step, don't serve the
+ * clients form the receive buffer, but just issue a normal sendfile() call after writing
+ * the received data to the local cache.
+ */
+ if ( callback != NULL && _minRequestSize != 0 ) {
+ // Not background replication request, extend request size
+ extendRequest( req.start, &req.end, uplink->image, _minRequestSize );
+ }
+ req.end = (req.end + DNBD3_BLOCK_SIZE - 1) & ~(DNBD3_BLOCK_SIZE - 1);
+ // Critical section - work with the queue
mutex_lock( &uplink->queueLock );
- mutex_unlock( &client->image->lock );
- for (i = 0; i < uplink->queueLen; ++i) {
- if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) {
- freeSlot = i;
- continue;
+ if ( uplink->shutdown ) { // Check again after locking to prevent lost requests
+ goto fail_lock;
+ }
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( it->from <= start && it->to >= end ) {
+ // Matching range, attach
+ request = it;
+ break;
}
- if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue;
- if ( uplink->queue[i].from <= start && uplink->queue[i].to >= end ) {
- if ( hops > uplink->queue[i].hopCount && uplink->queue[i].from == start && uplink->queue[i].to == end ) {
- requestLoop = true;
- break;
- }
- if ( foundExisting == -1 || existingType == ULR_PENDING ) {
- foundExisting = i;
- existingType = uplink->queue[i].status;
- if ( freeSlot != -1 ) break;
- }
+ if ( it->next == NULL ) {
+ // Not matching, last in list, remember
+ last = it;
+ break;
}
}
- if ( requestLoop ) {
- mutex_unlock( &uplink->queueLock );
- logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops );
- mutex_lock( &uplink->rttLock );
- uplink->cycleDetected = true;
- mutex_unlock( &uplink->rttLock );
- signal_call( uplink->signal );
- return false;
+ dnbd3_queue_client_t **c = NULL;
+ if ( request == NULL ) {
+ // No existing request to attach to
+ if ( uplink->queueLen >= UPLINK_MAX_QUEUE ) {
+ logadd( LOG_WARNING,
+ "Uplink queue is full, consider increasing UPLINK_MAX_QUEUE. Dropping client..." );
+ goto fail_lock;
+ }
+ uplink->queueLen++;
+ if ( uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
+ uplink->image->problem.queue = true;
+ }
+ request = malloc( sizeof(*request) );
+ if ( last == NULL ) {
+ uplink->queue = request;
+ } else {
+ last->next = request;
+ }
+ request->next = NULL;
+ request->handle = ++uplink->queueId;
+ request->from = req.start;
+ request->to = req.end;
+#ifdef DEBUG
+ timing_get( &request->entered );
+#endif
+ request->hopCount = hops;
+ request->sent = true; // Optimistic; would be set to false on failure
+ if ( callback == NULL ) {
+ // BGR
+ request->clients = NULL;
+ } else {
+ c = &request->clients;
+ }
+ isNew = true;
+ } else if ( callback == NULL ) {
+ // Replication request that matches existing request. Do nothing
+ isNew = false;
+ } else {
+ // Existing request. Check if potential cycle
+ if ( hops > request->hopCount && request->from == start && request->to == end ) {
+ logadd( LOG_DEBUG1, "Request cycle detected on uplink for %s:%d", PIMG(uplink->image) );
+ goto fail_lock;
+ }
+ // Count number of clients, get tail of list
+ int count = 0;
+ c = &request->clients;
+ while ( *c != NULL ) {
+ c = &(**c).next;
+ if ( ++count >= UPLINK_MAX_CLIENTS_PER_REQUEST ) {
+ logadd( LOG_DEBUG2, "Won't accept more than %d clients per request, dropping client", count );
+ goto fail_lock;
+ }
+ }
+ isNew = false;
}
- if ( freeSlot == -1 ) {
- if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) {
- mutex_unlock( &uplink->queueLock );
- logadd( LOG_WARNING, "Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." );
- return false;
+ // Prefetch immediately, without unlocking the list - the old approach of
+ // async prefetching in another thread was sometimes so slow that we'd process
+ // another request from the same client before the prefetch job would execute.
+ if ( callback != NULL && ( isNew || request->clients == NULL || request->clients->data == data )
+ && !( hops & (HOP_FLAG_BGR | HOP_FLAG_PREFETCH) ) // No cascading of prefetches
+ && end == request->to && length <= _maxPrefetch ) {
+ // Only if this is a client request, and the !! end boundary matches exactly !!
+ // (See above for reason why)
+ // - We neither check the local cache, nor other pending requests. Worth it?
+ // Complexity vs. probability
+ preReq.start = end;
+ preReq.end = end;
+ extendRequest( preReq.start, &preReq.end, uplink->image, MIN( length * 3, _maxPrefetch ) );
+ if ( preReq.start < preReq.end ) {
+ //logadd( LOG_DEBUG2, "Prefetching @ %"PRIx64" - %"PRIx64, preReq.start, preReq.end );
+ uplink->queueLen++;
+ pre = malloc( sizeof(*pre) );
+ pre->next = request->next;
+ request->next = pre;
+ pre->handle = preReq.handle = ++uplink->queueId;
+ pre->from = preReq.start;
+ pre->to = preReq.end;
+ pre->hopCount = hops | HOP_FLAG_PREFETCH;
+ pre->sent = true; // Optimistic; would be set to false on failure
+ pre->clients = NULL;
+#ifdef DEBUG
+ timing_get( &pre->entered );
+#endif
}
- freeSlot = uplink->queueLen++;
}
- // Do not send request to uplink server if we have a matching pending request AND the request either has the
- // status ULR_NEW OR we found a free slot with LOWER index than the one we attach to. Otherwise
- // explicitly send this request to the uplink server. The second condition mentioned here is to prevent
- // a race condition where the reply for the outstanding request already arrived and the uplink thread
- // is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might
- // already have passed the index of the free slot we determined, but not reached the existing request we just found above.
- if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1; // -1 means "send request"
-#ifdef _DEBUG
- if ( foundExisting != -1 ) {
- logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, existingType == ULR_NEW ? "ULR_NEW" : "ULR_PENDING", foundExisting, freeSlot );
- logadd( LOG_DEBUG2, "Original %" PRIu64 "-%" PRIu64 " (%p)\n"
- "New %" PRIu64 "-%" PRIu64 " (%p)\n",
- uplink->queue[foundExisting].from, uplink->queue[foundExisting].to, (void*)uplink->queue[foundExisting].client,
- start, end, (void*)client );
+ // // // //
+ // Copy data - need this after unlocking
+ req.handle = request->handle;
+ if ( callback != NULL ) {
+ assert( c != NULL );
+ *c = malloc( sizeof( *request->clients ) );
+ (**c).next = NULL;
+ (**c).handle = handle;
+ (**c).from = start;
+ (**c).to = end;
+ (**c).data = data;
+ (**c).callback = callback;
}
-#endif
- // Fill structure
- uplink->queue[freeSlot].from = start;
- uplink->queue[freeSlot].to = end;
- uplink->queue[freeSlot].handle = handle;
- uplink->queue[freeSlot].client = client;
- //int old = uplink->queue[freeSlot].status;
- uplink->queue[freeSlot].status = (foundExisting == -1 ? ULR_NEW : ULR_PENDING);
- uplink->queue[freeSlot].hopCount = hops;
-#ifdef _DEBUG
- timing_get( &uplink->queue[freeSlot].entered );
- //logadd( LOG_DEBUG2 %p] Inserting request at slot %d, was %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 "\n", (void*)uplink, freeSlot, old, uplink->queue[freeSlot].status, uplink->queue[freeSlot, ".handle, start, end );
-#endif
mutex_unlock( &uplink->queueLock );
+ // End queue critical section
+ if ( pre == NULL && !isNew )
+ return true; // Nothing to do
- if ( foundExisting != -1 )
- return true; // Attached to pending request, do nothing
-
- // See if we can fire away the request
- if ( mutex_trylock( &uplink->sendMutex ) != 0 ) {
- logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" );
- } else {
- if ( uplink->fd == -1 ) {
- mutex_unlock( &uplink->sendMutex );
- logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
- } else {
- const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
- const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
- if ( hops < 200 ) ++hops;
- const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) );
- mutex_unlock( &uplink->sendMutex );
- if ( !ret ) {
- logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" );
- } else {
- mutex_lock( &uplink->queueLock );
- if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client && uplink->queue[freeSlot].status == ULR_NEW ) {
- uplink->queue[freeSlot].status = ULR_PENDING;
- logadd( LOG_DEBUG2, "Succesful direct uplink request" );
- } else {
- logadd( LOG_DEBUG2, "Weird queue update fail for direct uplink request" );
- }
- mutex_unlock( &uplink->queueLock );
- return true;
- }
- // Fall through to waking up sender thread
- }
+ // Fire away the request(s)
+ mutex_lock( &uplink->sendMutex );
+ bool ret1 = true;
+ bool ret2 = true;
+ if ( isNew ) {
+ ret1 = requestBlock( uplink, &req, hops );
+ }
+ if ( pre != NULL ) {
+ ret2 = requestBlock( uplink, &preReq, hops | HOP_FLAG_PREFETCH );
+ }
+ if ( !ret1 || !ret2 ) { // Set with send locked
+ uplink->image->problem.uplink = true;
+ }
+ mutex_unlock( &uplink->sendMutex );
+ // markRequestUnsent locks the queue, which would violate the locking order with the send mutex held
+ if ( !ret1 ) {
+ markRequestUnsent( uplink, req.handle );
+ logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing (%"PRIu64")", req.handle );
+ }
+ if ( !ret2 ) {
+ markRequestUnsent( uplink, preReq.handle );
}
- if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed
- if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
- logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
- }
+ if ( ( !ret1 || !ret2 ) && signal_call( uplink->signal ) == SIGNAL_ERROR ) {
+ logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
}
return true;
+
+fail_lock:
+ mutex_unlock( &uplink->queueLock );
+ return false;
}
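The queue manipulated above is a two-level singly linked list: entries cover block-aligned ranges, each with its own list of waiting clients. The structures are defined elsewhere; the shape below is reconstructed from the fields this file uses, an assumption rather than the authoritative definition:

typedef struct _dnbd3_queue_client {
	struct _dnbd3_queue_client *next;
	void *data;                    // opaque waiter, e.g. a dnbd3_client_t*
	uint64_t handle;               // the waiter's own request handle
	uint64_t from, to;             // sub-range the waiter actually asked for
	uplink_callback callback;      // completion/cancellation notification
} dnbd3_queue_client_t;

typedef struct _dnbd3_queue_entry {
	struct _dnbd3_queue_entry *next;
	uint64_t handle;               // handle used towards the uplink server
	uint64_t from, to;             // covered range, block-aligned
	dnbd3_queue_client_t *clients; // NULL for pure BGR/prefetch jobs
	uint8_t hopCount;
	bool sent;                     // false if the request still needs (re)sending
#ifdef DEBUG
	ticks entered;                 // for starvation detection
#endif
} dnbd3_queue_entry_t;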
/**
@@ -321,52 +548,47 @@ static void* uplink_mainloop(void *data)
#define EV_SOCKET (1)
#define EV_COUNT (2)
struct pollfd events[EV_COUNT];
- dnbd3_connection_t * const link = (dnbd3_connection_t*)data;
- int numSocks, i, waitTime;
+ dnbd3_uplink_t * const uplink = (dnbd3_uplink_t*)data;
+ int numSocks, waitTime;
int altCheckInterval = SERVER_RTT_INTERVAL_INIT;
+ int rttTestResult;
uint32_t discoverFailCount = 0;
- uint32_t unsavedSeconds = 0;
ticks nextAltCheck, lastKeepalive;
char buffer[200];
memset( events, 0, sizeof(events) );
timing_get( &nextAltCheck );
lastKeepalive = nextAltCheck;
//
- assert( link != NULL );
+ assert( uplink != NULL );
setThreadName( "idle-uplink" );
+ thread_detach( uplink->thread );
blockNoncriticalSignals();
// Make sure file is open for writing
- if ( !uplink_reopenCacheFd( link, false ) ) {
+ if ( !reopenCacheFd( uplink, false ) ) {
// It might have failed - still offer proxy mode, we just can't cache
- logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", link->image->path, errno );
+ logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", uplink->image->path, errno );
}
//
- link->signal = signal_new();
- if ( link->signal == NULL ) {
- logadd( LOG_WARNING, "error creating signal. Uplink unavailable." );
- goto cleanup;
- }
events[EV_SIGNAL].events = POLLIN;
- events[EV_SIGNAL].fd = signal_getWaitFd( link->signal );
+ events[EV_SIGNAL].fd = signal_getWaitFd( uplink->signal );
events[EV_SOCKET].fd = -1;
- while ( !_shutdown && !link->shutdown ) {
+ if ( uplink->rttTestResult != RTT_DOCHANGE ) {
+ altservers_findUplink( uplink ); // In case we didn't kickstart
+ }
+ while ( !_shutdown && !uplink->shutdown ) {
// poll()
- mutex_lock( &link->rttLock );
- waitTime = link->rttTestResult == RTT_DOCHANGE ? 0 : -1;
- mutex_unlock( &link->rttLock );
- if ( waitTime == 0 ) {
- // Nothing
- } else if ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) {
- waitTime = 1000;
+ if ( uplink->rttTestResult == RTT_DOCHANGE ) {
+ // 0 means poll, since we're about to change the server
+ waitTime = 0;
} else {
declare_now;
waitTime = (int)timing_diffMs( &now, &nextAltCheck );
if ( waitTime < 100 ) waitTime = 100;
- if ( waitTime > 5000 ) waitTime = 5000;
+ else if ( waitTime > 10000 ) waitTime = 10000;
}
- events[EV_SOCKET].fd = link->fd;
+ events[EV_SOCKET].fd = uplink->current.fd;
numSocks = poll( events, EV_COUNT, waitTime );
- if ( _shutdown || link->shutdown ) goto cleanup;
+ if ( _shutdown || uplink->shutdown ) goto cleanup;
if ( numSocks == -1 ) { // Error?
if ( errno == EINTR ) continue;
logadd( LOG_DEBUG1, "poll() error %d", (int)errno );
@@ -374,40 +596,41 @@ static void* uplink_mainloop(void *data)
continue;
}
// Check if server switch is in order
- mutex_lock( &link->rttLock );
- if ( link->rttTestResult != RTT_DOCHANGE ) {
- mutex_unlock( &link->rttLock );
- } else {
- link->rttTestResult = RTT_IDLE;
+ if ( unlikely( uplink->rttTestResult == RTT_DOCHANGE ) ) {
+ mutex_lock( &uplink->rttLock );
+ assert( uplink->rttTestResult == RTT_DOCHANGE );
+ uplink->rttTestResult = RTT_IDLE;
// The rttTest worker thread has finished our request.
// And says it's better to switch to another server
- const int fd = link->fd;
- mutex_lock( &link->sendMutex );
- link->fd = link->betterFd;
- mutex_unlock( &link->sendMutex );
- link->betterFd = -1;
- link->currentServer = link->betterServer;
- link->version = link->betterVersion;
- link->cycleDetected = false;
- mutex_unlock( &link->rttLock );
+ const int fd = uplink->current.fd;
+ mutex_lock( &uplink->sendMutex );
+ uplink->current = uplink->better;
+ mutex_unlock( &uplink->sendMutex );
+ uplink->better.fd = -1;
+ uplink->cycleDetected = false;
+ mutex_unlock( &uplink->rttLock );
discoverFailCount = 0;
if ( fd != -1 ) close( fd );
- link->replicationHandle = REP_NONE;
- link->image->working = true;
- link->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
+ uplink->image->problem.uplink = false;
+ uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
buffer[0] = '@';
- if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) {
- logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", link->image->name, buffer + 1 );
+ if ( altservers_toString( uplink->current.index, buffer + 1, sizeof(buffer) - 1 ) ) {
+ logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", uplink->image->name, buffer + 1 );
setThreadName( buffer );
}
// If we don't have a crc32 list yet, see if the new server has one
- if ( link->image->crc32 == NULL ) {
- uplink_addCrc32( link );
+ if ( uplink->image->crc32 == NULL ) {
+ requestCrc32List( uplink );
}
// Re-send all pending requests
- uplink_sendRequests( link, false );
- uplink_sendReplicationRequest( link );
+ sendQueuedRequests( uplink, false );
+ sendReplicationRequest( uplink );
events[EV_SOCKET].events = POLLIN | POLLRDHUP;
+ if ( uplink->image->problem.uplink ) {
+ // Some of the requests above must have failed again already :-(
+ logadd( LOG_DEBUG1, "Newly established uplink connection failed during getCRC or sendRequests" );
+ connectionFailed( uplink, true );
+ }
timing_gets( &nextAltCheck, altCheckInterval );
// The rtt worker already did the handshake for our image, so there's nothing
// more to do here
@@ -415,206 +638,187 @@ static void* uplink_mainloop(void *data)
// Check events
// Signal
if ( (events[EV_SIGNAL].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) {
+ uplink->image->problem.uplink = true;
logadd( LOG_WARNING, "poll error on signal in uplink_mainloop!" );
goto cleanup;
} else if ( (events[EV_SIGNAL].revents & POLLIN) ) {
// signal triggered -> pending requests
- if ( signal_clear( link->signal ) == SIGNAL_ERROR ) {
- logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", link->image->name );
+ if ( signal_clear( uplink->signal ) == SIGNAL_ERROR ) {
+ logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", uplink->image->name );
}
- if ( link->fd != -1 ) {
+ if ( uplink->current.fd != -1 ) {
// Uplink seems fine, relay requests to it...
- uplink_sendRequests( link, true );
- } else { // No uplink; maybe it was shutdown since it was idle for too long
- link->idleTime = 0;
+ sendQueuedRequests( uplink, true );
+ } else if ( uplink->queueLen != 0 ) { // No uplink; maybe it was shutdown since it was idle for too long
+ uplink->idleTime = 0;
}
}
// Uplink socket
if ( (events[EV_SOCKET].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) {
- uplink_connectionFailed( link, true );
- logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" );
+ connectionFailed( uplink, true );
+ logadd( LOG_DEBUG1, "Uplink gone away, panic! (revents=%d)\n", (int)events[EV_SOCKET].revents );
setThreadName( "panic-uplink" );
} else if ( (events[EV_SOCKET].revents & POLLIN) ) {
- uplink_handleReceive( link );
- if ( _shutdown || link->shutdown ) goto cleanup;
+ handleReceive( uplink );
+ if ( _shutdown || uplink->shutdown ) goto cleanup;
}
declare_now;
uint32_t timepassed = timing_diff( &lastKeepalive, &now );
- if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) {
+ if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL
+ || ( timepassed >= 2 && uplink->idleTime < _bgrWindowSize ) ) {
lastKeepalive = now;
- link->idleTime += timepassed;
- unsavedSeconds += timepassed;
- if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && link->idleTime >= 20 && link->idleTime <= 70 ) ) {
- // fsync/save every 4 minutes, or every 60 seconds if link is idle
- unsavedSeconds = 0;
- uplink_saveCacheMap( link );
- }
+ uplink->idleTime += timepassed;
// Keep-alive
- if ( link->fd != -1 && link->replicationHandle == REP_NONE ) {
- // Send keep-alive if nothing is happening
- if ( uplink_sendKeepalive( link->fd ) ) {
- // Re-trigger periodically, in case it requires a minimum user count
- uplink_sendReplicationRequest( link );
- } else {
- uplink_connectionFailed( link, true );
- logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" );
- setThreadName( "panic-uplink" );
+ if ( uplink->current.fd != -1 && uplink->queueLen < _bgrWindowSize ) {
+ // Send keep-alive if nothing is happening, and try to trigger background rep.
+ if ( !sendKeepalive( uplink ) || !sendReplicationRequest( uplink ) ) {
+ connectionFailed( uplink, true );
+ logadd( LOG_DEBUG1, "Error sending keep-alive/BGR, panic!\n" );
}
}
- // Don't keep link established if we're idle for too much
- if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) {
- mutex_lock( &link->sendMutex );
- close( link->fd );
- link->fd = events[EV_SOCKET].fd = -1;
- mutex_unlock( &link->sendMutex );
- link->cycleDetected = false;
- if ( link->recvBufferLen != 0 ) {
- link->recvBufferLen = 0;
- free( link->recvBuffer );
- link->recvBuffer = NULL;
- }
- logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", link->image->name, (int)link->image->rid );
- setThreadName( "idle-uplink" );
+ // Don't keep uplink established if we're idle for too much
+ if ( connectionShouldShutdown( uplink ) ) {
+ logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", PIMG(uplink->image) );
+ goto cleanup;
}
}
// See if we should trigger an RTT measurement
- mutex_lock( &link->rttLock );
- const int rttTestResult = link->rttTestResult;
- mutex_unlock( &link->rttLock );
+ rttTestResult = uplink->rttTestResult;
if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) {
- if ( timing_reached( &nextAltCheck, &now ) || ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) || link->cycleDetected ) {
+ if ( timing_reached( &nextAltCheck, &now ) || ( uplink->current.fd == -1 && discoverFailCount == 0 ) || uplink->cycleDetected ) {
// It seems it's time for a check
- if ( image_isComplete( link->image ) ) {
+ if ( image_isComplete( uplink->image ) ) {
// Quit work if image is complete
- logadd( LOG_INFO, "Replication of %s complete.", link->image->name );
+ logadd( LOG_INFO, "Replication of %s complete.", uplink->image->name );
setThreadName( "finished-uplink" );
+ uplink->image->problem.uplink = false;
goto cleanup;
- } else if ( !uplink_connectionShouldShutdown( link ) ) {
+ } else {
// Not complete - do measurement
- altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous)
- if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) {
- link->nextReplicationIndex = 0;
+ altservers_findUplinkAsync( uplink ); // Sets RTT_INPROGRESS synchronously; the measurement itself runs asynchronously
+ if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) {
+ uplink->nextReplicationIndex = 0;
}
}
altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_INTERVAL_MAX);
timing_set( &nextAltCheck, &now, altCheckInterval );
}
} else if ( rttTestResult == RTT_NOT_REACHABLE ) {
- mutex_lock( &link->rttLock );
- link->rttTestResult = RTT_IDLE;
- mutex_unlock( &link->rttLock );
- discoverFailCount++;
- timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) );
+ if ( atomic_compare_exchange_strong( &uplink->rttTestResult, &rttTestResult, RTT_IDLE ) ) {
+ discoverFailCount++;
+ if ( uplink->current.fd == -1 ) {
+ uplink->cycleDetected = false;
+ }
+ }
+ timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_MAX_UNREACH) ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED );
}
-#ifdef _DEBUG
- if ( link->fd != -1 && !link->shutdown ) {
+#ifdef DEBUG
+ if ( uplink->current.fd != -1 && !uplink->shutdown ) {
bool resend = false;
ticks deadline;
timing_set( &deadline, &now, -10 );
- mutex_lock( &link->queueLock );
- for (i = 0; i < link->queueLen; ++i) {
- if ( link->queue[i].status != ULR_FREE && timing_reached( &link->queue[i].entered, &deadline ) ) {
- snprintf( buffer, sizeof(buffer), "[DEBUG %p] Starving request slot %d detected:\n"
- "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)link, i, link->queue[i].client->image->name,
- link->queue[i].from, link->queue[i].to, link->queue[i].status );
- link->queue[i].entered = now;
-#ifdef _DEBUG_RESEND_STARVING
- link->queue[i].status = ULR_NEW;
+ mutex_lock( &uplink->queueLock );
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( timing_reached( &it->entered, &deadline ) ) {
+ logadd( LOG_WARNING, "Starving request detected:"
+ " (from %" PRIu64 " to %" PRIu64 ", sent: %d) %s:%d",
+ it->from, it->to, (int)it->sent, PIMG(uplink->image) );
+ it->entered = now;
+#ifdef DEBUG_RESEND_STARVING
+ it->sent = false;
resend = true;
#endif
- mutex_unlock( &link->queueLock );
- logadd( LOG_WARNING, "%s", buffer );
- mutex_lock( &link->queueLock );
}
}
- mutex_unlock( &link->queueLock );
- if ( resend )
- uplink_sendRequests( link, true );
+ mutex_unlock( &uplink->queueLock );
+ if ( resend ) {
+ sendQueuedRequests( uplink, true );
+ }
}
#endif
}
- cleanup: ;
- altservers_removeUplink( link );
- uplink_saveCacheMap( link );
- mutex_lock( &link->image->lock );
- if ( link->image->uplink == link ) {
- link->image->uplink = NULL;
- }
- mutex_lock( &link->queueLock );
- const int fd = link->fd;
- const dnbd3_signal_t* signal = link->signal;
- mutex_lock( &link->sendMutex );
- link->fd = -1;
- mutex_unlock( &link->sendMutex );
- link->signal = NULL;
- if ( !link->shutdown ) {
- link->shutdown = true;
- thread_detach( link->thread );
+cleanup: ;
+ dnbd3_image_t *image = uplink->image;
+ dnbd3_cache_map_t *cache = ref_get_cachemap( image );
+ if ( cache != NULL ) {
+ cache->dirty = true; // Force writeout of cache map
+ ref_put( &cache->reference );
}
- // Do not access link->image after unlocking, since we set
- // image->uplink to NULL. Acquire with image_lock first,
- // like done below when checking whether to re-init uplink
- mutex_unlock( &link->image->lock );
- mutex_unlock( &link->queueLock );
- if ( fd != -1 ) close( fd );
- if ( signal != NULL ) signal_close( signal );
- // Wait for the RTT check to finish/fail if it's in progress
- while ( link->rttTestResult == RTT_INPROGRESS )
- usleep( 10000 );
- if ( link->betterFd != -1 ) {
- close( link->betterFd );
+ mutex_lock( &image->lock );
+ bool exp = false;
+ if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) {
+ image->users++; // We set the flag - hold onto image
}
- mutex_destroy( &link->queueLock );
- mutex_destroy( &link->rttLock );
- mutex_destroy( &link->sendMutex );
- free( link->recvBuffer );
- link->recvBuffer = NULL;
- if ( link->cacheFd != -1 ) {
- close( link->cacheFd );
+ dnbd3_uplink_t *current = ref_get_uplink( &image->uplinkref );
+ if ( current == uplink ) { // Set NULL if it's still us...
+ mutex_lock( &uplink->queueLock );
+ cancelAllRequests( uplink );
+ mutex_unlock( &uplink->queueLock );
+ ref_setref( &image->uplinkref, NULL );
}
- dnbd3_image_t *image = image_lock( link->image );
- free( link ); // !!!
- if ( image != NULL ) {
- if ( !_shutdown && image->cache_map != NULL ) {
- // Ingegrity checker must have found something in the meantime
- uplink_init( image, -1, NULL, 0 );
- }
- image_release( image );
+ if ( current != NULL ) { // Decrease ref in any case
+ ref_put( &current->reference );
}
+ mutex_unlock( &image->lock );
+ // Finally as the thread is done, decrease our own ref that we initialized with
+ ref_put( &uplink->reference );
return NULL ;
}
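The final ref_put() balances the initial reference that uplink_init() created for the thread itself. Every other accessor follows the get/use/put pattern seen throughout this file:

// Canonical accessor pattern for the uplink reference
dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
if ( uplink != NULL ) {
	// ... uplink cannot be freed while we hold the reference ...
	ref_put( &uplink->reference );
}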
-static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly)
+/**
+ * Only called from uplink thread.
+ */
+static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly)
{
- // Scan for new requests
- int j;
- mutex_lock( &link->queueLock );
- for (j = 0; j < link->queueLen; ++j) {
- if ( link->queue[j].status != ULR_NEW && (newOnly || link->queue[j].status != ULR_PENDING) ) continue;
- link->queue[j].status = ULR_PENDING;
- uint8_t hops = link->queue[j].hopCount;
- const uint64_t reqStart = link->queue[j].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
- const uint32_t reqSize = (uint32_t)(((link->queue[j].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
- /*
- logadd( LOG_DEBUG2, "[%p] Sending slot %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 " (%" PRIu64 "-%" PRIu64 ")",
- (void*)link, j, link->queue[j].status, link->queue[j].handle, link->queue[j].from, link->queue[j].to, reqStart, reqStart+reqSize );
- */
- mutex_unlock( &link->queueLock );
- if ( hops < 200 ) ++hops;
- mutex_lock( &link->sendMutex );
- const bool ret = dnbd3_get_block( link->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( link->version, hops ) );
- mutex_unlock( &link->sendMutex );
- if ( !ret ) {
- // Non-critical - if the connection dropped or the server was changed
- // the thread will re-send this request as soon as the connection
- // is reestablished.
- logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" );
- altservers_serverFailed( &link->currentServer );
- return;
+ assert_uplink_thread();
+ // Scan for new requests, or optionally, (re)send all
+ // Build a buffer, so if there aren't too many requests, we can send them after
+ // unlocking the queue again. Otherwise we need flushes during iteration, which
+ // is not ideal, but in that case the uplink is probably overwhelmed anyways.
+ // Try 125 as that's exactly 3000 bytes, usually 2*MTU.
+#define MAX_RESEND_BATCH 125
+ dnbd3_request_t reqs[MAX_RESEND_BATCH];
+ int count = 0;
+ mutex_lock( &uplink->queueLock );
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( newOnly && it->sent )
+ continue;
+ it->sent = true;
+ dnbd3_request_t *hdr = &reqs[count++];
+ hdr->magic = dnbd3_packet_magic;
+ hdr->cmd = CMD_GET_BLOCK;
+ hdr->size = (uint32_t)( it->to - it->from );
+ hdr->offset = it->from; // Offset first, then hops! (union)
+ hdr->hops = COND_HOPCOUNT( uplink->current.version, it->hopCount );
+ hdr->handle = it->handle;
+ fixup_request( *hdr );
+ if ( count == MAX_RESEND_BATCH ) {
+ bool ok = false;
+ logadd( LOG_DEBUG2, "BLOCKING resend of %d", count );
+ count = 0;
+ mutex_lock( &uplink->sendMutex );
+ if ( uplink->current.fd != -1 ) {
+ ok = ( sock_sendAll( uplink->current.fd, reqs, DNBD3_REQUEST_SIZE * MAX_RESEND_BATCH, 3 )
+ == DNBD3_REQUEST_SIZE * MAX_RESEND_BATCH );
+ }
+ mutex_unlock( &uplink->sendMutex );
+ if ( !ok ) {
+ uplink->image->problem.uplink = true;
+ break;
+ }
}
- mutex_lock( &link->queueLock );
}
- mutex_unlock( &link->queueLock );
+ mutex_unlock( &uplink->queueLock );
+ if ( count != 0 ) {
+ mutex_lock( &uplink->sendMutex );
+ if ( uplink->current.fd != -1 ) {
+ uplink->image->problem.uplink =
+ ( sock_sendAll( uplink->current.fd, reqs, DNBD3_REQUEST_SIZE * count, 3 )
+ != DNBD3_REQUEST_SIZE * count );
+ }
+ mutex_unlock( &uplink->sendMutex );
+ }
+#undef MAX_RESEND_BATCH
}
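The batch size adds up if an on-wire dnbd3_request_t is 24 bytes (an assumption here): 125 requests fill 3000 bytes, roughly two 1500-byte MTUs:

// Sizing sketch; the 24-byte DNBD3_REQUEST_SIZE is assumed
_Static_assert( 125 * 24 == 3000, "one full batch ~ two Ethernet frames" );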
/**
@@ -626,90 +830,118 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly)
* server. This means we might request data we already have, but it makes
* the code simpler. Worst case would be only one bit is zero, which means
* 4kb are missing, but we will request 32kb.
+ *
+ * Only called from uplink thread, so current.fd is assumed to be valid.
+ *
+ * @return false if sending request failed, true otherwise (i.e. not necessary/disabled)
*/
-static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
+static bool sendReplicationRequest(dnbd3_uplink_t *uplink)
{
- if ( link == NULL || link->fd == -1 ) return;
- if ( _backgroundReplication == BGR_DISABLED || link->cacheFd == -1 ) return; // Don't do background replication
- if ( link->nextReplicationIndex == -1 || link->replicationHandle != REP_NONE )
- return;
- dnbd3_image_t * const image = link->image;
- if ( image->virtualFilesize < DNBD3_BLOCK_SIZE ) return;
- mutex_lock( &image->lock );
- if ( image == NULL || image->cache_map == NULL || image->users < _bgrMinClients ) {
- // No cache map (=image complete), or replication pending, or not enough users, do nothing
- mutex_unlock( &image->lock );
- return;
+ assert_uplink_thread();
+ if ( uplink->current.fd == -1 )
+ return false; // Should never be called in this state, consider send error
+ if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 )
+ return true; // Don't do background replication
+ if ( uplink->nextReplicationIndex == -1 )
+ return true; // No more blocks to replicate
+ dnbd3_image_t * const image = uplink->image;
+ if ( image->users < _bgrMinClients )
+ return true; // Not enough active users
+ const int numNewRequests = numWantedReplicationRequests( uplink );
+ if ( numNewRequests <= 0 )
+ return true; // Already sufficient amount of requests on the wire
+ dnbd3_cache_map_t *cache = ref_get_cachemap( image );
+ if ( cache == NULL ) {
+ // No cache map (=image complete)
+ return true;
}
const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
const int lastBlockIndex = mapBytes - 1;
- int endByte;
- if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks
- endByte = link->nextReplicationIndex + mapBytes;
- } else { // Hashblock based: Only look for match in current hash block
- endByte = ( link->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK;
- if ( endByte > mapBytes ) {
- endByte = mapBytes;
+ for ( int bc = 0; bc < numNewRequests; ++bc ) {
+ int endByte;
+ if ( UPLINK_MAX_QUEUE - uplink->queueLen < 10 )
+ break; // Don't overload queue
+ if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks
+ endByte = uplink->nextReplicationIndex + mapBytes;
+ } else { // Hashblock based: Only look for match in current hash block
+ endByte = ( uplink->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK;
+ if ( endByte > mapBytes ) {
+ endByte = mapBytes;
+ }
}
- }
- int replicationIndex = -1;
- for ( int j = link->nextReplicationIndex; j < endByte; ++j ) {
- const int i = j % ( mapBytes ); // Wrap around for BGR_FULL
- if ( image->cache_map[i] != 0xff && ( i != lastBlockIndex || !link->replicatedLastBlock ) ) {
- // Found incomplete one
- replicationIndex = i;
+ atomic_thread_fence( memory_order_acquire );
+ int replicationIndex = -1;
+ for ( int j = uplink->nextReplicationIndex; j < endByte; ++j ) {
+ const int i = j % ( mapBytes ); // Wrap around for BGR_FULL
+ if ( atomic_load_explicit( &cache->map[i], memory_order_relaxed ) != 0xff
+ && ( i != lastBlockIndex || !uplink->replicatedLastBlock ) ) {
+ // Found incomplete one
+ replicationIndex = i;
+ break;
+ }
+ }
+ if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) {
+ // Nothing left in current block, find next one
+ replicationIndex = findNextIncompleteHashBlock( uplink, endByte );
+ }
+ if ( replicationIndex == -1 ) {
+ // Replication might be complete, uplink_mainloop should take care....
+ uplink->nextReplicationIndex = -1;
break;
}
+ const uint64_t handle = ++uplink->queueId;
+ const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
+ uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
+ // Extend the default 32k request size if _minRequestSize is > 32k
+ for ( size_t extra = 1; extra < ( _minRequestSize / FILE_BYTES_PER_MAP_BYTE )
+ && offset + size < image->virtualFilesize
+ && _backgroundReplication == BGR_FULL; ++extra ) {
+ if ( atomic_load_explicit( &cache->map[replicationIndex+1], memory_order_relaxed ) == 0xff )
+ break; // Hit complete 32k block, stop here
+ replicationIndex++;
+ size += (uint32_t)MIN( image->virtualFilesize - offset - size, FILE_BYTES_PER_MAP_BYTE );
+ }
+ if ( !uplink_requestInternal( uplink, NULL, NULL, handle, offset, size, 0 ) ) {
+ logadd( LOG_DEBUG1, "Error sending background replication request to uplink server (%s:%d)",
+ PIMG(uplink->image) );
+ ref_put( &cache->reference );
+ return false;
+ }
+ if ( replicationIndex == lastBlockIndex ) {
+ uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks
+ }
+ uplink->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter
+ if ( _backgroundReplication == BGR_HASHBLOCK
+ && uplink->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) {
+ // Just crossed a hash block boundary, look for new candidate starting at this very index
+ uplink->nextReplicationIndex = findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex );
+ if ( uplink->nextReplicationIndex == -1 )
+ break;
+ }
}
- mutex_unlock( &image->lock );
- if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) {
- // Nothing left in current block, find next one
- replicationIndex = uplink_findNextIncompleteHashBlock( link, endByte );
- }
- if ( replicationIndex == -1 ) {
- // Replication might be complete, uplink_mainloop should take care....
- link->nextReplicationIndex = -1;
- return;
- }
- const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
- link->replicationHandle = offset;
- const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
- mutex_lock( &link->sendMutex );
- bool sendOk = dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) );
- mutex_unlock( &link->sendMutex );
- if ( !sendOk ) {
- logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" );
- return;
- }
- if ( replicationIndex == lastBlockIndex ) {
- link->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks
- }
- link->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter
- if ( _backgroundReplication == BGR_HASHBLOCK
- && link->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) {
- // Just crossed a hash block boundary, look for new candidate starting at this very index
- link->nextReplicationIndex = uplink_findNextIncompleteHashBlock( link, link->nextReplicationIndex );
- }
+ ref_put( &cache->reference );
+ return true;
}
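The offset arithmetic follows from one cache-map byte covering eight blocks; with the usual 4 KiB block size (assumed here), that is 32 KiB of image data per map byte:

// FILE_BYTES_PER_MAP_BYTE = DNBD3_BLOCK_SIZE * 8 = 4096 * 8 = 32768 (sketch)
uint64_t offset = 10 * 32768u; // map index 10 -> image offset 0x50000
uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, 32768u );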
/**
- * find next index into cache_map that corresponds to the beginning
+ * find next index into cache map that corresponds to the beginning
* of a hash block which is neither completely empty nor completely
* replicated yet. Returns -1 if no match.
*/
-static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const int startMapIndex)
+static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMapIndex)
{
int retval = -1;
- mutex_lock( &link->image->lock );
- const int mapBytes = IMGSIZE_TO_MAPBYTES( link->image->virtualFilesize );
- const uint8_t *cache_map = link->image->cache_map;
- if ( cache_map != NULL ) {
- int j;
+ dnbd3_cache_map_t *cache = ref_get_cachemap( uplink->image );
+ if ( cache != NULL ) {
+ const int mapBytes = IMGSIZE_TO_MAPBYTES( uplink->image->virtualFilesize );
const int start = ( startMapIndex & MAP_INDEX_HASH_START_MASK );
+ atomic_thread_fence( memory_order_acquire );
+ int j;
for (j = 0; j < mapBytes; ++j) {
const int i = ( start + j ) % mapBytes;
- const bool isFull = cache_map[i] == 0xff || ( i + 1 == mapBytes && link->replicatedLastBlock );
- const bool isEmpty = cache_map[i] == 0;
+ const uint8_t b = atomic_load_explicit( &cache->map[i], memory_order_relaxed );
+ const bool isFull = b == 0xff || ( i + 1 == mapBytes && uplink->replicatedLastBlock );
+ const bool isEmpty = b == 0;
if ( !isEmpty && !isFull ) {
// Neither full nor empty, replicate
if ( retval == -1 ) {
@@ -736,74 +968,97 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const in
retval = -1;
}
}
- mutex_unlock( &link->image->lock );
+ ref_put( &cache->reference );
return retval;
}
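MAP_INDEX_HASH_START_MASK snaps a map index back to the first byte of its enclosing hash block. Assuming a 16 MiB HASH_BLOCK_SIZE and 4 KiB blocks, MAP_BYTES_PER_HASH_BLOCK works out to 512:

// 16 MiB / (4096 * 8) = 512 map bytes per hash block (sizes assumed)
int start = 700 & ~( 512 - 1 ); // -> 512, the start of the second hash block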
/**
* Receive data from uplink server and process/dispatch
- * Locks on: link.lock, images[].lock
+ * Locks on: uplink.lock, images[].lock
+ * Only called from uplink thread, so current.fd is assumed to be valid.
*/
-static void uplink_handleReceive(dnbd3_connection_t *link)
+static void handleReceive(dnbd3_uplink_t *uplink)
{
- dnbd3_reply_t inReply, outReply;
- int ret, i;
+ dnbd3_reply_t inReply;
+ int ret;
+ assert_uplink_thread();
+ assert( uplink->queueLen >= 0 );
for (;;) {
- ret = dnbd3_read_reply( link->fd, &inReply, false );
- if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !link->shutdown ) ) continue;
+ ret = dnbd3_read_reply( uplink->current.fd, &inReply, false );
+ if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue;
if ( ret == REPLY_AGAIN ) break;
if ( unlikely( ret == REPLY_CLOSED ) ) {
- logadd( LOG_INFO, "Uplink: Remote host hung up (%s)", link->image->path );
+ logadd( LOG_INFO, "Uplink: Remote host hung up (%s:%d)", PIMG(uplink->image) );
goto error_cleanup;
}
if ( unlikely( ret == REPLY_WRONGMAGIC ) ) {
- logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path );
+ logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s:%d)", PIMG(uplink->image) );
goto error_cleanup;
}
if ( unlikely( ret != REPLY_OK ) ) {
- logadd( LOG_INFO, "Uplink: Connection error %d (%s)", ret, link->image->path );
+ logadd( LOG_INFO, "Uplink: Connection error %d (%s:%d)", ret, PIMG(uplink->image) );
goto error_cleanup;
}
if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) {
- logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s", inReply.size, link->image->path );
+ logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s:%d", inReply.size, PIMG(uplink->image) );
goto error_cleanup;
}
- if ( unlikely( link->recvBufferLen < inReply.size ) ) {
- link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536);
- link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen );
- if ( link->recvBuffer == NULL ) {
+ if ( unlikely( uplink->recvBufferLen < inReply.size ) ) {
+ uplink->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536);
+ uplink->recvBuffer = realloc( uplink->recvBuffer, uplink->recvBufferLen );
+ if ( uplink->recvBuffer == NULL ) {
logadd( LOG_ERROR, "Out of memory when trying to allocate receive buffer for uplink" );
exit( 1 );
}
}
- if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) {
- logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path );
+ if ( unlikely( (uint32_t)sock_recv( uplink->current.fd, uplink->recvBuffer, inReply.size ) != inReply.size ) ) {
+ logadd( LOG_INFO, "Lost connection to uplink server of %s:%d (payload)", PIMG(uplink->image) );
goto error_cleanup;
}
// Payload read completely
// Bail out if we're not interested
- if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) ) continue;
+ if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) )
+ continue;
// Is a legit block reply
- struct iovec iov[2];
- const uint64_t start = inReply.handle;
- const uint64_t end = inReply.handle + inReply.size;
totalBytesReceived += inReply.size;
- link->bytesReceived += inReply.size;
+ uplink->bytesReceived += inReply.size;
+ // Get entry from queue
+ dnbd3_queue_entry_t *entry;
+ mutex_lock( &uplink->queueLock );
+ for ( entry = uplink->queue; entry != NULL; entry = entry->next ) {
+ if ( entry->handle == inReply.handle )
+ break;
+ }
+ if ( entry == NULL ) {
+ mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock!
+ logadd( LOG_DEBUG1, "Received block reply on uplink, but handle %"PRIu64" is unknown (%s:%d)",
+ inReply.handle, PIMG(uplink->image) );
+ continue;
+ }
+ const uint64_t start = entry->from;
+ const uint64_t end = entry->to;
+ mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock!
+ // We don't remove the entry from the list here yet, to slightly increase the chance of other
+ // clients attaching to this request while we write the data to disk
+ if ( end - start != inReply.size ) {
+ logadd( LOG_WARNING, "Received payload length does not match! (is: %"PRIu32", expect: %u, %s:%d)",
+ inReply.size, (unsigned int)( end - start ), PIMG(uplink->image) );
+ }
// 1) Write to cache file
- if ( unlikely( link->cacheFd == -1 ) ) {
- uplink_reopenCacheFd( link, false );
+ if ( unlikely( uplink->cacheFd == -1 ) ) {
+ reopenCacheFd( uplink, false );
}
- if ( likely( link->cacheFd != -1 ) ) {
+ if ( likely( uplink->cacheFd != -1 ) ) {
int err = 0;
bool tryAgain = true; // Allow one retry in case we run out of space or the write fd became invalid
uint32_t done = 0;
ret = 0;
while ( done < inReply.size ) {
- ret = (int)pwrite( link->cacheFd, link->recvBuffer + done, inReply.size - done, start + done );
+ ret = (int)pwrite( uplink->cacheFd, uplink->recvBuffer + done, inReply.size - done, start + done );
if ( unlikely( ret == -1 ) ) {
err = errno;
- if ( err == EINTR ) continue;
+ if ( err == EINTR && !_shutdown ) continue;
if ( err == ENOSPC || err == EDQUOT ) {
// try to free 256MiB
if ( !tryAgain || !image_ensureDiskSpaceLocked( 256ull * 1024 * 1024, true ) ) break;
@@ -811,150 +1066,135 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
continue; // Success, retry write
}
if ( err == EBADF || err == EINVAL || err == EIO ) {
- if ( !tryAgain || !uplink_reopenCacheFd( link, true ) )
+ uplink->image->problem.write = true;
+ if ( !tryAgain || !reopenCacheFd( uplink, true ) )
break;
tryAgain = false;
continue; // Write handle to image successfully re-opened, try again
}
- logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d", link->image->name, (int)link->image->rid, err );
+ logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d",
+ PIMG(uplink->image), err );
break;
}
if ( unlikely( ret <= 0 || (uint32_t)ret > inReply.size - done ) ) {
- logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d", ret, link->image->name, (int)link->image->rid );
+ logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d",
+ ret, PIMG(uplink->image) );
break;
}
done += (uint32_t)ret;
}
if ( likely( done > 0 ) ) {
- image_updateCachemap( link->image, start, start + done, true );
+ image_updateCachemap( uplink->image, start, start + done, true );
}
if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) {
logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.",
- link->image->name, (int)link->image->rid, err );
+ PIMG(uplink->image), err );
}
}
- // 2) Figure out which clients are interested in it
- mutex_lock( &link->queueLock );
- for (i = 0; i < link->queueLen; ++i) {
- dnbd3_queued_request_t * const req = &link->queue[i];
- assert( req->status != ULR_PROCESSING );
- if ( req->status != ULR_PENDING && req->status != ULR_NEW ) continue;
- assert( req->client != NULL );
- if ( req->from >= start && req->to <= end ) { // Match :-)
- req->status = ULR_PROCESSING;
+ bool found = false;
+ dnbd3_queue_entry_t **it;
+ mutex_lock( &uplink->queueLock );
+ for ( it = &uplink->queue; *it != NULL; it = &(**it).next ) {
+ if ( *it == entry && entry->handle == inReply.handle ) { // ABA check
+ assert( found == false );
+ *it = (**it).next;
+ found = true;
+ uplink->queueLen--;
+ break;
}
}
- // 3) Send to interested clients - iterate backwards so request collaboration works, and
- // so we can decrease queueLen on the fly while iterating. Should you ever change this to start
- // from 0, you also need to change the "attach to existing request"-logic in uplink_request()
- outReply.magic = dnbd3_packet_magic;
- bool served = false;
- for ( i = link->queueLen - 1; i >= 0; --i ) {
- dnbd3_queued_request_t * const req = &link->queue[i];
- if ( req->status == ULR_PROCESSING ) {
- size_t bytesSent = 0;
- assert( req->from >= start && req->to <= end );
- dnbd3_client_t * const client = req->client;
- outReply.cmd = CMD_GET_BLOCK;
- outReply.handle = req->handle;
- outReply.size = (uint32_t)( req->to - req->from );
- iov[0].iov_base = &outReply;
- iov[0].iov_len = sizeof outReply;
- iov[1].iov_base = link->recvBuffer + (req->from - start);
- iov[1].iov_len = outReply.size;
- fixup_reply( outReply );
- req->status = ULR_FREE;
- req->client = NULL;
- served = true;
- mutex_lock( &client->sendMutex );
- mutex_unlock( &link->queueLock );
- if ( client->sock != -1 ) {
- ssize_t sent = writev( client->sock, iov, 2 );
- if ( sent > (ssize_t)sizeof outReply ) {
- bytesSent = (size_t)sent - sizeof outReply;
- }
- }
- mutex_unlock( &client->sendMutex );
- if ( bytesSent != 0 ) {
- client->bytesSent += bytesSent;
- }
- mutex_lock( &link->queueLock );
- }
- if ( req->status == ULR_FREE && i == link->queueLen - 1 ) link->queueLen--;
+ if ( uplink->queueLen < SERVER_UPLINK_QUEUELEN_THRES ) {
+ uplink->image->problem.queue = false;
}
- mutex_unlock( &link->queueLock );
-#ifdef _DEBUG
- if ( !served && start != link->replicationHandle ) {
- logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end );
+ mutex_unlock( &uplink->queueLock );
+ if ( !found ) {
+ logadd( LOG_DEBUG1, "Replication request vanished from queue after writing to disk (%s:%d)",
+ PIMG(uplink->image) );
+ continue;
}
-#endif
- if ( start == link->replicationHandle ) {
- // Was our background replication
- link->replicationHandle = REP_NONE;
- // Try to remove from fs cache if no client was interested in this data
- if ( !served && link->cacheFd != -1 ) {
- posix_fadvise( link->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED );
- }
+ dnbd3_queue_client_t *next;
+ for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) {
+ assert( c->from >= start && c->to <= end );
+ (*c->callback)( c->data, c->handle, c->from, (uint32_t)( c->to - c->from ),
+ (const char*)( uplink->recvBuffer + (c->from - start) ) );
+ next = c->next;
+ free( c );
}
- if ( served ) {
+ if ( entry->clients != NULL ) {
// Was some client -- reset idle counter
- link->idleTime = 0;
+ uplink->idleTime = 0;
// Re-enable replication if disabled
- if ( link->nextReplicationIndex == -1 ) {
- link->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK;
+ if ( uplink->nextReplicationIndex == -1 ) {
+ uplink->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK;
+ }
+ } else {
+ if ( uplink->cacheFd != -1 ) {
+ // Try to remove from fs cache if no client was interested in this data
+ posix_fadvise( uplink->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED );
}
}
+ free( entry );
+ } // main receive loop
+ // Trigger background replication if applicable
+ if ( !sendReplicationRequest( uplink ) ) {
+ goto error_cleanup;
}
- if ( link->replicationHandle == REP_NONE ) {
- mutex_lock( &link->queueLock );
- const bool rep = ( link->queueLen == 0 );
- mutex_unlock( &link->queueLock );
- if ( rep ) uplink_sendReplicationRequest( link );
- }
+ // Normal end
return;
// Error handling from failed receive or message parsing
- error_cleanup: ;
- uplink_connectionFailed( link, true );
+error_cleanup: ;
+ connectionFailed( uplink, true );
}
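The cache-write loop in handleReceive() permits exactly one recovery attempt per failure: ENOSPC/EDQUOT triggers an attempt to free disk space, EBADF/EINVAL/EIO a reopen of the write handle. A condensed, self-contained sketch of that retry pattern; free_disk_space() and reopen_fd() are hypothetical stand-ins for image_ensureDiskSpaceLocked() and reopenCacheFd():

#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <unistd.h>

/* Hypothetical stand-ins for the helpers used in handleReceive() */
bool free_disk_space(uint64_t bytes);
bool reopen_fd(int *fd);

static bool robust_pwrite(int *fd, const uint8_t *buf, uint32_t size, uint64_t offset)
{
	bool tryAgain = true; /* allow one recovery attempt, as above */
	uint32_t done = 0;
	while ( done < size ) {
		ssize_t ret = pwrite( *fd, buf + done, size - done, offset + done );
		if ( ret == -1 ) {
			if ( errno == EINTR ) continue;
			if ( errno == ENOSPC || errno == EDQUOT ) {
				/* Out of space: try to free 256MiB, then retry once */
				if ( !tryAgain || !free_disk_space( 256ull * 1024 * 1024 ) ) return false;
				tryAgain = false;
				continue;
			}
			if ( errno == EBADF || errno == EINVAL || errno == EIO ) {
				/* Stale or broken fd: reopen once, then retry */
				if ( !tryAgain || !reopen_fd( fd ) ) return false;
				tryAgain = false;
				continue;
			}
			return false; /* anything else is fatal for this write */
		}
		done += (uint32_t)ret;
	}
	return true;
}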
-static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew)
+/**
+ * Only call from uplink thread
+ */
+static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
{
- if ( link->fd == -1 )
+ assert_uplink_thread();
+ if ( uplink->current.fd == -1 )
return;
- altservers_serverFailed( &link->currentServer );
- mutex_lock( &link->sendMutex );
- close( link->fd );
- link->fd = -1;
- mutex_unlock( &link->sendMutex );
- link->replicationHandle = REP_NONE;
- if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) {
- link->nextReplicationIndex = 0;
+ setThreadName( "panic-uplink" );
+ altservers_serverFailed( uplink->current.index );
+ mutex_lock( &uplink->sendMutex );
+ uplink->image->problem.uplink = true;
+ close( uplink->current.fd );
+ uplink->current.fd = -1;
+ mutex_unlock( &uplink->sendMutex );
+ if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) {
+ uplink->nextReplicationIndex = 0;
}
if ( !findNew )
return;
- mutex_lock( &link->rttLock );
- bool bail = link->rttTestResult == RTT_INPROGRESS || link->betterFd != -1;
- mutex_unlock( &link->rttLock );
+ mutex_lock( &uplink->rttLock );
+ bool bail = uplink->rttTestResult == RTT_INPROGRESS || uplink->better.fd != -1;
+ mutex_unlock( &uplink->rttLock );
if ( bail )
return;
- altservers_findUplink( link );
+ altservers_findUplinkAsync( uplink );
}
/**
- * Send keep alive request to server
+ * Send keep alive request to server.
+ * Called from uplink thread, current.fd must be valid.
*/
-static int uplink_sendKeepalive(const int fd)
+static bool sendKeepalive(dnbd3_uplink_t *uplink)
{
- static dnbd3_request_t request = { 0 };
- if ( request.magic == 0 ) {
- request.magic = dnbd3_packet_magic;
- request.cmd = CMD_KEEPALIVE;
- fixup_request( request );
- }
- return send( fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request);
+ static const dnbd3_request_t request = { .magic = dnbd3_packet_magic, .cmd = net_order_16( CMD_KEEPALIVE ) };
+ assert_uplink_thread();
+ mutex_lock( &uplink->sendMutex );
+ bool sendOk = send( uplink->current.fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request);
+ mutex_unlock( &uplink->sendMutex );
+ return sendOk;
}
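MSG_NOSIGNAL, as used above, makes a send to a half-closed peer fail with EPIPE instead of raising a process-wide SIGPIPE, which matters in a multi-threaded proxy where uplink connections come and go. A minimal sketch of a send-all loop built on the same flag (the helper name is hypothetical):

#include <errno.h>
#include <stdbool.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Hypothetical helper: send the whole buffer, never raising SIGPIPE. */
static bool send_all_nosignal(int fd, const void *buf, size_t len)
{
	const char *p = buf;
	while ( len > 0 ) {
		ssize_t n = send( fd, p, len, MSG_NOSIGNAL );
		if ( n == -1 ) {
			if ( errno == EINTR ) continue;
			return false; /* e.g. EPIPE from a dead peer, no signal raised */
		}
		p += n;
		len -= (size_t)n;
	}
	return true;
}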
-static void uplink_addCrc32(dnbd3_connection_t *uplink)
+/**
+ * Request crclist from uplink.
+ * Called from uplink thread, current.fd must be valid.
+ * FIXME This is racy, as another message could arrive after we send the
+ * request but before we read the reply. Refactor: split this up and move
+ * the receive part into the general receive handler.
+ */
+static void requestCrc32List(dnbd3_uplink_t *uplink)
{
dnbd3_image_t *image = uplink->image;
if ( image == NULL || image->virtualFilesize == 0 ) return;
@@ -962,7 +1202,10 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink)
uint32_t masterCrc;
uint32_t *buffer = malloc( bytes );
mutex_lock( &uplink->sendMutex );
- bool sendOk = dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes );
+ bool sendOk = dnbd3_get_crc32( uplink->current.fd, &masterCrc, buffer, &bytes );
+ if ( !sendOk ) {
+ uplink->image->problem.uplink = true;
+ }
mutex_unlock( &uplink->sendMutex );
if ( !sendOk || bytes == 0 ) {
free( buffer );
@@ -972,7 +1215,7 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink)
lists_crc = crc32( lists_crc, (uint8_t*)buffer, bytes );
lists_crc = net_order_32( lists_crc );
if ( lists_crc != masterCrc ) {
- logadd( LOG_WARNING, "Received corrupted crc32 list from uplink server (%s)!", uplink->image->name );
+ logadd( LOG_WARNING, "Received corrupted crc32 list from uplink server (%s:%d)!", PIMG(uplink->image) );
free( buffer );
return;
}
@@ -982,10 +1225,14 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink)
char path[len];
snprintf( path, len, "%s.crc", uplink->image->path );
const int fd = open( path, O_WRONLY | O_CREAT, 0644 );
- if ( fd >= 0 ) {
- write( fd, &masterCrc, sizeof(uint32_t) );
- write( fd, buffer, bytes );
+ if ( fd != -1 ) {
+ ssize_t ret = write( fd, &masterCrc, sizeof(masterCrc) );
+ ret += write( fd, buffer, bytes );
close( fd );
+ if ( (size_t)ret != sizeof(masterCrc) + bytes ) {
+ unlink( path );
+ logadd( LOG_WARNING, "Could not write crc32 file for %s:%d", PIMG(uplink->image) );
+ }
}
}
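For clarity, the integrity check performed above in isolation: the master CRC is the CRC32 of the transmitted list itself, so corruption of either is caught before anything is written to the .crc file. A sketch assuming a zlib-style crc32() and the net_order_32() helper already used in this file:

/* Sketch only: verify a received CRC list against its master CRC.
 * crc32() follows the zlib convention (seed with crc32(0, NULL, 0));
 * net_order_32() converts to network byte order for the comparison. */
static bool crcListValid(const uint32_t *list, size_t bytes, uint32_t masterCrc)
{
	uint32_t lists_crc = crc32( 0, NULL, 0 );
	lists_crc = crc32( lists_crc, (const uint8_t*)list, bytes );
	return net_order_32( lists_crc ) == masterCrc; /* both sides in network order */
}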
@@ -997,91 +1244,77 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink)
* it will be closed first. Otherwise, nothing will happen and true will be returned
* immediately.
*/
-static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force)
+static bool reopenCacheFd(dnbd3_uplink_t *uplink, const bool force)
{
- if ( link->cacheFd != -1 ) {
+ if ( uplink->cacheFd != -1 ) {
if ( !force ) return true;
- close( link->cacheFd );
+ close( uplink->cacheFd );
}
- link->cacheFd = open( link->image->path, O_WRONLY | O_CREAT, 0644 );
- return link->cacheFd != -1;
+ uplink->cacheFd = open( uplink->image->path, O_WRONLY | O_CREAT, 0644 );
+ uplink->image->problem.write = uplink->cacheFd == -1;
+ return uplink->cacheFd != -1;
}
/**
- * Saves the cache map of the given image.
- * Return true on success.
- * Locks on: imageListLock, image.lock
+ * Returns true if the uplink should shut down: it has been idle for
+ * longer than SERVER_UPLINK_IDLE_TIMEOUT, and full background
+ * replication is either disabled, or gated on a minimum number of
+ * active clients (_bgrMinClients) that the image does not currently reach.
*/
-static bool uplink_saveCacheMap(dnbd3_connection_t *link)
+static bool connectionShouldShutdown(dnbd3_uplink_t *uplink)
{
- dnbd3_image_t *image = link->image;
- assert( image != NULL );
-
- if ( link->cacheFd != -1 ) {
- if ( fsync( link->cacheFd ) == -1 ) {
- // A failing fsync means we have no guarantee that any data
- // since the last fsync (or open if none) has been saved. Apart
- // from keeping the cache_map from the last successful fsync
- // around and restoring it there isn't much we can do to recover
- // a consistent state. Bail out.
- logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d", image->path, errno );
- logadd( LOG_ERROR, "Bailing out immediately" );
- exit( 1 );
- }
- }
-
- if ( image->cache_map == NULL ) return true;
- logadd( LOG_DEBUG2, "Saving cache map of %s:%d", image->name, (int)image->rid );
- mutex_lock( &image->lock );
- // Lock and get a copy of the cache map, as it could be freed by another thread that is just about to
- // figure out that this image's cache copy is complete
- if ( image->cache_map == NULL || image->virtualFilesize < DNBD3_BLOCK_SIZE ) {
- mutex_unlock( &image->lock );
- return true;
- }
- const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize);
- uint8_t *map = malloc( size );
- memcpy( map, image->cache_map, size );
- // Unlock. Use path and cacheFd without locking. path should never change after initialization of the image,
- // cacheFd is owned by the uplink thread and we don't want to hold a spinlock during I/O
- mutex_unlock( &image->lock );
- assert( image->path != NULL );
- char mapfile[strlen( image->path ) + 4 + 1];
- strcpy( mapfile, image->path );
- strcat( mapfile, ".map" );
+ return ( uplink->idleTime > SERVER_UPLINK_IDLE_TIMEOUT
+ && ( _backgroundReplication != BGR_FULL || _bgrMinClients > uplink->image->users ) );
+}
- int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 );
- if ( fd == -1 ) {
- const int err = errno;
- free( map );
- logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile );
+bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len)
+{
+ int current;
+ mutex_lock( &uplink->rttLock );
+ current = uplink->current.fd == -1 ? -1 : uplink->current.index;
+ mutex_unlock( &uplink->rttLock );
+ if ( current == -1 )
return false;
- }
+ return altservers_toString( current, buffer, len );
+}
- size_t done = 0;
- while ( done < size ) {
- const ssize_t ret = write( fd, map, size - done );
- if ( ret == -1 ) {
- if ( errno == EINTR ) continue;
- logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile );
- break;
- }
- if ( ret <= 0 ) {
- logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile );
+/**
+ * Get the number of replication requests that should be sent right now
+ * to fill the configured bgrWindowSize. Returns 0 if any client requests
+ * are pending.
+ * This applies a form of "slow start" for uplinks that were recently
+ * serving actual client requests: the uplink's idle time in seconds
+ * (plus one) caps the number returned, so we don't saturate the uplink
+ * with a burst of replication requests while a client might be about
+ * to issue further requests of its own.
+ */
+static int numWantedReplicationRequests(dnbd3_uplink_t *uplink)
+{
+ int ret = MIN( _bgrWindowSize, uplink->idleTime + 1 );
+ if ( uplink->queueLen == 0 )
+ return ret;
+ mutex_lock( &uplink->queueLock );
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( it->clients == NULL ) {
+ ret--;
+ } else {
+ ret = 0; // Do not allow BGR if client requests are being handled
break;
}
- done += (size_t)ret;
- }
- if ( fsync( fd ) == -1 ) {
- logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno );
}
- close( fd );
- free( map );
- return true;
+ mutex_unlock( &uplink->queueLock );
+ return ret;
}
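A worked example of the window calculation, with made-up numbers: with _bgrWindowSize = 8 and an uplink idle for 2 seconds, the slow-start budget is MIN(8, 2 + 1) = 3; each client-less entry already queued reduces it by one, and any entry with attached clients forces it to 0:

/* Hypothetical walk-through of the budget computed above. */
#include <stdio.h>
#define MIN(a,b) ((a) < (b) ? (a) : (b))

int main(void)
{
	const int bgrWindowSize = 8;  /* assumed config value */
	const int idleTime = 2;       /* uplink idle for 2 seconds */
	int budget = MIN( bgrWindowSize, idleTime + 1 );  /* slow start: 3 */
	budget -= 2;                  /* two client-less entries already in queue */
	printf( "replication requests to send now: %d\n", budget ); /* prints 1 */
	return 0;
}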
-static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link)
+static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle)
{
- return ( link->idleTime > SERVER_UPLINK_IDLE_TIMEOUT && _backgroundReplication != BGR_FULL );
+ mutex_lock( &uplink->queueLock );
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( it->handle == handle ) {
+ it->sent = false;
+ break;
+ }
+ }
+ mutex_unlock( &uplink->queueLock );
}
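markRequestUnsent() only clears the flag; the actual retransmission happens when sendQueuedRequests() (declared near the top of this file) next runs with newOnly == false. A hedged sketch of what such a pass could look like, reusing the queue fields from the walk above and assuming the dnbd3_get_block() helper from dnbd3's protocol.h; locking is simplified for illustration (real sends go through sendMutex):

/* Sketch only, not the actual sendQueuedRequests(): re-issue every
 * entry whose sent flag was cleared, e.g. by markRequestUnsent(). */
static void resendUnsent(dnbd3_uplink_t *uplink)
{
	mutex_lock( &uplink->queueLock );
	for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
		if ( it->sent ) continue;
		/* Assumed helper/signature; a failed send leaves sent == false
		 * so a later pass (or a reconnect) will retry this range. */
		it->sent = dnbd3_get_block( uplink->current.fd, it->from,
				(uint32_t)( it->to - it->from ), it->handle, 0 );
	}
	mutex_unlock( &uplink->queueLock );
}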