From 5dc776ac73be190daa2b2b8c3eb6042fdab4acda Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 6 Aug 2019 14:06:27 +0200 Subject: [SERVER] uplink: Improve attaching to existing requests Allow attaching in ULR_PROCESSING state, leave lower slots empty to increase chances attaching to ULR_PROCESSING. --- src/server/uplink.c | 97 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 74 insertions(+), 23 deletions(-) (limited to 'src/server/uplink.c') diff --git a/src/server/uplink.c b/src/server/uplink.c index f58b019..9f99fe4 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -21,6 +21,28 @@ #define REP_NONE ( (uint64_t)0xffffffffffffffff ) +// Status of request in queue + +// Slot is free, can be used. +// Must only be set in uplink_handle_receive() or uplink_remove_client() +#define ULR_FREE 0 +// Slot has been filled with a request that hasn't been sent to the upstream server yet, matching request can safely rely on reuse. +// Must only be set in uplink_request() +#define ULR_NEW 1 +// Slot is occupied, reply has not yet been received, matching request can safely rely on reuse. +// Must only be set in uplink_mainloop() or uplink_request() +#define ULR_PENDING 2 +// Slot is being processed, do not consider for hop on. +// Must only be set in uplink_handle_receive() +#define ULR_PROCESSING 3 + +static const char *const NAMES_ULR[4] = { + [ULR_FREE] = "ULR_FREE", + [ULR_NEW] = "ULR_NEW", + [ULR_PENDING] = "ULR_PENDING", + [ULR_PROCESSING] = "ULR_PROCESSING", +}; + static atomic_uint_fast64_t totalBytesReceived = 0; static void* uplink_mainloop(void *data); @@ -203,30 +225,37 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin int existingType = -1; // ULR_* type of existing request int i; int freeSlot = -1; + int firstUsedSlot = -1; bool requestLoop = false; const uint64_t end = start + length; mutex_lock( &uplink->queueLock ); mutex_unlock( &client->image->lock ); for (i = 0; i < uplink->queueLen; ++i) { - if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) { - freeSlot = i; + // find free slot to place this request into + if ( uplink->queue[i].status == ULR_FREE ) { + if ( freeSlot == -1 || existingType != ULR_PROCESSING ) { + freeSlot = i; + } continue; } - if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue; - if ( uplink->queue[i].from <= start && uplink->queue[i].to >= end ) { - if ( hops > uplink->queue[i].hopCount && uplink->queue[i].from == start && uplink->queue[i].to == end ) { - requestLoop = true; - break; - } - if ( foundExisting == -1 || existingType == ULR_PENDING ) { - foundExisting = i; - existingType = uplink->queue[i].status; - if ( freeSlot != -1 ) break; - } + if ( firstUsedSlot == -1 ) { + firstUsedSlot = i; + } + // find existing request to attach to + if ( uplink->queue[i].from > start || uplink->queue[i].to < end ) + continue; // Range not suitable + // Detect potential proxy cycle. New request hopcount is greater, range is same, old request has already been sent -> suspicious + if ( hops > uplink->queue[i].hopCount && uplink->queue[i].from == start && uplink->queue[i].to == end && uplink->queue[i].status == ULR_PENDING ) { + requestLoop = true; + break; + } + if ( foundExisting == -1 || existingType == ULR_PROCESSING ) { + foundExisting = i; + existingType = uplink->queue[i].status; } } - if ( requestLoop ) { + if ( unlikely( requestLoop ) ) { mutex_unlock( &uplink->queueLock ); logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops ); mutex_lock( &uplink->rttLock ); @@ -235,6 +264,9 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin signal_call( uplink->signal ); return false; } + if ( freeSlot < firstUsedSlot && firstUsedSlot < 10 && existingType != ULR_PROCESSING ) { + freeSlot = -1; // Not attaching to existing request, make it use a higher slot + } if ( freeSlot == -1 ) { if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) { mutex_unlock( &uplink->queueLock ); @@ -244,15 +276,17 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin freeSlot = uplink->queueLen++; } // Do not send request to uplink server if we have a matching pending request AND the request either has the - // status ULR_NEW OR we found a free slot with LOWER index than the one we attach to. Otherwise + // status ULR_NEW/PENDING OR we found a free slot with LOWER index than the one we attach to. Otherwise // explicitly send this request to the uplink server. The second condition mentioned here is to prevent // a race condition where the reply for the outstanding request already arrived and the uplink thread // is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might // already have passed the index of the free slot we determined, but not reached the existing request we just found above. - if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1; // -1 means "send request" + if ( foundExisting != -1 && existingType == ULR_PROCESSING && freeSlot > foundExisting ) { + foundExisting = -1; // -1 means "send request" + } #ifdef _DEBUG if ( foundExisting != -1 ) { - logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, existingType == ULR_NEW ? "ULR_NEW" : "ULR_PENDING", foundExisting, freeSlot ); + logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, NAMES_ULR[existingType], foundExisting, freeSlot ); logadd( LOG_DEBUG2, "Original %" PRIu64 "-%" PRIu64 " (%p)\n" "New %" PRIu64 "-%" PRIu64 " (%p)\n", uplink->queue[foundExisting].from, uplink->queue[foundExisting].to, (void*)uplink->queue[foundExisting].client, @@ -265,7 +299,8 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin uplink->queue[freeSlot].handle = handle; uplink->queue[freeSlot].client = client; //int old = uplink->queue[freeSlot].status; - uplink->queue[freeSlot].status = (foundExisting == -1 ? ULR_NEW : ULR_PENDING); + uplink->queue[freeSlot].status = ( foundExisting == -1 ? ULR_NEW : + ( existingType == ULR_NEW ? ULR_PENDING : existingType ) ); uplink->queue[freeSlot].hopCount = hops; #ifdef _DEBUG timing_get( &uplink->queue[freeSlot].entered ); @@ -292,14 +327,25 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin if ( !ret ) { logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" ); } else { + // Direct send succeeded, update queue entry from NEW to PENDING, so the request won't be sent again + int state; mutex_lock( &uplink->queueLock ); - if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client && uplink->queue[freeSlot].status == ULR_NEW ) { - uplink->queue[freeSlot].status = ULR_PENDING; - logadd( LOG_DEBUG2, "Succesful direct uplink request" ); + if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) { + state = uplink->queue[freeSlot].status; + if ( uplink->queue[freeSlot].status == ULR_NEW ) { + uplink->queue[freeSlot].status = ULR_PENDING; + } } else { - logadd( LOG_DEBUG2, "Weird queue update fail for direct uplink request" ); + state = -1; } mutex_unlock( &uplink->queueLock ); + if ( state == -1 ) { + logadd( LOG_DEBUG2, "Direct uplink request queue entry gone after sending and re-locking queue. *shrug*" ); + } else if ( state == ULR_NEW ) { + logadd( LOG_DEBUG2, "Succesful direct uplink request" ); + } else { + logadd( LOG_DEBUG2, "Direct uplink request queue entry changed to %s afte sending (expected ULR_NEW).", NAMES_ULR[uplink->queue[freeSlot].status] ); + } return true; } // Fall through to waking up sender thread @@ -837,6 +883,11 @@ static void uplink_handleReceive(dnbd3_connection_t *link) } } // 2) Figure out which clients are interested in it + // Mark as ULR_PROCESSING, since we unlock repeatedly in the second loop + // below; this prevents uplink_request() from attaching to this request + // by populating a slot with index greater than the highest matching + // request with ULR_PROCESSING (assuming there is no ULR_PENDING or ULR_NEW + // where it's fine if the index is greater) mutex_lock( &link->queueLock ); for (i = 0; i < link->queueLen; ++i) { dnbd3_queued_request_t * const req = &link->queue[i]; @@ -877,10 +928,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link) bytesSent = (size_t)sent - sizeof outReply; } } - mutex_unlock( &client->sendMutex ); if ( bytesSent != 0 ) { client->bytesSent += bytesSent; } + mutex_unlock( &client->sendMutex ); mutex_lock( &link->queueLock ); } if ( req->status == ULR_FREE && i == link->queueLen - 1 ) link->queueLen--; -- cgit v1.2.3-55-g7522