Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--  src/server/uplink.c  1034
1 file changed, 1034 insertions, 0 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
new file mode 100644
index 0000000..31b220d
--- /dev/null
+++ b/src/server/uplink.c
@@ -0,0 +1,1034 @@
+#include "uplink.h"
+#include "helper.h"
+#include "locks.h"
+#include "image.h"
+#include "altservers.h"
+#include "../shared/sockhelper.h"
+#include "../shared/protocol.h"
+#include "../shared/timing.h"
+#include "../shared/crc32.h"
+
+#include <assert.h>
+#include <inttypes.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <unistd.h>
+#include <stdatomic.h>
+
+#define FILE_BYTES_PER_MAP_BYTE ( DNBD3_BLOCK_SIZE * 8 )
+#define MAP_BYTES_PER_HASH_BLOCK (int)( HASH_BLOCK_SIZE / FILE_BYTES_PER_MAP_BYTE )
+#define MAP_INDEX_HASH_START_MASK ( ~(int)( MAP_BYTES_PER_HASH_BLOCK - 1 ) )
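+
+// Worked example of the map geometry (a sketch; assumes the usual dnbd3
+// constants DNBD3_BLOCK_SIZE = 4 KiB and HASH_BLOCK_SIZE = 16 MiB):
+//   FILE_BYTES_PER_MAP_BYTE   = 4096 * 8 = 32 KiB of file data per map byte
+//                               (one bit per 4 KiB block)
+//   MAP_BYTES_PER_HASH_BLOCK  = 16 MiB / 32 KiB = 512 map bytes per hash block
+//   MAP_INDEX_HASH_START_MASK = ~511, which rounds a map index down to the
+//                               start of its hash block: ( 1000 & mask ) == 512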
+
+#define REP_NONE ( (uint64_t)0xffffffffffffffff )
+
+static atomic_uint_fast64_t totalBytesReceived = 0;
+
+static void* uplink_mainloop(void *data);
+static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly);
+static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const int startMapIndex);
+static void uplink_handleReceive(dnbd3_connection_t *link);
+static int uplink_sendKeepalive(const int fd);
+static void uplink_addCrc32(dnbd3_connection_t *uplink);
+static void uplink_sendReplicationRequest(dnbd3_connection_t *link);
+static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force);
+static bool uplink_saveCacheMap(dnbd3_connection_t *link);
+static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link);
+static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew);
+
+// ############ uplink connection handling
+
+void uplink_globalsInit()
+{
+}
+
+uint64_t uplink_getTotalBytesReceived()
+{
+ return (uint64_t)totalBytesReceived;
+}
+
+/**
+ * Create and initialize an uplink instance for the given
+ * image. Uplinks run in their own thread.
+ * Locks on: _images[].lock
+ */
+bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version)
+{
+ if ( !_isProxy || _shutdown ) return false;
+ dnbd3_connection_t *link = NULL;
+ assert( image != NULL );
+ spin_lock( &image->lock );
+ if ( image->uplink != NULL && !image->uplink->shutdown ) {
+ spin_unlock( &image->lock );
+ if ( sock >= 0 ) close( sock );
+ return true; // There's already an uplink, so should we consider this success or failure?
+ }
+ if ( image->cache_map == NULL ) {
+ logadd( LOG_WARNING, "Uplink was requested for image %s, but it is already complete", image->name );
+ goto failure;
+ }
+ link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) );
+ spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE );
+ spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE );
+ link->image = image;
+ link->bytesReceived = 0;
+ link->idleTime = 0;
+ link->queueLen = 0;
+ link->fd = -1;
+ link->cacheFd = -1;
+ link->signal = NULL;
+ link->replicationHandle = REP_NONE;
+ spin_lock( &link->rttLock );
+ link->cycleDetected = false;
+ if ( sock >= 0 ) {
+ link->betterFd = sock;
+ link->betterServer = *host;
+ link->rttTestResult = RTT_DOCHANGE;
+ link->betterVersion = version;
+ } else {
+ link->betterFd = -1;
+ link->rttTestResult = RTT_IDLE;
+ }
+ spin_unlock( &link->rttLock );
+ link->recvBufferLen = 0;
+ link->shutdown = false;
+ if ( 0 != thread_create( &(link->thread), NULL, &uplink_mainloop, (void *)link ) ) {
+ logadd( LOG_ERROR, "Could not start thread for new uplink." );
+ goto failure;
+ }
+ spin_unlock( &image->lock );
+ return true;
+failure: ;
+ if ( link != NULL ) {
+ free( link );
+ link = image->uplink = NULL;
+ }
+ spin_unlock( &image->lock );
+ return false;
+}
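+
+/*
+ * Note: uplink_init( image, -1, NULL, 0 ) is the "no server known yet" form;
+ * the thread then starts with rttTestResult == RTT_IDLE and lets
+ * altservers_findUplink() pick a server (this is how the mainloop re-inits
+ * the uplink at the bottom of this file). A caller that already holds an
+ * established connection may pass its socket, host and protocol version
+ * instead, which makes the thread adopt that connection via RTT_DOCHANGE.
+ */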
+
+/**
+ * Locks on image.lock, uplink.lock
+ * Calling it multiple times, even concurrently, will
+ * not break anything.
+ */
+void uplink_shutdown(dnbd3_image_t *image)
+{
+ bool join = false;
+ pthread_t thread;
+ assert( image != NULL );
+ spin_lock( &image->lock );
+ if ( image->uplink == NULL ) {
+ spin_unlock( &image->lock );
+ return;
+ }
+ dnbd3_connection_t * const uplink = image->uplink;
+ spin_lock( &uplink->queueLock );
+ if ( !uplink->shutdown ) {
+ uplink->shutdown = true;
+ signal_call( uplink->signal );
+ thread = uplink->thread;
+ join = true;
+ }
+ spin_unlock( &uplink->queueLock );
+ bool wait = image->uplink != NULL;
+ spin_unlock( &image->lock );
+ if ( join ) thread_join( thread, NULL );
+ while ( wait ) {
+ usleep( 5000 );
+ spin_lock( &image->lock );
+ wait = image->uplink != NULL && image->uplink->shutdown;
+ spin_unlock( &image->lock );
+ }
+}
+
+/**
+ * Remove given client from uplink request queue
+ * Locks on: uplink.queueLock
+ */
+void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client)
+{
+ spin_lock( &uplink->queueLock );
+ for (int i = uplink->queueLen - 1; i >= 0; --i) {
+ if ( uplink->queue[i].client == client ) {
+ uplink->queue[i].client = NULL;
+ uplink->queue[i].status = ULR_FREE;
+ }
+ if ( uplink->queue[i].client == NULL && uplink->queueLen == i + 1 ) uplink->queueLen--;
+ }
+ spin_unlock( &uplink->queueLock );
+}
+
+/**
+ * Request a chunk of data through an uplink server
+ * Locks on: image.lock, uplink.queueLock
+ */
+bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
+{
+ if ( client == NULL || client->image == NULL ) return false;
+ if ( length > (uint32_t)_maxPayload ) {
+ logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length );
+ return false;
+ }
+ spin_lock( &client->image->lock );
+ if ( client->image->uplink == NULL ) {
+ spin_unlock( &client->image->lock );
+ logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
+ return false;
+ }
+ dnbd3_connection_t * const uplink = client->image->uplink;
+ if ( uplink->shutdown ) {
+ spin_unlock( &client->image->lock );
+ logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" );
+ return false;
+ }
+	// Check if the client is the same host as the uplink. If so, assume this is a circular proxy chain
+ // This might be a false positive if there are multiple instances running on the same host (IP)
+ if ( hops != 0 && isSameAddress( &uplink->currentServer, &client->host ) ) {
+ spin_unlock( &client->image->lock );
+ logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
+ spin_lock( &uplink->rttLock );
+ uplink->cycleDetected = true;
+ spin_unlock( &uplink->rttLock );
+ signal_call( uplink->signal );
+ return false;
+ }
+
+ int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise
+ int existingType = -1; // ULR_* type of existing request
+ int i;
+ int freeSlot = -1;
+ bool requestLoop = false;
+ const uint64_t end = start + length;
+
+ spin_lock( &uplink->queueLock );
+ spin_unlock( &client->image->lock );
+ for (i = 0; i < uplink->queueLen; ++i) {
+ if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) {
+ freeSlot = i;
+ continue;
+ }
+ if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue;
+ if ( uplink->queue[i].from <= start && uplink->queue[i].to >= end ) {
+ if ( hops > uplink->queue[i].hopCount && uplink->queue[i].from == start && uplink->queue[i].to == end ) {
+ requestLoop = true;
+ break;
+ }
+ if ( foundExisting == -1 || existingType == ULR_PENDING ) {
+ foundExisting = i;
+ existingType = uplink->queue[i].status;
+ if ( freeSlot != -1 ) break;
+ }
+ }
+ }
+ if ( requestLoop ) {
+ spin_unlock( &uplink->queueLock );
+ logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops );
+ spin_lock( &uplink->rttLock );
+ uplink->cycleDetected = true;
+ spin_unlock( &uplink->rttLock );
+ signal_call( uplink->signal );
+ return false;
+ }
+ if ( freeSlot == -1 ) {
+ if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) {
+ spin_unlock( &uplink->queueLock );
+ logadd( LOG_WARNING, "Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." );
+ return false;
+ }
+ freeSlot = uplink->queueLen++;
+ }
+ // Do not send request to uplink server if we have a matching pending request AND the request either has the
+ // status ULR_NEW OR we found a free slot with LOWER index than the one we attach to. Otherwise
+ // explicitly send this request to the uplink server. The second condition mentioned here is to prevent
+ // a race condition where the reply for the outstanding request already arrived and the uplink thread
+ // is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might
+ // already have passed the index of the free slot we determined, but not reached the existing request we just found above.
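+	// Example: if the matching request sits at slot 3 (ULR_PENDING) and our free
+	// slot is 7, the receiver, walking from high to low index, may already be
+	// below 7 but above 3: it would still serve slot 3, yet never revisit slot 7,
+	// and our client would starve. So we only attach without sending if the
+	// existing request was not sent yet (ULR_NEW) or freeSlot < foundExisting.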
+ if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1; // -1 means "send request"
+#ifdef _DEBUG
+ if ( foundExisting != -1 ) {
+ logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, existingType == ULR_NEW ? "ULR_NEW" : "ULR_PENDING", foundExisting, freeSlot );
+ logadd( LOG_DEBUG2, "Original %" PRIu64 "-%" PRIu64 " (%p)\n"
+ "New %" PRIu64 "-%" PRIu64 " (%p)\n",
+ uplink->queue[foundExisting].from, uplink->queue[foundExisting].to, (void*)uplink->queue[foundExisting].client,
+ start, end, (void*)client );
+ }
+#endif
+ // Fill structure
+ uplink->queue[freeSlot].from = start;
+ uplink->queue[freeSlot].to = end;
+ uplink->queue[freeSlot].handle = handle;
+ uplink->queue[freeSlot].client = client;
+ //int old = uplink->queue[freeSlot].status;
+ uplink->queue[freeSlot].status = (foundExisting == -1 ? ULR_NEW : ULR_PENDING);
+ uplink->queue[freeSlot].hopCount = hops;
+#ifdef _DEBUG
+ timing_get( &uplink->queue[freeSlot].entered );
+	//logadd( LOG_DEBUG2, "[%p] Inserting request at slot %d, was %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 "\n", (void*)uplink, freeSlot, old, uplink->queue[freeSlot].status, uplink->queue[freeSlot].handle, start, end );
+#endif
+ spin_unlock( &uplink->queueLock );
+
+ if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed
+ if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
+ logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
+ }
+ }
+ return true;
+}
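+
+/*
+ * Usage sketch for uplink_request() (hypothetical caller, not part of this
+ * file): a client handler that misses the local cache relays the original
+ * request upstream and sends no reply itself; the uplink receive path will
+ * answer the client directly once the data arrives:
+ *
+ *   if ( !uplink_request( client, request.handle, offset, size, hops ) ) {
+ *       // Relaying failed - report the error to the client ourselves
+ *   }
+ */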
+
+/**
+ * Uplink thread.
+ * Locks are irrelevant as this is only ever run as the uplink thread's
+ * entry point, never called directly
+ */
+static void* uplink_mainloop(void *data)
+{
+#define EV_SIGNAL (0)
+#define EV_SOCKET (1)
+#define EV_COUNT (2)
+ struct pollfd events[EV_COUNT];
+ dnbd3_connection_t * const link = (dnbd3_connection_t*)data;
+ int numSocks, i, waitTime;
+ int altCheckInterval = SERVER_RTT_INTERVAL_INIT;
+ uint32_t discoverFailCount = 0;
+ uint32_t unsavedSeconds = 0;
+ ticks nextAltCheck, lastKeepalive;
+ char buffer[200];
+ memset( events, 0, sizeof(events) );
+ timing_get( &nextAltCheck );
+ lastKeepalive = nextAltCheck;
+ //
+ assert( link != NULL );
+ setThreadName( "idle-uplink" );
+ blockNoncriticalSignals();
+ // Make sure file is open for writing
+ if ( !uplink_reopenCacheFd( link, false ) ) {
+ // It might have failed - still offer proxy mode, we just can't cache
+ logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", link->image->path, errno );
+ }
+ //
+ link->signal = signal_new();
+ if ( link->signal == NULL ) {
+		logadd( LOG_WARNING, "Error creating signal. Uplink unavailable." );
+ goto cleanup;
+ }
+ events[EV_SIGNAL].events = POLLIN;
+ events[EV_SIGNAL].fd = signal_getWaitFd( link->signal );
+ events[EV_SOCKET].fd = -1;
+ while ( !_shutdown && !link->shutdown ) {
+		// Determine poll() timeout: 0 if a server switch is pending, 1s while
+		// disconnected, otherwise until the next alt check (clamped to 0.1-5s)
+ spin_lock( &link->rttLock );
+ waitTime = link->rttTestResult == RTT_DOCHANGE ? 0 : -1;
+ spin_unlock( &link->rttLock );
+ if ( waitTime == 0 ) {
+ // Nothing
+ } else if ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) {
+ waitTime = 1000;
+ } else {
+ declare_now;
+ waitTime = (int)timing_diffMs( &now, &nextAltCheck );
+ if ( waitTime < 100 ) waitTime = 100;
+ if ( waitTime > 5000 ) waitTime = 5000;
+ }
+ events[EV_SOCKET].fd = link->fd;
+ numSocks = poll( events, EV_COUNT, waitTime );
+ if ( _shutdown || link->shutdown ) goto cleanup;
+ if ( numSocks == -1 ) { // Error?
+ if ( errno == EINTR ) continue;
+ logadd( LOG_DEBUG1, "poll() error %d", (int)errno );
+ usleep( 10000 );
+ continue;
+ }
+ // Check if server switch is in order
+ spin_lock( &link->rttLock );
+ if ( link->rttTestResult != RTT_DOCHANGE ) {
+ spin_unlock( &link->rttLock );
+ } else {
+ link->rttTestResult = RTT_IDLE;
+ // The rttTest worker thread has finished our request.
+ // And says it's better to switch to another server
+ const int fd = link->fd;
+ link->fd = link->betterFd;
+ link->betterFd = -1;
+ link->currentServer = link->betterServer;
+ link->version = link->betterVersion;
+ link->cycleDetected = false;
+ spin_unlock( &link->rttLock );
+ discoverFailCount = 0;
+ if ( fd != -1 ) close( fd );
+ link->replicationHandle = REP_NONE;
+ link->image->working = true;
+ link->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
+ buffer[0] = '@';
+ if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) {
+ logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", link->image->name, buffer + 1 );
+ setThreadName( buffer );
+ }
+ // If we don't have a crc32 list yet, see if the new server has one
+ if ( link->image->crc32 == NULL ) {
+ uplink_addCrc32( link );
+ }
+ // Re-send all pending requests
+ uplink_sendRequests( link, false );
+ uplink_sendReplicationRequest( link );
+ events[EV_SOCKET].events = POLLIN | POLLRDHUP;
+ timing_gets( &nextAltCheck, altCheckInterval );
+ // The rtt worker already did the handshake for our image, so there's nothing
+ // more to do here
+ }
+ // Check events
+ // Signal
+ if ( (events[EV_SIGNAL].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) {
+ logadd( LOG_WARNING, "poll error on signal in uplink_mainloop!" );
+ goto cleanup;
+ } else if ( (events[EV_SIGNAL].revents & POLLIN) ) {
+ // signal triggered -> pending requests
+ if ( signal_clear( link->signal ) == SIGNAL_ERROR ) {
+ logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", link->image->name );
+ }
+ if ( link->fd != -1 ) {
+ // Uplink seems fine, relay requests to it...
+ uplink_sendRequests( link, true );
+ } else { // No uplink; maybe it was shutdown since it was idle for too long
+ link->idleTime = 0;
+ }
+ }
+ // Uplink socket
+ if ( (events[EV_SOCKET].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) {
+ uplink_connectionFailed( link, true );
+ logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" );
+ setThreadName( "panic-uplink" );
+ } else if ( (events[EV_SOCKET].revents & POLLIN) ) {
+ uplink_handleReceive( link );
+ if ( _shutdown || link->shutdown ) goto cleanup;
+ }
+ declare_now;
+ uint32_t timepassed = timing_diff( &lastKeepalive, &now );
+ if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) {
+ lastKeepalive = now;
+ link->idleTime += timepassed;
+ unsavedSeconds += timepassed;
+ if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && link->idleTime >= 20 && link->idleTime <= 70 ) ) {
+				// fsync/save every 4 minutes, or after 60 seconds if the uplink has recently gone idle (20-70s of idle time)
+ unsavedSeconds = 0;
+ uplink_saveCacheMap( link );
+ }
+ // Keep-alive
+ if ( link->fd != -1 && link->replicationHandle == REP_NONE ) {
+ // Send keep-alive if nothing is happening
+ if ( uplink_sendKeepalive( link->fd ) ) {
+ // Re-trigger periodically, in case it requires a minimum user count
+ uplink_sendReplicationRequest( link );
+ } else {
+ uplink_connectionFailed( link, true );
+ logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" );
+ setThreadName( "panic-uplink" );
+ }
+ }
+		// Don't keep the link established if we've been idle for too long
+ if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) {
+ close( link->fd );
+ link->fd = events[EV_SOCKET].fd = -1;
+ link->cycleDetected = false;
+ if ( link->recvBufferLen != 0 ) {
+ link->recvBufferLen = 0;
+ free( link->recvBuffer );
+ link->recvBuffer = NULL;
+ }
+ logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", link->image->name, (int)link->image->rid );
+ setThreadName( "idle-uplink" );
+ }
+ }
+ // See if we should trigger an RTT measurement
+ spin_lock( &link->rttLock );
+ const int rttTestResult = link->rttTestResult;
+ spin_unlock( &link->rttLock );
+ if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) {
+ if ( timing_reached( &nextAltCheck, &now ) || ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) || link->cycleDetected ) {
+ // It seems it's time for a check
+ if ( image_isComplete( link->image ) ) {
+ // Quit work if image is complete
+ logadd( LOG_INFO, "Replication of %s complete.", link->image->name );
+ setThreadName( "finished-uplink" );
+ goto cleanup;
+ } else if ( !uplink_connectionShouldShutdown( link ) ) {
+ // Not complete - do measurement
+ altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous)
+ if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) {
+ link->nextReplicationIndex = 0;
+ }
+ }
+ altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_INTERVAL_MAX);
+ timing_set( &nextAltCheck, &now, altCheckInterval );
+ }
+ } else if ( rttTestResult == RTT_NOT_REACHABLE ) {
+ spin_lock( &link->rttLock );
+ link->rttTestResult = RTT_IDLE;
+ spin_unlock( &link->rttLock );
+ discoverFailCount++;
+ timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) );
+ }
+#ifdef _DEBUG
+ if ( link->fd != -1 && !link->shutdown ) {
+ bool resend = false;
+ ticks deadline;
+ timing_set( &deadline, &now, -10 );
+ spin_lock( &link->queueLock );
+ for (i = 0; i < link->queueLen; ++i) {
+ if ( link->queue[i].status != ULR_FREE && timing_reached( &link->queue[i].entered, &deadline ) ) {
+ snprintf( buffer, sizeof(buffer), "[DEBUG %p] Starving request slot %d detected:\n"
+ "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)link, i, link->queue[i].client->image->name,
+ link->queue[i].from, link->queue[i].to, link->queue[i].status );
+ link->queue[i].entered = now;
+#ifdef _DEBUG_RESEND_STARVING
+ link->queue[i].status = ULR_NEW;
+ resend = true;
+#endif
+ spin_unlock( &link->queueLock );
+ logadd( LOG_WARNING, "%s", buffer );
+ spin_lock( &link->queueLock );
+ }
+ }
+ spin_unlock( &link->queueLock );
+ if ( resend )
+ uplink_sendRequests( link, true );
+ }
+#endif
+ }
+ cleanup: ;
+ altservers_removeUplink( link );
+ uplink_saveCacheMap( link );
+ spin_lock( &link->image->lock );
+ if ( link->image->uplink == link ) {
+ link->image->uplink = NULL;
+ }
+ spin_lock( &link->queueLock );
+ const int fd = link->fd;
+ const dnbd3_signal_t* signal = link->signal;
+ link->fd = -1;
+ link->signal = NULL;
+ if ( !link->shutdown ) {
+ link->shutdown = true;
+ thread_detach( link->thread );
+ }
+ // Do not access link->image after unlocking, since we set
+ // image->uplink to NULL. Acquire with image_lock first,
+ // like done below when checking whether to re-init uplink
+ spin_unlock( &link->image->lock );
+ spin_unlock( &link->queueLock );
+ if ( fd != -1 ) close( fd );
+ if ( signal != NULL ) signal_close( signal );
+ // Wait for the RTT check to finish/fail if it's in progress
+ while ( link->rttTestResult == RTT_INPROGRESS )
+ usleep( 10000 );
+ if ( link->betterFd != -1 ) {
+ close( link->betterFd );
+ }
+ spin_destroy( &link->queueLock );
+ spin_destroy( &link->rttLock );
+ free( link->recvBuffer );
+ link->recvBuffer = NULL;
+ if ( link->cacheFd != -1 ) {
+ close( link->cacheFd );
+ }
+ dnbd3_image_t *image = image_lock( link->image );
+ free( link ); // !!!
+ if ( image != NULL ) {
+ if ( !_shutdown && image->cache_map != NULL ) {
+			// Integrity checker must have found something in the meantime
+ uplink_init( image, -1, NULL, 0 );
+ }
+ image_release( image );
+ }
+	return NULL;
+}
+
+static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly)
+{
+ // Scan for new requests
+ int j;
+ spin_lock( &link->queueLock );
+ for (j = 0; j < link->queueLen; ++j) {
+ if ( link->queue[j].status != ULR_NEW && (newOnly || link->queue[j].status != ULR_PENDING) ) continue;
+ link->queue[j].status = ULR_PENDING;
+ uint8_t hops = link->queue[j].hopCount;
+ const uint64_t reqStart = link->queue[j].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ const uint32_t reqSize = (uint32_t)(((link->queue[j].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
+ /*
+ logadd( LOG_DEBUG2, "[%p] Sending slot %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 " (%" PRIu64 "-%" PRIu64 ")",
+ (void*)link, j, link->queue[j].status, link->queue[j].handle, link->queue[j].from, link->queue[j].to, reqStart, reqStart+reqSize );
+ */
+ spin_unlock( &link->queueLock );
+ if ( hops < 200 ) ++hops;
+ const int ret = dnbd3_get_block( link->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( link->version, hops ) );
+ if ( !ret ) {
+ // Non-critical - if the connection dropped or the server was changed
+ // the thread will re-send this request as soon as the connection
+ // is reestablished.
+ logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" );
+ altservers_serverFailed( &link->currentServer );
+ return;
+ }
+ spin_lock( &link->queueLock );
+ }
+ spin_unlock( &link->queueLock );
+}
+
+/**
+ * Send a block request to an uplink server without really having
+ * any client that needs that data. This will be used for background replication.
+ *
+ * We'll go through the cache map of the image and look for bytes that don't have
+ * all bits set. We then request the corresponding eight 4 KiB blocks from the
+ * uplink server. This means we might request data we already have, but it
+ * keeps the code simple. Worst case: only one bit is zero, i.e. just 4 KiB
+ * are missing, but we request the full 32 KiB anyway.
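+ *
+ * Example (a sketch, with DNBD3_BLOCK_SIZE = 4 KiB): a cache map byte of 0xfb
+ * (binary 1111 1011) at map index i means only block 2 of that 32 KiB stretch
+ * is missing; we still issue a single request for the whole stretch, mirroring
+ * the code below:
+ *
+ *   offset = (uint64_t)i * FILE_BYTES_PER_MAP_BYTE;  // i * 32 KiB
+ *   size   = MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
+ *   dnbd3_get_block( link->fd, offset, size, offset, hopcount );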
+ */
+static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
+{
+ if ( link == NULL || link->fd == -1 ) return;
+ if ( _backgroundReplication == BGR_DISABLED || link->cacheFd == -1 ) return; // Don't do background replication
+ if ( link->nextReplicationIndex == -1 || link->replicationHandle != REP_NONE )
+ return;
+ dnbd3_image_t * const image = link->image;
+ if ( image->virtualFilesize < DNBD3_BLOCK_SIZE ) return;
+ spin_lock( &image->lock );
+	if ( image->cache_map == NULL || image->users < _bgrMinClients ) {
+		// No cache map (= image complete), or not enough active users - do nothing
+ spin_unlock( &image->lock );
+ return;
+ }
+ const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
+ const int lastBlockIndex = mapBytes - 1;
+ int endByte;
+ if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks
+ endByte = link->nextReplicationIndex + mapBytes;
+ } else { // Hashblock based: Only look for match in current hash block
+ endByte = ( link->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK;
+ if ( endByte > mapBytes ) {
+ endByte = mapBytes;
+ }
+ }
+ int replicationIndex = -1;
+ for ( int j = link->nextReplicationIndex; j < endByte; ++j ) {
+ const int i = j % ( mapBytes ); // Wrap around for BGR_FULL
+ if ( image->cache_map[i] != 0xff && ( i != lastBlockIndex || !link->replicatedLastBlock ) ) {
+ // Found incomplete one
+ replicationIndex = i;
+ break;
+ }
+ }
+ spin_unlock( &image->lock );
+ if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) {
+ // Nothing left in current block, find next one
+ replicationIndex = uplink_findNextIncompleteHashBlock( link, endByte );
+ }
+ if ( replicationIndex == -1 ) {
+		// Replication might be complete; uplink_mainloop will take care of it
+ link->nextReplicationIndex = -1;
+ return;
+ }
+ const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
+ link->replicationHandle = offset;
+ const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
+ if ( !dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) ) ) {
+ logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" );
+ return;
+ }
+ if ( replicationIndex == lastBlockIndex ) {
+ link->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks
+ }
+ link->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter
+ if ( _backgroundReplication == BGR_HASHBLOCK
+ && link->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) {
+ // Just crossed a hash block boundary, look for new candidate starting at this very index
+ link->nextReplicationIndex = uplink_findNextIncompleteHashBlock( link, link->nextReplicationIndex );
+ }
+}
+
+/**
+ * find next index into cache_map that corresponds to the beginning
+ * of a hash block which is neither completely empty nor completely
+ * replicated yet. Returns -1 if no match.
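+ *
+ * Example (toy geometry, pretending MAP_BYTES_PER_HASH_BLOCK == 4):
+ *   cache_map = { 00 00 00 00 | ff ff 07 00 }, startMapIndex = 0
+ * Hash block 0 is completely empty and thus skipped; in hash block 1 the
+ * scan stops at map index 6 (0x07, partially filled) and returns 6.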
+ */
+static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const int startMapIndex)
+{
+ int retval = -1;
+ spin_lock( &link->image->lock );
+ const int mapBytes = IMGSIZE_TO_MAPBYTES( link->image->virtualFilesize );
+ const uint8_t *cache_map = link->image->cache_map;
+ if ( cache_map != NULL ) {
+ int j;
+ const int start = ( startMapIndex & MAP_INDEX_HASH_START_MASK );
+ for (j = 0; j < mapBytes; ++j) {
+ const int i = ( start + j ) % mapBytes;
+ const bool isFull = cache_map[i] == 0xff || ( i + 1 == mapBytes && link->replicatedLastBlock );
+ const bool isEmpty = cache_map[i] == 0;
+ if ( !isEmpty && !isFull ) {
+ // Neither full nor empty, replicate
+ if ( retval == -1 ) {
+ retval = i;
+ }
+ break;
+ }
+ if ( ( i & MAP_INDEX_HASH_START_MASK ) == i ) {
+ // Reset state if we just crossed into the next hash chunk
+ retval = ( isEmpty ) ? ( i ) : ( -1 );
+ } else if ( isFull ) {
+ if ( retval != -1 ) {
+ // It's a full one, previous one was empty -> replicate
+ break;
+ }
+ } else if ( isEmpty ) {
+ if ( retval == -1 ) { // Previous one was full -> replicate
+ retval = i;
+ break;
+ }
+ }
+ }
+ if ( j == mapBytes ) { // Nothing found, loop ran until end
+ retval = -1;
+ }
+ }
+ spin_unlock( &link->image->lock );
+ return retval;
+}
+
+/**
+ * Receive data from uplink server and process/dispatch
+ * Locks on: link.lock, images[].lock
+ */
+static void uplink_handleReceive(dnbd3_connection_t *link)
+{
+ dnbd3_reply_t inReply, outReply;
+ int ret, i;
+ for (;;) {
+ ret = dnbd3_read_reply( link->fd, &inReply, false );
+ if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !link->shutdown ) ) continue;
+ if ( ret == REPLY_AGAIN ) break;
+ if ( unlikely( ret == REPLY_CLOSED ) ) {
+ logadd( LOG_INFO, "Uplink: Remote host hung up (%s)", link->image->path );
+ goto error_cleanup;
+ }
+ if ( unlikely( ret == REPLY_WRONGMAGIC ) ) {
+ logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path );
+ goto error_cleanup;
+ }
+ if ( unlikely( ret != REPLY_OK ) ) {
+ logadd( LOG_INFO, "Uplink: Connection error %d (%s)", ret, link->image->path );
+ goto error_cleanup;
+ }
+ if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) {
+ logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s", inReply.size, link->image->path );
+ goto error_cleanup;
+ }
+
+ if ( unlikely( link->recvBufferLen < inReply.size ) ) {
+ link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536);
+ link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen );
+ if ( link->recvBuffer == NULL ) {
+ logadd( LOG_ERROR, "Out of memory when trying to allocate receive buffer for uplink" );
+ exit( 1 );
+ }
+ }
+ if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) {
+ logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path );
+ goto error_cleanup;
+ }
+ // Payload read completely
+ // Bail out if we're not interested
+ if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) ) continue;
+ // Is a legit block reply
+ struct iovec iov[2];
+ const uint64_t start = inReply.handle;
+ const uint64_t end = inReply.handle + inReply.size;
+ totalBytesReceived += inReply.size;
+ link->bytesReceived += inReply.size;
+ // 1) Write to cache file
+ if ( unlikely( link->cacheFd == -1 ) ) {
+ uplink_reopenCacheFd( link, false );
+ }
+ if ( likely( link->cacheFd != -1 ) ) {
+ int err = 0;
+ bool tryAgain = true; // Allow one retry in case we run out of space or the write fd became invalid
+ uint32_t done = 0;
+ ret = 0;
+ while ( done < inReply.size ) {
+ ret = (int)pwrite( link->cacheFd, link->recvBuffer + done, inReply.size - done, start + done );
+ if ( unlikely( ret == -1 ) ) {
+ err = errno;
+ if ( err == EINTR ) continue;
+ if ( err == ENOSPC || err == EDQUOT ) {
+ // try to free 256MiB
+ if ( !tryAgain || !image_ensureDiskSpaceLocked( 256ull * 1024 * 1024, true ) ) break;
+ tryAgain = false;
+ continue; // Success, retry write
+ }
+ if ( err == EBADF || err == EINVAL || err == EIO ) {
+ if ( !tryAgain || !uplink_reopenCacheFd( link, true ) )
+ break;
+ tryAgain = false;
+ continue; // Write handle to image successfully re-opened, try again
+ }
+ logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d", link->image->name, (int)link->image->rid, err );
+ break;
+ }
+ if ( unlikely( ret <= 0 || (uint32_t)ret > inReply.size - done ) ) {
+ logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d", ret, link->image->name, (int)link->image->rid );
+ break;
+ }
+ done += (uint32_t)ret;
+ }
+ if ( likely( done > 0 ) ) {
+ image_updateCachemap( link->image, start, start + done, true );
+ }
+ if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) {
+ logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.",
+ link->image->name, (int)link->image->rid, err );
+ }
+ }
+ // 2) Figure out which clients are interested in it
+ spin_lock( &link->queueLock );
+ for (i = 0; i < link->queueLen; ++i) {
+ dnbd3_queued_request_t * const req = &link->queue[i];
+ assert( req->status != ULR_PROCESSING );
+ if ( req->status != ULR_PENDING && req->status != ULR_NEW ) continue;
+ assert( req->client != NULL );
+ if ( req->from >= start && req->to <= end ) { // Match :-)
+ req->status = ULR_PROCESSING;
+ }
+ }
+ // 3) Send to interested clients - iterate backwards so request collaboration works, and
+ // so we can decrease queueLen on the fly while iterating. Should you ever change this to start
+ // from 0, you also need to change the "attach to existing request"-logic in uplink_request()
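+	// Example: with queueLen == 6, serving slot 5 sets it ULR_FREE and the check
+	// at the bottom of the loop immediately shrinks queueLen to 5; a forward
+	// iteration could not trim the tail of the queue like this.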
+ outReply.magic = dnbd3_packet_magic;
+ bool served = false;
+ for ( i = link->queueLen - 1; i >= 0; --i ) {
+ dnbd3_queued_request_t * const req = &link->queue[i];
+ if ( req->status == ULR_PROCESSING ) {
+ size_t bytesSent = 0;
+ assert( req->from >= start && req->to <= end );
+ dnbd3_client_t * const client = req->client;
+ outReply.cmd = CMD_GET_BLOCK;
+ outReply.handle = req->handle;
+ outReply.size = (uint32_t)( req->to - req->from );
+ iov[0].iov_base = &outReply;
+ iov[0].iov_len = sizeof outReply;
+ iov[1].iov_base = link->recvBuffer + (req->from - start);
+ iov[1].iov_len = outReply.size;
+ fixup_reply( outReply );
+ req->status = ULR_FREE;
+ req->client = NULL;
+ served = true;
+ pthread_mutex_lock( &client->sendMutex );
+ spin_unlock( &link->queueLock );
+ if ( client->sock != -1 ) {
+ ssize_t sent = writev( client->sock, iov, 2 );
+ if ( sent > (ssize_t)sizeof outReply ) {
+ bytesSent = (size_t)sent - sizeof outReply;
+ }
+ }
+ pthread_mutex_unlock( &client->sendMutex );
+ if ( bytesSent != 0 ) {
+ client->bytesSent += bytesSent;
+ }
+ spin_lock( &link->queueLock );
+ }
+ if ( req->status == ULR_FREE && i == link->queueLen - 1 ) link->queueLen--;
+ }
+ spin_unlock( &link->queueLock );
+#ifdef _DEBUG
+ if ( !served && start != link->replicationHandle ) {
+ logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end );
+ }
+#endif
+ if ( start == link->replicationHandle ) {
+ // Was our background replication
+ link->replicationHandle = REP_NONE;
+ // Try to remove from fs cache if no client was interested in this data
+ if ( !served && link->cacheFd != -1 ) {
+ posix_fadvise( link->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED );
+ }
+ }
+ if ( served ) {
+ // Was some client -- reset idle counter
+ link->idleTime = 0;
+ // Re-enable replication if disabled
+ if ( link->nextReplicationIndex == -1 ) {
+ link->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK;
+ }
+ }
+ }
+ if ( link->replicationHandle == REP_NONE ) {
+ spin_lock( &link->queueLock );
+ const bool rep = ( link->queueLen == 0 );
+ spin_unlock( &link->queueLock );
+ if ( rep ) uplink_sendReplicationRequest( link );
+ }
+ return;
+ // Error handling from failed receive or message parsing
+ error_cleanup: ;
+ uplink_connectionFailed( link, true );
+}
+
+static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew)
+{
+ if ( link->fd == -1 )
+ return;
+ altservers_serverFailed( &link->currentServer );
+ close( link->fd );
+ link->fd = -1;
+ link->replicationHandle = REP_NONE;
+ if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) {
+ link->nextReplicationIndex = 0;
+ }
+ if ( !findNew )
+ return;
+ spin_lock( &link->rttLock );
+ bool bail = link->rttTestResult == RTT_INPROGRESS || link->betterFd != -1;
+ spin_unlock( &link->rttLock );
+ if ( bail )
+ return;
+ altservers_findUplink( link );
+}
+
+/**
+ * Send keep alive request to server
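+ * The keep-alive is a bare dnbd3_request_t header with cmd == CMD_KEEPALIVE
+ * and no payload; fixup_request() converts it to wire byte order once, and
+ * the same static struct is reused for every subsequent call.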
+ */
+static int uplink_sendKeepalive(const int fd)
+{
+ static dnbd3_request_t request = { 0 };
+ if ( request.magic == 0 ) {
+ request.magic = dnbd3_packet_magic;
+ request.cmd = CMD_KEEPALIVE;
+ fixup_request( request );
+ }
+ return send( fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request);
+}
+
+static void uplink_addCrc32(dnbd3_connection_t *uplink)
+{
+ dnbd3_image_t *image = uplink->image;
+ if ( image == NULL || image->virtualFilesize == 0 ) return;
+ size_t bytes = IMGSIZE_TO_HASHBLOCKS( image->virtualFilesize ) * sizeof(uint32_t);
+ uint32_t masterCrc;
+	uint32_t *buffer = malloc( bytes );
+	if ( buffer == NULL ) return; // Out of memory - just continue without crc list
+	if ( !dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes ) || bytes == 0 ) {
+ free( buffer );
+ return;
+ }
+ uint32_t lists_crc = crc32( 0, NULL, 0 );
+ lists_crc = crc32( lists_crc, (uint8_t*)buffer, bytes );
+ lists_crc = net_order_32( lists_crc );
+ if ( lists_crc != masterCrc ) {
+ logadd( LOG_WARNING, "Received corrupted crc32 list from uplink server (%s)!", uplink->image->name );
+ free( buffer );
+ return;
+ }
+ uplink->image->masterCrc32 = masterCrc;
+ uplink->image->crc32 = buffer;
+ const size_t len = strlen( uplink->image->path ) + 30;
+ char path[len];
+ snprintf( path, len, "%s.crc", uplink->image->path );
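+	// On-disk layout of the .crc companion file: the 4-byte master CRC
+	// (in wire byte order, as received), followed by one uint32_t per hash block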
+ const int fd = open( path, O_WRONLY | O_CREAT, 0644 );
+ if ( fd >= 0 ) {
+		// Check writes - if either fails, the file is useless and will just
+		// be fetched from the uplink again on next startup
+		if ( write( fd, &masterCrc, sizeof(uint32_t) ) != (ssize_t)sizeof(uint32_t)
+				|| write( fd, buffer, bytes ) != (ssize_t)bytes ) {
+			logadd( LOG_WARNING, "Could not write crc32 list to %s (errno=%d)", path, errno );
+		}
+ close( fd );
+ }
+}
+
+/**
+ * Open the given image's main image file in
+ * rw mode, assigning it to the cacheFd struct member.
+ *
+ * @param force If true, any previously opened cacheFd is closed first and the
+ * file is re-opened. If false and cacheFd is already valid, nothing happens
+ * and true is returned immediately.
+ */
+static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force)
+{
+ if ( link->cacheFd != -1 ) {
+ if ( !force ) return true;
+ close( link->cacheFd );
+ }
+ link->cacheFd = open( link->image->path, O_WRONLY | O_CREAT, 0644 );
+ return link->cacheFd != -1;
+}
+
+/**
+ * Saves the cache map of the given image.
+ * Return true on success.
+ * Locks on: imageListLock, image.lock
+ */
+static bool uplink_saveCacheMap(dnbd3_connection_t *link)
+{
+ dnbd3_image_t *image = link->image;
+ assert( image != NULL );
+
+ if ( link->cacheFd != -1 ) {
+ if ( fsync( link->cacheFd ) == -1 ) {
+ // A failing fsync means we have no guarantee that any data
+ // since the last fsync (or open if none) has been saved. Apart
+ // from keeping the cache_map from the last successful fsync
+ // around and restoring it there isn't much we can do to recover
+ // a consistent state. Bail out.
+ logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d", image->path, errno );
+ logadd( LOG_ERROR, "Bailing out immediately" );
+ exit( 1 );
+ }
+ }
+
+ if ( image->cache_map == NULL ) return true;
+ logadd( LOG_DEBUG2, "Saving cache map of %s:%d", image->name, (int)image->rid );
+ spin_lock( &image->lock );
+ // Lock and get a copy of the cache map, as it could be freed by another thread that is just about to
+ // figure out that this image's cache copy is complete
+ if ( image->cache_map == NULL || image->virtualFilesize < DNBD3_BLOCK_SIZE ) {
+ spin_unlock( &image->lock );
+ return true;
+ }
+ const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize);
+ uint8_t *map = malloc( size );
+ memcpy( map, image->cache_map, size );
+ // Unlock. Use path and cacheFd without locking. path should never change after initialization of the image,
+ // cacheFd is owned by the uplink thread and we don't want to hold a spinlock during I/O
+ spin_unlock( &image->lock );
+ assert( image->path != NULL );
+ char mapfile[strlen( image->path ) + 4 + 1];
+ strcpy( mapfile, image->path );
+ strcat( mapfile, ".map" );
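+	// The .map companion file is a verbatim dump of the in-memory cache map:
+	// one bit per DNBD3_BLOCK_SIZE block; a set bit means the block is cached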
+
+ int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 );
+ if ( fd == -1 ) {
+ const int err = errno;
+ free( map );
+ logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile );
+ return false;
+ }
+
+ size_t done = 0;
+ while ( done < size ) {
+		const ssize_t ret = write( fd, map + done, size - done );
+ if ( ret == -1 ) {
+ if ( errno == EINTR ) continue;
+ logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile );
+ break;
+ }
+ if ( ret <= 0 ) {
+ logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile );
+ break;
+ }
+ done += (size_t)ret;
+ }
+ if ( fsync( fd ) == -1 ) {
+ logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno );
+ }
+ close( fd );
+ free( map );
+ return true;
+}
+
+static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link)
+{
+ return ( link->idleTime > SERVER_UPLINK_IDLE_TIMEOUT && _backgroundReplication != BGR_FULL );
+}
+