diff options
Diffstat (limited to 'src/fuse/cowfile.c')
-rw-r--r-- | src/fuse/cowfile.c | 1535 |
1 files changed, 1535 insertions, 0 deletions
diff --git a/src/fuse/cowfile.c b/src/fuse/cowfile.c new file mode 100644 index 0000000..965e6f4 --- /dev/null +++ b/src/fuse/cowfile.c @@ -0,0 +1,1535 @@ +#include "cowfile.h" +#include "math.h" +extern void image_ll_getattr( fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi ); + +static int cowFileVersion = 1; +static bool statStdout; +static bool statFile; +static pthread_t tidCowUploader; +static pthread_t tidStatUpdater; +static char *cowServerAddress; +static CURL *curl; +static cowfile_metadata_header_t *metadata = NULL; +static atomic_uint_fast64_t bytesUploaded; +static uint64_t totalBlocksUploaded = 0; +static atomic_int activeUploads = 0; +atomic_bool uploadLoop = true; +atomic_bool uploadLoopDone = false; + +static struct cow +{ + pthread_mutex_t l2CreateLock; + int fhm; + int fhd; + int fhs; + char *metadata_mmap; + l1 *l1; + l2 *firstL2; + size_t maxImageSize; + size_t l1Size; //size of l1 array + +} cow; + +/** + * @brief Computes the l1 offset from the absolute file offset + * + * @param offset absolute file offset + * @return int l2 offset + */ +static int getL1Offset( size_t offset ) +{ + return (int)( offset / COW_L2_STORAGE_CAPACITY ); +} + +/** + * @brief Computes the l2 offset from the absolute file offset + * + * @param offset absolute file offset + * @return int l2 offset + */ +static int getL2Offset( size_t offset ) +{ + return (int)( ( offset % COW_L2_STORAGE_CAPACITY ) / COW_METADATA_STORAGE_CAPACITY ); +} + +/** + * @brief Computes the bit in the bitfield from the absolute file offset + * + * @param offset absolute file offset + * @return int bit(0-319) in the bitfield + */ +static int getBitfieldOffset( size_t offset ) +{ + return (int)( offset / DNBD3_BLOCK_SIZE ) % ( COW_BITFIELD_SIZE * 8 ); +} + +/** + * @brief Sets the specified bits in the specified range threadsafe to 1. + * + * @param byte of a bitfield + * @param from start bit + * @param to end bit + * @param value set bits to 1 or 0 + */ +static void setBits( atomic_char *byte, int from, int to, bool value ) +{ + char mask = (char)( ( 255 >> ( 7 - ( to - from ) ) ) << from ); + if ( value ) { + atomic_fetch_or( byte, mask ); + } else { + atomic_fetch_and( byte, ~mask ); + } +} + +/** + * @brief Sets the specified bits in the specified range threadsafe to 1. + * + * @param bitfield of a cow_block_metadata + * @param from start bit + * @param to end bit + * @param value set bits to 1 or 0 + */ +static void setBitsInBitfield( atomic_char *bitfield, int from, int to, bool value ) +{ + assert( from >= 0 || to < COW_BITFIELD_SIZE * 8 ); + int start = from / 8; + int end = to / 8; + + for ( int i = start; i <= end; i++ ) { + setBits( ( bitfield + i ), from - i * 8, MIN( 7, to - i * 8 ), value ); + from = ( i + 1 ) * 8; + } +} + +/** + * @brief Checks if the n bit of a bit field is 0 or 1. + * + * @param bitfield of a cow_block_metadata + * @param n the bit which should be checked + */ +static bool checkBit( atomic_char *bitfield, int n ) +{ + return ( atomic_load( ( bitfield + ( n / 8 ) ) ) >> ( n % 8 ) ) & 1; +} + + +/** + * @brief Implementation of CURLOPT_WRITEFUNCTION , this function will be called when + * the server sends back data. + * for more details see: https://curl.se/libcurl/c/CURLOPT_WRITEFUNCTION .html + * + * @param buffer that contains the response data from the server + * @param itemSize size of one item + * @param nitems number of items + * @param response userdata which will later contain the uuid + * @return size_t size that have been read + */ +size_t curlCallbackCreateSession( char *buffer, size_t itemSize, size_t nitems, void *response ) +{ + size_t bytes = itemSize * nitems; + if ( strlen( response ) + bytes != 36 ) { + logadd( LOG_INFO, "strlen(response): %lu bytes: %lu \n", strlen( response ), bytes ); + return bytes; + } + + strncat( response, buffer, 36 ); + return bytes; +} + +/** + * @brief Create a Session with the cow server and gets the session guid. + * + * @param imageName + * @param version of the original Image + */ +bool createSession( const char *imageName, uint16_t version ) +{ + CURLcode res; + char url[COW_URL_STRING_SIZE]; + snprintf( url, COW_URL_STRING_SIZE, COW_API_CREATE, cowServerAddress ); + logadd( LOG_INFO, "COW_API_CREATE URL: %s", url ); + curl_easy_setopt( curl, CURLOPT_POST, 1L ); + curl_easy_setopt( curl, CURLOPT_URL, url ); + + curl_mime *mime; + curl_mimepart *part; + mime = curl_mime_init( curl ); + part = curl_mime_addpart( mime ); + curl_mime_name( part, "imageName" ); + curl_mime_data( part, imageName, CURL_ZERO_TERMINATED ); + part = curl_mime_addpart( mime ); + curl_mime_name( part, "version" ); + char buf[sizeof( int ) * 3 + 2]; + snprintf( buf, sizeof buf, "%d", version ); + curl_mime_data( part, buf, CURL_ZERO_TERMINATED ); + + part = curl_mime_addpart( mime ); + curl_mime_name( part, "bitfieldSize" ); + snprintf( buf, sizeof buf, "%d", metadata->bitfieldSize ); + curl_mime_data( part, buf, CURL_ZERO_TERMINATED ); + + curl_easy_setopt( curl, CURLOPT_MIMEPOST, mime ); + + metadata->uuid[0] = '\0'; + curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, curlCallbackCreateSession ); + curl_easy_setopt( curl, CURLOPT_WRITEDATA, &metadata->uuid ); + + res = curl_easy_perform( curl ); + curl_mime_free( mime ); + + /* Check for errors */ + if ( res != CURLE_OK ) { + logadd( LOG_ERROR, "COW_API_CREATE failed: %s\n", curl_easy_strerror( res ) ); + return false; + } + + long http_code = 0; + curl_easy_getinfo( curl, CURLINFO_RESPONSE_CODE, &http_code ); + if ( http_code != 200 ) { + logadd( LOG_ERROR, "COW_API_CREATE failed http: %ld\n", http_code ); + return false; + } + curl_easy_reset( curl ); + metadata->uuid[36] = '\0'; + logadd( LOG_DEBUG1, "Cow session started, guid: %s\n", metadata->uuid ); + return true; +} + +/** + * @brief Implementation of CURLOPT_READFUNCTION, this function will first send the bit field and + * then the block data in one bitstream. this function is usually called multiple times per block, + * because the buffer is usually not large for one block and its bitfield. + * for more details see: https://curl.se/libcurl/c/CURLOPT_READFUNCTION.html + * + * @param ptr to the buffer + * @param size of one element in buffer + * @param nmemb number of elements in buffer + * @param userdata from CURLOPT_READFUNCTION + * @return size_t size written in buffer + */ +size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *userdata ) +{ + cow_curl_read_upload_t *uploadBlock = (cow_curl_read_upload_t *)userdata; + size_t len = 0; + if ( uploadBlock->position < (size_t)metadata->bitfieldSize ) { + size_t lenCpy = MIN( metadata->bitfieldSize - uploadBlock->position, size * nmemb ); + memcpy( ptr, uploadBlock->block->bitfield + uploadBlock->position, lenCpy ); + uploadBlock->position += lenCpy; + len += lenCpy; + } + if ( uploadBlock->position >= (size_t)metadata->bitfieldSize ) { + size_t lenRead = MIN( COW_METADATA_STORAGE_CAPACITY - ( uploadBlock->position - ( metadata->bitfieldSize ) ), + ( size * nmemb ) - len ); + off_t inBlockOffset = uploadBlock->position - metadata->bitfieldSize; + size_t lengthRead = pread( cow.fhd, ( ptr + len ), lenRead, uploadBlock->block->offset + inBlockOffset ); + + if ( lenRead != lengthRead ) { + // fill up since last block may not be a full block + lengthRead = lenRead; + } + uploadBlock->position += lengthRead; + len += lengthRead; + } + return len; +} + + +/** + * @brief Requests the merging of the image on the cow server. + + */ +bool mergeRequest() +{ + CURLcode res; + curl_easy_setopt( curl, CURLOPT_POST, 1L ); + + char url[COW_URL_STRING_SIZE]; + snprintf( url, COW_URL_STRING_SIZE, COW_API_START_MERGE, cowServerAddress ); + curl_easy_setopt( curl, CURLOPT_URL, url ); + + + curl_mime *mime; + curl_mimepart *part; + mime = curl_mime_init( curl ); + + part = curl_mime_addpart( mime ); + curl_mime_name( part, "guid" ); + curl_mime_data( part, metadata->uuid, CURL_ZERO_TERMINATED ); + + part = curl_mime_addpart( mime ); + curl_mime_name( part, "originalFileSize" ); + char buf[21]; + snprintf( buf, sizeof buf, "%" PRIu64, metadata->originalImageSize ); + curl_mime_data( part, buf, CURL_ZERO_TERMINATED ); + + part = curl_mime_addpart( mime ); + curl_mime_name( part, "newFileSize" ); + snprintf( buf, sizeof buf, "%" PRIu64, metadata->imageSize ); + curl_mime_data( part, buf, CURL_ZERO_TERMINATED ); + + curl_easy_setopt( curl, CURLOPT_MIMEPOST, mime ); + + + res = curl_easy_perform( curl ); + if ( res != CURLE_OK ) { + logadd( LOG_WARNING, "COW_API_START_MERGE failed: %s\n", curl_easy_strerror( res ) ); + curl_easy_reset( curl ); + return false; + } + long http_code = 0; + curl_easy_getinfo( curl, CURLINFO_RESPONSE_CODE, &http_code ); + if ( http_code != 200 ) { + logadd( LOG_WARNING, "COW_API_START_MERGE failed http: %ld\n", http_code ); + curl_easy_reset( curl ); + return false; + } + curl_easy_reset( curl ); + curl_mime_free( mime ); + return true; +} + +/** + * @brief Wrapper for mergeRequest so if its fails it will be tried again. + * + */ +void startMerge() +{ + int fails = 0; + bool success = false; + success = mergeRequest(); + while ( fails <= 5 && !success ) { + fails++; + logadd( LOG_WARNING, "Trying again. %i/5", fails ); + mergeRequest(); + } +} + +/** + * @brief Implementation of the CURLOPT_XFERINFOFUNCTION. + * For more infos see: https://curl.se/libcurl/c/CURLOPT_XFERINFOFUNCTION.html + * + * Each active transfer callbacks this function. + * This function computes the uploaded bytes between each call and adds it to + * bytesUploaded, which is used to compute the kb/s uploaded over all transfers. + * + * @param clientp + * @param ulNow number of bytes uploaded by this transfer so far. + * @return int always returns 0 to continue the callbacks. + */ +int progress_callback( void *clientp, __attribute__( ( unused ) ) curl_off_t dlTotal, + __attribute__( ( unused ) ) curl_off_t dlNow, __attribute__( ( unused ) ) curl_off_t ulTotal, curl_off_t ulNow ) +{ + CURL *eh = (CURL *)clientp; + cow_curl_read_upload_t *curlUploadBlock; + CURLcode res; + res = curl_easy_getinfo( eh, CURLINFO_PRIVATE, &curlUploadBlock ); + if ( res != CURLE_OK ) { + logadd( LOG_ERROR, "ERROR" ); + return 0; + } + bytesUploaded += ( ulNow - curlUploadBlock->ulLast ); + curlUploadBlock->ulLast = ulNow; + return 0; +} + +/** + * @brief Updates the status to the stdout/statfile depending on the startup parameters. + * + * @param inQueue Blocks that have changes old enough to be uploaded. + * @param modified Blocks that have been changed but whose changes are not old enough to be uploaded. + * @param idle Blocks that do not contain changes that have not yet been uploaded. + * @param speedBuffer ptr to char array that contains the current upload speed. + */ + +void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, char *speedBuffer ) +{ + char buffer[300]; + char state[30]; + if ( uploadLoop ) { + snprintf( state, 30, "%s", "backgroundUpload" ); + } else if ( !uploadLoopDone ) { + snprintf( state, 30, "%s", "uploading" ); + } else { + snprintf( state, 30, "%s", "done" ); + } + + int len = snprintf( buffer, 300, + "state=%s\n" + "inQueue=%" PRIu64 "\n" + "modifiedBlocks=%" PRIu64 "\n" + "idleBlocks=%" PRIu64 "\n" + "totalBlocksUploaded=%" PRIu64 "\n" + "activeUploads:%i\n" + "%s=%s", + state, inQueue, modified, idle, totalBlocksUploaded, activeUploads, COW_SHOW_UL_SPEED ? "ulspeed" : "", + speedBuffer ); + + if ( statStdout ) { + logadd( LOG_INFO, "%s", buffer ); + } + + if ( statFile ) { + if ( pwrite( cow.fhs, buffer, len, 43 ) != len ) { + logadd( LOG_WARNING, "Could not update cow status file" ); + } + if ( ftruncate( cow.fhs, 43 + len ) ) { + logadd( LOG_WARNING, "Could not truncate cow status file" ); + } +#ifdef COW_DUMP_BLOCK_UPLOADS + if ( !uploadLoop && uploadLoopDone ) { + dumpBlockUploads(); + } +#endif + } +} +int cmpfunc( const void *a, const void *b ) +{ + return (int)( ( (cow_block_upload_statistics_t *)b )->uploads - ( (cow_block_upload_statistics_t *)a )->uploads ); +} +/** + * @brief Writes all block numbers sorted by the number of uploads into the statsfile. + * + */ +void dumpBlockUploads() +{ + long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_L2_STORAGE_CAPACITY ); + + cow_block_upload_statistics_t blockUploads[l1MaxOffset * COW_L2_SIZE]; + uint64_t currentBlock = 0; + for ( long unsigned int l1Offset = 0; l1Offset < l1MaxOffset; l1Offset++ ) { + if ( cow.l1[l1Offset] == -1 ) { + continue; + } + for ( int l2Offset = 0; l2Offset < COW_L2_SIZE; l2Offset++ ) { + cow_block_metadata_t *block = ( cow.firstL2[cow.l1[l1Offset]] + l2Offset ); + + blockUploads[currentBlock].uploads = block->uploads; + blockUploads[currentBlock].blocknumber = ( l1Offset * COW_L2_SIZE + l2Offset ); + currentBlock++; + } + } + qsort( blockUploads, currentBlock, sizeof( cow_block_upload_statistics_t ), cmpfunc ); + lseek( cow.fhs, 0, SEEK_END ); + + dprintf( cow.fhs, "\n\nblocknumber: uploads\n==Block Upload Dump===\n" ); + for ( uint64_t i = 0; i < currentBlock; i++ ) { + dprintf( cow.fhs, "%" PRIu64 ": %" PRIu64 " \n", blockUploads[i].blocknumber, blockUploads[i].uploads ); + } +} + +/** + * @brief Starts the upload of a given block. + * + * @param cm Curl_multi + * @param curlUploadBlock containing the data for the block to upload. + */ +bool addUpload( CURLM *cm, cow_curl_read_upload_t *curlUploadBlock, struct curl_slist *headers ) +{ + CURL *eh = curl_easy_init(); + + char url[COW_URL_STRING_SIZE]; + + snprintf( url, COW_URL_STRING_SIZE, COW_API_UPDATE, cowServerAddress, metadata->uuid, curlUploadBlock->blocknumber ); + + curl_easy_setopt( eh, CURLOPT_URL, url ); + curl_easy_setopt( eh, CURLOPT_POST, 1L ); + curl_easy_setopt( eh, CURLOPT_READFUNCTION, curlReadCallbackUploadBlock ); + curl_easy_setopt( eh, CURLOPT_READDATA, (void *)curlUploadBlock ); + curl_easy_setopt( eh, CURLOPT_PRIVATE, (void *)curlUploadBlock ); + // min upload speed of 1kb/s over 10 sec otherwise the upload is canceled. + curl_easy_setopt( eh, CURLOPT_LOW_SPEED_TIME, 10L ); + curl_easy_setopt( eh, CURLOPT_LOW_SPEED_LIMIT, 1000L ); + + curl_easy_setopt( + eh, CURLOPT_POSTFIELDSIZE_LARGE, (long)( metadata->bitfieldSize + COW_METADATA_STORAGE_CAPACITY ) ); + if ( COW_SHOW_UL_SPEED ) { + curlUploadBlock->ulLast = 0; + curl_easy_setopt( eh, CURLOPT_NOPROGRESS, 0L ); + curl_easy_setopt( eh, CURLOPT_XFERINFOFUNCTION, progress_callback ); + curl_easy_setopt( eh, CURLOPT_XFERINFODATA, eh ); + } + curl_easy_setopt( eh, CURLOPT_HTTPHEADER, headers ); + curl_multi_add_handle( cm, eh ); + + return true; +} + +/** + * @brief After an upload completes, either successful or unsuccessful this + * function cleans everything up. If unsuccessful and there are some tries left + * retries to upload the block. + * + * @param cm Curl_multi + * @param msg CURLMsg + * @return true returned if the upload was successful or retries are still possible. + * @return false returned if the upload was unsuccessful. + */ +bool finishUpload( CURLM *cm, CURLMsg *msg, struct curl_slist *headers ) +{ + bool status = true; + cow_curl_read_upload_t *curlUploadBlock; + CURLcode res; + CURLcode res2; + res = curl_easy_getinfo( msg->easy_handle, CURLINFO_PRIVATE, &curlUploadBlock ); + + long http_code = 0; + res2 = curl_easy_getinfo( msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_code ); + + if ( res != CURLE_OK || res2 != CURLE_OK || http_code != 200 || msg->msg != CURLMSG_DONE ) { + curlUploadBlock->fails++; + logadd( LOG_ERROR, "COW_API_UPDATE failed %i/5: %s\n", curlUploadBlock->fails, + curl_easy_strerror( msg->data.result ) ); + if ( curlUploadBlock->fails <= 5 ) { + addUpload( cm, curlUploadBlock, headers ); + goto CLEANUP; + } + free( curlUploadBlock ); + status = false; + goto CLEANUP; + } + + // everything went ok, update timeChanged + atomic_compare_exchange_strong( &curlUploadBlock->block->timeChanged, &curlUploadBlock->time, 0 ); + + curlUploadBlock->block->uploads++; + + totalBlocksUploaded++; + free( curlUploadBlock ); +CLEANUP: + curl_multi_remove_handle( cm, msg->easy_handle ); + curl_easy_cleanup( msg->easy_handle ); + return status; +} + +/** + * @brief + * + * @param cm Curl_multi + * @param activeUploads ptr to integer which holds the number of current uploads + * @param breakIfNotMax will return as soon as there are not all upload slots used, so they can be filled up. + * @param foregroundUpload used to determine the number of max uploads. If true COW_MAX_PARALLEL_UPLOADS will be the limit, + * else COW_MAX_PARALLEL_BACKGROUND_UPLOADS. + * @return true returned if all upload's were successful + * @return false returned if one ore more upload's failed. + */ +bool MessageHandler( + CURLM *cm, atomic_int *activeUploads, bool breakIfNotMax, bool foregroundUpload, struct curl_slist *headers ) +{ + CURLMsg *msg; + int msgsLeft = -1; + bool status = true; + do { + curl_multi_perform( cm, activeUploads ); + + while ( ( msg = curl_multi_info_read( cm, &msgsLeft ) ) ) { + if ( !finishUpload( cm, msg, headers ) ) { + status = false; + } + } + if ( breakIfNotMax + && *activeUploads + < ( foregroundUpload ? COW_MAX_PARALLEL_UPLOADS : COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) ) { + break; + } + // ony wait if there are active uploads + if ( *activeUploads ) { + curl_multi_wait( cm, NULL, 0, 1000, NULL ); + } + + } while ( *activeUploads ); + return status; +} + +/** + * @brief loops through all blocks and uploads them. + * + * @param ignoreMinUploadDelay If true uploads all blocks that have changes while + * ignoring COW_MIN_UPLOAD_DELAY + * @param cm Curl_multi + * @return true if all blocks uploaded successful + * @return false if one ore more blocks failed to upload + */ +bool uploaderLoop( bool ignoreMinUploadDelay, CURLM *cm ) +{ + bool success = true; + struct curl_slist *headers = NULL; + headers = curl_slist_append( headers, "Content-Type: application/octet-stream" ); + + long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_L2_STORAGE_CAPACITY ); + for ( long unsigned int l1Offset = 0; l1Offset < l1MaxOffset; l1Offset++ ) { + if ( cow.l1[l1Offset] == -1 ) { + continue; + } + for ( int l2Offset = 0; l2Offset < COW_L2_SIZE; l2Offset++ ) { + cow_block_metadata_t *block = ( cow.firstL2[cow.l1[l1Offset]] + l2Offset ); + if ( block->offset == -1 ) { + continue; + } + if ( block->timeChanged != 0 ) { + if ( ( time( NULL ) - block->timeChanged > COW_MIN_UPLOAD_DELAY ) || ignoreMinUploadDelay ) { + do { + if ( !MessageHandler( cm, &activeUploads, true, ignoreMinUploadDelay, headers ) ) { + success = false; + } + } while ( !( activeUploads < ( ignoreMinUploadDelay ? COW_MAX_PARALLEL_UPLOADS + : COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) ) + && activeUploads ); + cow_curl_read_upload_t *b = malloc( sizeof( cow_curl_read_upload_t ) ); + b->block = block; + b->blocknumber = ( l1Offset * COW_L2_SIZE + l2Offset ); + b->fails = 0; + b->position = 0; + b->time = block->timeChanged; + addUpload( cm, b, headers ); + if ( !ignoreMinUploadDelay && !uploadLoop ) { + goto DONE; + } + } + } + } + } +DONE: + while ( activeUploads > 0 ) { + MessageHandler( cm, &activeUploads, false, ignoreMinUploadDelay, headers ); + } + curl_slist_free_all( headers ); + return success; +} + + +/** + * @brief Computes the data for the status to the stdout/statfile every COW_STATS_UPDATE_TIME seconds. + * + */ + +void *cowfile_statUpdater( __attribute__( ( unused ) ) void *something ) +{ + uint64_t lastUpdateTime = time( NULL ); + + while ( !uploadLoopDone ) { + sleep( COW_STATS_UPDATE_TIME ); + int modified = 0; + int inQueue = 0; + int idle = 0; + long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_L2_STORAGE_CAPACITY ); + uint64_t now = time( NULL ); + for ( long unsigned int l1Offset = 0; l1Offset < l1MaxOffset; l1Offset++ ) { + if ( cow.l1[l1Offset] == -1 ) { + continue; + } + for ( int l2Offset = 0; l2Offset < COW_L2_SIZE; l2Offset++ ) { + cow_block_metadata_t *block = ( cow.firstL2[cow.l1[l1Offset]] + l2Offset ); + if ( block->offset == -1 ) { + continue; + } + if ( block->timeChanged != 0 ) { + if ( !uploadLoop || now > block->timeChanged + COW_MIN_UPLOAD_DELAY ) { + inQueue++; + } else { + modified++; + } + } else { + idle++; + } + } + } + char speedBuffer[20]; + + if ( COW_SHOW_UL_SPEED ) { + now = time( NULL ); + uint64_t bytes = atomic_exchange( &bytesUploaded, 0 ); + snprintf( speedBuffer, 20, "%.2f", (double)( ( bytes ) / ( 1 + now - lastUpdateTime ) / 1000 ) ); + + lastUpdateTime = now; + } + + + updateCowStatsFile( inQueue, modified, idle, speedBuffer ); + } +} + +/** + * @brief main loop for blockupload in the background + */ +void *cowfile_uploader( __attribute__( ( unused ) ) void *something ) +{ + CURLM *cm; + + cm = curl_multi_init(); + curl_multi_setopt( + cm, CURLMOPT_MAXCONNECTS, (long)MAX( COW_MAX_PARALLEL_UPLOADS, COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) ); + + + while ( uploadLoop ) { + uploaderLoop( false, cm ); + sleep( 2 ); + } + logadd( LOG_DEBUG1, "start uploading the remaining blocks." ); + + // force the upload of all remaining blocks because the user dismounted the image + if ( !uploaderLoop( true, cm ) ) { + logadd( LOG_ERROR, "one or more blocks failed to upload" ); + curl_multi_cleanup( cm ); + uploadLoopDone = true; + return NULL; + } + uploadLoopDone = true; + curl_multi_cleanup( cm ); + logadd( LOG_DEBUG1, "all blocks uploaded" ); + if ( cow_merge_after_upload ) { + startMerge(); + logadd( LOG_DEBUG1, "Requesting merge." ); + } + return NULL; +} + +/** + * @brief Create a Cow Stats File an inserts the session guid + * + * @param path where the file is created + * @return true + * @return false if failed to create or to write into the file + */ +bool createCowStatsFile( char *path ) +{ + char pathStatus[strlen( path ) + 12]; + + snprintf( pathStatus, strlen( path ) + 12, "%s%s", path, "/status.txt" ); + + char buffer[100]; + int len = snprintf( buffer, 100, "uuid=%s\nstate: active\n", metadata->uuid ); + if ( statStdout ) { + logadd( LOG_INFO, "%s", buffer ); + } + if ( statFile ) { + if ( ( cow.fhs = open( pathStatus, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR ) ) == -1 ) { + logadd( LOG_ERROR, "Could not create cow status file. Bye.\n" ); + return false; + } + + if ( pwrite( cow.fhs, buffer, len, 0 ) != len ) { + logadd( LOG_ERROR, "Could not write to cow status file. Bye.\n" ); + return false; + } + } + return true; +} + +/** + * @brief initializes the cow functionality, creates the data & meta file. + * + * @param path where the files should be stored + * @param image_Name name of the original file/image + * @param imageSizePtr + */ +bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion, atomic_uint_fast64_t **imageSizePtr, + char *serverAddress, bool sStdout, bool sfile ) +{ + statStdout = sStdout; + statFile = sfile; + char pathMeta[strlen( path ) + 6]; + char pathData[strlen( path ) + 6]; + + snprintf( pathMeta, strlen( path ) + 6, "%s%s", path, "/meta" ); + snprintf( pathData, strlen( path ) + 6, "%s%s", path, "/data" ); + + if ( ( cow.fhm = open( pathMeta, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR ) ) == -1 ) { + logadd( LOG_ERROR, "Could not create cow meta file. Bye.\n %s \n", pathMeta ); + return false; + } + + if ( ( cow.fhd = open( pathData, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR ) ) == -1 ) { + logadd( LOG_ERROR, "Could not create cow data file. Bye.\n" ); + return false; + } + + int maxPageSize = 8192; + + + size_t metaDataSizeHeader = sizeof( cowfile_metadata_header_t ) + strlen( image_Name ); + + + cow.maxImageSize = COW_MAX_IMAGE_SIZE; + cow.l1Size = ( ( cow.maxImageSize + COW_L2_STORAGE_CAPACITY - 1LL ) / COW_L2_STORAGE_CAPACITY ); + + // size of l1 array + number of l2's * size of l2 + size_t metadata_size = cow.l1Size * sizeof( l1 ) + cow.l1Size * sizeof( l2 ); + + // compute next fitting multiple of getpagesize() + size_t meta_data_start = ( ( metaDataSizeHeader + maxPageSize - 1 ) / maxPageSize ) * maxPageSize; + + size_t metadataFileSize = meta_data_start + metadata_size; + if ( pwrite( cow.fhm, "", 1, metadataFileSize ) != 1 ) { + logadd( LOG_ERROR, "Could not write cow meta_data_table to file. Bye.\n" ); + return false; + } + + cow.metadata_mmap = mmap( NULL, metadataFileSize, PROT_READ | PROT_WRITE, MAP_SHARED, cow.fhm, 0 ); + + + if ( cow.metadata_mmap == MAP_FAILED ) { + logadd( LOG_ERROR, "Error while mapping mmap:\n%s \n Bye.\n", strerror( errno ) ); + return false; + } + + metadata = (cowfile_metadata_header_t *)( cow.metadata_mmap ); + metadata->magicValue = COW_FILE_META_MAGIC_VALUE; + metadata->version = cowFileVersion; + metadata->dataFileSize = ATOMIC_VAR_INIT( 0 ); + metadata->metadataFileSize = ATOMIC_VAR_INIT( 0 ); + metadata->metadataFileSize = metadataFileSize; + metadata->blocksize = DNBD3_BLOCK_SIZE; + metadata->originalImageSize = **imageSizePtr; + metadata->imageSize = metadata->originalImageSize; + metadata->creationTime = time( NULL ); + *imageSizePtr = &metadata->imageSize; + metadata->metaDataStart = meta_data_start; + metadata->bitfieldSize = COW_BITFIELD_SIZE; + metadata->maxImageSize = cow.maxImageSize; + snprintf( metadata->imageName, 200, "%s", image_Name ); + cow.l1 = (l1 *)( cow.metadata_mmap + meta_data_start ); + metadata->nextL2 = 0; + + for ( size_t i = 0; i < cow.l1Size; i++ ) { + cow.l1[i] = -1; + } + cow.firstL2 = (l2 *)( ( (char *)cow.l1 ) + cow.l1Size ); + + // write header to data file + uint64_t header = COW_FILE_DATA_MAGIC_VALUE; + if ( pwrite( cow.fhd, &header, sizeof( uint64_t ), 0 ) != sizeof( uint64_t ) ) { + logadd( LOG_ERROR, "Could not write header to cow data file. Bye.\n" ); + return false; + } + // move the dataFileSize to make room for the header + atomic_store( &metadata->dataFileSize, COW_METADATA_STORAGE_CAPACITY ); + + pthread_mutex_init( &cow.l2CreateLock, NULL ); + + + cowServerAddress = serverAddress; + curl_global_init( CURL_GLOBAL_ALL ); + curl = curl_easy_init(); + if ( !curl ) { + logadd( LOG_ERROR, "Error on curl init. Bye.\n" ); + return false; + } + if ( !createSession( image_Name, imageVersion ) ) { + return false; + } + + createCowStatsFile( path ); + pthread_create( &tidCowUploader, NULL, &cowfile_uploader, NULL ); + if ( statFile || statStdout ) { + pthread_create( &tidStatUpdater, NULL, &cowfile_statUpdater, NULL ); + } + return true; +} + +/** + * @brief loads an existing cow state from the meta & data files + * + * @param path where the meta & data file is located + * @param imageSizePtr + */ +bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile ) +{ + statStdout = sStdout; + statFile = sFile; + cowServerAddress = serverAddress; + curl_global_init( CURL_GLOBAL_ALL ); + curl = curl_easy_init(); + char pathMeta[strlen( path ) + 6]; + char pathData[strlen( path ) + 6]; + + snprintf( pathMeta, strlen( path ) + 6, "%s%s", path, "/meta" ); + snprintf( pathData, strlen( path ) + 6, "%s%s", path, "/data" ); + + + if ( ( cow.fhm = open( pathMeta, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR ) ) == -1 ) { + logadd( LOG_ERROR, "Could not open cow meta file. Bye.\n" ); + return false; + } + if ( ( cow.fhd = open( pathData, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR ) ) == -1 ) { + logadd( LOG_ERROR, "Could not open cow data file. Bye.\n" ); + return false; + } + + cowfile_metadata_header_t header; + { + size_t sizeToRead = sizeof( cowfile_metadata_header_t ); + size_t readBytes = 0; + while ( readBytes < sizeToRead ) { + ssize_t bytes = pread( cow.fhm, ( ( &header ) + readBytes ), sizeToRead, 0 ); + if ( bytes <= 0 ) { + logadd( LOG_ERROR, "Error while reading meta file header. Bye.\n" ); + return false; + } + readBytes += bytes; + } + + + if ( header.magicValue != COW_FILE_META_MAGIC_VALUE ) { + if ( __builtin_bswap64( header.magicValue ) == COW_FILE_META_MAGIC_VALUE ) { + logadd( LOG_ERROR, "cow meta file of wrong endianess. Bye.\n" ); + return false; + } + logadd( LOG_ERROR, "cow meta file of unkown format. Bye.\n" ); + return false; + } + struct stat st; + stat( pathMeta, &st ); + if ( (long)st.st_size < (long)header.metaDataStart + (long)header.nextL2 * (long)sizeof( l2 ) ) { + logadd( LOG_ERROR, "cow meta file to small. Bye.\n" ); + return false; + } + } + { + uint64_t magicValueDataFile; + if ( pread( cow.fhd, &magicValueDataFile, sizeof( uint64_t ), 0 ) != sizeof( uint64_t ) ) { + logadd( LOG_ERROR, "Error while reading cow data file, wrong file?. Bye.\n" ); + return false; + } + + if ( magicValueDataFile != COW_FILE_DATA_MAGIC_VALUE ) { + if ( __builtin_bswap64( magicValueDataFile ) == COW_FILE_DATA_MAGIC_VALUE ) { + logadd( LOG_ERROR, "cow data file of wrong endianess. Bye.\n" ); + return false; + } + logadd( LOG_ERROR, "cow data file of unkown format. Bye.\n" ); + return false; + } + struct stat st; + stat( pathData, &st ); + if ( (long)header.dataFileSize < st.st_size ) { + logadd( LOG_ERROR, "cow data file to small. Bye.\n" ); + return false; + } + } + + cow.metadata_mmap = mmap( NULL, header.metadataFileSize, PROT_READ | PROT_WRITE, MAP_SHARED, cow.fhm, 0 ); + + if ( cow.metadata_mmap == MAP_FAILED ) { + logadd( LOG_ERROR, "Error while mapping mmap:\n%s \n Bye.\n", strerror( errno ) ); + return false; + } + if ( header.version != cowFileVersion ) { + logadd( LOG_ERROR, "Error wrong file version got: %i expected: 1. Bye.\n", metadata->version ); + return false; + } + + + metadata = (cowfile_metadata_header_t *)( cow.metadata_mmap ); + + *imageSizePtr = &metadata->imageSize; + cow.l1 = (l1 *)( cow.metadata_mmap + metadata->metaDataStart ); + cow.maxImageSize = metadata->maxImageSize; + cow.l1Size = ( ( cow.maxImageSize + COW_L2_STORAGE_CAPACITY - 1LL ) / COW_L2_STORAGE_CAPACITY ); + + cow.firstL2 = (l2 *)( ( (char *)cow.l1 ) + cow.l1Size ); + pthread_mutex_init( &cow.l2CreateLock, NULL ); + createCowStatsFile( path ); + pthread_create( &tidCowUploader, NULL, &cowfile_uploader, NULL ); + + if ( statFile || statStdout ) { + pthread_create( &tidStatUpdater, NULL, &cowfile_statUpdater, NULL ); + } + + return true; +} + +/** + * @brief writes the given data in the data file + * + * @param buffer containing the data + * @param size of the buffer + * @param netSize which actually contributes to the fuse write request (can be different from size if partial full blocks are written) + * @param cowRequest + * @param block + * @param inBlockOffset + */ +static void writeData( const char *buffer, ssize_t size, size_t netSize, atomic_int *errorCode, + atomic_size_t *bytesWorkedOn, cow_block_metadata_t *block, off_t inBlockOffset ) +{ + ssize_t totalBytesWritten = 0; + while ( totalBytesWritten < size ) { + ssize_t bytesWritten = pwrite( cow.fhd, ( buffer + totalBytesWritten ), size - totalBytesWritten, + block->offset + inBlockOffset + totalBytesWritten ); + if ( bytesWritten == -1 ) { + logadd( LOG_ERROR, + "size:%zu netSize:%zu errorCode:%i bytesWorkedOn:%zu inBlockOffset:%lld block->offset:%lld \n", size, + netSize, *errorCode, *bytesWorkedOn, inBlockOffset, block->offset ); + *errorCode = errno; + break; + } else if ( bytesWritten == 0 ) { + logadd( LOG_ERROR, + "size:%zu netSize:%zu errorCode:%i bytesWorkedOn:%zu inBlockOffset:%lld block->offset:%lld \n", size, + netSize, *errorCode, *bytesWorkedOn, inBlockOffset, block->offset ); + *errorCode = EIO; + break; + } + totalBytesWritten += bytesWritten; + } + atomic_fetch_add( bytesWorkedOn, netSize ); + setBitsInBitfield( block->bitfield, (int)( inBlockOffset / DNBD3_BLOCK_SIZE ), + (int)( ( inBlockOffset + totalBytesWritten - 1 ) / DNBD3_BLOCK_SIZE ), 1 ); + + block->timeChanged = time( NULL ); +} + +/** + * @brief Increases the metadata->dataFileSize by COW_METADATA_STORAGE_CAPACITY. + * The space is not reserved on disk. + * + * @param block for which the space should be reserved. + */ +static bool allocateMetaBlockData( cow_block_metadata_t *block ) +{ + block->offset = (atomic_long)atomic_fetch_add( &metadata->dataFileSize, COW_METADATA_STORAGE_CAPACITY ); + return true; +} + +/** + * @brief Get the cow_block_metadata_t from l1Offset and l2Offset + * + * @param l1Offset + * @param l2Offset + * @return cow_block_metadata_t* + */ +static cow_block_metadata_t *getBlock( int l1Offset, int l2Offset ) +{ + cow_block_metadata_t *block = ( cow.firstL2[cow.l1[l1Offset]] + l2Offset ); + if ( block->offset == -1 ) { + allocateMetaBlockData( block ); + } + return block; +} + +/** + * @brief creates an new L2 Block and initializes the containing cow_block_metadata_t blocks + * + * @param l1Offset + */ +static bool createL2Block( int l1Offset ) +{ + pthread_mutex_lock( &cow.l2CreateLock ); + if ( cow.l1[l1Offset] == -1 ) { + for ( int i = 0; i < COW_L2_SIZE; i++ ) { + cow.firstL2[metadata->nextL2][i].offset = -1; + cow.firstL2[metadata->nextL2][i].timeChanged = ATOMIC_VAR_INIT( 0 ); + cow.firstL2[metadata->nextL2][i].uploads = ATOMIC_VAR_INIT( 0 ); + for ( int j = 0; j < COW_BITFIELD_SIZE; j++ ) { + cow.firstL2[metadata->nextL2][i].bitfield[j] = ATOMIC_VAR_INIT( 0 ); + } + } + cow.l1[l1Offset] = metadata->nextL2; + metadata->nextL2 += 1; + } + pthread_mutex_unlock( &cow.l2CreateLock ); + return true; +} + +/** + * @brief Is called once an fuse write request ist finished. + * Calls the corrsponding fuse reply depending on the type and + * success of the request. + * + * @param req fuse_req_t + * @param cowRequest + */ + +static void finishWriteRequest( fuse_req_t req, cow_request_t *cowRequest ) +{ + if ( cowRequest->errorCode != 0 ) { + fuse_reply_err( req, cowRequest->errorCode ); + + } else { + metadata->imageSize = MAX( metadata->imageSize, cowRequest->bytesWorkedOn + cowRequest->fuseRequestOffset ); + fuse_reply_write( req, cowRequest->bytesWorkedOn ); + } + free( cowRequest ); +} + +/** + * @brief Called after the padding data was received from the dnbd3 server. + * The data from the write request will be combined witch the data from the server + * so that we get a full DNBD3_BLOCK and is then written on the disk. + * @param sRequest + */ +static void writePaddedBlock( cow_sub_request_t *sRequest ) +{ + //copy write Data + memcpy( ( sRequest->writeBuffer + ( sRequest->inBlockOffset % DNBD3_BLOCK_SIZE ) ), sRequest->writeSrc, + sRequest->size ); + writeData( sRequest->writeBuffer, DNBD3_BLOCK_SIZE, (ssize_t)sRequest->size, &sRequest->cowRequest->errorCode, + &sRequest->cowRequest->bytesWorkedOn, sRequest->block, + ( sRequest->inBlockOffset - ( sRequest->inBlockOffset % DNBD3_BLOCK_SIZE ) ) ); + + + if ( atomic_fetch_sub( &sRequest->cowRequest->workCounter, 1 ) == 1 ) { + finishWriteRequest( sRequest->dRequest.fuse_req, sRequest->cowRequest ); + } + free( sRequest ); +} + +/** + * @brief If an block does not start or finishes on an multiple of DNBD3_BLOCK_SIZE, the blocks needs to be + * padded. If this block is inside the original image size, the padding data will be read fro the server + * otherwise it will be padded with 0 since the it must be the block at the end of the image. + * + */ +static void padBlockFromRemote( fuse_req_t req, off_t offset, cow_request_t *cowRequest, const char *buffer, + size_t size, cow_block_metadata_t *block, off_t inBlockOffset ) +{ + if ( offset > (off_t)metadata->originalImageSize ) { + //pad 0 and done + inBlockOffset -= inBlockOffset % DNBD3_BLOCK_SIZE; + char buf[DNBD3_BLOCK_SIZE] = { 0 }; + memcpy( buf + ( offset % 4096 ), buffer, size ); + + writeData( buf, DNBD3_BLOCK_SIZE, (ssize_t)size, &cowRequest->errorCode, &cowRequest->bytesWorkedOn, block, + inBlockOffset ); + return; + } + cow_sub_request_t *sRequest = calloc( sizeof( cow_sub_request_t ) + DNBD3_BLOCK_SIZE, sizeof( char ) ); + sRequest->callback = writePaddedBlock; + sRequest->inBlockOffset = inBlockOffset; + sRequest->block = block; + sRequest->size = size; + sRequest->writeSrc = buffer; + sRequest->cowRequest = cowRequest; + off_t start = offset - ( offset % DNBD3_BLOCK_SIZE ); + + sRequest->dRequest.length = DNBD3_BLOCK_SIZE; + sRequest->dRequest.offset = start; + sRequest->dRequest.fuse_req = req; + + if ( ( (size_t)( offset + DNBD3_BLOCK_SIZE ) ) > metadata->originalImageSize ) { + sRequest->dRequest.length = + (uint32_t)MIN( DNBD3_BLOCK_SIZE, offset + DNBD3_BLOCK_SIZE - metadata->originalImageSize ); + } + + atomic_fetch_add( &cowRequest->workCounter, 1 ); + if ( !connection_read( &sRequest->dRequest ) ) { + cowRequest->errorCode = EIO; + free( sRequest ); + if ( atomic_fetch_sub( &sRequest->cowRequest->workCounter, 1 ) == 1 ) { + finishWriteRequest( sRequest->dRequest.fuse_req, sRequest->cowRequest ); + } + return; + } +} + +/** + * @brief Will be called after a dnbd3_async_t is finished. + * Calls the corrsponding callback function, either writePaddedBlock or readRemoteData + * depending if the original fuse request was a write or read. + * + */ +void cowfile_handleCallback( dnbd3_async_t *request ) +{ + cow_sub_request_t *sRequest = container_of( request, cow_sub_request_t, dRequest ); + sRequest->callback( sRequest ); +} + + +/** + * @brief called once dnbd3_async_t is finished. Increases bytesWorkedOn by the number of bytes + * this request had. Also checks if it was the last dnbd3_async_t to finish the fuse request, if + * so replys to fuse and cleans up the request. + * + */ +void readRemoteData( cow_sub_request_t *sRequest ) +{ + atomic_fetch_add( &sRequest->cowRequest->bytesWorkedOn, sRequest->dRequest.length ); + + if ( atomic_fetch_sub( &sRequest->cowRequest->workCounter, 1 ) == 1 ) { + if ( sRequest->cowRequest->bytesWorkedOn < sRequest->cowRequest->fuseRequestSize ) { + logadd( LOG_ERROR, "pad read to small\n" ); + } + fuse_reply_buf( + sRequest->dRequest.fuse_req, sRequest->cowRequest->readBuffer, sRequest->cowRequest->bytesWorkedOn ); + free( sRequest->cowRequest->readBuffer ); + free( sRequest->cowRequest ); + } + free( sRequest ); +} + +/** + * @brief changes the imageSize + * + * @param req fuse request + * @param size new size the image should have + * @param ino fuse_ino_t + * @param fi fuse_file_info + */ + +void cowfile_setSize( fuse_req_t req, size_t size, fuse_ino_t ino, struct fuse_file_info *fi ) +{ + // decrease + if ( metadata->imageSize > size ) { + if ( size < metadata->originalImageSize ) { + metadata->originalImageSize = size; + } + + // increase + } else if ( metadata->imageSize < size ) { + off_t offset = metadata->imageSize; + int l1Offset = getL1Offset( offset ); + int l2Offset = getL2Offset( offset ); + int l1EndOffset = getL1Offset( size ); + int l2EndOffset = getL2Offset( size ); + // special case first block + if ( cow.l1[l1Offset] != -1 ) { + cow_block_metadata_t *block = getBlock( l1Offset, l2Offset ); + if ( metadata->imageSize % DNBD3_BLOCK_SIZE != 0 ) { + off_t inBlockOffset = metadata->imageSize % COW_METADATA_STORAGE_CAPACITY; + size_t sizeToWrite = DNBD3_BLOCK_SIZE - ( metadata->imageSize % DNBD3_BLOCK_SIZE ); + + if ( checkBit( block->bitfield, (int)( inBlockOffset / DNBD3_BLOCK_SIZE ) ) ) { + char buf[sizeToWrite]; + memset( buf, 0, sizeToWrite ); + + ssize_t bytesWritten = pwrite( cow.fhd, buf, sizeToWrite, block->offset + inBlockOffset ); + + if ( bytesWritten < (ssize_t)sizeToWrite ) { + fuse_reply_err( req, bytesWritten == -1 ? errno : EIO ); + return; + } + block->timeChanged = time( NULL ); + offset += sizeToWrite; + } + } + // rest of block set bits 0 + l1Offset = getL1Offset( offset ); + l2Offset = getL2Offset( offset ); + block = getBlock( l1Offset, l2Offset ); + off_t inBlockOffset = offset % COW_METADATA_STORAGE_CAPACITY; + setBitsInBitfield( + block->bitfield, (int)( inBlockOffset / DNBD3_BLOCK_SIZE ), ( COW_BITFIELD_SIZE * 8 ) - 1, 0 ); + block->timeChanged = time( NULL ); + l2Offset++; + if ( l2Offset >= COW_L2_SIZE ) { + l2Offset = 0; + l1Offset++; + } + } + // null all bitfields + while ( !( l1Offset > l1EndOffset || ( l1Offset == l1EndOffset && l2EndOffset < l2Offset ) ) ) { + if ( cow.l1[l1Offset] == -1 ) { + l1Offset++; + l2Offset = 0; + continue; + } + + cow_block_metadata_t *block = getBlock( l1Offset, l2Offset ); + setBitsInBitfield( block->bitfield, 0, ( COW_BITFIELD_SIZE * 8 ) - 1, 0 ); + block->timeChanged = time( NULL ); + l2Offset++; + if ( l2Offset >= COW_L2_SIZE ) { + l2Offset = 0; + l1Offset++; + } + } + } + metadata->imageSize = size; + if ( req != NULL ) { + image_ll_getattr( req, ino, fi ); + } +} + +/** + * @brief Implementation of a write request or an truncate. + * + * @param req fuse_req_t + * @param cowRequest + * @param offset Offset where the write starts, + * @param size Size of the write. + */ +void cowfile_write( fuse_req_t req, cow_request_t *cowRequest, off_t offset, size_t size ) +{ + // if beyond end of file, pad with 0 + if ( offset > (off_t)metadata->imageSize ) { + cowfile_setSize( NULL, offset, NULL, NULL ); + } + + + off_t currentOffset = offset; + off_t endOffset = offset + size; + + // write data + + int l1Offset = getL1Offset( currentOffset ); + int l2Offset = getL2Offset( currentOffset ); + while ( currentOffset < endOffset ) { + if ( cow.l1[l1Offset] == -1 ) { + createL2Block( l1Offset ); + } + //loop over L2 array (metadata) + while ( currentOffset < (off_t)endOffset && l2Offset < COW_L2_SIZE ) { + cow_block_metadata_t *metaBlock = getBlock( l1Offset, l2Offset ); + + + size_t metaBlockStartOffset = l1Offset * COW_L2_STORAGE_CAPACITY + l2Offset * COW_METADATA_STORAGE_CAPACITY; + + size_t inBlockOffset = currentOffset - metaBlockStartOffset; + size_t sizeToWriteToBlock = + MIN( (size_t)( endOffset - currentOffset ), COW_METADATA_STORAGE_CAPACITY - inBlockOffset ); + + + ///////////////////////// + // lock for the half block probably needed + if ( currentOffset % DNBD3_BLOCK_SIZE != 0 + && !checkBit( metaBlock->bitfield, (int)( inBlockOffset / DNBD3_BLOCK_SIZE ) ) ) { + // write remote + size_t padSize = MIN( sizeToWriteToBlock, DNBD3_BLOCK_SIZE - ( (size_t)currentOffset % DNBD3_BLOCK_SIZE ) ); + const char *sbuf = cowRequest->writeBuffer + ( ( currentOffset - offset ) ); + padBlockFromRemote( req, currentOffset, cowRequest, sbuf, padSize, metaBlock, (off_t)inBlockOffset ); + currentOffset += padSize; + continue; + } + + size_t endPaddedSize = 0; + if ( ( currentOffset + sizeToWriteToBlock ) % DNBD3_BLOCK_SIZE != 0 ) { + off_t currentEndOffset = currentOffset + sizeToWriteToBlock; + off_t padStartOffset = currentEndOffset - ( currentEndOffset % 4096 ); + off_t inBlockPadStartOffset = padStartOffset - metaBlockStartOffset; + if ( !checkBit( metaBlock->bitfield, (int)( inBlockPadStartOffset / DNBD3_BLOCK_SIZE ) ) ) { + const char *sbuf = cowRequest->writeBuffer + ( ( padStartOffset - offset ) ); + padBlockFromRemote( req, padStartOffset, cowRequest, sbuf, (currentEndOffset)-padStartOffset, metaBlock, + inBlockPadStartOffset ); + + + sizeToWriteToBlock -= (currentEndOffset)-padStartOffset; + endPaddedSize = (currentEndOffset)-padStartOffset; + } + } + writeData( cowRequest->writeBuffer + ( ( currentOffset - offset ) ), (ssize_t)sizeToWriteToBlock, + sizeToWriteToBlock, &cowRequest->errorCode, &cowRequest->bytesWorkedOn, metaBlock, inBlockOffset ); + + currentOffset += sizeToWriteToBlock; + currentOffset += endPaddedSize; + + + l2Offset++; + } + l1Offset++; + l2Offset = 0; + } + if ( atomic_fetch_sub( &cowRequest->workCounter, 1 ) == 1 ) { + finishWriteRequest( req, cowRequest ); + } +} + + +/** + * @brief Request data, that is not available locally, via the network. + * + * @param req fuse_req_t + * @param offset from the start of the file + * @param size of data to request + * @param buffer into which the data is to be written + * @param workCounter workCounter is increased by one and later reduced by one again when the request is completed. + */ +static void readRemote( fuse_req_t req, off_t offset, ssize_t size, char *buffer, cow_request_t *cowRequest ) +{ + // edgecase: Image size got reduced before on a non block border + if ( offset + size > metadata->originalImageSize ) { + size_t padZeroSize = ( offset + size ) - metadata->originalImageSize; + off_t padZeroOffset = metadata->originalImageSize - offset; + assert( offset > 0 ); + memset( ( buffer + padZeroOffset ), 0, padZeroOffset ); + + atomic_fetch_add( &cowRequest->bytesWorkedOn, padZeroSize ); + } + cow_sub_request_t *sRequest = malloc( sizeof( cow_sub_request_t ) ); + sRequest->callback = readRemoteData; + sRequest->dRequest.length = (uint32_t)size; + sRequest->dRequest.offset = offset; + sRequest->dRequest.fuse_req = req; + sRequest->cowRequest = cowRequest; + sRequest->buffer = buffer; + + atomic_fetch_add( &cowRequest->workCounter, 1 ); + if ( !connection_read( &sRequest->dRequest ) ) { + cowRequest->errorCode = EIO; + free( sRequest ); + if ( atomic_fetch_sub( &cowRequest->workCounter, 1 ) == 1 ) { + fuse_reply_buf( req, cowRequest->readBuffer, cowRequest->bytesWorkedOn ); + } + free( cowRequest->readBuffer ); + free( cowRequest ); + return; + } +} + +/** + * @brief Get the Block Data Source object + * + * @param block + * @param bitfieldOffset + * @param offset + * @return enum dataSource + */ +enum dataSource getBlockDataSource( cow_block_metadata_t *block, off_t bitfieldOffset, off_t offset ) +{ + if ( block != NULL && checkBit( block->bitfield, (int)bitfieldOffset ) ) { + return local; + } + if ( offset >= (off_t)metadata->originalImageSize ) { + return zero; + } + return remote; +} + +/** + * @brief Reads data at given offset. If the data are available locally, + * they are read locally, otherwise they are requested remotely. + * + * @param req fuse_req_t + * @param size of date to read + * @param offset offset where the read starts. + * @return uint64_t Number of bytes read. + */ +void cowfile_read( fuse_req_t req, size_t size, off_t offset ) +{ + cow_request_t *cowRequest = malloc( sizeof( cow_request_t ) ); + cowRequest->fuseRequestSize = size; + cowRequest->bytesWorkedOn = ATOMIC_VAR_INIT( 0 ); + cowRequest->workCounter = ATOMIC_VAR_INIT( 1 ); + cowRequest->errorCode = ATOMIC_VAR_INIT( 0 ); + cowRequest->readBuffer = malloc( size ); + cowRequest->fuseRequestOffset = offset; + off_t lastReadOffset = offset; + off_t endOffset = offset + size; + off_t searchOffset = offset; + int l1Offset = getL1Offset( offset ); + int l2Offset = getL2Offset( offset ); + int bitfieldOffset = getBitfieldOffset( offset ); + enum dataSource dataState; + cow_block_metadata_t *block = NULL; + + if ( cow.l1[l1Offset] != -1 ) { + block = getBlock( l1Offset, l2Offset ); + } + + bool doRead = false; + bool firstLoop = true; + bool updateBlock = false; + while ( searchOffset < endOffset ) { + if ( firstLoop ) { + firstLoop = false; + lastReadOffset = searchOffset; + dataState = getBlockDataSource( block, bitfieldOffset, searchOffset ); + } else if ( getBlockDataSource( block, bitfieldOffset, searchOffset ) != dataState ) { + doRead = true; + } else { + bitfieldOffset++; + } + + if ( bitfieldOffset >= COW_BITFIELD_SIZE * 8 ) { + bitfieldOffset = 0; + l2Offset++; + if ( l2Offset >= COW_L2_SIZE ) { + l2Offset = 0; + l1Offset++; + } + updateBlock = true; + if ( dataState == local ) { + doRead = true; + } + } + // compute the original file offset from bitfieldOffset, l2Offset and l1Offset + searchOffset = DNBD3_BLOCK_SIZE * ( bitfieldOffset ) + l2Offset * COW_METADATA_STORAGE_CAPACITY + + l1Offset * COW_L2_STORAGE_CAPACITY; + if ( doRead || searchOffset >= endOffset ) { + ssize_t sizeToRead = MIN( searchOffset, endOffset ); + if ( dataState == remote ) { + if ( sizeToRead > metadata->originalImageSize ) { + //pad rest with 0 + memset( cowRequest->readBuffer + + ( ( lastReadOffset - offset ) + ( metadata->originalImageSize - offset ) ), + 0, sizeToRead - metadata->originalImageSize ); + atomic_fetch_add( &cowRequest->bytesWorkedOn, sizeToRead - metadata->originalImageSize ); + sizeToRead = metadata->originalImageSize; + } + sizeToRead -= lastReadOffset; + readRemote( + req, lastReadOffset, sizeToRead, cowRequest->readBuffer + ( lastReadOffset - offset ), cowRequest ); + } else if ( dataState == zero ) { + sizeToRead -= lastReadOffset; + memset( cowRequest->readBuffer + ( lastReadOffset - offset ), 0, sizeToRead ); + atomic_fetch_add( &cowRequest->bytesWorkedOn, sizeToRead ); + } else { + sizeToRead -= lastReadOffset; + // Compute the offset in the data file where the read starts + off_t localRead = + block->offset + ( ( lastReadOffset % COW_L2_STORAGE_CAPACITY ) % COW_METADATA_STORAGE_CAPACITY ); + ssize_t totalBytesRead = 0; + while ( totalBytesRead < sizeToRead ) { + ssize_t bytesRead = + pread( cow.fhd, cowRequest->readBuffer + ( lastReadOffset - offset ), sizeToRead, localRead ); + if ( bytesRead == -1 ) { + cowRequest->errorCode = errno; + goto fail; + } else if ( bytesRead <= 0 ) { + cowRequest->errorCode = EIO; + goto fail; + } + totalBytesRead += bytesRead; + } + + atomic_fetch_add( &cowRequest->bytesWorkedOn, totalBytesRead ); + } + lastReadOffset = searchOffset; + doRead = false; + firstLoop = true; + } + + if ( updateBlock ) { + if ( cow.l1[l1Offset] != -1 ) { + block = getBlock( l1Offset, l2Offset ); + } else { + block = NULL; + } + updateBlock = false; + } + } +fail:; + if ( atomic_fetch_sub( &cowRequest->workCounter, 1 ) == 1 ) { + if ( cowRequest->errorCode != 0 ) { + if ( cowRequest->bytesWorkedOn < size ) { + logadd( LOG_ERROR, " read to small" ); + } + fuse_reply_err( req, cowRequest->errorCode ); + + } else { + if ( cowRequest->bytesWorkedOn < size ) { + logadd( LOG_ERROR, " read to small" ); + } + fuse_reply_buf( req, cowRequest->readBuffer, cowRequest->bytesWorkedOn ); + } + free( cowRequest->readBuffer ); + free( cowRequest ); + } +} + + +/** + * @brief stops the StatUpdater and CowUploader threads + * and waits for them to finish, then cleans up curl. + * + */ +void cowfile_close() +{ + uploadLoop = false; + if ( statFile || statStdout ) { + pthread_join( tidStatUpdater, NULL ); + } + pthread_join( tidCowUploader, NULL ); + + if ( curl ) { + curl_global_cleanup(); + curl_easy_cleanup( curl ); + } +} |