From 2507c2bbf312ba34200719842997f5d272d71777 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 9 Jul 2018 23:42:06 +0200 Subject: [SERVER] Introduce backgroundReplication=hashblock This is a compromise; if you want to validate replicated data fairly quickly, using this option will make background replication only kick in when there's a "dirty" 16M block, i.e. some blocks within a 16M block are cached locally, but not all. Completing the block makes it possible to validate its CRC32 checksum. --- src/server/uplink.c | 154 +++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 121 insertions(+), 33 deletions(-) (limited to 'src/server/uplink.c') diff --git a/src/server/uplink.c b/src/server/uplink.c index 2bd6ed2..59f3494 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -14,11 +14,18 @@ #include #include +#define FILE_BYTES_PER_MAP_BYTE ( DNBD3_BLOCK_SIZE * 8 ) +#define MAP_BYTES_PER_HASH_BLOCK (int)( HASH_BLOCK_SIZE / FILE_BYTES_PER_MAP_BYTE ) +#define MAP_INDEX_HASH_START_MASK ( ~(int)( MAP_BYTES_PER_HASH_BLOCK - 1 ) ) + +#define REP_NONE ( (uint64_t)0xffffffffffffffff ) + static uint64_t totalBytesReceived = 0; static pthread_spinlock_t statisticsReceivedLock; static void* uplink_mainloop(void *data); static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly); +static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const int lastBlockIndex); static void uplink_handleReceive(dnbd3_connection_t *link); static int uplink_sendKeepalive(const int fd); static void uplink_addCrc32(dnbd3_connection_t *uplink); @@ -73,7 +80,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version link->fd = -1; link->cacheFd = -1; link->signal = NULL; - link->replicationHandle = 0; + link->replicationHandle = REP_NONE; spin_lock( &link->rttLock ); link->cycleDetected = false; if ( sock >= 0 ) { @@ -324,7 +331,7 @@ static void* uplink_mainloop(void *data) spin_unlock( &link->rttLock ); discoverFailCount = 0; if ( fd != -1 ) close( fd ); - link->replicationHandle = 0; + link->replicationHandle = REP_NONE; link->image->working = true; link->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received buffer[0] = '@'; @@ -398,7 +405,7 @@ static void* uplink_mainloop(void *data) } if ( link->idleCount % 2 == 0 ) { // Save cache map only if we don't seem busy handling actual client requests - if ( link->fd != -1 && link->replicationHandle == 0 ) { + if ( link->fd != -1 && link->replicationHandle == REP_NONE ) { // Send keep alive if nothing is happening if ( uplink_sendKeepalive( link->fd ) ) { // Re-trigger periodically, in case it requires a minimum user count @@ -425,6 +432,9 @@ static void* uplink_mainloop(void *data) } else { // Not complete - do measurement altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous) + if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == REP_NONE ) { + link->nextReplicationIndex = 0; + } } altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX); timing_set( &nextAltCheck, &now, altCheckInterval ); @@ -555,38 +565,110 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly) */ static void uplink_sendReplicationRequest(dnbd3_connection_t *link) { - if ( !_backgroundReplication || link->cacheFd == -1 ) return; // Don't do background replication if ( link == NULL || link->fd == -1 ) return; + if ( _backgroundReplication == BGR_DISABLED || link->cacheFd == -1 ) return; // Don't do background replication + if ( link->nextReplicationIndex == -1 || link->replicationHandle != REP_NONE ) + return; dnbd3_image_t * const image = link->image; - if ( image->realFilesize < DNBD3_BLOCK_SIZE ) return; + if ( image->virtualFilesize < DNBD3_BLOCK_SIZE ) return; spin_lock( &image->lock ); - if ( image == NULL || image->cache_map == NULL || link->replicationHandle != 0 || image->users < _bgrMinClients ) { + if ( image == NULL || image->cache_map == NULL || image->users < _bgrMinClients ) { // No cache map (=image complete), or replication pending, or not enough users, do nothing spin_unlock( &image->lock ); return; } - const int len = IMGSIZE_TO_MAPBYTES( image->realFilesize ) - 1; - // Needs to be 8 (bit->byte, bitmap) - const uint32_t requestBlockSize = DNBD3_BLOCK_SIZE * 8; - for ( int j = 0; j <= len; ++j ) { - const int i = ( j + link->nextReplicationIndex ) % ( len + 1 ); - if ( image->cache_map == NULL || link->fd == -1 ) break; - if ( image->cache_map[i] == 0xff || (i == len && link->replicatedLastBlock) ) continue; - link->replicationHandle = 1; // Prevent race condition - spin_unlock( &image->lock ); - // Unlocked - do not break or continue here... - const uint64_t offset = link->replicationHandle = (uint64_t)i * (uint64_t)requestBlockSize; - const uint32_t size = (uint32_t)MIN( image->realFilesize - offset, requestBlockSize ); - if ( !dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) ) ) { - logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" ); - return; + const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); + const int lastBlockIndex = mapBytes - 1; + int endByte; + if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks + endByte = link->nextReplicationIndex + mapBytes; + } else { // Hashblock based: Only look for match in current hash block + endByte = ( link->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK; + if ( endByte > mapBytes ) { + endByte = mapBytes; + } + } + int replicationIndex = -1; + for ( int j = link->nextReplicationIndex; j < endByte; ++j ) { + const int i = j % ( mapBytes ); // Wrap around for BGR_FULL + if ( image->cache_map[i] != 0xff && ( i != lastBlockIndex || !link->replicatedLastBlock ) ) { + // Found incomplete one + replicationIndex = i; + break; } - link->nextReplicationIndex = i + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter - if ( i == len ) link->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks - return; // Request was sent, bail out, nothing is locked } spin_unlock( &image->lock ); - // Replication might be complete, uplink_mainloop should take care.... + if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) { + // Nothing left in current block, find next one + replicationIndex = uplink_findNextIncompleteHashBlock( link, endByte ); + } + if ( replicationIndex == -1 ) { + // Replication might be complete, uplink_mainloop should take care.... + link->nextReplicationIndex = -1; + return; + } + const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE; + link->replicationHandle = offset; + const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE ); + if ( !dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) ) ) { + logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" ); + return; + } + if ( replicationIndex == lastBlockIndex ) { + link->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks + } + link->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter + if ( _backgroundReplication == BGR_HASHBLOCK + && link->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) { + // Just crossed a hash block boundary, look for new candidate starting at this very index + link->nextReplicationIndex = uplink_findNextIncompleteHashBlock( link, link->nextReplicationIndex ); + } +} + +/** + * find next index into cache_map that corresponds to the beginning + * if a hash block which is neither completely empty nor completely + * replicated yet. Returns -1 if no match. + */ +static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const int startMapIndex) +{ + int retval = -1; + spin_lock( &link->image->lock ); + const int mapBytes = IMGSIZE_TO_MAPBYTES( link->image->virtualFilesize ); + const uint8_t *cache_map = link->image->cache_map; + if ( cache_map != NULL ) { + int j; + const int start = ( startMapIndex & MAP_INDEX_HASH_START_MASK ); + for (j = 0; j < mapBytes; ++j) { + const int i = ( start + j ) % mapBytes; + if ( cache_map[i] != 0 && cache_map[i] != 0xff ) { + // Neither full nor empty, replicate + if ( retval == -1 ) { + retval = i; + } + break; + } + if ( ( i & MAP_INDEX_HASH_START_MASK ) == i ) { + // Reset state if we just crossed into the next hash chunk + retval = ( cache_map[i] == 0 ) ? ( i ) : ( -1 ); + } else if ( cache_map[i] == 0xff ) { + if ( retval != -1 ) { + // It's a full one, previous one was empty -> replicate + break; + } + } else { // ( cache_map[i] == 0 ) + if ( retval == -1 ) { // Previous one was full -> replicate + retval = i; + break; + } + } + } + if ( j == mapBytes ) { // Nothing found, loop ran until end + retval = -1; + } + } + spin_unlock( &link->image->lock ); + return retval; } /** @@ -737,7 +819,7 @@ static void uplink_handleReceive(dnbd3_connection_t *link) #endif if ( start == link->replicationHandle ) { // Was our background replication - link->replicationHandle = 0; + link->replicationHandle = REP_NONE; // Try to remove from fs cache if no client was interested in this data if ( !served && link->cacheFd != -1 ) { posix_fadvise( link->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED ); @@ -745,19 +827,25 @@ static void uplink_handleReceive(dnbd3_connection_t *link) } else { // Was some client -- reset idle counter link->idleCount = 0; + // Re-enable replication if disabled + if ( link->nextReplicationIndex == -1 ) { + link->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK; + } } } - spin_lock( &link->queueLock ); - const bool rep = ( link->queueLen == 0 ); - spin_unlock( &link->queueLock ); - if ( rep ) uplink_sendReplicationRequest( link ); + if ( link->replicationHandle == REP_NONE ) { + spin_lock( &link->queueLock ); + const bool rep = ( link->queueLen == 0 ); + spin_unlock( &link->queueLock ); + if ( rep ) uplink_sendReplicationRequest( link ); + } return; // Error handling from failed receive or message parsing error_cleanup: ; altservers_serverFailed( &link->currentServer ); const int fd = link->fd; link->fd = -1; - link->replicationHandle = 0; + link->replicationHandle = REP_NONE; if ( fd != -1 ) close( fd ); altservers_findUplink( link ); // Can we just call it here? } @@ -779,8 +867,8 @@ static int uplink_sendKeepalive(const int fd) static void uplink_addCrc32(dnbd3_connection_t *uplink) { dnbd3_image_t *image = uplink->image; - if ( image == NULL || image->realFilesize == 0 ) return; - size_t bytes = IMGSIZE_TO_HASHBLOCKS( image->realFilesize ) * sizeof(uint32_t); + if ( image == NULL || image->virtualFilesize == 0 ) return; + size_t bytes = IMGSIZE_TO_HASHBLOCKS( image->virtualFilesize ) * sizeof(uint32_t); uint32_t masterCrc; uint32_t *buffer = malloc( bytes ); if ( !dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes ) || bytes == 0 ) { -- cgit v1.2.3-55-g7522