diff options
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r-- | src/server/uplink.c | 217 |
1 files changed, 182 insertions, 35 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c index c0babaa..538f388 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -12,7 +12,13 @@ #include <inttypes.h> #include <fcntl.h> #include <poll.h> +#include <unistd.h> +#ifdef HAVE_FDATASYNC +#define dnbd3_fdatasync fdatasync +#else +#define dnbd3_fdatasync fsync +#endif static uint64_t totalBytesReceived = 0; static pthread_spinlock_t statisticsReceivedLock; @@ -24,6 +30,8 @@ static int uplink_sendKeepalive(const int fd); static void uplink_addCrc32(dnbd3_connection_t *uplink); static void uplink_sendReplicationRequest(dnbd3_connection_t *link); static void uplink_updateGlobalReceivedCounter(dnbd3_connection_t *link); +static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force); +static bool uplink_saveCacheMap(dnbd3_connection_t *link); // ############ uplink connection handling @@ -47,7 +55,7 @@ uint64_t uplink_getTotalBytesReceived() */ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version) { - if ( !_isProxy ) return false; + if ( !_isProxy || _shutdown ) return false; dnbd3_connection_t *link = NULL; assert( image != NULL ); spin_lock( &image->lock ); @@ -66,8 +74,10 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version link->image = image; link->bytesReceived = 0; link->lastBytesReceived = 0; + link->idleCount = 0; link->queueLen = 0; link->fd = -1; + link->cacheFd = -1; link->signal = NULL; link->replicationHandle = 0; spin_lock( &link->rttLock ); @@ -123,10 +133,15 @@ void uplink_shutdown(dnbd3_image_t *image) join = true; } spin_unlock( &uplink->queueLock ); + bool wait = image->uplink != NULL; spin_unlock( &image->lock ); if ( join ) thread_join( thread, NULL ); - while ( image->uplink != NULL ) - usleep( 10000 ); + while ( wait ) { + usleep( 5000 ); + spin_lock( &image->lock ); + wait = image->uplink != NULL && image->uplink->shutdown; + spin_unlock( &image->lock ); + } } /** @@ -273,6 +288,7 @@ static void* uplink_mainloop(void *data) int numSocks, i, waitTime; int altCheckInterval = SERVER_RTT_DELAY_INIT; int discoverFailCount = 0; + int unsavedCount = 0; ticks nextAltCheck, nextKeepalive; char buffer[200]; memset( events, 0, sizeof(events) ); @@ -282,6 +298,11 @@ static void* uplink_mainloop(void *data) assert( link != NULL ); setThreadName( "idle-uplink" ); blockNoncriticalSignals(); + // Make sure file is open for writing + if ( !uplink_reopenCacheFd( link, false ) ) { + // It might have failed - still offer proxy mode, we just can't cache + logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", link->image->path, errno ); + } // link->signal = signal_new(); if ( link->signal == NULL ) { @@ -335,7 +356,7 @@ static void* uplink_mainloop(void *data) declare_now; waitTime = (int)timing_diffMs( &now, &nextAltCheck ); } while(0); - if ( waitTime < 1500 ) waitTime = 1500; + if ( waitTime < 100 ) waitTime = 100; if ( waitTime > 5000 ) waitTime = 5000; numSocks = poll( events, EV_COUNT, waitTime ); if ( _shutdown || link->shutdown ) goto cleanup; @@ -371,17 +392,29 @@ static void* uplink_mainloop(void *data) if ( link->fd == -1 ) timing_get( &nextAltCheck ); if ( _shutdown || link->shutdown ) goto cleanup; } - // Send keep alive if nothing is happening declare_now; - if ( link->fd != -1 && link->replicationHandle == 0 && timing_reached( &nextKeepalive, &now ) ) { - timing_set( &nextKeepalive, &now, 20 ); - if ( uplink_sendKeepalive( link->fd ) ) { - // Re-trigger periodically, in case it requires a minimum user count - uplink_sendReplicationRequest( link ); - } else { - const int fd = link->fd; - link->fd = -1; - close( fd ); + if ( timing_reached( &nextKeepalive, &now ) ) { + timing_set( &nextKeepalive, &now, 10 ); + link->idleCount++; + unsavedCount++; + if ( unsavedCount > 24 || ( unsavedCount > 6 && link->idleCount >= 2 && link->idleCount <= 7 ) ) { + // fsync/save every 4 minutes, or every 60 seconds if link is idle + unsavedCount = 0; + uplink_saveCacheMap( link ); + } + if ( link->idleCount % 2 == 0 ) { + // Save cache map only if we don't seem busy handling actual client requests + if ( link->fd != -1 && link->replicationHandle == 0 ) { + // Send keep alive if nothing is happening + if ( uplink_sendKeepalive( link->fd ) ) { + // Re-trigger periodically, in case it requires a minimum user count + uplink_sendReplicationRequest( link ); + } else { + const int fd = link->fd; + link->fd = -1; + close( fd ); + } + } } } // See if we should trigger an RTT measurement @@ -394,7 +427,6 @@ static void* uplink_mainloop(void *data) if ( image_isComplete( link->image ) ) { // Quit work if image is complete logadd( LOG_INFO, "Replication of %s complete.", link->image->name ); - image_markComplete( link->image ); goto cleanup; } else { // Not complete - do measurement @@ -441,9 +473,10 @@ static void* uplink_mainloop(void *data) } cleanup: ; altservers_removeUplink( link ); + uplink_saveCacheMap( link ); spin_lock( &link->image->lock ); - spin_lock( &link->queueLock ); link->image->uplink = NULL; + spin_lock( &link->queueLock ); const int fd = link->fd; const dnbd3_signal_t* signal = link->signal; link->fd = -1; @@ -452,6 +485,9 @@ static void* uplink_mainloop(void *data) link->shutdown = true; thread_detach( link->thread ); } + // Do not access link->image after unlocking, since we set + // image->uplink to NULL. Acquire with image_lock first, + // like done below when checking whether to re-init uplink spin_unlock( &link->image->lock ); spin_unlock( &link->queueLock ); if ( fd != -1 ) close( fd ); @@ -459,13 +495,26 @@ static void* uplink_mainloop(void *data) // Wait for the RTT check to finish/fail if it's in progress while ( link->rttTestResult == RTT_INPROGRESS ) usleep( 10000 ); - if ( link->betterFd != -1 ) close( link->betterFd ); + if ( link->betterFd != -1 ) { + close( link->betterFd ); + } spin_destroy( &link->queueLock ); spin_destroy( &link->rttLock ); free( link->recvBuffer ); link->recvBuffer = NULL; uplink_updateGlobalReceivedCounter( link ); - free( link ); + if ( link->cacheFd != -1 ) { + close( link->cacheFd ); + } + dnbd3_image_t *image = image_lock( link->image ); + free( link ); // !!! + if ( image != NULL ) { + if ( !_shutdown && image->cache_map != NULL ) { + // Ingegrity checker must have found something in the meantime + uplink_init( image, -1, NULL, 0 ); + } + image_release( image ); + } return NULL ; } @@ -556,36 +605,36 @@ static void uplink_handleReceive(dnbd3_connection_t *link) int ret, i; for (;;) { ret = dnbd3_read_reply( link->fd, &inReply, false ); - if ( ret == REPLY_INTR && !_shutdown && !link->shutdown ) continue; + if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !link->shutdown ) ) continue; if ( ret == REPLY_AGAIN ) break; - if ( ret == REPLY_CLOSED ) { + if ( unlikely( ret == REPLY_CLOSED ) ) { logadd( LOG_INFO, "Uplink: Remote host hung up (%s)", link->image->path ); goto error_cleanup; } - if ( ret == REPLY_WRONGMAGIC ) { + if ( unlikely( ret == REPLY_WRONGMAGIC ) ) { logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path ); goto error_cleanup; } - if ( ret != REPLY_OK ) { + if ( unlikely( ret != REPLY_OK ) ) { logadd( LOG_INFO, "Uplink: Connection error %d (%s)", ret, link->image->path ); goto error_cleanup; } - if ( inReply.size > (uint32_t)_maxPayload ) { + if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) { logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload for %s", link->image->path ); goto error_cleanup; } - if ( link->recvBufferLen < inReply.size ) { + if ( unlikely( link->recvBufferLen < inReply.size ) ) { link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536); link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen ); } - if ( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) { + if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) { logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path ); goto error_cleanup; } // Payload read completely // Bail out if we're not interested - if ( inReply.cmd != CMD_GET_BLOCK ) continue; + if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) ) continue; // Is a legit block reply struct iovec iov[2]; const uint64_t start = inReply.handle; @@ -594,16 +643,16 @@ static void uplink_handleReceive(dnbd3_connection_t *link) link->bytesReceived += inReply.size; spin_unlock( &link->image->lock ); // 1) Write to cache file - if ( unlikely( link->image->cacheFd == -1 ) ) { - image_reopenCacheFd( link->image, false ); + if ( unlikely( link->cacheFd == -1 ) ) { + uplink_reopenCacheFd( link, false ); } - if ( likely( link->image->cacheFd != -1 ) ) { + if ( likely( link->cacheFd != -1 ) ) { int err = 0; bool tryAgain = true; // Allow one retry in case we run out of space or the write fd became invalid uint32_t done = 0; ret = 0; while ( done < inReply.size ) { - ret = (int)pwrite( link->image->cacheFd, link->recvBuffer + done, inReply.size - done, start + done ); + ret = (int)pwrite( link->cacheFd, link->recvBuffer + done, inReply.size - done, start + done ); if ( unlikely( ret == -1 ) ) { err = errno; if ( err == EINTR ) continue; @@ -614,7 +663,7 @@ static void uplink_handleReceive(dnbd3_connection_t *link) continue; // Success, retry write } if ( err == EBADF || err == EINVAL || err == EIO ) { - if ( !tryAgain || !image_reopenCacheFd( link->image, true ) ) + if ( !tryAgain || !uplink_reopenCacheFd( link, true ) ) break; tryAgain = false; continue; // Write handle to image successfully re-opened, try again @@ -628,9 +677,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link) } done += (uint32_t)ret; } - if ( done > 0 ) image_updateCachemap( link->image, start, start + done, true ); - if ( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) { - // Only disable caching if something seems severely wrong, not on ENOSPC since that might get better + if ( likely( done > 0 ) ) { + image_updateCachemap( link->image, start, start + done, true ); + } + if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) { logadd( LOG_WARNING, "Error writing received data for %s:%d; disabling caching.", link->image->name, (int)link->image->rid ); } @@ -691,7 +741,13 @@ static void uplink_handleReceive(dnbd3_connection_t *link) if ( !served && start != link->replicationHandle ) logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end ); #endif - if ( start == link->replicationHandle ) link->replicationHandle = 0; + if ( start == link->replicationHandle ) { + // Was our background replication + link->replicationHandle = 0; + } else { + // Was some client -- reset idle counter + link->idleCount = 0; + } } spin_lock( &link->queueLock ); const bool rep = ( link->queueLen == 0 ); @@ -766,3 +822,94 @@ static void uplink_updateGlobalReceivedCounter(dnbd3_connection_t *link) link->lastBytesReceived = link->bytesReceived; } +/** + * Open the given image's main image file in + * rw mode, assigning it to the cacheFd struct member. + * + * @param force If cacheFd was previously assigned a file descriptor (not == -1), + * it will be closed first. Otherwise, nothing will happen and true will be returned + * immediately. + */ +static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force) +{ + if ( link->cacheFd != -1 ) { + if ( !force ) return true; + close( link->cacheFd ); + } + link->cacheFd = open( link->image->path, O_WRONLY | O_CREAT ); + return link->cacheFd != -1; +} + +/** + * Saves the cache map of the given image. + * Return true on success. + * Locks on: imageListLock, image.lock + */ +static bool uplink_saveCacheMap(dnbd3_connection_t *link) +{ + dnbd3_image_t *image = link->image; + assert( image != NULL ); + + if ( link->cacheFd != -1 ) { + if ( dnbd3_fdatasync( link->cacheFd ) == -1 ) { + // A failing fsync means we have no guarantee that any data + // since the last fsync (or open if none) has been saved. Apart + // from keeping the cache_map from the last successful fsync + // around and restoring it there isn't much we can do to recover + // a consistent state. Bail out. + logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d", image->path, errno ); + logadd( LOG_ERROR, "Bailing out immediately" ); + exit( 1 ); + } + } + + if ( image->cache_map == NULL ) return true; + logadd( LOG_DEBUG1, "Saving cache map of %s:%d", image->name, (int)image->rid ); + spin_lock( &image->lock ); + // Lock and get a copy of the cache map, as it could be freed by another thread that is just about to + // figure out that this image's cache copy is complete + if ( image->cache_map == NULL || image->virtualFilesize < DNBD3_BLOCK_SIZE ) { + spin_unlock( &image->lock ); + return true; + } + const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize); + uint8_t *map = malloc( size ); + memcpy( map, image->cache_map, size ); + // Unlock. Use path and cacheFd without locking. path should never change after initialization of the image, + // cacheFd is owned by the uplink thread and we don't want to hold a spinlock during I/O + spin_unlock( &image->lock ); + assert( image->path != NULL ); + char mapfile[strlen( image->path ) + 4 + 1]; + strcpy( mapfile, image->path ); + strcat( mapfile, ".map" ); + + int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 ); + if ( fd == -1 ) { + const int err = errno; + free( map ); + logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile ); + return false; + } + + size_t done = 0; + while ( done < size ) { + const ssize_t ret = write( fd, map, size - done ); + if ( ret == -1 ) { + if ( errno == EINTR ) continue; + logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile ); + break; + } + if ( ret <= 0 ) { + logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile ); + break; + } + done += (size_t)ret; + } + if ( dnbd3_fdatasync( fd ) == -1 ) { + logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno ); + } + close( fd ); + free( map ); + return true; +} + |