From 4209bbef4b1e1b6a6af28407552c8518f3f72125 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Thu, 15 Aug 2013 20:49:06 +0200 Subject: [SERVER] (Hopefully) fix starving uplink requests --- src/server/altservers.c | 39 ++++++++++++++++++++++++--------------- src/server/uplink.c | 20 +++++++++++++++----- 2 files changed, 39 insertions(+), 20 deletions(-) diff --git a/src/server/altservers.c b/src/server/altservers.c index a98bedc..da41857 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -108,6 +108,7 @@ int altservers_add(dnbd3_host_t *host, const char *comment) void altserver_find_uplink(dnbd3_connection_t *uplink) { int i; + assert( uplink->betterFd == -1 ); spin_lock( &pendingLock ); if ( uplink->rttTestResult == RTT_INPROGRESS ) { for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { @@ -382,16 +383,18 @@ static void *altserver_main(void *data) iov[0].iov_len = sizeof(request); iov[1].iov_base = &serialized; iov[1].iov_len = len; - if ( writev( sock, iov, 2 ) != len + sizeof(request) ) goto server_failed; + if ( writev( sock, iov, 2 ) != len + sizeof(request) ) { + goto server_failed; + } // See if selecting the image succeeded ++++++++++++++++++++++++++++++ if ( recv( sock, &reply, sizeof(reply), MSG_WAITALL ) != sizeof(reply) ) { - //ERROR_GOTO_VA( server_failed, "[ERROR] Received corrupted reply header after CMD_SELECT_IMAGE (%s)", - // uplink->image->lower_name ); goto server_failed; } // check reply header fixup_reply( reply ); - if ( reply.cmd != CMD_SELECT_IMAGE || reply.size < 3 || reply.size > MAX_PAYLOAD || reply.magic != dnbd3_packet_magic ) goto server_failed; + if ( reply.cmd != CMD_SELECT_IMAGE || reply.size < 3 || reply.size > MAX_PAYLOAD || reply.magic != dnbd3_packet_magic ) { + goto server_failed; + } // Not found // receive reply payload if ( recv( sock, &serialized, reply.size, MSG_WAITALL ) != reply.size ) { @@ -402,19 +405,22 @@ static void *altserver_main(void *data) const uint16_t protocol_version = serializer_get_uint16( &serialized ); if ( protocol_version < MIN_SUPPORTED_SERVER ) goto server_failed; const char *name = serializer_get_string( &serialized ); - if ( strcmp( name, uplink->image->lower_name ) != 0 ) { + if ( name == NULL || strcmp( name, uplink->image->lower_name ) != 0 ) { ERROR_GOTO_VA( server_failed, "[ERROR] Server offers image '%s', requested '%s'", name, uplink->image->lower_name ); } const uint16_t rid = serializer_get_uint16( &serialized ); - if ( rid != uplink->image->rid ) ERROR_GOTO_VA( server_failed, "[ERROR] Server provides rid %d, requested was %d (%s)", - (int)rid, (int)uplink->image->rid, uplink->image->lower_name ); + if ( rid != uplink->image->rid ) { + ERROR_GOTO_VA( server_failed, "[ERROR] Server provides rid %d, requested was %d (%s)", + (int)rid, (int)uplink->image->rid, uplink->image->lower_name ); + } const uint64_t image_size = serializer_get_uint64( &serialized ); - if ( image_size != uplink->image->filesize ) ERROR_GOTO_VA( server_failed, - "[ERROR] Remote size: %" PRIu64 ", expected: %" PRIu64 " (%s)", - image_size, uplink->image->filesize, uplink->image->lower_name ); + if ( image_size != uplink->image->filesize ) { + ERROR_GOTO_VA( server_failed, "[ERROR] Remote size: %" PRIu64 ", expected: %" PRIu64 " (%s)", + image_size, uplink->image->filesize, uplink->image->lower_name ); + } // Request random block ++++++++++++++++++++++++++++++ request.cmd = CMD_GET_BLOCK; - request.offset = (uplink->image->filesize - 1) & ~(DNBD3_BLOCK_SIZE - 1); + request.offset = (((uint64_t)start.tv_nsec | (uint64_t)rand()) * DNBD3_BLOCK_SIZE ) % uplink->image->filesize; request.size = DNBD3_BLOCK_SIZE; fixup_request( request ); if ( send( sock, &request, sizeof(request), 0 ) != sizeof(request) ) ERROR_GOTO_VA( server_failed, @@ -429,10 +435,13 @@ static void *altserver_main(void *data) } // check reply header fixup_reply( reply ); - if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) ERROR_GOTO_VA( server_failed, - "[ERROR] Reply to random block request is %d bytes for %s", reply.size, uplink->image->lower_name ); - if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) ERROR_GOTO_VA( server_failed, - "[ERROR] Could not read random block from socket for %s", uplink->image->lower_name ); + if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) { + ERROR_GOTO_VA( server_failed, "[ERROR] Reply to random block request is %d bytes for %s", + reply.size, uplink->image->lower_name ); + } + if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) { + ERROR_GOTO_VA( server_failed, "[ERROR] Could not read random block from socket for %s", uplink->image->lower_name ); + } clock_gettime( CLOCK_MONOTONIC_RAW, &end ); // Measurement done - everything fine so far const unsigned int rtt = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_nsec - start.tv_nsec) / 1000; // µs diff --git a/src/server/uplink.c b/src/server/uplink.c index 6343ec3..38b1415 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -112,6 +112,7 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint } dnbd3_connection_t * const uplink = client->image->uplink; int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise + int existingType = -1; // ULR_* type of existing request int i; int freeSlot = -1; const uint64_t end = start + length; @@ -121,12 +122,13 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint for (i = 0; i < uplink->queueLen; ++i) { if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) freeSlot = i; if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue; - if ( foundExisting == -1 && uplink->queue[i].from <= start && uplink->queue[i].to >= end ) { + if ( (foundExisting == -1 || existingType == ULR_PENDING) && uplink->queue[i].from <= start && uplink->queue[i].to >= end ) { foundExisting = i; + existingType = uplink->queue[i].status; break; } } - if ( freeSlot == -1 || freeSlot < foundExisting ) { // Second check: If we "attach" to a thread the request has to be added after the existing one, otherwise a race condition might occur where the now request will starve + if ( freeSlot == -1 ) { if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) { spin_unlock( &uplink->queueLock ); memlogf( "[WARNING] Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." ); @@ -134,6 +136,14 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint } freeSlot = uplink->queueLen++; } + // Do not send request to uplink server if we have a matching pending request AND the request either has the + // status ULR_NEW OR we found a free slot with LOWER index than the one we attach to. Otherwise + // explicitly send this request to the uplink server. The second condition mentioned here is to prevent + // a race condition where the reply for the outstanding request already arrived and the uplink thread + // is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might + // already have passed the index of the free slot we determined, but not reached the existing request we just found above. + if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1; + // Fill structure uplink->queue[freeSlot].from = start; uplink->queue[freeSlot].to = end; uplink->queue[freeSlot].handle = handle; @@ -144,7 +154,7 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint #endif spin_unlock( &uplink->queueLock ); - if ( foundExisting == -1 ) { + if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed static uint64_t counter = 1; write( uplink->signal, &counter, sizeof(uint64_t) ); } @@ -304,7 +314,7 @@ static void* uplink_mainloop(void *data) goto cleanup; } } else { - // Not complete- do measurement + // Not complete - do measurement altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous) // Also send a keepalive packet to the currently connected server if ( link->fd != -1 ) { @@ -447,7 +457,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link) spin_lock( &link->queueLock ); for (i = 0; i < link->queueLen; ++i) { dnbd3_queued_request_t * const req = &link->queue[i]; - assert( req->status != ULR_PROCESSING ); + assert( req->status != ULR_PROCESSING || req->status != ULR_NEW ); if ( req->status != ULR_PENDING ) continue; if ( req->from >= start && req->to <= end ) { // Match :-) req->status = ULR_PROCESSING; -- cgit v1.2.3-55-g7522