summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorSimon Rettberg2021-04-13 16:41:26 +0200
committerSimon Rettberg2021-04-14 13:17:59 +0200
commit0be00336f97a7ef3590ee803d4ad19e305aa1583 (patch)
treeb00b91b8c6d687ad4116405818e2cecb3e6cf44b /src/server/uplink.c
parent[CLIENT] Use SO_GETPEERCRED instead of braindead setuid crap (diff)
downloaddnbd3-0be00336f97a7ef3590ee803d4ad19e305aa1583.tar.gz
dnbd3-0be00336f97a7ef3590ee803d4ad19e305aa1583.tar.xz
dnbd3-0be00336f97a7ef3590ee803d4ad19e305aa1583.zip
[SERVER] Make prefetching synchronous
There is a race condition where we process the next request from the same client faster than the OS will schedule the async prefetch job, rendering it a NOOP in the best case (request ranges match) or fetching redundant data from the upstream server (prefetch range is larger than actual request by client). Make prefetching synchronous to prevent this race condition.
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c350
1 files changed, 198 insertions, 152 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index e4d9d0e..dedaa17 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -24,6 +24,10 @@
static atomic_uint_fast64_t totalBytesReceived = 0;
+typedef struct {
+ uint64_t start, end, handle;
+} req_t;
+
static void cancelAllRequests(dnbd3_uplink_t *uplink);
static void freeUplinkStruct(ref *ref);
static void* uplink_mainloop(void *data);
@@ -39,13 +43,6 @@ static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew);
static int numWantedReplicationRequests(dnbd3_uplink_t *uplink);
static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle);
static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops);
-static void *prefetchForClient(void *data);
-
-typedef struct {
- dnbd3_uplink_t *uplink;
- uint64_t start;
- uint32_t length;
-} prefetch_job_t;
#define assert_uplink_thread() assert( pthread_equal( uplink->thread, pthread_self() ) )
@@ -247,6 +244,10 @@ void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback call
mutex_unlock( &uplink->queueLock );
}
+/**
+ * Called from a client (proxy) connection to request a missing part of the image.
+ * The caller has made sure that the range is actually missing.
+ */
bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
{
assert( client != NULL && callback != NULL );
@@ -266,8 +267,7 @@ bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
// This might be a false positive if there are multiple instances running on the same host (IP)
bool ret;
- if ( client != NULL && hops > 1
- && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
+ if ( hops > 1 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
uplink->cycleDetected = true;
signal_call( uplink->signal );
logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
@@ -279,7 +279,11 @@ bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint
return ret;
}
-bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length)
+/**
+ * Called by integrated fuse module
+ */
+bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback,
+ uint64_t handle, uint64_t start, uint32_t length)
{
dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
if ( unlikely( uplink == NULL ) ) {
@@ -295,12 +299,42 @@ bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback,
return ret;
}
+static void extendRequest(uint64_t start, uint64_t *end, const dnbd3_image_t *image, uint32_t wanted)
+{
+ uint32_t length = (uint32_t)( *end - start );
+ if ( length >= wanted )
+ return;
+ length = wanted;
+ if ( unlikely( _backgroundReplication == BGR_HASHBLOCK
+ && *end / HASH_BLOCK_SIZE != (start + length) / HASH_BLOCK_SIZE ) ) {
+ // Don't extend across hash-block border in this mode
+ *end = ( start + length ) & ~( HASH_BLOCK_SIZE - 1 );
+ } else {
+ *end = start + length;
+ }
+ if ( unlikely( *end > image->virtualFilesize ) ) {
+ *end = image->virtualFilesize;
+ }
+ *end = ( *end + DNBD3_BLOCK_SIZE - 1 ) & ~( DNBD3_BLOCK_SIZE - 1 );
+ //logadd( LOG_DEBUG2, "Extended %"PRIx64" from %"PRIx64" to %"PRIx64, start, end, req.end );
+}
+
+static bool requestBlock(dnbd3_uplink_t *uplink, req_t *req, uint8_t hops)
+{
+ if ( uplink->current.fd == -1 )
+ return false;
+ return dnbd3_get_block( uplink->current.fd, req->start,
+ (uint32_t)( req->end - req->start ), req->handle,
+ COND_HOPCOUNT( uplink->current.version, hops ) );
+}
+
/**
* Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL.
- * If client is NULL, this is assumed to be a background replication request.
+ * If callback is NULL, this is assumed to be a background replication request.
* Locks on: uplink.queueLock, uplink.sendMutex
*/
-static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
+static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback,
+ uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
{
assert( uplink != NULL );
assert( data == NULL || callback != NULL );
@@ -309,174 +343,186 @@ static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_ca
return false;
}
if ( length > (uint32_t)_maxPayload ) {
- logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload", length );
+ logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload",
+ length );
return false;
}
- struct {
- uint64_t handle, start, end;
- } req;
+ req_t req, preReq;
hops++;
- do {
- const uint64_t end = start + length;
- dnbd3_queue_entry_t *request = NULL, *last = NULL;
- bool isNew;
- mutex_lock( &uplink->queueLock );
- if ( uplink->shutdown ) { // Check again after locking to prevent lost requests
+ dnbd3_queue_entry_t *request = NULL, *last = NULL, *pre = NULL;
+ bool isNew;
+ const uint64_t end = start + length;
+ req.start = start & ~(DNBD3_BLOCK_SIZE - 1);
+ req.end = end;
+ /* Don't do this for now -- this breaks matching of prefetch jobs, since they'd
+ * be misaligned, and the next client request wouldn't match anything.
+ * To improve this, we need to be able to attach a queue_client to multiple queue_entries
+ * and then serve it once all the queue_entries are done (atomic_int in queue_client).
+ * But currently we directly send the receive buffer's content to the queue_client after
+ * receiving the payload, as this will also work when the local cache is borked (we just
+ * tunnel though the traffic). One could argue that this mode of operation is nonsense,
+ * and we should just drop all affected clients. Then as a next step, don't serve the
+ * clients form the receive buffer, but just issue a normal sendfile() call after writing
+ * the received data to the local cache.
+ if ( callback != NULL ) {
+ // Not background replication request, extend request size
+ extendRequest( req.start, &req.end, uplink->image, _minRequestSize );
+ }
+ */
+ req.end = (req.end + DNBD3_BLOCK_SIZE - 1) & ~(DNBD3_BLOCK_SIZE - 1);
+ // Critical section - work with the queue
+ mutex_lock( &uplink->queueLock );
+ if ( uplink->shutdown ) { // Check again after locking to prevent lost requests
+ goto fail_lock;
+ }
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( it->from <= start && it->to >= end ) {
+ // Matching range, attach
+ request = it;
+ break;
+ }
+ if ( it->next == NULL ) {
+ // Not matching, last in list, remember
+ last = it;
+ break;
+ }
+ }
+ dnbd3_queue_client_t **c = NULL;
+ if ( request == NULL ) {
+ // No existing request to attach to
+ if ( uplink->queueLen >= UPLINK_MAX_QUEUE ) {
+ logadd( LOG_WARNING,
+ "Uplink queue is full, consider increasing UPLINK_MAX_QUEUE. Dropping client..." );
goto fail_lock;
}
- for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
- if ( it->from <= start && it->to >= end ) {
- // Matching range, attach
- request = it;
- break;
- }
- if ( it->next == NULL ) {
- // Not matching, last in list, remember
- last = it;
- break;
- }
+ uplink->queueLen++;
+ if ( uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
+ uplink->image->problem.queue = true;
}
- dnbd3_queue_client_t **c = NULL;
- if ( request == NULL ) {
- // No existing request to attach to
- if ( uplink->queueLen >= UPLINK_MAX_QUEUE ) {
- logadd( LOG_WARNING, "Uplink queue is full, consider increasing UPLINK_MAX_QUEUE. Dropping client..." );
- goto fail_lock;
- }
- uplink->queueLen++;
- if ( uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
- uplink->image->problem.queue = true;
- }
- request = malloc( sizeof(*request) );
- if ( last == NULL ) {
- uplink->queue = request;
- } else {
- last->next = request;
- }
- request->next = NULL;
- request->handle = ++uplink->queueId;
- request->from = start & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
- request->to = (end + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ request = malloc( sizeof(*request) );
+ if ( last == NULL ) {
+ uplink->queue = request;
+ } else {
+ last->next = request;
+ }
+ request->next = NULL;
+ request->handle = ++uplink->queueId;
+ request->from = req.start;
+ request->to = req.end;
#ifdef DEBUG
- timing_get( &request->entered );
+ timing_get( &request->entered );
#endif
- request->hopCount = hops;
- request->sent = true; // Optimistic; would be set to false on failure
- if ( callback == NULL ) {
- // BGR
- request->clients = NULL;
- } else {
- c = &request->clients;
- }
- isNew = true;
- } else if ( callback == NULL ) {
- // Replication request that maches existing request. Do nothing
- isNew = false;
+ request->hopCount = hops;
+ request->sent = true; // Optimistic; would be set to false on failure
+ if ( callback == NULL ) {
+ // BGR
+ request->clients = NULL;
} else {
- // Existing request. Check if potential cycle
- if ( hops > request->hopCount && request->from == start && request->to == end ) {
- logadd( LOG_DEBUG1, "Request cycle detected on uplink for %s:%d", PIMG(uplink->image) );
- goto fail_lock;
- }
- // Count number if clients, get tail of list
- int count = 0;
c = &request->clients;
- while ( *c != NULL ) {
- c = &(**c).next;
- if ( ++count >= UPLINK_MAX_CLIENTS_PER_REQUEST ) {
- logadd( LOG_DEBUG2, "Won't accept more than %d clients per request, dropping client", count );
- goto fail_lock;
- }
- }
- isNew = false;
}
- req.handle = request->handle;
- req.start = request->from;
- req.end = request->to;
- if ( callback != NULL ) {
- assert( c != NULL );
- *c = malloc( sizeof( *request->clients ) );
- (**c).next = NULL;
- (**c).handle = handle;
- (**c).from = start;
- (**c).to = end;
- (**c).data = data;
- (**c).callback = callback;
+ isNew = true;
+ } else if ( callback == NULL ) {
+ // Replication request that maches existing request. Do nothing
+ isNew = false;
+ } else {
+ // Existing request. Check if potential cycle
+ if ( hops > request->hopCount && request->from == start && request->to == end ) {
+ logadd( LOG_DEBUG1, "Request cycle detected on uplink for %s:%d", PIMG(uplink->image) );
+ goto fail_lock;
}
- mutex_unlock( &uplink->queueLock );
-
- if ( !isNew ) {
- goto success_ref; // Attached to pending request, do nothing
+ // Count number if clients, get tail of list
+ int count = 0;
+ c = &request->clients;
+ while ( *c != NULL ) {
+ c = &(**c).next;
+ if ( ++count >= UPLINK_MAX_CLIENTS_PER_REQUEST ) {
+ logadd( LOG_DEBUG2, "Won't accept more than %d clients per request, dropping client", count );
+ goto fail_lock;
+ }
}
- } while (0);
+ isNew = false;
+ }
+ // Prefetch immediately, without unlocking the list - the old approach of
+ // async prefetching in another thread was sometimes so slow that we'd process
+ // another request from the same client before the prefetch job would execute.
+ if ( callback != NULL && ( isNew || request->clients == NULL || request->clients->data == data )
+ && end == request->to && length <= _maxPrefetch ) {
+ // Only if this is a client request, and the !! end boundary matches exactly !!
+ // (See above for reason why)
+ // - We neither check the local cache, nor other pending requests. Worth it?
+ // Complexity vs. probability
+ preReq.start = end;
+ preReq.end = end;
+ extendRequest( preReq.start, &preReq.end, uplink->image, MIN( length * 3, _maxPrefetch ) );
+ if ( preReq.start < preReq.end ) {
+ //logadd( LOG_DEBUG2, "Prefetching @ %"PRIx64" - %"PRIx64, preReq.start, preReq.end );
+ uplink->queueLen++;
+ pre = malloc( sizeof(*pre) );
+ pre->next = request->next;
+ request->next = pre;
+ pre->handle = preReq.handle = ++uplink->queueId;
+ pre->from = preReq.start;
+ pre->to = preReq.end;
+ pre->hopCount = hops;
+ pre->sent = true; // Optimistic; would be set to false on failure
+ pre->clients = NULL;
+#ifdef DEBUG
+ timing_get( &pre->entered );
+#endif
+ }
+ }
+ // // // //
+ // Copy data - need this after unlocking
+ req.handle = request->handle;
+ if ( callback != NULL ) {
+ assert( c != NULL );
+ *c = malloc( sizeof( *request->clients ) );
+ (**c).next = NULL;
+ (**c).handle = handle;
+ (**c).from = start;
+ (**c).to = end;
+ (**c).data = data;
+ (**c).callback = callback;
+ }
+ mutex_unlock( &uplink->queueLock );
+ // End queue critical section
+ if ( pre == NULL && !isNew )
+ return true; // Nothing to do
- // Fire away the request
+ // Fire away the request(s)
mutex_lock( &uplink->sendMutex );
- if ( unlikely( uplink->current.fd == -1 ) ) {
+ bool ret1 = true;
+ bool ret2 = true;
+ if ( isNew ) {
+ ret1 = requestBlock( uplink, &req, hops );
+ }
+ if ( pre != NULL ) {
+ ret2 = requestBlock( uplink, &preReq, hops );
+ }
+ if ( !ret1 || !ret2 ) { // Set with send locked
uplink->image->problem.uplink = true;
+ }
+ mutex_unlock( &uplink->sendMutex );
+ // markRequestUnsend locks the queue, would violate locking order with send mutex
+ if ( !ret1 ) {
markRequestUnsent( uplink, req.handle );
- mutex_unlock( &uplink->sendMutex );
- logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
- } else {
- const bool ret = dnbd3_get_block( uplink->current.fd, req.start,
- (uint32_t)( req.end - req.start ), req.handle,
- COND_HOPCOUNT( uplink->current.version, hops ) );
- if ( unlikely( !ret ) ) {
- markRequestUnsent( uplink, req.handle );
- uplink->image->problem.uplink = true;
- mutex_unlock( &uplink->sendMutex );
- logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing (%"PRIu64")", req.handle );
- } else {
- // OK
- mutex_unlock( &uplink->sendMutex );
- goto success_ref;
- }
- // Fall through to waking up sender thread
+ logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing (%"PRIu64")", req.handle );
}
-
- if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
- logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
+ if ( !ret2 ) {
+ markRequestUnsent( uplink, preReq.handle );
}
-success_ref:
- if ( callback != NULL ) {
- // Was from client -- potential prefetch
- // Same size as this request, but consider end of image...
- uint32_t len = (uint32_t)MIN( uplink->image->virtualFilesize - req.end,
- req.end - req.start );
- // Also don't prefetch if we cross a hash block border and BGR mode == hashblock
- if ( len > 0 && ( _backgroundReplication != BGR_HASHBLOCK
- || req.start % HASH_BLOCK_SIZE == (req.end-1) % HASH_BLOCK_SIZE ) ) {
- prefetch_job_t *job = malloc( sizeof( *job ) );
- job->start = req.end;
- job->length = len;
- job->uplink = uplink;
- ref_inc( &uplink->reference ); // Hold one for the thread, thread will return it
- threadpool_run( &prefetchForClient, (void*)job, "PREFETCH" );
- }
+ if ( ( !ret1 || !ret2 ) && signal_call( uplink->signal ) == SIGNAL_ERROR ) {
+ logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
}
return true;
+
fail_lock:
mutex_unlock( &uplink->queueLock );
return false;
}
-static void *prefetchForClient(void *data)
-{
- prefetch_job_t *job = (prefetch_job_t*)data;
- dnbd3_cache_map_t *cache = ref_get_cachemap( job->uplink->image );
- if ( cache != NULL ) {
- if ( !image_isRangeCachedUnsafe( cache, job->start, job->start + job->length ) ) {
- uplink_requestInternal( job->uplink, NULL, NULL, ++job->uplink->queueId, job->start, job->length, 0 );
- }
- ref_put( &cache->reference );
- }
- ref_put( &job->uplink->reference ); // Acquired in uplink_request
- free( job );
- return NULL;
-}
-
/**
* Uplink thread.
* Locks are irrelevant as this is never called from another function