summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c47
1 files changed, 34 insertions, 13 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index a8d2f0b..65cefb5 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -43,7 +43,7 @@ uint64_t uplink_getTotalBytesReceived()
* image. Uplinks run in their own thread.
* Locks on: _images[].lock
*/
-bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host)
+bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version)
{
if ( !_isProxy ) return false;
dnbd3_connection_t *link = NULL;
@@ -72,6 +72,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host)
link->betterFd = sock;
link->betterServer = *host;
link->rttTestResult = RTT_DOCHANGE;
+ link->betterVersion = version;
} else {
link->betterFd = -1;
link->rttTestResult = RTT_IDLE;
@@ -145,7 +146,7 @@ void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client)
* Request a chunk of data through an uplink server
* Locks on: image.lock, uplink.queueLock
*/
-bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length)
+bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
{
if ( client == NULL || client->image == NULL ) return false;
spin_lock( &client->image->lock );
@@ -161,9 +162,10 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
return false;
}
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
- if ( isSameAddress( &uplink->currentServer, &client->host ) ) {
+ // This might be a false positive if there are multiple instances running on the same host (IP)
+ if ( hops != 0 && isSameAddress( &uplink->currentServer, &client->host ) ) {
spin_unlock( &client->image->lock );
- logadd( LOG_DEBUG1, "Proxy cycle detected" );
+ logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
return false;
}
@@ -171,19 +173,34 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
int existingType = -1; // ULR_* type of existing request
int i;
int freeSlot = -1;
+ bool requestLoop = false;
const uint64_t end = start + length;
spin_lock( &uplink->queueLock );
spin_unlock( &client->image->lock );
for (i = 0; i < uplink->queueLen; ++i) {
- if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) freeSlot = i;
+ if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) {
+ freeSlot = i;
+ continue;
+ }
if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue;
- if ( (foundExisting == -1 || existingType == ULR_PENDING) && uplink->queue[i].from <= start && uplink->queue[i].to >= end ) {
- foundExisting = i;
- existingType = uplink->queue[i].status;
- break;
+ if ( uplink->queue[i].from <= start && uplink->queue[i].to >= end ) {
+ if ( hops > uplink->queue[i].hopCount ) {
+ requestLoop = true;
+ break;
+ }
+ if ( foundExisting == -1 || existingType == ULR_PENDING ) {
+ foundExisting = i;
+ existingType = uplink->queue[i].status;
+ if ( freeSlot != -1 ) break;
+ }
}
}
+ if ( requestLoop ) {
+ spin_unlock( &uplink->queueLock );
+ logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops );
+ return false;
+ }
if ( freeSlot == -1 ) {
if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) {
spin_unlock( &uplink->queueLock );
@@ -198,7 +215,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
// a race condition where the reply for the outstanding request already arrived and the uplink thread
// is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might
// already have passed the index of the free slot we determined, but not reached the existing request we just found above.
- if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1;
+ if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1; // -1 means "send request"
#ifdef _DEBUG
if ( foundExisting != -1 ) {
logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, existingType == ULR_NEW ? "ULR_NEW" : "ULR_PENDING", foundExisting, freeSlot );
@@ -215,6 +232,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
uplink->queue[freeSlot].client = client;
//int old = uplink->queue[freeSlot].status;
uplink->queue[freeSlot].status = (foundExisting == -1 ? ULR_NEW : ULR_PENDING);
+ uplink->queue[freeSlot].hopCount = hops;
#ifdef _DEBUG
uplink->queue[freeSlot].entered = time( NULL );
//logadd( LOG_DEBUG2 %p] Inserting request at slot %d, was %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 "\n", (void*)uplink, freeSlot, old, uplink->queue[freeSlot].status, uplink->queue[freeSlot, ".handle, start, end );
@@ -272,6 +290,7 @@ static void* uplink_mainloop(void *data)
link->fd = link->betterFd;
link->betterFd = -1;
link->currentServer = link->betterServer;
+ link->version = link->betterVersion;
spin_unlock( &link->rttLock );
discoverFailCount = 0;
if ( fd != -1 ) close( fd );
@@ -439,8 +458,10 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly)
link->queue[j].status = ULR_PENDING;
const uint64_t offset = link->queue[j].from;
const uint32_t size = link->queue[j].to - link->queue[j].from;
+ uint8_t hops = link->queue[j].hopCount;
spin_unlock( &link->queueLock );
- const int ret = dnbd3_get_block( link->fd, offset, size, offset );
+ if ( hops < 200 ) hops += 1;
+ const int ret = dnbd3_get_block( link->fd, offset, size, offset, COND_HOPCOUNT( link->version, hops ) );
if ( !ret ) {
// Non-critical - if the connection dropped or the server was changed
// the thread will re-send this request as soon as the connection
@@ -488,7 +509,7 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
// Unlocked - do not break or continue here...
const uint64_t offset = link->replicationHandle = (uint64_t)i * (uint64_t)requestBlockSize;
const uint32_t size = MIN( image->realFilesize - offset, requestBlockSize );
- if ( !dnbd3_get_block( link->fd, offset, size, link->replicationHandle ) ) {
+ if ( !dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) ) ) {
logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" );
return;
}
@@ -644,7 +665,7 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
*/
static int uplink_sendKeepalive(const int fd)
{
- static dnbd3_request_t request = { 0, 0, 0, 0, 0 };
+ static dnbd3_request_t request = { 0 };
if ( request.magic == 0 ) {
request.magic = dnbd3_packet_magic;
request.cmd = CMD_KEEPALIVE;