summaryrefslogtreecommitdiffstats
path: root/src/fuse/cowfile.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fuse/cowfile.c')
-rw-r--r--src/fuse/cowfile.c480
1 files changed, 288 insertions, 192 deletions
diff --git a/src/fuse/cowfile.c b/src/fuse/cowfile.c
index a53b101..60d82fb 100644
--- a/src/fuse/cowfile.c
+++ b/src/fuse/cowfile.c
@@ -2,12 +2,17 @@
#include "main.h"
#include "connection.h"
+#include <dnbd3/config.h>
+#include <dnbd3/types.h>
#include <dnbd3/shared/log.h>
#include <sys/mman.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <curl/curl.h>
+#include <signal.h>
+#include <inttypes.h>
+#include <assert.h>
#define UUID_STRLEN 36
// Maximum assumed page size, in case the cow data gets transferred between different architectures
@@ -16,7 +21,7 @@
extern void image_ll_getattr( fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi );
-static const int CURRENT_COW_VERSION = 2;
+static const int CURRENT_COW_VERSION = 3;
static bool statStdout;
static bool statFile;
@@ -28,8 +33,10 @@ static cowfile_metadata_header_t *metadata = NULL;
static atomic_uint_fast64_t bytesUploaded;
static uint64_t totalBlocksUploaded = 0;
static int activeUploads = 0;
-atomic_bool uploadLoop = true; // Keep upload loop running?
-atomic_bool uploadLoopDone = false; // Upload loop has finished all work?
+static int uploadLoopThrottle = 0;
+static atomic_bool uploadLoop = true; // Keep upload loop running?
+static atomic_bool uploadLoopDone = false; // Upload loop has finished all work?
+static atomic_bool uploadCancelled = false; // Skip uploading remaining blocks
static struct cow
{
@@ -172,7 +179,7 @@ static bool checkBit( atomic_uchar *bitfield, int64_t n )
* @param response userdata which will later contain the uuid
* @return size_t size that have been read
*/
-size_t curlCallbackCreateSession( char *buffer, size_t itemSize, size_t nitems, void *response )
+static size_t curlCallbackCreateSession( char *buffer, size_t itemSize, size_t nitems, void *response )
{
uint64_t done = strlen( response );
uint64_t bytes = itemSize * nitems;
@@ -186,12 +193,12 @@ size_t curlCallbackCreateSession( char *buffer, size_t itemSize, size_t nitems,
}
/**
- * @brief Create a Session with the cow server and gets the session guid.
+ * @brief Create a Session with the cow server and gets the session uuid.
*
* @param imageName
* @param version of the original Image
*/
-bool createSession( const char *imageName, uint16_t version )
+static bool createSession( const char *imageName, uint16_t version )
{
CURLcode res;
char url[COW_URL_STRING_SIZE];
@@ -240,7 +247,7 @@ bool createSession( const char *imageName, uint16_t version )
}
curl_easy_reset( curl );
metadata->uuid[UUID_STRLEN] = '\0';
- logadd( LOG_DEBUG1, "Cow session started, guid: %s\n", metadata->uuid );
+ logadd( LOG_DEBUG1, "Cow session started, uuid: %s\n", metadata->uuid );
return true;
}
@@ -256,7 +263,7 @@ bool createSession( const char *imageName, uint16_t version )
* @param userdata from CURLOPT_READFUNCTION
* @return size_t size written in buffer
*/
-size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *userdata )
+static size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *userdata )
{
cow_curl_read_upload_t *uploadBlock = (cow_curl_read_upload_t *)userdata;
size_t len = 0;
@@ -279,6 +286,7 @@ size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *
int bitNumber = (int)( inClusterOffset / DNBD3_BLOCK_SIZE );
size_t readSize;
// Small performance hack: All bits one in a byte, do a 32k instead of 4k read
+ // TODO: preadv with a large iov, reading unchanged blocks into a trash-buffer
if ( spaceLeft >= (ssize_t)DNBD3_BLOCK_SIZE * 8
&& bitNumber % 8 == 0
&& uploadBlock->bitfield[bitNumber / 8] == 0xff ) {
@@ -286,11 +294,13 @@ size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *
} else {
readSize = DNBD3_BLOCK_SIZE;
}
- // Check bits in our copy, as global bitfield could change
- if ( checkBit( uploadBlock->bitfield, bitNumber ) ) {
+ // If handling single block, check bits in our copy, as global bitfield could change
+ if ( readSize != DNBD3_BLOCK_SIZE || checkBit( uploadBlock->bitfield, bitNumber ) ) {
ssize_t lengthRead = pread( cow.fdData, ( ptr + len ), readSize,
- uploadBlock->block->offset + inClusterOffset );
+ uploadBlock->cluster->offset + inClusterOffset );
if ( lengthRead == -1 ) {
+ if ( errno == EAGAIN )
+ continue;
logadd( LOG_ERROR, "Upload: Reading from COW file failed with errno %d", errno );
return CURL_READFUNC_ABORT;
}
@@ -313,69 +323,59 @@ size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *
/**
* @brief Requests the merging of the image on the cow server.
*/
-bool mergeRequest()
+static bool postMergeRequest()
{
CURLcode res;
- curl_easy_setopt( curl, CURLOPT_POST, 1L );
-
char url[COW_URL_STRING_SIZE];
- snprintf( url, COW_URL_STRING_SIZE, COW_API_START_MERGE, cowServerAddress );
- curl_easy_setopt( curl, CURLOPT_URL, url );
-
-
- curl_mime *mime;
- curl_mimepart *part;
- mime = curl_mime_init( curl );
-
- part = curl_mime_addpart( mime );
- curl_mime_name( part, "guid" );
- curl_mime_data( part, metadata->uuid, CURL_ZERO_TERMINATED );
+ char body[500];
+ char *uuid;
- part = curl_mime_addpart( mime );
- curl_mime_name( part, "originalFileSize" );
- char buf[21];
- snprintf( buf, sizeof buf, "%" PRIu64, metadata->validRemoteSize );
- curl_mime_data( part, buf, CURL_ZERO_TERMINATED );
-
- part = curl_mime_addpart( mime );
- curl_mime_name( part, "newFileSize" );
- snprintf( buf, sizeof buf, "%" PRIu64, metadata->imageSize );
- curl_mime_data( part, buf, CURL_ZERO_TERMINATED );
+ curl_easy_reset( curl );
- curl_easy_setopt( curl, CURLOPT_MIMEPOST, mime );
+ snprintf( url, COW_URL_STRING_SIZE, COW_API_START_MERGE, cowServerAddress );
+ curl_easy_setopt( curl, CURLOPT_URL, url );
+ curl_easy_setopt( curl, CURLOPT_POST, 1L );
+ uuid = curl_easy_escape( curl, metadata->uuid, 0 );
+ if ( uuid == NULL ) {
+ logadd( LOG_ERROR, "Error escaping uuid" );
+ uuid = metadata->uuid; // Hope for the best
+ }
+ snprintf( body, sizeof body, "originalFileSize=%"PRIu64"&newFileSize=%"PRIu64"&uuid=%s",
+ metadata->validRemoteSize, metadata->imageSize, uuid );
+ if ( uuid != metadata->uuid ) {
+ curl_free( uuid );
+ }
+ curl_easy_setopt( curl, CURLOPT_POSTFIELDS, body );
res = curl_easy_perform( curl );
if ( res != CURLE_OK ) {
- logadd( LOG_WARNING, "COW_API_START_MERGE failed: %s\n", curl_easy_strerror( res ) );
- curl_easy_reset( curl );
+ logadd( LOG_WARNING, "COW_API_START_MERGE failed: %s", curl_easy_strerror( res ) );
return false;
}
long http_code = 0;
curl_easy_getinfo( curl, CURLINFO_RESPONSE_CODE, &http_code );
- if ( http_code != 200 ) {
- logadd( LOG_WARNING, "COW_API_START_MERGE failed http: %ld\n", http_code );
- curl_easy_reset( curl );
+ if ( http_code < 200 || http_code >= 300 ) {
+ logadd( LOG_WARNING, "COW_API_START_MERGE failed http: %ld", http_code );
return false;
}
- curl_easy_reset( curl );
- curl_mime_free( mime );
return true;
}
/**
- * @brief Wrapper for mergeRequest so if its fails it will be tried again.
+ * @brief Wrapper for postMergeRequest so if its fails it will be tried again.
*
*/
-void startMerge()
+static void requestRemoteMerge()
{
int fails = 0;
bool success = false;
- success = mergeRequest();
+ success = postMergeRequest();
while ( fails <= 5 && !success ) {
fails++;
logadd( LOG_WARNING, "Trying again. %i/5", fails );
- mergeRequest();
+ sleep( 10 );
+ postMergeRequest();
}
}
@@ -387,26 +387,54 @@ void startMerge()
* This function computes the uploaded bytes between each call and adds it to
* bytesUploaded, which is used to compute the kb/s uploaded over all transfers.
*
- * @param clientp
* @param ulNow number of bytes uploaded by this transfer so far.
* @return int always returns 0 to continue the callbacks.
*/
-int progress_callback( void *clientp, __attribute__((unused)) curl_off_t dlTotal,
- __attribute__((unused)) curl_off_t dlNow, __attribute__((unused)) curl_off_t ulTotal, curl_off_t ulNow )
+static int progress_callback( void *clientp, UNUSED curl_off_t dlTotal,
+ UNUSED curl_off_t dlNow, UNUSED curl_off_t ulTotal, curl_off_t ulNow )
{
- CURL *eh = (CURL *)clientp;
- cow_curl_read_upload_t *uploadingCluster;
- CURLcode res;
- res = curl_easy_getinfo( eh, CURLINFO_PRIVATE, &uploadingCluster );
- if ( res != CURLE_OK ) {
- logadd( LOG_ERROR, "ERROR" );
- return 0;
- }
+ cow_curl_read_upload_t *uploadingCluster = (cow_curl_read_upload_t *)clientp;
bytesUploaded += ( ulNow - uploadingCluster->ulLast );
uploadingCluster->ulLast = ulNow;
return 0;
}
+#ifdef COW_DUMP_BLOCK_UPLOADS
+static int cmpfunc( const void *a, const void *b )
+{
+ return (int)( ( (cow_cluster_statistics_t *)b )->uploads - ( (cow_cluster_statistics_t *)a )->uploads );
+}
+/**
+ * @brief Writes all block numbers sorted by the number of uploads into the statsfile.
+ *
+ */
+static void dumpBlockUploads()
+{
+ long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE );
+
+ cow_cluster_statistics_t blockUploads[l1MaxOffset * COW_L2_TABLE_SIZE];
+ uint64_t currentBlock = 0;
+ for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) {
+ if ( cow.l1[l1Index] == -1 ) {
+ continue;
+ }
+ for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) {
+ cow_l2_entry_t *block = ( cow.l2[cow.l1[l1Index]] + l2Index );
+
+ blockUploads[currentBlock].uploads = block->uploads;
+ blockUploads[currentBlock].clusterNumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index );
+ currentBlock++;
+ }
+ }
+ qsort( blockUploads, currentBlock, sizeof( cow_cluster_statistics_t ), cmpfunc );
+
+ dprintf( cow.fdStats, "\n\nclusterNumber: uploads\n==Block Upload Dump===\n" );
+ for ( uint64_t i = 0; i < currentBlock; i++ ) {
+ dprintf( cow.fdStats, "%" PRIu64 ": %" PRIu64 " \n", blockUploads[i].clusterNumber, blockUploads[i].uploads );
+ }
+}
+#endif
+
/**
* @brief Updates the status to the stdout/statfile depending on the startup parameters.
*
@@ -415,8 +443,7 @@ int progress_callback( void *clientp, __attribute__((unused)) curl_off_t dlTotal
* @param idle Blocks that do not contain changes that have not yet been uploaded.
* @param speedBuffer ptr to char array that contains the current upload speed.
*/
-
-void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, char *speedBuffer )
+static void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, char *speedBuffer )
{
char buffer[300];
const char *state;
@@ -429,16 +456,17 @@ void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, cha
state = "done";
}
- int len = snprintf( buffer, 300,
+ int len = snprintf( buffer, sizeof buffer,
+ "[General]\n"
"state=%s\n"
"inQueue=%" PRIu64 "\n"
"modifiedClusters=%" PRIu64 "\n"
"idleClusters=%" PRIu64 "\n"
"totalClustersUploaded=%" PRIu64 "\n"
"activeUploads=%i\n"
- "%s%s",
+ "%s%s\n",
state, inQueue, modified, idle, totalBlocksUploaded, activeUploads,
- COW_SHOW_UL_SPEED ? "ulspeed=" : "",
+ COW_SHOW_UL_SPEED ? "avgSpeedKb=" : "",
speedBuffer );
if ( len == -1 ) {
@@ -465,39 +493,6 @@ void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, cha
#endif
}
}
-int cmpfunc( const void *a, const void *b )
-{
- return (int)( ( (cow_cluster_statistics_t *)b )->uploads - ( (cow_cluster_statistics_t *)a )->uploads );
-}
-/**
- * @brief Writes all block numbers sorted by the number of uploads into the statsfile.
- *
- */
-void dumpBlockUploads()
-{
- long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE );
-
- cow_cluster_statistics_t blockUploads[l1MaxOffset * COW_L2_TABLE_SIZE];
- uint64_t currentBlock = 0;
- for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) {
- if ( cow.l1[l1Index] == -1 ) {
- continue;
- }
- for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) {
- cow_l2_entry_t *block = ( cow.l2[cow.l1[l1Index]] + l2Index );
-
- blockUploads[currentBlock].uploads = block->uploads;
- blockUploads[currentBlock].clusterNumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index );
- currentBlock++;
- }
- }
- qsort( blockUploads, currentBlock, sizeof( cow_cluster_statistics_t ), cmpfunc );
-
- dprintf( cow.fdStats, "\n\nclusterNumber: uploads\n==Block Upload Dump===\n" );
- for ( uint64_t i = 0; i < currentBlock; i++ ) {
- dprintf( cow.fdStats, "%" PRIu64 ": %" PRIu64 " \n", blockUploads[i].clusterNumber, blockUploads[i].uploads );
- }
-}
/**
* @brief Starts the upload of a given block.
@@ -505,13 +500,14 @@ void dumpBlockUploads()
* @param cm Curl_multi
* @param uploadingCluster containing the data for the block to upload.
*/
-bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster, struct curl_slist *headers )
+static bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster, struct curl_slist *headers )
{
CURL *eh = curl_easy_init();
char url[COW_URL_STRING_SIZE];
- snprintf( url, COW_URL_STRING_SIZE, COW_API_UPDATE, cowServerAddress, metadata->uuid, uploadingCluster->clusterNumber );
+ snprintf( url, COW_URL_STRING_SIZE,
+ COW_API_UPDATE, cowServerAddress, metadata->uuid, uploadingCluster->clusterNumber );
curl_easy_setopt( eh, CURLOPT_URL, url );
curl_easy_setopt( eh, CURLOPT_POST, 1L );
@@ -531,7 +527,7 @@ bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster, struct curl
uploadingCluster->ulLast = 0;
curl_easy_setopt( eh, CURLOPT_NOPROGRESS, 0L );
curl_easy_setopt( eh, CURLOPT_XFERINFOFUNCTION, progress_callback );
- curl_easy_setopt( eh, CURLOPT_XFERINFODATA, eh );
+ curl_easy_setopt( eh, CURLOPT_XFERINFODATA, uploadingCluster );
}
curl_easy_setopt( eh, CURLOPT_HTTPHEADER, headers );
curl_multi_add_handle( cm, eh );
@@ -549,9 +545,9 @@ bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster, struct curl
* @return true returned if the upload was successful or retries are still possible.
* @return false returned if the upload was unsuccessful.
*/
-bool finishUpload( CURLM *cm, CURLMsg *msg, struct curl_slist *headers )
+static bool clusterUploadDoneHandler( CURLM *cm, CURLMsg *msg )
{
- bool status = true;
+ bool status = false;
cow_curl_read_upload_t *uploadingCluster;
CURLcode res;
CURLcode res2;
@@ -560,69 +556,106 @@ bool finishUpload( CURLM *cm, CURLMsg *msg, struct curl_slist *headers )
long http_code = 0;
res2 = curl_easy_getinfo( msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_code );
- if ( res != CURLE_OK || res2 != CURLE_OK || http_code < 200 || http_code >= 300
- || msg->msg != CURLMSG_DONE ) {
- uploadingCluster->fails++;
- logadd( LOG_ERROR, "COW_API_UPDATE failed %i/5: %s\n", uploadingCluster->fails,
+ if ( msg->msg != CURLMSG_DONE ) {
+ logadd( LOG_ERROR, "multi_message->msg unexpectedly not DONE (%d)", (int)msg->msg );
+ } else if ( msg->data.result != CURLE_OK ) {
+ logadd( LOG_ERROR, "curl_easy returned non-OK after multi-finish: %s",
curl_easy_strerror( msg->data.result ) );
- if ( uploadingCluster->fails < 5 ) {
- addUpload( cm, uploadingCluster, headers );
- goto CLEANUP;
+ } else if ( res != CURLE_OK || res2 != CURLE_OK ) {
+ logadd( LOG_ERROR, "curl_easy_getinfo failed after multifinish (%d, %d)", (int)res, (int)res2 );
+ } else if ( http_code == 503 ) {
+ // If the "Retry-After" header is set, we interpret this as the server being overloaded
+ // or not ready yet to take another update. We slow down our upload loop then.
+ // We'll only accept a delay in seconds here, not an HTTP Date string.
+ // Otherwise, increase the fails counter.
+ struct curl_header *ptr = NULL;
+ int delay;
+ CURLHcode h = curl_easy_header(curl, "Retry-After", 0, CURLH_HEADER, -1, &ptr);
+ if ( h == CURLHE_OK && ptr != NULL && ( delay = atoi( ptr->value ) ) > 0 ) {
+ if ( delay > 120 ) {
+ // Cap to two minutes
+ delay = 120;
+ }
+ logadd( LOG_INFO, "COW server is asking to backoff for %d seconds", delay );
+ uploadLoopThrottle = MAX( uploadLoopThrottle, delay );
+ status = true;
+ } else {
+ logadd( LOG_ERROR, "COW server returned 503 without Retry-After value" );
}
+ } else if ( http_code < 200 || http_code >= 300 ) {
+ logadd( LOG_ERROR, "COW server returned HTTP %ld", http_code );
+ } else {
+ // everything went ok, reset timeChanged of underlying cluster, but only if it
+ // didn't get updated again in the meantime.
+ atomic_compare_exchange_strong( &uploadingCluster->cluster->timeChanged, &uploadingCluster->time, 0 );
+ uploadingCluster->cluster->uploads++;
+ totalBlocksUploaded++;
free( uploadingCluster );
- status = false;
- goto CLEANUP;
+ status = true;
+ }
+ if ( !status ) {
+ uploadingCluster->cluster->fails++;
+ if ( uploadLoopThrottle > 0 ) {
+ // Don't reset timeChanged timestamp, so the next iteration of uploadModifiedClusters
+ // will queue this upload again after the throttle time expired.
+ free( uploadingCluster );
+ } else {
+ logadd( LOG_ERROR, "Uploading cluster failed %i/5 times", uploadingCluster->cluster->fails );
+ // Pretend the block changed again just now, to prevent immediate retry
+ atomic_compare_exchange_strong( &uploadingCluster->cluster->timeChanged, &uploadingCluster->time,
+ time( NULL ) );
+ free( uploadingCluster );
+ }
}
-
- // everything went ok, update timeChanged
- atomic_compare_exchange_strong( &uploadingCluster->block->timeChanged, &uploadingCluster->time, 0 );
-
- uploadingCluster->block->uploads++;
-
- totalBlocksUploaded++;
- free( uploadingCluster );
-CLEANUP:
curl_multi_remove_handle( cm, msg->easy_handle );
curl_easy_cleanup( msg->easy_handle );
return status;
}
/**
- * @brief
- *
* @param cm Curl_multi
* @param activeUploads ptr to integer which holds the number of current uploads
- * @param breakIfNotMax will return as soon as there are not all upload slots used, so they can be filled up.
- * @param foregroundUpload used to determine the number of max uploads. If true COW_MAX_PARALLEL_UPLOADS will be the limit,
+ * @param minNumberUploads break out of loop as soon as there are less than these many transfers running
* else COW_MAX_PARALLEL_BACKGROUND_UPLOADS.
- * @return true returned if all upload's were successful
- * @return false returned if one ore more upload's failed.
+ * @return true returned if all uploads were successful
+ * @return false returned if one ore more upload failed.
*/
-bool MessageHandler(
- CURLM *cm, bool breakIfNotMax, bool foregroundUpload, struct curl_slist *headers )
+static bool curlMultiLoop( CURLM *cm, int minNumberUploads )
{
CURLMsg *msg;
int msgsLeft = -1;
bool status = true;
- do {
- curl_multi_perform( cm, &activeUploads );
+
+ if ( minNumberUploads <= 0 ) {
+ minNumberUploads = 1;
+ }
+ for ( ;; ) {
+ CURLMcode mc = curl_multi_perform( cm, &activeUploads );
+ if ( mc != CURLM_OK ) {
+ logadd( LOG_ERROR, "curl_multi_perform error %d, bailing out", (int)mc );
+ status = false;
+ break;
+ }
while ( ( msg = curl_multi_info_read( cm, &msgsLeft ) ) != NULL ) {
- if ( !finishUpload( cm, msg, headers ) ) {
+ if ( !clusterUploadDoneHandler( cm, msg ) ) {
status = false;
}
}
- if ( breakIfNotMax
- && activeUploads
- < ( foregroundUpload ? COW_MAX_PARALLEL_UPLOADS : COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) ) {
+ if ( activeUploads < minNumberUploads ) {
break;
}
// ony wait if there are active uploads
- if ( activeUploads ) {
- curl_multi_wait( cm, NULL, 0, 1000, NULL );
+ if ( activeUploads > 0 ) {
+ mc = curl_multi_wait( cm, NULL, 0, 1000, NULL );
+ if ( mc != CURLM_OK ) {
+ logadd( LOG_ERROR, "curl_multi_wait error %d, bailing out", (int)mc );
+ status = false;
+ break;
+ }
}
- } while ( activeUploads );
+ }
return status;
}
@@ -635,7 +668,7 @@ bool MessageHandler(
* @return true if all blocks uploaded successful
* @return false if one ore more blocks failed to upload
*/
-bool uploaderLoop( bool ignoreMinUploadDelay, CURLM *cm )
+bool uploadModifiedClusters( bool ignoreMinUploadDelay, CURLM *cm )
{
bool success = true;
struct curl_slist *headers = NULL;
@@ -648,36 +681,39 @@ bool uploaderLoop( bool ignoreMinUploadDelay, CURLM *cm )
if ( cow.l1[l1Index] == -1 ) {
continue; // Not allocated
}
- // Now all L2 blocks
+ // Now all L2 clusters
for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) {
- cow_l2_entry_t *block = ( cow.l2[cow.l1[l1Index]] + l2Index );
- if ( block->offset == -1 ) {
+ cow_l2_entry_t *cluster = ( cow.l2[cow.l1[l1Index]] + l2Index );
+ if ( cluster->offset == -1 ) {
continue; // Not allocated
}
- if ( block->timeChanged == 0 ) {
+ if ( cluster->timeChanged == 0 ) {
continue; // Not changed
}
- if ( !ignoreMinUploadDelay && ( now - block->timeChanged < COW_MIN_UPLOAD_DELAY ) ) {
+ if ( !ignoreMinUploadDelay && ( now - cluster->timeChanged < COW_MIN_UPLOAD_DELAY ) ) {
continue; // Last change not old enough
}
// Run curl mainloop at least one, but keep doing so while max concurrent uploads is reached
- do {
- if ( !MessageHandler( cm, true, ignoreMinUploadDelay, headers ) ) {
- success = false;
- }
- } while ( ( activeUploads >= ( ignoreMinUploadDelay ? COW_MAX_PARALLEL_UPLOADS
- : COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) )
- && activeUploads > 0 );
+ int minUploads = ignoreMinUploadDelay
+ ? COW_MAX_PARALLEL_UPLOADS
+ : COW_MAX_PARALLEL_BACKGROUND_UPLOADS;
+ if ( !curlMultiLoop( cm, minUploads ) ) {
+ success = false;
+ }
+ // Maybe one of the uploads was rejected by the server asking us to slow down a bit.
+ // Check for that case and don't trigger a new upload.
+ if ( uploadLoopThrottle > 0 ) {
+ goto DONE;
+ }
cow_curl_read_upload_t *b = malloc( sizeof( cow_curl_read_upload_t ) );
- b->block = block;
+ b->cluster = cluster;
b->clusterNumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index );
- b->fails = 0;
b->position = 0;
- b->time = block->timeChanged;
+ b->time = cluster->timeChanged;
// Copy, so it doesn't change during upload
// when we assemble the data in curlReadCallbackUploadBlock()
for ( int i = 0; i < COW_BITFIELD_SIZE; ++i ) {
- b->bitfield[i] = block->bitfield[i];
+ b->bitfield[i] = cluster->bitfield[i];
}
addUpload( cm, b, headers );
if ( !ignoreMinUploadDelay && !uploadLoop ) {
@@ -686,8 +722,12 @@ bool uploaderLoop( bool ignoreMinUploadDelay, CURLM *cm )
}
}
DONE:
+ // Finish all the transfers still active
while ( activeUploads > 0 ) {
- MessageHandler( cm, false, ignoreMinUploadDelay, headers );
+ if ( !curlMultiLoop( cm, 1 ) ) {
+ success = false;
+ break;
+ }
}
curl_slist_free_all( headers );
return success;
@@ -699,17 +739,18 @@ DONE:
*
*/
-void *cowfile_statUpdater( __attribute__((unused)) void *something )
+void *cowfile_statUpdater( UNUSED void *something )
{
uint64_t lastUpdateTime = time( NULL );
+ time_t now;
+ char speedBuffer[20];
while ( !uploadLoopDone ) {
- sleep( COW_STATS_UPDATE_TIME );
int modified = 0;
int inQueue = 0;
int idle = 0;
long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE );
- uint64_t now = time( NULL );
+ now = time( NULL );
for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) {
if ( cow.l1[l1Index] == -1 ) {
continue;
@@ -730,59 +771,88 @@ void *cowfile_statUpdater( __attribute__((unused)) void *something )
}
}
}
- char speedBuffer[20];
if ( COW_SHOW_UL_SPEED ) {
+ double delta;
+ double bytes = (double)atomic_exchange( &bytesUploaded, 0 );
now = time( NULL );
- uint64_t bytes = atomic_exchange( &bytesUploaded, 0 );
- snprintf( speedBuffer, 20, "%.2f", (double)( ( bytes ) / ( 1 + now - lastUpdateTime ) / 1000 ) );
-
+ delta = (double)( now - lastUpdateTime );
lastUpdateTime = now;
+ if ( delta > 0 ) {
+ snprintf( speedBuffer, sizeof speedBuffer, "%.2f", bytes / 1000.0 / delta );
+ }
}
-
updateCowStatsFile( inQueue, modified, idle, speedBuffer );
+ sleep( COW_STATS_UPDATE_TIME );
}
return NULL;
}
+void quitSigHandler( int sig UNUSED )
+{
+ uploadCancelled = true;
+ uploadLoop = false;
+ main_shutdown();
+}
+
/**
* @brief main loop for blockupload in the background
*/
-static void *uploaderThreadMain( __attribute__((unused)) void *something )
+static void *uploaderThreadMain( UNUSED void *something )
{
CURLM *cm;
cm = curl_multi_init();
- curl_multi_setopt(
- cm, CURLMOPT_MAXCONNECTS, (long)MAX( COW_MAX_PARALLEL_UPLOADS, COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) );
+ curl_multi_setopt( cm, CURLMOPT_MAXCONNECTS,
+ (long)MAX( COW_MAX_PARALLEL_UPLOADS, COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) );
+ do {
+ // Unblock so this very thread gets the signal for abandoning the upload
+ struct sigaction newHandler = { .sa_handler = &quitSigHandler };
+ sigemptyset( &newHandler.sa_mask );
+ sigaction( SIGQUIT, &newHandler, NULL );
+ sigset_t sigmask;
+ sigemptyset( &sigmask );
+ sigaddset( &sigmask, SIGQUIT );
+ pthread_sigmask( SIG_UNBLOCK, &sigmask, NULL );
+ } while ( 0 );
while ( uploadLoop ) {
- uploaderLoop( false, cm );
+ while ( uploadLoopThrottle > 0 && uploadLoop ) {
+ sleep( 1 );
+ uploadLoopThrottle--;
+ }
sleep( 2 );
+ if ( !uploadLoop )
+ break;
+ uploadModifiedClusters( false, cm );
}
- logadd( LOG_DEBUG1, "start uploading the remaining blocks." );
- // force the upload of all remaining blocks because the user dismounted the image
- if ( !uploaderLoop( true, cm ) ) {
- logadd( LOG_ERROR, "one or more blocks failed to upload" );
- curl_multi_cleanup( cm );
+ if ( uploadCancelled ) {
uploadLoopDone = true;
- return NULL;
+ logadd( LOG_INFO, "Not uploading remaining clusters, SIGQUIT received" );
+ } else {
+ // force the upload of all remaining blocks because the user dismounted the image
+ logadd( LOG_INFO, "Start uploading the remaining clusters." );
+ if ( !uploadModifiedClusters( true, cm ) ) {
+ uploadLoopDone = true;
+ logadd( LOG_ERROR, "One or more clusters failed to upload" );
+ } else {
+ uploadLoopDone = true;
+ logadd( LOG_DEBUG1, "All clusters uploaded" );
+ if ( cow_merge_after_upload ) {
+ requestRemoteMerge();
+ logadd( LOG_DEBUG1, "Requesting merge" );
+ }
+ }
}
- uploadLoopDone = true;
curl_multi_cleanup( cm );
- logadd( LOG_DEBUG1, "all blocks uploaded" );
- if ( cow_merge_after_upload ) {
- startMerge();
- logadd( LOG_DEBUG1, "Requesting merge." );
- }
return NULL;
}
/**
- * @brief Create a Cow Stats File an inserts the session guid
+ * @brief Create a Cow Stats File an inserts the session uuid
*
* @param path where the file is created
* @return true
@@ -819,11 +889,17 @@ static bool createCowStatsFile( char *path )
* @param path where the files should be stored
* @param image_Name name of the original file/image
* @param imageSizePtr
+ * @param cowUuid optional, use given UUID for talking to COW server instead of creating session
*/
bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion,
atomic_uint_fast64_t **imageSizePtr,
- char *serverAddress, bool sStdout, bool sfile )
+ char *serverAddress, bool sStdout, bool sfile, const char *cowUuid )
{
+ if ( cowUuid != NULL && strlen( cowUuid ) > UUID_STRLEN ) {
+ logadd( LOG_ERROR, "COW UUID too long: '%s'", cowUuid );
+ return false;
+ }
+
statStdout = sStdout;
statFile = sfile;
char pathMeta[strlen( path ) + 6];
@@ -860,6 +936,10 @@ bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion,
// size of l1 array + number of l2's * size of l2
size_t ps = getpagesize();
+ if ( ps == 0 || ps > INT_MAX ) {
+ logadd( LOG_ERROR, "Cannot get native page size, aborting..." );
+ return false;
+ }
size_t metaSize = ( ( startL2 + l1NumEntries * sizeof( l2 ) + ps - 1 ) / ps ) * ps;
if ( ftruncate( cow.fdMeta, metaSize ) != 0 ) {
@@ -911,7 +991,10 @@ bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion,
logadd( LOG_ERROR, "Error on curl init. Bye.\n" );
return false;
}
- if ( !createSession( image_Name, imageVersion ) ) {
+ if ( cowUuid != NULL ) {
+ snprintf( metadata->uuid, UUID_STRLEN, "%s", cowUuid );
+ logadd( LOG_INFO, "Using provided upload session id" );
+ } else if ( !createSession( image_Name, imageVersion ) ) {
return false;
}
createCowStatsFile( path );
@@ -925,7 +1008,7 @@ bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion,
* @param path where the meta & data file is located
* @param imageSizePtr
*/
-bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile )
+bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile, const char *cowUuid )
{
statStdout = sStdout;
statFile = sFile;
@@ -935,6 +1018,11 @@ bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *server
char pathMeta[strlen( path ) + 6];
char pathData[strlen( path ) + 6];
+ if ( cowUuid != NULL && strlen( cowUuid ) > UUID_STRLEN ) {
+ logadd( LOG_ERROR, "COW UUID too long: '%s'", cowUuid );
+ return false;
+ }
+
snprintf( pathMeta, strlen( path ) + 6, "%s%s", path, "/meta" );
snprintf( pathData, strlen( path ) + 6, "%s%s", path, "/data" );
@@ -1026,6 +1114,11 @@ bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *server
metadata = (cowfile_metadata_header_t *)( cow.metadata_mmap );
+ if ( cowUuid != NULL ) {
+ logadd( LOG_INFO, "Overriding stored upload session id with provided one" );
+ snprintf( metadata->uuid, UUID_STRLEN, "%s", cowUuid );
+ }
+
*imageSizePtr = &metadata->imageSize;
cow.l1 = (l1 *)( cow.metadata_mmap + metadata->startL1 );
cow.l2 = (l2 *)( cow.metadata_mmap + metadata->startL2 );
@@ -1150,7 +1243,7 @@ static void writePaddedBlock( cow_sub_request_t *sRequest )
// Here, we again check if the block is written locally - there might have been a second write
// that wrote the full block, hence didn't have to wait for remote data and finished faster.
// In that case, don't pad from remote as we'd overwrite newer data.
- if ( isBlockLocal( sRequest->block, sRequest->inClusterOffset ) ) {
+ if ( isBlockLocal( sRequest->cluster, sRequest->inClusterOffset ) ) {
logadd( LOG_INFO, "It happened!" );
} else {
// copy write Data
@@ -1158,13 +1251,13 @@ static void writePaddedBlock( cow_sub_request_t *sRequest )
memcpy( sRequest->writeBuffer + ( sRequest->inClusterOffset % DNBD3_BLOCK_SIZE ), sRequest->writeSrc,
sRequest->size );
if ( !writeAll( cow.fdData, sRequest->writeBuffer, DNBD3_BLOCK_SIZE,
- sRequest->block->offset + ( sRequest->inClusterOffset & ~DNBD3_BLOCK_MASK ) ) ) {
+ sRequest->cluster->offset + ( sRequest->inClusterOffset & ~DNBD3_BLOCK_MASK ) ) ) {
sRequest->cowRequest->errorCode = errno;
} else {
sRequest->cowRequest->bytesWorkedOn += sRequest->size;
int64_t bit = sRequest->inClusterOffset / DNBD3_BLOCK_SIZE;
- setBitsInBitfield( sRequest->block->bitfield, bit, bit, true );
- sRequest->block->timeChanged = time( NULL );
+ setBitsInBitfield( sRequest->cluster->bitfield, bit, bit, true );
+ sRequest->cluster->timeChanged = time( NULL );
}
}
@@ -1228,10 +1321,11 @@ static bool padBlockForWrite( fuse_req_t req, cow_request_t *cowRequest,
cow_sub_request_t *sub = calloc( sizeof( *sub ) + DNBD3_BLOCK_SIZE, 1 );
sub->callback = writePaddedBlock;
sub->inClusterOffset = inClusterOffset;
- sub->block = cluster;
+ sub->cluster = cluster;
sub->size = size;
sub->writeSrc = srcBuffer;
sub->cowRequest = cowRequest;
+ sub->buffer = sub->writeBuffer;
sub->dRequest.length = (uint32_t)MIN( DNBD3_BLOCK_SIZE, validImageSize - startOffset );
sub->dRequest.offset = startOffset & ~DNBD3_BLOCK_MASK;
@@ -1248,7 +1342,7 @@ static bool padBlockForWrite( fuse_req_t req, cow_request_t *cowRequest,
/**
* @brief Will be called after a dnbd3_async_t is finished.
- * Calls the corrsponding callback function, either writePaddedBlock or readRemoteData
+ * Calls the corrsponding callback function, either writePaddedBlock or readRemoteCallback
* depending if the original fuse request was a write or read.
*
*/
@@ -1265,7 +1359,7 @@ void cowfile_handleCallback( dnbd3_async_t *request )
* so replys to fuse and cleans up the request.
*
*/
-void readRemoteData( cow_sub_request_t *sRequest )
+static void readRemoteCallback( cow_sub_request_t *sRequest )
{
atomic_fetch_add( &sRequest->cowRequest->bytesWorkedOn, sRequest->dRequest.length );
@@ -1464,7 +1558,7 @@ static void readRemote( fuse_req_t req, off_t offset, ssize_t size, char *buffer
return;
assert( size > 0 );
cow_sub_request_t *sRequest = malloc( sizeof( cow_sub_request_t ) );
- sRequest->callback = readRemoteData;
+ sRequest->callback = readRemoteCallback;
sRequest->dRequest.length = (uint32_t)size;
sRequest->dRequest.offset = offset;
sRequest->dRequest.fuse_req = req;
@@ -1632,13 +1726,15 @@ fail:;
void cowfile_close()
{
uploadLoop = false;
+ pthread_join( tidCowUploader, NULL );
if ( statFile || statStdout ) {
+ // Send a signal in case it's hanging in the sleep call
+ pthread_kill( tidStatUpdater, SIGHUP );
pthread_join( tidStatUpdater, NULL );
}
- pthread_join( tidCowUploader, NULL );
if ( curl ) {
- curl_global_cleanup();
curl_easy_cleanup( curl );
+ curl_global_cleanup();
}
}