diff options
-rw-r--r-- | inc/dnbd3/config/cow.h | 2 | ||||
-rw-r--r-- | src/fuse/connection.c | 10 | ||||
-rw-r--r-- | src/fuse/cowDoc/readme.md | 28 | ||||
-rw-r--r-- | src/fuse/cowfile.c | 480 | ||||
-rw-r--r-- | src/fuse/cowfile.h | 15 | ||||
-rw-r--r-- | src/fuse/main.c | 118 | ||||
-rw-r--r-- | src/fuse/main.h | 32 |
7 files changed, 396 insertions, 289 deletions
diff --git a/inc/dnbd3/config/cow.h b/inc/dnbd3/config/cow.h index d3be949..b266fc8 100644 --- a/inc/dnbd3/config/cow.h +++ b/inc/dnbd3/config/cow.h @@ -15,7 +15,7 @@ // +++++ COW API Endpoints +++++ #define COW_API_PREFIX "%s/v1/" #define COW_API_CREATE COW_API_PREFIX "file/create" -#define COW_API_UPDATE COW_API_PREFIX "file/update?guid=%s&clusterindex=%lu" +#define COW_API_UPDATE COW_API_PREFIX "file/update?uuid=%s&clusterindex=%lu" #define COW_API_START_MERGE COW_API_PREFIX "file/merge" #endif diff --git a/src/fuse/connection.c b/src/fuse/connection.c index 2264c00..dede680 100644 --- a/src/fuse/connection.c +++ b/src/fuse/connection.c @@ -960,15 +960,7 @@ static size_t receiveRequest(const int sock, dnbd3_async_t* request ) { if( useCow ) { cow_sub_request_t * cow_request = container_of( request, cow_sub_request_t, dRequest ); - // TODO This is ugly, we have a callback so we don't need to special-case receiving the - // reply, yet here we check what the callback function is for some reason :-( - // Ideally there should be no cow-related code in this file at all. - // This requires moving the callback to dnbd3_async_t from cow_sub_request_t though... - if( cow_request->callback == readRemoteData ) { - return sock_recv( sock, cow_request->buffer, request->length ); - } else{ - return sock_recv( sock, &cow_request->writeBuffer, request->length ); - } + return sock_recv( sock, cow_request->buffer, request->length ); } else { return sock_recv( sock, container_of( request, dnbd3_async_parent_t, request )->buffer, request->length ); } diff --git a/src/fuse/cowDoc/readme.md b/src/fuse/cowDoc/readme.md index fd3557c..a289209 100644 --- a/src/fuse/cowDoc/readme.md +++ b/src/fuse/cowDoc/readme.md @@ -56,7 +56,8 @@ typedef struct cow_l2_entry { atomic_int_least64_t offset; atomic_uint_least64_t timeChanged; - atomic_uint_least64_t uploads; + _Atomic(uint32_t) uploads; + _Atomic(uint32_t) fails; atomic_char bitfield[40]; } cow_l2_entry_t; ``` @@ -132,7 +133,7 @@ modifiedBlocks=0 idleClusters=0 totalClustersUploaded=0 activeUploads=0 -ulspeed=0.00 +avgSpeedKb=0.00 ``` - The `uuid` is the session uuid used by the Cow server to identify the session. @@ -161,19 +162,20 @@ typedef struct cowfile_metadata_header atomic_uint_least64_t imageSize; // 8byte int32_t version; // 4byte int32_t blocksize; // 4byte - uint64_t originalImageSize; // 8byte - uint64_t metaDataStart; // 8byte + uint64_t validRemoteSize; // 8byte + uint32_t startL1; // 4byte + uint32_t startL2; // 4byte int32_t bitfieldSize; // 4byte int32_t nextL2; // 4byte - atomic_uint_least64_t metadataFileSize; // 8byte - atomic_uint_least64_t dataFileSize; // 8byte + atomic_int_least64_t metaSize; // 8byte + atomic_int_least64_t nextClusterOffset; // 8byte uint64_t maxImageSize; // 8byte uint64_t creationTime; // 8byte char uuid[40]; // 40byte char imageName[200]; // 200byte } cowfile_metadata_header_t; ``` -After this header, the above-mentioned l1 and then the l2 data structure begins at byte 8192. +After this header, the above-mentioned l1 and then the l2 data structure begins at byte offsets specified by members startL1 and startL2. The offsets are absolute from the beginning of the file. ### data @@ -222,7 +224,7 @@ The following configuration variables have been added to `config/cow.h`. // +++++ COW API Endpoints +++++ #define COW_API_PREFIX "%s/v1/" #define COW_API_CREATE COW_API_PREFIX "file/create" -#define COW_API_UPDATE COW_API_PREFIX "file/update?guid=%s&clusterindex=%lu" +#define COW_API_UPDATE COW_API_PREFIX "file/update?uuid=%s&clusterindex=%lu" #define COW_API_START_MERGE COW_API_PREFIX "file/merge" ``` @@ -251,7 +253,7 @@ The following Rest API is used to transmit the data and commands to the cow serv | ---- | ----------- | | 200 | Success | -This request is used as soon as a new cow session is created. The returned guid is used in all subsequent requests to identify the session. +This request is used as soon as a new cow session is created. The returned uuid is used in all subsequent requests to identify the session. ### v1/file/update @@ -261,7 +263,7 @@ This request is used as soon as a new cow session is created. The returned guid | Name | Located in | Description | Required | Schema | | ---- | ---------- | ----------- | -------- | ---- | -| guid | query | | Yes | string (uuid) | +| uuid | query | | Yes | string (uuid) | | clusterNumber | query | | Yes | integer | ##### Responses @@ -280,7 +282,7 @@ Used to upload a cluster. The cluster number is the absolute cluster number. The | Name | Located in | Description | Required | Schema | | ---- | ---------- | ----------- | -------- | ---- | -| guid | Form | | Yes | string (uuid) | +| uuid | Form | | Yes | string (uuid) | | originalFileSize | Form | | Yes | integer | | newFileSize | Form | | Yes | integer | ##### Responses @@ -299,7 +301,7 @@ Used to start the merge on the server. | Name | Located in | Description | Required | Schema | | ---- | ---------- | ----------- | -------- | ---- | -| guid | query | | Yes | string (uuid) | +| uuid | query | | Yes | string (uuid) | | amount | query | | Yes | integer | ##### Responses @@ -317,7 +319,7 @@ This request returns a list containing the cluster IDs and the number of uploads | Name | Located in | Description | Required | Schema | | ---- | ---------- | ----------- | -------- | ---- | -| guid | query | | Yes | string (uuid) | +| uuid | query | | Yes | string (uuid) | ##### Responses diff --git a/src/fuse/cowfile.c b/src/fuse/cowfile.c index a53b101..60d82fb 100644 --- a/src/fuse/cowfile.c +++ b/src/fuse/cowfile.c @@ -2,12 +2,17 @@ #include "main.h" #include "connection.h" +#include <dnbd3/config.h> +#include <dnbd3/types.h> #include <dnbd3/shared/log.h> #include <sys/mman.h> #include <string.h> #include <pthread.h> #include <errno.h> #include <curl/curl.h> +#include <signal.h> +#include <inttypes.h> +#include <assert.h> #define UUID_STRLEN 36 // Maximum assumed page size, in case the cow data gets transferred between different architectures @@ -16,7 +21,7 @@ extern void image_ll_getattr( fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi ); -static const int CURRENT_COW_VERSION = 2; +static const int CURRENT_COW_VERSION = 3; static bool statStdout; static bool statFile; @@ -28,8 +33,10 @@ static cowfile_metadata_header_t *metadata = NULL; static atomic_uint_fast64_t bytesUploaded; static uint64_t totalBlocksUploaded = 0; static int activeUploads = 0; -atomic_bool uploadLoop = true; // Keep upload loop running? -atomic_bool uploadLoopDone = false; // Upload loop has finished all work? +static int uploadLoopThrottle = 0; +static atomic_bool uploadLoop = true; // Keep upload loop running? +static atomic_bool uploadLoopDone = false; // Upload loop has finished all work? +static atomic_bool uploadCancelled = false; // Skip uploading remaining blocks static struct cow { @@ -172,7 +179,7 @@ static bool checkBit( atomic_uchar *bitfield, int64_t n ) * @param response userdata which will later contain the uuid * @return size_t size that have been read */ -size_t curlCallbackCreateSession( char *buffer, size_t itemSize, size_t nitems, void *response ) +static size_t curlCallbackCreateSession( char *buffer, size_t itemSize, size_t nitems, void *response ) { uint64_t done = strlen( response ); uint64_t bytes = itemSize * nitems; @@ -186,12 +193,12 @@ size_t curlCallbackCreateSession( char *buffer, size_t itemSize, size_t nitems, } /** - * @brief Create a Session with the cow server and gets the session guid. + * @brief Create a Session with the cow server and gets the session uuid. * * @param imageName * @param version of the original Image */ -bool createSession( const char *imageName, uint16_t version ) +static bool createSession( const char *imageName, uint16_t version ) { CURLcode res; char url[COW_URL_STRING_SIZE]; @@ -240,7 +247,7 @@ bool createSession( const char *imageName, uint16_t version ) } curl_easy_reset( curl ); metadata->uuid[UUID_STRLEN] = '\0'; - logadd( LOG_DEBUG1, "Cow session started, guid: %s\n", metadata->uuid ); + logadd( LOG_DEBUG1, "Cow session started, uuid: %s\n", metadata->uuid ); return true; } @@ -256,7 +263,7 @@ bool createSession( const char *imageName, uint16_t version ) * @param userdata from CURLOPT_READFUNCTION * @return size_t size written in buffer */ -size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *userdata ) +static size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *userdata ) { cow_curl_read_upload_t *uploadBlock = (cow_curl_read_upload_t *)userdata; size_t len = 0; @@ -279,6 +286,7 @@ size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void * int bitNumber = (int)( inClusterOffset / DNBD3_BLOCK_SIZE ); size_t readSize; // Small performance hack: All bits one in a byte, do a 32k instead of 4k read + // TODO: preadv with a large iov, reading unchanged blocks into a trash-buffer if ( spaceLeft >= (ssize_t)DNBD3_BLOCK_SIZE * 8 && bitNumber % 8 == 0 && uploadBlock->bitfield[bitNumber / 8] == 0xff ) { @@ -286,11 +294,13 @@ size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void * } else { readSize = DNBD3_BLOCK_SIZE; } - // Check bits in our copy, as global bitfield could change - if ( checkBit( uploadBlock->bitfield, bitNumber ) ) { + // If handling single block, check bits in our copy, as global bitfield could change + if ( readSize != DNBD3_BLOCK_SIZE || checkBit( uploadBlock->bitfield, bitNumber ) ) { ssize_t lengthRead = pread( cow.fdData, ( ptr + len ), readSize, - uploadBlock->block->offset + inClusterOffset ); + uploadBlock->cluster->offset + inClusterOffset ); if ( lengthRead == -1 ) { + if ( errno == EAGAIN ) + continue; logadd( LOG_ERROR, "Upload: Reading from COW file failed with errno %d", errno ); return CURL_READFUNC_ABORT; } @@ -313,69 +323,59 @@ size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void * /** * @brief Requests the merging of the image on the cow server. */ -bool mergeRequest() +static bool postMergeRequest() { CURLcode res; - curl_easy_setopt( curl, CURLOPT_POST, 1L ); - char url[COW_URL_STRING_SIZE]; - snprintf( url, COW_URL_STRING_SIZE, COW_API_START_MERGE, cowServerAddress ); - curl_easy_setopt( curl, CURLOPT_URL, url ); - - - curl_mime *mime; - curl_mimepart *part; - mime = curl_mime_init( curl ); - - part = curl_mime_addpart( mime ); - curl_mime_name( part, "guid" ); - curl_mime_data( part, metadata->uuid, CURL_ZERO_TERMINATED ); + char body[500]; + char *uuid; - part = curl_mime_addpart( mime ); - curl_mime_name( part, "originalFileSize" ); - char buf[21]; - snprintf( buf, sizeof buf, "%" PRIu64, metadata->validRemoteSize ); - curl_mime_data( part, buf, CURL_ZERO_TERMINATED ); - - part = curl_mime_addpart( mime ); - curl_mime_name( part, "newFileSize" ); - snprintf( buf, sizeof buf, "%" PRIu64, metadata->imageSize ); - curl_mime_data( part, buf, CURL_ZERO_TERMINATED ); + curl_easy_reset( curl ); - curl_easy_setopt( curl, CURLOPT_MIMEPOST, mime ); + snprintf( url, COW_URL_STRING_SIZE, COW_API_START_MERGE, cowServerAddress ); + curl_easy_setopt( curl, CURLOPT_URL, url ); + curl_easy_setopt( curl, CURLOPT_POST, 1L ); + uuid = curl_easy_escape( curl, metadata->uuid, 0 ); + if ( uuid == NULL ) { + logadd( LOG_ERROR, "Error escaping uuid" ); + uuid = metadata->uuid; // Hope for the best + } + snprintf( body, sizeof body, "originalFileSize=%"PRIu64"&newFileSize=%"PRIu64"&uuid=%s", + metadata->validRemoteSize, metadata->imageSize, uuid ); + if ( uuid != metadata->uuid ) { + curl_free( uuid ); + } + curl_easy_setopt( curl, CURLOPT_POSTFIELDS, body ); res = curl_easy_perform( curl ); if ( res != CURLE_OK ) { - logadd( LOG_WARNING, "COW_API_START_MERGE failed: %s\n", curl_easy_strerror( res ) ); - curl_easy_reset( curl ); + logadd( LOG_WARNING, "COW_API_START_MERGE failed: %s", curl_easy_strerror( res ) ); return false; } long http_code = 0; curl_easy_getinfo( curl, CURLINFO_RESPONSE_CODE, &http_code ); - if ( http_code != 200 ) { - logadd( LOG_WARNING, "COW_API_START_MERGE failed http: %ld\n", http_code ); - curl_easy_reset( curl ); + if ( http_code < 200 || http_code >= 300 ) { + logadd( LOG_WARNING, "COW_API_START_MERGE failed http: %ld", http_code ); return false; } - curl_easy_reset( curl ); - curl_mime_free( mime ); return true; } /** - * @brief Wrapper for mergeRequest so if its fails it will be tried again. + * @brief Wrapper for postMergeRequest so if its fails it will be tried again. * */ -void startMerge() +static void requestRemoteMerge() { int fails = 0; bool success = false; - success = mergeRequest(); + success = postMergeRequest(); while ( fails <= 5 && !success ) { fails++; logadd( LOG_WARNING, "Trying again. %i/5", fails ); - mergeRequest(); + sleep( 10 ); + postMergeRequest(); } } @@ -387,26 +387,54 @@ void startMerge() * This function computes the uploaded bytes between each call and adds it to * bytesUploaded, which is used to compute the kb/s uploaded over all transfers. * - * @param clientp * @param ulNow number of bytes uploaded by this transfer so far. * @return int always returns 0 to continue the callbacks. */ -int progress_callback( void *clientp, __attribute__((unused)) curl_off_t dlTotal, - __attribute__((unused)) curl_off_t dlNow, __attribute__((unused)) curl_off_t ulTotal, curl_off_t ulNow ) +static int progress_callback( void *clientp, UNUSED curl_off_t dlTotal, + UNUSED curl_off_t dlNow, UNUSED curl_off_t ulTotal, curl_off_t ulNow ) { - CURL *eh = (CURL *)clientp; - cow_curl_read_upload_t *uploadingCluster; - CURLcode res; - res = curl_easy_getinfo( eh, CURLINFO_PRIVATE, &uploadingCluster ); - if ( res != CURLE_OK ) { - logadd( LOG_ERROR, "ERROR" ); - return 0; - } + cow_curl_read_upload_t *uploadingCluster = (cow_curl_read_upload_t *)clientp; bytesUploaded += ( ulNow - uploadingCluster->ulLast ); uploadingCluster->ulLast = ulNow; return 0; } +#ifdef COW_DUMP_BLOCK_UPLOADS +static int cmpfunc( const void *a, const void *b ) +{ + return (int)( ( (cow_cluster_statistics_t *)b )->uploads - ( (cow_cluster_statistics_t *)a )->uploads ); +} +/** + * @brief Writes all block numbers sorted by the number of uploads into the statsfile. + * + */ +static void dumpBlockUploads() +{ + long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE ); + + cow_cluster_statistics_t blockUploads[l1MaxOffset * COW_L2_TABLE_SIZE]; + uint64_t currentBlock = 0; + for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) { + if ( cow.l1[l1Index] == -1 ) { + continue; + } + for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) { + cow_l2_entry_t *block = ( cow.l2[cow.l1[l1Index]] + l2Index ); + + blockUploads[currentBlock].uploads = block->uploads; + blockUploads[currentBlock].clusterNumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index ); + currentBlock++; + } + } + qsort( blockUploads, currentBlock, sizeof( cow_cluster_statistics_t ), cmpfunc ); + + dprintf( cow.fdStats, "\n\nclusterNumber: uploads\n==Block Upload Dump===\n" ); + for ( uint64_t i = 0; i < currentBlock; i++ ) { + dprintf( cow.fdStats, "%" PRIu64 ": %" PRIu64 " \n", blockUploads[i].clusterNumber, blockUploads[i].uploads ); + } +} +#endif + /** * @brief Updates the status to the stdout/statfile depending on the startup parameters. * @@ -415,8 +443,7 @@ int progress_callback( void *clientp, __attribute__((unused)) curl_off_t dlTotal * @param idle Blocks that do not contain changes that have not yet been uploaded. * @param speedBuffer ptr to char array that contains the current upload speed. */ - -void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, char *speedBuffer ) +static void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, char *speedBuffer ) { char buffer[300]; const char *state; @@ -429,16 +456,17 @@ void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, cha state = "done"; } - int len = snprintf( buffer, 300, + int len = snprintf( buffer, sizeof buffer, + "[General]\n" "state=%s\n" "inQueue=%" PRIu64 "\n" "modifiedClusters=%" PRIu64 "\n" "idleClusters=%" PRIu64 "\n" "totalClustersUploaded=%" PRIu64 "\n" "activeUploads=%i\n" - "%s%s", + "%s%s\n", state, inQueue, modified, idle, totalBlocksUploaded, activeUploads, - COW_SHOW_UL_SPEED ? "ulspeed=" : "", + COW_SHOW_UL_SPEED ? "avgSpeedKb=" : "", speedBuffer ); if ( len == -1 ) { @@ -465,39 +493,6 @@ void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, cha #endif } } -int cmpfunc( const void *a, const void *b ) -{ - return (int)( ( (cow_cluster_statistics_t *)b )->uploads - ( (cow_cluster_statistics_t *)a )->uploads ); -} -/** - * @brief Writes all block numbers sorted by the number of uploads into the statsfile. - * - */ -void dumpBlockUploads() -{ - long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE ); - - cow_cluster_statistics_t blockUploads[l1MaxOffset * COW_L2_TABLE_SIZE]; - uint64_t currentBlock = 0; - for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) { - if ( cow.l1[l1Index] == -1 ) { - continue; - } - for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) { - cow_l2_entry_t *block = ( cow.l2[cow.l1[l1Index]] + l2Index ); - - blockUploads[currentBlock].uploads = block->uploads; - blockUploads[currentBlock].clusterNumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index ); - currentBlock++; - } - } - qsort( blockUploads, currentBlock, sizeof( cow_cluster_statistics_t ), cmpfunc ); - - dprintf( cow.fdStats, "\n\nclusterNumber: uploads\n==Block Upload Dump===\n" ); - for ( uint64_t i = 0; i < currentBlock; i++ ) { - dprintf( cow.fdStats, "%" PRIu64 ": %" PRIu64 " \n", blockUploads[i].clusterNumber, blockUploads[i].uploads ); - } -} /** * @brief Starts the upload of a given block. @@ -505,13 +500,14 @@ void dumpBlockUploads() * @param cm Curl_multi * @param uploadingCluster containing the data for the block to upload. */ -bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster, struct curl_slist *headers ) +static bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster, struct curl_slist *headers ) { CURL *eh = curl_easy_init(); char url[COW_URL_STRING_SIZE]; - snprintf( url, COW_URL_STRING_SIZE, COW_API_UPDATE, cowServerAddress, metadata->uuid, uploadingCluster->clusterNumber ); + snprintf( url, COW_URL_STRING_SIZE, + COW_API_UPDATE, cowServerAddress, metadata->uuid, uploadingCluster->clusterNumber ); curl_easy_setopt( eh, CURLOPT_URL, url ); curl_easy_setopt( eh, CURLOPT_POST, 1L ); @@ -531,7 +527,7 @@ bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster, struct curl uploadingCluster->ulLast = 0; curl_easy_setopt( eh, CURLOPT_NOPROGRESS, 0L ); curl_easy_setopt( eh, CURLOPT_XFERINFOFUNCTION, progress_callback ); - curl_easy_setopt( eh, CURLOPT_XFERINFODATA, eh ); + curl_easy_setopt( eh, CURLOPT_XFERINFODATA, uploadingCluster ); } curl_easy_setopt( eh, CURLOPT_HTTPHEADER, headers ); curl_multi_add_handle( cm, eh ); @@ -549,9 +545,9 @@ bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster, struct curl * @return true returned if the upload was successful or retries are still possible. * @return false returned if the upload was unsuccessful. */ -bool finishUpload( CURLM *cm, CURLMsg *msg, struct curl_slist *headers ) +static bool clusterUploadDoneHandler( CURLM *cm, CURLMsg *msg ) { - bool status = true; + bool status = false; cow_curl_read_upload_t *uploadingCluster; CURLcode res; CURLcode res2; @@ -560,69 +556,106 @@ bool finishUpload( CURLM *cm, CURLMsg *msg, struct curl_slist *headers ) long http_code = 0; res2 = curl_easy_getinfo( msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_code ); - if ( res != CURLE_OK || res2 != CURLE_OK || http_code < 200 || http_code >= 300 - || msg->msg != CURLMSG_DONE ) { - uploadingCluster->fails++; - logadd( LOG_ERROR, "COW_API_UPDATE failed %i/5: %s\n", uploadingCluster->fails, + if ( msg->msg != CURLMSG_DONE ) { + logadd( LOG_ERROR, "multi_message->msg unexpectedly not DONE (%d)", (int)msg->msg ); + } else if ( msg->data.result != CURLE_OK ) { + logadd( LOG_ERROR, "curl_easy returned non-OK after multi-finish: %s", curl_easy_strerror( msg->data.result ) ); - if ( uploadingCluster->fails < 5 ) { - addUpload( cm, uploadingCluster, headers ); - goto CLEANUP; + } else if ( res != CURLE_OK || res2 != CURLE_OK ) { + logadd( LOG_ERROR, "curl_easy_getinfo failed after multifinish (%d, %d)", (int)res, (int)res2 ); + } else if ( http_code == 503 ) { + // If the "Retry-After" header is set, we interpret this as the server being overloaded + // or not ready yet to take another update. We slow down our upload loop then. + // We'll only accept a delay in seconds here, not an HTTP Date string. + // Otherwise, increase the fails counter. + struct curl_header *ptr = NULL; + int delay; + CURLHcode h = curl_easy_header(curl, "Retry-After", 0, CURLH_HEADER, -1, &ptr); + if ( h == CURLHE_OK && ptr != NULL && ( delay = atoi( ptr->value ) ) > 0 ) { + if ( delay > 120 ) { + // Cap to two minutes + delay = 120; + } + logadd( LOG_INFO, "COW server is asking to backoff for %d seconds", delay ); + uploadLoopThrottle = MAX( uploadLoopThrottle, delay ); + status = true; + } else { + logadd( LOG_ERROR, "COW server returned 503 without Retry-After value" ); } + } else if ( http_code < 200 || http_code >= 300 ) { + logadd( LOG_ERROR, "COW server returned HTTP %ld", http_code ); + } else { + // everything went ok, reset timeChanged of underlying cluster, but only if it + // didn't get updated again in the meantime. + atomic_compare_exchange_strong( &uploadingCluster->cluster->timeChanged, &uploadingCluster->time, 0 ); + uploadingCluster->cluster->uploads++; + totalBlocksUploaded++; free( uploadingCluster ); - status = false; - goto CLEANUP; + status = true; + } + if ( !status ) { + uploadingCluster->cluster->fails++; + if ( uploadLoopThrottle > 0 ) { + // Don't reset timeChanged timestamp, so the next iteration of uploadModifiedClusters + // will queue this upload again after the throttle time expired. + free( uploadingCluster ); + } else { + logadd( LOG_ERROR, "Uploading cluster failed %i/5 times", uploadingCluster->cluster->fails ); + // Pretend the block changed again just now, to prevent immediate retry + atomic_compare_exchange_strong( &uploadingCluster->cluster->timeChanged, &uploadingCluster->time, + time( NULL ) ); + free( uploadingCluster ); + } } - - // everything went ok, update timeChanged - atomic_compare_exchange_strong( &uploadingCluster->block->timeChanged, &uploadingCluster->time, 0 ); - - uploadingCluster->block->uploads++; - - totalBlocksUploaded++; - free( uploadingCluster ); -CLEANUP: curl_multi_remove_handle( cm, msg->easy_handle ); curl_easy_cleanup( msg->easy_handle ); return status; } /** - * @brief - * * @param cm Curl_multi * @param activeUploads ptr to integer which holds the number of current uploads - * @param breakIfNotMax will return as soon as there are not all upload slots used, so they can be filled up. - * @param foregroundUpload used to determine the number of max uploads. If true COW_MAX_PARALLEL_UPLOADS will be the limit, + * @param minNumberUploads break out of loop as soon as there are less than these many transfers running * else COW_MAX_PARALLEL_BACKGROUND_UPLOADS. - * @return true returned if all upload's were successful - * @return false returned if one ore more upload's failed. + * @return true returned if all uploads were successful + * @return false returned if one ore more upload failed. */ -bool MessageHandler( - CURLM *cm, bool breakIfNotMax, bool foregroundUpload, struct curl_slist *headers ) +static bool curlMultiLoop( CURLM *cm, int minNumberUploads ) { CURLMsg *msg; int msgsLeft = -1; bool status = true; - do { - curl_multi_perform( cm, &activeUploads ); + + if ( minNumberUploads <= 0 ) { + minNumberUploads = 1; + } + for ( ;; ) { + CURLMcode mc = curl_multi_perform( cm, &activeUploads ); + if ( mc != CURLM_OK ) { + logadd( LOG_ERROR, "curl_multi_perform error %d, bailing out", (int)mc ); + status = false; + break; + } while ( ( msg = curl_multi_info_read( cm, &msgsLeft ) ) != NULL ) { - if ( !finishUpload( cm, msg, headers ) ) { + if ( !clusterUploadDoneHandler( cm, msg ) ) { status = false; } } - if ( breakIfNotMax - && activeUploads - < ( foregroundUpload ? COW_MAX_PARALLEL_UPLOADS : COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) ) { + if ( activeUploads < minNumberUploads ) { break; } // ony wait if there are active uploads - if ( activeUploads ) { - curl_multi_wait( cm, NULL, 0, 1000, NULL ); + if ( activeUploads > 0 ) { + mc = curl_multi_wait( cm, NULL, 0, 1000, NULL ); + if ( mc != CURLM_OK ) { + logadd( LOG_ERROR, "curl_multi_wait error %d, bailing out", (int)mc ); + status = false; + break; + } } - } while ( activeUploads ); + } return status; } @@ -635,7 +668,7 @@ bool MessageHandler( * @return true if all blocks uploaded successful * @return false if one ore more blocks failed to upload */ -bool uploaderLoop( bool ignoreMinUploadDelay, CURLM *cm ) +bool uploadModifiedClusters( bool ignoreMinUploadDelay, CURLM *cm ) { bool success = true; struct curl_slist *headers = NULL; @@ -648,36 +681,39 @@ bool uploaderLoop( bool ignoreMinUploadDelay, CURLM *cm ) if ( cow.l1[l1Index] == -1 ) { continue; // Not allocated } - // Now all L2 blocks + // Now all L2 clusters for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) { - cow_l2_entry_t *block = ( cow.l2[cow.l1[l1Index]] + l2Index ); - if ( block->offset == -1 ) { + cow_l2_entry_t *cluster = ( cow.l2[cow.l1[l1Index]] + l2Index ); + if ( cluster->offset == -1 ) { continue; // Not allocated } - if ( block->timeChanged == 0 ) { + if ( cluster->timeChanged == 0 ) { continue; // Not changed } - if ( !ignoreMinUploadDelay && ( now - block->timeChanged < COW_MIN_UPLOAD_DELAY ) ) { + if ( !ignoreMinUploadDelay && ( now - cluster->timeChanged < COW_MIN_UPLOAD_DELAY ) ) { continue; // Last change not old enough } // Run curl mainloop at least one, but keep doing so while max concurrent uploads is reached - do { - if ( !MessageHandler( cm, true, ignoreMinUploadDelay, headers ) ) { - success = false; - } - } while ( ( activeUploads >= ( ignoreMinUploadDelay ? COW_MAX_PARALLEL_UPLOADS - : COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) ) - && activeUploads > 0 ); + int minUploads = ignoreMinUploadDelay + ? COW_MAX_PARALLEL_UPLOADS + : COW_MAX_PARALLEL_BACKGROUND_UPLOADS; + if ( !curlMultiLoop( cm, minUploads ) ) { + success = false; + } + // Maybe one of the uploads was rejected by the server asking us to slow down a bit. + // Check for that case and don't trigger a new upload. + if ( uploadLoopThrottle > 0 ) { + goto DONE; + } cow_curl_read_upload_t *b = malloc( sizeof( cow_curl_read_upload_t ) ); - b->block = block; + b->cluster = cluster; b->clusterNumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index ); - b->fails = 0; b->position = 0; - b->time = block->timeChanged; + b->time = cluster->timeChanged; // Copy, so it doesn't change during upload // when we assemble the data in curlReadCallbackUploadBlock() for ( int i = 0; i < COW_BITFIELD_SIZE; ++i ) { - b->bitfield[i] = block->bitfield[i]; + b->bitfield[i] = cluster->bitfield[i]; } addUpload( cm, b, headers ); if ( !ignoreMinUploadDelay && !uploadLoop ) { @@ -686,8 +722,12 @@ bool uploaderLoop( bool ignoreMinUploadDelay, CURLM *cm ) } } DONE: + // Finish all the transfers still active while ( activeUploads > 0 ) { - MessageHandler( cm, false, ignoreMinUploadDelay, headers ); + if ( !curlMultiLoop( cm, 1 ) ) { + success = false; + break; + } } curl_slist_free_all( headers ); return success; @@ -699,17 +739,18 @@ DONE: * */ -void *cowfile_statUpdater( __attribute__((unused)) void *something ) +void *cowfile_statUpdater( UNUSED void *something ) { uint64_t lastUpdateTime = time( NULL ); + time_t now; + char speedBuffer[20]; while ( !uploadLoopDone ) { - sleep( COW_STATS_UPDATE_TIME ); int modified = 0; int inQueue = 0; int idle = 0; long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE ); - uint64_t now = time( NULL ); + now = time( NULL ); for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) { if ( cow.l1[l1Index] == -1 ) { continue; @@ -730,59 +771,88 @@ void *cowfile_statUpdater( __attribute__((unused)) void *something ) } } } - char speedBuffer[20]; if ( COW_SHOW_UL_SPEED ) { + double delta; + double bytes = (double)atomic_exchange( &bytesUploaded, 0 ); now = time( NULL ); - uint64_t bytes = atomic_exchange( &bytesUploaded, 0 ); - snprintf( speedBuffer, 20, "%.2f", (double)( ( bytes ) / ( 1 + now - lastUpdateTime ) / 1000 ) ); - + delta = (double)( now - lastUpdateTime ); lastUpdateTime = now; + if ( delta > 0 ) { + snprintf( speedBuffer, sizeof speedBuffer, "%.2f", bytes / 1000.0 / delta ); + } } - updateCowStatsFile( inQueue, modified, idle, speedBuffer ); + sleep( COW_STATS_UPDATE_TIME ); } return NULL; } +void quitSigHandler( int sig UNUSED ) +{ + uploadCancelled = true; + uploadLoop = false; + main_shutdown(); +} + /** * @brief main loop for blockupload in the background */ -static void *uploaderThreadMain( __attribute__((unused)) void *something ) +static void *uploaderThreadMain( UNUSED void *something ) { CURLM *cm; cm = curl_multi_init(); - curl_multi_setopt( - cm, CURLMOPT_MAXCONNECTS, (long)MAX( COW_MAX_PARALLEL_UPLOADS, COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) ); + curl_multi_setopt( cm, CURLMOPT_MAXCONNECTS, + (long)MAX( COW_MAX_PARALLEL_UPLOADS, COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) ); + do { + // Unblock so this very thread gets the signal for abandoning the upload + struct sigaction newHandler = { .sa_handler = &quitSigHandler }; + sigemptyset( &newHandler.sa_mask ); + sigaction( SIGQUIT, &newHandler, NULL ); + sigset_t sigmask; + sigemptyset( &sigmask ); + sigaddset( &sigmask, SIGQUIT ); + pthread_sigmask( SIG_UNBLOCK, &sigmask, NULL ); + } while ( 0 ); while ( uploadLoop ) { - uploaderLoop( false, cm ); + while ( uploadLoopThrottle > 0 && uploadLoop ) { + sleep( 1 ); + uploadLoopThrottle--; + } sleep( 2 ); + if ( !uploadLoop ) + break; + uploadModifiedClusters( false, cm ); } - logadd( LOG_DEBUG1, "start uploading the remaining blocks." ); - // force the upload of all remaining blocks because the user dismounted the image - if ( !uploaderLoop( true, cm ) ) { - logadd( LOG_ERROR, "one or more blocks failed to upload" ); - curl_multi_cleanup( cm ); + if ( uploadCancelled ) { uploadLoopDone = true; - return NULL; + logadd( LOG_INFO, "Not uploading remaining clusters, SIGQUIT received" ); + } else { + // force the upload of all remaining blocks because the user dismounted the image + logadd( LOG_INFO, "Start uploading the remaining clusters." ); + if ( !uploadModifiedClusters( true, cm ) ) { + uploadLoopDone = true; + logadd( LOG_ERROR, "One or more clusters failed to upload" ); + } else { + uploadLoopDone = true; + logadd( LOG_DEBUG1, "All clusters uploaded" ); + if ( cow_merge_after_upload ) { + requestRemoteMerge(); + logadd( LOG_DEBUG1, "Requesting merge" ); + } + } } - uploadLoopDone = true; curl_multi_cleanup( cm ); - logadd( LOG_DEBUG1, "all blocks uploaded" ); - if ( cow_merge_after_upload ) { - startMerge(); - logadd( LOG_DEBUG1, "Requesting merge." ); - } return NULL; } /** - * @brief Create a Cow Stats File an inserts the session guid + * @brief Create a Cow Stats File an inserts the session uuid * * @param path where the file is created * @return true @@ -819,11 +889,17 @@ static bool createCowStatsFile( char *path ) * @param path where the files should be stored * @param image_Name name of the original file/image * @param imageSizePtr + * @param cowUuid optional, use given UUID for talking to COW server instead of creating session */ bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion, atomic_uint_fast64_t **imageSizePtr, - char *serverAddress, bool sStdout, bool sfile ) + char *serverAddress, bool sStdout, bool sfile, const char *cowUuid ) { + if ( cowUuid != NULL && strlen( cowUuid ) > UUID_STRLEN ) { + logadd( LOG_ERROR, "COW UUID too long: '%s'", cowUuid ); + return false; + } + statStdout = sStdout; statFile = sfile; char pathMeta[strlen( path ) + 6]; @@ -860,6 +936,10 @@ bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion, // size of l1 array + number of l2's * size of l2 size_t ps = getpagesize(); + if ( ps == 0 || ps > INT_MAX ) { + logadd( LOG_ERROR, "Cannot get native page size, aborting..." ); + return false; + } size_t metaSize = ( ( startL2 + l1NumEntries * sizeof( l2 ) + ps - 1 ) / ps ) * ps; if ( ftruncate( cow.fdMeta, metaSize ) != 0 ) { @@ -911,7 +991,10 @@ bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion, logadd( LOG_ERROR, "Error on curl init. Bye.\n" ); return false; } - if ( !createSession( image_Name, imageVersion ) ) { + if ( cowUuid != NULL ) { + snprintf( metadata->uuid, UUID_STRLEN, "%s", cowUuid ); + logadd( LOG_INFO, "Using provided upload session id" ); + } else if ( !createSession( image_Name, imageVersion ) ) { return false; } createCowStatsFile( path ); @@ -925,7 +1008,7 @@ bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion, * @param path where the meta & data file is located * @param imageSizePtr */ -bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile ) +bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile, const char *cowUuid ) { statStdout = sStdout; statFile = sFile; @@ -935,6 +1018,11 @@ bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *server char pathMeta[strlen( path ) + 6]; char pathData[strlen( path ) + 6]; + if ( cowUuid != NULL && strlen( cowUuid ) > UUID_STRLEN ) { + logadd( LOG_ERROR, "COW UUID too long: '%s'", cowUuid ); + return false; + } + snprintf( pathMeta, strlen( path ) + 6, "%s%s", path, "/meta" ); snprintf( pathData, strlen( path ) + 6, "%s%s", path, "/data" ); @@ -1026,6 +1114,11 @@ bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *server metadata = (cowfile_metadata_header_t *)( cow.metadata_mmap ); + if ( cowUuid != NULL ) { + logadd( LOG_INFO, "Overriding stored upload session id with provided one" ); + snprintf( metadata->uuid, UUID_STRLEN, "%s", cowUuid ); + } + *imageSizePtr = &metadata->imageSize; cow.l1 = (l1 *)( cow.metadata_mmap + metadata->startL1 ); cow.l2 = (l2 *)( cow.metadata_mmap + metadata->startL2 ); @@ -1150,7 +1243,7 @@ static void writePaddedBlock( cow_sub_request_t *sRequest ) // Here, we again check if the block is written locally - there might have been a second write // that wrote the full block, hence didn't have to wait for remote data and finished faster. // In that case, don't pad from remote as we'd overwrite newer data. - if ( isBlockLocal( sRequest->block, sRequest->inClusterOffset ) ) { + if ( isBlockLocal( sRequest->cluster, sRequest->inClusterOffset ) ) { logadd( LOG_INFO, "It happened!" ); } else { // copy write Data @@ -1158,13 +1251,13 @@ static void writePaddedBlock( cow_sub_request_t *sRequest ) memcpy( sRequest->writeBuffer + ( sRequest->inClusterOffset % DNBD3_BLOCK_SIZE ), sRequest->writeSrc, sRequest->size ); if ( !writeAll( cow.fdData, sRequest->writeBuffer, DNBD3_BLOCK_SIZE, - sRequest->block->offset + ( sRequest->inClusterOffset & ~DNBD3_BLOCK_MASK ) ) ) { + sRequest->cluster->offset + ( sRequest->inClusterOffset & ~DNBD3_BLOCK_MASK ) ) ) { sRequest->cowRequest->errorCode = errno; } else { sRequest->cowRequest->bytesWorkedOn += sRequest->size; int64_t bit = sRequest->inClusterOffset / DNBD3_BLOCK_SIZE; - setBitsInBitfield( sRequest->block->bitfield, bit, bit, true ); - sRequest->block->timeChanged = time( NULL ); + setBitsInBitfield( sRequest->cluster->bitfield, bit, bit, true ); + sRequest->cluster->timeChanged = time( NULL ); } } @@ -1228,10 +1321,11 @@ static bool padBlockForWrite( fuse_req_t req, cow_request_t *cowRequest, cow_sub_request_t *sub = calloc( sizeof( *sub ) + DNBD3_BLOCK_SIZE, 1 ); sub->callback = writePaddedBlock; sub->inClusterOffset = inClusterOffset; - sub->block = cluster; + sub->cluster = cluster; sub->size = size; sub->writeSrc = srcBuffer; sub->cowRequest = cowRequest; + sub->buffer = sub->writeBuffer; sub->dRequest.length = (uint32_t)MIN( DNBD3_BLOCK_SIZE, validImageSize - startOffset ); sub->dRequest.offset = startOffset & ~DNBD3_BLOCK_MASK; @@ -1248,7 +1342,7 @@ static bool padBlockForWrite( fuse_req_t req, cow_request_t *cowRequest, /** * @brief Will be called after a dnbd3_async_t is finished. - * Calls the corrsponding callback function, either writePaddedBlock or readRemoteData + * Calls the corrsponding callback function, either writePaddedBlock or readRemoteCallback * depending if the original fuse request was a write or read. * */ @@ -1265,7 +1359,7 @@ void cowfile_handleCallback( dnbd3_async_t *request ) * so replys to fuse and cleans up the request. * */ -void readRemoteData( cow_sub_request_t *sRequest ) +static void readRemoteCallback( cow_sub_request_t *sRequest ) { atomic_fetch_add( &sRequest->cowRequest->bytesWorkedOn, sRequest->dRequest.length ); @@ -1464,7 +1558,7 @@ static void readRemote( fuse_req_t req, off_t offset, ssize_t size, char *buffer return; assert( size > 0 ); cow_sub_request_t *sRequest = malloc( sizeof( cow_sub_request_t ) ); - sRequest->callback = readRemoteData; + sRequest->callback = readRemoteCallback; sRequest->dRequest.length = (uint32_t)size; sRequest->dRequest.offset = offset; sRequest->dRequest.fuse_req = req; @@ -1632,13 +1726,15 @@ fail:; void cowfile_close() { uploadLoop = false; + pthread_join( tidCowUploader, NULL ); if ( statFile || statStdout ) { + // Send a signal in case it's hanging in the sleep call + pthread_kill( tidStatUpdater, SIGHUP ); pthread_join( tidStatUpdater, NULL ); } - pthread_join( tidCowUploader, NULL ); if ( curl ) { - curl_global_cleanup(); curl_easy_cleanup( curl ); + curl_global_cleanup(); } } diff --git a/src/fuse/cowfile.h b/src/fuse/cowfile.h index 0f395de..d124b0c 100644 --- a/src/fuse/cowfile.h +++ b/src/fuse/cowfile.h @@ -20,6 +20,7 @@ _Static_assert( ATOMIC_INT_LOCK_FREE == 2, "ATOMIC INT not lock free" ); _Static_assert( ATOMIC_LONG_LOCK_FREE == 2, "ATOMIC LONG not lock free" ); _Static_assert( ATOMIC_LLONG_LOCK_FREE == 2, "ATOMIC LLONG not lock free" ); _Static_assert( sizeof( atomic_uint_least64_t ) == 8, "atomic_uint_least64_t not 8 byte" ); +_Static_assert( sizeof( _Atomic(uint32_t) ) == 4, "_Atomic(uint32_t) not 4 byte" ); _Static_assert( sizeof( atomic_int_least64_t ) == 8, "atomic_int_least64_t not 8 byte" ); enum dataSource @@ -56,8 +57,9 @@ _Static_assert( sizeof( cowfile_metadata_header_t ) == COW_METADATA_HEADER_SIZE, typedef struct cow_l2_entry { atomic_int_least64_t offset; - atomic_uint_least64_t timeChanged; - atomic_uint_least64_t uploads; + atomic_int_least64_t timeChanged; + _Atomic(uint32_t) uploads; + _Atomic(uint32_t) fails; atomic_uchar bitfield[COW_BITFIELD_SIZE]; } cow_l2_entry_t; _Static_assert( sizeof( cow_l2_entry_t ) == COW_L2_ENTRY_SIZE, "cow_l2_entry_t is messed up" ); @@ -93,7 +95,7 @@ typedef struct cow_sub_request off_t inClusterOffset; // offset relative to the beginning of the cluster const char *writeSrc; // pointer to the data of a write request which needs padding char *buffer; // The pointer points to the original read buffer to the place where the sub read request should be copied to. - cow_l2_entry_t *block; // the cluster inClusterOffset refers to + cow_l2_entry_t *cluster; // the cluster inClusterOffset refers to cow_callback callback; // Callback when we're done handling this cow_request_t *cowRequest; // parent request dnbd3_async_t dRequest; // Probably request to dnbd3-server for non-aligned writes (wrt 4k dnbd3 block) @@ -103,10 +105,9 @@ typedef struct cow_sub_request typedef struct cow_curl_read_upload { atomic_uint_least64_t time; - cow_l2_entry_t *block; + cow_l2_entry_t *cluster; size_t position; long unsigned int clusterNumber; - int fails; int64_t ulLast; atomic_uchar bitfield[COW_BITFIELD_SIZE]; } cow_curl_read_upload_t; @@ -122,9 +123,9 @@ typedef int32_t l1; typedef cow_l2_entry_t l2[COW_L2_TABLE_SIZE]; bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion, atomic_uint_fast64_t **imageSizePtr, - char *serverAddress, bool sStdout, bool sFile ); + char *serverAddress, bool sStdout, bool sFile, const char *cowUuid ); -bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile ); +bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile, const char *cowUuid ); bool cowfile_startBackgroundThreads(); void cowfile_read( fuse_req_t req, size_t size, off_t offset ); diff --git a/src/fuse/main.c b/src/fuse/main.c index 96d8f5c..7be34bd 100644 --- a/src/fuse/main.c +++ b/src/fuse/main.c @@ -9,8 +9,34 @@ * */ #include "main.h" - - +#include "cowfile.h" +#include "connection.h" +#include "helper.h" +#include <dnbd3/version.h> +#include <dnbd3/build.h> +#include <dnbd3/shared/protocol.h> +#include <dnbd3/shared/log.h> +#include <dnbd3/config.h> + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <errno.h> +#include <fcntl.h> +#include <unistd.h> +#include <assert.h> +/* for printing uint */ +//#define __STDC_FORMAT_MACROS +#include <inttypes.h> +#include <getopt.h> +#include <time.h> +#include <signal.h> +#include <pthread.h> +#define debugf(...) do { logadd( LOG_DEBUG1, __VA_ARGS__ ); } while (0) + +#define INO_ROOT (1) +#define INO_STATS (2) +#define INO_IMAGE (3) static const char *IMAGE_NAME = "img"; static const char *STATS_NAME = "status"; @@ -46,14 +72,14 @@ static int image_stat( fuse_ino_t ino, struct stat *stbuf ) case INO_ROOT: stbuf->st_mode = S_IFDIR | 0550; if( useCow ) { - stbuf->st_mode = S_IFDIR | 0777; + stbuf->st_mode = S_IFDIR | 0770; } stbuf->st_nlink = 2; stbuf->st_mtim = startupTime; break; case INO_IMAGE: if ( useCow ) { - stbuf->st_mode = S_IFREG | 0777; + stbuf->st_mode = S_IFREG | 0660; } else { stbuf->st_mode = S_IFREG | 0440; } @@ -212,7 +238,7 @@ static void image_ll_read( fuse_req_t req, fuse_ino_t ino, size_t size, off_t of ++logInfo.blockRequestCount[startBlock]; } } - + dnbd3_async_parent_t *parent = malloc( sizeof(dnbd3_async_parent_t) + size ); parent->request.length = (uint32_t)size; @@ -230,7 +256,7 @@ static void noopSigHandler( int signum ) (void)signum; } -static void image_ll_init( void *userdata, struct fuse_conn_info *conn ) +static void image_ll_init( void *userdata UNUSED, struct fuse_conn_info *conn UNUSED ) { ( void ) userdata; ( void ) conn; @@ -342,6 +368,7 @@ static void printUsage( char *argv0, int exitCode ) printf( " -c Enables cow, creates the cow files at given location\n" ); printf( " -L Loads the cow files from the given location\n" ); printf( " -C Host address of the cow server\n" ); + printf( "--upload-uuid <id> Use provided UUID as upload session id instead of asking server/loading from file\n" ); printf( "--cow-stats-stdout prints the cow status in stdout\n" ); printf( "--cow-stats-file creates and updates the cow status file\n" ); printf( " -m --merge tell server to merge and create new revision on exit\n" ); @@ -363,6 +390,7 @@ static const struct option longOpts[] = { { "loadcow", required_argument, NULL, 'L' }, { "cowServer", required_argument, NULL, 'C' }, { "merge", no_argument, NULL, 'm' }, + { "upload-uuid", required_argument, NULL, 'uuid' }, { "cow-stats-stdout", no_argument, NULL, 'sout' }, { "cow-stats-file", no_argument, NULL, 'sfil' }, { 0, 0, 0, 0 } @@ -388,6 +416,7 @@ int main( int argc, char *argv[] ) bool loadCow = false; bool sStdout = false; bool sFile = false; + const char *cowUuidOverride = NULL; log_init(); @@ -474,6 +503,9 @@ int main( int argc, char *argv[] ) case 'sfil': sFile = true; break; + case 'uuid': + cowUuidOverride = optarg; + break; default: printUsage( argv[0], EXIT_FAILURE ); } @@ -493,7 +525,7 @@ int main( int argc, char *argv[] ) } } if( useCow && cow_server_address == NULL ) { - printf( "for -c you also need a cow server address. Please also use -C --host \n" ); + printf( "for -c you also need a cow server address. Please also use -C\n" ); printUsage( argv[0], EXIT_FAILURE ); } if( cow_merge_after_upload && !useCow ) { @@ -502,24 +534,26 @@ int main( int argc, char *argv[] ) } if ( loadCow ) { if( cow_server_address == NULL ) { - printf( "for -L you also need a cow server address. Please also use -C --host \n" ); + printf( "for -L you also need a cow server address. Please also use -C\n" ); printUsage( argv[0], EXIT_FAILURE ); } - if ( !cowfile_load( cow_file_path, &imageSizePtr, cow_server_address, sStdout, sFile ) ) { + if ( !cowfile_load( cow_file_path, &imageSizePtr, cow_server_address, sStdout, sFile, cowUuidOverride ) ) { return EXIT_FAILURE; } - } - // Prepare our handler - struct sigaction newHandler; - memset( &newHandler, 0, sizeof( newHandler ) ); - newHandler.sa_handler = &noopSigHandler; - sigemptyset( &newHandler.sa_mask ); - sigaction( SIGHUP, &newHandler, NULL ); - sigset_t sigmask; - sigemptyset( &sigmask ); - sigaddset( &sigmask, SIGHUP ); - pthread_sigmask( SIG_BLOCK, &sigmask, NULL ); + } + do { + // The empty handler prevents fuse from registering its own handler + struct sigaction newHandler = { .sa_handler = &noopSigHandler }; + sigemptyset( &newHandler.sa_mask ); + sigaction( SIGHUP, &newHandler, NULL ); + } while ( 0 ); + if ( useCow ) { + sigset_t sigmask; + sigemptyset( &sigmask ); + sigaddset( &sigmask, SIGQUIT ); // Block here and unblock in cow as abort signal + pthread_sigmask( SIG_BLOCK, &sigmask, NULL ); + } if ( !connection_init( server_address, image_Name, rid, learnNewServers ) ) { logadd( LOG_ERROR, "Could not connect to any server. Bye.\n" ); @@ -538,9 +572,9 @@ int main( int argc, char *argv[] ) } newArgv[newArgc++] = "-o"; - if(useCow){ + if ( useCow ) { newArgv[newArgc++] = "default_permissions"; - }else{ + } else { newArgv[newArgc++] = "ro,default_permissions"; } // Mount point goes last @@ -555,7 +589,7 @@ int main( int argc, char *argv[] ) owner = getuid(); if ( useCow & !loadCow) { - if( !cowfile_init( cow_file_path, connection_getImageName(), connection_getImageRID(), &imageSizePtr, cow_server_address, sStdout, sFile ) ) { + if( !cowfile_init( cow_file_path, connection_getImageName(), connection_getImageRID(), &imageSizePtr, cow_server_address, sStdout, sFile, cowUuidOverride ) ) { return EXIT_FAILURE; } } @@ -576,24 +610,24 @@ int main( int argc, char *argv[] ) if ( _fuseSession == NULL ) { logadd( LOG_ERROR, "Could not initialize fuse session" ); } else { + fuse_session_add_chan( _fuseSession, ch ); + // Do not spawn any threads before we daemonize, they'd die at this point + fuse_daemonize( foreground ); if ( fuse_set_signal_handlers( _fuseSession ) == -1 ) { - logadd( LOG_ERROR, "Could not install fuse signal handlers" ); - } else { - fuse_session_add_chan( _fuseSession, ch ); - fuse_daemonize( foreground ); - if ( useCow ) { - if ( !cowfile_startBackgroundThreads() ){ - logadd( LOG_ERROR, "Could not start cow background Threads" ); - } - } - if ( single_thread ) { - fuse_err = fuse_session_loop( _fuseSession ); - } else { - fuse_err = fuse_session_loop_mt( _fuseSession ); //MT produces errors (race conditions) in libfuse and didnt improve speed at all + logadd( LOG_WARNING, "Could not install fuse signal handlers" ); + } + if ( useCow ) { + if ( !cowfile_startBackgroundThreads() ) { + logadd( LOG_ERROR, "Could not start cow background threads" ); } - fuse_remove_signal_handlers( _fuseSession ); - fuse_session_remove_chan( ch ); } + if ( single_thread ) { + fuse_err = fuse_session_loop( _fuseSession ); + } else { + fuse_err = fuse_session_loop_mt( _fuseSession ); //MT produces errors (race conditions) in libfuse and didnt improve speed at all + } + fuse_remove_signal_handlers( _fuseSession ); + fuse_session_remove_chan( ch ); fuse_session_destroy( _fuseSession ); _fuseSession = NULL; } @@ -608,3 +642,11 @@ int main( int argc, char *argv[] ) logadd( LOG_DEBUG1, "Terminating. FUSE REPLIED: %d\n", fuse_err ); return fuse_err; } + +void main_shutdown(void) +{ + fuse_session_exit( _fuseSession ); + // TODO: Figure out why this doesn't wake up the fuse mainloop. + // For now, just send SIGQUIT followed by SIGTERM.... + kill( 0, SIGINT ); +} diff --git a/src/fuse/main.h b/src/fuse/main.h index 721f251..53d81e4 100644 --- a/src/fuse/main.h +++ b/src/fuse/main.h @@ -1,39 +1,13 @@ #ifndef _MAIN_H_ #define _MAIN_H_ -#include "cowfile.h" -#include "connection.h" -#include "helper.h" -#include <dnbd3/version.h> -#include <dnbd3/build.h> -#include <dnbd3/shared/protocol.h> -#include <dnbd3/shared/log.h> - #define FUSE_USE_VERSION 30 -#include <dnbd3/config.h> #include <fuse_lowlevel.h> -#include <stdio.h> -#include <stdlib.h> -#include <unistd.h> -#include <errno.h> -#include <fcntl.h> -#include <unistd.h> -#include <assert.h> -/* for printing uint */ -#define __STDC_FORMAT_MACROS -#include <inttypes.h> -#include <getopt.h> -#include <time.h> -#include <signal.h> -#include <pthread.h> -#define debugf(...) do { logadd( LOG_DEBUG1, __VA_ARGS__ ); } while (0) - -#define INO_ROOT (1) -#define INO_STATS (2) -#define INO_IMAGE (3) +#include <stdbool.h> extern bool useCow; extern bool cow_merge_after_upload; void image_ll_getattr( fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi ); +void main_shutdown(void); -#endif /* main_H_ */
\ No newline at end of file +#endif /* main_H_ */ |