diff options
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r-- | src/server/uplink.c | 350 |
1 files changed, 198 insertions, 152 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c index e4d9d0e..dedaa17 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -24,6 +24,10 @@ static atomic_uint_fast64_t totalBytesReceived = 0; +typedef struct { + uint64_t start, end, handle; +} req_t; + static void cancelAllRequests(dnbd3_uplink_t *uplink); static void freeUplinkStruct(ref *ref); static void* uplink_mainloop(void *data); @@ -39,13 +43,6 @@ static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew); static int numWantedReplicationRequests(dnbd3_uplink_t *uplink); static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle); static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops); -static void *prefetchForClient(void *data); - -typedef struct { - dnbd3_uplink_t *uplink; - uint64_t start; - uint32_t length; -} prefetch_job_t; #define assert_uplink_thread() assert( pthread_equal( uplink->thread, pthread_self() ) ) @@ -247,6 +244,10 @@ void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback call mutex_unlock( &uplink->queueLock ); } +/** + * Called from a client (proxy) connection to request a missing part of the image. + * The caller has made sure that the range is actually missing. + */ bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) { assert( client != NULL && callback != NULL ); @@ -266,8 +267,7 @@ bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain // This might be a false positive if there are multiple instances running on the same host (IP) bool ret; - if ( client != NULL && hops > 1 - && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) { + if ( hops > 1 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) { uplink->cycleDetected = true; signal_call( uplink->signal ); logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); @@ -279,7 +279,11 @@ bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint return ret; } -bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length) +/** + * Called by integrated fuse module + */ +bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback, + uint64_t handle, uint64_t start, uint32_t length) { dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); if ( unlikely( uplink == NULL ) ) { @@ -295,12 +299,42 @@ bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback, return ret; } +static void extendRequest(uint64_t start, uint64_t *end, const dnbd3_image_t *image, uint32_t wanted) +{ + uint32_t length = (uint32_t)( *end - start ); + if ( length >= wanted ) + return; + length = wanted; + if ( unlikely( _backgroundReplication == BGR_HASHBLOCK + && *end / HASH_BLOCK_SIZE != (start + length) / HASH_BLOCK_SIZE ) ) { + // Don't extend across hash-block border in this mode + *end = ( start + length ) & ~( HASH_BLOCK_SIZE - 1 ); + } else { + *end = start + length; + } + if ( unlikely( *end > image->virtualFilesize ) ) { + *end = image->virtualFilesize; + } + *end = ( *end + DNBD3_BLOCK_SIZE - 1 ) & ~( DNBD3_BLOCK_SIZE - 1 ); + //logadd( LOG_DEBUG2, "Extended %"PRIx64" from %"PRIx64" to %"PRIx64, start, end, req.end ); +} + +static bool requestBlock(dnbd3_uplink_t *uplink, req_t *req, uint8_t hops) +{ + if ( uplink->current.fd == -1 ) + return false; + return dnbd3_get_block( uplink->current.fd, req->start, + (uint32_t)( req->end - req->start ), req->handle, + COND_HOPCOUNT( uplink->current.version, hops ) ); +} + /** * Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL. - * If client is NULL, this is assumed to be a background replication request. + * If callback is NULL, this is assumed to be a background replication request. * Locks on: uplink.queueLock, uplink.sendMutex */ -static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) +static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, + uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) { assert( uplink != NULL ); assert( data == NULL || callback != NULL ); @@ -309,174 +343,186 @@ static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_ca return false; } if ( length > (uint32_t)_maxPayload ) { - logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload", length ); + logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload", + length ); return false; } - struct { - uint64_t handle, start, end; - } req; + req_t req, preReq; hops++; - do { - const uint64_t end = start + length; - dnbd3_queue_entry_t *request = NULL, *last = NULL; - bool isNew; - mutex_lock( &uplink->queueLock ); - if ( uplink->shutdown ) { // Check again after locking to prevent lost requests + dnbd3_queue_entry_t *request = NULL, *last = NULL, *pre = NULL; + bool isNew; + const uint64_t end = start + length; + req.start = start & ~(DNBD3_BLOCK_SIZE - 1); + req.end = end; + /* Don't do this for now -- this breaks matching of prefetch jobs, since they'd + * be misaligned, and the next client request wouldn't match anything. + * To improve this, we need to be able to attach a queue_client to multiple queue_entries + * and then serve it once all the queue_entries are done (atomic_int in queue_client). + * But currently we directly send the receive buffer's content to the queue_client after + * receiving the payload, as this will also work when the local cache is borked (we just + * tunnel though the traffic). One could argue that this mode of operation is nonsense, + * and we should just drop all affected clients. Then as a next step, don't serve the + * clients form the receive buffer, but just issue a normal sendfile() call after writing + * the received data to the local cache. + if ( callback != NULL ) { + // Not background replication request, extend request size + extendRequest( req.start, &req.end, uplink->image, _minRequestSize ); + } + */ + req.end = (req.end + DNBD3_BLOCK_SIZE - 1) & ~(DNBD3_BLOCK_SIZE - 1); + // Critical section - work with the queue + mutex_lock( &uplink->queueLock ); + if ( uplink->shutdown ) { // Check again after locking to prevent lost requests + goto fail_lock; + } + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + if ( it->from <= start && it->to >= end ) { + // Matching range, attach + request = it; + break; + } + if ( it->next == NULL ) { + // Not matching, last in list, remember + last = it; + break; + } + } + dnbd3_queue_client_t **c = NULL; + if ( request == NULL ) { + // No existing request to attach to + if ( uplink->queueLen >= UPLINK_MAX_QUEUE ) { + logadd( LOG_WARNING, + "Uplink queue is full, consider increasing UPLINK_MAX_QUEUE. Dropping client..." ); goto fail_lock; } - for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { - if ( it->from <= start && it->to >= end ) { - // Matching range, attach - request = it; - break; - } - if ( it->next == NULL ) { - // Not matching, last in list, remember - last = it; - break; - } + uplink->queueLen++; + if ( uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { + uplink->image->problem.queue = true; } - dnbd3_queue_client_t **c = NULL; - if ( request == NULL ) { - // No existing request to attach to - if ( uplink->queueLen >= UPLINK_MAX_QUEUE ) { - logadd( LOG_WARNING, "Uplink queue is full, consider increasing UPLINK_MAX_QUEUE. Dropping client..." ); - goto fail_lock; - } - uplink->queueLen++; - if ( uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { - uplink->image->problem.queue = true; - } - request = malloc( sizeof(*request) ); - if ( last == NULL ) { - uplink->queue = request; - } else { - last->next = request; - } - request->next = NULL; - request->handle = ++uplink->queueId; - request->from = start & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); - request->to = (end + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + request = malloc( sizeof(*request) ); + if ( last == NULL ) { + uplink->queue = request; + } else { + last->next = request; + } + request->next = NULL; + request->handle = ++uplink->queueId; + request->from = req.start; + request->to = req.end; #ifdef DEBUG - timing_get( &request->entered ); + timing_get( &request->entered ); #endif - request->hopCount = hops; - request->sent = true; // Optimistic; would be set to false on failure - if ( callback == NULL ) { - // BGR - request->clients = NULL; - } else { - c = &request->clients; - } - isNew = true; - } else if ( callback == NULL ) { - // Replication request that maches existing request. Do nothing - isNew = false; + request->hopCount = hops; + request->sent = true; // Optimistic; would be set to false on failure + if ( callback == NULL ) { + // BGR + request->clients = NULL; } else { - // Existing request. Check if potential cycle - if ( hops > request->hopCount && request->from == start && request->to == end ) { - logadd( LOG_DEBUG1, "Request cycle detected on uplink for %s:%d", PIMG(uplink->image) ); - goto fail_lock; - } - // Count number if clients, get tail of list - int count = 0; c = &request->clients; - while ( *c != NULL ) { - c = &(**c).next; - if ( ++count >= UPLINK_MAX_CLIENTS_PER_REQUEST ) { - logadd( LOG_DEBUG2, "Won't accept more than %d clients per request, dropping client", count ); - goto fail_lock; - } - } - isNew = false; } - req.handle = request->handle; - req.start = request->from; - req.end = request->to; - if ( callback != NULL ) { - assert( c != NULL ); - *c = malloc( sizeof( *request->clients ) ); - (**c).next = NULL; - (**c).handle = handle; - (**c).from = start; - (**c).to = end; - (**c).data = data; - (**c).callback = callback; + isNew = true; + } else if ( callback == NULL ) { + // Replication request that maches existing request. Do nothing + isNew = false; + } else { + // Existing request. Check if potential cycle + if ( hops > request->hopCount && request->from == start && request->to == end ) { + logadd( LOG_DEBUG1, "Request cycle detected on uplink for %s:%d", PIMG(uplink->image) ); + goto fail_lock; } - mutex_unlock( &uplink->queueLock ); - - if ( !isNew ) { - goto success_ref; // Attached to pending request, do nothing + // Count number if clients, get tail of list + int count = 0; + c = &request->clients; + while ( *c != NULL ) { + c = &(**c).next; + if ( ++count >= UPLINK_MAX_CLIENTS_PER_REQUEST ) { + logadd( LOG_DEBUG2, "Won't accept more than %d clients per request, dropping client", count ); + goto fail_lock; + } } - } while (0); + isNew = false; + } + // Prefetch immediately, without unlocking the list - the old approach of + // async prefetching in another thread was sometimes so slow that we'd process + // another request from the same client before the prefetch job would execute. + if ( callback != NULL && ( isNew || request->clients == NULL || request->clients->data == data ) + && end == request->to && length <= _maxPrefetch ) { + // Only if this is a client request, and the !! end boundary matches exactly !! + // (See above for reason why) + // - We neither check the local cache, nor other pending requests. Worth it? + // Complexity vs. probability + preReq.start = end; + preReq.end = end; + extendRequest( preReq.start, &preReq.end, uplink->image, MIN( length * 3, _maxPrefetch ) ); + if ( preReq.start < preReq.end ) { + //logadd( LOG_DEBUG2, "Prefetching @ %"PRIx64" - %"PRIx64, preReq.start, preReq.end ); + uplink->queueLen++; + pre = malloc( sizeof(*pre) ); + pre->next = request->next; + request->next = pre; + pre->handle = preReq.handle = ++uplink->queueId; + pre->from = preReq.start; + pre->to = preReq.end; + pre->hopCount = hops; + pre->sent = true; // Optimistic; would be set to false on failure + pre->clients = NULL; +#ifdef DEBUG + timing_get( &pre->entered ); +#endif + } + } + // // // // + // Copy data - need this after unlocking + req.handle = request->handle; + if ( callback != NULL ) { + assert( c != NULL ); + *c = malloc( sizeof( *request->clients ) ); + (**c).next = NULL; + (**c).handle = handle; + (**c).from = start; + (**c).to = end; + (**c).data = data; + (**c).callback = callback; + } + mutex_unlock( &uplink->queueLock ); + // End queue critical section + if ( pre == NULL && !isNew ) + return true; // Nothing to do - // Fire away the request + // Fire away the request(s) mutex_lock( &uplink->sendMutex ); - if ( unlikely( uplink->current.fd == -1 ) ) { + bool ret1 = true; + bool ret2 = true; + if ( isNew ) { + ret1 = requestBlock( uplink, &req, hops ); + } + if ( pre != NULL ) { + ret2 = requestBlock( uplink, &preReq, hops ); + } + if ( !ret1 || !ret2 ) { // Set with send locked uplink->image->problem.uplink = true; + } + mutex_unlock( &uplink->sendMutex ); + // markRequestUnsend locks the queue, would violate locking order with send mutex + if ( !ret1 ) { markRequestUnsent( uplink, req.handle ); - mutex_unlock( &uplink->sendMutex ); - logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" ); - } else { - const bool ret = dnbd3_get_block( uplink->current.fd, req.start, - (uint32_t)( req.end - req.start ), req.handle, - COND_HOPCOUNT( uplink->current.version, hops ) ); - if ( unlikely( !ret ) ) { - markRequestUnsent( uplink, req.handle ); - uplink->image->problem.uplink = true; - mutex_unlock( &uplink->sendMutex ); - logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing (%"PRIu64")", req.handle ); - } else { - // OK - mutex_unlock( &uplink->sendMutex ); - goto success_ref; - } - // Fall through to waking up sender thread + logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing (%"PRIu64")", req.handle ); } - - if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { - logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); + if ( !ret2 ) { + markRequestUnsent( uplink, preReq.handle ); } -success_ref: - if ( callback != NULL ) { - // Was from client -- potential prefetch - // Same size as this request, but consider end of image... - uint32_t len = (uint32_t)MIN( uplink->image->virtualFilesize - req.end, - req.end - req.start ); - // Also don't prefetch if we cross a hash block border and BGR mode == hashblock - if ( len > 0 && ( _backgroundReplication != BGR_HASHBLOCK - || req.start % HASH_BLOCK_SIZE == (req.end-1) % HASH_BLOCK_SIZE ) ) { - prefetch_job_t *job = malloc( sizeof( *job ) ); - job->start = req.end; - job->length = len; - job->uplink = uplink; - ref_inc( &uplink->reference ); // Hold one for the thread, thread will return it - threadpool_run( &prefetchForClient, (void*)job, "PREFETCH" ); - } + if ( ( !ret1 || !ret2 ) && signal_call( uplink->signal ) == SIGNAL_ERROR ) { + logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); } return true; + fail_lock: mutex_unlock( &uplink->queueLock ); return false; } -static void *prefetchForClient(void *data) -{ - prefetch_job_t *job = (prefetch_job_t*)data; - dnbd3_cache_map_t *cache = ref_get_cachemap( job->uplink->image ); - if ( cache != NULL ) { - if ( !image_isRangeCachedUnsafe( cache, job->start, job->start + job->length ) ) { - uplink_requestInternal( job->uplink, NULL, NULL, ++job->uplink->queueId, job->start, job->length, 0 ); - } - ref_put( &cache->reference ); - } - ref_put( &job->uplink->reference ); // Acquired in uplink_request - free( job ); - return NULL; -} - /** * Uplink thread. * Locks are irrelevant as this is never called from another function |