From 5bc3badd013b88201da64dc970600d19451daaec Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 3 Mar 2020 14:55:01 +0100 Subject: [SERVER] Also add a flag for uplink queue overload --- src/server/globals.h | 3 ++- src/server/net.c | 10 +++------- src/server/uplink.c | 11 +++++++++++ 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/server/globals.h b/src/server/globals.h index 31fbce5..0bd6e47 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -93,7 +93,7 @@ struct _dnbd3_uplink // If BGR == BGR_HASHBLOCK, -1 means "currently no incomplete block" uint64_t replicationHandle; // Handle of pending replication request atomic_uint_fast64_t bytesReceived; // Number of bytes received by the uplink since startup. - atomic_int queueLen; // length of queue + int queueLen; // length of queue uint32_t idleTime; // How many seconds the uplink was idle (apart from keep-alives) dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE]; dnbd3_alt_local_t altData[SERVER_MAX_ALTS]; @@ -141,6 +141,7 @@ struct _dnbd3_image atomic_bool write; // Error writing to file atomic_bool read; // Error reading from file atomic_bool changed; // File disappeared or changed, thorough check required if it seems to be back + atomic_bool queue; // Too many requests waiting on uplink } problem; uint16_t rid; // revision of image pthread_mutex_t lock; diff --git a/src/server/net.c b/src/server/net.c index 29147be..a478e0c 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -269,12 +269,11 @@ void* net_handleNewConnection(void *clientPtr) // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable bOk = true; if ( image->ref_cacheMap != NULL ) { - dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); - if ( uplink != NULL && ( uplink->cacheFd == -1 || uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) ) { + if ( image->problem.queue || image->problem.write ) { bOk = ( rand() % 4 ) == 1; } - if ( bOk && uplink != NULL ) { - if ( uplink->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this + if ( bOk ) { + if ( image->problem.write ) { // Wait 100ms if local caching is not working so this usleep( 100000 ); // server gets a penalty and is less likely to be selected } if ( image->problem.uplink ) { @@ -282,9 +281,6 @@ void* net_handleNewConnection(void *clientPtr) usleep( ( 100 - image->completenessEstimate ) * 100 ); } } - if ( uplink != NULL ) { - ref_put( &uplink->reference ); - } } if ( bOk ) { mutex_lock( &image->lock ); diff --git a/src/server/uplink.c b/src/server/uplink.c index aba53ba..97cb2a9 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -118,6 +118,8 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version mutex_unlock( &uplink->sendMutex ); uplink->cycleDetected = false; image->problem.uplink = true; + image->problem.write = true; + image->problem.queue = false; if ( sock != -1 ) { uplink->better.fd = sock; int index = altservers_hostToIndex( host ); @@ -191,6 +193,7 @@ static void cancelAllRequests(dnbd3_uplink_t *uplink) } } uplink->queueLen = 0; + uplink->image->problem.queue = false; } static void uplink_free(ref *ref) @@ -328,6 +331,9 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin goto fail_lock; } freeSlot = uplink->queueLen++; + if ( freeSlot > SERVER_UPLINK_QUEUELEN_THRES ) { + uplink->image->problem.queue = true; + } } // Do not send request to uplink server if we have a matching pending request AND the request either has the // status ULR_NEW/PENDING OR we found a free slot with LOWER index than the one we attach to. Otherwise @@ -904,6 +910,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) continue; // Success, retry write } if ( err == EBADF || err == EINVAL || err == EIO ) { + uplink->image->problem.write = true; if ( !tryAgain || !uplink_reopenCacheFd( uplink, true ) ) break; tryAgain = false; @@ -983,6 +990,9 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) } if ( req->status == ULR_FREE && i == uplink->queueLen - 1 ) uplink->queueLen--; } + if ( uplink->queueLen < SERVER_UPLINK_QUEUELEN_THRES ) { + uplink->image->problem.queue = false; + } mutex_unlock( &uplink->queueLock ); #ifdef _DEBUG if ( !served && start != uplink->replicationHandle ) { @@ -1121,6 +1131,7 @@ static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force) close( uplink->cacheFd ); } uplink->cacheFd = open( uplink->image->path, O_WRONLY | O_CREAT, 0644 ); + uplink->image->problem.write = uplink->cacheFd == -1; return uplink->cacheFd != -1; } -- cgit v1.2.3-55-g7522