summaryrefslogtreecommitdiffstats
path: root/src/server
diff options
context:
space:
mode:
authorSimon Rettberg2019-08-06 14:06:27 +0200
committerSimon Rettberg2019-08-06 14:06:27 +0200
commit5dc776ac73be190daa2b2b8c3eb6042fdab4acda (patch)
tree9f22afe4bf2e35a9e973c0afa7c16214ac55c557 /src/server
parent[BENCH] Increase timeouts, fix block payload reading (diff)
downloaddnbd3-5dc776ac73be190daa2b2b8c3eb6042fdab4acda.tar.gz
dnbd3-5dc776ac73be190daa2b2b8c3eb6042fdab4acda.tar.xz
dnbd3-5dc776ac73be190daa2b2b8c3eb6042fdab4acda.zip
[SERVER] uplink: Improve attaching to existing requests
Allow attaching in ULR_PROCESSING state, leave lower slots empty to increase chances attaching to ULR_PROCESSING.
Diffstat (limited to 'src/server')
-rw-r--r--src/server/globals.h12
-rw-r--r--src/server/uplink.c97
2 files changed, 74 insertions, 35 deletions
diff --git a/src/server/globals.h b/src/server/globals.h
index 7e5ff04..cd5ad7e 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -17,18 +17,6 @@ typedef struct _dnbd3_connection dnbd3_connection_t;
typedef struct _dnbd3_image dnbd3_image_t;
typedef struct _dnbd3_client dnbd3_client_t;
-// Slot is free, can be used.
-// Must only be set in uplink_handle_receive() or uplink_remove_client()
-#define ULR_FREE 0
-// Slot has been filled with a request that hasn't been sent to the upstream server yet, matching request can safely rely on reuse.
-// Must only be set in uplink_request()
-#define ULR_NEW 1
-// Slot is occupied, reply has not yet been received, matching request can safely rely on reuse.
-// Must only be set in uplink_mainloop() or uplink_request()
-#define ULR_PENDING 2
-// Slot is being processed, do not consider for hop on.
-// Must only be set in uplink_handle_receive()
-#define ULR_PROCESSING 3
typedef struct
{
uint64_t handle; // Client defined handle to pass back in reply
diff --git a/src/server/uplink.c b/src/server/uplink.c
index f58b019..9f99fe4 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -21,6 +21,28 @@
#define REP_NONE ( (uint64_t)0xffffffffffffffff )
+// Status of request in queue
+
+// Slot is free, can be used.
+// Must only be set in uplink_handle_receive() or uplink_remove_client()
+#define ULR_FREE 0
+// Slot has been filled with a request that hasn't been sent to the upstream server yet, matching request can safely rely on reuse.
+// Must only be set in uplink_request()
+#define ULR_NEW 1
+// Slot is occupied, reply has not yet been received, matching request can safely rely on reuse.
+// Must only be set in uplink_mainloop() or uplink_request()
+#define ULR_PENDING 2
+// Slot is being processed, do not consider for hop on.
+// Must only be set in uplink_handle_receive()
+#define ULR_PROCESSING 3
+
+static const char *const NAMES_ULR[4] = {
+ [ULR_FREE] = "ULR_FREE",
+ [ULR_NEW] = "ULR_NEW",
+ [ULR_PENDING] = "ULR_PENDING",
+ [ULR_PROCESSING] = "ULR_PROCESSING",
+};
+
static atomic_uint_fast64_t totalBytesReceived = 0;
static void* uplink_mainloop(void *data);
@@ -203,30 +225,37 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
int existingType = -1; // ULR_* type of existing request
int i;
int freeSlot = -1;
+ int firstUsedSlot = -1;
bool requestLoop = false;
const uint64_t end = start + length;
mutex_lock( &uplink->queueLock );
mutex_unlock( &client->image->lock );
for (i = 0; i < uplink->queueLen; ++i) {
- if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) {
- freeSlot = i;
+ // find free slot to place this request into
+ if ( uplink->queue[i].status == ULR_FREE ) {
+ if ( freeSlot == -1 || existingType != ULR_PROCESSING ) {
+ freeSlot = i;
+ }
continue;
}
- if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue;
- if ( uplink->queue[i].from <= start && uplink->queue[i].to >= end ) {
- if ( hops > uplink->queue[i].hopCount && uplink->queue[i].from == start && uplink->queue[i].to == end ) {
- requestLoop = true;
- break;
- }
- if ( foundExisting == -1 || existingType == ULR_PENDING ) {
- foundExisting = i;
- existingType = uplink->queue[i].status;
- if ( freeSlot != -1 ) break;
- }
+ if ( firstUsedSlot == -1 ) {
+ firstUsedSlot = i;
+ }
+ // find existing request to attach to
+ if ( uplink->queue[i].from > start || uplink->queue[i].to < end )
+ continue; // Range not suitable
+ // Detect potential proxy cycle. New request hopcount is greater, range is same, old request has already been sent -> suspicious
+ if ( hops > uplink->queue[i].hopCount && uplink->queue[i].from == start && uplink->queue[i].to == end && uplink->queue[i].status == ULR_PENDING ) {
+ requestLoop = true;
+ break;
+ }
+ if ( foundExisting == -1 || existingType == ULR_PROCESSING ) {
+ foundExisting = i;
+ existingType = uplink->queue[i].status;
}
}
- if ( requestLoop ) {
+ if ( unlikely( requestLoop ) ) {
mutex_unlock( &uplink->queueLock );
logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops );
mutex_lock( &uplink->rttLock );
@@ -235,6 +264,9 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
signal_call( uplink->signal );
return false;
}
+ if ( freeSlot < firstUsedSlot && firstUsedSlot < 10 && existingType != ULR_PROCESSING ) {
+ freeSlot = -1; // Not attaching to existing request, make it use a higher slot
+ }
if ( freeSlot == -1 ) {
if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) {
mutex_unlock( &uplink->queueLock );
@@ -244,15 +276,17 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
freeSlot = uplink->queueLen++;
}
// Do not send request to uplink server if we have a matching pending request AND the request either has the
- // status ULR_NEW OR we found a free slot with LOWER index than the one we attach to. Otherwise
+ // status ULR_NEW/PENDING OR we found a free slot with LOWER index than the one we attach to. Otherwise
// explicitly send this request to the uplink server. The second condition mentioned here is to prevent
// a race condition where the reply for the outstanding request already arrived and the uplink thread
// is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might
// already have passed the index of the free slot we determined, but not reached the existing request we just found above.
- if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1; // -1 means "send request"
+ if ( foundExisting != -1 && existingType == ULR_PROCESSING && freeSlot > foundExisting ) {
+ foundExisting = -1; // -1 means "send request"
+ }
#ifdef _DEBUG
if ( foundExisting != -1 ) {
- logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, existingType == ULR_NEW ? "ULR_NEW" : "ULR_PENDING", foundExisting, freeSlot );
+ logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, NAMES_ULR[existingType], foundExisting, freeSlot );
logadd( LOG_DEBUG2, "Original %" PRIu64 "-%" PRIu64 " (%p)\n"
"New %" PRIu64 "-%" PRIu64 " (%p)\n",
uplink->queue[foundExisting].from, uplink->queue[foundExisting].to, (void*)uplink->queue[foundExisting].client,
@@ -265,7 +299,8 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
uplink->queue[freeSlot].handle = handle;
uplink->queue[freeSlot].client = client;
//int old = uplink->queue[freeSlot].status;
- uplink->queue[freeSlot].status = (foundExisting == -1 ? ULR_NEW : ULR_PENDING);
+ uplink->queue[freeSlot].status = ( foundExisting == -1 ? ULR_NEW :
+ ( existingType == ULR_NEW ? ULR_PENDING : existingType ) );
uplink->queue[freeSlot].hopCount = hops;
#ifdef _DEBUG
timing_get( &uplink->queue[freeSlot].entered );
@@ -292,14 +327,25 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
if ( !ret ) {
logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" );
} else {
+ // Direct send succeeded, update queue entry from NEW to PENDING, so the request won't be sent again
+ int state;
mutex_lock( &uplink->queueLock );
- if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client && uplink->queue[freeSlot].status == ULR_NEW ) {
- uplink->queue[freeSlot].status = ULR_PENDING;
- logadd( LOG_DEBUG2, "Succesful direct uplink request" );
+ if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) {
+ state = uplink->queue[freeSlot].status;
+ if ( uplink->queue[freeSlot].status == ULR_NEW ) {
+ uplink->queue[freeSlot].status = ULR_PENDING;
+ }
} else {
- logadd( LOG_DEBUG2, "Weird queue update fail for direct uplink request" );
+ state = -1;
}
mutex_unlock( &uplink->queueLock );
+ if ( state == -1 ) {
+ logadd( LOG_DEBUG2, "Direct uplink request queue entry gone after sending and re-locking queue. *shrug*" );
+ } else if ( state == ULR_NEW ) {
+ logadd( LOG_DEBUG2, "Succesful direct uplink request" );
+ } else {
+ logadd( LOG_DEBUG2, "Direct uplink request queue entry changed to %s afte sending (expected ULR_NEW).", NAMES_ULR[uplink->queue[freeSlot].status] );
+ }
return true;
}
// Fall through to waking up sender thread
@@ -837,6 +883,11 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
}
}
// 2) Figure out which clients are interested in it
+ // Mark as ULR_PROCESSING, since we unlock repeatedly in the second loop
+ // below; this prevents uplink_request() from attaching to this request
+ // by populating a slot with index greater than the highest matching
+ // request with ULR_PROCESSING (assuming there is no ULR_PENDING or ULR_NEW
+ // where it's fine if the index is greater)
mutex_lock( &link->queueLock );
for (i = 0; i < link->queueLen; ++i) {
dnbd3_queued_request_t * const req = &link->queue[i];
@@ -877,10 +928,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
bytesSent = (size_t)sent - sizeof outReply;
}
}
- mutex_unlock( &client->sendMutex );
if ( bytesSent != 0 ) {
client->bytesSent += bytesSent;
}
+ mutex_unlock( &client->sendMutex );
mutex_lock( &link->queueLock );
}
if ( req->status == ULR_FREE && i == link->queueLen - 1 ) link->queueLen--;