diff options
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r-- | src/server/uplink.c | 47 |
1 files changed, 34 insertions, 13 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c index a8d2f0b..65cefb5 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -43,7 +43,7 @@ uint64_t uplink_getTotalBytesReceived() * image. Uplinks run in their own thread. * Locks on: _images[].lock */ -bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) +bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version) { if ( !_isProxy ) return false; dnbd3_connection_t *link = NULL; @@ -72,6 +72,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) link->betterFd = sock; link->betterServer = *host; link->rttTestResult = RTT_DOCHANGE; + link->betterVersion = version; } else { link->betterFd = -1; link->rttTestResult = RTT_IDLE; @@ -145,7 +146,7 @@ void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) * Request a chunk of data through an uplink server * Locks on: image.lock, uplink.queueLock */ -bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length) +bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) { if ( client == NULL || client->image == NULL ) return false; spin_lock( &client->image->lock ); @@ -161,9 +162,10 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin return false; } // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain - if ( isSameAddress( &uplink->currentServer, &client->host ) ) { + // This might be a false positive if there are multiple instances running on the same host (IP) + if ( hops != 0 && isSameAddress( &uplink->currentServer, &client->host ) ) { spin_unlock( &client->image->lock ); - logadd( LOG_DEBUG1, "Proxy cycle detected" ); + logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); return false; } @@ -171,19 +173,34 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin int existingType = -1; // ULR_* type of existing request int i; int freeSlot = -1; + bool requestLoop = false; const uint64_t end = start + length; spin_lock( &uplink->queueLock ); spin_unlock( &client->image->lock ); for (i = 0; i < uplink->queueLen; ++i) { - if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) freeSlot = i; + if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) { + freeSlot = i; + continue; + } if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue; - if ( (foundExisting == -1 || existingType == ULR_PENDING) && uplink->queue[i].from <= start && uplink->queue[i].to >= end ) { - foundExisting = i; - existingType = uplink->queue[i].status; - break; + if ( uplink->queue[i].from <= start && uplink->queue[i].to >= end ) { + if ( hops > uplink->queue[i].hopCount ) { + requestLoop = true; + break; + } + if ( foundExisting == -1 || existingType == ULR_PENDING ) { + foundExisting = i; + existingType = uplink->queue[i].status; + if ( freeSlot != -1 ) break; + } } } + if ( requestLoop ) { + spin_unlock( &uplink->queueLock ); + logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops ); + return false; + } if ( freeSlot == -1 ) { if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) { spin_unlock( &uplink->queueLock ); @@ -198,7 +215,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin // a race condition where the reply for the outstanding request already arrived and the uplink thread // is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might // already have passed the index of the free slot we determined, but not reached the existing request we just found above. - if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1; + if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1; // -1 means "send request" #ifdef _DEBUG if ( foundExisting != -1 ) { logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, existingType == ULR_NEW ? "ULR_NEW" : "ULR_PENDING", foundExisting, freeSlot ); @@ -215,6 +232,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin uplink->queue[freeSlot].client = client; //int old = uplink->queue[freeSlot].status; uplink->queue[freeSlot].status = (foundExisting == -1 ? ULR_NEW : ULR_PENDING); + uplink->queue[freeSlot].hopCount = hops; #ifdef _DEBUG uplink->queue[freeSlot].entered = time( NULL ); //logadd( LOG_DEBUG2 %p] Inserting request at slot %d, was %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 "\n", (void*)uplink, freeSlot, old, uplink->queue[freeSlot].status, uplink->queue[freeSlot, ".handle, start, end ); @@ -272,6 +290,7 @@ static void* uplink_mainloop(void *data) link->fd = link->betterFd; link->betterFd = -1; link->currentServer = link->betterServer; + link->version = link->betterVersion; spin_unlock( &link->rttLock ); discoverFailCount = 0; if ( fd != -1 ) close( fd ); @@ -439,8 +458,10 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly) link->queue[j].status = ULR_PENDING; const uint64_t offset = link->queue[j].from; const uint32_t size = link->queue[j].to - link->queue[j].from; + uint8_t hops = link->queue[j].hopCount; spin_unlock( &link->queueLock ); - const int ret = dnbd3_get_block( link->fd, offset, size, offset ); + if ( hops < 200 ) hops += 1; + const int ret = dnbd3_get_block( link->fd, offset, size, offset, COND_HOPCOUNT( link->version, hops ) ); if ( !ret ) { // Non-critical - if the connection dropped or the server was changed // the thread will re-send this request as soon as the connection @@ -488,7 +509,7 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link) // Unlocked - do not break or continue here... const uint64_t offset = link->replicationHandle = (uint64_t)i * (uint64_t)requestBlockSize; const uint32_t size = MIN( image->realFilesize - offset, requestBlockSize ); - if ( !dnbd3_get_block( link->fd, offset, size, link->replicationHandle ) ) { + if ( !dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) ) ) { logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" ); return; } @@ -644,7 +665,7 @@ static void uplink_handleReceive(dnbd3_connection_t *link) */ static int uplink_sendKeepalive(const int fd) { - static dnbd3_request_t request = { 0, 0, 0, 0, 0 }; + static dnbd3_request_t request = { 0 }; if ( request.magic == 0 ) { request.magic = dnbd3_packet_magic; request.cmd = CMD_KEEPALIVE; |