diff options
Diffstat (limited to 'src/fuse/cowfile.c')
-rw-r--r-- | src/fuse/cowfile.c | 480 |
1 files changed, 288 insertions, 192 deletions
diff --git a/src/fuse/cowfile.c b/src/fuse/cowfile.c index a53b101..60d82fb 100644 --- a/src/fuse/cowfile.c +++ b/src/fuse/cowfile.c @@ -2,12 +2,17 @@ #include "main.h" #include "connection.h" +#include <dnbd3/config.h> +#include <dnbd3/types.h> #include <dnbd3/shared/log.h> #include <sys/mman.h> #include <string.h> #include <pthread.h> #include <errno.h> #include <curl/curl.h> +#include <signal.h> +#include <inttypes.h> +#include <assert.h> #define UUID_STRLEN 36 // Maximum assumed page size, in case the cow data gets transferred between different architectures @@ -16,7 +21,7 @@ extern void image_ll_getattr( fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi ); -static const int CURRENT_COW_VERSION = 2; +static const int CURRENT_COW_VERSION = 3; static bool statStdout; static bool statFile; @@ -28,8 +33,10 @@ static cowfile_metadata_header_t *metadata = NULL; static atomic_uint_fast64_t bytesUploaded; static uint64_t totalBlocksUploaded = 0; static int activeUploads = 0; -atomic_bool uploadLoop = true; // Keep upload loop running? -atomic_bool uploadLoopDone = false; // Upload loop has finished all work? +static int uploadLoopThrottle = 0; +static atomic_bool uploadLoop = true; // Keep upload loop running? +static atomic_bool uploadLoopDone = false; // Upload loop has finished all work? +static atomic_bool uploadCancelled = false; // Skip uploading remaining blocks static struct cow { @@ -172,7 +179,7 @@ static bool checkBit( atomic_uchar *bitfield, int64_t n ) * @param response userdata which will later contain the uuid * @return size_t size that have been read */ -size_t curlCallbackCreateSession( char *buffer, size_t itemSize, size_t nitems, void *response ) +static size_t curlCallbackCreateSession( char *buffer, size_t itemSize, size_t nitems, void *response ) { uint64_t done = strlen( response ); uint64_t bytes = itemSize * nitems; @@ -186,12 +193,12 @@ size_t curlCallbackCreateSession( char *buffer, size_t itemSize, size_t nitems, } /** - * @brief Create a Session with the cow server and gets the session guid. + * @brief Create a Session with the cow server and gets the session uuid. * * @param imageName * @param version of the original Image */ -bool createSession( const char *imageName, uint16_t version ) +static bool createSession( const char *imageName, uint16_t version ) { CURLcode res; char url[COW_URL_STRING_SIZE]; @@ -240,7 +247,7 @@ bool createSession( const char *imageName, uint16_t version ) } curl_easy_reset( curl ); metadata->uuid[UUID_STRLEN] = '\0'; - logadd( LOG_DEBUG1, "Cow session started, guid: %s\n", metadata->uuid ); + logadd( LOG_DEBUG1, "Cow session started, uuid: %s\n", metadata->uuid ); return true; } @@ -256,7 +263,7 @@ bool createSession( const char *imageName, uint16_t version ) * @param userdata from CURLOPT_READFUNCTION * @return size_t size written in buffer */ -size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *userdata ) +static size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *userdata ) { cow_curl_read_upload_t *uploadBlock = (cow_curl_read_upload_t *)userdata; size_t len = 0; @@ -279,6 +286,7 @@ size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void * int bitNumber = (int)( inClusterOffset / DNBD3_BLOCK_SIZE ); size_t readSize; // Small performance hack: All bits one in a byte, do a 32k instead of 4k read + // TODO: preadv with a large iov, reading unchanged blocks into a trash-buffer if ( spaceLeft >= (ssize_t)DNBD3_BLOCK_SIZE * 8 && bitNumber % 8 == 0 && uploadBlock->bitfield[bitNumber / 8] == 0xff ) { @@ -286,11 +294,13 @@ size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void * } else { readSize = DNBD3_BLOCK_SIZE; } - // Check bits in our copy, as global bitfield could change - if ( checkBit( uploadBlock->bitfield, bitNumber ) ) { + // If handling single block, check bits in our copy, as global bitfield could change + if ( readSize != DNBD3_BLOCK_SIZE || checkBit( uploadBlock->bitfield, bitNumber ) ) { ssize_t lengthRead = pread( cow.fdData, ( ptr + len ), readSize, - uploadBlock->block->offset + inClusterOffset ); + uploadBlock->cluster->offset + inClusterOffset ); if ( lengthRead == -1 ) { + if ( errno == EAGAIN ) + continue; logadd( LOG_ERROR, "Upload: Reading from COW file failed with errno %d", errno ); return CURL_READFUNC_ABORT; } @@ -313,69 +323,59 @@ size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void * /** * @brief Requests the merging of the image on the cow server. */ -bool mergeRequest() +static bool postMergeRequest() { CURLcode res; - curl_easy_setopt( curl, CURLOPT_POST, 1L ); - char url[COW_URL_STRING_SIZE]; - snprintf( url, COW_URL_STRING_SIZE, COW_API_START_MERGE, cowServerAddress ); - curl_easy_setopt( curl, CURLOPT_URL, url ); - - - curl_mime *mime; - curl_mimepart *part; - mime = curl_mime_init( curl ); - - part = curl_mime_addpart( mime ); - curl_mime_name( part, "guid" ); - curl_mime_data( part, metadata->uuid, CURL_ZERO_TERMINATED ); + char body[500]; + char *uuid; - part = curl_mime_addpart( mime ); - curl_mime_name( part, "originalFileSize" ); - char buf[21]; - snprintf( buf, sizeof buf, "%" PRIu64, metadata->validRemoteSize ); - curl_mime_data( part, buf, CURL_ZERO_TERMINATED ); - - part = curl_mime_addpart( mime ); - curl_mime_name( part, "newFileSize" ); - snprintf( buf, sizeof buf, "%" PRIu64, metadata->imageSize ); - curl_mime_data( part, buf, CURL_ZERO_TERMINATED ); + curl_easy_reset( curl ); - curl_easy_setopt( curl, CURLOPT_MIMEPOST, mime ); + snprintf( url, COW_URL_STRING_SIZE, COW_API_START_MERGE, cowServerAddress ); + curl_easy_setopt( curl, CURLOPT_URL, url ); + curl_easy_setopt( curl, CURLOPT_POST, 1L ); + uuid = curl_easy_escape( curl, metadata->uuid, 0 ); + if ( uuid == NULL ) { + logadd( LOG_ERROR, "Error escaping uuid" ); + uuid = metadata->uuid; // Hope for the best + } + snprintf( body, sizeof body, "originalFileSize=%"PRIu64"&newFileSize=%"PRIu64"&uuid=%s", + metadata->validRemoteSize, metadata->imageSize, uuid ); + if ( uuid != metadata->uuid ) { + curl_free( uuid ); + } + curl_easy_setopt( curl, CURLOPT_POSTFIELDS, body ); res = curl_easy_perform( curl ); if ( res != CURLE_OK ) { - logadd( LOG_WARNING, "COW_API_START_MERGE failed: %s\n", curl_easy_strerror( res ) ); - curl_easy_reset( curl ); + logadd( LOG_WARNING, "COW_API_START_MERGE failed: %s", curl_easy_strerror( res ) ); return false; } long http_code = 0; curl_easy_getinfo( curl, CURLINFO_RESPONSE_CODE, &http_code ); - if ( http_code != 200 ) { - logadd( LOG_WARNING, "COW_API_START_MERGE failed http: %ld\n", http_code ); - curl_easy_reset( curl ); + if ( http_code < 200 || http_code >= 300 ) { + logadd( LOG_WARNING, "COW_API_START_MERGE failed http: %ld", http_code ); return false; } - curl_easy_reset( curl ); - curl_mime_free( mime ); return true; } /** - * @brief Wrapper for mergeRequest so if its fails it will be tried again. + * @brief Wrapper for postMergeRequest so if its fails it will be tried again. * */ -void startMerge() +static void requestRemoteMerge() { int fails = 0; bool success = false; - success = mergeRequest(); + success = postMergeRequest(); while ( fails <= 5 && !success ) { fails++; logadd( LOG_WARNING, "Trying again. %i/5", fails ); - mergeRequest(); + sleep( 10 ); + postMergeRequest(); } } @@ -387,26 +387,54 @@ void startMerge() * This function computes the uploaded bytes between each call and adds it to * bytesUploaded, which is used to compute the kb/s uploaded over all transfers. * - * @param clientp * @param ulNow number of bytes uploaded by this transfer so far. * @return int always returns 0 to continue the callbacks. */ -int progress_callback( void *clientp, __attribute__((unused)) curl_off_t dlTotal, - __attribute__((unused)) curl_off_t dlNow, __attribute__((unused)) curl_off_t ulTotal, curl_off_t ulNow ) +static int progress_callback( void *clientp, UNUSED curl_off_t dlTotal, + UNUSED curl_off_t dlNow, UNUSED curl_off_t ulTotal, curl_off_t ulNow ) { - CURL *eh = (CURL *)clientp; - cow_curl_read_upload_t *uploadingCluster; - CURLcode res; - res = curl_easy_getinfo( eh, CURLINFO_PRIVATE, &uploadingCluster ); - if ( res != CURLE_OK ) { - logadd( LOG_ERROR, "ERROR" ); - return 0; - } + cow_curl_read_upload_t *uploadingCluster = (cow_curl_read_upload_t *)clientp; bytesUploaded += ( ulNow - uploadingCluster->ulLast ); uploadingCluster->ulLast = ulNow; return 0; } +#ifdef COW_DUMP_BLOCK_UPLOADS +static int cmpfunc( const void *a, const void *b ) +{ + return (int)( ( (cow_cluster_statistics_t *)b )->uploads - ( (cow_cluster_statistics_t *)a )->uploads ); +} +/** + * @brief Writes all block numbers sorted by the number of uploads into the statsfile. + * + */ +static void dumpBlockUploads() +{ + long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE ); + + cow_cluster_statistics_t blockUploads[l1MaxOffset * COW_L2_TABLE_SIZE]; + uint64_t currentBlock = 0; + for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) { + if ( cow.l1[l1Index] == -1 ) { + continue; + } + for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) { + cow_l2_entry_t *block = ( cow.l2[cow.l1[l1Index]] + l2Index ); + + blockUploads[currentBlock].uploads = block->uploads; + blockUploads[currentBlock].clusterNumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index ); + currentBlock++; + } + } + qsort( blockUploads, currentBlock, sizeof( cow_cluster_statistics_t ), cmpfunc ); + + dprintf( cow.fdStats, "\n\nclusterNumber: uploads\n==Block Upload Dump===\n" ); + for ( uint64_t i = 0; i < currentBlock; i++ ) { + dprintf( cow.fdStats, "%" PRIu64 ": %" PRIu64 " \n", blockUploads[i].clusterNumber, blockUploads[i].uploads ); + } +} +#endif + /** * @brief Updates the status to the stdout/statfile depending on the startup parameters. * @@ -415,8 +443,7 @@ int progress_callback( void *clientp, __attribute__((unused)) curl_off_t dlTotal * @param idle Blocks that do not contain changes that have not yet been uploaded. * @param speedBuffer ptr to char array that contains the current upload speed. */ - -void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, char *speedBuffer ) +static void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, char *speedBuffer ) { char buffer[300]; const char *state; @@ -429,16 +456,17 @@ void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, cha state = "done"; } - int len = snprintf( buffer, 300, + int len = snprintf( buffer, sizeof buffer, + "[General]\n" "state=%s\n" "inQueue=%" PRIu64 "\n" "modifiedClusters=%" PRIu64 "\n" "idleClusters=%" PRIu64 "\n" "totalClustersUploaded=%" PRIu64 "\n" "activeUploads=%i\n" - "%s%s", + "%s%s\n", state, inQueue, modified, idle, totalBlocksUploaded, activeUploads, - COW_SHOW_UL_SPEED ? "ulspeed=" : "", + COW_SHOW_UL_SPEED ? "avgSpeedKb=" : "", speedBuffer ); if ( len == -1 ) { @@ -465,39 +493,6 @@ void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, cha #endif } } -int cmpfunc( const void *a, const void *b ) -{ - return (int)( ( (cow_cluster_statistics_t *)b )->uploads - ( (cow_cluster_statistics_t *)a )->uploads ); -} -/** - * @brief Writes all block numbers sorted by the number of uploads into the statsfile. - * - */ -void dumpBlockUploads() -{ - long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE ); - - cow_cluster_statistics_t blockUploads[l1MaxOffset * COW_L2_TABLE_SIZE]; - uint64_t currentBlock = 0; - for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) { - if ( cow.l1[l1Index] == -1 ) { - continue; - } - for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) { - cow_l2_entry_t *block = ( cow.l2[cow.l1[l1Index]] + l2Index ); - - blockUploads[currentBlock].uploads = block->uploads; - blockUploads[currentBlock].clusterNumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index ); - currentBlock++; - } - } - qsort( blockUploads, currentBlock, sizeof( cow_cluster_statistics_t ), cmpfunc ); - - dprintf( cow.fdStats, "\n\nclusterNumber: uploads\n==Block Upload Dump===\n" ); - for ( uint64_t i = 0; i < currentBlock; i++ ) { - dprintf( cow.fdStats, "%" PRIu64 ": %" PRIu64 " \n", blockUploads[i].clusterNumber, blockUploads[i].uploads ); - } -} /** * @brief Starts the upload of a given block. @@ -505,13 +500,14 @@ void dumpBlockUploads() * @param cm Curl_multi * @param uploadingCluster containing the data for the block to upload. */ -bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster, struct curl_slist *headers ) +static bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster, struct curl_slist *headers ) { CURL *eh = curl_easy_init(); char url[COW_URL_STRING_SIZE]; - snprintf( url, COW_URL_STRING_SIZE, COW_API_UPDATE, cowServerAddress, metadata->uuid, uploadingCluster->clusterNumber ); + snprintf( url, COW_URL_STRING_SIZE, + COW_API_UPDATE, cowServerAddress, metadata->uuid, uploadingCluster->clusterNumber ); curl_easy_setopt( eh, CURLOPT_URL, url ); curl_easy_setopt( eh, CURLOPT_POST, 1L ); @@ -531,7 +527,7 @@ bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster, struct curl uploadingCluster->ulLast = 0; curl_easy_setopt( eh, CURLOPT_NOPROGRESS, 0L ); curl_easy_setopt( eh, CURLOPT_XFERINFOFUNCTION, progress_callback ); - curl_easy_setopt( eh, CURLOPT_XFERINFODATA, eh ); + curl_easy_setopt( eh, CURLOPT_XFERINFODATA, uploadingCluster ); } curl_easy_setopt( eh, CURLOPT_HTTPHEADER, headers ); curl_multi_add_handle( cm, eh ); @@ -549,9 +545,9 @@ bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster, struct curl * @return true returned if the upload was successful or retries are still possible. * @return false returned if the upload was unsuccessful. */ -bool finishUpload( CURLM *cm, CURLMsg *msg, struct curl_slist *headers ) +static bool clusterUploadDoneHandler( CURLM *cm, CURLMsg *msg ) { - bool status = true; + bool status = false; cow_curl_read_upload_t *uploadingCluster; CURLcode res; CURLcode res2; @@ -560,69 +556,106 @@ bool finishUpload( CURLM *cm, CURLMsg *msg, struct curl_slist *headers ) long http_code = 0; res2 = curl_easy_getinfo( msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_code ); - if ( res != CURLE_OK || res2 != CURLE_OK || http_code < 200 || http_code >= 300 - || msg->msg != CURLMSG_DONE ) { - uploadingCluster->fails++; - logadd( LOG_ERROR, "COW_API_UPDATE failed %i/5: %s\n", uploadingCluster->fails, + if ( msg->msg != CURLMSG_DONE ) { + logadd( LOG_ERROR, "multi_message->msg unexpectedly not DONE (%d)", (int)msg->msg ); + } else if ( msg->data.result != CURLE_OK ) { + logadd( LOG_ERROR, "curl_easy returned non-OK after multi-finish: %s", curl_easy_strerror( msg->data.result ) ); - if ( uploadingCluster->fails < 5 ) { - addUpload( cm, uploadingCluster, headers ); - goto CLEANUP; + } else if ( res != CURLE_OK || res2 != CURLE_OK ) { + logadd( LOG_ERROR, "curl_easy_getinfo failed after multifinish (%d, %d)", (int)res, (int)res2 ); + } else if ( http_code == 503 ) { + // If the "Retry-After" header is set, we interpret this as the server being overloaded + // or not ready yet to take another update. We slow down our upload loop then. + // We'll only accept a delay in seconds here, not an HTTP Date string. + // Otherwise, increase the fails counter. + struct curl_header *ptr = NULL; + int delay; + CURLHcode h = curl_easy_header(curl, "Retry-After", 0, CURLH_HEADER, -1, &ptr); + if ( h == CURLHE_OK && ptr != NULL && ( delay = atoi( ptr->value ) ) > 0 ) { + if ( delay > 120 ) { + // Cap to two minutes + delay = 120; + } + logadd( LOG_INFO, "COW server is asking to backoff for %d seconds", delay ); + uploadLoopThrottle = MAX( uploadLoopThrottle, delay ); + status = true; + } else { + logadd( LOG_ERROR, "COW server returned 503 without Retry-After value" ); } + } else if ( http_code < 200 || http_code >= 300 ) { + logadd( LOG_ERROR, "COW server returned HTTP %ld", http_code ); + } else { + // everything went ok, reset timeChanged of underlying cluster, but only if it + // didn't get updated again in the meantime. + atomic_compare_exchange_strong( &uploadingCluster->cluster->timeChanged, &uploadingCluster->time, 0 ); + uploadingCluster->cluster->uploads++; + totalBlocksUploaded++; free( uploadingCluster ); - status = false; - goto CLEANUP; + status = true; + } + if ( !status ) { + uploadingCluster->cluster->fails++; + if ( uploadLoopThrottle > 0 ) { + // Don't reset timeChanged timestamp, so the next iteration of uploadModifiedClusters + // will queue this upload again after the throttle time expired. + free( uploadingCluster ); + } else { + logadd( LOG_ERROR, "Uploading cluster failed %i/5 times", uploadingCluster->cluster->fails ); + // Pretend the block changed again just now, to prevent immediate retry + atomic_compare_exchange_strong( &uploadingCluster->cluster->timeChanged, &uploadingCluster->time, + time( NULL ) ); + free( uploadingCluster ); + } } - - // everything went ok, update timeChanged - atomic_compare_exchange_strong( &uploadingCluster->block->timeChanged, &uploadingCluster->time, 0 ); - - uploadingCluster->block->uploads++; - - totalBlocksUploaded++; - free( uploadingCluster ); -CLEANUP: curl_multi_remove_handle( cm, msg->easy_handle ); curl_easy_cleanup( msg->easy_handle ); return status; } /** - * @brief - * * @param cm Curl_multi * @param activeUploads ptr to integer which holds the number of current uploads - * @param breakIfNotMax will return as soon as there are not all upload slots used, so they can be filled up. - * @param foregroundUpload used to determine the number of max uploads. If true COW_MAX_PARALLEL_UPLOADS will be the limit, + * @param minNumberUploads break out of loop as soon as there are less than these many transfers running * else COW_MAX_PARALLEL_BACKGROUND_UPLOADS. - * @return true returned if all upload's were successful - * @return false returned if one ore more upload's failed. + * @return true returned if all uploads were successful + * @return false returned if one ore more upload failed. */ -bool MessageHandler( - CURLM *cm, bool breakIfNotMax, bool foregroundUpload, struct curl_slist *headers ) +static bool curlMultiLoop( CURLM *cm, int minNumberUploads ) { CURLMsg *msg; int msgsLeft = -1; bool status = true; - do { - curl_multi_perform( cm, &activeUploads ); + + if ( minNumberUploads <= 0 ) { + minNumberUploads = 1; + } + for ( ;; ) { + CURLMcode mc = curl_multi_perform( cm, &activeUploads ); + if ( mc != CURLM_OK ) { + logadd( LOG_ERROR, "curl_multi_perform error %d, bailing out", (int)mc ); + status = false; + break; + } while ( ( msg = curl_multi_info_read( cm, &msgsLeft ) ) != NULL ) { - if ( !finishUpload( cm, msg, headers ) ) { + if ( !clusterUploadDoneHandler( cm, msg ) ) { status = false; } } - if ( breakIfNotMax - && activeUploads - < ( foregroundUpload ? COW_MAX_PARALLEL_UPLOADS : COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) ) { + if ( activeUploads < minNumberUploads ) { break; } // ony wait if there are active uploads - if ( activeUploads ) { - curl_multi_wait( cm, NULL, 0, 1000, NULL ); + if ( activeUploads > 0 ) { + mc = curl_multi_wait( cm, NULL, 0, 1000, NULL ); + if ( mc != CURLM_OK ) { + logadd( LOG_ERROR, "curl_multi_wait error %d, bailing out", (int)mc ); + status = false; + break; + } } - } while ( activeUploads ); + } return status; } @@ -635,7 +668,7 @@ bool MessageHandler( * @return true if all blocks uploaded successful * @return false if one ore more blocks failed to upload */ -bool uploaderLoop( bool ignoreMinUploadDelay, CURLM *cm ) +bool uploadModifiedClusters( bool ignoreMinUploadDelay, CURLM *cm ) { bool success = true; struct curl_slist *headers = NULL; @@ -648,36 +681,39 @@ bool uploaderLoop( bool ignoreMinUploadDelay, CURLM *cm ) if ( cow.l1[l1Index] == -1 ) { continue; // Not allocated } - // Now all L2 blocks + // Now all L2 clusters for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) { - cow_l2_entry_t *block = ( cow.l2[cow.l1[l1Index]] + l2Index ); - if ( block->offset == -1 ) { + cow_l2_entry_t *cluster = ( cow.l2[cow.l1[l1Index]] + l2Index ); + if ( cluster->offset == -1 ) { continue; // Not allocated } - if ( block->timeChanged == 0 ) { + if ( cluster->timeChanged == 0 ) { continue; // Not changed } - if ( !ignoreMinUploadDelay && ( now - block->timeChanged < COW_MIN_UPLOAD_DELAY ) ) { + if ( !ignoreMinUploadDelay && ( now - cluster->timeChanged < COW_MIN_UPLOAD_DELAY ) ) { continue; // Last change not old enough } // Run curl mainloop at least one, but keep doing so while max concurrent uploads is reached - do { - if ( !MessageHandler( cm, true, ignoreMinUploadDelay, headers ) ) { - success = false; - } - } while ( ( activeUploads >= ( ignoreMinUploadDelay ? COW_MAX_PARALLEL_UPLOADS - : COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) ) - && activeUploads > 0 ); + int minUploads = ignoreMinUploadDelay + ? COW_MAX_PARALLEL_UPLOADS + : COW_MAX_PARALLEL_BACKGROUND_UPLOADS; + if ( !curlMultiLoop( cm, minUploads ) ) { + success = false; + } + // Maybe one of the uploads was rejected by the server asking us to slow down a bit. + // Check for that case and don't trigger a new upload. + if ( uploadLoopThrottle > 0 ) { + goto DONE; + } cow_curl_read_upload_t *b = malloc( sizeof( cow_curl_read_upload_t ) ); - b->block = block; + b->cluster = cluster; b->clusterNumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index ); - b->fails = 0; b->position = 0; - b->time = block->timeChanged; + b->time = cluster->timeChanged; // Copy, so it doesn't change during upload // when we assemble the data in curlReadCallbackUploadBlock() for ( int i = 0; i < COW_BITFIELD_SIZE; ++i ) { - b->bitfield[i] = block->bitfield[i]; + b->bitfield[i] = cluster->bitfield[i]; } addUpload( cm, b, headers ); if ( !ignoreMinUploadDelay && !uploadLoop ) { @@ -686,8 +722,12 @@ bool uploaderLoop( bool ignoreMinUploadDelay, CURLM *cm ) } } DONE: + // Finish all the transfers still active while ( activeUploads > 0 ) { - MessageHandler( cm, false, ignoreMinUploadDelay, headers ); + if ( !curlMultiLoop( cm, 1 ) ) { + success = false; + break; + } } curl_slist_free_all( headers ); return success; @@ -699,17 +739,18 @@ DONE: * */ -void *cowfile_statUpdater( __attribute__((unused)) void *something ) +void *cowfile_statUpdater( UNUSED void *something ) { uint64_t lastUpdateTime = time( NULL ); + time_t now; + char speedBuffer[20]; while ( !uploadLoopDone ) { - sleep( COW_STATS_UPDATE_TIME ); int modified = 0; int inQueue = 0; int idle = 0; long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE ); - uint64_t now = time( NULL ); + now = time( NULL ); for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) { if ( cow.l1[l1Index] == -1 ) { continue; @@ -730,59 +771,88 @@ void *cowfile_statUpdater( __attribute__((unused)) void *something ) } } } - char speedBuffer[20]; if ( COW_SHOW_UL_SPEED ) { + double delta; + double bytes = (double)atomic_exchange( &bytesUploaded, 0 ); now = time( NULL ); - uint64_t bytes = atomic_exchange( &bytesUploaded, 0 ); - snprintf( speedBuffer, 20, "%.2f", (double)( ( bytes ) / ( 1 + now - lastUpdateTime ) / 1000 ) ); - + delta = (double)( now - lastUpdateTime ); lastUpdateTime = now; + if ( delta > 0 ) { + snprintf( speedBuffer, sizeof speedBuffer, "%.2f", bytes / 1000.0 / delta ); + } } - updateCowStatsFile( inQueue, modified, idle, speedBuffer ); + sleep( COW_STATS_UPDATE_TIME ); } return NULL; } +void quitSigHandler( int sig UNUSED ) +{ + uploadCancelled = true; + uploadLoop = false; + main_shutdown(); +} + /** * @brief main loop for blockupload in the background */ -static void *uploaderThreadMain( __attribute__((unused)) void *something ) +static void *uploaderThreadMain( UNUSED void *something ) { CURLM *cm; cm = curl_multi_init(); - curl_multi_setopt( - cm, CURLMOPT_MAXCONNECTS, (long)MAX( COW_MAX_PARALLEL_UPLOADS, COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) ); + curl_multi_setopt( cm, CURLMOPT_MAXCONNECTS, + (long)MAX( COW_MAX_PARALLEL_UPLOADS, COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) ); + do { + // Unblock so this very thread gets the signal for abandoning the upload + struct sigaction newHandler = { .sa_handler = &quitSigHandler }; + sigemptyset( &newHandler.sa_mask ); + sigaction( SIGQUIT, &newHandler, NULL ); + sigset_t sigmask; + sigemptyset( &sigmask ); + sigaddset( &sigmask, SIGQUIT ); + pthread_sigmask( SIG_UNBLOCK, &sigmask, NULL ); + } while ( 0 ); while ( uploadLoop ) { - uploaderLoop( false, cm ); + while ( uploadLoopThrottle > 0 && uploadLoop ) { + sleep( 1 ); + uploadLoopThrottle--; + } sleep( 2 ); + if ( !uploadLoop ) + break; + uploadModifiedClusters( false, cm ); } - logadd( LOG_DEBUG1, "start uploading the remaining blocks." ); - // force the upload of all remaining blocks because the user dismounted the image - if ( !uploaderLoop( true, cm ) ) { - logadd( LOG_ERROR, "one or more blocks failed to upload" ); - curl_multi_cleanup( cm ); + if ( uploadCancelled ) { uploadLoopDone = true; - return NULL; + logadd( LOG_INFO, "Not uploading remaining clusters, SIGQUIT received" ); + } else { + // force the upload of all remaining blocks because the user dismounted the image + logadd( LOG_INFO, "Start uploading the remaining clusters." ); + if ( !uploadModifiedClusters( true, cm ) ) { + uploadLoopDone = true; + logadd( LOG_ERROR, "One or more clusters failed to upload" ); + } else { + uploadLoopDone = true; + logadd( LOG_DEBUG1, "All clusters uploaded" ); + if ( cow_merge_after_upload ) { + requestRemoteMerge(); + logadd( LOG_DEBUG1, "Requesting merge" ); + } + } } - uploadLoopDone = true; curl_multi_cleanup( cm ); - logadd( LOG_DEBUG1, "all blocks uploaded" ); - if ( cow_merge_after_upload ) { - startMerge(); - logadd( LOG_DEBUG1, "Requesting merge." ); - } return NULL; } /** - * @brief Create a Cow Stats File an inserts the session guid + * @brief Create a Cow Stats File an inserts the session uuid * * @param path where the file is created * @return true @@ -819,11 +889,17 @@ static bool createCowStatsFile( char *path ) * @param path where the files should be stored * @param image_Name name of the original file/image * @param imageSizePtr + * @param cowUuid optional, use given UUID for talking to COW server instead of creating session */ bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion, atomic_uint_fast64_t **imageSizePtr, - char *serverAddress, bool sStdout, bool sfile ) + char *serverAddress, bool sStdout, bool sfile, const char *cowUuid ) { + if ( cowUuid != NULL && strlen( cowUuid ) > UUID_STRLEN ) { + logadd( LOG_ERROR, "COW UUID too long: '%s'", cowUuid ); + return false; + } + statStdout = sStdout; statFile = sfile; char pathMeta[strlen( path ) + 6]; @@ -860,6 +936,10 @@ bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion, // size of l1 array + number of l2's * size of l2 size_t ps = getpagesize(); + if ( ps == 0 || ps > INT_MAX ) { + logadd( LOG_ERROR, "Cannot get native page size, aborting..." ); + return false; + } size_t metaSize = ( ( startL2 + l1NumEntries * sizeof( l2 ) + ps - 1 ) / ps ) * ps; if ( ftruncate( cow.fdMeta, metaSize ) != 0 ) { @@ -911,7 +991,10 @@ bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion, logadd( LOG_ERROR, "Error on curl init. Bye.\n" ); return false; } - if ( !createSession( image_Name, imageVersion ) ) { + if ( cowUuid != NULL ) { + snprintf( metadata->uuid, UUID_STRLEN, "%s", cowUuid ); + logadd( LOG_INFO, "Using provided upload session id" ); + } else if ( !createSession( image_Name, imageVersion ) ) { return false; } createCowStatsFile( path ); @@ -925,7 +1008,7 @@ bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion, * @param path where the meta & data file is located * @param imageSizePtr */ -bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile ) +bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile, const char *cowUuid ) { statStdout = sStdout; statFile = sFile; @@ -935,6 +1018,11 @@ bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *server char pathMeta[strlen( path ) + 6]; char pathData[strlen( path ) + 6]; + if ( cowUuid != NULL && strlen( cowUuid ) > UUID_STRLEN ) { + logadd( LOG_ERROR, "COW UUID too long: '%s'", cowUuid ); + return false; + } + snprintf( pathMeta, strlen( path ) + 6, "%s%s", path, "/meta" ); snprintf( pathData, strlen( path ) + 6, "%s%s", path, "/data" ); @@ -1026,6 +1114,11 @@ bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *server metadata = (cowfile_metadata_header_t *)( cow.metadata_mmap ); + if ( cowUuid != NULL ) { + logadd( LOG_INFO, "Overriding stored upload session id with provided one" ); + snprintf( metadata->uuid, UUID_STRLEN, "%s", cowUuid ); + } + *imageSizePtr = &metadata->imageSize; cow.l1 = (l1 *)( cow.metadata_mmap + metadata->startL1 ); cow.l2 = (l2 *)( cow.metadata_mmap + metadata->startL2 ); @@ -1150,7 +1243,7 @@ static void writePaddedBlock( cow_sub_request_t *sRequest ) // Here, we again check if the block is written locally - there might have been a second write // that wrote the full block, hence didn't have to wait for remote data and finished faster. // In that case, don't pad from remote as we'd overwrite newer data. - if ( isBlockLocal( sRequest->block, sRequest->inClusterOffset ) ) { + if ( isBlockLocal( sRequest->cluster, sRequest->inClusterOffset ) ) { logadd( LOG_INFO, "It happened!" ); } else { // copy write Data @@ -1158,13 +1251,13 @@ static void writePaddedBlock( cow_sub_request_t *sRequest ) memcpy( sRequest->writeBuffer + ( sRequest->inClusterOffset % DNBD3_BLOCK_SIZE ), sRequest->writeSrc, sRequest->size ); if ( !writeAll( cow.fdData, sRequest->writeBuffer, DNBD3_BLOCK_SIZE, - sRequest->block->offset + ( sRequest->inClusterOffset & ~DNBD3_BLOCK_MASK ) ) ) { + sRequest->cluster->offset + ( sRequest->inClusterOffset & ~DNBD3_BLOCK_MASK ) ) ) { sRequest->cowRequest->errorCode = errno; } else { sRequest->cowRequest->bytesWorkedOn += sRequest->size; int64_t bit = sRequest->inClusterOffset / DNBD3_BLOCK_SIZE; - setBitsInBitfield( sRequest->block->bitfield, bit, bit, true ); - sRequest->block->timeChanged = time( NULL ); + setBitsInBitfield( sRequest->cluster->bitfield, bit, bit, true ); + sRequest->cluster->timeChanged = time( NULL ); } } @@ -1228,10 +1321,11 @@ static bool padBlockForWrite( fuse_req_t req, cow_request_t *cowRequest, cow_sub_request_t *sub = calloc( sizeof( *sub ) + DNBD3_BLOCK_SIZE, 1 ); sub->callback = writePaddedBlock; sub->inClusterOffset = inClusterOffset; - sub->block = cluster; + sub->cluster = cluster; sub->size = size; sub->writeSrc = srcBuffer; sub->cowRequest = cowRequest; + sub->buffer = sub->writeBuffer; sub->dRequest.length = (uint32_t)MIN( DNBD3_BLOCK_SIZE, validImageSize - startOffset ); sub->dRequest.offset = startOffset & ~DNBD3_BLOCK_MASK; @@ -1248,7 +1342,7 @@ static bool padBlockForWrite( fuse_req_t req, cow_request_t *cowRequest, /** * @brief Will be called after a dnbd3_async_t is finished. - * Calls the corrsponding callback function, either writePaddedBlock or readRemoteData + * Calls the corrsponding callback function, either writePaddedBlock or readRemoteCallback * depending if the original fuse request was a write or read. * */ @@ -1265,7 +1359,7 @@ void cowfile_handleCallback( dnbd3_async_t *request ) * so replys to fuse and cleans up the request. * */ -void readRemoteData( cow_sub_request_t *sRequest ) +static void readRemoteCallback( cow_sub_request_t *sRequest ) { atomic_fetch_add( &sRequest->cowRequest->bytesWorkedOn, sRequest->dRequest.length ); @@ -1464,7 +1558,7 @@ static void readRemote( fuse_req_t req, off_t offset, ssize_t size, char *buffer return; assert( size > 0 ); cow_sub_request_t *sRequest = malloc( sizeof( cow_sub_request_t ) ); - sRequest->callback = readRemoteData; + sRequest->callback = readRemoteCallback; sRequest->dRequest.length = (uint32_t)size; sRequest->dRequest.offset = offset; sRequest->dRequest.fuse_req = req; @@ -1632,13 +1726,15 @@ fail:; void cowfile_close() { uploadLoop = false; + pthread_join( tidCowUploader, NULL ); if ( statFile || statStdout ) { + // Send a signal in case it's hanging in the sleep call + pthread_kill( tidStatUpdater, SIGHUP ); pthread_join( tidStatUpdater, NULL ); } - pthread_join( tidCowUploader, NULL ); if ( curl ) { - curl_global_cleanup(); curl_easy_cleanup( curl ); + curl_global_cleanup(); } } |