From 4209bbef4b1e1b6a6af28407552c8518f3f72125 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Thu, 15 Aug 2013 20:49:06 +0200 Subject: [SERVER] (Hopefully) fix starving uplink requests --- src/server/uplink.c | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) (limited to 'src/server/uplink.c') diff --git a/src/server/uplink.c b/src/server/uplink.c index 6343ec3..38b1415 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -112,6 +112,7 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint } dnbd3_connection_t * const uplink = client->image->uplink; int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise + int existingType = -1; // ULR_* type of existing request int i; int freeSlot = -1; const uint64_t end = start + length; @@ -121,12 +122,13 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint for (i = 0; i < uplink->queueLen; ++i) { if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) freeSlot = i; if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue; - if ( foundExisting == -1 && uplink->queue[i].from <= start && uplink->queue[i].to >= end ) { + if ( (foundExisting == -1 || existingType == ULR_PENDING) && uplink->queue[i].from <= start && uplink->queue[i].to >= end ) { foundExisting = i; + existingType = uplink->queue[i].status; break; } } - if ( freeSlot == -1 || freeSlot < foundExisting ) { // Second check: If we "attach" to a thread the request has to be added after the existing one, otherwise a race condition might occur where the now request will starve + if ( freeSlot == -1 ) { if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) { spin_unlock( &uplink->queueLock ); memlogf( "[WARNING] Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." ); @@ -134,6 +136,14 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint } freeSlot = uplink->queueLen++; } + // Do not send request to uplink server if we have a matching pending request AND the request either has the + // status ULR_NEW OR we found a free slot with LOWER index than the one we attach to. Otherwise + // explicitly send this request to the uplink server. The second condition mentioned here is to prevent + // a race condition where the reply for the outstanding request already arrived and the uplink thread + // is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might + // already have passed the index of the free slot we determined, but not reached the existing request we just found above. + if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1; + // Fill structure uplink->queue[freeSlot].from = start; uplink->queue[freeSlot].to = end; uplink->queue[freeSlot].handle = handle; @@ -144,7 +154,7 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint #endif spin_unlock( &uplink->queueLock ); - if ( foundExisting == -1 ) { + if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed static uint64_t counter = 1; write( uplink->signal, &counter, sizeof(uint64_t) ); } @@ -304,7 +314,7 @@ static void* uplink_mainloop(void *data) goto cleanup; } } else { - // Not complete- do measurement + // Not complete - do measurement altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous) // Also send a keepalive packet to the currently connected server if ( link->fd != -1 ) { @@ -447,7 +457,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link) spin_lock( &link->queueLock ); for (i = 0; i < link->queueLen; ++i) { dnbd3_queued_request_t * const req = &link->queue[i]; - assert( req->status != ULR_PROCESSING ); + assert( req->status != ULR_PROCESSING || req->status != ULR_NEW ); if ( req->status != ULR_PENDING ) continue; if ( req->from >= start && req->to <= end ) { // Match :-) req->status = ULR_PROCESSING; -- cgit v1.2.3-55-g7522