summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c217
1 files changed, 182 insertions, 35 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index c0babaa..538f388 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -12,7 +12,13 @@
#include <inttypes.h>
#include <fcntl.h>
#include <poll.h>
+#include <unistd.h>
+#ifdef HAVE_FDATASYNC
+#define dnbd3_fdatasync fdatasync
+#else
+#define dnbd3_fdatasync fsync
+#endif
static uint64_t totalBytesReceived = 0;
static pthread_spinlock_t statisticsReceivedLock;
@@ -24,6 +30,8 @@ static int uplink_sendKeepalive(const int fd);
static void uplink_addCrc32(dnbd3_connection_t *uplink);
static void uplink_sendReplicationRequest(dnbd3_connection_t *link);
static void uplink_updateGlobalReceivedCounter(dnbd3_connection_t *link);
+static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force);
+static bool uplink_saveCacheMap(dnbd3_connection_t *link);
// ############ uplink connection handling
@@ -47,7 +55,7 @@ uint64_t uplink_getTotalBytesReceived()
*/
bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version)
{
- if ( !_isProxy ) return false;
+ if ( !_isProxy || _shutdown ) return false;
dnbd3_connection_t *link = NULL;
assert( image != NULL );
spin_lock( &image->lock );
@@ -66,8 +74,10 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
link->image = image;
link->bytesReceived = 0;
link->lastBytesReceived = 0;
+ link->idleCount = 0;
link->queueLen = 0;
link->fd = -1;
+ link->cacheFd = -1;
link->signal = NULL;
link->replicationHandle = 0;
spin_lock( &link->rttLock );
@@ -123,10 +133,15 @@ void uplink_shutdown(dnbd3_image_t *image)
join = true;
}
spin_unlock( &uplink->queueLock );
+ bool wait = image->uplink != NULL;
spin_unlock( &image->lock );
if ( join ) thread_join( thread, NULL );
- while ( image->uplink != NULL )
- usleep( 10000 );
+ while ( wait ) {
+ usleep( 5000 );
+ spin_lock( &image->lock );
+ wait = image->uplink != NULL && image->uplink->shutdown;
+ spin_unlock( &image->lock );
+ }
}
/**
@@ -273,6 +288,7 @@ static void* uplink_mainloop(void *data)
int numSocks, i, waitTime;
int altCheckInterval = SERVER_RTT_DELAY_INIT;
int discoverFailCount = 0;
+ int unsavedCount = 0;
ticks nextAltCheck, nextKeepalive;
char buffer[200];
memset( events, 0, sizeof(events) );
@@ -282,6 +298,11 @@ static void* uplink_mainloop(void *data)
assert( link != NULL );
setThreadName( "idle-uplink" );
blockNoncriticalSignals();
+ // Make sure file is open for writing
+ if ( !uplink_reopenCacheFd( link, false ) ) {
+ // It might have failed - still offer proxy mode, we just can't cache
+ logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", link->image->path, errno );
+ }
//
link->signal = signal_new();
if ( link->signal == NULL ) {
@@ -335,7 +356,7 @@ static void* uplink_mainloop(void *data)
declare_now;
waitTime = (int)timing_diffMs( &now, &nextAltCheck );
} while(0);
- if ( waitTime < 1500 ) waitTime = 1500;
+ if ( waitTime < 100 ) waitTime = 100;
if ( waitTime > 5000 ) waitTime = 5000;
numSocks = poll( events, EV_COUNT, waitTime );
if ( _shutdown || link->shutdown ) goto cleanup;
@@ -371,17 +392,29 @@ static void* uplink_mainloop(void *data)
if ( link->fd == -1 ) timing_get( &nextAltCheck );
if ( _shutdown || link->shutdown ) goto cleanup;
}
- // Send keep alive if nothing is happening
declare_now;
- if ( link->fd != -1 && link->replicationHandle == 0 && timing_reached( &nextKeepalive, &now ) ) {
- timing_set( &nextKeepalive, &now, 20 );
- if ( uplink_sendKeepalive( link->fd ) ) {
- // Re-trigger periodically, in case it requires a minimum user count
- uplink_sendReplicationRequest( link );
- } else {
- const int fd = link->fd;
- link->fd = -1;
- close( fd );
+ if ( timing_reached( &nextKeepalive, &now ) ) {
+ timing_set( &nextKeepalive, &now, 10 );
+ link->idleCount++;
+ unsavedCount++;
+ if ( unsavedCount > 24 || ( unsavedCount > 6 && link->idleCount >= 2 && link->idleCount <= 7 ) ) {
+ // fsync/save every 4 minutes, or every 60 seconds if link is idle
+ unsavedCount = 0;
+ uplink_saveCacheMap( link );
+ }
+ if ( link->idleCount % 2 == 0 ) {
+ // Save cache map only if we don't seem busy handling actual client requests
+ if ( link->fd != -1 && link->replicationHandle == 0 ) {
+ // Send keep alive if nothing is happening
+ if ( uplink_sendKeepalive( link->fd ) ) {
+ // Re-trigger periodically, in case it requires a minimum user count
+ uplink_sendReplicationRequest( link );
+ } else {
+ const int fd = link->fd;
+ link->fd = -1;
+ close( fd );
+ }
+ }
}
}
// See if we should trigger an RTT measurement
@@ -394,7 +427,6 @@ static void* uplink_mainloop(void *data)
if ( image_isComplete( link->image ) ) {
// Quit work if image is complete
logadd( LOG_INFO, "Replication of %s complete.", link->image->name );
- image_markComplete( link->image );
goto cleanup;
} else {
// Not complete - do measurement
@@ -441,9 +473,10 @@ static void* uplink_mainloop(void *data)
}
cleanup: ;
altservers_removeUplink( link );
+ uplink_saveCacheMap( link );
spin_lock( &link->image->lock );
- spin_lock( &link->queueLock );
link->image->uplink = NULL;
+ spin_lock( &link->queueLock );
const int fd = link->fd;
const dnbd3_signal_t* signal = link->signal;
link->fd = -1;
@@ -452,6 +485,9 @@ static void* uplink_mainloop(void *data)
link->shutdown = true;
thread_detach( link->thread );
}
+ // Do not access link->image after unlocking, since we set
+ // image->uplink to NULL. Acquire with image_lock first,
+ // like done below when checking whether to re-init uplink
spin_unlock( &link->image->lock );
spin_unlock( &link->queueLock );
if ( fd != -1 ) close( fd );
@@ -459,13 +495,26 @@ static void* uplink_mainloop(void *data)
// Wait for the RTT check to finish/fail if it's in progress
while ( link->rttTestResult == RTT_INPROGRESS )
usleep( 10000 );
- if ( link->betterFd != -1 ) close( link->betterFd );
+ if ( link->betterFd != -1 ) {
+ close( link->betterFd );
+ }
spin_destroy( &link->queueLock );
spin_destroy( &link->rttLock );
free( link->recvBuffer );
link->recvBuffer = NULL;
uplink_updateGlobalReceivedCounter( link );
- free( link );
+ if ( link->cacheFd != -1 ) {
+ close( link->cacheFd );
+ }
+ dnbd3_image_t *image = image_lock( link->image );
+ free( link ); // !!!
+ if ( image != NULL ) {
+ if ( !_shutdown && image->cache_map != NULL ) {
+ // Ingegrity checker must have found something in the meantime
+ uplink_init( image, -1, NULL, 0 );
+ }
+ image_release( image );
+ }
return NULL ;
}
@@ -556,36 +605,36 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
int ret, i;
for (;;) {
ret = dnbd3_read_reply( link->fd, &inReply, false );
- if ( ret == REPLY_INTR && !_shutdown && !link->shutdown ) continue;
+ if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !link->shutdown ) ) continue;
if ( ret == REPLY_AGAIN ) break;
- if ( ret == REPLY_CLOSED ) {
+ if ( unlikely( ret == REPLY_CLOSED ) ) {
logadd( LOG_INFO, "Uplink: Remote host hung up (%s)", link->image->path );
goto error_cleanup;
}
- if ( ret == REPLY_WRONGMAGIC ) {
+ if ( unlikely( ret == REPLY_WRONGMAGIC ) ) {
logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path );
goto error_cleanup;
}
- if ( ret != REPLY_OK ) {
+ if ( unlikely( ret != REPLY_OK ) ) {
logadd( LOG_INFO, "Uplink: Connection error %d (%s)", ret, link->image->path );
goto error_cleanup;
}
- if ( inReply.size > (uint32_t)_maxPayload ) {
+ if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) {
logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload for %s", link->image->path );
goto error_cleanup;
}
- if ( link->recvBufferLen < inReply.size ) {
+ if ( unlikely( link->recvBufferLen < inReply.size ) ) {
link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536);
link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen );
}
- if ( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) {
+ if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) {
logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path );
goto error_cleanup;
}
// Payload read completely
// Bail out if we're not interested
- if ( inReply.cmd != CMD_GET_BLOCK ) continue;
+ if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) ) continue;
// Is a legit block reply
struct iovec iov[2];
const uint64_t start = inReply.handle;
@@ -594,16 +643,16 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
link->bytesReceived += inReply.size;
spin_unlock( &link->image->lock );
// 1) Write to cache file
- if ( unlikely( link->image->cacheFd == -1 ) ) {
- image_reopenCacheFd( link->image, false );
+ if ( unlikely( link->cacheFd == -1 ) ) {
+ uplink_reopenCacheFd( link, false );
}
- if ( likely( link->image->cacheFd != -1 ) ) {
+ if ( likely( link->cacheFd != -1 ) ) {
int err = 0;
bool tryAgain = true; // Allow one retry in case we run out of space or the write fd became invalid
uint32_t done = 0;
ret = 0;
while ( done < inReply.size ) {
- ret = (int)pwrite( link->image->cacheFd, link->recvBuffer + done, inReply.size - done, start + done );
+ ret = (int)pwrite( link->cacheFd, link->recvBuffer + done, inReply.size - done, start + done );
if ( unlikely( ret == -1 ) ) {
err = errno;
if ( err == EINTR ) continue;
@@ -614,7 +663,7 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
continue; // Success, retry write
}
if ( err == EBADF || err == EINVAL || err == EIO ) {
- if ( !tryAgain || !image_reopenCacheFd( link->image, true ) )
+ if ( !tryAgain || !uplink_reopenCacheFd( link, true ) )
break;
tryAgain = false;
continue; // Write handle to image successfully re-opened, try again
@@ -628,9 +677,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
}
done += (uint32_t)ret;
}
- if ( done > 0 ) image_updateCachemap( link->image, start, start + done, true );
- if ( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) {
- // Only disable caching if something seems severely wrong, not on ENOSPC since that might get better
+ if ( likely( done > 0 ) ) {
+ image_updateCachemap( link->image, start, start + done, true );
+ }
+ if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) {
logadd( LOG_WARNING, "Error writing received data for %s:%d; disabling caching.",
link->image->name, (int)link->image->rid );
}
@@ -691,7 +741,13 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
if ( !served && start != link->replicationHandle )
logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end );
#endif
- if ( start == link->replicationHandle ) link->replicationHandle = 0;
+ if ( start == link->replicationHandle ) {
+ // Was our background replication
+ link->replicationHandle = 0;
+ } else {
+ // Was some client -- reset idle counter
+ link->idleCount = 0;
+ }
}
spin_lock( &link->queueLock );
const bool rep = ( link->queueLen == 0 );
@@ -766,3 +822,94 @@ static void uplink_updateGlobalReceivedCounter(dnbd3_connection_t *link)
link->lastBytesReceived = link->bytesReceived;
}
+/**
+ * Open the given image's main image file in
+ * rw mode, assigning it to the cacheFd struct member.
+ *
+ * @param force If cacheFd was previously assigned a file descriptor (not == -1),
+ * it will be closed first. Otherwise, nothing will happen and true will be returned
+ * immediately.
+ */
+static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force)
+{
+ if ( link->cacheFd != -1 ) {
+ if ( !force ) return true;
+ close( link->cacheFd );
+ }
+ link->cacheFd = open( link->image->path, O_WRONLY | O_CREAT );
+ return link->cacheFd != -1;
+}
+
+/**
+ * Saves the cache map of the given image.
+ * Return true on success.
+ * Locks on: imageListLock, image.lock
+ */
+static bool uplink_saveCacheMap(dnbd3_connection_t *link)
+{
+ dnbd3_image_t *image = link->image;
+ assert( image != NULL );
+
+ if ( link->cacheFd != -1 ) {
+ if ( dnbd3_fdatasync( link->cacheFd ) == -1 ) {
+ // A failing fsync means we have no guarantee that any data
+ // since the last fsync (or open if none) has been saved. Apart
+ // from keeping the cache_map from the last successful fsync
+ // around and restoring it there isn't much we can do to recover
+ // a consistent state. Bail out.
+ logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d", image->path, errno );
+ logadd( LOG_ERROR, "Bailing out immediately" );
+ exit( 1 );
+ }
+ }
+
+ if ( image->cache_map == NULL ) return true;
+ logadd( LOG_DEBUG1, "Saving cache map of %s:%d", image->name, (int)image->rid );
+ spin_lock( &image->lock );
+ // Lock and get a copy of the cache map, as it could be freed by another thread that is just about to
+ // figure out that this image's cache copy is complete
+ if ( image->cache_map == NULL || image->virtualFilesize < DNBD3_BLOCK_SIZE ) {
+ spin_unlock( &image->lock );
+ return true;
+ }
+ const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize);
+ uint8_t *map = malloc( size );
+ memcpy( map, image->cache_map, size );
+ // Unlock. Use path and cacheFd without locking. path should never change after initialization of the image,
+ // cacheFd is owned by the uplink thread and we don't want to hold a spinlock during I/O
+ spin_unlock( &image->lock );
+ assert( image->path != NULL );
+ char mapfile[strlen( image->path ) + 4 + 1];
+ strcpy( mapfile, image->path );
+ strcat( mapfile, ".map" );
+
+ int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 );
+ if ( fd == -1 ) {
+ const int err = errno;
+ free( map );
+ logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile );
+ return false;
+ }
+
+ size_t done = 0;
+ while ( done < size ) {
+ const ssize_t ret = write( fd, map, size - done );
+ if ( ret == -1 ) {
+ if ( errno == EINTR ) continue;
+ logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile );
+ break;
+ }
+ if ( ret <= 0 ) {
+ logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile );
+ break;
+ }
+ done += (size_t)ret;
+ }
+ if ( dnbd3_fdatasync( fd ) == -1 ) {
+ logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno );
+ }
+ close( fd );
+ free( map );
+ return true;
+}
+