diff options
Diffstat (limited to 'src/server')
-rw-r--r-- | src/server/altservers.c | 5 | ||||
-rw-r--r-- | src/server/globals.h | 9 | ||||
-rw-r--r-- | src/server/image.c | 6 | ||||
-rw-r--r-- | src/server/net.c | 25 | ||||
-rw-r--r-- | src/server/uplink.c | 47 | ||||
-rw-r--r-- | src/server/uplink.h | 4 |
6 files changed, 62 insertions, 34 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c index bf9d8f2..1e4af7e 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -407,6 +407,7 @@ static void *altservers_main(void *data UNUSED) // Test them all int bestSock = -1; int bestIndex = -1; + int bestProtocolVersion = -1; unsigned int bestRtt = 0xfffffff; unsigned int currentRtt = 0xfffffff; for (itAlt = 0; itAlt < numAlts; ++itAlt) { @@ -439,7 +440,7 @@ static void *altservers_main(void *data UNUSED) imageSize, image->virtualFilesize, image->name ); } // Request first block (NOT random!) ++++++++++++++++++++++++++++++ - if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0 ) ) { + if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) { ERROR_GOTO( server_failed, "[RTT] Could not request first block for %s", image->name ); } // See if requesting the block succeeded ++++++++++++++++++++++ @@ -471,6 +472,7 @@ static void *altservers_main(void *data UNUSED) bestSock = sock; bestRtt = avg; bestIndex = itAlt; + bestProtocolVersion = protocolVersion; } else { // Was too slow, ignore close( sock ); @@ -492,6 +494,7 @@ static void *altservers_main(void *data UNUSED) spin_lock( &uplink->rttLock ); uplink->betterFd = bestSock; uplink->betterServer = servers[bestIndex]; + uplink->betterVersion = bestProtocolVersion; uplink->rttTestResult = RTT_DOCHANGE; spin_unlock( &uplink->rttLock ); signal_call( uplink->signal ); diff --git a/src/server/globals.h b/src/server/globals.h index 379fb8d..b1740f4 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -21,7 +21,7 @@ typedef struct _dnbd3_client dnbd3_client_t; // Must only be set in uplink_request() #define ULR_NEW 1 // Slot is occupied, reply has not yet been received, matching request can safely rely on reuse. -// Must only be set in uplink_mainloop() +// Must only be set in uplink_mainloop() or uplink_request() #define ULR_PENDING 2 // Slot is being processed, do not consider for hop on. // Must only be set in uplink_handle_receive() @@ -34,6 +34,7 @@ typedef struct dnbd3_client_t * client; // Client to send reply to int status; // status of this entry: ULR_* time_t entered; // When this request entered the queue (for debugging) + uint8_t hopCount; // How many hops this request has already taken across proxies } dnbd3_queued_request_t; #define RTT_IDLE 0 // Not in progress @@ -44,16 +45,16 @@ typedef struct struct _dnbd3_connection { int fd; // socket fd to remote server + int version; // remote server protocol version dnbd3_signal_t* signal; // used to wake up the process pthread_t thread; // thread holding the connection pthread_spinlock_t queueLock; // lock for synchronization on request queue etc. - dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE]; - int queueLen; // length of queue dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer dnbd3_host_t currentServer; // Current server we're connected to pthread_spinlock_t rttLock; // When accessing rttTestResult, betterFd or betterServer int rttTestResult; // RTT_* dnbd3_host_t betterServer; // The better server + int betterVersion; // protocol version of better server int betterFd; // Active connection to better server, ready to use uint8_t *recvBuffer; // Buffer for receiving payload uint32_t recvBufferLen; // Len of ^^ @@ -62,6 +63,8 @@ struct _dnbd3_connection int nextReplicationIndex; // Which index in the cache map we should start looking for incomplete blocks at uint64_t replicationHandle; // Handle of pending replication request uint64_t bytesReceived; // Number of bytes received by the connection. + int queueLen; // length of queue + dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE]; }; typedef struct diff --git a/src/server/image.c b/src/server/image.c index 36e0f0c..78de2bb 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -430,7 +430,7 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) } } if ( candidate->uplink == NULL && candidate->cacheFd != -1 ) { - uplink_init( candidate, -1, NULL ); + uplink_init( candidate, -1, NULL, -1 ); } } @@ -916,7 +916,7 @@ static bool image_load(char *base, char *path, int withUplink) goto load_error; } if ( withUplink ) { - uplink_init( image, -1, NULL ); + uplink_init( image, -1, NULL, -1 ); } } @@ -1227,7 +1227,7 @@ server_fail: ; image = image_get( name, remoteRid, false ); if ( image != NULL && uplinkSock != -1 && uplinkServer != NULL ) { // If so, init the uplink and pass it the socket - if ( !uplink_init( image, uplinkSock, uplinkServer ) ) { + if ( !uplink_init( image, uplinkSock, uplinkServer, remoteProtocolVersion ) ) { close( uplinkSock ); } else { // Clumsy busy wait, but this should only take as long as it takes to start a thread, so is it really worth using a signalling mechanism? diff --git a/src/server/net.c b/src/server/net.c index 86be7ef..acb3dbe 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -294,8 +294,9 @@ void* net_handleNewConnection(void *clientPtr) if ( _shutdown ) break; switch ( request.cmd ) { - case CMD_GET_BLOCK: - if ( request.offset >= image->virtualFilesize ) { + case CMD_GET_BLOCK:; + const uint64_t offset = request.offset_small; // Copy to full uint64 to prevent repeated masking + if ( offset >= image->virtualFilesize ) { // Sanity check logadd( LOG_WARNING, "Client %s requested non-existent block", client->hostName ); reply.size = 0; @@ -303,7 +304,7 @@ void* net_handleNewConnection(void *clientPtr) send_reply( client->sock, &reply, NULL ); break; } - if ( request.offset + request.size > image->virtualFilesize ) { + if ( offset + request.size > image->virtualFilesize ) { // Sanity check logadd( LOG_WARNING, "Client %s requested data block that extends beyond image size", client->hostName ); reply.size = 0; @@ -314,8 +315,8 @@ void* net_handleNewConnection(void *clientPtr) if ( request.size != 0 && image->cache_map != NULL ) { // This is a proxyed image, check if we need to relay the request... - start = request.offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); - end = (request.offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); bool isCached = true; spin_lock( &image->lock ); // Check again as we only aquired the lock just now @@ -364,13 +365,13 @@ void* net_handleNewConnection(void *clientPtr) } spin_unlock( &image->lock ); if ( !isCached ) { - if ( !uplink_request( client, request.handle, request.offset, request.size ) ) { + if ( !uplink_request( client, request.handle, offset, request.size, request.hops ) ) { logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy, disabling image %s:%d", client->hostName, image->name, image->rid ); image->working = false; goto exit_client_cleanup; } - break; // DONE + break; // DONE, exit request.cmd switch } } @@ -391,16 +392,16 @@ void* net_handleNewConnection(void *clientPtr) if ( request.size != 0 ) { // Send payload if request length > 0 size_t done = 0; - off_t offset = (off_t)request.offset; + off_t foffset = (off_t)offset; size_t realBytes; - if ( request.offset + request.size <= image->realFilesize ) { + if ( offset + request.size <= image->realFilesize ) { realBytes = request.size; } else { - realBytes = image->realFilesize - request.offset; + realBytes = image->realFilesize - offset; } while ( done < realBytes ) { #ifdef __linux__ - const ssize_t ret = sendfile( client->sock, image_file, &offset, realBytes - done ); + const ssize_t ret = sendfile( client->sock, image_file, &foffset, realBytes - done ); if ( ret <= 0 ) { const int err = errno; if ( lock ) pthread_mutex_unlock( &client->sendMutex ); @@ -420,7 +421,7 @@ void* net_handleNewConnection(void *clientPtr) done += ret; #elif defined(__FreeBSD__) off_t sent; - int ret = sendfile( image_file, client->sock, offset, realBytes - done, NULL, &sent, 0 ); + int ret = sendfile( image_file, client->sock, foffset, realBytes - done, NULL, &sent, 0 ); const int err = errno; if ( ret < 0 ) { if ( err == EAGAIN ) { diff --git a/src/server/uplink.c b/src/server/uplink.c index a8d2f0b..65cefb5 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -43,7 +43,7 @@ uint64_t uplink_getTotalBytesReceived() * image. Uplinks run in their own thread. * Locks on: _images[].lock */ -bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) +bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version) { if ( !_isProxy ) return false; dnbd3_connection_t *link = NULL; @@ -72,6 +72,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) link->betterFd = sock; link->betterServer = *host; link->rttTestResult = RTT_DOCHANGE; + link->betterVersion = version; } else { link->betterFd = -1; link->rttTestResult = RTT_IDLE; @@ -145,7 +146,7 @@ void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) * Request a chunk of data through an uplink server * Locks on: image.lock, uplink.queueLock */ -bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length) +bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) { if ( client == NULL || client->image == NULL ) return false; spin_lock( &client->image->lock ); @@ -161,9 +162,10 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin return false; } // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain - if ( isSameAddress( &uplink->currentServer, &client->host ) ) { + // This might be a false positive if there are multiple instances running on the same host (IP) + if ( hops != 0 && isSameAddress( &uplink->currentServer, &client->host ) ) { spin_unlock( &client->image->lock ); - logadd( LOG_DEBUG1, "Proxy cycle detected" ); + logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); return false; } @@ -171,19 +173,34 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin int existingType = -1; // ULR_* type of existing request int i; int freeSlot = -1; + bool requestLoop = false; const uint64_t end = start + length; spin_lock( &uplink->queueLock ); spin_unlock( &client->image->lock ); for (i = 0; i < uplink->queueLen; ++i) { - if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) freeSlot = i; + if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) { + freeSlot = i; + continue; + } if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue; - if ( (foundExisting == -1 || existingType == ULR_PENDING) && uplink->queue[i].from <= start && uplink->queue[i].to >= end ) { - foundExisting = i; - existingType = uplink->queue[i].status; - break; + if ( uplink->queue[i].from <= start && uplink->queue[i].to >= end ) { + if ( hops > uplink->queue[i].hopCount ) { + requestLoop = true; + break; + } + if ( foundExisting == -1 || existingType == ULR_PENDING ) { + foundExisting = i; + existingType = uplink->queue[i].status; + if ( freeSlot != -1 ) break; + } } } + if ( requestLoop ) { + spin_unlock( &uplink->queueLock ); + logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops ); + return false; + } if ( freeSlot == -1 ) { if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) { spin_unlock( &uplink->queueLock ); @@ -198,7 +215,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin // a race condition where the reply for the outstanding request already arrived and the uplink thread // is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might // already have passed the index of the free slot we determined, but not reached the existing request we just found above. - if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1; + if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1; // -1 means "send request" #ifdef _DEBUG if ( foundExisting != -1 ) { logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, existingType == ULR_NEW ? "ULR_NEW" : "ULR_PENDING", foundExisting, freeSlot ); @@ -215,6 +232,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin uplink->queue[freeSlot].client = client; //int old = uplink->queue[freeSlot].status; uplink->queue[freeSlot].status = (foundExisting == -1 ? ULR_NEW : ULR_PENDING); + uplink->queue[freeSlot].hopCount = hops; #ifdef _DEBUG uplink->queue[freeSlot].entered = time( NULL ); //logadd( LOG_DEBUG2 %p] Inserting request at slot %d, was %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 "\n", (void*)uplink, freeSlot, old, uplink->queue[freeSlot].status, uplink->queue[freeSlot, ".handle, start, end ); @@ -272,6 +290,7 @@ static void* uplink_mainloop(void *data) link->fd = link->betterFd; link->betterFd = -1; link->currentServer = link->betterServer; + link->version = link->betterVersion; spin_unlock( &link->rttLock ); discoverFailCount = 0; if ( fd != -1 ) close( fd ); @@ -439,8 +458,10 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly) link->queue[j].status = ULR_PENDING; const uint64_t offset = link->queue[j].from; const uint32_t size = link->queue[j].to - link->queue[j].from; + uint8_t hops = link->queue[j].hopCount; spin_unlock( &link->queueLock ); - const int ret = dnbd3_get_block( link->fd, offset, size, offset ); + if ( hops < 200 ) hops += 1; + const int ret = dnbd3_get_block( link->fd, offset, size, offset, COND_HOPCOUNT( link->version, hops ) ); if ( !ret ) { // Non-critical - if the connection dropped or the server was changed // the thread will re-send this request as soon as the connection @@ -488,7 +509,7 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link) // Unlocked - do not break or continue here... const uint64_t offset = link->replicationHandle = (uint64_t)i * (uint64_t)requestBlockSize; const uint32_t size = MIN( image->realFilesize - offset, requestBlockSize ); - if ( !dnbd3_get_block( link->fd, offset, size, link->replicationHandle ) ) { + if ( !dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) ) ) { logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" ); return; } @@ -644,7 +665,7 @@ static void uplink_handleReceive(dnbd3_connection_t *link) */ static int uplink_sendKeepalive(const int fd) { - static dnbd3_request_t request = { 0, 0, 0, 0, 0 }; + static dnbd3_request_t request = { 0 }; if ( request.magic == 0 ) { request.magic = dnbd3_packet_magic; request.cmd = CMD_KEEPALIVE; diff --git a/src/server/uplink.h b/src/server/uplink.h index c8cf4eb..2b41dfc 100644 --- a/src/server/uplink.h +++ b/src/server/uplink.h @@ -8,11 +8,11 @@ void uplink_globalsInit(); uint64_t uplink_getTotalBytesReceived(); -bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host); +bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version); void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client); -bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length); +bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hopCount); void uplink_shutdown(dnbd3_image_t *image); |