summary | refs | log | tree | commit | diff | stats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r-- src/server/uplink.c 350
1 files changed, 198 insertions, 152 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index e4d9d0e..dedaa17 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -24,6 +24,10 @@
static atomic_uint_fast64_t totalBytesReceived = 0;
+typedef struct {
+ uint64_t start, end, handle;
+} req_t;
+
static void cancelAllRequests(dnbd3_uplink_t *uplink);
static void freeUplinkStruct(ref *ref);
static void* uplink_mainloop(void *data);
@@ -39,13 +43,6 @@ static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew);
static int numWantedReplicationRequests(dnbd3_uplink_t *uplink);
static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle);
static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops);
-static void *prefetchForClient(void *data);
-
-typedef struct {
- dnbd3_uplink_t *uplink;
- uint64_t start;
- uint32_t length;
-} prefetch_job_t;
#define assert_uplink_thread() assert( pthread_equal( uplink->thread, pthread_self() ) )
@@ -247,6 +244,10 @@ void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback call
mutex_unlock( &uplink->queueLock );
}
+/**
+ * Called from a client (proxy) connection to request a missing part of the image.
+ * The caller has made sure that the range is actually missing.
+ */
bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
{
assert( client != NULL && callback != NULL );
@@ -266,8 +267,7 @@ bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
// This might be a false positive if there are multiple instances running on the same host (IP)
bool ret;
- if ( client != NULL && hops > 1
- && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
+ if ( hops > 1 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
uplink->cycleDetected = true;
signal_call( uplink->signal );
logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
@@ -279,7 +279,11 @@ bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint
return ret;
}
-bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length)
+/**
+ * Called by integrated fuse module
+ */
+bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback,
+ uint64_t handle, uint64_t start, uint32_t length)
{
dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
if ( unlikely( uplink == NULL ) ) {
@@ -295,12 +299,42 @@ bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback,
return ret;
}
+static void extendRequest(uint64_t start, uint64_t *end, const dnbd3_image_t *image, uint32_t wanted)
+{
+ uint32_t length = (uint32_t)( *end - start );
+ if ( length >= wanted )
+ return;
+ length = wanted;
+ if ( unlikely( _backgroundReplication == BGR_HASHBLOCK
+ && *end / HASH_BLOCK_SIZE != (start + length) / HASH_BLOCK_SIZE ) ) {
+ // Don't extend across hash-block border in this mode
+ *end = ( start + length ) & ~( HASH_BLOCK_SIZE - 1 );
+ } else {
+ *end = start + length;
+ }
+ if ( unlikely( *end > image->virtualFilesize ) ) {
+ *end = image->virtualFilesize;
+ }
+ *end = ( *end + DNBD3_BLOCK_SIZE - 1 ) & ~( DNBD3_BLOCK_SIZE - 1 );
+ //logadd( LOG_DEBUG2, "Extended %"PRIx64" from %"PRIx64" to %"PRIx64, start, end, req.end );
+}
+
+static bool requestBlock(dnbd3_uplink_t *uplink, req_t *req, uint8_t hops)
+{
+ if ( uplink->current.fd == -1 )
+ return false;
+ return dnbd3_get_block( uplink->current.fd, req->start,
+ (uint32_t)( req->end - req->start ), req->handle,
+ COND_HOPCOUNT( uplink->current.version, hops ) );
+}
+
/**
* Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL.
- * If client is NULL, this is assumed to be a background replication request.
+ * If callback is NULL, this is assumed to be a background replication request.
* Locks on: uplink.queueLock, uplink.sendMutex
*/
-static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
+static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback,
+ uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
{
assert( uplink != NULL );
assert( data == NULL || callback != NULL );
@@ -309,174 +343,186 @@ static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_ca
return false;
}
if ( length > (uint32_t)_maxPayload ) {
- logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload", length );
+ logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload",
+ length );
return false;
}
- struct {
- uint64_t handle, start, end;
- } req;
+ req_t req, preReq;
hops++;
- do {
- const uint64_t end = start + length;
- dnbd3_queue_entry_t *request = NULL, *last = NULL;
- bool isNew;
- mutex_lock( &uplink->queueLock );
- if ( uplink->shutdown ) { // Check again after locking to prevent lost requests
+ dnbd3_queue_entry_t *request = NULL, *last = NULL, *pre = NULL;
+ bool isNew;
+ const uint64_t end = start + length;
+ req.start = start & ~(DNBD3_BLOCK_SIZE - 1);
+ req.end = end;
+ /* Don't do this for now -- this breaks matching of prefetch jobs, since they'd
+ * be misaligned, and the next client request wouldn't match anything.
+ * To improve this, we need to be able to attach a queue_client to multiple queue_entries
+ * and then serve it once all the queue_entries are done (atomic_int in queue_client).
+ * But currently we directly send the receive buffer's content to the queue_client after
+ * receiving the payload, as this will also work when the local cache is borked (we just
+ * tunnel through the traffic). One could argue that this mode of operation is nonsense,
+ * and we should just drop all affected clients. Then as a next step, don't serve the
+ * clients from the receive buffer, but just issue a normal sendfile() call after writing
+ * the received data to the local cache.
+ if ( callback != NULL ) {
+ // Not background replication request, extend request size
+ extendRequest( req.start, &req.end, uplink->image, _minRequestSize );
+ }
+ */
+ req.end = (req.end + DNBD3_BLOCK_SIZE - 1) & ~(DNBD3_BLOCK_SIZE - 1);
+ // Critical section - work with the queue
+ mutex_lock( &uplink->queueLock );
+ if ( uplink->shutdown ) { // Check again after locking to prevent lost requests
+ goto fail_lock;
+ }
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( it->from <= start && it->to >= end ) {
+ // Matching range, attach
+ request = it;
+ break;
+ }
+ if ( it->next == NULL ) {
+ // Not matching, last in list, remember
+ last = it;
+ break;
+ }
+ }
+ dnbd3_queue_client_t **c = NULL;
+ if ( request == NULL ) {
+ // No existing request to attach to
+ if ( uplink->queueLen >= UPLINK_MAX_QUEUE ) {
+ logadd( LOG_WARNING,
+ "Uplink queue is full, consider increasing UPLINK_MAX_QUEUE. Dropping client..." );
goto fail_lock;
}
- for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
- if ( it->from <= start && it->to >= end ) {
- // Matching range, attach
- request = it;
- break;
- }
- if ( it->next == NULL ) {
- // Not matching, last in list, remember
- last = it;
- break;
- }
+ uplink->queueLen++;
+ if ( uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
+ uplink->image->problem.queue = true;
}
- dnbd3_queue_client_t **c = NULL;
- if ( request == NULL ) {
- // No existing request to attach to
- if ( uplink->queueLen >= UPLINK_MAX_QUEUE ) {
- logadd( LOG_WARNING, "Uplink queue is full, consider increasing UPLINK_MAX_QUEUE. Dropping client..." );
- goto fail_lock;
- }
- uplink->queueLen++;
- if ( uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
- uplink->image->problem.queue = true;
- }
- request = malloc( sizeof(*request) );
- if ( last == NULL ) {
- uplink->queue = request;
- } else {
- last->next = request;
- }
- request->next = NULL;
- request->handle = ++uplink->queueId;
- request->from = start & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
- request->to = (end + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ request = malloc( sizeof(*request) );
+ if ( last == NULL ) {
+ uplink->queue = request;
+ } else {
+ last->next = request;
+ }
+ request->next = NULL;
+ request->handle = ++uplink->queueId;
+ request->from = req.start;
+ request->to = req.end;
#ifdef DEBUG
- timing_get( &request->entered );
+ timing_get( &request->entered );
#endif
- request->hopCount = hops;
- request->sent = true; // Optimistic; would be set to false on failure
- if ( callback == NULL ) {
- // BGR
- request->clients = NULL;
- } else {
- c = &request->clients;
- }
- isNew = true;
- } else if ( callback == NULL ) {
- // Replication request that maches existing request. Do nothing
- isNew = false;
+ request->hopCount = hops;
+ request->sent = true; // Optimistic; would be set to false on failure
+ if ( callback == NULL ) {
+ // BGR
+ request->clients = NULL;
} else {
- // Existing request. Check if potential cycle
- if ( hops > request->hopCount && request->from == start && request->to == end ) {
- logadd( LOG_DEBUG1, "Request cycle detected on uplink for %s:%d", PIMG(uplink->image) );
- goto fail_lock;
- }
- // Count number if clients, get tail of list
- int count = 0;
c = &request->clients;
- while ( *c != NULL ) {
- c = &(**c).next;
- if ( ++count >= UPLINK_MAX_CLIENTS_PER_REQUEST ) {
- logadd( LOG_DEBUG2, "Won't accept more than %d clients per request, dropping client", count );
- goto fail_lock;
- }
- }
- isNew = false;
}
- req.handle = request->handle;
- req.start = request->from;
- req.end = request->to;
- if ( callback != NULL ) {
- assert( c != NULL );
- *c = malloc( sizeof( *request->clients ) );
- (**c).next = NULL;
- (**c).handle = handle;
- (**c).from = start;
- (**c).to = end;
- (**c).data = data;
- (**c).callback = callback;
+ isNew = true;
+ } else if ( callback == NULL ) {
+ // Replication request that matches existing request. Do nothing
+ isNew = false;
+ } else {
+ // Existing request. Check if potential cycle
+ if ( hops > request->hopCount && request->from == start && request->to == end ) {
+ logadd( LOG_DEBUG1, "Request cycle detected on uplink for %s:%d", PIMG(uplink->image) );
+ goto fail_lock;
}
- mutex_unlock( &uplink->queueLock );
-
- if ( !isNew ) {
- goto success_ref; // Attached to pending request, do nothing
+ // Count number of clients, get tail of list
+ int count = 0;
+ c = &request->clients;
+ while ( *c != NULL ) {
+ c = &(**c).next;
+ if ( ++count >= UPLINK_MAX_CLIENTS_PER_REQUEST ) {
+ logadd( LOG_DEBUG2, "Won't accept more than %d clients per request, dropping client", count );
+ goto fail_lock;
+ }
}
- } while (0);
+ isNew = false;
+ }
+ // Prefetch immediately, without unlocking the list - the old approach of
+ // async prefetching in another thread was sometimes so slow that we'd process
+ // another request from the same client before the prefetch job would execute.
+ if ( callback != NULL && ( isNew || request->clients == NULL || request->clients->data == data )
+ && end == request->to && length <= _maxPrefetch ) {
+ // Only if this is a client request, and the !! end boundary matches exactly !!
+ // (See above for reason why)
+ // - We neither check the local cache, nor other pending requests. Worth it?
+ // Complexity vs. probability
+ preReq.start = end;
+ preReq.end = end;
+ extendRequest( preReq.start, &preReq.end, uplink->image, MIN( length * 3, _maxPrefetch ) );
+ if ( preReq.start < preReq.end ) {
+ //logadd( LOG_DEBUG2, "Prefetching @ %"PRIx64" - %"PRIx64, preReq.start, preReq.end );
+ uplink->queueLen++;
+ pre = malloc( sizeof(*pre) );
+ pre->next = request->next;
+ request->next = pre;
+ pre->handle = preReq.handle = ++uplink->queueId;
+ pre->from = preReq.start;
+ pre->to = preReq.end;
+ pre->hopCount = hops;
+ pre->sent = true; // Optimistic; would be set to false on failure
+ pre->clients = NULL;
+#ifdef DEBUG
+ timing_get( &pre->entered );
+#endif
+ }
+ }
+ // // // //
+ // Copy data - need this after unlocking
+ req.handle = request->handle;
+ if ( callback != NULL ) {
+ assert( c != NULL );
+ *c = malloc( sizeof( *request->clients ) );
+ (**c).next = NULL;
+ (**c).handle = handle;
+ (**c).from = start;
+ (**c).to = end;
+ (**c).data = data;
+ (**c).callback = callback;
+ }
+ mutex_unlock( &uplink->queueLock );
+ // End queue critical section
+ if ( pre == NULL && !isNew )
+ return true; // Nothing to do
- // Fire away the request
+ // Fire away the request(s)
mutex_lock( &uplink->sendMutex );
- if ( unlikely( uplink->current.fd == -1 ) ) {
+ bool ret1 = true;
+ bool ret2 = true;
+ if ( isNew ) {
+ ret1 = requestBlock( uplink, &req, hops );
+ }
+ if ( pre != NULL ) {
+ ret2 = requestBlock( uplink, &preReq, hops );
+ }
+ if ( !ret1 || !ret2 ) { // Set with send locked
uplink->image->problem.uplink = true;
+ }
+ mutex_unlock( &uplink->sendMutex );
+ // markRequestUnsent locks the queue, would violate locking order with send mutex
+ if ( !ret1 ) {
markRequestUnsent( uplink, req.handle );
- mutex_unlock( &uplink->sendMutex );
- logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
- } else {
- const bool ret = dnbd3_get_block( uplink->current.fd, req.start,
- (uint32_t)( req.end - req.start ), req.handle,
- COND_HOPCOUNT( uplink->current.version, hops ) );
- if ( unlikely( !ret ) ) {
- markRequestUnsent( uplink, req.handle );
- uplink->image->problem.uplink = true;
- mutex_unlock( &uplink->sendMutex );
- logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing (%"PRIu64")", req.handle );
- } else {
- // OK
- mutex_unlock( &uplink->sendMutex );
- goto success_ref;
- }
- // Fall through to waking up sender thread
+ logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing (%"PRIu64")", req.handle );
}
-
- if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
- logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
+ if ( !ret2 ) {
+ markRequestUnsent( uplink, preReq.handle );
}
-success_ref:
- if ( callback != NULL ) {
- // Was from client -- potential prefetch
- // Same size as this request, but consider end of image...
- uint32_t len = (uint32_t)MIN( uplink->image->virtualFilesize - req.end,
- req.end - req.start );
- // Also don't prefetch if we cross a hash block border and BGR mode == hashblock
- if ( len > 0 && ( _backgroundReplication != BGR_HASHBLOCK
- || req.start % HASH_BLOCK_SIZE == (req.end-1) % HASH_BLOCK_SIZE ) ) {
- prefetch_job_t *job = malloc( sizeof( *job ) );
- job->start = req.end;
- job->length = len;
- job->uplink = uplink;
- ref_inc( &uplink->reference ); // Hold one for the thread, thread will return it
- threadpool_run( &prefetchForClient, (void*)job, "PREFETCH" );
- }
+ if ( ( !ret1 || !ret2 ) && signal_call( uplink->signal ) == SIGNAL_ERROR ) {
+ logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
}
return true;
+
fail_lock:
mutex_unlock( &uplink->queueLock );
return false;
}
-static void *prefetchForClient(void *data)
-{
- prefetch_job_t *job = (prefetch_job_t*)data;
- dnbd3_cache_map_t *cache = ref_get_cachemap( job->uplink->image );
- if ( cache != NULL ) {
- if ( !image_isRangeCachedUnsafe( cache, job->start, job->start + job->length ) ) {
- uplink_requestInternal( job->uplink, NULL, NULL, ++job->uplink->queueId, job->start, job->length, 0 );
- }
- ref_put( &cache->reference );
- }
- ref_put( &job->uplink->reference ); // Acquired in uplink_request
- free( job );
- return NULL;
-}
-
/**
* Uplink thread.
* Locks are irrelevant as this is never called from another function