author     Simon Rettberg   2024-04-11 16:14:59 +0200
committer  Simon Rettberg   2024-04-11 16:14:59 +0200
commit     dc4da9869594bbf0573fcaac31488eb51a9f9e6a (patch)
tree       fa60afd7ac6c8e542cbca059ea7326dd109fd718
parent     [FUSE] Rename COW API endpoints again (diff)
[FUSE] More cleanup and minor fixes
- Add support for aborting session and further uploads via SIGQUIT
- Make API versioned
- Change it to "uuid" everywhere instead of a mix of uuid and guid
- Server can now tell us to wait with further uploads
- Merge request is now urlencoded POST instead of mime data
-rw-r--r--  inc/dnbd3/config/cow.h        2
-rw-r--r--  src/fuse/connection.c        10
-rw-r--r--  src/fuse/cowDoc/readme.md    12
-rw-r--r--  src/fuse/cowfile.c          480
-rw-r--r--  src/fuse/cowfile.h           15
-rw-r--r--  src/fuse/main.c             118
-rw-r--r--  src/fuse/main.h              32
7 files changed, 387 insertions, 282 deletions
diff --git a/inc/dnbd3/config/cow.h b/inc/dnbd3/config/cow.h
index d3be949..b266fc8 100644
--- a/inc/dnbd3/config/cow.h
+++ b/inc/dnbd3/config/cow.h
@@ -15,7 +15,7 @@
// +++++ COW API Endpoints +++++
#define COW_API_PREFIX "%s/v1/"
#define COW_API_CREATE COW_API_PREFIX "file/create"
-#define COW_API_UPDATE COW_API_PREFIX "file/update?guid=%s&clusterindex=%lu"
+#define COW_API_UPDATE COW_API_PREFIX "file/update?uuid=%s&clusterindex=%lu"
#define COW_API_START_MERGE COW_API_PREFIX "file/merge"
#endif
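
The endpoint macros above are plain printf-style format strings: the server address fills the new versioned `%s/v1/` prefix, then the session uuid and cluster index complete the query string. A minimal sketch of the expansion, mirroring how `addUpload()` in cowfile.c builds the upload URL (include path, helper name and buffer handling are illustrative assumptions):

```c
#include <stdio.h>
#include <dnbd3/config/cow.h>

/* Builds e.g. "http://cow-server/api/v1/file/update?uuid=<uuid>&clusterindex=42" */
static void buildUpdateUrl( char *url, size_t len,
		const char *serverAddress, const char *uuid, unsigned long clusterIndex )
{
	snprintf( url, len, COW_API_UPDATE, serverAddress, uuid, clusterIndex );
}
```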
diff --git a/src/fuse/connection.c b/src/fuse/connection.c
index 38102bc..8f0ff01 100644
--- a/src/fuse/connection.c
+++ b/src/fuse/connection.c
@@ -846,15 +846,7 @@ static size_t receiveRequest(const int sock, dnbd3_async_t* request )
{
if( useCow ) {
cow_sub_request_t * cow_request = container_of( request, cow_sub_request_t, dRequest );
- // TODO This is ugly, we have a callback so we don't need to special-case receiving the
- // reply, yet here we check what the callback function is for some reason :-(
- // Ideally there should be no cow-related code in this file at all.
- // This requires moving the callback to dnbd3_async_t from cow_sub_request_t though...
- if( cow_request->callback == readRemoteData ) {
- return sock_recv( sock, cow_request->buffer, request->length );
- } else{
- return sock_recv( sock, &cow_request->writeBuffer, request->length );
- }
+ return sock_recv( sock, cow_request->buffer, request->length );
} else {
return sock_recv( sock, container_of( request, dnbd3_async_parent_t, request )->buffer, request->length );
}
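
With the write path now pointing `sub->buffer` at its own `writeBuffer` (see the `padBlockForWrite()` change further down), the receive path no longer needs to know which callback a sub-request carries; it always reads into `cow_request->buffer`. The `container_of()` idiom used here recovers the enclosing sub-request from the embedded `dRequest` member; a sketch with the typical definition (assumed to match the one in the dnbd3 headers):

```c
#include <stddef.h>

/* Typical definition (assumption): step back from a pointer to a struct
 * member to the struct that contains it. */
#define container_of(ptr, type, member) \
	( (type *)( (char *)(ptr) - offsetof( type, member ) ) )

/* cow_sub_request_t embeds its dnbd3_async_t as 'dRequest' (see cowfile.h),
 * so receiveRequest() can go from the async request back to its parent:
 *   cow_sub_request_t *cow = container_of( request, cow_sub_request_t, dRequest );
 */
```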
diff --git a/src/fuse/cowDoc/readme.md b/src/fuse/cowDoc/readme.md
index fd3557c..46b753e 100644
--- a/src/fuse/cowDoc/readme.md
+++ b/src/fuse/cowDoc/readme.md
@@ -222,7 +222,7 @@ The following configuration variables have been added to `config/cow.h`.
// +++++ COW API Endpoints +++++
#define COW_API_PREFIX "%s/v1/"
#define COW_API_CREATE COW_API_PREFIX "file/create"
-#define COW_API_UPDATE COW_API_PREFIX "file/update?guid=%s&clusterindex=%lu"
+#define COW_API_UPDATE COW_API_PREFIX "file/update?uuid=%s&clusterindex=%lu"
#define COW_API_START_MERGE COW_API_PREFIX "file/merge"
```
@@ -251,7 +251,7 @@ The following Rest API is used to transmit the data and commands to the cow serv
| ---- | ----------- |
| 200 | Success |
-This request is used as soon as a new cow session is created. The returned guid is used in all subsequent requests to identify the session.
+This request is used as soon as a new cow session is created. The returned uuid is used in all subsequent requests to identify the session.
### v1/file/update
@@ -261,7 +261,7 @@ This request is used as soon as a new cow session is created. The returned guid
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ---- |
-| guid | query | | Yes | string (uuid) |
+| uuid | query | | Yes | string (uuid) |
| clusterNumber | query | | Yes | integer |
##### Responses
@@ -280,7 +280,7 @@ Used to upload a cluster. The cluster number is the absolute cluster number. The
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ---- |
-| guid | Form | | Yes | string (uuid) |
+| uuid | Form | | Yes | string (uuid) |
| originalFileSize | Form | | Yes | integer |
| newFileSize | Form | | Yes | integer |
##### Responses
@@ -299,7 +299,7 @@ Used to start the merge on the server.
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ---- |
-| guid | query | | Yes | string (uuid) |
+| uuid | query | | Yes | string (uuid) |
| amount | query | | Yes | integer |
##### Responses
@@ -317,7 +317,7 @@ This request returns a list containing the cluster IDs and the number of uploads
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ---- |
-| guid | query | | Yes | string (uuid) |
+| uuid | query | | Yes | string (uuid) |
##### Responses
diff --git a/src/fuse/cowfile.c b/src/fuse/cowfile.c
index a53b101..60d82fb 100644
--- a/src/fuse/cowfile.c
+++ b/src/fuse/cowfile.c
@@ -2,12 +2,17 @@
#include "main.h"
#include "connection.h"
+#include <dnbd3/config.h>
+#include <dnbd3/types.h>
#include <dnbd3/shared/log.h>
#include <sys/mman.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <curl/curl.h>
+#include <signal.h>
+#include <inttypes.h>
+#include <assert.h>
#define UUID_STRLEN 36
// Maximum assumed page size, in case the cow data gets transferred between different architectures
@@ -16,7 +21,7 @@
extern void image_ll_getattr( fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi );
-static const int CURRENT_COW_VERSION = 2;
+static const int CURRENT_COW_VERSION = 3;
static bool statStdout;
static bool statFile;
@@ -28,8 +33,10 @@ static cowfile_metadata_header_t *metadata = NULL;
static atomic_uint_fast64_t bytesUploaded;
static uint64_t totalBlocksUploaded = 0;
static int activeUploads = 0;
-atomic_bool uploadLoop = true; // Keep upload loop running?
-atomic_bool uploadLoopDone = false; // Upload loop has finished all work?
+static int uploadLoopThrottle = 0;
+static atomic_bool uploadLoop = true; // Keep upload loop running?
+static atomic_bool uploadLoopDone = false; // Upload loop has finished all work?
+static atomic_bool uploadCancelled = false; // Skip uploading remaining blocks
static struct cow
{
@@ -172,7 +179,7 @@ static bool checkBit( atomic_uchar *bitfield, int64_t n )
* @param response userdata which will later contain the uuid
* @return size_t size that have been read
*/
-size_t curlCallbackCreateSession( char *buffer, size_t itemSize, size_t nitems, void *response )
+static size_t curlCallbackCreateSession( char *buffer, size_t itemSize, size_t nitems, void *response )
{
uint64_t done = strlen( response );
uint64_t bytes = itemSize * nitems;
@@ -186,12 +193,12 @@ size_t curlCallbackCreateSession( char *buffer, size_t itemSize, size_t nitems,
}
/**
- * @brief Create a Session with the cow server and gets the session guid.
+ * @brief Create a Session with the cow server and gets the session uuid.
*
* @param imageName
* @param version of the original Image
*/
-bool createSession( const char *imageName, uint16_t version )
+static bool createSession( const char *imageName, uint16_t version )
{
CURLcode res;
char url[COW_URL_STRING_SIZE];
@@ -240,7 +247,7 @@ bool createSession( const char *imageName, uint16_t version )
}
curl_easy_reset( curl );
metadata->uuid[UUID_STRLEN] = '\0';
- logadd( LOG_DEBUG1, "Cow session started, guid: %s\n", metadata->uuid );
+ logadd( LOG_DEBUG1, "Cow session started, uuid: %s\n", metadata->uuid );
return true;
}
@@ -256,7 +263,7 @@ bool createSession( const char *imageName, uint16_t version )
* @param userdata from CURLOPT_READFUNCTION
* @return size_t size written in buffer
*/
-size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *userdata )
+static size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *userdata )
{
cow_curl_read_upload_t *uploadBlock = (cow_curl_read_upload_t *)userdata;
size_t len = 0;
@@ -279,6 +286,7 @@ size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *
int bitNumber = (int)( inClusterOffset / DNBD3_BLOCK_SIZE );
size_t readSize;
// Small performance hack: All bits one in a byte, do a 32k instead of 4k read
+ // TODO: preadv with a large iov, reading unchanged blocks into a trash-buffer
if ( spaceLeft >= (ssize_t)DNBD3_BLOCK_SIZE * 8
&& bitNumber % 8 == 0
&& uploadBlock->bitfield[bitNumber / 8] == 0xff ) {
@@ -286,11 +294,13 @@ size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *
} else {
readSize = DNBD3_BLOCK_SIZE;
}
- // Check bits in our copy, as global bitfield could change
- if ( checkBit( uploadBlock->bitfield, bitNumber ) ) {
+ // If handling single block, check bits in our copy, as global bitfield could change
+ if ( readSize != DNBD3_BLOCK_SIZE || checkBit( uploadBlock->bitfield, bitNumber ) ) {
ssize_t lengthRead = pread( cow.fdData, ( ptr + len ), readSize,
- uploadBlock->block->offset + inClusterOffset );
+ uploadBlock->cluster->offset + inClusterOffset );
if ( lengthRead == -1 ) {
+ if ( errno == EAGAIN )
+ continue;
logadd( LOG_ERROR, "Upload: Reading from COW file failed with errno %d", errno );
return CURL_READFUNC_ABORT;
}
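
The read callback assembles the upload body from the local COW data: whole 32 KiB runs are copied when an entire bitfield byte is 0xff, otherwise it falls back to single 4 KiB blocks and consults the copied bitfield. A plausible sketch of `checkBit()` for reference (the real implementation lives earlier in cowfile.c; the exact bit order is an assumption):

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Bit n of a cluster's bitfield records whether DNBD3 block n was written
 * locally and therefore has to be part of the upload. */
static bool checkBit( atomic_uchar *bitfield, int64_t n )
{
	return ( atomic_load( &bitfield[n / 8] ) >> ( n % 8 ) ) & 1;
}
```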
@@ -313,69 +323,59 @@ size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *
/**
* @brief Requests the merging of the image on the cow server.
*/
-bool mergeRequest()
+static bool postMergeRequest()
{
CURLcode res;
- curl_easy_setopt( curl, CURLOPT_POST, 1L );
-
char url[COW_URL_STRING_SIZE];
- snprintf( url, COW_URL_STRING_SIZE, COW_API_START_MERGE, cowServerAddress );
- curl_easy_setopt( curl, CURLOPT_URL, url );
-
-
- curl_mime *mime;
- curl_mimepart *part;
- mime = curl_mime_init( curl );
-
- part = curl_mime_addpart( mime );
- curl_mime_name( part, "guid" );
- curl_mime_data( part, metadata->uuid, CURL_ZERO_TERMINATED );
+ char body[500];
+ char *uuid;
- part = curl_mime_addpart( mime );
- curl_mime_name( part, "originalFileSize" );
- char buf[21];
- snprintf( buf, sizeof buf, "%" PRIu64, metadata->validRemoteSize );
- curl_mime_data( part, buf, CURL_ZERO_TERMINATED );
-
- part = curl_mime_addpart( mime );
- curl_mime_name( part, "newFileSize" );
- snprintf( buf, sizeof buf, "%" PRIu64, metadata->imageSize );
- curl_mime_data( part, buf, CURL_ZERO_TERMINATED );
+ curl_easy_reset( curl );
- curl_easy_setopt( curl, CURLOPT_MIMEPOST, mime );
+ snprintf( url, COW_URL_STRING_SIZE, COW_API_START_MERGE, cowServerAddress );
+ curl_easy_setopt( curl, CURLOPT_URL, url );
+ curl_easy_setopt( curl, CURLOPT_POST, 1L );
+ uuid = curl_easy_escape( curl, metadata->uuid, 0 );
+ if ( uuid == NULL ) {
+ logadd( LOG_ERROR, "Error escaping uuid" );
+ uuid = metadata->uuid; // Hope for the best
+ }
+ snprintf( body, sizeof body, "originalFileSize=%"PRIu64"&newFileSize=%"PRIu64"&uuid=%s",
+ metadata->validRemoteSize, metadata->imageSize, uuid );
+ if ( uuid != metadata->uuid ) {
+ curl_free( uuid );
+ }
+ curl_easy_setopt( curl, CURLOPT_POSTFIELDS, body );
res = curl_easy_perform( curl );
if ( res != CURLE_OK ) {
- logadd( LOG_WARNING, "COW_API_START_MERGE failed: %s\n", curl_easy_strerror( res ) );
- curl_easy_reset( curl );
+ logadd( LOG_WARNING, "COW_API_START_MERGE failed: %s", curl_easy_strerror( res ) );
return false;
}
long http_code = 0;
curl_easy_getinfo( curl, CURLINFO_RESPONSE_CODE, &http_code );
- if ( http_code != 200 ) {
- logadd( LOG_WARNING, "COW_API_START_MERGE failed http: %ld\n", http_code );
- curl_easy_reset( curl );
+ if ( http_code < 200 || http_code >= 300 ) {
+ logadd( LOG_WARNING, "COW_API_START_MERGE failed http: %ld", http_code );
return false;
}
- curl_easy_reset( curl );
- curl_mime_free( mime );
return true;
}
/**
- * @brief Wrapper for mergeRequest so if its fails it will be tried again.
+ * @brief Wrapper for postMergeRequest so if it fails it will be tried again.
*
*/
-void startMerge()
+static void requestRemoteMerge()
{
int fails = 0;
bool success = false;
- success = mergeRequest();
+ success = postMergeRequest();
while ( fails <= 5 && !success ) {
fails++;
logadd( LOG_WARNING, "Trying again. %i/5", fails );
- mergeRequest();
+ sleep( 10 );
+ postMergeRequest();
}
}
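
Note that the retry loop above discards the return value of the repeated `postMergeRequest()` calls, so it keeps sleeping and retrying even after a retry succeeds, and `fails <= 5` allows one attempt more than the "%i/5" log suggests. A tightened sketch of the same policy, as a drop-in variant of the function shown above:

```c
/* Sketch: stop as soon as a retry succeeds, capped at the five attempts
 * promised by the log message. */
static void requestRemoteMerge( void )
{
	int fails = 0;
	bool success = postMergeRequest();

	while ( !success && fails < 5 ) {
		fails++;
		logadd( LOG_WARNING, "Trying again. %i/5", fails );
		sleep( 10 );
		success = postMergeRequest();
	}
}
```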
@@ -387,26 +387,54 @@ void startMerge()
* This function computes the uploaded bytes between each call and adds it to
* bytesUploaded, which is used to compute the kb/s uploaded over all transfers.
*
- * @param clientp
* @param ulNow number of bytes uploaded by this transfer so far.
* @return int always returns 0 to continue the callbacks.
*/
-int progress_callback( void *clientp, __attribute__((unused)) curl_off_t dlTotal,
- __attribute__((unused)) curl_off_t dlNow, __attribute__((unused)) curl_off_t ulTotal, curl_off_t ulNow )
+static int progress_callback( void *clientp, UNUSED curl_off_t dlTotal,
+ UNUSED curl_off_t dlNow, UNUSED curl_off_t ulTotal, curl_off_t ulNow )
{
- CURL *eh = (CURL *)clientp;
- cow_curl_read_upload_t *uploadingCluster;
- CURLcode res;
- res = curl_easy_getinfo( eh, CURLINFO_PRIVATE, &uploadingCluster );
- if ( res != CURLE_OK ) {
- logadd( LOG_ERROR, "ERROR" );
- return 0;
- }
+ cow_curl_read_upload_t *uploadingCluster = (cow_curl_read_upload_t *)clientp;
bytesUploaded += ( ulNow - uploadingCluster->ulLast );
uploadingCluster->ulLast = ulNow;
return 0;
}
+#ifdef COW_DUMP_BLOCK_UPLOADS
+static int cmpfunc( const void *a, const void *b )
+{
+ return (int)( ( (cow_cluster_statistics_t *)b )->uploads - ( (cow_cluster_statistics_t *)a )->uploads );
+}
+/**
+ * @brief Writes all block numbers sorted by the number of uploads into the statsfile.
+ *
+ */
+static void dumpBlockUploads()
+{
+ long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE );
+
+ cow_cluster_statistics_t blockUploads[l1MaxOffset * COW_L2_TABLE_SIZE];
+ uint64_t currentBlock = 0;
+ for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) {
+ if ( cow.l1[l1Index] == -1 ) {
+ continue;
+ }
+ for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) {
+ cow_l2_entry_t *block = ( cow.l2[cow.l1[l1Index]] + l2Index );
+
+ blockUploads[currentBlock].uploads = block->uploads;
+ blockUploads[currentBlock].clusterNumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index );
+ currentBlock++;
+ }
+ }
+ qsort( blockUploads, currentBlock, sizeof( cow_cluster_statistics_t ), cmpfunc );
+
+ dprintf( cow.fdStats, "\n\nclusterNumber: uploads\n==Block Upload Dump===\n" );
+ for ( uint64_t i = 0; i < currentBlock; i++ ) {
+ dprintf( cow.fdStats, "%" PRIu64 ": %" PRIu64 " \n", blockUploads[i].clusterNumber, blockUploads[i].uploads );
+ }
+}
+#endif
+
/**
* @brief Updates the status to the stdout/statfile depending on the startup parameters.
*
@@ -415,8 +443,7 @@ int progress_callback( void *clientp, __attribute__((unused)) curl_off_t dlTotal
* @param idle Blocks that do not contain changes that have not yet been uploaded.
* @param speedBuffer ptr to char array that contains the current upload speed.
*/
-
-void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, char *speedBuffer )
+static void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, char *speedBuffer )
{
char buffer[300];
const char *state;
@@ -429,16 +456,17 @@ void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, cha
state = "done";
}
- int len = snprintf( buffer, 300,
+ int len = snprintf( buffer, sizeof buffer,
+ "[General]\n"
"state=%s\n"
"inQueue=%" PRIu64 "\n"
"modifiedClusters=%" PRIu64 "\n"
"idleClusters=%" PRIu64 "\n"
"totalClustersUploaded=%" PRIu64 "\n"
"activeUploads=%i\n"
- "%s%s",
+ "%s%s\n",
state, inQueue, modified, idle, totalBlocksUploaded, activeUploads,
- COW_SHOW_UL_SPEED ? "ulspeed=" : "",
+ COW_SHOW_UL_SPEED ? "avgSpeedKb=" : "",
speedBuffer );
if ( len == -1 ) {
@@ -465,39 +493,6 @@ void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, cha
#endif
}
}
-int cmpfunc( const void *a, const void *b )
-{
- return (int)( ( (cow_cluster_statistics_t *)b )->uploads - ( (cow_cluster_statistics_t *)a )->uploads );
-}
-/**
- * @brief Writes all block numbers sorted by the number of uploads into the statsfile.
- *
- */
-void dumpBlockUploads()
-{
- long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE );
-
- cow_cluster_statistics_t blockUploads[l1MaxOffset * COW_L2_TABLE_SIZE];
- uint64_t currentBlock = 0;
- for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) {
- if ( cow.l1[l1Index] == -1 ) {
- continue;
- }
- for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) {
- cow_l2_entry_t *block = ( cow.l2[cow.l1[l1Index]] + l2Index );
-
- blockUploads[currentBlock].uploads = block->uploads;
- blockUploads[currentBlock].clusterNumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index );
- currentBlock++;
- }
- }
- qsort( blockUploads, currentBlock, sizeof( cow_cluster_statistics_t ), cmpfunc );
-
- dprintf( cow.fdStats, "\n\nclusterNumber: uploads\n==Block Upload Dump===\n" );
- for ( uint64_t i = 0; i < currentBlock; i++ ) {
- dprintf( cow.fdStats, "%" PRIu64 ": %" PRIu64 " \n", blockUploads[i].clusterNumber, blockUploads[i].uploads );
- }
-}
/**
* @brief Starts the upload of a given block.
@@ -505,13 +500,14 @@ void dumpBlockUploads()
* @param cm Curl_multi
* @param uploadingCluster containing the data for the block to upload.
*/
-bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster, struct curl_slist *headers )
+static bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster, struct curl_slist *headers )
{
CURL *eh = curl_easy_init();
char url[COW_URL_STRING_SIZE];
- snprintf( url, COW_URL_STRING_SIZE, COW_API_UPDATE, cowServerAddress, metadata->uuid, uploadingCluster->clusterNumber );
+ snprintf( url, COW_URL_STRING_SIZE,
+ COW_API_UPDATE, cowServerAddress, metadata->uuid, uploadingCluster->clusterNumber );
curl_easy_setopt( eh, CURLOPT_URL, url );
curl_easy_setopt( eh, CURLOPT_POST, 1L );
@@ -531,7 +527,7 @@ bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster, struct curl
uploadingCluster->ulLast = 0;
curl_easy_setopt( eh, CURLOPT_NOPROGRESS, 0L );
curl_easy_setopt( eh, CURLOPT_XFERINFOFUNCTION, progress_callback );
- curl_easy_setopt( eh, CURLOPT_XFERINFODATA, eh );
+ curl_easy_setopt( eh, CURLOPT_XFERINFODATA, uploadingCluster );
}
curl_easy_setopt( eh, CURLOPT_HTTPHEADER, headers );
curl_multi_add_handle( cm, eh );
@@ -549,9 +545,9 @@ bool addUpload( CURLM *cm, cow_curl_read_upload_t *uploadingCluster, struct curl
* @return true returned if the upload was successful or retries are still possible.
* @return false returned if the upload was unsuccessful.
*/
-bool finishUpload( CURLM *cm, CURLMsg *msg, struct curl_slist *headers )
+static bool clusterUploadDoneHandler( CURLM *cm, CURLMsg *msg )
{
- bool status = true;
+ bool status = false;
cow_curl_read_upload_t *uploadingCluster;
CURLcode res;
CURLcode res2;
@@ -560,69 +556,106 @@ bool finishUpload( CURLM *cm, CURLMsg *msg, struct curl_slist *headers )
long http_code = 0;
res2 = curl_easy_getinfo( msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_code );
- if ( res != CURLE_OK || res2 != CURLE_OK || http_code < 200 || http_code >= 300
- || msg->msg != CURLMSG_DONE ) {
- uploadingCluster->fails++;
- logadd( LOG_ERROR, "COW_API_UPDATE failed %i/5: %s\n", uploadingCluster->fails,
+ if ( msg->msg != CURLMSG_DONE ) {
+ logadd( LOG_ERROR, "multi_message->msg unexpectedly not DONE (%d)", (int)msg->msg );
+ } else if ( msg->data.result != CURLE_OK ) {
+ logadd( LOG_ERROR, "curl_easy returned non-OK after multi-finish: %s",
curl_easy_strerror( msg->data.result ) );
- if ( uploadingCluster->fails < 5 ) {
- addUpload( cm, uploadingCluster, headers );
- goto CLEANUP;
+ } else if ( res != CURLE_OK || res2 != CURLE_OK ) {
+ logadd( LOG_ERROR, "curl_easy_getinfo failed after multifinish (%d, %d)", (int)res, (int)res2 );
+ } else if ( http_code == 503 ) {
+ // If the "Retry-After" header is set, we interpret this as the server being overloaded
+ // or not ready yet to take another update. We slow down our upload loop then.
+ // We'll only accept a delay in seconds here, not an HTTP Date string.
+ // Otherwise, increase the fails counter.
+ struct curl_header *ptr = NULL;
+ int delay;
+ CURLHcode h = curl_easy_header(curl, "Retry-After", 0, CURLH_HEADER, -1, &ptr);
+ if ( h == CURLHE_OK && ptr != NULL && ( delay = atoi( ptr->value ) ) > 0 ) {
+ if ( delay > 120 ) {
+ // Cap to two minutes
+ delay = 120;
+ }
+ logadd( LOG_INFO, "COW server is asking to backoff for %d seconds", delay );
+ uploadLoopThrottle = MAX( uploadLoopThrottle, delay );
+ status = true;
+ } else {
+ logadd( LOG_ERROR, "COW server returned 503 without Retry-After value" );
}
+ } else if ( http_code < 200 || http_code >= 300 ) {
+ logadd( LOG_ERROR, "COW server returned HTTP %ld", http_code );
+ } else {
+ // everything went ok, reset timeChanged of underlying cluster, but only if it
+ // didn't get updated again in the meantime.
+ atomic_compare_exchange_strong( &uploadingCluster->cluster->timeChanged, &uploadingCluster->time, 0 );
+ uploadingCluster->cluster->uploads++;
+ totalBlocksUploaded++;
free( uploadingCluster );
- status = false;
- goto CLEANUP;
+ status = true;
+ }
+ if ( !status ) {
+ uploadingCluster->cluster->fails++;
+ if ( uploadLoopThrottle > 0 ) {
+ // Don't reset timeChanged timestamp, so the next iteration of uploadModifiedClusters
+ // will queue this upload again after the throttle time expired.
+ free( uploadingCluster );
+ } else {
+ logadd( LOG_ERROR, "Uploading cluster failed %i/5 times", uploadingCluster->cluster->fails );
+ // Pretend the block changed again just now, to prevent immediate retry
+ atomic_compare_exchange_strong( &uploadingCluster->cluster->timeChanged, &uploadingCluster->time,
+ time( NULL ) );
+ free( uploadingCluster );
+ }
}
-
- // everything went ok, update timeChanged
- atomic_compare_exchange_strong( &uploadingCluster->block->timeChanged, &uploadingCluster->time, 0 );
-
- uploadingCluster->block->uploads++;
-
- totalBlocksUploaded++;
- free( uploadingCluster );
-CLEANUP:
curl_multi_remove_handle( cm, msg->easy_handle );
curl_easy_cleanup( msg->easy_handle );
return status;
}
/**
- * @brief
- *
* @param cm Curl_multi
* @param activeUploads ptr to integer which holds the number of current uploads
- * @param breakIfNotMax will return as soon as there are not all upload slots used, so they can be filled up.
- * @param foregroundUpload used to determine the number of max uploads. If true COW_MAX_PARALLEL_UPLOADS will be the limit,
+ * @param minNumberUploads break out of the loop as soon as there are fewer than this many transfers running
* else COW_MAX_PARALLEL_BACKGROUND_UPLOADS.
- * @return true returned if all upload's were successful
- * @return false returned if one ore more upload's failed.
+ * @return true returned if all uploads were successful
+ * @return false returned if one or more uploads failed.
*/
-bool MessageHandler(
- CURLM *cm, bool breakIfNotMax, bool foregroundUpload, struct curl_slist *headers )
+static bool curlMultiLoop( CURLM *cm, int minNumberUploads )
{
CURLMsg *msg;
int msgsLeft = -1;
bool status = true;
- do {
- curl_multi_perform( cm, &activeUploads );
+
+ if ( minNumberUploads <= 0 ) {
+ minNumberUploads = 1;
+ }
+ for ( ;; ) {
+ CURLMcode mc = curl_multi_perform( cm, &activeUploads );
+ if ( mc != CURLM_OK ) {
+ logadd( LOG_ERROR, "curl_multi_perform error %d, bailing out", (int)mc );
+ status = false;
+ break;
+ }
while ( ( msg = curl_multi_info_read( cm, &msgsLeft ) ) != NULL ) {
- if ( !finishUpload( cm, msg, headers ) ) {
+ if ( !clusterUploadDoneHandler( cm, msg ) ) {
status = false;
}
}
- if ( breakIfNotMax
- && activeUploads
- < ( foregroundUpload ? COW_MAX_PARALLEL_UPLOADS : COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) ) {
+ if ( activeUploads < minNumberUploads ) {
break;
}
// ony wait if there are active uploads
- if ( activeUploads ) {
- curl_multi_wait( cm, NULL, 0, 1000, NULL );
+ if ( activeUploads > 0 ) {
+ mc = curl_multi_wait( cm, NULL, 0, 1000, NULL );
+ if ( mc != CURLM_OK ) {
+ logadd( LOG_ERROR, "curl_multi_wait error %d, bailing out", (int)mc );
+ status = false;
+ break;
+ }
}
- } while ( activeUploads );
+ }
return status;
}
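
The 503 handling is where the new server-side throttling comes in: a `Retry-After` value (seconds only) pauses the upload loop via `uploadLoopThrottle`. A condensed sketch of the header lookup, reading from the easy handle that the `CURLMsg` reports as finished (assumes libcurl >= 7.83.0 for `curl_easy_header()`):

```c
#include <curl/curl.h>
#include <stdlib.h>

/* Return the Retry-After delay in seconds from a completed transfer,
 * or 0 if the header is missing or not a plain number. */
static int getRetryAfterSeconds( CURL *doneHandle )
{
	struct curl_header *hdr = NULL;
	if ( curl_easy_header( doneHandle, "Retry-After", 0, CURLH_HEADER, -1, &hdr ) != CURLHE_OK
			|| hdr == NULL ) {
		return 0;
	}
	int delay = atoi( hdr->value );
	return delay > 0 ? delay : 0;
}
```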
@@ -635,7 +668,7 @@ bool MessageHandler(
* @return true if all blocks uploaded successful
* @return false if one ore more blocks failed to upload
*/
-bool uploaderLoop( bool ignoreMinUploadDelay, CURLM *cm )
+bool uploadModifiedClusters( bool ignoreMinUploadDelay, CURLM *cm )
{
bool success = true;
struct curl_slist *headers = NULL;
@@ -648,36 +681,39 @@ bool uploaderLoop( bool ignoreMinUploadDelay, CURLM *cm )
if ( cow.l1[l1Index] == -1 ) {
continue; // Not allocated
}
- // Now all L2 blocks
+ // Now all L2 clusters
for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) {
- cow_l2_entry_t *block = ( cow.l2[cow.l1[l1Index]] + l2Index );
- if ( block->offset == -1 ) {
+ cow_l2_entry_t *cluster = ( cow.l2[cow.l1[l1Index]] + l2Index );
+ if ( cluster->offset == -1 ) {
continue; // Not allocated
}
- if ( block->timeChanged == 0 ) {
+ if ( cluster->timeChanged == 0 ) {
continue; // Not changed
}
- if ( !ignoreMinUploadDelay && ( now - block->timeChanged < COW_MIN_UPLOAD_DELAY ) ) {
+ if ( !ignoreMinUploadDelay && ( now - cluster->timeChanged < COW_MIN_UPLOAD_DELAY ) ) {
continue; // Last change not old enough
}
// Run curl mainloop at least one, but keep doing so while max concurrent uploads is reached
- do {
- if ( !MessageHandler( cm, true, ignoreMinUploadDelay, headers ) ) {
- success = false;
- }
- } while ( ( activeUploads >= ( ignoreMinUploadDelay ? COW_MAX_PARALLEL_UPLOADS
- : COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) )
- && activeUploads > 0 );
+ int minUploads = ignoreMinUploadDelay
+ ? COW_MAX_PARALLEL_UPLOADS
+ : COW_MAX_PARALLEL_BACKGROUND_UPLOADS;
+ if ( !curlMultiLoop( cm, minUploads ) ) {
+ success = false;
+ }
+ // Maybe one of the uploads was rejected by the server asking us to slow down a bit.
+ // Check for that case and don't trigger a new upload.
+ if ( uploadLoopThrottle > 0 ) {
+ goto DONE;
+ }
cow_curl_read_upload_t *b = malloc( sizeof( cow_curl_read_upload_t ) );
- b->block = block;
+ b->cluster = cluster;
b->clusterNumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index );
- b->fails = 0;
b->position = 0;
- b->time = block->timeChanged;
+ b->time = cluster->timeChanged;
// Copy, so it doesn't change during upload
// when we assemble the data in curlReadCallbackUploadBlock()
for ( int i = 0; i < COW_BITFIELD_SIZE; ++i ) {
- b->bitfield[i] = block->bitfield[i];
+ b->bitfield[i] = cluster->bitfield[i];
}
addUpload( cm, b, headers );
if ( !ignoreMinUploadDelay && !uploadLoop ) {
@@ -686,8 +722,12 @@ bool uploaderLoop( bool ignoreMinUploadDelay, CURLM *cm )
}
}
DONE:
+ // Finish all the transfers still active
while ( activeUploads > 0 ) {
- MessageHandler( cm, false, ignoreMinUploadDelay, headers );
+ if ( !curlMultiLoop( cm, 1 ) ) {
+ success = false;
+ break;
+ }
}
curl_slist_free_all( headers );
return success;
@@ -699,17 +739,18 @@ DONE:
*
*/
-void *cowfile_statUpdater( __attribute__((unused)) void *something )
+void *cowfile_statUpdater( UNUSED void *something )
{
uint64_t lastUpdateTime = time( NULL );
+ time_t now;
+ char speedBuffer[20];
while ( !uploadLoopDone ) {
- sleep( COW_STATS_UPDATE_TIME );
int modified = 0;
int inQueue = 0;
int idle = 0;
long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE );
- uint64_t now = time( NULL );
+ now = time( NULL );
for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) {
if ( cow.l1[l1Index] == -1 ) {
continue;
@@ -730,59 +771,88 @@ void *cowfile_statUpdater( __attribute__((unused)) void *something )
}
}
}
- char speedBuffer[20];
if ( COW_SHOW_UL_SPEED ) {
+ double delta;
+ double bytes = (double)atomic_exchange( &bytesUploaded, 0 );
now = time( NULL );
- uint64_t bytes = atomic_exchange( &bytesUploaded, 0 );
- snprintf( speedBuffer, 20, "%.2f", (double)( ( bytes ) / ( 1 + now - lastUpdateTime ) / 1000 ) );
-
+ delta = (double)( now - lastUpdateTime );
lastUpdateTime = now;
+ if ( delta > 0 ) {
+ snprintf( speedBuffer, sizeof speedBuffer, "%.2f", bytes / 1000.0 / delta );
+ }
}
-
updateCowStatsFile( inQueue, modified, idle, speedBuffer );
+ sleep( COW_STATS_UPDATE_TIME );
}
return NULL;
}
+void quitSigHandler( int sig UNUSED )
+{
+ uploadCancelled = true;
+ uploadLoop = false;
+ main_shutdown();
+}
+
/**
* @brief main loop for blockupload in the background
*/
-static void *uploaderThreadMain( __attribute__((unused)) void *something )
+static void *uploaderThreadMain( UNUSED void *something )
{
CURLM *cm;
cm = curl_multi_init();
- curl_multi_setopt(
- cm, CURLMOPT_MAXCONNECTS, (long)MAX( COW_MAX_PARALLEL_UPLOADS, COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) );
+ curl_multi_setopt( cm, CURLMOPT_MAXCONNECTS,
+ (long)MAX( COW_MAX_PARALLEL_UPLOADS, COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) );
+ do {
+ // Unblock so this very thread gets the signal for abandoning the upload
+ struct sigaction newHandler = { .sa_handler = &quitSigHandler };
+ sigemptyset( &newHandler.sa_mask );
+ sigaction( SIGQUIT, &newHandler, NULL );
+ sigset_t sigmask;
+ sigemptyset( &sigmask );
+ sigaddset( &sigmask, SIGQUIT );
+ pthread_sigmask( SIG_UNBLOCK, &sigmask, NULL );
+ } while ( 0 );
while ( uploadLoop ) {
- uploaderLoop( false, cm );
+ while ( uploadLoopThrottle > 0 && uploadLoop ) {
+ sleep( 1 );
+ uploadLoopThrottle--;
+ }
sleep( 2 );
+ if ( !uploadLoop )
+ break;
+ uploadModifiedClusters( false, cm );
}
- logadd( LOG_DEBUG1, "start uploading the remaining blocks." );
- // force the upload of all remaining blocks because the user dismounted the image
- if ( !uploaderLoop( true, cm ) ) {
- logadd( LOG_ERROR, "one or more blocks failed to upload" );
- curl_multi_cleanup( cm );
+ if ( uploadCancelled ) {
uploadLoopDone = true;
- return NULL;
+ logadd( LOG_INFO, "Not uploading remaining clusters, SIGQUIT received" );
+ } else {
+ // force the upload of all remaining blocks because the user dismounted the image
+ logadd( LOG_INFO, "Start uploading the remaining clusters." );
+ if ( !uploadModifiedClusters( true, cm ) ) {
+ uploadLoopDone = true;
+ logadd( LOG_ERROR, "One or more clusters failed to upload" );
+ } else {
+ uploadLoopDone = true;
+ logadd( LOG_DEBUG1, "All clusters uploaded" );
+ if ( cow_merge_after_upload ) {
+ requestRemoteMerge();
+ logadd( LOG_DEBUG1, "Requesting merge" );
+ }
+ }
}
- uploadLoopDone = true;
curl_multi_cleanup( cm );
- logadd( LOG_DEBUG1, "all blocks uploaded" );
- if ( cow_merge_after_upload ) {
- startMerge();
- logadd( LOG_DEBUG1, "Requesting merge." );
- }
return NULL;
}
/**
- * @brief Create a Cow Stats File an inserts the session guid
+ * @brief Create a Cow Stats File and insert the session uuid
*
* @param path where the file is created
* @return true
@@ -819,11 +889,17 @@ static bool createCowStatsFile( char *path )
* @param path where the files should be stored
* @param image_Name name of the original file/image
* @param imageSizePtr
+ * @param cowUuid optional, use given UUID for talking to COW server instead of creating session
*/
bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion,
atomic_uint_fast64_t **imageSizePtr,
- char *serverAddress, bool sStdout, bool sfile )
+ char *serverAddress, bool sStdout, bool sfile, const char *cowUuid )
{
+ if ( cowUuid != NULL && strlen( cowUuid ) > UUID_STRLEN ) {
+ logadd( LOG_ERROR, "COW UUID too long: '%s'", cowUuid );
+ return false;
+ }
+
statStdout = sStdout;
statFile = sfile;
char pathMeta[strlen( path ) + 6];
@@ -860,6 +936,10 @@ bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion,
// size of l1 array + number of l2's * size of l2
size_t ps = getpagesize();
+ if ( ps == 0 || ps > INT_MAX ) {
+ logadd( LOG_ERROR, "Cannot get native page size, aborting..." );
+ return false;
+ }
size_t metaSize = ( ( startL2 + l1NumEntries * sizeof( l2 ) + ps - 1 ) / ps ) * ps;
if ( ftruncate( cow.fdMeta, metaSize ) != 0 ) {
@@ -911,7 +991,10 @@ bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion,
logadd( LOG_ERROR, "Error on curl init. Bye.\n" );
return false;
}
- if ( !createSession( image_Name, imageVersion ) ) {
+ if ( cowUuid != NULL ) {
+ snprintf( metadata->uuid, UUID_STRLEN, "%s", cowUuid );
+ logadd( LOG_INFO, "Using provided upload session id" );
+ } else if ( !createSession( image_Name, imageVersion ) ) {
return false;
}
createCowStatsFile( path );
@@ -925,7 +1008,7 @@ bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion,
* @param path where the meta & data file is located
* @param imageSizePtr
*/
-bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile )
+bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile, const char *cowUuid )
{
statStdout = sStdout;
statFile = sFile;
@@ -935,6 +1018,11 @@ bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *server
char pathMeta[strlen( path ) + 6];
char pathData[strlen( path ) + 6];
+ if ( cowUuid != NULL && strlen( cowUuid ) > UUID_STRLEN ) {
+ logadd( LOG_ERROR, "COW UUID too long: '%s'", cowUuid );
+ return false;
+ }
+
snprintf( pathMeta, strlen( path ) + 6, "%s%s", path, "/meta" );
snprintf( pathData, strlen( path ) + 6, "%s%s", path, "/data" );
@@ -1026,6 +1114,11 @@ bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *server
metadata = (cowfile_metadata_header_t *)( cow.metadata_mmap );
+ if ( cowUuid != NULL ) {
+ logadd( LOG_INFO, "Overriding stored upload session id with provided one" );
+ snprintf( metadata->uuid, UUID_STRLEN, "%s", cowUuid );
+ }
+
*imageSizePtr = &metadata->imageSize;
cow.l1 = (l1 *)( cow.metadata_mmap + metadata->startL1 );
cow.l2 = (l2 *)( cow.metadata_mmap + metadata->startL2 );
@@ -1150,7 +1243,7 @@ static void writePaddedBlock( cow_sub_request_t *sRequest )
// Here, we again check if the block is written locally - there might have been a second write
// that wrote the full block, hence didn't have to wait for remote data and finished faster.
// In that case, don't pad from remote as we'd overwrite newer data.
- if ( isBlockLocal( sRequest->block, sRequest->inClusterOffset ) ) {
+ if ( isBlockLocal( sRequest->cluster, sRequest->inClusterOffset ) ) {
logadd( LOG_INFO, "It happened!" );
} else {
// copy write Data
@@ -1158,13 +1251,13 @@ static void writePaddedBlock( cow_sub_request_t *sRequest )
memcpy( sRequest->writeBuffer + ( sRequest->inClusterOffset % DNBD3_BLOCK_SIZE ), sRequest->writeSrc,
sRequest->size );
if ( !writeAll( cow.fdData, sRequest->writeBuffer, DNBD3_BLOCK_SIZE,
- sRequest->block->offset + ( sRequest->inClusterOffset & ~DNBD3_BLOCK_MASK ) ) ) {
+ sRequest->cluster->offset + ( sRequest->inClusterOffset & ~DNBD3_BLOCK_MASK ) ) ) {
sRequest->cowRequest->errorCode = errno;
} else {
sRequest->cowRequest->bytesWorkedOn += sRequest->size;
int64_t bit = sRequest->inClusterOffset / DNBD3_BLOCK_SIZE;
- setBitsInBitfield( sRequest->block->bitfield, bit, bit, true );
- sRequest->block->timeChanged = time( NULL );
+ setBitsInBitfield( sRequest->cluster->bitfield, bit, bit, true );
+ sRequest->cluster->timeChanged = time( NULL );
}
}
@@ -1228,10 +1321,11 @@ static bool padBlockForWrite( fuse_req_t req, cow_request_t *cowRequest,
cow_sub_request_t *sub = calloc( sizeof( *sub ) + DNBD3_BLOCK_SIZE, 1 );
sub->callback = writePaddedBlock;
sub->inClusterOffset = inClusterOffset;
- sub->block = cluster;
+ sub->cluster = cluster;
sub->size = size;
sub->writeSrc = srcBuffer;
sub->cowRequest = cowRequest;
+ sub->buffer = sub->writeBuffer;
sub->dRequest.length = (uint32_t)MIN( DNBD3_BLOCK_SIZE, validImageSize - startOffset );
sub->dRequest.offset = startOffset & ~DNBD3_BLOCK_MASK;
@@ -1248,7 +1342,7 @@ static bool padBlockForWrite( fuse_req_t req, cow_request_t *cowRequest,
/**
* @brief Will be called after a dnbd3_async_t is finished.
- * Calls the corrsponding callback function, either writePaddedBlock or readRemoteData
+ * Calls the corresponding callback function, either writePaddedBlock or readRemoteCallback
* depending if the original fuse request was a write or read.
*
*/
@@ -1265,7 +1359,7 @@ void cowfile_handleCallback( dnbd3_async_t *request )
* so replys to fuse and cleans up the request.
*
*/
-void readRemoteData( cow_sub_request_t *sRequest )
+static void readRemoteCallback( cow_sub_request_t *sRequest )
{
atomic_fetch_add( &sRequest->cowRequest->bytesWorkedOn, sRequest->dRequest.length );
@@ -1464,7 +1558,7 @@ static void readRemote( fuse_req_t req, off_t offset, ssize_t size, char *buffer
return;
assert( size > 0 );
cow_sub_request_t *sRequest = malloc( sizeof( cow_sub_request_t ) );
- sRequest->callback = readRemoteData;
+ sRequest->callback = readRemoteCallback;
sRequest->dRequest.length = (uint32_t)size;
sRequest->dRequest.offset = offset;
sRequest->dRequest.fuse_req = req;
@@ -1632,13 +1726,15 @@ fail:;
void cowfile_close()
{
uploadLoop = false;
+ pthread_join( tidCowUploader, NULL );
if ( statFile || statStdout ) {
+ // Send a signal in case it's hanging in the sleep call
+ pthread_kill( tidStatUpdater, SIGHUP );
pthread_join( tidStatUpdater, NULL );
}
- pthread_join( tidCowUploader, NULL );
if ( curl ) {
- curl_global_cleanup();
curl_easy_cleanup( curl );
+ curl_global_cleanup();
}
}
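
`cowfile_close()` now joins the uploader first and then pokes the stat thread with `pthread_kill( tidStatUpdater, SIGHUP )`, so a pending `sleep( COW_STATS_UPDATE_TIME )` does not delay shutdown; the no-op SIGHUP handler installed in main.c is what makes the sleep return early. A standalone illustration of that mechanism (simplified, same idea assumed):

```c
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <unistd.h>

static void noop( int sig ) { (void)sig; }

static void *worker( void *arg )
{
	(void)arg;
	unsigned int left = sleep( 60 ); // returns early once a handled signal arrives
	printf( "woken up with %u seconds left\n", left );
	return NULL;
}

int main( void )
{
	pthread_t t;
	struct sigaction sa = { .sa_handler = noop };
	sigemptyset( &sa.sa_mask );
	sigaction( SIGHUP, &sa, NULL );   // handled, but does nothing
	pthread_create( &t, NULL, worker, NULL );
	sleep( 1 );                       // let the worker enter sleep()
	pthread_kill( t, SIGHUP );        // interrupts the worker's sleep()
	pthread_join( t, NULL );
	return 0;
}
```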
diff --git a/src/fuse/cowfile.h b/src/fuse/cowfile.h
index 0f395de..d124b0c 100644
--- a/src/fuse/cowfile.h
+++ b/src/fuse/cowfile.h
@@ -20,6 +20,7 @@ _Static_assert( ATOMIC_INT_LOCK_FREE == 2, "ATOMIC INT not lock free" );
_Static_assert( ATOMIC_LONG_LOCK_FREE == 2, "ATOMIC LONG not lock free" );
_Static_assert( ATOMIC_LLONG_LOCK_FREE == 2, "ATOMIC LLONG not lock free" );
_Static_assert( sizeof( atomic_uint_least64_t ) == 8, "atomic_uint_least64_t not 8 byte" );
+_Static_assert( sizeof( _Atomic(uint32_t) ) == 4, "_Atomic(uint32_t) not 4 byte" );
_Static_assert( sizeof( atomic_int_least64_t ) == 8, "atomic_int_least64_t not 8 byte" );
enum dataSource
@@ -56,8 +57,9 @@ _Static_assert( sizeof( cowfile_metadata_header_t ) == COW_METADATA_HEADER_SIZE,
typedef struct cow_l2_entry
{
atomic_int_least64_t offset;
- atomic_uint_least64_t timeChanged;
- atomic_uint_least64_t uploads;
+ atomic_int_least64_t timeChanged;
+ _Atomic(uint32_t) uploads;
+ _Atomic(uint32_t) fails;
atomic_uchar bitfield[COW_BITFIELD_SIZE];
} cow_l2_entry_t;
_Static_assert( sizeof( cow_l2_entry_t ) == COW_L2_ENTRY_SIZE, "cow_l2_entry_t is messed up" );
@@ -93,7 +95,7 @@ typedef struct cow_sub_request
off_t inClusterOffset; // offset relative to the beginning of the cluster
const char *writeSrc; // pointer to the data of a write request which needs padding
char *buffer; // The pointer points to the original read buffer to the place where the sub read request should be copied to.
- cow_l2_entry_t *block; // the cluster inClusterOffset refers to
+ cow_l2_entry_t *cluster; // the cluster inClusterOffset refers to
cow_callback callback; // Callback when we're done handling this
cow_request_t *cowRequest; // parent request
dnbd3_async_t dRequest; // Probably request to dnbd3-server for non-aligned writes (wrt 4k dnbd3 block)
@@ -103,10 +105,9 @@ typedef struct cow_sub_request
typedef struct cow_curl_read_upload
{
atomic_uint_least64_t time;
- cow_l2_entry_t *block;
+ cow_l2_entry_t *cluster;
size_t position;
long unsigned int clusterNumber;
- int fails;
int64_t ulLast;
atomic_uchar bitfield[COW_BITFIELD_SIZE];
} cow_curl_read_upload_t;
@@ -122,9 +123,9 @@ typedef int32_t l1;
typedef cow_l2_entry_t l2[COW_L2_TABLE_SIZE];
bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion, atomic_uint_fast64_t **imageSizePtr,
- char *serverAddress, bool sStdout, bool sFile );
+ char *serverAddress, bool sStdout, bool sFile, const char *cowUuid );
-bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile );
+bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile, const char *cowUuid );
bool cowfile_startBackgroundThreads();
void cowfile_read( fuse_req_t req, size_t size, off_t offset );
diff --git a/src/fuse/main.c b/src/fuse/main.c
index 96d8f5c..7be34bd 100644
--- a/src/fuse/main.c
+++ b/src/fuse/main.c
@@ -9,8 +9,34 @@
* */
#include "main.h"
-
-
+#include "cowfile.h"
+#include "connection.h"
+#include "helper.h"
+#include <dnbd3/version.h>
+#include <dnbd3/build.h>
+#include <dnbd3/shared/protocol.h>
+#include <dnbd3/shared/log.h>
+#include <dnbd3/config.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <assert.h>
+/* for printing uint */
+//#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+#include <getopt.h>
+#include <time.h>
+#include <signal.h>
+#include <pthread.h>
+#define debugf(...) do { logadd( LOG_DEBUG1, __VA_ARGS__ ); } while (0)
+
+#define INO_ROOT (1)
+#define INO_STATS (2)
+#define INO_IMAGE (3)
static const char *IMAGE_NAME = "img";
static const char *STATS_NAME = "status";
@@ -46,14 +72,14 @@ static int image_stat( fuse_ino_t ino, struct stat *stbuf )
case INO_ROOT:
stbuf->st_mode = S_IFDIR | 0550;
if( useCow ) {
- stbuf->st_mode = S_IFDIR | 0777;
+ stbuf->st_mode = S_IFDIR | 0770;
}
stbuf->st_nlink = 2;
stbuf->st_mtim = startupTime;
break;
case INO_IMAGE:
if ( useCow ) {
- stbuf->st_mode = S_IFREG | 0777;
+ stbuf->st_mode = S_IFREG | 0660;
} else {
stbuf->st_mode = S_IFREG | 0440;
}
@@ -212,7 +238,7 @@ static void image_ll_read( fuse_req_t req, fuse_ino_t ino, size_t size, off_t of
++logInfo.blockRequestCount[startBlock];
}
}
-
+
dnbd3_async_parent_t *parent = malloc( sizeof(dnbd3_async_parent_t) + size );
parent->request.length = (uint32_t)size;
@@ -230,7 +256,7 @@ static void noopSigHandler( int signum )
(void)signum;
}
-static void image_ll_init( void *userdata, struct fuse_conn_info *conn )
+static void image_ll_init( void *userdata UNUSED, struct fuse_conn_info *conn UNUSED )
{
( void ) userdata;
( void ) conn;
@@ -342,6 +368,7 @@ static void printUsage( char *argv0, int exitCode )
printf( " -c Enables cow, creates the cow files at given location\n" );
printf( " -L Loads the cow files from the given location\n" );
printf( " -C Host address of the cow server\n" );
+ printf( "--upload-uuid <id> Use provided UUID as upload session id instead of asking server/loading from file\n" );
printf( "--cow-stats-stdout prints the cow status in stdout\n" );
printf( "--cow-stats-file creates and updates the cow status file\n" );
printf( " -m --merge tell server to merge and create new revision on exit\n" );
@@ -363,6 +390,7 @@ static const struct option longOpts[] = {
{ "loadcow", required_argument, NULL, 'L' },
{ "cowServer", required_argument, NULL, 'C' },
{ "merge", no_argument, NULL, 'm' },
+ { "upload-uuid", required_argument, NULL, 'uuid' },
{ "cow-stats-stdout", no_argument, NULL, 'sout' },
{ "cow-stats-file", no_argument, NULL, 'sfil' },
{ 0, 0, 0, 0 }
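
The new `--upload-uuid` entry follows the existing pattern of using multi-character constants (`'sout'`, `'sfil'`, now `'uuid'`) as `getopt_long()` return codes. Their value is implementation-defined, so this is an illustration only (GCC/Clang pack the characters into the `int`; compile with `-Wno-multichar` to silence the usual warning):

```c
#include <stdio.h>

int main( void )
{
	/* The packed value lies far outside the single-character range, so it
	 * cannot collide with the short options handled in the same switch. */
	printf( "'uuid' = 0x%x\n", (unsigned)'uuid' ); // 0x75756964 with GCC/Clang
	return 0;
}
```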
@@ -388,6 +416,7 @@ int main( int argc, char *argv[] )
bool loadCow = false;
bool sStdout = false;
bool sFile = false;
+ const char *cowUuidOverride = NULL;
log_init();
@@ -474,6 +503,9 @@ int main( int argc, char *argv[] )
case 'sfil':
sFile = true;
break;
+ case 'uuid':
+ cowUuidOverride = optarg;
+ break;
default:
printUsage( argv[0], EXIT_FAILURE );
}
@@ -493,7 +525,7 @@ int main( int argc, char *argv[] )
}
}
if( useCow && cow_server_address == NULL ) {
- printf( "for -c you also need a cow server address. Please also use -C --host \n" );
+ printf( "for -c you also need a cow server address. Please also use -C\n" );
printUsage( argv[0], EXIT_FAILURE );
}
if( cow_merge_after_upload && !useCow ) {
@@ -502,24 +534,26 @@ int main( int argc, char *argv[] )
}
if ( loadCow ) {
if( cow_server_address == NULL ) {
- printf( "for -L you also need a cow server address. Please also use -C --host \n" );
+ printf( "for -L you also need a cow server address. Please also use -C\n" );
printUsage( argv[0], EXIT_FAILURE );
}
- if ( !cowfile_load( cow_file_path, &imageSizePtr, cow_server_address, sStdout, sFile ) ) {
+ if ( !cowfile_load( cow_file_path, &imageSizePtr, cow_server_address, sStdout, sFile, cowUuidOverride ) ) {
return EXIT_FAILURE;
}
- }
- // Prepare our handler
- struct sigaction newHandler;
- memset( &newHandler, 0, sizeof( newHandler ) );
- newHandler.sa_handler = &noopSigHandler;
- sigemptyset( &newHandler.sa_mask );
- sigaction( SIGHUP, &newHandler, NULL );
- sigset_t sigmask;
- sigemptyset( &sigmask );
- sigaddset( &sigmask, SIGHUP );
- pthread_sigmask( SIG_BLOCK, &sigmask, NULL );
+ }
+ do {
+ // The empty handler prevents fuse from registering its own handler
+ struct sigaction newHandler = { .sa_handler = &noopSigHandler };
+ sigemptyset( &newHandler.sa_mask );
+ sigaction( SIGHUP, &newHandler, NULL );
+ } while ( 0 );
+ if ( useCow ) {
+ sigset_t sigmask;
+ sigemptyset( &sigmask );
+ sigaddset( &sigmask, SIGQUIT ); // Block here and unblock in cow as abort signal
+ pthread_sigmask( SIG_BLOCK, &sigmask, NULL );
+ }
if ( !connection_init( server_address, image_Name, rid, learnNewServers ) ) {
logadd( LOG_ERROR, "Could not connect to any server. Bye.\n" );
@@ -538,9 +572,9 @@ int main( int argc, char *argv[] )
}
newArgv[newArgc++] = "-o";
- if(useCow){
+ if ( useCow ) {
newArgv[newArgc++] = "default_permissions";
- }else{
+ } else {
newArgv[newArgc++] = "ro,default_permissions";
}
// Mount point goes last
@@ -555,7 +589,7 @@ int main( int argc, char *argv[] )
owner = getuid();
if ( useCow & !loadCow) {
- if( !cowfile_init( cow_file_path, connection_getImageName(), connection_getImageRID(), &imageSizePtr, cow_server_address, sStdout, sFile ) ) {
+ if( !cowfile_init( cow_file_path, connection_getImageName(), connection_getImageRID(), &imageSizePtr, cow_server_address, sStdout, sFile, cowUuidOverride ) ) {
return EXIT_FAILURE;
}
}
@@ -576,24 +610,24 @@ int main( int argc, char *argv[] )
if ( _fuseSession == NULL ) {
logadd( LOG_ERROR, "Could not initialize fuse session" );
} else {
+ fuse_session_add_chan( _fuseSession, ch );
+ // Do not spawn any threads before we daemonize, they'd die at this point
+ fuse_daemonize( foreground );
if ( fuse_set_signal_handlers( _fuseSession ) == -1 ) {
- logadd( LOG_ERROR, "Could not install fuse signal handlers" );
- } else {
- fuse_session_add_chan( _fuseSession, ch );
- fuse_daemonize( foreground );
- if ( useCow ) {
- if ( !cowfile_startBackgroundThreads() ){
- logadd( LOG_ERROR, "Could not start cow background Threads" );
- }
- }
- if ( single_thread ) {
- fuse_err = fuse_session_loop( _fuseSession );
- } else {
- fuse_err = fuse_session_loop_mt( _fuseSession ); //MT produces errors (race conditions) in libfuse and didnt improve speed at all
+ logadd( LOG_WARNING, "Could not install fuse signal handlers" );
+ }
+ if ( useCow ) {
+ if ( !cowfile_startBackgroundThreads() ) {
+ logadd( LOG_ERROR, "Could not start cow background threads" );
}
- fuse_remove_signal_handlers( _fuseSession );
- fuse_session_remove_chan( ch );
}
+ if ( single_thread ) {
+ fuse_err = fuse_session_loop( _fuseSession );
+ } else {
+ fuse_err = fuse_session_loop_mt( _fuseSession ); //MT produces errors (race conditions) in libfuse and didnt improve speed at all
+ }
+ fuse_remove_signal_handlers( _fuseSession );
+ fuse_session_remove_chan( ch );
fuse_session_destroy( _fuseSession );
_fuseSession = NULL;
}
@@ -608,3 +642,11 @@ int main( int argc, char *argv[] )
logadd( LOG_DEBUG1, "Terminating. FUSE REPLIED: %d\n", fuse_err );
return fuse_err;
}
+
+void main_shutdown(void)
+{
+ fuse_session_exit( _fuseSession );
+ // TODO: Figure out why this doesn't wake up the fuse mainloop.
+ // For now, just send SIGQUIT followed by SIGTERM....
+ kill( 0, SIGINT );
+}
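
Since main() now blocks SIGQUIT before spawning threads (so all of them inherit the mask) and only `uploaderThreadMain()` unblocks it, aborting the session and any remaining uploads from outside comes down to delivering that signal to the dnbd3-fuse process; `quitSigHandler()` then stops the upload loop and calls `main_shutdown()`. A hypothetical helper (not part of the patch):

```c
#include <signal.h>
#include <sys/types.h>

/* Ask a running dnbd3-fuse COW session to abandon its remaining uploads. */
static int abortCowUpload( pid_t fusePid )
{
	return kill( fusePid, SIGQUIT ); // 0 on success, -1 with errno set on error
}
```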
diff --git a/src/fuse/main.h b/src/fuse/main.h
index 721f251..53d81e4 100644
--- a/src/fuse/main.h
+++ b/src/fuse/main.h
@@ -1,39 +1,13 @@
#ifndef _MAIN_H_
#define _MAIN_H_
-#include "cowfile.h"
-#include "connection.h"
-#include "helper.h"
-#include <dnbd3/version.h>
-#include <dnbd3/build.h>
-#include <dnbd3/shared/protocol.h>
-#include <dnbd3/shared/log.h>
-
#define FUSE_USE_VERSION 30
-#include <dnbd3/config.h>
#include <fuse_lowlevel.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <errno.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <assert.h>
-/* for printing uint */
-#define __STDC_FORMAT_MACROS
-#include <inttypes.h>
-#include <getopt.h>
-#include <time.h>
-#include <signal.h>
-#include <pthread.h>
-#define debugf(...) do { logadd( LOG_DEBUG1, __VA_ARGS__ ); } while (0)
-
-#define INO_ROOT (1)
-#define INO_STATS (2)
-#define INO_IMAGE (3)
+#include <stdbool.h>
extern bool useCow;
extern bool cow_merge_after_upload;
void image_ll_getattr( fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi );
+void main_shutdown(void);
-#endif /* main_H_ */
\ No newline at end of file
+#endif /* main_H_ */