summary | refs | log | tree | commit | diff | stats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
author: Simon Rettberg, 2018-07-04 14:39:43 +0200
committer: Simon Rettberg, 2018-07-04 14:39:43 +0200
commit: 053ca3b9a9601467d5ce30c56c3cea078c897f57 (patch)
tree: 29b60273e13684bb98b1badcd784b6f2b2e4d0f4 /src/server/uplink.c
parent[SERVER] cmake: Add config option for extra compiler opptions (diff)
downloaddnbd3-053ca3b9a9601467d5ce30c56c3cea078c897f57.tar.gz
dnbd3-053ca3b9a9601467d5ce30c56c3cea078c897f57.tar.xz
dnbd3-053ca3b9a9601467d5ce30c56c3cea078c897f57.zip
[SERVER] Refactor uplink/cache handling, improve crc checking
The cacheFd is now moved to the uplink data structure and will only be handled by the uplink thread. The integrity checker now supports checking all blocks of an image. This will be triggered automatically whenever a check for a single block failed. Also, if a crc check on startup fails, the image won't be discarded anymore, but rather a full check will be initiated. Furthermore, when calling image_updateCacheMap() on an image that was previously complete, the cache map will now be re-initialized, and a new uplink connection created.
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r-- src/server/uplink.c 217
1 files changed, 182 insertions, 35 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index c0babaa..538f388 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -12,7 +12,13 @@
#include <inttypes.h>
#include <fcntl.h>
#include <poll.h>
+#include <unistd.h>
+#ifdef HAVE_FDATASYNC
+#define dnbd3_fdatasync fdatasync
+#else
+#define dnbd3_fdatasync fsync
+#endif
static uint64_t totalBytesReceived = 0;
static pthread_spinlock_t statisticsReceivedLock;
@@ -24,6 +30,8 @@ static int uplink_sendKeepalive(const int fd);
static void uplink_addCrc32(dnbd3_connection_t *uplink);
static void uplink_sendReplicationRequest(dnbd3_connection_t *link);
static void uplink_updateGlobalReceivedCounter(dnbd3_connection_t *link);
+static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force);
+static bool uplink_saveCacheMap(dnbd3_connection_t *link);
// ############ uplink connection handling
@@ -47,7 +55,7 @@ uint64_t uplink_getTotalBytesReceived()
*/
bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version)
{
- if ( !_isProxy ) return false;
+ if ( !_isProxy || _shutdown ) return false;
dnbd3_connection_t *link = NULL;
assert( image != NULL );
spin_lock( &image->lock );
@@ -66,8 +74,10 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
link->image = image;
link->bytesReceived = 0;
link->lastBytesReceived = 0;
+ link->idleCount = 0;
link->queueLen = 0;
link->fd = -1;
+ link->cacheFd = -1;
link->signal = NULL;
link->replicationHandle = 0;
spin_lock( &link->rttLock );
@@ -123,10 +133,15 @@ void uplink_shutdown(dnbd3_image_t *image)
join = true;
}
spin_unlock( &uplink->queueLock );
+ bool wait = image->uplink != NULL;
spin_unlock( &image->lock );
if ( join ) thread_join( thread, NULL );
- while ( image->uplink != NULL )
- usleep( 10000 );
+ while ( wait ) {
+ usleep( 5000 );
+ spin_lock( &image->lock );
+ wait = image->uplink != NULL && image->uplink->shutdown;
+ spin_unlock( &image->lock );
+ }
}
/**
@@ -273,6 +288,7 @@ static void* uplink_mainloop(void *data)
int numSocks, i, waitTime;
int altCheckInterval = SERVER_RTT_DELAY_INIT;
int discoverFailCount = 0;
+ int unsavedCount = 0;
ticks nextAltCheck, nextKeepalive;
char buffer[200];
memset( events, 0, sizeof(events) );
@@ -282,6 +298,11 @@ static void* uplink_mainloop(void *data)
assert( link != NULL );
setThreadName( "idle-uplink" );
blockNoncriticalSignals();
+ // Make sure file is open for writing
+ if ( !uplink_reopenCacheFd( link, false ) ) {
+ // It might have failed - still offer proxy mode, we just can't cache
+ logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", link->image->path, errno );
+ }
//
link->signal = signal_new();
if ( link->signal == NULL ) {
@@ -335,7 +356,7 @@ static void* uplink_mainloop(void *data)
declare_now;
waitTime = (int)timing_diffMs( &now, &nextAltCheck );
} while(0);
- if ( waitTime < 1500 ) waitTime = 1500;
+ if ( waitTime < 100 ) waitTime = 100;
if ( waitTime > 5000 ) waitTime = 5000;
numSocks = poll( events, EV_COUNT, waitTime );
if ( _shutdown || link->shutdown ) goto cleanup;
@@ -371,17 +392,29 @@ static void* uplink_mainloop(void *data)
if ( link->fd == -1 ) timing_get( &nextAltCheck );
if ( _shutdown || link->shutdown ) goto cleanup;
}
- // Send keep alive if nothing is happening
declare_now;
- if ( link->fd != -1 && link->replicationHandle == 0 && timing_reached( &nextKeepalive, &now ) ) {
- timing_set( &nextKeepalive, &now, 20 );
- if ( uplink_sendKeepalive( link->fd ) ) {
- // Re-trigger periodically, in case it requires a minimum user count
- uplink_sendReplicationRequest( link );
- } else {
- const int fd = link->fd;
- link->fd = -1;
- close( fd );
+ if ( timing_reached( &nextKeepalive, &now ) ) {
+ timing_set( &nextKeepalive, &now, 10 );
+ link->idleCount++;
+ unsavedCount++;
+ if ( unsavedCount > 24 || ( unsavedCount > 6 && link->idleCount >= 2 && link->idleCount <= 7 ) ) {
+ // fsync/save every 4 minutes, or every 60 seconds if link is idle
+ unsavedCount = 0;
+ uplink_saveCacheMap( link );
+ }
+ if ( link->idleCount % 2 == 0 ) {
+ // Save cache map only if we don't seem busy handling actual client requests
+ if ( link->fd != -1 && link->replicationHandle == 0 ) {
+ // Send keep alive if nothing is happening
+ if ( uplink_sendKeepalive( link->fd ) ) {
+ // Re-trigger periodically, in case it requires a minimum user count
+ uplink_sendReplicationRequest( link );
+ } else {
+ const int fd = link->fd;
+ link->fd = -1;
+ close( fd );
+ }
+ }
}
}
// See if we should trigger an RTT measurement
@@ -394,7 +427,6 @@ static void* uplink_mainloop(void *data)
if ( image_isComplete( link->image ) ) {
// Quit work if image is complete
logadd( LOG_INFO, "Replication of %s complete.", link->image->name );
- image_markComplete( link->image );
goto cleanup;
} else {
// Not complete - do measurement
@@ -441,9 +473,10 @@ static void* uplink_mainloop(void *data)
}
cleanup: ;
altservers_removeUplink( link );
+ uplink_saveCacheMap( link );
spin_lock( &link->image->lock );
- spin_lock( &link->queueLock );
link->image->uplink = NULL;
+ spin_lock( &link->queueLock );
const int fd = link->fd;
const dnbd3_signal_t* signal = link->signal;
link->fd = -1;
@@ -452,6 +485,9 @@ static void* uplink_mainloop(void *data)
link->shutdown = true;
thread_detach( link->thread );
}
+ // Do not access link->image after unlocking, since we set
+ // image->uplink to NULL. Acquire with image_lock first,
+ // like done below when checking whether to re-init uplink
spin_unlock( &link->image->lock );
spin_unlock( &link->queueLock );
if ( fd != -1 ) close( fd );
@@ -459,13 +495,26 @@ static void* uplink_mainloop(void *data)
// Wait for the RTT check to finish/fail if it's in progress
while ( link->rttTestResult == RTT_INPROGRESS )
usleep( 10000 );
- if ( link->betterFd != -1 ) close( link->betterFd );
+ if ( link->betterFd != -1 ) {
+ close( link->betterFd );
+ }
spin_destroy( &link->queueLock );
spin_destroy( &link->rttLock );
free( link->recvBuffer );
link->recvBuffer = NULL;
uplink_updateGlobalReceivedCounter( link );
- free( link );
+ if ( link->cacheFd != -1 ) {
+ close( link->cacheFd );
+ }
+ dnbd3_image_t *image = image_lock( link->image );
+ free( link ); // !!!
+ if ( image != NULL ) {
+ if ( !_shutdown && image->cache_map != NULL ) {
+ // Integrity checker must have found something in the meantime
+ uplink_init( image, -1, NULL, 0 );
+ }
+ image_release( image );
+ }
return NULL ;
}
@@ -556,36 +605,36 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
int ret, i;
for (;;) {
ret = dnbd3_read_reply( link->fd, &inReply, false );
- if ( ret == REPLY_INTR && !_shutdown && !link->shutdown ) continue;
+ if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !link->shutdown ) ) continue;
if ( ret == REPLY_AGAIN ) break;
- if ( ret == REPLY_CLOSED ) {
+ if ( unlikely( ret == REPLY_CLOSED ) ) {
logadd( LOG_INFO, "Uplink: Remote host hung up (%s)", link->image->path );
goto error_cleanup;
}
- if ( ret == REPLY_WRONGMAGIC ) {
+ if ( unlikely( ret == REPLY_WRONGMAGIC ) ) {
logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path );
goto error_cleanup;
}
- if ( ret != REPLY_OK ) {
+ if ( unlikely( ret != REPLY_OK ) ) {
logadd( LOG_INFO, "Uplink: Connection error %d (%s)", ret, link->image->path );
goto error_cleanup;
}
- if ( inReply.size > (uint32_t)_maxPayload ) {
+ if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) {
logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload for %s", link->image->path );
goto error_cleanup;
}
- if ( link->recvBufferLen < inReply.size ) {
+ if ( unlikely( link->recvBufferLen < inReply.size ) ) {
link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536);
link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen );
}
- if ( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) {
+ if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) {
logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path );
goto error_cleanup;
}
// Payload read completely
// Bail out if we're not interested
- if ( inReply.cmd != CMD_GET_BLOCK ) continue;
+ if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) ) continue;
// Is a legit block reply
struct iovec iov[2];
const uint64_t start = inReply.handle;
@@ -594,16 +643,16 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
link->bytesReceived += inReply.size;
spin_unlock( &link->image->lock );
// 1) Write to cache file
- if ( unlikely( link->image->cacheFd == -1 ) ) {
- image_reopenCacheFd( link->image, false );
+ if ( unlikely( link->cacheFd == -1 ) ) {
+ uplink_reopenCacheFd( link, false );
}
- if ( likely( link->image->cacheFd != -1 ) ) {
+ if ( likely( link->cacheFd != -1 ) ) {
int err = 0;
bool tryAgain = true; // Allow one retry in case we run out of space or the write fd became invalid
uint32_t done = 0;
ret = 0;
while ( done < inReply.size ) {
- ret = (int)pwrite( link->image->cacheFd, link->recvBuffer + done, inReply.size - done, start + done );
+ ret = (int)pwrite( link->cacheFd, link->recvBuffer + done, inReply.size - done, start + done );
if ( unlikely( ret == -1 ) ) {
err = errno;
if ( err == EINTR ) continue;
@@ -614,7 +663,7 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
continue; // Success, retry write
}
if ( err == EBADF || err == EINVAL || err == EIO ) {
- if ( !tryAgain || !image_reopenCacheFd( link->image, true ) )
+ if ( !tryAgain || !uplink_reopenCacheFd( link, true ) )
break;
tryAgain = false;
continue; // Write handle to image successfully re-opened, try again
@@ -628,9 +677,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
}
done += (uint32_t)ret;
}
- if ( done > 0 ) image_updateCachemap( link->image, start, start + done, true );
- if ( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) {
- // Only disable caching if something seems severely wrong, not on ENOSPC since that might get better
+ if ( likely( done > 0 ) ) {
+ image_updateCachemap( link->image, start, start + done, true );
+ }
+ if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) {
logadd( LOG_WARNING, "Error writing received data for %s:%d; disabling caching.",
link->image->name, (int)link->image->rid );
}
@@ -691,7 +741,13 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
if ( !served && start != link->replicationHandle )
logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end );
#endif
- if ( start == link->replicationHandle ) link->replicationHandle = 0;
+ if ( start == link->replicationHandle ) {
+ // Was our background replication
+ link->replicationHandle = 0;
+ } else {
+ // Was some client -- reset idle counter
+ link->idleCount = 0;
+ }
}
spin_lock( &link->queueLock );
const bool rep = ( link->queueLen == 0 );
@@ -766,3 +822,94 @@ static void uplink_updateGlobalReceivedCounter(dnbd3_connection_t *link)
link->lastBytesReceived = link->bytesReceived;
}
+/**
+ * Open the image file of the given uplink (link->image->path) for
+ * writing and store the resulting descriptor in link->cacheFd.
+ * The file is opened write-only and is created if it does not exist.
+ *
+ * @param link uplink whose cacheFd should be (re)opened
+ * @param force If cacheFd was previously assigned a file descriptor (not == -1),
+ * it will be closed first and the file opened again. If force is false and
+ * cacheFd is already assigned, nothing will happen and true will be returned
+ * immediately.
+ * @return true if link->cacheFd refers to an open file descriptor on return,
+ * false if opening failed (cacheFd is -1 in that case, errno is set by open())
+ */
+static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force)
+{
+	if ( link->cacheFd != -1 ) {
+		if ( !force ) return true;
+		close( link->cacheFd );
+	}
+	// O_CREAT requires the mode argument; without it, the permissions of a
+	// newly created file are indeterminate. Use the same mode as the .map file.
+	link->cacheFd = open( link->image->path, O_WRONLY | O_CREAT, 0644 );
+	return link->cacheFd != -1;
+}
+
+/**
+ * Sync the uplink's cache file to disk (if it is open), then write a
+ * snapshot of the image's cache map to "<image path>.map".
+ *
+ * If syncing the cache file fails, the process is terminated via exit( 1 ).
+ * If the image has no cache map, or its virtualFilesize is smaller than
+ * DNBD3_BLOCK_SIZE, nothing is written and true is returned.
+ *
+ * @param link uplink whose image's cache map should be saved
+ * @return true if there was nothing to save, or the map was completely
+ * written and synced; false if allocating the copy, opening, writing or
+ * syncing the map file failed
+ * Locks on: image.lock
+ */
+static bool uplink_saveCacheMap(dnbd3_connection_t *link)
+{
+	dnbd3_image_t *image = link->image;
+	assert( image != NULL );
+
+	if ( link->cacheFd != -1 ) {
+		if ( dnbd3_fdatasync( link->cacheFd ) == -1 ) {
+			// A failing fsync means we have no guarantee that any data
+			// since the last fsync (or open if none) has been saved. Apart
+			// from keeping the cache_map from the last successful fsync
+			// around and restoring it there isn't much we can do to recover
+			// a consistent state. Bail out.
+			logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d", image->path, errno );
+			logadd( LOG_ERROR, "Bailing out immediately" );
+			exit( 1 );
+		}
+	}
+
+	// Cheap unlocked pre-check; repeated below while holding the lock
+	if ( image->cache_map == NULL ) return true;
+	logadd( LOG_DEBUG1, "Saving cache map of %s:%d", image->name, (int)image->rid );
+	spin_lock( &image->lock );
+	// Lock and get a copy of the cache map, as it could be freed by another thread that is just about to
+	// figure out that this image's cache copy is complete
+	if ( image->cache_map == NULL || image->virtualFilesize < DNBD3_BLOCK_SIZE ) {
+		spin_unlock( &image->lock );
+		return true;
+	}
+	const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize);
+	uint8_t *map = malloc( size );
+	if ( map == NULL ) {
+		// Cannot take a snapshot; skip this save instead of dereferencing NULL
+		spin_unlock( &image->lock );
+		logadd( LOG_WARNING, "Out of memory while saving cache map of %s:%d", image->name, (int)image->rid );
+		return false;
+	}
+	memcpy( map, image->cache_map, size );
+	// Unlock. Use path and cacheFd without locking. path should never change after initialization of the image,
+	// cacheFd is owned by the uplink thread and we don't want to hold a spinlock during I/O
+	spin_unlock( &image->lock );
+	assert( image->path != NULL );
+	// Room for path, the ".map" suffix and the terminating NUL
+	char mapfile[strlen( image->path ) + 4 + 1];
+	strcpy( mapfile, image->path );
+	strcat( mapfile, ".map" );
+
+	int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 );
+	if ( fd == -1 ) {
+		const int err = errno;
+		free( map );
+		logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile );
+		return false;
+	}
+
+	bool ok = true;
+	size_t done = 0;
+	while ( done < size ) {
+		// Continue after the bytes already written; a short write must not
+		// restart from the beginning of the map
+		const ssize_t ret = write( fd, map + done, size - done );
+		if ( ret == -1 ) {
+			if ( errno == EINTR ) continue;
+			logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile );
+			ok = false;
+			break;
+		}
+		if ( ret <= 0 ) {
+			logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile );
+			ok = false;
+			break;
+		}
+		done += (size_t)ret;
+	}
+	if ( dnbd3_fdatasync( fd ) == -1 ) {
+		logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno );
+		ok = false;
+	}
+	close( fd );
+	free( map );
+	return ok;
+}
+