path: root/src/server/uplink.c
author	Simon Rettberg	2020-03-13 16:03:29 +0100
committer	Simon Rettberg	2020-03-13 16:03:29 +0100
commit	290d3478f245bb7d2112bb781286a9fbae42b983 (patch)
tree	3cc825ae2249126d1f97f4e06592358ab9cfd81a /src/server/uplink.c
parent	[SERVER] Fix data type (diff)
download	dnbd3-290d3478f245bb7d2112bb781286a9fbae42b983.tar.gz
	dnbd3-290d3478f245bb7d2112bb781286a9fbae42b983.tar.xz
	dnbd3-290d3478f245bb7d2112bb781286a9fbae42b983.zip
[SERVER] Rewrite uplink queue handling
- Now uses linked lists instead of huge array
- Does prefetch data on client requests
- Can have multiple replication requests in-flight
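For orientation, the new queue is built from two list node types that live in the server headers and are only referenced by this diff. The following is a reconstruction sketched from how the fields are used in uplink.c below; the names match the references here, but the exact layout in the real header may differ:

typedef struct _dnbd3_queue_client {
	struct _dnbd3_queue_client *next;
	dnbd3_client_t *client;        // Client to relay the reply to
	uint64_t handle;               // Handle the client used in its own request
	uint64_t from, to;             // Sub-range of the parent entry this client wants
} dnbd3_queue_client_t;

typedef struct _dnbd3_queue_entry {
	struct _dnbd3_queue_entry *next;
	uint64_t handle;               // Queue-internal handle used towards the uplink (queueId)
	uint64_t from, to;             // Block-aligned range requested from the uplink
	dnbd3_queue_client_t *clients; // Attached clients; NULL for pure replication requests
	uint8_t hopCount;              // Hop count of the request that created this entry
	bool sent;                     // Whether the request has been sent upstream
#ifdef _DEBUG
	ticks entered;                 // When this entry was queued (starvation check)
#endif
} dnbd3_queue_entry_t;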
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--	src/server/uplink.c	771
1 file changed, 423 insertions, 348 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 7c7cd1c..188bf06 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -8,6 +8,7 @@
#include "../shared/protocol.h"
#include "../shared/timing.h"
#include "../shared/crc32.h"
+#include "threadpool.h"
#include "reference.h"
#include <assert.h>
@@ -21,30 +22,6 @@
#define MAP_BYTES_PER_HASH_BLOCK (int)( HASH_BLOCK_SIZE / FILE_BYTES_PER_MAP_BYTE )
#define MAP_INDEX_HASH_START_MASK ( ~(int)( MAP_BYTES_PER_HASH_BLOCK - 1 ) )
-#define REP_NONE ( (uint64_t)0xffffffffffffffff )
-
-// Status of request in queue
-
-// Slot is free, can be used.
-// Must only be set in uplink_handle_receive() or uplink_remove_client()
-#define ULR_FREE 0
-// Slot has been filled with a request that hasn't been sent to the upstream server yet, matching request can safely rely on reuse.
-// Must only be set in uplink_request()
-#define ULR_NEW 1
-// Slot is occupied, reply has not yet been received, matching request can safely rely on reuse.
-// Must only be set in uplink_mainloop() or uplink_request()
-#define ULR_PENDING 2
-// Slot is being processed, do not consider for hop on.
-// Must only be set in uplink_handle_receive()
-#define ULR_PROCESSING 3
-
-static const char *const NAMES_ULR[4] = {
- [ULR_FREE] = "ULR_FREE",
- [ULR_NEW] = "ULR_NEW",
- [ULR_PENDING] = "ULR_PENDING",
- [ULR_PROCESSING] = "ULR_PROCESSING",
-};
-
static atomic_uint_fast64_t totalBytesReceived = 0;
static void cancelAllRequests(dnbd3_uplink_t *uplink);
@@ -59,6 +36,15 @@ static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink);
static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force);
static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink);
static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew);
+static int numWantedReplicationRequests(dnbd3_uplink_t *uplink);
+static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle);
+static void *prefetchForClient(void *data);
+
+typedef struct {
+ dnbd3_uplink_t *uplink;
+ uint64_t start;
+ uint32_t length;
+} prefetch_request_t;
// ############ uplink connection handling
@@ -106,6 +92,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
uplink->bytesReceived = 0;
uplink->bytesReceivedLastSave = 0;
uplink->idleTime = SERVER_UPLINK_IDLE_TIMEOUT - 90;
+ uplink->queue = NULL;
uplink->queueLen = 0;
uplink->cacheFd = -1;
uplink->signal = signal_new();
@@ -113,7 +100,6 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
logadd( LOG_WARNING, "Error creating signal. Uplink unavailable." );
goto failure;
}
- uplink->replicationHandle = REP_NONE;
mutex_lock( &uplink->rttLock );
mutex_lock( &uplink->sendMutex );
uplink->current.fd = -1;
@@ -175,9 +161,9 @@ bool uplink_shutdown(dnbd3_image_t *image)
}
cancelAllRequests( uplink );
ref_setref( &image->uplinkref, NULL );
- ref_put( &uplink->reference );
mutex_unlock( &uplink->queueLock );
bool retval = ( exp && image->users == 0 );
+ ref_put( &uplink->reference );
mutex_unlock( &image->lock );
return retval;
}
@@ -188,12 +174,21 @@ bool uplink_shutdown(dnbd3_image_t *image)
*/
static void cancelAllRequests(dnbd3_uplink_t *uplink)
{
- for ( int i = 0; i < uplink->queueLen; ++i ) {
- if ( uplink->queue[i].status != ULR_FREE ) {
- net_sendReply( uplink->queue[i].client, CMD_ERROR, uplink->queue[i].handle );
- uplink->queue[i].status = ULR_FREE;
+ dnbd3_queue_entry_t *it = uplink->queue;
+ while ( it != NULL ) {
+ dnbd3_queue_client_t *cit = it->clients;
+ while ( cit != NULL ) {
+ net_sendReply( cit->client, CMD_ERROR, cit->handle );
+ cit->client->relayedCount--;
+ dnbd3_queue_client_t *next = cit->next;
+ free( cit );
+ cit = next;
}
+ dnbd3_queue_entry_t *next = it->next;
+ free( it );
+ it = next;
}
+ uplink->queue = NULL;
uplink->queueLen = 0;
uplink->image->problem.queue = false;
}
@@ -234,39 +229,54 @@ static void uplink_free(ref *ref)
*/
void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client)
{
+ if ( client->relayedCount == 0 )
+ return;
mutex_lock( &uplink->queueLock );
- for (int i = uplink->queueLen - 1; i >= 0; --i) {
- if ( uplink->queue[i].client == client ) {
- // Make sure client doesn't get destroyed while we're sending it data
- mutex_lock( &client->sendMutex );
- mutex_unlock( &client->sendMutex );
- uplink->queue[i].client = NULL;
- uplink->queue[i].status = ULR_FREE;
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ for ( dnbd3_queue_client_t **cit = &it->clients; *cit != NULL; cit = &(**cit).next ) {
+ if ( (**cit).client == client ) {
+ --client->relayedCount;
+ dnbd3_queue_client_t *entry = *cit;
+ *cit = (**cit).next;
+ free( entry );
+ }
}
- if ( uplink->queue[i].client == NULL && uplink->queueLen == i + 1 ) uplink->queueLen--;
}
mutex_unlock( &uplink->queueLock );
+ if ( unlikely( client->relayedCount != 0 ) ) {
+ logadd( LOG_DEBUG1, "Client has relayedCount == %"PRIu8" on disconnect..", client->relayedCount );
+ int i;
+ for ( i = 0; i < 1000 && client->relayedCount != 0; ++i ) {
+ usleep( 10000 );
+ }
+ if ( client->relayedCount != 0 ) {
+ logadd( LOG_WARNING, "Client relayedCount still %"PRIu8" after sleeping!", client->relayedCount );
+ }
+ }
}
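uplink_removeClient above walks the client list through a dnbd3_queue_client_t ** so that matching nodes can be unlinked without tracking a separate "previous" pointer. A minimal, self-contained sketch of that idiom (hypothetical node_t, not a dnbd3 type):

#include <stdlib.h>

typedef struct node { int key; struct node *next; } node_t;

/* Remove every node whose key matches: it always points at the link
 * (the head, or some node's next field) that holds the current node,
 * so unlinking is a single pointer assignment. */
static void removeAll(node_t **it, int key)
{
	while ( *it != NULL ) {
		if ( (**it).key == key ) {
			node_t *gone = *it;
			*it = gone->next; // The link now bypasses the removed node
			free( gone );
		} else {
			it = &(**it).next; // Advance only when nothing was removed
		}
	}
}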
/**
- * Request a chunk of data through an uplink server
- * Locks on: image.lock, uplink.queueLock
+ * Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL.
+ * If client is NULL, this is assumed to be a background replication request.
+ * Locks on: uplink.queueLock, uplink.sendMutex
*/
-bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
+bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
{
- if ( client == NULL || client->image == NULL )
- return false;
+ bool getUplink = ( uplink == NULL );
+ assert( client != NULL || uplink != NULL );
if ( length > (uint32_t)_maxPayload ) {
logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length );
return false;
}
- dnbd3_uplink_t * uplink = ref_get_uplink( &client->image->uplinkref );
- if ( unlikely( uplink == NULL ) ) {
- uplink_init( client->image, -1, NULL, -1 );
+ if ( getUplink ) {
uplink = ref_get_uplink( &client->image->uplinkref );
- if ( uplink == NULL ) {
- logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
- return false;
+ if ( unlikely( uplink == NULL ) ) {
+ uplink_init( client->image, -1, NULL, -1 );
+ uplink = ref_get_uplink( &client->image->uplinkref );
+ if ( uplink == NULL ) {
+ logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
+ return false;
+ }
}
}
if ( uplink->shutdown ) {
@@ -275,163 +285,179 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
}
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
// This might be a false positive if there are multiple instances running on the same host (IP)
- if ( hops != 0 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
+ if ( client != NULL && hops != 0
+ && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
uplink->cycleDetected = true;
signal_call( uplink->signal );
logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
goto fail_ref;
}
- int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise
- int existingType = -1; // ULR_* type of existing request
- int i;
- int freeSlot = -1;
- int firstUsedSlot = -1;
- bool requestLoop = false;
- const uint64_t end = start + length;
-
- mutex_lock( &uplink->queueLock );
- if ( uplink->shutdown ) { // Check again after locking to prevent lost requests
- goto fail_lock;
- }
- for (i = 0; i < uplink->queueLen; ++i) {
- // find free slot to place this request into
- if ( uplink->queue[i].status == ULR_FREE ) {
- if ( freeSlot == -1 || existingType != ULR_PROCESSING ) {
- freeSlot = i;
- }
- continue;
- }
- if ( firstUsedSlot == -1 ) {
- firstUsedSlot = i;
- }
- // find existing request to attach to
- if ( uplink->queue[i].from > start || uplink->queue[i].to < end )
- continue; // Range not suitable
- // Detect potential proxy cycle. New request hopcount is greater, range is same, old request has already been sent -> suspicious
- if ( hops > uplink->queue[i].hopCount && uplink->queue[i].from == start && uplink->queue[i].to == end && uplink->queue[i].status == ULR_PENDING ) {
- requestLoop = true;
- break;
- }
- if ( foundExisting == -1 || existingType == ULR_PROCESSING ) {
- foundExisting = i;
- existingType = uplink->queue[i].status;
- }
- }
- if ( unlikely( requestLoop ) ) {
- uplink->cycleDetected = true;
- signal_call( uplink->signal );
- logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops );
- goto fail_lock;
- }
- if ( freeSlot < firstUsedSlot && firstUsedSlot < 10 && existingType != ULR_PROCESSING ) {
- freeSlot = -1; // Not attaching to existing request, make it use a higher slot
- }
- if ( freeSlot == -1 ) {
- if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) {
- logadd( LOG_WARNING, "Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." );
+ struct {
+ uint64_t handle, start, end;
+ } req;
+ do {
+ const uint64_t end = start + length;
+ dnbd3_queue_entry_t *request = NULL, *last = NULL;
+ bool isNew;
+ mutex_lock( &uplink->queueLock );
+ if ( uplink->shutdown ) { // Check again after locking to prevent lost requests
goto fail_lock;
}
- freeSlot = uplink->queueLen++;
- if ( freeSlot > SERVER_UPLINK_QUEUELEN_THRES ) {
- uplink->image->problem.queue = true;
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( it->from <= start && it->to >= end ) {
+ // Matching range, attach
+ request = it;
+ break;
+ }
+ if ( it->next == NULL ) {
+ // Not matching, last in list, remember
+ last = it;
+ break;
+ }
}
- }
- // Do not send request to uplink server if we have a matching pending request AND the request either has the
- // status ULR_NEW/PENDING OR we found a free slot with LOWER index than the one we attach to. Otherwise
- // explicitly send this request to the uplink server. The second condition mentioned here is to prevent
- // a race condition where the reply for the outstanding request already arrived and the uplink thread
- // is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might
- // already have passed the index of the free slot we determined, but not reached the existing request we just found above.
- if ( foundExisting != -1 && existingType == ULR_PROCESSING && freeSlot > foundExisting ) {
- foundExisting = -1; // -1 means "send request"
- }
-#ifdef _DEBUG
- if ( foundExisting != -1 ) {
- logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, NAMES_ULR[existingType], foundExisting, freeSlot );
- logadd( LOG_DEBUG2, "Original %" PRIu64 "-%" PRIu64 " (%p)\n"
- "New %" PRIu64 "-%" PRIu64 " (%p)\n",
- uplink->queue[foundExisting].from, uplink->queue[foundExisting].to, (void*)uplink->queue[foundExisting].client,
- start, end, (void*)client );
- }
-#endif
- // Fill structure
- uplink->queue[freeSlot].from = start;
- uplink->queue[freeSlot].to = end;
- uplink->queue[freeSlot].handle = handle;
- uplink->queue[freeSlot].client = client;
- //int old = uplink->queue[freeSlot].status;
- uplink->queue[freeSlot].status = ( foundExisting == -1 ? ULR_NEW :
- ( existingType == ULR_NEW ? ULR_PENDING : existingType ) );
- uplink->queue[freeSlot].hopCount = hops;
+ dnbd3_queue_client_t **c;
+ if ( request == NULL ) {
+ // No existing request to attach to
+ if ( uplink->queueLen >= UPLINK_MAX_QUEUE ) {
+ logadd( LOG_WARNING, "Uplink queue is full, consider increasing UPLINK_MAX_QUEUE. Dropping client..." );
+ goto fail_lock;
+ }
+ uplink->queueLen++;
+ if ( uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
+ uplink->image->problem.queue = true;
+ }
+ request = malloc( sizeof(*request) );
+ if ( last == NULL ) {
+ uplink->queue = request;
+ } else {
+ last->next = request;
+ }
+ request->next = NULL;
+ request->handle = ++uplink->queueId;
+ request->from = start & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ request->to = (end + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
#ifdef _DEBUG
- timing_get( &uplink->queue[freeSlot].entered );
- //logadd( LOG_DEBUG2, "[%p] Inserting request at slot %d, was %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 "\n", (void*)uplink, freeSlot, old, uplink->queue[freeSlot].status, uplink->queue[freeSlot].handle, start, end );
+ timing_get( &request->entered );
#endif
- mutex_unlock( &uplink->queueLock );
+ request->hopCount = hops;
+ request->sent = true; // Optimistic; would be set to false on failure
+ if ( client == NULL ) {
+ // BGR
+ request->clients = NULL;
+ } else {
+ c = &request->clients;
+ }
+ isNew = true;
+ } else if ( client == NULL ) {
+ // Replication request that matches existing request. Do nothing
+ isNew = false;
+ } else {
+ // Existing request. Check if potential cycle
+ if ( hops > request->hopCount + 1 && request->from == start && request->to == end ) {
+ logadd( LOG_DEBUG1, "Request cycle detected on uplink for %s:%d", PIMG(uplink->image) );
+ goto fail_lock;
+ }
+ // Count number of clients, get tail of list
+ int count = 0;
+ c = &request->clients;
+ while ( *c != NULL ) {
+ c = &(**c).next;
+ if ( ++count >= UPLINK_MAX_CLIENTS_PER_REQUEST ) {
+ logadd( LOG_DEBUG2, "Won't accept more than %d clients per request, dropping client", count );
+ goto fail_lock;
+ }
+ }
+ isNew = false;
+ }
+ req.handle = request->handle;
+ req.start = request->from;
+ req.end = request->to;
+ if ( client != NULL ) {
+ *c = malloc( sizeof( *request->clients ) );
+ (**c).next = NULL;
+ (**c).handle = handle;
+ (**c).from = start;
+ (**c).to = end;
+ (**c).client = client;
+ client->relayedCount++;
+ }
+ mutex_unlock( &uplink->queueLock );
- if ( foundExisting != -1 ) {
- ref_put( &uplink->reference );
- return true; // Attached to pending request, do nothing
- }
+ if ( !isNew ) {
+ goto success_ref; // Attached to pending request, do nothing
+ }
+ } while (0);
- // See if we can fire away the request
- if ( unlikely( mutex_trylock( &uplink->sendMutex ) != 0 ) ) {
- logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" );
+ // Fire away the request
+ mutex_lock( &uplink->sendMutex );
+ if ( unlikely( uplink->current.fd == -1 ) ) {
+ uplink->image->problem.uplink = true;
+ markRequestUnsent( uplink, req.handle );
+ mutex_unlock( &uplink->sendMutex );
+ logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
} else {
- if ( unlikely( uplink->current.fd == -1 ) ) {
+ if ( hops < 200 ) ++hops;
+ const bool ret = dnbd3_get_block( uplink->current.fd, req.start, req.end - req.start,
+ req.handle, COND_HOPCOUNT( uplink->current.version, hops ) );
+ if ( unlikely( !ret ) ) {
+ markRequestUnsent( uplink, req.handle );
uplink->image->problem.uplink = true;
mutex_unlock( &uplink->sendMutex );
- logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
+ logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing (%"PRIu64")", req.handle );
} else {
- const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
- const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
- if ( hops < 200 ) ++hops;
- const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) );
- if ( unlikely( !ret ) ) {
- uplink->image->problem.uplink = true;
- mutex_unlock( &uplink->sendMutex );
- logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" );
- } else {
- // Direct send succeeded, update queue entry from NEW to PENDING, so the request won't be sent again
- int state;
- mutex_unlock( &uplink->sendMutex );
- mutex_lock( &uplink->queueLock );
- if ( !uplink->shutdown && uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) {
- state = uplink->queue[freeSlot].status;
- if ( uplink->queue[freeSlot].status == ULR_NEW ) {
- uplink->queue[freeSlot].status = ULR_PENDING;
- }
- } else {
- state = -1;
- }
- mutex_unlock( &uplink->queueLock );
- if ( state == -1 ) {
- logadd( LOG_DEBUG2, "Direct uplink request queue entry gone after sending and re-locking queue. *shrug*" );
- } else if ( state == ULR_NEW ) {
- //logadd( LOG_DEBUG2, "Direct uplink request" );
- } else {
- logadd( LOG_DEBUG2, "Direct uplink request queue entry changed to %s afte sending (expected ULR_NEW).", NAMES_ULR[uplink->queue[freeSlot].status] );
- }
- ref_put( &uplink->reference );
- return true;
- }
- // Fall through to waking up sender thread
+ // OK
+ mutex_unlock( &uplink->sendMutex );
+ goto success_ref;
}
+ // Fall through to waking up sender thread
}
if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
}
- ref_put( &uplink->reference );
+
+success_ref:
+ if ( client != NULL ) {
+ // Was from client -- potential prefetch
+ uint32_t len = MIN( uplink->image->virtualFilesize - req.end, req.end - req.start );
+ if ( len > 0 ) {
+ prefetch_request_t *job = malloc( sizeof( *job ) );
+ job->start = req.end;
+ job->length = len;
+ job->uplink = uplink;
+ ref_inc( &uplink->reference ); // Hold one for the thread, thread will return it
+ threadpool_run( &prefetchForClient, (void*)job );
+ }
+ }
+ if ( getUplink ) {
+ ref_put( &uplink->reference );
+ }
return true;
fail_lock:
mutex_unlock( &uplink->queueLock );
fail_ref:
- ref_put( &uplink->reference );
+ if ( getUplink ) {
+ ref_put( &uplink->reference );
+ }
return false;
}
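Note how uplink_request widens each new queue entry to block granularity before relaying it upstream (the request->from / request->to assignments above). A standalone sketch of that rounding, assuming dnbd3's usual 4 KiB block size:

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

#define DNBD3_BLOCK_SIZE ((uint64_t)4096) // Assumption: dnbd3's 4 KiB block size

int main(void)
{
	const uint64_t start = 5000, end = 9000; // Raw range coming from a client
	const uint64_t from = start & ~(DNBD3_BLOCK_SIZE - 1);                       // Round down
	const uint64_t to = ( end + DNBD3_BLOCK_SIZE - 1 ) & ~(DNBD3_BLOCK_SIZE - 1); // Round up
	printf( "%" PRIu64 "-%" PRIu64 " widens to %" PRIu64 "-%" PRIu64 "\n",
			start, end, from, to ); // 5000-9000 widens to 4096-12288
	return 0;
}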
+static void *prefetchForClient(void *data)
+{
+ prefetch_request_t *job = (prefetch_request_t*)data;
+ dnbd3_cache_map_t *cache = ref_get_cachemap( job->uplink->image );
+ if ( cache != NULL ) {
+ if ( !image_isRangeCachedUnsafe( cache, job->start, job->start + job->length ) ) {
+ uplink_request( job->uplink, NULL, ++job->uplink->queueId, job->start, job->length, 0 );
+ }
+ ref_put( &cache->reference );
+ }
+ ref_put( &job->uplink->reference );
+ free( job );
+ return NULL;
+}
+
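The prefetch kicked off at success_ref requests a follow-up range equal in size to the client's request, clamped to the end of the image (and skipped entirely when the clamp reaches zero). A standalone sketch of just that size computation, with made-up numbers:

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

#define MIN(a, b) ((a) < (b) ? (a) : (b))

// Prefetch as many bytes as the triggering request covered,
// clamped so the range never extends past the end of the image.
static uint32_t prefetchLen(uint64_t fileSize, uint64_t reqStart, uint64_t reqEnd)
{
	return (uint32_t)MIN( fileSize - reqEnd, reqEnd - reqStart );
}

int main(void)
{
	// Client fetched [0x20000, 0x30000) of a 1 MiB image: prefetch 0x10000 bytes
	printf( "%" PRIx32 "\n", prefetchLen( 0x100000, 0x20000, 0x30000 ) );
	// Same request with only 0x8000 bytes left in the image: clamped
	printf( "%" PRIx32 "\n", prefetchLen( 0x38000, 0x20000, 0x30000 ) );
	return 0;
}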
/**
* Uplink thread.
* Locks are irrelevant as this is never called from another function
@@ -443,7 +469,7 @@ static void* uplink_mainloop(void *data)
#define EV_COUNT (2)
struct pollfd events[EV_COUNT];
dnbd3_uplink_t * const uplink = (dnbd3_uplink_t*)data;
- int numSocks, i, waitTime;
+ int numSocks, waitTime;
int altCheckInterval = SERVER_RTT_INTERVAL_INIT;
int rttTestResult;
uint32_t discoverFailCount = 0;
@@ -478,7 +504,7 @@ static void* uplink_mainloop(void *data)
declare_now;
waitTime = (int)timing_diffMs( &now, &nextAltCheck );
if ( waitTime < 100 ) waitTime = 100;
- if ( waitTime > 10000 ) waitTime = 10000;
+ else if ( waitTime > 10000 ) waitTime = 10000;
}
events[EV_SOCKET].fd = uplink->current.fd;
numSocks = poll( events, EV_COUNT, waitTime );
@@ -505,7 +531,6 @@ static void* uplink_mainloop(void *data)
mutex_unlock( &uplink->rttLock );
discoverFailCount = 0;
if ( fd != -1 ) close( fd );
- uplink->replicationHandle = REP_NONE;
uplink->image->problem.uplink = false;
uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
buffer[0] = '@';
@@ -559,11 +584,11 @@ static void* uplink_mainloop(void *data)
}
declare_now;
uint32_t timepassed = timing_diff( &lastKeepalive, &now );
- if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) {
+ if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL || ( timepassed >= 2 && uplink->idleTime < _bgrWindowSize ) ) {
lastKeepalive = now;
uplink->idleTime += timepassed;
// Keep-alive
- if ( uplink->current.fd != -1 && uplink->replicationHandle == REP_NONE ) {
+ if ( uplink->current.fd != -1 && uplink->queueLen < _bgrWindowSize ) {
// Send keep-alive if nothing is happening, and try to trigger background rep.
if ( !uplink_sendKeepalive( uplink ) || !uplink_sendReplicationRequest( uplink ) ) {
uplink_connectionFailed( uplink, true );
@@ -612,19 +637,16 @@ static void* uplink_mainloop(void *data)
ticks deadline;
timing_set( &deadline, &now, -10 );
mutex_lock( &uplink->queueLock );
- for (i = 0; i < uplink->queueLen; ++i) {
- if ( uplink->queue[i].status != ULR_FREE && timing_reached( &uplink->queue[i].entered, &deadline ) ) {
- snprintf( buffer, sizeof(buffer), "[DEBUG %p] Starving request slot %d detected:\n"
- "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)uplink, i, uplink->queue[i].client->image->name,
- uplink->queue[i].from, uplink->queue[i].to, uplink->queue[i].status );
- uplink->queue[i].entered = now;
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( timing_reached( &it->entered, &deadline ) ) {
+ logadd( LOG_WARNING, "Starving request detected:"
+ " (from %" PRIu64 " to %" PRIu64 ", sent: %d) %s:%d",
+ it->from, it->to, (int)it->sent, PIMG(uplink->image) );
+ it->entered = now;
#ifdef _DEBUG_RESEND_STARVING
- uplink->queue[i].status = ULR_NEW;
+ it->sent = false;
resend = true;
#endif
- mutex_unlock( &uplink->queueLock );
- logadd( LOG_WARNING, "%s", buffer );
- mutex_lock( &uplink->queueLock );
}
}
mutex_unlock( &uplink->queueLock );
@@ -667,37 +689,54 @@ cleanup: ;
*/
static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
{
- // Scan for new requests
- int j;
+ // Scan for new requests, or optionally, (re)send all
+ // Build a buffer, so if there aren't too many requests, we can send them after
+ // unlocking the queue again. Otherwise we need flushes during iteration, which
+ // is not ideal, but in that case the uplink is probably overwhelmed anyway.
+ // Try 125, as that's exactly 3000 bytes, usually 2*MTU.
+#define MAX_RESEND_BATCH 125
+ dnbd3_request_t reqs[MAX_RESEND_BATCH];
+ int count = 0;
mutex_lock( &uplink->queueLock );
- for (j = 0; j < uplink->queueLen; ++j) {
- if ( uplink->queue[j].status != ULR_NEW && (newOnly || uplink->queue[j].status != ULR_PENDING) ) continue;
- uplink->queue[j].status = ULR_PENDING;
- uint8_t hops = uplink->queue[j].hopCount;
- const uint64_t reqStart = uplink->queue[j].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
- const uint32_t reqSize = (uint32_t)(((uplink->queue[j].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
- /*
- logadd( LOG_DEBUG2, "[%p] Sending slot %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 " (%" PRIu64 "-%" PRIu64 ")",
- (void*)uplink, j, uplink->queue[j].status, uplink->queue[j].handle, uplink->queue[j].from, uplink->queue[j].to, reqStart, reqStart+reqSize );
- */
- mutex_unlock( &uplink->queueLock );
- if ( hops < 200 ) ++hops;
- mutex_lock( &uplink->sendMutex );
- const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) );
- if ( likely( ret ) ) {
- mutex_unlock( &uplink->sendMutex );
- } else {
- // Non-critical - if the connection dropped or the server was changed
- // the thread will re-send this request as soon as the connection
- // is reestablished.
- uplink->image->problem.uplink = true;
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( newOnly && it->sent )
+ continue;
+ it->sent = true;
+ dnbd3_request_t *hdr = &reqs[count++];
+ hdr->magic = dnbd3_packet_magic;
+ hdr->cmd = CMD_GET_BLOCK;
+ hdr->size = it->to - it->from;
+ hdr->offset_small = it->from;
+ hdr->hops = it->hopCount + 1;
+ hdr->handle = it->handle;
+ fixup_request( *hdr );
+ if ( count == MAX_RESEND_BATCH ) {
+ bool ok = false;
+ logadd( LOG_DEBUG2, "BLOCKING resend of %d", count );
+ count = 0;
+ mutex_lock( &uplink->sendMutex );
+ if ( uplink->current.fd != -1 ) {
+ ok = ( sock_sendAll( uplink->current.fd, reqs, DNBD3_REQUEST_SIZE * MAX_RESEND_BATCH, 3 )
+ == DNBD3_REQUEST_SIZE * MAX_RESEND_BATCH );
+ }
mutex_unlock( &uplink->sendMutex );
- logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" );
- return;
+ if ( !ok ) {
+ uplink->image->problem.uplink = true;
+ break;
+ }
}
- mutex_lock( &uplink->queueLock );
}
mutex_unlock( &uplink->queueLock );
+ if ( count != 0 ) {
+ mutex_lock( &uplink->sendMutex );
+ if ( uplink->current.fd != -1 ) {
+ uplink->image->problem.uplink =
+ ( sock_sendAll( uplink->current.fd, reqs, DNBD3_REQUEST_SIZE * count, 3 )
+ != DNBD3_REQUEST_SIZE * count );
+ }
+ mutex_unlock( &uplink->sendMutex );
+ }
+#undef MAX_RESEND_BATCH
}
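The batch-size comment works out as follows: each queued request is serialized as one fixed-size dnbd3 request header, so 125 of them fill roughly two standard Ethernet frames. A compile-time check of that arithmetic, assuming the 24-byte header size of the dnbd3 wire protocol:

#include <assert.h>

// Assumption: mirrors sizeof(dnbd3_request_t) from the dnbd3 protocol headers
#define DNBD3_REQUEST_SIZE 24
#define MAX_RESEND_BATCH 125

static_assert( MAX_RESEND_BATCH * DNBD3_REQUEST_SIZE == 3000,
		"one full batch should stay around two 1500-byte Ethernet MTUs" );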
/**
@@ -720,71 +759,73 @@ static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
return false; // Should never be called in this state, consider send error
if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 )
return true; // Don't do background replication
- if ( uplink->nextReplicationIndex == -1 || uplink->replicationHandle != REP_NONE )
- return true; // Already a replication request on the wire, or no more blocks to replicate
+ if ( uplink->nextReplicationIndex == -1 )
+ return true; // No more blocks to replicate
dnbd3_image_t * const image = uplink->image;
if ( image->users < _bgrMinClients )
return true; // Not enough active users
+ const int numNewRequests = numWantedReplicationRequests( uplink );
+ if ( numNewRequests <= 0 )
+ return true; // Already sufficient amount of requests on the wire
dnbd3_cache_map_t *cache = ref_get_cachemap( image );
- if ( cache == NULL || image->users ) {
+ if ( cache == NULL ) {
// No cache map (=image complete)
- ref_put( &cache->reference );
return true;
}
const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
const int lastBlockIndex = mapBytes - 1;
- int endByte;
- if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks
- endByte = uplink->nextReplicationIndex + mapBytes;
- } else { // Hashblock based: Only look for match in current hash block
- endByte = ( uplink->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK;
- if ( endByte > mapBytes ) {
- endByte = mapBytes;
+ for ( int bc = 0; bc < numNewRequests; ++bc ) {
+ int endByte;
+ if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks
+ endByte = uplink->nextReplicationIndex + mapBytes;
+ } else { // Hashblock based: Only look for match in current hash block
+ endByte = ( uplink->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK;
+ if ( endByte > mapBytes ) {
+ endByte = mapBytes;
+ }
}
- }
- atomic_thread_fence( memory_order_acquire );
- int replicationIndex = -1;
- for ( int j = uplink->nextReplicationIndex; j < endByte; ++j ) {
- const int i = j % ( mapBytes ); // Wrap around for BGR_FULL
- if ( atomic_load_explicit( &cache->map[i], memory_order_relaxed ) != 0xff
- && ( i != lastBlockIndex || !uplink->replicatedLastBlock ) ) {
- // Found incomplete one
- replicationIndex = i;
+ atomic_thread_fence( memory_order_acquire );
+ int replicationIndex = -1;
+ for ( int j = uplink->nextReplicationIndex; j < endByte; ++j ) {
+ const int i = j % ( mapBytes ); // Wrap around for BGR_FULL
+ if ( atomic_load_explicit( &cache->map[i], memory_order_relaxed ) != 0xff
+ && ( i != lastBlockIndex || !uplink->replicatedLastBlock ) ) {
+ // Found incomplete one
+ replicationIndex = i;
+ break;
+ }
+ }
+ if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) {
+ // Nothing left in current block, find next one
+ replicationIndex = uplink_findNextIncompleteHashBlock( uplink, endByte );
+ }
+ if ( replicationIndex == -1 ) {
+ // Replication might be complete, uplink_mainloop should take care....
+ uplink->nextReplicationIndex = -1;
break;
}
+ const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
+ const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
+ const uint64_t handle = ++uplink->queueId;
+ if ( !uplink_request( uplink, NULL, handle, offset, size, 0 ) ) {
+ logadd( LOG_DEBUG1, "Error sending background replication request to uplink server (%s:%d)",
+ PIMG(uplink->image) );
+ ref_put( &cache->reference );
+ return false;
+ }
+ if ( replicationIndex == lastBlockIndex ) {
+ uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks
+ }
+ uplink->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter
+ if ( _backgroundReplication == BGR_HASHBLOCK
+ && uplink->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) {
+ // Just crossed a hash block boundary, look for new candidate starting at this very index
+ uplink->nextReplicationIndex = uplink_findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex );
+ if ( uplink->nextReplicationIndex == -1 )
+ break;
+ }
}
ref_put( &cache->reference );
- if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) {
- // Nothing left in current block, find next one
- replicationIndex = uplink_findNextIncompleteHashBlock( uplink, endByte );
- }
- if ( replicationIndex == -1 ) {
- // Replication might be complete, uplink_mainloop should take care....
- uplink->nextReplicationIndex = -1;
- return true;
- }
- const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
- uplink->replicationHandle = offset;
- const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
- mutex_lock( &uplink->sendMutex );
- bool sendOk = dnbd3_get_block( uplink->current.fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->current.version, 1 ) );
- if ( likely( sendOk ) ) {
- mutex_unlock( &uplink->sendMutex );
- } else {
- uplink->image->problem.uplink = true;
- mutex_unlock( &uplink->sendMutex );
- logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" );
- return false;
- }
- if ( replicationIndex == lastBlockIndex ) {
- uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks
- }
- uplink->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter
- if ( _backgroundReplication == BGR_HASHBLOCK
- && uplink->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) {
- // Just crossed a hash block boundary, look for new candidate starting at this very index
- uplink->nextReplicationIndex = uplink_findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex );
- }
return true;
}
@@ -845,7 +886,7 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int
static void uplink_handleReceive(dnbd3_uplink_t *uplink)
{
dnbd3_reply_t inReply, outReply;
- int ret, i;
+ int ret;
for (;;) {
ret = dnbd3_read_reply( uplink->current.fd, &inReply, false );
if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue;
@@ -881,13 +922,34 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
}
// Payload read completely
// Bail out if we're not interested
- if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) ) continue;
+ if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) )
+ continue;
// Is a legit block reply
- struct iovec iov[2];
- const uint64_t start = inReply.handle;
- const uint64_t end = inReply.handle + inReply.size;
totalBytesReceived += inReply.size;
uplink->bytesReceived += inReply.size;
+ // Get entry from queue
+ dnbd3_queue_entry_t *entry;
+ mutex_lock( &uplink->queueLock );
+ for ( entry = uplink->queue; entry != NULL; entry = entry->next ) {
+ if ( entry->handle == inReply.handle )
+ break;
+ }
+ if ( entry == NULL ) {
+ mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock!
+ logadd( LOG_DEBUG1, "Received block reply on uplink, but handle %"PRIu64" is unknown (%s:%d)",
+ inReply.handle, PIMG(uplink->image) );
+ continue;
+ }
+ const uint64_t start = entry->from;
+ const uint64_t end = entry->to;
+ mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock!
+ // We don't remove the entry from the list here yet, to slightly increase the chance of other
+ // clients attaching to this request while we write the data to disk
+ if ( end - start != inReply.size ) {
+ logadd( LOG_WARNING, "Received payload length does not match! (is: %"PRIu32", expect: %u, %s:%d)",
+ inReply.size, (unsigned int)( end - start ), PIMG(uplink->image) );
+ }
+ struct iovec iov[2];
// 1) Write to cache file
if ( unlikely( uplink->cacheFd == -1 ) ) {
uplink_reopenCacheFd( uplink, false );
@@ -934,98 +996,76 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
PIMG(uplink->image), err );
}
}
- // 2) Figure out which clients are interested in it
- // Mark as ULR_PROCESSING, since we unlock repeatedly in the second loop
- // below; this prevents uplink_request() from attaching to this request
- // by populating a slot with index greater than the highest matching
- // request with ULR_PROCESSING (assuming there is no ULR_PENDING or ULR_NEW
- // where it's fine if the index is greater)
+ bool found = false;
+ dnbd3_queue_entry_t **it;
mutex_lock( &uplink->queueLock );
- for (i = 0; i < uplink->queueLen; ++i) {
- dnbd3_queued_request_t * const req = &uplink->queue[i];
- assert( req->status != ULR_PROCESSING );
- if ( req->status != ULR_PENDING && req->status != ULR_NEW ) continue;
- assert( req->client != NULL );
- if ( req->from >= start && req->to <= end ) { // Match :-)
- req->status = ULR_PROCESSING;
- }
- }
- // 3) Send to interested clients - iterate backwards so request collaboration works, and
- // so we can decrease queueLen on the fly while iterating. Should you ever change this to start
- // from 0, you also need to change the "attach to existing request"-logic in uplink_request()
- outReply.magic = dnbd3_packet_magic;
- bool served = false;
- for ( i = uplink->queueLen - 1; i >= 0; --i ) {
- dnbd3_queued_request_t * const req = &uplink->queue[i];
- if ( req->status == ULR_PROCESSING ) {
- size_t bytesSent = 0;
- assert( req->from >= start && req->to <= end );
- dnbd3_client_t * const client = req->client;
- outReply.cmd = CMD_GET_BLOCK;
- outReply.handle = req->handle;
- outReply.size = (uint32_t)( req->to - req->from );
- iov[0].iov_base = &outReply;
- iov[0].iov_len = sizeof outReply;
- iov[1].iov_base = uplink->recvBuffer + (req->from - start);
- iov[1].iov_len = outReply.size;
- fixup_reply( outReply );
- req->status = ULR_FREE;
- req->client = NULL;
- served = true;
- mutex_lock( &client->sendMutex );
- mutex_unlock( &uplink->queueLock );
- if ( client->sock != -1 ) {
- ssize_t sent = writev( client->sock, iov, 2 );
- if ( sent > (ssize_t)sizeof outReply ) {
- bytesSent = (size_t)sent - sizeof outReply;
- }
- }
- if ( bytesSent != 0 ) {
- client->bytesSent += bytesSent;
- }
- mutex_unlock( &client->sendMutex );
- mutex_lock( &uplink->queueLock );
- if ( i > uplink->queueLen ) {
- i = uplink->queueLen; // Might have been set to 0 by cancelAllRequests
- }
+ for ( it = &uplink->queue; *it != NULL; it = &(**it).next ) {
+ if ( *it == entry && entry->handle == inReply.handle ) { // ABA check
+ assert( found == false );
+ *it = (**it).next;
+ found = true;
+ uplink->queueLen--;
+ break;
}
- if ( req->status == ULR_FREE && i == uplink->queueLen - 1 ) uplink->queueLen--;
}
if ( uplink->queueLen < SERVER_UPLINK_QUEUELEN_THRES ) {
uplink->image->problem.queue = false;
}
mutex_unlock( &uplink->queueLock );
-#ifdef _DEBUG
- if ( !served && start != uplink->replicationHandle ) {
- logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)uplink, uplink->image->name, start, end );
+ if ( !found ) {
+ logadd( LOG_DEBUG1, "Replication request vanished from queue after writing to disk (%s:%d)",
+ PIMG(uplink->image) );
+ continue;
}
-#endif
- if ( start == uplink->replicationHandle ) {
- // Was our background replication
- uplink->replicationHandle = REP_NONE;
- // Try to remove from fs cache if no client was interested in this data
- if ( !served && uplink->cacheFd != -1 ) {
- posix_fadvise( uplink->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED );
+ outReply.magic = dnbd3_packet_magic;
+ dnbd3_queue_client_t *next;
+ for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) {
+ size_t bytesSent = 0;
+ assert( c->from >= start && c->to <= end );
+ dnbd3_client_t * const client = c->client;
+ outReply.cmd = CMD_GET_BLOCK;
+ outReply.handle = c->handle;
+ outReply.size = (uint32_t)( c->to - c->from );
+ iov[0].iov_base = &outReply;
+ iov[0].iov_len = sizeof outReply;
+ iov[1].iov_base = uplink->recvBuffer + (c->from - start);
+ iov[1].iov_len = outReply.size;
+ fixup_reply( outReply );
+ mutex_lock( &client->sendMutex );
+ if ( client->sock != -1 ) {
+ ssize_t sent = writev( client->sock, iov, 2 );
+ if ( sent > (ssize_t)sizeof outReply ) {
+ bytesSent = (size_t)sent - sizeof outReply;
+ }
+ if ( bytesSent != 0 ) {
+ client->bytesSent += bytesSent;
+ }
}
+ mutex_unlock( &client->sendMutex );
+ client->relayedCount--;
+ next = c->next;
+ free( c );
}
- if ( served ) {
+ if ( entry->clients != NULL ) {
// Was some client -- reset idle counter
uplink->idleTime = 0;
// Re-enable replication if disabled
if ( uplink->nextReplicationIndex == -1 ) {
uplink->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK;
}
+ } else {
+ if ( uplink->cacheFd != -1 ) {
+ // Try to remove from fs cache if no client was interested in this data
+ posix_fadvise( uplink->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED );
+ }
}
+ free( entry );
+ } // main receive loop
+ // Trigger background replication if applicable
+ if ( !uplink_sendReplicationRequest( uplink ) ) {
+ goto error_cleanup;
}
- if ( uplink->replicationHandle == REP_NONE ) {
- mutex_lock( &uplink->queueLock );
- const bool rep = ( uplink->queueLen == 0 );
- mutex_unlock( &uplink->queueLock );
- if ( rep ) {
- if ( !uplink_sendReplicationRequest( uplink ) )
- goto error_cleanup;
- }
- }
+ // Normal end
return;
// Error handling from failed receive or message parsing
error_cleanup: ;
@@ -1046,7 +1086,6 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
close( uplink->current.fd );
uplink->current.fd = -1;
mutex_unlock( &uplink->sendMutex );
- uplink->replicationHandle = REP_NONE;
if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) {
uplink->nextReplicationIndex = 0;
}
@@ -1156,3 +1195,39 @@ bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len)
return false;
return altservers_toString( current, buffer, len );
}
+
+/**
+ * Get number of replication requests that should be sent right now to
+ * meet the configured bgrWindowSize. Returns 0 if any client requests
+ * are pending
+ */
+static int numWantedReplicationRequests(dnbd3_uplink_t *uplink)
+{
+ int ret = MIN( _bgrWindowSize, uplink->idleTime + 1 );
+ if ( uplink->queueLen == 0 )
+ return ret;
+ mutex_lock( &uplink->queueLock );
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( it->clients == NULL ) {
+ ret--;
+ } else {
+ ret = 0; // Do not allow BGR if client requests are being handled
+ break;
+ }
+ }
+ mutex_unlock( &uplink->queueLock );
+ return ret;
+}
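numWantedReplicationRequests above throttles background replication: the allowance ramps up with idle time (idleTime + 1, capped at _bgrWindowSize), each clientless queue entry consumes one slot, and any pending client request forces it to zero. A standalone illustration of the arithmetic (hypothetical helper and example values):

#include <stdio.h>

#define MIN(a, b) ((a) < (b) ? (a) : (b))

// Mirrors numWantedReplicationRequests() for the case without client requests:
// ramp up with idle time, cap at the window, subtract in-flight replication.
static int wantedRequests(int bgrWindowSize, int idleTime, int bgrInFlight)
{
	return MIN( bgrWindowSize, idleTime + 1 ) - bgrInFlight;
}

int main(void)
{
	printf( "%d\n", wantedRequests( 4, 0, 0 ) ); // 1: uplink just went idle, start slow
	printf( "%d\n", wantedRequests( 4, 9, 1 ) ); // 3: window fully open, one in flight
	return 0;
}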
+
+static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle)
+{
+ mutex_lock( &uplink->queueLock );
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( it->handle == handle ) {
+ it->sent = false;
+ break;
+ }
+ }
+ mutex_unlock( &uplink->queueLock );
+}
+