#include "cowfile.h" #include "main.h" #include "connection.h" #include #include #include #include #include #include #include #include #include #include #include #define UUID_STRLEN 36 // Maximum assumed page size, in case the cow data gets transferred between different architectures // 16k should be the largest minimum in existence (Itanium) #define MAX_PAGE_SIZE 16384 extern void image_ll_getattr( fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi ); static const int CURRENT_COW_VERSION = 3; static bool statStdout; static bool statFile; static pthread_t tidCowUploader; static pthread_t tidStatUpdater; static const char *cowServerAddress; static CURL *curl; static cowfile_metadata_header_t *metadata = NULL; static atomic_uint_fast64_t bytesUploaded; static uint64_t totalBlocksUploaded = 0; static int activeUploads = 0; static int uploadLoopThrottle = 0; static atomic_bool uploadLoop = true; // Keep upload loop running? static atomic_bool uploadLoopDone = false; // Upload loop has finished all work? static atomic_bool uploadCancelled = false; // Skip uploading remaining blocks static struct curl_slist *uploadHeaders = NULL; static struct cow { char *metadata_mmap; l1 *l1; l2 *l2; int fdMeta; int fdData; int fdStats; pthread_mutex_t l2CreateLock; } cow; static size_t curlHeaderCallbackUploadBlock( char *buffer, size_t size, size_t nitems, void *userdata ); static int countOneBits( atomic_uchar *bf, int numBytes ) { int bitCount = 0; for ( int i = 0; i < numBytes; ++i ) { unsigned char value = bf[i]; while ( value > 0 ) { if ( ( value & 1 ) == 1 ) { bitCount++; } value >>= 1; } } return bitCount; } #define IS_4K_ALIGNED(v) ( ( (uint64_t)(v) & DNBD3_BLOCK_MASK ) == 0 ) static bool writeAll( int fd, const char *buf, size_t count, off_t offset ) { while ( count > 0 ) { ssize_t ret = pwrite( fd, buf, count, offset ); if ( ret == (ssize_t)count ) return true; if ( ret == -1 ) { if ( errno == EINTR ) continue; return false; } if ( ret == 0 ) return false; count -= ret; buf += ret; } return true; } /** * @brief Computes the l1 index for an absolute file offset * * @param offset absolute file offset * @return int l1 index */ static int offsetToL1Index( size_t offset ) { return (int)( offset / COW_FULL_L2_TABLE_DATA_SIZE ); } /** * @brief Computes the l2 index for an absolute file offset * * @param offset absolute file offset * @return int l2 index */ static int offsetToL2Index( size_t offset ) { return (int)( ( offset % COW_FULL_L2_TABLE_DATA_SIZE ) / COW_DATA_CLUSTER_SIZE ); } /** * @brief Computes the bit in the bitfield from the absolute file offset * * @param offset absolute file offset * @return int bit(0-319) in the bitfield */ static int getBitfieldOffsetBit( size_t offset ) { return (int)( offset / DNBD3_BLOCK_SIZE ) % ( COW_BITFIELD_SIZE * 8 ); } /** * @brief Sets the specified bits in the specified range threadsafe to 1. * * @param byte of a bitfield * @param from start bit * @param to end bit * @param value set bits to 1 or 0 */ static void setBits( atomic_uchar *byte, int64_t from, int64_t to, bool value ) { char mask = (char)( ( 255 >> ( 7 - ( to - from ) ) ) << from ); if ( value ) { atomic_fetch_or( byte, mask ); } else { atomic_fetch_and( byte, ~mask ); } } /** * @brief Sets the specified bits in the specified range threadsafe to 1. * * @param bitfield of a cow_l2_entry * @param from start bit * @param to end bit * @param value set bits to 1 or 0 */ static void setBitsInBitfield( atomic_uchar *bitfield, int64_t from, int64_t to, bool value ) { assert( from >= 0 && to < COW_BITFIELD_SIZE * 8 ); int64_t start = from / 8; int64_t end = to / 8; for ( int64_t i = start; i <= end; i++ ) { setBits( ( bitfield + i ), from - i * 8, MIN( 7, to - i * 8 ), value ); from = ( i + 1 ) * 8; } } /** * @brief Checks if the n bit of a bit field is 0 or 1. * * @param bitfield of a cow_l2_entry * @param n the bit which should be checked */ static bool checkBit( atomic_uchar *bitfield, int64_t n ) { return ( bitfield[n / 8] >> ( n % 8 ) ) & 1; } /** * Generic callback for writing received data to a 500 byte buffer. * MAKE SURE THE BUFFER IS EMPTY AT THE START! (i.e. buffer[0] = '\0') */ static size_t curlWriteCb500( char *buffer, size_t itemSize, size_t nitems, void *userpointer ) { char *dest = (char*)userpointer; size_t done = strlen( dest ); size_t bytes = itemSize * nitems; assert( done < 500 ); if ( done < 499 ) { size_t n = MIN( bytes, 499 - done ); memcpy( dest + done, buffer, n ); dest[done + n] = '\0'; } return bytes; } /** * @brief Create a Session with the cow server and gets the session uuid. */ static bool createSession( const char *imageName, uint16_t rid ) { CURLcode res; char url[COW_URL_STRING_SIZE]; char body[1000], reply[500]; const char *nameEsc; curl_easy_reset( curl ); snprintf( url, COW_URL_STRING_SIZE, COW_API_CREATE, cowServerAddress ); logadd( LOG_INFO, "COW_API_CREATE URL: %s", url ); curl_easy_setopt( curl, CURLOPT_POST, 1L ); curl_easy_setopt( curl, CURLOPT_URL, url ); nameEsc = curl_easy_escape( curl, imageName, 0 ); if ( nameEsc == NULL ) { logadd( LOG_ERROR, "Error escaping imageName" ); nameEsc = imageName; // Hope for the best } snprintf( body, sizeof body, "revision=%d&bitfieldSize=%d&imageName=%s", (int)rid, (int)metadata->bitfieldSize, nameEsc ); if ( nameEsc != imageName ) { curl_free( (char*)nameEsc ); } curl_easy_setopt( curl, CURLOPT_POSTFIELDS, body ); reply[0] = '\0'; curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, curlWriteCb500 ); curl_easy_setopt( curl, CURLOPT_WRITEDATA, reply ); res = curl_easy_perform( curl ); /* Check for errors */ if ( res != CURLE_OK ) { logadd( LOG_ERROR, "COW_API_CREATE failed: curl says %s", curl_easy_strerror( res ) ); return false; } long http_code = 0; curl_easy_getinfo( curl, CURLINFO_RESPONSE_CODE, &http_code ); if ( http_code < 200 || http_code >= 300 ) { logadd( LOG_ERROR, "COW_API_CREATE failed: http code %ld, %s", http_code, reply ); return false; } if ( strlen( reply ) > UUID_STRLEN ) { logadd( LOG_ERROR, "Returned session id is too long: '%s'", reply ); return false; } strncpy( metadata->uuid, reply, sizeof(metadata->uuid) ); logadd( LOG_DEBUG1, "Cow session started, uuid: %s", metadata->uuid ); return true; } /** * @brief Implementation of CURLOPT_READFUNCTION, this function will first send the bit field and * then the block data in one bitstream. this function is usually called multiple times per block, * because the buffer is usually not large for one block and its bitfield. * for more details see: https://curl.se/libcurl/c/CURLOPT_READFUNCTION.html * * @param ptr to the buffer * @param size of one element in buffer * @param nmemb number of elements in buffer * @param userdata from CURLOPT_READFUNCTION * @return size_t size written in buffer */ static size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *userdata ) { cow_curl_read_upload_t *uploadBlock = (cow_curl_read_upload_t *)userdata; size_t len = 0; // Check if we're still in the bitfield if ( uploadBlock->position < COW_BITFIELD_SIZE ) { size_t lenCpy = MIN( COW_BITFIELD_SIZE - uploadBlock->position, size * nmemb ); memcpy( ptr + uploadBlock->position, uploadBlock->bitfield + uploadBlock->position, lenCpy ); uploadBlock->position += lenCpy; len += lenCpy; } // No elseif here, might just have crossed over... if ( uploadBlock->position >= COW_BITFIELD_SIZE ) { // Subtract the bitfield size from everything first off_t inClusterOffset = uploadBlock->position - COW_BITFIELD_SIZE; ssize_t spaceLeft = ( size * nmemb ) - len; // Only read blocks that have been written to the cluster. Saves bandwidth. Not optimal since // we do a lot of 4k/32k reads, but it's not that performance critical I guess... while ( spaceLeft >= (ssize_t)DNBD3_BLOCK_SIZE && inClusterOffset < (off_t)COW_DATA_CLUSTER_SIZE ) { int bitNumber = (int)( inClusterOffset / DNBD3_BLOCK_SIZE ); size_t readSize; // Small performance hack: All bits one in a byte, do a 32k instead of 4k read // TODO: preadv with a large iov, reading unchanged blocks into a trash-buffer if ( spaceLeft >= (ssize_t)DNBD3_BLOCK_SIZE * 8 && bitNumber % 8 == 0 && uploadBlock->bitfield[bitNumber / 8] == 0xff ) { readSize = DNBD3_BLOCK_SIZE * 8; } else { readSize = DNBD3_BLOCK_SIZE; } // If handling single block, check bits in our copy, as global bitfield could change if ( readSize != DNBD3_BLOCK_SIZE || checkBit( uploadBlock->bitfield, bitNumber ) ) { ssize_t lengthRead = pread( cow.fdData, ( ptr + len ), readSize, uploadBlock->cluster->offset + inClusterOffset ); if ( lengthRead == -1 ) { if ( errno == EAGAIN ) continue; logadd( LOG_ERROR, "Upload: Reading from COW file failed with errno %d", errno ); return CURL_READFUNC_ABORT; } if ( lengthRead != (ssize_t)readSize ) { logadd( LOG_ERROR, "Upload: Reading from COW file failed with short read (%d/%d)", (int)lengthRead, (int)readSize ); return CURL_READFUNC_ABORT; } len += lengthRead; spaceLeft -= lengthRead; } inClusterOffset += readSize; uploadBlock->position += readSize; } } return len; } /** * @brief Requests the merging of the image on the cow server. */ static bool postMergeRequest() { CURLcode res; char url[COW_URL_STRING_SIZE]; char body[500], reply[500]; char *uuid; curl_easy_reset( curl ); snprintf( url, COW_URL_STRING_SIZE, COW_API_START_MERGE, cowServerAddress ); curl_easy_setopt( curl, CURLOPT_URL, url ); curl_easy_setopt( curl, CURLOPT_POST, 1L ); curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, curlWriteCb500 ); curl_easy_setopt( curl, CURLOPT_WRITEDATA, reply ); uuid = curl_easy_escape( curl, metadata->uuid, 0 ); if ( uuid == NULL ) { logadd( LOG_ERROR, "Error escaping uuid" ); uuid = metadata->uuid; // Hope for the best } snprintf( body, sizeof body, "originalFileSize=%"PRIu64"&newFileSize=%"PRIu64"&uuid=%s", metadata->validRemoteSize, metadata->imageSize, uuid ); if ( uuid != metadata->uuid ) { curl_free( uuid ); } curl_easy_setopt( curl, CURLOPT_POSTFIELDS, body ); reply[0] = '\0'; res = curl_easy_perform( curl ); if ( res != CURLE_OK ) { logadd( LOG_WARNING, "COW_API_START_MERGE failed. curl reported: %s", curl_easy_strerror( res ) ); return false; } long http_code = 0; curl_easy_getinfo( curl, CURLINFO_RESPONSE_CODE, &http_code ); if ( http_code < 200 || http_code >= 300 ) { logadd( LOG_WARNING, "COW_API_START_MERGE failed with http: %ld: %s", http_code, reply ); return false; } return true; } /** * @brief Wrapper for postMergeRequest so if its fails it will be tried again. * */ static void requestRemoteMerge() { int fails = 0; bool success = false; success = postMergeRequest(); while ( fails <= 5 && !success ) { fails++; logadd( LOG_WARNING, "Trying again. %i/5", fails ); sleep( 10 ); postMergeRequest(); } } /** * @brief Implementation of the CURLOPT_XFERINFOFUNCTION. * For more infos see: https://curl.se/libcurl/c/CURLOPT_XFERINFOFUNCTION.html * * Each active transfer callbacks this function. * This function computes the uploaded bytes between each call and adds it to * bytesUploaded, which is used to compute the kb/s uploaded over all transfers. * * @param ulNow number of bytes uploaded by this transfer so far. * @return int always returns 0 to continue the callbacks. */ static int progress_callback( void *clientp, UNUSED curl_off_t dlTotal, UNUSED curl_off_t dlNow, UNUSED curl_off_t ulTotal, curl_off_t ulNow ) { cow_curl_read_upload_t *uploadingCluster = (cow_curl_read_upload_t *)clientp; bytesUploaded += ( ulNow - uploadingCluster->ulLast ); uploadingCluster->ulLast = ulNow; return 0; } #ifdef COW_DUMP_BLOCK_UPLOADS static int cmpfunc( const void *a, const void *b ) { return (int)( ( (cow_cluster_statistics_t *)b )->uploads - ( (cow_cluster_statistics_t *)a )->uploads ); } /** * @brief Writes all block numbers sorted by the number of uploads into the statsfile. * */ static void dumpBlockUploads() { long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE ); cow_cluster_statistics_t blockUploads[l1MaxOffset * COW_L2_TABLE_SIZE]; uint64_t currentBlock = 0; for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) { if ( cow.l1[l1Index] == -1 ) { continue; } for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) { cow_l2_entry_t *block = ( cow.l2[cow.l1[l1Index]] + l2Index ); blockUploads[currentBlock].uploads = block->uploads; blockUploads[currentBlock].clusterNumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index ); currentBlock++; } } qsort( blockUploads, currentBlock, sizeof( cow_cluster_statistics_t ), cmpfunc ); dprintf( cow.fdStats, "\n\nclusterNumber: uploads\n==Block Upload Dump===\n" ); for ( uint64_t i = 0; i < currentBlock; i++ ) { dprintf( cow.fdStats, "%" PRIu64 ": %" PRIu64 " \n", blockUploads[i].clusterNumber, blockUploads[i].uploads ); } } #endif /** * @brief Updates the status to the stdout/statfile depending on the startup parameters. * * @param inQueue Blocks that have changes old enough to be uploaded. * @param modified Blocks that have been changed but whose changes are not old enough to be uploaded. * @param idle Blocks that do not contain changes that have not yet been uploaded. * @param speedBuffer ptr to char array that contains the current upload speed. */ static void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, char *speedBuffer ) { char buffer[300]; const char *state; if ( uploadLoop ) { state = "backgroundUpload"; } else if ( !uploadLoopDone ) { state = "uploading"; } else { state = "done"; } int len = snprintf( buffer, sizeof buffer, "[General]\n" "state=%s\n" "inQueue=%" PRIu64 "\n" "modifiedClusters=%" PRIu64 "\n" "idleClusters=%" PRIu64 "\n" "totalClustersUploaded=%" PRIu64 "\n" "activeUploads=%i\n" "%s%s\n", state, inQueue, modified, idle, totalBlocksUploaded, activeUploads, COW_SHOW_UL_SPEED ? "avgSpeedKb=" : "", speedBuffer ); if ( len == -1 ) { logadd( LOG_ERROR, "snprintf error" ); return; } if ( statStdout ) { logadd( LOG_INFO, "%s", buffer ); } if ( statFile ) { // Pad with a bunch of newlines so we don't change the file size all the time ssize_t extra = MIN( 20, (ssize_t)sizeof(buffer) - len - 1 ); memset( buffer + len, '\n', extra ); lseek( cow.fdStats, 43, SEEK_SET ); if ( write( cow.fdStats, buffer, len + extra ) != len + extra ) { logadd( LOG_WARNING, "Could not update cow status file" ); } #ifdef COW_DUMP_BLOCK_UPLOADS if ( !uploadLoop && uploadLoopDone ) { dumpBlockUploads(); } #endif } } /** * @brief Starts the upload of a given block. * * @param cm Curl_multi * @param uploadingCluster containing the data for the block to upload. */ static bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster ) { CURL *eh = curl_easy_init(); char url[COW_URL_STRING_SIZE]; snprintf( url, COW_URL_STRING_SIZE, COW_API_UPDATE, cowServerAddress, metadata->uuid, uploadingCluster->clusterNumber ); curl_easy_setopt( eh, CURLOPT_URL, url ); curl_easy_setopt( eh, CURLOPT_POST, 1L ); curl_easy_setopt( eh, CURLOPT_HEADERFUNCTION, curlHeaderCallbackUploadBlock ); curl_easy_setopt( eh, CURLOPT_HEADERDATA, (void *)uploadingCluster ); curl_easy_setopt( eh, CURLOPT_READFUNCTION, curlReadCallbackUploadBlock ); curl_easy_setopt( eh, CURLOPT_READDATA, (void *)uploadingCluster ); curl_easy_setopt( eh, CURLOPT_PRIVATE, (void *)uploadingCluster ); // min upload speed of 1kb/s over 10 sec otherwise the upload is canceled. curl_easy_setopt( eh, CURLOPT_LOW_SPEED_TIME, 10L ); curl_easy_setopt( eh, CURLOPT_LOW_SPEED_LIMIT, 1000L ); curl_easy_setopt( eh, CURLOPT_POSTFIELDSIZE_LARGE, (long)( COW_BITFIELD_SIZE + DNBD3_BLOCK_SIZE * countOneBits( uploadingCluster->bitfield, COW_BITFIELD_SIZE ) ) ); if ( COW_SHOW_UL_SPEED ) { uploadingCluster->ulLast = 0; curl_easy_setopt( eh, CURLOPT_NOPROGRESS, 0L ); curl_easy_setopt( eh, CURLOPT_XFERINFOFUNCTION, progress_callback ); curl_easy_setopt( eh, CURLOPT_XFERINFODATA, uploadingCluster ); } curl_easy_setopt( eh, CURLOPT_HTTPHEADER, uploadHeaders ); curl_multi_add_handle( cm, eh ); return true; } static size_t curlHeaderCallbackUploadBlock( char *buffer, size_t size, size_t nitems, void *userdata ) { size_t len, offset; int delay; cow_curl_read_upload_t *uploadingCluster = (cow_curl_read_upload_t*)userdata; // If the "Retry-After" header is set, we interpret this as the server being overloaded // or not ready yet to take another update. We slow down our upload loop then. // We'll only accept a delay in seconds here, not an HTTP Date string. // Otherwise, increase the fails counter. len = size * nitems; if ( len < 13 ) return len; for ( int i = 0; i < 11; ++i ) { buffer[i] |= 0x60; } if ( strncmp( buffer, "retry-after:", 12 ) != 0 ) return len; offset = 12; while ( offset + 1 < len && buffer[offset] == ' ' ) { offset++; } delay = atoi( buffer + offset ); if ( delay > 0 ) { if ( delay > 120 ) { // Cap to two minutes delay = 120; } uploadLoopThrottle = MAX( uploadLoopThrottle, delay ); uploadingCluster->retryTime = delay; } return len; } /** * @brief After an upload completes, either successful or unsuccessful this * function cleans everything up. If unsuccessful and there are some tries left * retries to upload the block. * * @param cm Curl_multi * @param msg CURLMsg * @return true returned if the upload was successful or retries are still possible. * @return false returned if the upload was unsuccessful. */ static bool clusterUploadDoneHandler( CURLM *cm, CURLMsg *msg ) { bool success = false; cow_curl_read_upload_t *uploadingCluster; CURLcode res; CURLcode res2; res = curl_easy_getinfo( msg->easy_handle, CURLINFO_PRIVATE, &uploadingCluster ); long http_code = 0; res2 = curl_easy_getinfo( msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_code ); if ( msg->msg != CURLMSG_DONE ) { logadd( LOG_ERROR, "multi_message->msg unexpectedly not DONE (%d)", (int)msg->msg ); } else if ( msg->data.result != CURLE_OK ) { logadd( LOG_ERROR, "curl_easy returned non-OK after multi-finish: %s", curl_easy_strerror( msg->data.result ) ); } else if ( res != CURLE_OK || res2 != CURLE_OK ) { logadd( LOG_ERROR, "curl_easy_getinfo failed after multifinish (%d, %d)", (int)res, (int)res2 ); } else if ( http_code == 503 ) { if ( uploadingCluster->retryTime > 0 ) { logadd( LOG_INFO, "COW server is asking to backoff for %d seconds", uploadingCluster->retryTime ); } else { logadd( LOG_ERROR, "COW server returned 503 without Retry-After value" ); } } else if ( http_code < 200 || http_code >= 300 ) { logadd( LOG_ERROR, "COW server returned HTTP %ld", http_code ); } else { // everything went ok, reset timeChanged of underlying cluster, but only if it // didn't get updated again in the meantime. atomic_compare_exchange_strong( &uploadingCluster->cluster->timeChanged, &uploadingCluster->time, 0 ); uploadingCluster->cluster->uploads++; totalBlocksUploaded++; success = true; } if ( !success ) { uploadingCluster->cluster->fails++; if ( uploadingCluster->retryTime > 0 ) { // Don't reset timeChanged timestamp, so the next iteration of uploadModifiedClusters // will queue this upload again after the throttle time expired. } else { logadd( LOG_ERROR, "Uploading cluster failed %i/5 times", uploadingCluster->cluster->fails ); // Pretend the block changed again just now, to prevent immediate retry atomic_compare_exchange_strong( &uploadingCluster->cluster->timeChanged, &uploadingCluster->time, time( NULL ) ); } } curl_multi_remove_handle( cm, msg->easy_handle ); curl_easy_cleanup( msg->easy_handle ); free( uploadingCluster ); return success; } /** * @param cm Curl_multi * @param activeUploads ptr to integer which holds the number of current uploads * @param minNumberUploads break out of loop as soon as there are less than these many transfers running * else COW_MAX_PARALLEL_BACKGROUND_UPLOADS. * @return true returned if all uploads were successful * @return false returned if one ore more upload failed. */ static bool curlMultiLoop( CURLM *cm, int minNumberUploads ) { CURLMsg *msg; int msgsLeft = -1; bool status = true; if ( minNumberUploads <= 0 ) { minNumberUploads = 1; } for ( ;; ) { CURLMcode mc = curl_multi_perform( cm, &activeUploads ); if ( mc != CURLM_OK ) { logadd( LOG_ERROR, "curl_multi_perform error %d, bailing out", (int)mc ); status = false; break; } while ( ( msg = curl_multi_info_read( cm, &msgsLeft ) ) != NULL ) { if ( !clusterUploadDoneHandler( cm, msg ) ) { status = false; } } if ( activeUploads < minNumberUploads ) { break; } // ony wait if there are active uploads if ( activeUploads > 0 ) { mc = curl_multi_wait( cm, NULL, 0, 1000, NULL ); if ( mc != CURLM_OK ) { logadd( LOG_ERROR, "curl_multi_wait error %d, bailing out", (int)mc ); status = false; break; } } } return status; } /** * @brief loops through all blocks and uploads them. * * @param ignoreMinUploadDelay If true uploads all blocks that have changes while * ignoring COW_MIN_UPLOAD_DELAY * @param cm Curl_multi * @return true if all blocks uploaded successful * @return false if one ore more blocks failed to upload */ bool uploadModifiedClusters( bool ignoreMinUploadDelay, CURLM *cm ) { bool success = true; const time_t now = time( NULL ); long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE ); // Iterate over all blocks, L1 first for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) { if ( cow.l1[l1Index] == -1 ) { continue; // Not allocated } // Now all L2 clusters for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) { cow_l2_entry_t *cluster = ( cow.l2[cow.l1[l1Index]] + l2Index ); if ( cluster->offset == -1 ) { continue; // Not allocated } if ( cluster->timeChanged == 0 ) { continue; // Not changed } if ( !ignoreMinUploadDelay && ( now - cluster->timeChanged < COW_MIN_UPLOAD_DELAY ) ) { continue; // Last change not old enough } // Run curl mainloop at least one, but keep doing so while max concurrent uploads is reached int minUploads = ignoreMinUploadDelay ? COW_MAX_PARALLEL_UPLOADS : COW_MAX_PARALLEL_BACKGROUND_UPLOADS; if ( !curlMultiLoop( cm, minUploads ) ) { success = false; } // Maybe one of the uploads was rejected by the server asking us to slow down a bit. // Check for that case and don't trigger a new upload. if ( uploadLoopThrottle > 0 ) { goto DONE; } cow_curl_read_upload_t *b = malloc( sizeof( cow_curl_read_upload_t ) ); b->cluster = cluster; b->clusterNumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index ); b->position = 0; b->retryTime = 0; b->time = cluster->timeChanged; // Copy, so it doesn't change during upload // when we assemble the data in curlReadCallbackUploadBlock() for ( int i = 0; i < COW_BITFIELD_SIZE; ++i ) { b->bitfield[i] = cluster->bitfield[i]; } addUpload( cm, b ); if ( !ignoreMinUploadDelay && !uploadLoop ) { goto DONE; } } } DONE: // Finish all the transfers still active while ( activeUploads > 0 ) { if ( !curlMultiLoop( cm, 1 ) ) { success = false; break; } } return success; } /** * @brief Computes the data for the status to the stdout/statfile every COW_STATS_UPDATE_TIME seconds. * */ void *cowfile_statUpdater( UNUSED void *something ) { uint64_t lastUpdateTime = time( NULL ); time_t now; char speedBuffer[20]; while ( !uploadLoopDone ) { int modified = 0; int inQueue = 0; int idle = 0; long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE ); now = time( NULL ); for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) { if ( cow.l1[l1Index] == -1 ) { continue; } for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) { cow_l2_entry_t *block = ( cow.l2[cow.l1[l1Index]] + l2Index ); if ( block->offset == -1 ) { continue; } if ( block->timeChanged != 0 ) { if ( !uploadLoop || now > block->timeChanged + COW_MIN_UPLOAD_DELAY ) { inQueue++; } else { modified++; } } else { idle++; } } } if ( COW_SHOW_UL_SPEED ) { double delta; double bytes = (double)atomic_exchange( &bytesUploaded, 0 ); now = time( NULL ); delta = (double)( now - lastUpdateTime ); lastUpdateTime = now; if ( delta > 0 ) { snprintf( speedBuffer, sizeof speedBuffer, "%.2f", bytes / 1000.0 / delta ); } } updateCowStatsFile( inQueue, modified, idle, speedBuffer ); sleep( COW_STATS_UPDATE_TIME ); } return NULL; } void quitSigHandler( int sig UNUSED ) { uploadCancelled = true; uploadLoop = false; main_shutdown(); } /** * @brief main loop for blockupload in the background */ static void *uploaderThreadMain( UNUSED void *something ) { CURLM *cm; cm = curl_multi_init(); curl_multi_setopt( cm, CURLMOPT_MAXCONNECTS, (long)MAX( COW_MAX_PARALLEL_UPLOADS, COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) ); do { // Unblock so this very thread gets the signal for abandoning the upload struct sigaction newHandler = { .sa_handler = &quitSigHandler }; sigemptyset( &newHandler.sa_mask ); sigaction( SIGQUIT, &newHandler, NULL ); sigset_t sigmask; sigemptyset( &sigmask ); sigaddset( &sigmask, SIGQUIT ); pthread_sigmask( SIG_UNBLOCK, &sigmask, NULL ); } while ( 0 ); while ( uploadLoop ) { while ( uploadLoopThrottle > 0 && uploadLoop ) { sleep( 1 ); uploadLoopThrottle--; } sleep( 2 ); if ( !uploadLoop ) break; uploadModifiedClusters( false, cm ); } if ( uploadCancelled ) { uploadLoopDone = true; logadd( LOG_INFO, "Not uploading remaining clusters, SIGQUIT received" ); } else { // force the upload of all remaining blocks because the user dismounted the image logadd( LOG_INFO, "Start uploading the remaining clusters." ); if ( !uploadModifiedClusters( true, cm ) ) { uploadLoopDone = true; logadd( LOG_ERROR, "One or more clusters failed to upload" ); } else { uploadLoopDone = true; logadd( LOG_DEBUG1, "All clusters uploaded" ); if ( cow_merge_after_upload ) { requestRemoteMerge(); logadd( LOG_DEBUG1, "Requesting merge" ); } } } curl_multi_cleanup( cm ); return NULL; } /** * @brief Create a Cow Stats File an inserts the session uuid * * @param path where the file is created * @return true * @return false if failed to create or to write into the file */ static bool createCowStatsFile( char *path ) { char pathStatus[strlen( path ) + 12]; snprintf( pathStatus, strlen( path ) + 12, "%s%s", path, "/status.txt" ); char buffer[100]; int len = snprintf( buffer, 100, "uuid=%s\nstate: active\n", metadata->uuid ); if ( statStdout ) { logadd( LOG_INFO, "%s", buffer ); } if ( statFile ) { if ( ( cow.fdStats = open( pathStatus, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR ) ) == -1 ) { logadd( LOG_ERROR, "Could not create cow status file. Bye.\n" ); return false; } if ( pwrite( cow.fdStats, buffer, len, 0 ) != len ) { logadd( LOG_ERROR, "Could not write to cow status file. Bye.\n" ); return false; } } return true; } static bool commonInit( const char* serverAddress, const char *cowUuid ) { CURLcode m; if ( cowUuid != NULL && strlen( cowUuid ) > UUID_STRLEN ) { logadd( LOG_ERROR, "COW UUID too long: '%s'", cowUuid ); return false; } uploadHeaders = curl_slist_append( uploadHeaders, "Content-Type: application/octet-stream" ); pthread_mutex_init( &cow.l2CreateLock, NULL ); cowServerAddress = serverAddress; if ( ( m = curl_global_init( CURL_GLOBAL_ALL ) ) != CURLE_OK ) { logadd( LOG_ERROR, "curl_global_init failed: %s", curl_easy_strerror( m ) ); return false; } curl = curl_easy_init(); if ( curl == NULL ) { logadd( LOG_ERROR, "Error on curl_easy_init" ); return false; } return true; } /** * @brief initializes the cow functionality, creates the data & meta file. * * @param path where the files should be stored * @param image_Name name of the original file/image * @param imageSizePtr * @param cowUuid optional, use given UUID for talking to COW server instead of creating session */ bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sfile, const char *cowUuid ) { char pathMeta[strlen( path ) + 6]; char pathData[strlen( path ) + 6]; if ( !commonInit( serverAddress, cowUuid ) ) return false; statStdout = sStdout; statFile = sfile; snprintf( pathMeta, strlen( path ) + 6, "%s%s", path, "/meta" ); snprintf( pathData, strlen( path ) + 6, "%s%s", path, "/data" ); if ( ( cow.fdMeta = open( pathMeta, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR ) ) == -1 ) { logadd( LOG_ERROR, "Could not create cow meta file. Bye.\n %s \n", pathMeta ); return false; } if ( ( cow.fdData = open( pathData, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR ) ) == -1 ) { logadd( LOG_ERROR, "Could not create cow data file. Bye.\n" ); return false; } struct stat fs; if ( fstat( cow.fdData, &fs ) == -1 || fs.st_size != 0 ) { logadd( LOG_ERROR, "/data file already exists and is not empty" ); return false; } size_t metaDataSizeHeader = sizeof( cowfile_metadata_header_t ); // Calculate how many full l2 tables we need to address COW_MAX_IMAGE_SIZE size_t l1NumEntries = ( ( COW_MAX_IMAGE_SIZE + COW_FULL_L2_TABLE_DATA_SIZE - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE ); // Make sure l1 and l2 are aligned to struct size size_t sizeL1 = sizeof(cow.l1[0]); size_t sizeL2 = sizeof(cow.l2[0]); size_t startL1 = ( ( metaDataSizeHeader + sizeL1 - 1 ) / sizeL1 ) * sizeL1; size_t startL2 = ( ( startL1 + l1NumEntries * sizeL1 + sizeL2 - 1 ) / sizeL2 ) * sizeL2; // size of l1 array + number of l2's * size of l2 size_t ps = getpagesize(); if ( ps == 0 || ps > INT_MAX ) { logadd( LOG_ERROR, "Cannot get native page size, aborting..." ); return false; } size_t metaSize = ( ( startL2 + l1NumEntries * sizeof( l2 ) + ps - 1 ) / ps ) * ps; if ( ftruncate( cow.fdMeta, metaSize ) != 0 ) { logadd( LOG_ERROR, "Could not set file size of meta data file (errno=%d). Bye.\n", errno ); return false; } cow.metadata_mmap = mmap( NULL, metaSize, PROT_READ | PROT_WRITE, MAP_SHARED, cow.fdMeta, 0 ); if ( cow.metadata_mmap == MAP_FAILED ) { logadd( LOG_ERROR, "Error while mmap()ing meta data, errno=%d", errno ); return false; } metadata = (cowfile_metadata_header_t *)( cow.metadata_mmap ); metadata->magicValue = COW_FILE_META_MAGIC_VALUE; metadata->imageSize = **imageSizePtr; metadata->version = CURRENT_COW_VERSION; metadata->validRemoteSize = **imageSizePtr; metadata->startL1 = (uint32_t)startL1; metadata->startL2 = (uint32_t)startL2; metadata->bitfieldSize = COW_BITFIELD_SIZE; metadata->nextL2 = 0; metadata->metaSize = ATOMIC_VAR_INIT( metaSize ); metadata->nextClusterOffset = ATOMIC_VAR_INIT( COW_DATA_CLUSTER_SIZE ); metadata->maxImageSize = COW_MAX_IMAGE_SIZE; metadata->creationTime = time( NULL ); snprintf( metadata->imageName, 200, "%s", image_Name ); cow.l1 = (l1 *)( cow.metadata_mmap + startL1 ); cow.l2 = (l2 *)( cow.metadata_mmap + startL2 ); for ( size_t i = 0; i < l1NumEntries; i++ ) { cow.l1[i] = -1; } // write header to data file uint64_t header = COW_FILE_DATA_MAGIC_VALUE; if ( pwrite( cow.fdData, &header, sizeof( uint64_t ), 0 ) != sizeof( uint64_t ) ) { logadd( LOG_ERROR, "Could not write header to cow data file. Bye.\n" ); return false; } if ( cowUuid != NULL ) { snprintf( metadata->uuid, UUID_STRLEN, "%s", cowUuid ); logadd( LOG_INFO, "Using provided upload session id" ); } else if ( !createSession( image_Name, imageVersion ) ) { return false; } createCowStatsFile( path ); *imageSizePtr = &metadata->imageSize; return true; } /** * @brief loads an existing cow state from the meta & data files * * @param path where the meta & data file is located * @param imageSizePtr */ bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile, const char *cowUuid ) { char pathMeta[strlen( path ) + 6]; char pathData[strlen( path ) + 6]; if ( !commonInit( serverAddress, cowUuid ) ) return false; statStdout = sStdout; statFile = sFile; snprintf( pathMeta, strlen( path ) + 6, "%s%s", path, "/meta" ); snprintf( pathData, strlen( path ) + 6, "%s%s", path, "/data" ); if ( ( cow.fdMeta = open( pathMeta, O_RDWR, S_IRUSR | S_IWUSR ) ) == -1 ) { logadd( LOG_ERROR, "Could not open cow meta file. Bye.\n" ); return false; } if ( ( cow.fdData = open( pathData, O_RDWR, S_IRUSR | S_IWUSR ) ) == -1 ) { logadd( LOG_ERROR, "Could not open cow data file. Bye.\n" ); return false; } cowfile_metadata_header_t header; { size_t sizeToRead = sizeof( cowfile_metadata_header_t ); size_t readBytes = 0; while ( readBytes < sizeToRead ) { ssize_t bytes = pread( cow.fdMeta, ( ( &header ) + readBytes ), sizeToRead - readBytes, 0 ); if ( bytes <= 0 ) { logadd( LOG_ERROR, "Error while reading meta file header. Bye.\n" ); return false; } readBytes += bytes; } if ( header.magicValue != COW_FILE_META_MAGIC_VALUE ) { if ( __builtin_bswap64( header.magicValue ) == COW_FILE_META_MAGIC_VALUE ) { logadd( LOG_ERROR, "cow meta file of wrong endianess. Bye.\n" ); return false; } logadd( LOG_ERROR, "cow meta file of unkown format. Bye.\n" ); return false; } if ( header.bitfieldSize != COW_BITFIELD_SIZE ) { logadd( LOG_ERROR, "cow meta file has unexpected bitfield size %d", (int)header.bitfieldSize ); return false; } if ( header.startL1 >= header.startL2 || header.startL2 >= header.metaSize ) { logadd( LOG_ERROR, "l1/l2 offset messed up in metadata." ); return false; } struct stat st; fstat( cow.fdMeta, &st ); if ( st.st_size < (off_t)header.metaSize ) { logadd( LOG_ERROR, "cow meta file too small. Bye." ); return false; } } { uint64_t magicValueDataFile; if ( pread( cow.fdData, &magicValueDataFile, sizeof( uint64_t ), 0 ) != sizeof( uint64_t ) ) { logadd( LOG_ERROR, "Error while reading cow data file, wrong file?. Bye." ); return false; } if ( magicValueDataFile != COW_FILE_DATA_MAGIC_VALUE ) { if ( __builtin_bswap64( magicValueDataFile ) == COW_FILE_DATA_MAGIC_VALUE ) { logadd( LOG_ERROR, "cow data file of wrong endianess. Bye." ); return false; } logadd( LOG_ERROR, "cow data file of unkown format. Bye." ); return false; } struct stat st; fstat( cow.fdData, &st ); // add cluster size, since we don't preallocate if ( header.nextClusterOffset > st.st_size + (int)COW_DATA_CLUSTER_SIZE ) { logadd( LOG_ERROR, "cow data file too small. Expected=%jd, Is=%jd.", (intmax_t)header.nextClusterOffset, (intmax_t)st.st_size ); return false; } } cow.metadata_mmap = mmap( NULL, header.metaSize, PROT_READ | PROT_WRITE, MAP_SHARED, cow.fdMeta, 0 ); if ( cow.metadata_mmap == MAP_FAILED ) { logadd( LOG_ERROR, "Error while mapping mmap, errno=%d.", errno ); return false; } if ( header.version != CURRENT_COW_VERSION ) { logadd( LOG_ERROR, "Error wrong file version got: %i expected: %i. Bye.", metadata->version, CURRENT_COW_VERSION ); return false; } metadata = (cowfile_metadata_header_t *)( cow.metadata_mmap ); if ( cowUuid != NULL ) { logadd( LOG_INFO, "Overriding stored upload session id with provided one" ); snprintf( metadata->uuid, UUID_STRLEN, "%s", cowUuid ); } *imageSizePtr = &metadata->imageSize; cow.l1 = (l1 *)( cow.metadata_mmap + metadata->startL1 ); cow.l2 = (l2 *)( cow.metadata_mmap + metadata->startL2 ); createCowStatsFile( path ); return true; } /** * @brief Starts the cow BackgroundThreads which are needed for stats and data upload * */ bool cowfile_startBackgroundThreads() { if( pthread_create( &tidCowUploader, NULL, &uploaderThreadMain, NULL ) != 0 ) { logadd( LOG_ERROR, "Could not create cow uploader thread"); return false; } if ( statFile || statStdout ) { if(pthread_create( &tidStatUpdater, NULL, &cowfile_statUpdater, NULL ) != 0 ) { logadd( LOG_ERROR, "Could not create stat updater thread"); return false; } } return true; } /** * Check if block at given offset is local, i.e. has been modified. * @param meta The cow_l2_entry for the according cluster MUST be provided * @param offset offset of data, can be absolute image offset as it will be transformed into cluster offset */ static bool isBlockLocal( cow_l2_entry_t *meta, off_t offset ) { if ( meta == NULL ) return false; return checkBit( meta->bitfield, ( offset % COW_DATA_CLUSTER_SIZE ) / DNBD3_BLOCK_SIZE ); } /** * @brief Get the cow_l2_entry_t from l1Index and l2Index. * l1 offset must be valid * * @param l1Index * @param l2Index * @return cow_l2_entry_t* */ static cow_l2_entry_t *getL2Entry( int l1Index, int l2Index, bool create ) { if ( cow.l1[l1Index] == -1 ) return NULL; cow_l2_entry_t *block = cow.l2[cow.l1[l1Index]] + l2Index; if ( block->offset == -1 ) { if ( !create ) return NULL; block->offset = atomic_fetch_add( &metadata->nextClusterOffset, COW_DATA_CLUSTER_SIZE ); } return block; } /** * @brief creates an new L2 table and initializes the containing cow_l2_entry_t * * @param l1Index */ static bool createL2Table( int l1Index ) { pthread_mutex_lock( &cow.l2CreateLock ); if ( cow.l1[l1Index] == -1 ) { int idx = metadata->nextL2++; for ( int i = 0; i < COW_L2_TABLE_SIZE; i++ ) { cow.l2[idx][i].offset = -1; cow.l2[idx][i].timeChanged = ATOMIC_VAR_INIT( 0 ); cow.l2[idx][i].uploads = ATOMIC_VAR_INIT( 0 ); for ( int j = 0; j < COW_BITFIELD_SIZE; j++ ) { cow.l2[idx][i].bitfield[j] = ATOMIC_VAR_INIT( 0 ); } } cow.l1[l1Index] = idx; } pthread_mutex_unlock( &cow.l2CreateLock ); return true; } /** * @brief Is called once a fuse write request ist finished. * Calls the corrsponding fuse reply depending on the type and * success of the request. * * @param req fuse_req_t * @param cowRequest */ static void finishWriteRequest( fuse_req_t req, cow_request_t *cowRequest ) { if ( atomic_fetch_sub( &cowRequest->workCounter, 1 ) != 1 ) return; // More sub-requests are pending, bail out if ( cowRequest->errorCode != 0 ) { fuse_reply_err( req, cowRequest->errorCode ); } else { uint64_t newSize = cowRequest->bytesWorkedOn + cowRequest->fuseRequestOffset; if ( newSize > metadata->imageSize ) { uint64_t oldSize; do { oldSize = metadata->imageSize; newSize = MAX( oldSize, newSize ); } while ( !atomic_compare_exchange_weak( &metadata->imageSize, &oldSize, newSize ) ); } fuse_reply_write( req, cowRequest->bytesWorkedOn ); } free( cowRequest ); } /** * @brief Called after the padding data was received from the dnbd3 server. * The data from the write request will be combined with the data from the server * so that we get a full DNBD3_BLOCK and is then written on the disk. * @param sRequest */ static void writePaddedBlock( cow_sub_request_t *sRequest ) { assert( ( sRequest->inClusterOffset % DNBD3_BLOCK_SIZE ) + sRequest->size <= DNBD3_BLOCK_SIZE ); // Here, we again check if the block is written locally - there might have been a second write // that wrote the full block, hence didn't have to wait for remote data and finished faster. // In that case, don't pad from remote as we'd overwrite newer data. if ( isBlockLocal( sRequest->cluster, sRequest->inClusterOffset ) ) { logadd( LOG_INFO, "It happened!" ); } else { // copy write Data // writeBuffer is the received data, patch data from fuse write into it memcpy( sRequest->writeBuffer + ( sRequest->inClusterOffset % DNBD3_BLOCK_SIZE ), sRequest->writeSrc, sRequest->size ); if ( !writeAll( cow.fdData, sRequest->writeBuffer, DNBD3_BLOCK_SIZE, sRequest->cluster->offset + ( sRequest->inClusterOffset & ~DNBD3_BLOCK_MASK ) ) ) { sRequest->cowRequest->errorCode = errno; } else { sRequest->cowRequest->bytesWorkedOn += sRequest->size; int64_t bit = sRequest->inClusterOffset / DNBD3_BLOCK_SIZE; setBitsInBitfield( sRequest->cluster->bitfield, bit, bit, true ); sRequest->cluster->timeChanged = time( NULL ); } } finishWriteRequest( sRequest->dRequest.fuse_req, sRequest->cowRequest ); free( sRequest ); } /** * @brief If a block does not start or finish on an multiple of DNBD3_BLOCK_SIZE, the blocks need to be * padded. If this block is inside the original image size, the padding data will be read from the server. * Otherwise it will be padded with 0 since the it must be a block after the end of the image. * @param req fuse_req_t * @param cowRequest cow_request_t * @param startOffset Absolute offset where the real data starts * @param endOffset Absolute offset where the real data ends * @param srcBuffer pointer to the data that needs to be padded, ie. data from user space. */ static bool padBlockForWrite( fuse_req_t req, cow_request_t *cowRequest, off_t startOffset, off_t endOffset, const char *srcBuffer ) { // Make sure we pad exactly one block endOffset = MIN( (uint64_t)endOffset, ( startOffset + DNBD3_BLOCK_SIZE ) & ~DNBD3_BLOCK_MASK ); assert( startOffset < endOffset ); size_t size = (size_t)( endOffset - startOffset ); int l1Index = offsetToL1Index( startOffset ); int l2Index = offsetToL2Index( startOffset ); off_t inClusterOffset = startOffset % COW_DATA_CLUSTER_SIZE; cow_l2_entry_t *cluster = getL2Entry( l1Index, l2Index, true ); if ( isBlockLocal( cluster, startOffset ) ) { // No padding at all, keep existing data bool ret = writeAll( cow.fdData, srcBuffer, size, cluster->offset + inClusterOffset ); if ( ret ) { cowRequest->bytesWorkedOn += size; cluster->timeChanged = time( NULL ); } return ret; } // Not local, need some form of padding createL2Table( l1Index ); if ( cluster == NULL ) { cluster = getL2Entry( l1Index, l2Index, true ); } uint64_t validImageSize = metadata->validRemoteSize; // As we don't lock if ( startOffset >= (off_t)validImageSize ) { // After end of remote valid data, pad with zeros entirely char buf[DNBD3_BLOCK_SIZE] = {0}; off_t start = startOffset % DNBD3_BLOCK_SIZE; assert( start + size <= DNBD3_BLOCK_SIZE ); memcpy( buf + start, srcBuffer, size ); bool ret = writeAll( cow.fdData, buf, DNBD3_BLOCK_SIZE, cluster->offset + ( inClusterOffset & ~DNBD3_BLOCK_MASK ) ); if ( ret ) { int64_t bit = inClusterOffset / DNBD3_BLOCK_SIZE; setBitsInBitfield( cluster->bitfield, bit, bit, true ); cowRequest->bytesWorkedOn += size; cluster->timeChanged = time( NULL ); } return ret; } // Need to fetch padding from upstream, allocate struct plus one block cow_sub_request_t *sub = calloc( sizeof( *sub ) + DNBD3_BLOCK_SIZE, 1 ); sub->callback = writePaddedBlock; sub->inClusterOffset = inClusterOffset; sub->cluster = cluster; sub->size = size; sub->writeSrc = srcBuffer; sub->cowRequest = cowRequest; sub->buffer = sub->writeBuffer; sub->dRequest.length = (uint32_t)MIN( DNBD3_BLOCK_SIZE, validImageSize - startOffset ); sub->dRequest.offset = startOffset & ~DNBD3_BLOCK_MASK; sub->dRequest.fuse_req = req; if ( !connection_read( &sub->dRequest ) ) { free( sub ); errno = ENOTSOCK; return false; } atomic_fetch_add( &cowRequest->workCounter, 1 ); return true; } /** * @brief Will be called after a dnbd3_async_t is finished. * Calls the corrsponding callback function, either writePaddedBlock or readRemoteCallback * depending if the original fuse request was a write or read. * */ void cowfile_handleCallback( dnbd3_async_t *request ) { cow_sub_request_t *sRequest = container_of( request, cow_sub_request_t, dRequest ); sRequest->callback( sRequest ); } /** * @brief called once dnbd3_async_t is finished. Increases bytesWorkedOn by the number of bytes * this request had. Also checks if it was the last dnbd3_async_t to finish the fuse request, if * so replys to fuse and cleans up the request. * */ static void readRemoteCallback( cow_sub_request_t *sRequest ) { atomic_fetch_add( &sRequest->cowRequest->bytesWorkedOn, sRequest->dRequest.length ); if ( atomic_fetch_sub( &sRequest->cowRequest->workCounter, 1 ) == 1 ) { if ( sRequest->cowRequest->bytesWorkedOn != sRequest->cowRequest->fuseRequestSize ) { // Because connection_read() will always return exactly as many bytes as requested, // or simply never finish. logadd( LOG_ERROR, "BUG? Pad read has invalid size. worked on: %"PRIu64", request size: %" PRIu64", offset: %"PRIu64, (uint64_t)sRequest->cowRequest->bytesWorkedOn, (uint64_t)sRequest->cowRequest->fuseRequestSize, (uint64_t)sRequest->cowRequest->fuseRequestOffset ); fuse_reply_err( sRequest->dRequest.fuse_req, EIO ); } else { fuse_reply_buf( sRequest->dRequest.fuse_req, sRequest->cowRequest->readBuffer, sRequest->cowRequest->bytesWorkedOn ); } free( sRequest->cowRequest->readBuffer ); free( sRequest->cowRequest ); } free( sRequest ); } /** * @brief changes the imageSize * * @param req fuse request * @param size new size the image should have * @param ino fuse_ino_t * @param fi fuse_file_info */ void cowfile_setSize( fuse_req_t req, size_t size, fuse_ino_t ino, struct fuse_file_info *fi ) { if ( size < metadata->imageSize ) { // truncate file if ( size < metadata->validRemoteSize ) { metadata->validRemoteSize = size; } } else if ( size > metadata->imageSize ) { // grow file, pad with zeroes off_t offset = metadata->imageSize; int l1Index = offsetToL1Index( offset ); int l2Index = offsetToL2Index( offset ); int l1EndIndex = offsetToL1Index( size ); int l2EndIndex = offsetToL2Index( size ); // Special case, first cluster through which the size change passes cow_l2_entry_t *cluster = getL2Entry( l1Index, l2Index, false ); if ( cluster != NULL ) { off_t inClusterOffset = offset % COW_DATA_CLUSTER_SIZE; // if the new size is inside a DNBD3_BLOCK it might still contain old data before a truncate if ( !IS_4K_ALIGNED( metadata->imageSize ) ) { size_t sizeToWrite = DNBD3_BLOCK_SIZE - ( metadata->imageSize % DNBD3_BLOCK_SIZE ); if ( checkBit( cluster->bitfield, inClusterOffset / DNBD3_BLOCK_SIZE ) ) { char buf[DNBD3_BLOCK_SIZE] = {0}; ssize_t bytesWritten = pwrite( cow.fdData, buf, sizeToWrite, cluster->offset + inClusterOffset ); if ( bytesWritten < (ssize_t)sizeToWrite ) { fuse_reply_err( req, bytesWritten == -1 ? errno : EIO ); return; } cluster->timeChanged = time( NULL ); offset += sizeToWrite; } } // all remaining bits in cluster will get set to 0 inClusterOffset = offset % COW_DATA_CLUSTER_SIZE; setBitsInBitfield( cluster->bitfield, inClusterOffset / DNBD3_BLOCK_SIZE, ( COW_BITFIELD_SIZE * 8 ) - 1, false ); cluster->timeChanged = time( NULL ); l2Index++; if ( l2Index >= COW_L2_TABLE_SIZE ) { l2Index = 0; l1Index++; } } // normal case, if clusters exist, null bitfields while ( l1Index < l1EndIndex || ( l1Index == l1EndIndex && l2Index <= l2EndIndex ) ) { if ( cow.l1[l1Index] == -1 ) { l1Index++; l2Index = 0; continue; } cluster = getL2Entry( l1Index, l2Index, false ); if ( cluster != NULL ) { memset( cluster->bitfield, 0, COW_BITFIELD_SIZE ); cluster->timeChanged = time( NULL ); } l2Index++; if ( l2Index >= COW_L2_TABLE_SIZE ) { l2Index = 0; l1Index++; } } } metadata->imageSize = size; if ( req != NULL ) { image_ll_getattr( req, ino, fi ); } } /** * @brief Implementation of a write request. * * @param req fuse_req_t * @param cowRequest * @param offset Offset where the write starts, * @param size Size of the write. */ void cowfile_write( fuse_req_t req, cow_request_t *cowRequest, off_t offset, size_t size ) { // if beyond end of file, pad with 0 if ( offset > (off_t)metadata->imageSize ) { cowfile_setSize( NULL, offset, 0, NULL ); } off_t currentOffset = offset; off_t endOffset = offset + size; if ( !IS_4K_ALIGNED( currentOffset ) ) { // Handle case where start is not 4k aligned if ( !padBlockForWrite( req, cowRequest, currentOffset, endOffset, cowRequest->writeBuffer ) ) { goto fail; } // Move forward to next block border currentOffset = ( currentOffset + DNBD3_BLOCK_SIZE ) & ~DNBD3_BLOCK_MASK; } if ( currentOffset < endOffset && !IS_4K_ALIGNED( endOffset ) ) { // Handle case where end is not 4k aligned off_t lastBlockStart = endOffset & ~DNBD3_BLOCK_MASK; if ( !padBlockForWrite( req, cowRequest, lastBlockStart, endOffset, cowRequest->writeBuffer + ( lastBlockStart - offset ) ) ) { goto fail; } endOffset = lastBlockStart; } // From here on start and end are block-aligned int l1Index = offsetToL1Index( currentOffset ); int l2Index = offsetToL2Index( currentOffset ); while ( currentOffset < endOffset ) { if ( cow.l1[l1Index] == -1 ) { createL2Table( l1Index ); } //loop over L2 array (metadata) while ( currentOffset < endOffset && l2Index < COW_L2_TABLE_SIZE ) { cow_l2_entry_t *cluster = getL2Entry( l1Index, l2Index, true ); size_t inClusterOffset = currentOffset % COW_DATA_CLUSTER_SIZE; // How many bytes we can write to this cluster before crossing a boundary, // or before the write request is complete size_t bytesToWriteToCluster = MIN( (size_t)( endOffset - currentOffset ), COW_DATA_CLUSTER_SIZE - inClusterOffset ); if ( !writeAll( cow.fdData, cowRequest->writeBuffer + ( currentOffset - offset ), bytesToWriteToCluster, cluster->offset + inClusterOffset ) ) { goto fail; } int64_t f = inClusterOffset / DNBD3_BLOCK_SIZE; int64_t t = ( inClusterOffset + bytesToWriteToCluster - 1 ) / DNBD3_BLOCK_SIZE; setBitsInBitfield( cluster->bitfield, f, t, true ); cowRequest->bytesWorkedOn += bytesToWriteToCluster; currentOffset += bytesToWriteToCluster; cluster->timeChanged = time( NULL ); l2Index++; } l1Index++; l2Index = 0; } goto success; fail: if ( cowRequest->errorCode == 0 ) { cowRequest->errorCode = errno != 0 ? errno : EIO; } success: finishWriteRequest( req, cowRequest ); } /** * @brief Request data, that is not available locally, via the network. * * @param req fuse_req_t * @param offset from the start of the file * @param size of data to request * @param buffer into which the data is to be written * @param cowRequest cow_request_t */ static void readRemote( fuse_req_t req, off_t offset, ssize_t size, char *buffer, cow_request_t *cowRequest ) { assert( offset < (off_t)metadata->validRemoteSize ); assert( offset + size <= (off_t)metadata->validRemoteSize ); if ( size == 0 ) return; assert( size > 0 ); cow_sub_request_t *sRequest = malloc( sizeof( cow_sub_request_t ) ); sRequest->callback = readRemoteCallback; sRequest->dRequest.length = (uint32_t)size; sRequest->dRequest.offset = offset; sRequest->dRequest.fuse_req = req; sRequest->cowRequest = cowRequest; sRequest->buffer = buffer; atomic_fetch_add( &cowRequest->workCounter, 1 ); if ( !connection_read( &sRequest->dRequest ) ) { cowRequest->errorCode = EIO; free( sRequest ); if ( atomic_fetch_sub( &cowRequest->workCounter, 1 ) == 1 ) { fuse_reply_err( req, EIO ); free( cowRequest->readBuffer ); free( cowRequest ); } } } /** * @brief Get the Block Data Source object * * @param block * @param bitfieldOffset * @param offset * @return enum dataSource */ enum dataSource getBlockDataSource( cow_l2_entry_t *block, off_t bitfieldOffset, off_t offset ) { if ( block != NULL && checkBit( block->bitfield, bitfieldOffset ) ) { return ds_local; } if ( offset >= (off_t)metadata->validRemoteSize ) { return ds_zero; } return ds_remote; } /** * @brief Reads data at given offset. If the data are available locally, * they are read locally, otherwise they are requested remotely. * * @param req fuse_req_t * @param size of date to read * @param offset offset where the read starts. * @return uint64_t Number of bytes read. */ void cowfile_read( fuse_req_t req, size_t size, off_t startOffset ) { cow_request_t *cowRequest = malloc( sizeof( cow_request_t ) ); cowRequest->fuseRequestSize = size; cowRequest->bytesWorkedOn = ATOMIC_VAR_INIT( 0 ); cowRequest->workCounter = ATOMIC_VAR_INIT( 1 ); cowRequest->errorCode = ATOMIC_VAR_INIT( 0 ); cowRequest->readBuffer = calloc( size, 1 ); cowRequest->fuseRequestOffset = startOffset; off_t lastReadOffset = -1; off_t endOffset = startOffset + size; off_t searchOffset = startOffset; int l1Index = offsetToL1Index( startOffset ); int l2Index = offsetToL2Index( startOffset ); int bitfieldOffset = getBitfieldOffsetBit( startOffset ); cow_l2_entry_t *cluster = getL2Entry( l1Index, l2Index, false ); enum dataSource dataState = ds_invalid; bool flushCurrentSpan = false; // Set if we need to read the current span and start the next one bool newSourceType = true; // Set if we're starting a new span, and the source type needs to be determined while ( searchOffset < endOffset ) { if ( newSourceType ) { newSourceType = false; lastReadOffset = searchOffset; dataState = getBlockDataSource( cluster, bitfieldOffset, searchOffset ); } else if ( getBlockDataSource( cluster, bitfieldOffset, searchOffset ) != dataState ) { // Source type changed, obviously need to flush current span flushCurrentSpan = true; } else { bitfieldOffset++; // If reading from local cow file, crossing a cluster border means we need to flush // since the next cluster might be somewhere else in the data file if ( dataState == ds_local && bitfieldOffset == COW_BITFIELD_SIZE * 8 ) { flushCurrentSpan = true; } } // compute the absolute image offset from bitfieldOffset, l2Index and l1Index // bitfieldOffset might be out of bounds here, but that doesn't matter for the calculation searchOffset = DNBD3_BLOCK_SIZE * bitfieldOffset + l2Index * COW_DATA_CLUSTER_SIZE + l1Index * COW_FULL_L2_TABLE_DATA_SIZE; if ( flushCurrentSpan || searchOffset >= endOffset ) { ssize_t spanEndOffset = MIN( searchOffset, endOffset ); if ( dataState == ds_remote ) { if ( spanEndOffset > (ssize_t)metadata->validRemoteSize ) { // Account for bytes we leave zero, because they're beyond the (truncated) original image size atomic_fetch_add( &cowRequest->bytesWorkedOn, spanEndOffset - metadata->validRemoteSize ); spanEndOffset = metadata->validRemoteSize; } readRemote( req, lastReadOffset, spanEndOffset - lastReadOffset, cowRequest->readBuffer + ( lastReadOffset - startOffset ), cowRequest ); } else if ( dataState == ds_zero ) { // Past end of image, account for leaving them zero ssize_t numBytes = spanEndOffset - lastReadOffset; atomic_fetch_add( &cowRequest->bytesWorkedOn, numBytes ); } else if ( dataState == ds_local ) { ssize_t numBytes = spanEndOffset - lastReadOffset; // Compute the startOffset in the data file where the read starts off_t localRead = cluster->offset + ( lastReadOffset % COW_DATA_CLUSTER_SIZE ); ssize_t totalBytesRead = 0; while ( totalBytesRead < numBytes ) { ssize_t bytesRead = pread( cow.fdData, cowRequest->readBuffer + ( lastReadOffset - startOffset ), numBytes - totalBytesRead, localRead + totalBytesRead ); if ( bytesRead == -1 ) { cowRequest->errorCode = errno; goto fail; } else if ( bytesRead == 0 ) { logadd( LOG_ERROR, "EOF for read at localRead=%"PRIu64", totalBR=%"PRIu64, (uint64_t)localRead, (uint64_t)totalBytesRead ); logadd( LOG_ERROR, "searchOffset=%"PRIu64", endOffset=%"PRIu64", imageSize=%"PRIu64, searchOffset, endOffset, metadata->imageSize ); cowRequest->errorCode = EIO; goto fail; } totalBytesRead += bytesRead; } atomic_fetch_add( &cowRequest->bytesWorkedOn, numBytes ); } else { assert( 4 == 6 ); } lastReadOffset = searchOffset; flushCurrentSpan = false; // Since the source type changed, reset newSourceType = true; } if ( bitfieldOffset == COW_BITFIELD_SIZE * 8 ) { // Advance to next cluster in current l2 table bitfieldOffset = 0; l2Index++; if ( l2Index >= COW_L2_TABLE_SIZE ) { // Advance to next l1 entry, reset l2 index l2Index = 0; l1Index++; } cluster = getL2Entry( l1Index, l2Index, false ); } } fail:; if ( atomic_fetch_sub( &cowRequest->workCounter, 1 ) == 1 ) { if ( cowRequest->errorCode != 0 || cowRequest->bytesWorkedOn != size ) { logadd( LOG_ERROR, "incomplete read or I/O error (errno=%d, workedOn: %"PRIu64", size: %"PRIu64")", cowRequest->errorCode, (uint64_t)cowRequest->bytesWorkedOn, (uint64_t)size ); fuse_reply_err( req, cowRequest->errorCode != 0 ? cowRequest->errorCode : EIO ); } else { fuse_reply_buf( req, cowRequest->readBuffer, cowRequest->bytesWorkedOn ); } free( cowRequest->readBuffer ); free( cowRequest ); } } /** * @brief stops the StatUpdater and CowUploader threads * and waits for them to finish, then cleans up curl. * */ void cowfile_close() { uploadLoop = false; pthread_join( tidCowUploader, NULL ); if ( statFile || statStdout ) { // Send a signal in case it's hanging in the sleep call pthread_kill( tidStatUpdater, SIGHUP ); pthread_join( tidStatUpdater, NULL ); } curl_slist_free_all( uploadHeaders ); if ( curl ) { curl_easy_cleanup( curl ); curl_global_cleanup(); } }