summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2013-08-15 20:49:06 +0200
committerSimon Rettberg2013-08-15 20:49:06 +0200
commit4209bbef4b1e1b6a6af28407552c8518f3f72125 (patch)
tree90b159c9355c5ae47494bccc08cfa41ae96cfd40
parent[SERVER] Remove dead code (diff)
downloaddnbd3-4209bbef4b1e1b6a6af28407552c8518f3f72125.tar.gz
dnbd3-4209bbef4b1e1b6a6af28407552c8518f3f72125.tar.xz
dnbd3-4209bbef4b1e1b6a6af28407552c8518f3f72125.zip
[SERVER] (Hopefully) fix starving uplink requests
-rw-r--r--src/server/altservers.c39
-rw-r--r--src/server/uplink.c20
2 files changed, 39 insertions, 20 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index a98bedc..da41857 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -108,6 +108,7 @@ int altservers_add(dnbd3_host_t *host, const char *comment)
void altserver_find_uplink(dnbd3_connection_t *uplink)
{
int i;
+ assert( uplink->betterFd == -1 );
spin_lock( &pendingLock );
if ( uplink->rttTestResult == RTT_INPROGRESS ) {
for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
@@ -382,16 +383,18 @@ static void *altserver_main(void *data)
iov[0].iov_len = sizeof(request);
iov[1].iov_base = &serialized;
iov[1].iov_len = len;
- if ( writev( sock, iov, 2 ) != len + sizeof(request) ) goto server_failed;
+ if ( writev( sock, iov, 2 ) != len + sizeof(request) ) {
+ goto server_failed;
+ }
// See if selecting the image succeeded ++++++++++++++++++++++++++++++
if ( recv( sock, &reply, sizeof(reply), MSG_WAITALL ) != sizeof(reply) ) {
- //ERROR_GOTO_VA( server_failed, "[ERROR] Received corrupted reply header after CMD_SELECT_IMAGE (%s)",
- // uplink->image->lower_name );
goto server_failed;
}
// check reply header
fixup_reply( reply );
- if ( reply.cmd != CMD_SELECT_IMAGE || reply.size < 3 || reply.size > MAX_PAYLOAD || reply.magic != dnbd3_packet_magic ) goto server_failed;
+ if ( reply.cmd != CMD_SELECT_IMAGE || reply.size < 3 || reply.size > MAX_PAYLOAD || reply.magic != dnbd3_packet_magic ) {
+ goto server_failed;
+ }
// Not found
// receive reply payload
if ( recv( sock, &serialized, reply.size, MSG_WAITALL ) != reply.size ) {
@@ -402,19 +405,22 @@ static void *altserver_main(void *data)
const uint16_t protocol_version = serializer_get_uint16( &serialized );
if ( protocol_version < MIN_SUPPORTED_SERVER ) goto server_failed;
const char *name = serializer_get_string( &serialized );
- if ( strcmp( name, uplink->image->lower_name ) != 0 ) {
+ if ( name == NULL || strcmp( name, uplink->image->lower_name ) != 0 ) {
ERROR_GOTO_VA( server_failed, "[ERROR] Server offers image '%s', requested '%s'", name, uplink->image->lower_name );
}
const uint16_t rid = serializer_get_uint16( &serialized );
- if ( rid != uplink->image->rid ) ERROR_GOTO_VA( server_failed, "[ERROR] Server provides rid %d, requested was %d (%s)",
- (int)rid, (int)uplink->image->rid, uplink->image->lower_name );
+ if ( rid != uplink->image->rid ) {
+ ERROR_GOTO_VA( server_failed, "[ERROR] Server provides rid %d, requested was %d (%s)",
+ (int)rid, (int)uplink->image->rid, uplink->image->lower_name );
+ }
const uint64_t image_size = serializer_get_uint64( &serialized );
- if ( image_size != uplink->image->filesize ) ERROR_GOTO_VA( server_failed,
- "[ERROR] Remote size: %" PRIu64 ", expected: %" PRIu64 " (%s)",
- image_size, uplink->image->filesize, uplink->image->lower_name );
+ if ( image_size != uplink->image->filesize ) {
+ ERROR_GOTO_VA( server_failed, "[ERROR] Remote size: %" PRIu64 ", expected: %" PRIu64 " (%s)",
+ image_size, uplink->image->filesize, uplink->image->lower_name );
+ }
// Request random block ++++++++++++++++++++++++++++++
request.cmd = CMD_GET_BLOCK;
- request.offset = (uplink->image->filesize - 1) & ~(DNBD3_BLOCK_SIZE - 1);
+ request.offset = (((uint64_t)start.tv_nsec | (uint64_t)rand()) * DNBD3_BLOCK_SIZE ) % uplink->image->filesize;
request.size = DNBD3_BLOCK_SIZE;
fixup_request( request );
if ( send( sock, &request, sizeof(request), 0 ) != sizeof(request) ) ERROR_GOTO_VA( server_failed,
@@ -429,10 +435,13 @@ static void *altserver_main(void *data)
}
// check reply header
fixup_reply( reply );
- if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) ERROR_GOTO_VA( server_failed,
- "[ERROR] Reply to random block request is %d bytes for %s", reply.size, uplink->image->lower_name );
- if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) ERROR_GOTO_VA( server_failed,
- "[ERROR] Could not read random block from socket for %s", uplink->image->lower_name );
+ if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) {
+ ERROR_GOTO_VA( server_failed, "[ERROR] Reply to random block request is %d bytes for %s",
+ reply.size, uplink->image->lower_name );
+ }
+ if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) {
+ ERROR_GOTO_VA( server_failed, "[ERROR] Could not read random block from socket for %s", uplink->image->lower_name );
+ }
clock_gettime( CLOCK_MONOTONIC_RAW, &end );
// Measurement done - everything fine so far
const unsigned int rtt = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_nsec - start.tv_nsec) / 1000; // µs
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 6343ec3..38b1415 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -112,6 +112,7 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint
}
dnbd3_connection_t * const uplink = client->image->uplink;
int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise
+ int existingType = -1; // ULR_* type of existing request
int i;
int freeSlot = -1;
const uint64_t end = start + length;
@@ -121,12 +122,13 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint
for (i = 0; i < uplink->queueLen; ++i) {
if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) freeSlot = i;
if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue;
- if ( foundExisting == -1 && uplink->queue[i].from <= start && uplink->queue[i].to >= end ) {
+ if ( (foundExisting == -1 || existingType == ULR_PENDING) && uplink->queue[i].from <= start && uplink->queue[i].to >= end ) {
foundExisting = i;
+ existingType = uplink->queue[i].status;
break;
}
}
- if ( freeSlot == -1 || freeSlot < foundExisting ) { // Second check: If we "attach" to a thread the request has to be added after the existing one, otherwise a race condition might occur where the now request will starve
+ if ( freeSlot == -1 ) {
if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) {
spin_unlock( &uplink->queueLock );
memlogf( "[WARNING] Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." );
@@ -134,6 +136,14 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint
}
freeSlot = uplink->queueLen++;
}
+ // Do not send request to uplink server if we have a matching pending request AND the request either has the
+ // status ULR_NEW OR we found a free slot with LOWER index than the one we attach to. Otherwise
+ // explicitly send this request to the uplink server. The second condition mentioned here is to prevent
+ // a race condition where the reply for the outstanding request already arrived and the uplink thread
+ // is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might
+ // already have passed the index of the free slot we determined, but not reached the existing request we just found above.
+ if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1;
+ // Fill structure
uplink->queue[freeSlot].from = start;
uplink->queue[freeSlot].to = end;
uplink->queue[freeSlot].handle = handle;
@@ -144,7 +154,7 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint
#endif
spin_unlock( &uplink->queueLock );
- if ( foundExisting == -1 ) {
+ if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed
static uint64_t counter = 1;
write( uplink->signal, &counter, sizeof(uint64_t) );
}
@@ -304,7 +314,7 @@ static void* uplink_mainloop(void *data)
goto cleanup;
}
} else {
- // Not complete- do measurement
+ // Not complete - do measurement
altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous)
// Also send a keepalive packet to the currently connected server
if ( link->fd != -1 ) {
@@ -447,7 +457,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
spin_lock( &link->queueLock );
for (i = 0; i < link->queueLen; ++i) {
dnbd3_queued_request_t * const req = &link->queue[i];
- assert( req->status != ULR_PROCESSING );
+ assert( req->status != ULR_PROCESSING || req->status != ULR_NEW );
if ( req->status != ULR_PENDING ) continue;
if ( req->from >= start && req->to <= end ) { // Match :-)
req->status = ULR_PROCESSING;