#include "cowfile.h"
#include "main.h"
#include "connection.h"
#include <dnbd3/shared/log.h>
#include <assert.h>
#include <curl/curl.h>
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>
#define UUID_STRLEN 36
extern void image_ll_getattr( fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi );
static const int CURRENT_COW_VERSION = 1; // On-disk metadata format version written and expected by this file
static bool statStdout; // Print status updates to stdout?
static bool statFile; // Write status updates to the status file?
static pthread_t tidCowUploader; // Background upload thread (uploaderThreadMain)
static pthread_t tidStatUpdater; // Status update thread (cowfile_statUpdater)
static char *cowServerAddress; // Address of the cow server, used to build API URLs
static CURL *curl; // Shared easy handle for session/merge requests (uploads use their own handles)
static cowfile_metadata_header_t *metadata = NULL; // Points into metadata_mmap once init/load succeeded
static atomic_uint_fast64_t bytesUploaded; // Bytes uploaded since last stats interval (reset by stat updater)
static uint64_t totalBlocksUploaded = 0; // Lifetime counter of successfully uploaded clusters
static int activeUploads = 0; // Number of currently running curl transfers
atomic_bool uploadLoop = true; // Keep upload loop running?
atomic_bool uploadLoopDone = false; // Upload loop has finished all work?
// Global cow state: file handles, the mmap'ed metadata and the L1/L2 lookup tables inside it.
static struct cow
{
	pthread_mutex_t l2CreateLock; // Serializes allocation of new L2 tables (see createL2Block)
	int fhm; // File handle of the meta file
	int fhd; // File handle of the data file
	int fhs; // File handle of the status file
	char *metadata_mmap; // mmap of the meta file
	l1 *l1; // L1 table; entries index into firstL2, -1 = not allocated
	l2 *firstL2; // Start of the L2 tables, directly after the L1 array
	size_t maxImageSize; // Maximum supported image size (determines table sizes)
	size_t l1Size; //size of l1 array
} cow;
/**
 * @brief Computes the l1 index for an absolute file offset.
 *
 * @param offset absolute file offset
 * @return int l1 index
 */
static int offsetToL1Index( size_t offset )
{
	// Each L1 entry covers one full L2 table worth of data
	size_t span = COW_FULL_L2_TABLE_DATA_SIZE;
	return (int)( offset / span );
}
/**
 * @brief Computes the l2 index for an absolute file offset.
 *
 * @param offset absolute file offset
 * @return int l2 index
 */
static int offsetToL2Index( size_t offset )
{
	// Offset within the L2 table's data range, then divided into clusters
	size_t inTableOffset = offset % COW_FULL_L2_TABLE_DATA_SIZE;
	return (int)( inTableOffset / COW_DATA_CLUSTER_SIZE );
}
/**
 * @brief Computes the bit in the bitfield from the absolute file offset.
 *
 * @param offset absolute file offset
 * @return int bit(0-319) in the bitfield
 */
static int getBitfieldOffsetBit( size_t offset )
{
	size_t blockNumber = offset / DNBD3_BLOCK_SIZE;
	// One bit per DNBD3 block, wrapping per cluster bitfield
	return (int)( blockNumber % ( COW_BITFIELD_SIZE * 8 ) );
}
/**
 * @brief Atomically sets or clears a contiguous bit range within a single byte.
 *
 * @param byte byte of a bitfield
 * @param from first bit (0-7), inclusive
 * @param to last bit (0-7), inclusive
 * @param value true to set the bits to 1, false to clear them
 */
static void setBits( atomic_char *byte, int from, int to, bool value )
{
	int width = to - from + 1;
	char mask = (char)( ( 0xff >> ( 8 - width ) ) << from );
	if ( value ) {
		atomic_fetch_or( byte, mask );
	} else {
		atomic_fetch_and( byte, (char)~mask );
	}
}
/**
 * @brief Sets or clears the specified bit range in a cluster bitfield, thread-safe.
 *
 * @param bitfield bitfield of a cow_l2_entry
 * @param from start bit, inclusive
 * @param to end bit, inclusive
 * @param value set bits to 1 or 0
 */
static void setBitsInBitfield( atomic_char *bitfield, int from, int to, bool value )
{
	// Fixed: original asserted (from >= 0 || to < ...), which is almost always
	// true and never caught an out-of-range "to". Both bounds must hold.
	assert( from >= 0 && to < COW_BITFIELD_SIZE * 8 );
	int start = from / 8;
	int end = to / 8;
	for ( int i = start; i <= end; i++ ) {
		// Per byte, clamp the range to the bits that belong to byte i
		setBits( ( bitfield + i ), from - i * 8, MIN( 7, to - i * 8 ), value );
		from = ( i + 1 ) * 8;
	}
}
/**
 * @brief Checks whether bit n of a bitfield is set.
 *
 * @param bitfield of a cow_l2_entry
 * @param n the bit which should be checked
 * @return true if the bit is 1, false if it is 0
 */
static bool checkBit( atomic_char *bitfield, int n )
{
	char byte = atomic_load( bitfield + n / 8 );
	return ( ( byte >> ( n % 8 ) ) & 1 ) != 0;
}
/**
 * @brief Implementation of CURLOPT_WRITEFUNCTION, called when the server sends
 * back data after session creation.
 * For more details see: https://curl.se/libcurl/c/CURLOPT_WRITEFUNCTION.html
 *
 * @param buffer contains the response data from the server (NOT null-terminated)
 * @param itemSize size of one item
 * @param nitems number of items
 * @param response userdata; accumulates the uuid string (must hold UUID_STRLEN+1 bytes)
 * @return size_t number of bytes consumed
 */
size_t curlCallbackCreateSession( char *buffer, size_t itemSize, size_t nitems, void *response )
{
	uint64_t done = strlen( response );
	uint64_t bytes = itemSize * nitems;
	if ( done + bytes > UUID_STRLEN ) {
		logadd( LOG_INFO, "strlen(response): %"PRIu64" bytes: %"PRIu64"\n", done, bytes );
		return bytes;
	}
	// Fixed: buffer is not null-terminated, so the copy must be bounded by the
	// number of received bytes. The previous strncat limit (UUID_STRLEN - done + 1)
	// could exceed "bytes" and read past the end of buffer.
	memcpy( (char *)response + done, buffer, bytes );
	( (char *)response )[done + bytes] = '\0';
	return bytes;
}
/**
 * @brief Creates a session with the cow server and receives the session guid.
 * Uses the shared "curl" easy handle; resets it before returning on all paths.
 *
 * @param imageName name of the original image
 * @param version of the original image
 * @return true if the session was created and a guid was received
 */
bool createSession( const char *imageName, uint16_t version )
{
	CURLcode res;
	char url[COW_URL_STRING_SIZE];
	snprintf( url, COW_URL_STRING_SIZE, COW_API_CREATE, cowServerAddress );
	logadd( LOG_INFO, "COW_API_CREATE URL: %s", url );
	curl_easy_setopt( curl, CURLOPT_POST, 1L );
	curl_easy_setopt( curl, CURLOPT_URL, url );
	curl_mime *mime;
	curl_mimepart *part;
	mime = curl_mime_init( curl );
	part = curl_mime_addpart( mime );
	curl_mime_name( part, "imageName" );
	curl_mime_data( part, imageName, CURL_ZERO_TERMINATED );
	part = curl_mime_addpart( mime );
	curl_mime_name( part, "version" );
	char buf[sizeof( int ) * 3 + 2];
	snprintf( buf, sizeof buf, "%d", version );
	curl_mime_data( part, buf, CURL_ZERO_TERMINATED );
	part = curl_mime_addpart( mime );
	curl_mime_name( part, "bitfieldSize" );
	snprintf( buf, sizeof buf, "%d", metadata->bitfieldSize );
	curl_mime_data( part, buf, CURL_ZERO_TERMINATED );
	curl_easy_setopt( curl, CURLOPT_MIMEPOST, mime );
	// The write callback strncat's into uuid, so it must start out empty
	metadata->uuid[0] = '\0';
	curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, curlCallbackCreateSession );
	curl_easy_setopt( curl, CURLOPT_WRITEDATA, &metadata->uuid );
	res = curl_easy_perform( curl );
	curl_mime_free( mime );
	/* Check for errors */
	if ( res != CURLE_OK ) {
		logadd( LOG_ERROR, "COW_API_CREATE failed: %s\n", curl_easy_strerror( res ) );
		curl_easy_reset( curl ); // Fixed: was missing, leaving stale options on the shared handle
		return false;
	}
	long http_code = 0;
	curl_easy_getinfo( curl, CURLINFO_RESPONSE_CODE, &http_code );
	if ( http_code < 200 || http_code >= 300 ) {
		logadd( LOG_ERROR, "COW_API_CREATE failed http: %ld\n", http_code );
		curl_easy_reset( curl ); // Fixed: was missing on this error path too
		return false;
	}
	curl_easy_reset( curl );
	// Force termination in case the server sent more than UUID_STRLEN bytes
	metadata->uuid[UUID_STRLEN] = '\0';
	logadd( LOG_DEBUG1, "Cow session started, guid: %s\n", metadata->uuid );
	return true;
}
/**
 * @brief Implementation of CURLOPT_READFUNCTION, this function will first send the bit field and
 * then the block data in one bitstream. this function is usually called multiple times per block,
 * because the buffer is usually not large for one block and its bitfield.
 * for more details see: https://curl.se/libcurl/c/CURLOPT_READFUNCTION.html
 *
 * @param ptr to the buffer
 * @param size of one element in buffer
 * @param nmemb number of elements in buffer
 * @param userdata cow_curl_read_upload_t describing the cluster; its "position"
 *        field tracks progress across calls (bitfield first, then cluster data)
 * @return size_t number of bytes written into ptr, or CURL_READFUNC_ABORT on read error
 */
size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *userdata )
{
	cow_curl_read_upload_t *uploadBlock = (cow_curl_read_upload_t *)userdata;
	size_t len = 0;
	// Check if we're still in the bitfield
	if ( uploadBlock->position < (size_t)metadata->bitfieldSize ) {
		size_t lenCpy = MIN( metadata->bitfieldSize - uploadBlock->position, size * nmemb );
		// Copy (part of) the bitfield into the outgoing buffer
		memcpy( ptr, uploadBlock->block->bitfield + uploadBlock->position, lenCpy );
		uploadBlock->position += lenCpy;
		len += lenCpy;
	}
	// No elseif here, might just have crossed over...
	if ( uploadBlock->position >= (size_t)metadata->bitfieldSize ) {
		// Remaining cluster bytes, limited by the remaining space in ptr
		ssize_t wantRead = (ssize_t)MIN(
				COW_DATA_CLUSTER_SIZE - ( uploadBlock->position - ( metadata->bitfieldSize ) ),
				( size * nmemb ) - len );
		// position counts bitfield bytes first; subtract to get the data offset
		off_t inClusterOffset = uploadBlock->position - metadata->bitfieldSize;
		ssize_t lengthRead = pread( cow.fhd, ( ptr + len ), wantRead, uploadBlock->block->offset + inClusterOffset );
		if ( lengthRead == -1 ) {
			logadd( LOG_ERROR, "Upload: Reading from COW file failed with errno %d", errno );
			return CURL_READFUNC_ABORT;
		}
		if ( wantRead > lengthRead ) {
			// fill up since last block may not be a full block
			memset( ptr + len + lengthRead, 0, wantRead - lengthRead );
			// TODO what about partial read? We should know how much data there actually is...
			lengthRead = wantRead;
		}
		uploadBlock->position += lengthRead;
		len += lengthRead;
	}
	return len;
}
/**
 * @brief Requests the merging of the image on the cow server.
 * Uses the shared "curl" easy handle; the handle is reset and the mime
 * structure freed on all paths (it was previously leaked on errors).
 *
 * @return true if the server accepted the merge request (HTTP 200)
 */
bool mergeRequest()
{
	CURLcode res;
	bool success = true;
	curl_easy_setopt( curl, CURLOPT_POST, 1L );
	char url[COW_URL_STRING_SIZE];
	snprintf( url, COW_URL_STRING_SIZE, COW_API_START_MERGE, cowServerAddress );
	curl_easy_setopt( curl, CURLOPT_URL, url );
	curl_mime *mime;
	curl_mimepart *part;
	mime = curl_mime_init( curl );
	part = curl_mime_addpart( mime );
	curl_mime_name( part, "guid" );
	curl_mime_data( part, metadata->uuid, CURL_ZERO_TERMINATED );
	part = curl_mime_addpart( mime );
	curl_mime_name( part, "originalFileSize" );
	char buf[21]; // Large enough for a 64-bit decimal plus NUL
	snprintf( buf, sizeof buf, "%" PRIu64, metadata->originalImageSize );
	curl_mime_data( part, buf, CURL_ZERO_TERMINATED );
	part = curl_mime_addpart( mime );
	curl_mime_name( part, "newFileSize" );
	snprintf( buf, sizeof buf, "%" PRIu64, metadata->imageSize );
	curl_mime_data( part, buf, CURL_ZERO_TERMINATED );
	curl_easy_setopt( curl, CURLOPT_MIMEPOST, mime );
	res = curl_easy_perform( curl );
	if ( res != CURLE_OK ) {
		logadd( LOG_WARNING, "COW_API_START_MERGE failed: %s\n", curl_easy_strerror( res ) );
		success = false;
	} else {
		long http_code = 0;
		curl_easy_getinfo( curl, CURLINFO_RESPONSE_CODE, &http_code );
		if ( http_code != 200 ) {
			logadd( LOG_WARNING, "COW_API_START_MERGE failed http: %ld\n", http_code );
			success = false;
		}
	}
	// Fixed: mime was leaked on both error paths before; free it unconditionally
	curl_easy_reset( curl );
	curl_mime_free( mime );
	return success;
}
/**
 * @brief Wrapper for mergeRequest: retries up to 5 times on failure.
 */
void startMerge()
{
	int fails = 0;
	bool success = mergeRequest();
	// Fixed: the retry result was previously discarded, so the loop kept
	// retrying (and logging failures) even after a retry had succeeded.
	// Also bounded to 5 retries to match the "n/5" log message.
	while ( fails < 5 && !success ) {
		fails++;
		logadd( LOG_WARNING, "Trying again. %i/5", fails );
		success = mergeRequest();
	}
}
/**
 * @brief Implementation of the CURLOPT_XFERINFOFUNCTION.
 * For more infos see: https://curl.se/libcurl/c/CURLOPT_XFERINFOFUNCTION.html
 *
 * Called periodically for each active transfer. Computes the bytes uploaded
 * since the previous call and accumulates them in bytesUploaded, which the
 * stat updater uses to compute the overall upload speed.
 *
 * @param clientp the easy handle of this transfer (set via CURLOPT_XFERINFODATA)
 * @param ulNow number of bytes uploaded by this transfer so far
 * @return int always 0 to continue the callbacks
 */
int progress_callback( void *clientp, __attribute__((unused)) curl_off_t dlTotal,
		__attribute__((unused)) curl_off_t dlNow, __attribute__((unused)) curl_off_t ulTotal, curl_off_t ulNow )
{
	CURL *handle = (CURL *)clientp;
	cow_curl_read_upload_t *upload = NULL;
	if ( curl_easy_getinfo( handle, CURLINFO_PRIVATE, &upload ) != CURLE_OK ) {
		logadd( LOG_ERROR, "ERROR" );
		return 0;
	}
	// Delta since the last callback for this transfer
	bytesUploaded += ( ulNow - upload->ulLast );
	upload->ulLast = ulNow;
	return 0;
}
/**
 * @brief Updates the status to the stdout/statfile depending on the startup parameters.
 *
 * @param inQueue Clusters with changes old enough to be uploaded.
 * @param modified Clusters changed too recently to be uploaded yet.
 * @param idle Clusters without pending changes.
 * @param speedBuffer NUL-terminated string with the current upload speed.
 */
void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, char *speedBuffer )
{
	char buffer[300];
	const char *state;
	if ( uploadLoop ) {
		state = "backgroundUpload";
	} else if ( !uploadLoopDone ) {
		state = "uploading";
	} else {
		state = "done";
	}
	int len = snprintf( buffer, sizeof( buffer ),
			"state=%s\n"
			"inQueue=%" PRIu64 "\n"
			"modifiedClusters=%" PRIu64 "\n"
			"idleClusters=%" PRIu64 "\n"
			"totalClustersUploaded=%" PRIu64 "\n"
			"activeUploads=:%i\n" // NOTE(review): stray ':' kept as-is; a consumer may rely on it — TODO confirm
			"%s%s",
			state, inQueue, modified, idle, totalBlocksUploaded, activeUploads,
			COW_SHOW_UL_SPEED ? "ulspeed=" : "",
			speedBuffer );
	if ( len < 0 ) {
		logadd( LOG_ERROR, "snprintf error" );
		return;
	}
	// Fixed: snprintf returns the would-be length on truncation; without this
	// clamp, buffer + len below could index past the end of the array.
	if ( (size_t)len >= sizeof( buffer ) ) {
		len = (int)sizeof( buffer ) - 1;
	}
	if ( statStdout ) {
		logadd( LOG_INFO, "%s", buffer );
	}
	if ( statFile ) {
		// Pad with a bunch of newlines so we don't change the file size all the time
		ssize_t extra = MIN( 20, (ssize_t)sizeof( buffer ) - len - 1 );
		memset( buffer + len, '\n', extra );
		// Offset 43 skips the fixed-size uuid line written by createCowStatsFile
		// — presumably; verify against that function's output length.
		// Fixed: compare against len + extra (the amount actually written);
		// comparing against len alone warned on every successful update.
		if ( pwrite( cow.fhs, buffer, len + extra, 43 ) != len + extra ) {
			logadd( LOG_WARNING, "Could not update cow status file" );
		}
#ifdef COW_DUMP_BLOCK_UPLOADS
		if ( !uploadLoop && uploadLoopDone ) {
			dumpBlockUploads();
		}
#endif
	}
}
int cmpfunc( const void *a, const void *b )
{
return (int)( ( (cow_block_upload_statistics_t *)b )->uploads - ( (cow_block_upload_statistics_t *)a )->uploads );
}
/**
* @brief Writes all block numbers sorted by the number of uploads into the statsfile.
*
*/
void dumpBlockUploads()
{
long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE );
cow_block_upload_statistics_t blockUploads[l1MaxOffset * COW_L2_TABLE_SIZE];
uint64_t currentBlock = 0;
for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) {
if ( cow.l1[l1Index] == -1 ) {
continue;
}
for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) {
cow_l2_entry_t *block = ( cow.firstL2[cow.l1[l1Index]] + l2Index );
blockUploads[currentBlock].uploads = block->uploads;
blockUploads[currentBlock].blocknumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index );
currentBlock++;
}
}
qsort( blockUploads, currentBlock, sizeof( cow_block_upload_statistics_t ), cmpfunc );
lseek( cow.fhs, 0, SEEK_END );
dprintf( cow.fhs, "\n\nblocknumber: uploads\n==Block Upload Dump===\n" );
for ( uint64_t i = 0; i < currentBlock; i++ ) {
dprintf( cow.fhs, "%" PRIu64 ": %" PRIu64 " \n", blockUploads[i].blocknumber, blockUploads[i].uploads );
}
}
/**
 * @brief Starts the upload of a given cluster.
 *
 * @param cm Curl_multi
 * @param curlUploadBlock containing the data for the cluster to upload.
 * @param headers HTTP headers shared by all upload transfers
 * @return true if the transfer was queued, false if no easy handle could be created
 */
bool addUpload( CURLM *cm, cow_curl_read_upload_t *curlUploadBlock, struct curl_slist *headers )
{
	CURL *eh = curl_easy_init();
	if ( eh == NULL ) {
		// Fixed: the handle was previously used unchecked
		logadd( LOG_ERROR, "curl_easy_init failed, cannot start upload" );
		return false;
	}
	char url[COW_URL_STRING_SIZE];
	snprintf( url, COW_URL_STRING_SIZE, COW_API_UPDATE, cowServerAddress, metadata->uuid, curlUploadBlock->blocknumber );
	curl_easy_setopt( eh, CURLOPT_URL, url );
	curl_easy_setopt( eh, CURLOPT_POST, 1L );
	curl_easy_setopt( eh, CURLOPT_READFUNCTION, curlReadCallbackUploadBlock );
	curl_easy_setopt( eh, CURLOPT_READDATA, (void *)curlUploadBlock );
	curl_easy_setopt( eh, CURLOPT_PRIVATE, (void *)curlUploadBlock );
	// min upload speed of 1kb/s over 10 sec otherwise the upload is canceled.
	curl_easy_setopt( eh, CURLOPT_LOW_SPEED_TIME, 10L );
	curl_easy_setopt( eh, CURLOPT_LOW_SPEED_LIMIT, 1000L );
	// Fixed: CURLOPT_POSTFIELDSIZE_LARGE expects a curl_off_t, not a long
	curl_easy_setopt(
			eh, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t)( metadata->bitfieldSize + COW_DATA_CLUSTER_SIZE ) );
	if ( COW_SHOW_UL_SPEED ) {
		curlUploadBlock->ulLast = 0;
		curl_easy_setopt( eh, CURLOPT_NOPROGRESS, 0L );
		curl_easy_setopt( eh, CURLOPT_XFERINFOFUNCTION, progress_callback );
		curl_easy_setopt( eh, CURLOPT_XFERINFODATA, eh );
	}
	curl_easy_setopt( eh, CURLOPT_HTTPHEADER, headers );
	curl_multi_add_handle( cm, eh );
	return true;
}
/**
 * @brief After an upload completes, either successful or unsuccessful this
 * function cleans everything up. If unsuccessful and there are some tries left
 * retries to upload the block.
 *
 * @param cm Curl_multi
 * @param msg CURLMsg of the completed transfer
 * @param headers HTTP headers passed through to addUpload on retry
 * @return true returned if the upload was successful or retries are still possible.
 * @return false returned if the upload was unsuccessful.
 */
bool finishUpload( CURLM *cm, CURLMsg *msg, struct curl_slist *headers )
{
	bool status = true;
	cow_curl_read_upload_t *curlUploadBlock;
	CURLcode res;
	CURLcode res2;
	// Recover the per-upload state attached via CURLOPT_PRIVATE
	res = curl_easy_getinfo( msg->easy_handle, CURLINFO_PRIVATE, &curlUploadBlock );
	long http_code = 0;
	res2 = curl_easy_getinfo( msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_code );
	if ( res != CURLE_OK || res2 != CURLE_OK || http_code < 200 || http_code >= 300
			|| msg->msg != CURLMSG_DONE ) {
		// NOTE(review): msg->data.result is only meaningful when msg->msg == CURLMSG_DONE,
		// so the strerror text below may be bogus for other message types — TODO confirm
		curlUploadBlock->fails++;
		logadd( LOG_ERROR, "COW_API_UPDATE failed %i/5: %s\n", curlUploadBlock->fails,
				curl_easy_strerror( msg->data.result ) );
		if ( curlUploadBlock->fails <= 5 ) {
			// Retry with a fresh easy handle; the old one is cleaned up below
			addUpload( cm, curlUploadBlock, headers );
			goto CLEANUP;
		}
		// Out of retries: give up on this cluster
		free( curlUploadBlock );
		status = false;
		goto CLEANUP;
	}
	// everything went ok, update timeChanged
	// (only if the cluster wasn't modified again while the upload was running)
	atomic_compare_exchange_strong( &curlUploadBlock->block->timeChanged, &curlUploadBlock->time, 0 );
	curlUploadBlock->block->uploads++;
	totalBlocksUploaded++;
	free( curlUploadBlock );
CLEANUP:
	curl_multi_remove_handle( cm, msg->easy_handle );
	curl_easy_cleanup( msg->easy_handle );
	return status;
}
/**
 * @brief Drives the curl multi handle: runs pending transfers and processes
 * completion messages, updating the file-scope activeUploads counter.
 *
 * @param cm Curl_multi
 * @param breakIfNotMax will return as soon as there are not all upload slots used, so they can be filled up.
 * @param foregroundUpload used to determine the number of max uploads. If true COW_MAX_PARALLEL_UPLOADS will be the limit,
 * else COW_MAX_PARALLEL_BACKGROUND_UPLOADS.
 * @param headers HTTP headers forwarded to finishUpload for retries
 * @return true returned if all upload's were successful
 * @return false returned if one ore more upload's failed.
 */
bool MessageHandler(
		CURLM *cm, bool breakIfNotMax, bool foregroundUpload, struct curl_slist *headers )
{
	CURLMsg *msg;
	int msgsLeft = -1;
	bool status = true;
	do {
		// Updates the file-scope activeUploads as a side effect
		curl_multi_perform( cm, &activeUploads );
		while ( ( msg = curl_multi_info_read( cm, &msgsLeft ) ) != NULL ) {
			if ( !finishUpload( cm, msg, headers ) ) {
				status = false;
			}
		}
		if ( breakIfNotMax
				&& activeUploads
						< ( foregroundUpload ? COW_MAX_PARALLEL_UPLOADS : COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) ) {
			break;
		}
		// only wait if there are active uploads
		if ( activeUploads ) {
			curl_multi_wait( cm, NULL, 0, 1000, NULL );
		}
	} while ( activeUploads );
	return status;
}
/**
 * @brief Loops through all clusters and uploads the changed ones.
 *
 * @param ignoreMinUploadDelay If true uploads all clusters that have changes while
 * ignoring COW_MIN_UPLOAD_DELAY
 * @param cm Curl_multi
 * @return true if all clusters uploaded successfully
 * @return false if one or more clusters failed to upload
 */
bool uploaderLoop( bool ignoreMinUploadDelay, CURLM *cm )
{
	bool success = true;
	struct curl_slist *headers = NULL;
	const time_t now = time( NULL );
	headers = curl_slist_append( headers, "Content-Type: application/octet-stream" );
	long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE );
	// Iterate over all blocks, L1 first
	for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) {
		if ( cow.l1[l1Index] == -1 ) {
			continue; // Not allocated
		}
		// Now all L2 blocks
		for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) {
			cow_l2_entry_t *block = ( cow.firstL2[cow.l1[l1Index]] + l2Index );
			if ( block->offset == -1 ) {
				continue; // Not allocated
			}
			if ( block->timeChanged == 0 ) {
				continue; // Not changed
			}
			if ( !ignoreMinUploadDelay && ( now - block->timeChanged < COW_MIN_UPLOAD_DELAY ) ) {
				continue; // Last change not old enough
			}
			// Run curl mainloop at least once, but keep doing so while max concurrent uploads is reached
			do {
				if ( !MessageHandler( cm, true, ignoreMinUploadDelay, headers ) ) {
					success = false;
				}
			} while ( ( activeUploads >= ( ignoreMinUploadDelay ? COW_MAX_PARALLEL_UPLOADS
																: COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) )
					&& activeUploads > 0 );
			cow_curl_read_upload_t *b = malloc( sizeof( cow_curl_read_upload_t ) );
			if ( b == NULL ) {
				// Fixed: malloc result was previously used unchecked
				logadd( LOG_ERROR, "Could not allocate upload state for cluster" );
				success = false;
				continue;
			}
			b->block = block;
			b->blocknumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index );
			b->fails = 0;
			b->position = 0;
			// Remember the change time so finishUpload can detect concurrent modification
			b->time = block->timeChanged;
			if ( !addUpload( cm, b, headers ) ) {
				free( b );
				success = false;
			}
			if ( !ignoreMinUploadDelay && !uploadLoop ) {
				goto DONE;
			}
		}
	}
DONE:
	// Drain all remaining transfers before returning
	while ( activeUploads > 0 ) {
		MessageHandler( cm, false, ignoreMinUploadDelay, headers );
	}
	curl_slist_free_all( headers );
	return success;
}
/**
 * @brief Computes the data for the status to the stdout/statfile every COW_STATS_UPDATE_TIME seconds.
 *
 * @param something unused thread argument
 * @return NULL
 */
void *cowfile_statUpdater( __attribute__( ( unused ) ) void *something )
{
	uint64_t lastUpdateTime = time( NULL );
	while ( !uploadLoopDone ) {
		sleep( COW_STATS_UPDATE_TIME );
		int modified = 0;
		int inQueue = 0;
		int idle = 0;
		long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE );
		uint64_t now = time( NULL );
		// Classify every allocated cluster by its upload state
		for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) {
			if ( cow.l1[l1Index] == -1 ) {
				continue;
			}
			for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) {
				cow_l2_entry_t *block = ( cow.firstL2[cow.l1[l1Index]] + l2Index );
				if ( block->offset == -1 ) {
					continue;
				}
				if ( block->timeChanged != 0 ) {
					if ( !uploadLoop || now > block->timeChanged + COW_MIN_UPLOAD_DELAY ) {
						inQueue++;
					} else {
						modified++;
					}
				} else {
					idle++;
				}
			}
		}
		// Fixed: speedBuffer was passed to updateCowStatsFile uninitialized
		// when COW_SHOW_UL_SPEED is disabled, causing a "%s" on garbage.
		char speedBuffer[20] = "";
		if ( COW_SHOW_UL_SPEED ) {
			now = time( NULL );
			uint64_t bytes = atomic_exchange( &bytesUploaded, 0 );
			// kb/s over the elapsed interval; +1 avoids division by zero
			snprintf( speedBuffer, 20, "%.2f", (double)( ( bytes ) / ( 1 + now - lastUpdateTime ) / 1000 ) );
			lastUpdateTime = now;
		}
		updateCowStatsFile( inQueue, modified, idle, speedBuffer );
	}
	return NULL;
}
/**
 * @brief Main loop for cluster upload in the background. Once uploadLoop is
 * cleared (image dismounted), forces a final upload of all remaining clusters,
 * then optionally requests the server-side merge.
 *
 * @param something unused thread argument
 * @return NULL
 */
static void *uploaderThreadMain( __attribute__((unused)) void *something )
{
	CURLM *cm;
	cm = curl_multi_init();
	curl_multi_setopt(
			cm, CURLMOPT_MAXCONNECTS, (long)MAX( COW_MAX_PARALLEL_UPLOADS, COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) );
	while ( uploadLoop ) {
		uploaderLoop( false, cm );
		sleep( 2 );
	}
	logadd( LOG_DEBUG1, "start uploading the remaining blocks." );
	// force the upload of all remaining blocks because the user dismounted the image
	bool success = uploaderLoop( true, cm );
	// Unified cleanup (was duplicated across both branches before)
	uploadLoopDone = true;
	curl_multi_cleanup( cm );
	if ( !success ) {
		logadd( LOG_ERROR, "one or more blocks failed to upload" );
		return NULL;
	}
	logadd( LOG_DEBUG1, "all blocks uploaded" );
	if ( cow_merge_after_upload ) {
		// Fixed: the "Requesting merge." message was logged after the request
		logadd( LOG_DEBUG1, "Requesting merge." );
		startMerge();
	}
	return NULL;
}
/**
 * @brief Creates the cow stats file and writes the session guid into it.
 *
 * @param path directory in which the file is created
 * @return true on success
 * @return false if the file could not be created or written
 */
static bool createCowStatsFile( char *path )
{
	const size_t pathBufLen = strlen( path ) + 12;
	char pathStatus[pathBufLen];
	snprintf( pathStatus, pathBufLen, "%s%s", path, "/status.txt" );
	char buffer[100];
	int len = snprintf( buffer, 100, "uuid=%s\nstate: active\n", metadata->uuid );
	if ( statStdout ) {
		logadd( LOG_INFO, "%s", buffer );
	}
	if ( !statFile ) {
		return true;
	}
	cow.fhs = open( pathStatus, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR );
	if ( cow.fhs == -1 ) {
		logadd( LOG_ERROR, "Could not create cow status file. Bye.\n" );
		return false;
	}
	if ( pwrite( cow.fhs, buffer, len, 0 ) != len ) {
		logadd( LOG_ERROR, "Could not write to cow status file. Bye.\n" );
		return false;
	}
	return true;
}
/**
 * @brief Initializes the cow functionality, creates the data & meta file.
 *
 * @param path where the files should be stored
 * @param image_Name name of the original file/image
 * @param imageVersion version of the original image
 * @param imageSizePtr in: points at the current image size; out: redirected to the
 *        live size field inside the mmap'ed metadata
 * @param serverAddress address of the cow server
 * @param sStdout print status updates to stdout
 * @param sfile write status updates to a status file
 * @return true on success
 */
bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion,
		atomic_uint_fast64_t **imageSizePtr,
		char *serverAddress, bool sStdout, bool sfile )
{
	statStdout = sStdout;
	statFile = sfile;
	char pathMeta[strlen( path ) + 6];
	char pathData[strlen( path ) + 6];
	snprintf( pathMeta, strlen( path ) + 6, "%s%s", path, "/meta" );
	snprintf( pathData, strlen( path ) + 6, "%s%s", path, "/data" );
	if ( ( cow.fhm = open( pathMeta, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR ) ) == -1 ) {
		logadd( LOG_ERROR, "Could not create cow meta file. Bye.\n %s \n", pathMeta );
		return false;
	}
	if ( ( cow.fhd = open( pathData, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR ) ) == -1 ) {
		logadd( LOG_ERROR, "Could not create cow data file. Bye.\n" );
		close( cow.fhm ); // Fixed: fd was leaked on this error path
		return false;
	}
	// NOTE(review): hard-coded despite the comment below mentioning getpagesize();
	// presumably a safe upper bound for common page sizes — TODO confirm
	int maxPageSize = 8192;
	size_t metaDataSizeHeader = sizeof( cowfile_metadata_header_t );
	cow.maxImageSize = COW_MAX_IMAGE_SIZE;
	cow.l1Size = ( ( cow.maxImageSize + COW_FULL_L2_TABLE_DATA_SIZE - 1LL ) / COW_FULL_L2_TABLE_DATA_SIZE );
	// size of l1 array + number of l2's * size of l2
	size_t metadata_size = cow.l1Size * sizeof( l1 ) + cow.l1Size * sizeof( l2 );
	// compute next fitting multiple of getpagesize()
	size_t meta_data_start = ( ( metaDataSizeHeader + maxPageSize - 1 ) / maxPageSize ) * maxPageSize;
	size_t metadataFileSize = meta_data_start + metadata_size;
	if ( ftruncate( cow.fhm, metadataFileSize ) != 0 ) {
		logadd( LOG_ERROR, "Could not set file size of meta data file (errno=%d). Bye.\n", errno );
		close( cow.fhm ); // Fixed: fds were leaked on this error path
		close( cow.fhd );
		return false;
	}
	cow.metadata_mmap = mmap( NULL, metadataFileSize, PROT_READ | PROT_WRITE, MAP_SHARED, cow.fhm, 0 );
	if ( cow.metadata_mmap == MAP_FAILED ) {
		logadd( LOG_ERROR, "Error while mapping mmap:\n%s \n Bye.\n", strerror( errno ) );
		close( cow.fhm );
		close( cow.fhd );
		return false;
	}
	// Fill the freshly created header
	metadata = (cowfile_metadata_header_t *)( cow.metadata_mmap );
	metadata->magicValue = COW_FILE_META_MAGIC_VALUE;
	metadata->version = CURRENT_COW_VERSION;
	metadata->dataFileSize = ATOMIC_VAR_INIT( COW_DATA_CLUSTER_SIZE );
	metadata->metadataFileSize = ATOMIC_VAR_INIT( metadataFileSize );
	metadata->blocksize = DNBD3_BLOCK_SIZE;
	metadata->originalImageSize = **imageSizePtr;
	metadata->imageSize = metadata->originalImageSize;
	metadata->creationTime = time( NULL );
	// From now on the caller reads/writes the size through the mmap'ed field
	*imageSizePtr = &metadata->imageSize;
	metadata->metaDataStart = meta_data_start;
	metadata->bitfieldSize = COW_BITFIELD_SIZE;
	metadata->maxImageSize = cow.maxImageSize;
	snprintf( metadata->imageName, 200, "%s", image_Name );
	cow.l1 = (l1 *)( cow.metadata_mmap + meta_data_start );
	metadata->nextL2 = 0;
	for ( size_t i = 0; i < cow.l1Size; i++ ) {
		cow.l1[i] = -1; // -1 = L2 table not allocated yet
	}
	cow.firstL2 = (l2 *)( ( (char *)cow.l1 ) + cow.l1Size );
	// write header to data file
	uint64_t header = COW_FILE_DATA_MAGIC_VALUE;
	if ( pwrite( cow.fhd, &header, sizeof( uint64_t ), 0 ) != sizeof( uint64_t ) ) {
		logadd( LOG_ERROR, "Could not write header to cow data file. Bye.\n" );
		return false;
	}
	pthread_mutex_init( &cow.l2CreateLock, NULL );
	cowServerAddress = serverAddress;
	curl_global_init( CURL_GLOBAL_ALL );
	curl = curl_easy_init();
	if ( !curl ) {
		logadd( LOG_ERROR, "Error on curl init. Bye.\n" );
		return false;
	}
	if ( !createSession( image_Name, imageVersion ) ) {
		return false;
	}
	createCowStatsFile( path );
	return true;
}
/**
 * @brief Loads an existing cow state from the meta & data files.
 *
 * @param path where the meta & data file is located
 * @param imageSizePtr out: redirected to the live size field inside the mmap'ed metadata
 * @param serverAddress address of the cow server
 * @param sStdout print status updates to stdout
 * @param sFile write status updates to a status file
 * @return true on success
 */
bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile )
{
	statStdout = sStdout;
	statFile = sFile;
	cowServerAddress = serverAddress;
	curl_global_init( CURL_GLOBAL_ALL );
	curl = curl_easy_init();
	char pathMeta[strlen( path ) + 6];
	char pathData[strlen( path ) + 6];
	snprintf( pathMeta, strlen( path ) + 6, "%s%s", path, "/meta" );
	snprintf( pathData, strlen( path ) + 6, "%s%s", path, "/data" );
	if ( ( cow.fhm = open( pathMeta, O_RDWR, S_IRUSR | S_IWUSR ) ) == -1 ) {
		logadd( LOG_ERROR, "Could not open cow meta file. Bye.\n" );
		return false;
	}
	if ( ( cow.fhd = open( pathData, O_RDWR, S_IRUSR | S_IWUSR ) ) == -1 ) {
		logadd( LOG_ERROR, "Could not open cow data file. Bye.\n" );
		close( cow.fhm );
		return false;
	}
	cowfile_metadata_header_t header;
	{
		char *dst = (char *)&header;
		size_t sizeToRead = sizeof( cowfile_metadata_header_t );
		size_t readBytes = 0;
		while ( readBytes < sizeToRead ) {
			// Fixed: the original did pointer arithmetic on (&header) + readBytes,
			// which advances by sizeof(header) per byte read, and always read the
			// full size from offset 0 — both wrong for short reads.
			ssize_t bytes = pread( cow.fhm, dst + readBytes, sizeToRead - readBytes, readBytes );
			if ( bytes <= 0 ) {
				logadd( LOG_ERROR, "Error while reading meta file header. Bye.\n" );
				return false;
			}
			readBytes += bytes;
		}
		if ( header.magicValue != COW_FILE_META_MAGIC_VALUE ) {
			if ( __builtin_bswap64( header.magicValue ) == COW_FILE_META_MAGIC_VALUE ) {
				logadd( LOG_ERROR, "cow meta file of wrong endianess. Bye.\n" );
				return false;
			}
			logadd( LOG_ERROR, "cow meta file of unknown format. Bye.\n" );
			return false;
		}
		struct stat st;
		fstat( cow.fhm, &st );
		if ( st.st_size < (off_t)( header.metaDataStart + header.nextL2 * sizeof( l2 ) ) ) {
			logadd( LOG_ERROR, "cow meta file to small. Bye.\n" );
			return false;
		}
	}
	{
		uint64_t magicValueDataFile;
		if ( pread( cow.fhd, &magicValueDataFile, sizeof( uint64_t ), 0 ) != sizeof( uint64_t ) ) {
			logadd( LOG_ERROR, "Error while reading cow data file, wrong file?. Bye.\n" );
			return false;
		}
		if ( magicValueDataFile != COW_FILE_DATA_MAGIC_VALUE ) {
			if ( __builtin_bswap64( magicValueDataFile ) == COW_FILE_DATA_MAGIC_VALUE ) {
				logadd( LOG_ERROR, "cow data file of wrong endianess. Bye.\n" );
				return false;
			}
			logadd( LOG_ERROR, "cow data file of unknown format. Bye.\n" );
			return false;
		}
		struct stat st;
		fstat( cow.fhd, &st );
		if ( (off_t)header.dataFileSize > st.st_size ) {
			logadd( LOG_ERROR, "cow data file to small. Bye.\n" );
			return false;
		}
	}
	cow.metadata_mmap = mmap( NULL, header.metadataFileSize, PROT_READ | PROT_WRITE, MAP_SHARED, cow.fhm, 0 );
	if ( cow.metadata_mmap == MAP_FAILED ) {
		logadd( LOG_ERROR, "Error while mapping mmap:\n%s \n Bye.\n", strerror( errno ) );
		return false;
	}
	if ( header.version != CURRENT_COW_VERSION ) {
		// Fixed: logged metadata->version here, but "metadata" is not assigned
		// yet at this point (NULL dereference); use the header we just read.
		logadd( LOG_ERROR, "Error wrong file version got: %i expected: %i. Bye.\n",
				header.version, CURRENT_COW_VERSION );
		munmap( cow.metadata_mmap, header.metadataFileSize );
		return false;
	}
	metadata = (cowfile_metadata_header_t *)( cow.metadata_mmap );
	*imageSizePtr = &metadata->imageSize;
	cow.l1 = (l1 *)( cow.metadata_mmap + metadata->metaDataStart );
	cow.maxImageSize = metadata->maxImageSize;
	cow.l1Size = ( ( cow.maxImageSize + COW_FULL_L2_TABLE_DATA_SIZE - 1LL ) / COW_FULL_L2_TABLE_DATA_SIZE );
	cow.firstL2 = (l2 *)( ( (char *)cow.l1 ) + cow.l1Size );
	pthread_mutex_init( &cow.l2CreateLock, NULL );
	createCowStatsFile( path );
	return true;
}
/**
 * @brief Starts the cow background threads needed for stats and data upload.
 *
 * @return true if all required threads were created
 */
bool cowfile_startBackgroundThreads() {
	if ( pthread_create( &tidCowUploader, NULL, &uploaderThreadMain, NULL ) != 0 ) {
		logadd( LOG_ERROR, "Could not create cow uploader thread");
		return false;
	}
	// The stat updater is only needed if any status output is enabled
	bool wantStats = statFile || statStdout;
	if ( wantStats && pthread_create( &tidStatUpdater, NULL, &cowfile_statUpdater, NULL ) != 0 ) {
		logadd( LOG_ERROR, "Could not create stat updater thread");
		return false;
	}
	return true;
}
/**
 * @brief Writes the given data to the data file and marks the written DNBD3
 * blocks in the cluster's bitfield.
 *
 * @param buffer containing the data
 * @param size of the buffer
 * @param netSize which actually contributes to the fuse write request (can be different from size if partial full blocks are written)
 * @param errorCode set to errno/EIO on write failure
 * @param bytesWorkedOn incremented by netSize for the caller's progress accounting
 * @param block cluster being written to
 * @param inClusterOffset offset in this cluster to be written to
 */
static void writeData( const char *buffer, ssize_t size, size_t netSize, atomic_int *errorCode,
		atomic_size_t *bytesWorkedOn, cow_l2_entry_t *block, off_t inClusterOffset )
{
	// TODO: Assert that size + inClusterOffset <= COW_DATA_CLUSTER_SIZE?
	ssize_t totalBytesWritten = 0;
	while ( totalBytesWritten < size ) {
		ssize_t bytesWritten = pwrite( cow.fhd, ( buffer + totalBytesWritten ), size - totalBytesWritten,
				block->offset + inClusterOffset + totalBytesWritten );
		if ( bytesWritten == -1 ) {
			*errorCode = errno;
			logadd( LOG_ERROR,
					"size:%zu netSize:%zu errorCode:%i bytesWorkedOn:%zu inClusterOffset:%ld block->offset:%ld \n", size,
					netSize, *errorCode, *bytesWorkedOn, inClusterOffset, block->offset );
			break;
		} else if ( bytesWritten == 0 ) {
			*errorCode = EIO;
			logadd( LOG_ERROR,
					"size:%zu netSize:%zu errorCode:%i bytesWorkedOn:%zu inClusterOffset:%ld block->offset:%ld \n", size,
					netSize, *errorCode, *bytesWorkedOn, inClusterOffset, block->offset );
			break;
		}
		totalBytesWritten += bytesWritten;
	}
	// NOTE(review): netSize is counted even when the write failed, presumably so
	// the caller's request accounting completes — TODO confirm
	atomic_fetch_add( bytesWorkedOn, netSize );
	// Fixed: only mark blocks as written when something actually was. With
	// totalBytesWritten == 0 the old code computed (inClusterOffset - 1) /
	// DNBD3_BLOCK_SIZE as the end bit and could mark an unwritten block.
	if ( totalBytesWritten > 0 ) {
		setBitsInBitfield( block->bitfield, (int)( inClusterOffset / DNBD3_BLOCK_SIZE ),
				(int)( ( inClusterOffset + totalBytesWritten - 1 ) / DNBD3_BLOCK_SIZE ), 1 );
		block->timeChanged = time( NULL );
	}
}
/**
 * @brief Reserves a cluster-sized region in the data file for the given entry
 * by advancing metadata->dataFileSize. The space is not reserved on disk.
 *
 * @param block entry for which the space should be reserved
 * @return true always
 */
static bool allocateMetaBlockData( cow_l2_entry_t *block )
{
	// Atomically claim the current end of the data file as this cluster's offset
	long newOffset = atomic_fetch_add( &metadata->dataFileSize, COW_DATA_CLUSTER_SIZE );
	block->offset = (atomic_long)newOffset;
	return true;
}
/**
 * @brief Gets the cow_l2_entry_t for l1Index and l2Index, allocating data
 * space for it on first access. The l1 offset must be valid.
 *
 * @param l1Index
 * @param l2Index
 * @return cow_l2_entry_t*
 */
static cow_l2_entry_t *getL2Entry( int l1Index, int l2Index )
{
	cow_l2_entry_t *entry = cow.firstL2[cow.l1[l1Index]] + l2Index;
	// Lazily reserve data-file space the first time this cluster is touched
	if ( entry->offset == -1 ) {
		allocateMetaBlockData( entry );
	}
	return entry;
}
/**
 * @brief Creates a new L2 table and initializes its cow_l2_entry_t entries.
 * Thread-safe via l2CreateLock; a concurrent creator for the same l1Index wins
 * and this call becomes a no-op.
 *
 * @param l1Index
 * @return true always
 */
static bool createL2Block( int l1Index )
{
	pthread_mutex_lock( &cow.l2CreateLock );
	// Re-check under the lock: another thread may have created it already
	if ( cow.l1[l1Index] == -1 ) {
		const int tbl = metadata->nextL2;
		for ( int e = 0; e < COW_L2_TABLE_SIZE; e++ ) {
			cow.firstL2[tbl][e].offset = -1;
			cow.firstL2[tbl][e].timeChanged = ATOMIC_VAR_INIT( 0 );
			cow.firstL2[tbl][e].uploads = ATOMIC_VAR_INIT( 0 );
			for ( int bit = 0; bit < COW_BITFIELD_SIZE; bit++ ) {
				cow.firstL2[tbl][e].bitfield[bit] = ATOMIC_VAR_INIT( 0 );
			}
		}
		// Publish the table only after it is fully initialized
		cow.l1[l1Index] = tbl;
		metadata->nextL2 = tbl + 1;
	}
	pthread_mutex_unlock( &cow.l2CreateLock );
	return true;
}
/**
 * @brief Called once a fuse write request is finished.
 * Replies to fuse with either the error code or the number of bytes written,
 * and extends the image size if the write went past the current end.
 * Frees the cowRequest in all cases.
 *
 * @param req fuse_req_t to reply to
 * @param cowRequest the finished write request (freed by this function)
 */
static void finishWriteRequest( fuse_req_t req, cow_request_t *cowRequest )
{
	if ( cowRequest->errorCode != 0 ) {
		fuse_reply_err( req, cowRequest->errorCode );
	} else {
		// Grow imageSize if this write extended the image; CAS so a concurrent
		// grow to a larger size is not overwritten with a smaller one
		uint64_t currentSize = metadata->imageSize;
		uint64_t writeEnd = cowRequest->bytesWorkedOn + cowRequest->fuseRequestOffset;
		uint64_t newSize = MAX( currentSize, writeEnd );
		atomic_compare_exchange_strong( &metadata->imageSize, &currentSize, newSize );
		fuse_reply_write( req, cowRequest->bytesWorkedOn );
	}
	free( cowRequest );
}
/**
 * @brief Called after the padding data was received from the dnbd3 server.
 * Merges the user's write data into the block fetched from the server so a
 * full DNBD3_BLOCK results, writes it to disk, and finishes the fuse request
 * if this was its last pending sub-request. Frees the sub-request.
 *
 * @param sRequest sub-request carrying the fetched block and the write data
 */
static void writePaddedBlock( cow_sub_request_t *sRequest )
{
	off_t offsetInBlock = sRequest->inClusterOffset % DNBD3_BLOCK_SIZE;
	// Overlay the user data onto the remote data at its position in the block
	// TODO Assert that we have enough space in writeBuffer at that offset
	memcpy( sRequest->writeBuffer + offsetInBlock, sRequest->writeSrc, sRequest->size );
	// Flush the whole block; only sRequest->size bytes count towards the request
	writeData( sRequest->writeBuffer, DNBD3_BLOCK_SIZE, (ssize_t)sRequest->size, &sRequest->cowRequest->errorCode,
			&sRequest->cowRequest->bytesWorkedOn, sRequest->block, sRequest->inClusterOffset - offsetInBlock );
	// Whoever decrements the work counter to zero completes the fuse request
	if ( atomic_fetch_sub( &sRequest->cowRequest->workCounter, 1 ) == 1 ) {
		finishWriteRequest( sRequest->dRequest.fuse_req, sRequest->cowRequest );
	}
	free( sRequest );
}
/**
 * @brief If a write does not start or finish on a multiple of DNBD3_BLOCK_SIZE, the
 * partial block needs to be padded to a full one before it can be written to disk.
 * If the block lies inside the original image, the padding data is fetched from the
 * server asynchronously (completion lands in writePaddedBlock via cowfile_handleCallback).
 * Otherwise it is padded with zeroes, since it must be a block past the end of the
 * original image, and written synchronously.
 * TODO: Properly document the arguments and what value range they can be, i.e. see below for the 4k case
 *
 * @param req fuse request the write belongs to
 * @param offset absolute image offset the user data starts at
 * @param cowRequest enclosing write request (work counter / accounting)
 * @param buffer user data to write; must stay valid until the async sub-request
 *        completes (points into cowRequest->writeBuffer at the caller)
 * @param size number of user bytes; together with the in-block offset this must
 *        fit into a single DNBD3_BLOCK (see assert below)
 * @param block cluster the data belongs to
 * @param inClusterOffset offset of the user data relative to the cluster start
 */
static void padBlockFromRemote( fuse_req_t req, off_t offset, cow_request_t *cowRequest, const char *buffer,
		size_t size, cow_l2_entry_t *block, off_t inClusterOffset )
{
	// Caller must never hand us a range crossing a DNBD3_BLOCK boundary
	// TODO: Is this *guaranteed* to be the case on the caller site? Add comment to ^
	assert( ( offset % DNBD3_BLOCK_SIZE ) + size <= DNBD3_BLOCK_SIZE );
	if ( offset >= (off_t)metadata->originalImageSize ) {
		// Writing past the end of the original image: nothing to fetch, pad with zeroes
		// Align the cluster offset down to the start of the containing block
		inClusterOffset -= inClusterOffset % DNBD3_BLOCK_SIZE;
		char buf[DNBD3_BLOCK_SIZE] = { 0 };
		memcpy( buf + ( offset % DNBD3_BLOCK_SIZE ), buffer, size );
		// At this point we should have a 4k block with user-space data to write, and possibly
		// zero-padding at start and/or end
		writeData( buf, DNBD3_BLOCK_SIZE, (ssize_t)size, &cowRequest->errorCode, &cowRequest->bytesWorkedOn,
				block, inClusterOffset );
		return;
	}
	// Need to fetch padding from upstream; the extra DNBD3_BLOCK_SIZE bytes are
	// the writeBuffer the server data is received into
	// NOTE(review): calloc result is not checked for NULL — TODO confirm policy
	cow_sub_request_t *sRequest = calloc( sizeof( cow_sub_request_t ) + DNBD3_BLOCK_SIZE, 1 );
	sRequest->callback = writePaddedBlock;
	sRequest->inClusterOffset = inClusterOffset;
	sRequest->block = block;
	sRequest->size = size;
	sRequest->writeSrc = buffer;
	sRequest->cowRequest = cowRequest;
	// Clamp the fetch length so we never read past the original image end
	sRequest->dRequest.length = (uint32_t)MIN( DNBD3_BLOCK_SIZE, metadata->originalImageSize - offset );
	sRequest->dRequest.offset = offset - ( offset % DNBD3_BLOCK_SIZE );
	sRequest->dRequest.fuse_req = req;
	atomic_fetch_add( &cowRequest->workCounter, 1 );
	if ( !connection_read( &sRequest->dRequest ) ) {
		cowRequest->errorCode = EIO;
		// Undo the work counter increment; finish the request if we were last
		if ( atomic_fetch_sub( &sRequest->cowRequest->workCounter, 1 ) == 1 ) {
			finishWriteRequest( sRequest->dRequest.fuse_req, sRequest->cowRequest );
		}
		free( sRequest );
		return;
	}
}
/**
 * @brief Invoked when a dnbd3_async_t request completed.
 * Recovers the enclosing cow_sub_request_t and dispatches to its completion
 * handler (writePaddedBlock for writes, readRemoteData for reads).
 *
 * @param request the completed network request, embedded in a cow_sub_request_t
 */
void cowfile_handleCallback( dnbd3_async_t *request )
{
	cow_sub_request_t *sub = container_of( request, cow_sub_request_t, dRequest );
	sub->callback( sub );
}
/**
 * @brief Completion handler for a remote read sub-request. Adds the received
 * byte count to the request's accounting; if this was the last pending
 * sub-request, replies to fuse and frees the request. Frees the sub-request.
 *
 * @param sRequest the completed read sub-request
 */
void readRemoteData( cow_sub_request_t *sRequest )
{
	cow_request_t *cowRequest = sRequest->cowRequest;
	atomic_fetch_add( &cowRequest->bytesWorkedOn, sRequest->dRequest.length );
	// Last sub-request to finish replies to fuse and cleans up
	if ( atomic_fetch_sub( &cowRequest->workCounter, 1 ) == 1 ) {
		if ( cowRequest->bytesWorkedOn < cowRequest->fuseRequestSize ) {
			// TODO: Is this a logic bug somewhere, regarding accounting?
			// connection_read() should return exactly as many bytes as requested,
			// or never finish — so reaching this might warrant an EIO instead.
			logadd( LOG_ERROR, "pad read to small\n" );
		}
		fuse_reply_buf( sRequest->dRequest.fuse_req, cowRequest->readBuffer,
				cowRequest->bytesWorkedOn );
		free( cowRequest->readBuffer );
		free( cowRequest );
	}
	free( sRequest );
}
/**
 * @brief Changes the image size. Shrinking only records the smallest size seen
 * in originalImageSize; growing zeroes out any locally-written data between the
 * old and new end so previously truncated data does not reappear.
 *
 * @param req fuse request to reply to via image_ll_getattr; may be NULL when
 *        called internally (see cowfile_write) in which case no reply is sent
 * @param size new size the image should have
 * @param ino fuse_ino_t (forwarded to image_ll_getattr)
 * @param fi fuse_file_info (forwarded to image_ll_getattr)
 */
void cowfile_setSize( fuse_req_t req, size_t size, fuse_ino_t ino, struct fuse_file_info *fi )
{
	// decrease
	if ( size < metadata->imageSize ) {
		if ( size < metadata->originalImageSize ) {
			metadata->originalImageSize = size;
		}
		// TODO.... so....
		// originalImageSize = smallest we have seen
		// imageSize = current
		// ?
		// increase
	} else if ( size > metadata->imageSize ) {
		// Walk all clusters from the old end to the new end and clear their
		// dirty bits, so reads in the grown area yield zeroes (or remote data)
		off_t offset = metadata->imageSize;
		int l1Index = offsetToL1Index( offset );
		int l2Index = offsetToL2Index( offset );
		int l1EndIndex = offsetToL1Index( size );
		int l2EndIndex = offsetToL2Index( size );
		// special case first block TODO: What is the special case? What is happening here?
		// NOTE(review): presumably this handles the partially-used DNBD3_BLOCK at
		// the old end of the image, which must be zero-padded on disk rather than
		// just having its bit cleared — confirm
		if ( cow.l1[l1Index] != -1 ) {
			cow_l2_entry_t *block = getL2Entry( l1Index, l2Index );
			if ( metadata->imageSize % DNBD3_BLOCK_SIZE != 0 ) {
				off_t inClusterOffset = metadata->imageSize % COW_DATA_CLUSTER_SIZE;
				// Bytes from the old image end up to the next block boundary
				size_t sizeToWrite = DNBD3_BLOCK_SIZE - ( metadata->imageSize % DNBD3_BLOCK_SIZE );
				// Only needed if that block was written locally before
				if ( checkBit( block->bitfield, (int)( inClusterOffset / DNBD3_BLOCK_SIZE ) ) ) {
					char buf[sizeToWrite];
					memset( buf, 0, sizeToWrite );
					ssize_t bytesWritten = pwrite( cow.fhd, buf, sizeToWrite, block->offset + inClusterOffset );
					if ( bytesWritten < (ssize_t)sizeToWrite ) {
						fuse_reply_err( req, bytesWritten == -1 ? errno : EIO );
						return;
					}
					block->timeChanged = time( NULL );
					offset += sizeToWrite;
				}
			}
			// rest of block set bits 0
			l1Index = offsetToL1Index( offset );
			l2Index = offsetToL2Index( offset );
			block = getL2Entry( l1Index, l2Index );
			off_t inClusterOffset = offset % COW_DATA_CLUSTER_SIZE;
			setBitsInBitfield(
					block->bitfield, (int)( inClusterOffset / DNBD3_BLOCK_SIZE ), ( COW_BITFIELD_SIZE * 8 ) - 1, 0 );
			block->timeChanged = time( NULL );
			// Advance to the next cluster for the generic loop below
			l2Index++;
			if ( l2Index >= COW_L2_TABLE_SIZE ) {
				l2Index = 0;
				l1Index++;
			}
		}
		// null all bitfields up to and including the cluster containing the new end
		while ( !( l1Index > l1EndIndex || ( l1Index == l1EndIndex && l2EndIndex < l2Index ) ) ) {
			// Skip l1 entries that were never created — nothing written there
			if ( cow.l1[l1Index] == -1 ) {
				l1Index++;
				l2Index = 0;
				continue;
			}
			cow_l2_entry_t *block = getL2Entry( l1Index, l2Index );
			setBitsInBitfield( block->bitfield, 0, ( COW_BITFIELD_SIZE * 8 ) - 1, 0 );
			block->timeChanged = time( NULL );
			l2Index++;
			if ( l2Index >= COW_L2_TABLE_SIZE ) {
				l2Index = 0;
				l1Index++;
			}
		}
	}
	metadata->imageSize = size;
	if ( req != NULL ) {
		image_ll_getattr( req, ino, fi );
	}
}
/**
 * @brief Implementation of a write request. Walks the affected clusters and
 * writes the data locally; blocks that are only partially covered and were
 * never written before are padded with data fetched from the server first
 * (asynchronously, via padBlockFromRemote). The fuse reply is sent by
 * finishWriteRequest once the work counter drops to zero — either here or in
 * the last async sub-request's callback.
 *
 * @param req fuse_req_t
 * @param cowRequest carries writeBuffer, accounting and the work counter
 *        (workCounter must start at 1 for the caller's own reference)
 * @param offset Offset where the write starts,
 * @param size Size of the write.
 */
void cowfile_write( fuse_req_t req, cow_request_t *cowRequest, off_t offset, size_t size )
{
	// if beyond end of file, pad with 0
	if ( offset > (off_t)metadata->imageSize ) {
		cowfile_setSize( NULL, offset, 0, NULL );
	}
	off_t currentOffset = offset;
	off_t endOffset = offset + size;
	int l1Index = offsetToL1Index( currentOffset );
	int l2Index = offsetToL2Index( currentOffset );
	while ( currentOffset < endOffset ) {
		// Lazily create the l2 table for this l1 slot on first write
		if ( cow.l1[l1Index] == -1 ) {
			createL2Block( l1Index );
		}
		//loop over L2 array (metadata)
		while ( currentOffset < endOffset && l2Index < COW_L2_TABLE_SIZE ) {
			cow_l2_entry_t *metaBlock = getL2Entry( l1Index, l2Index );
			// Calc absolute offset in image corresponding to current cluster
			size_t clusterAbsoluteStartOffset = l1Index * COW_FULL_L2_TABLE_DATA_SIZE + l2Index * COW_DATA_CLUSTER_SIZE;
			size_t inClusterOffset = currentOffset - clusterAbsoluteStartOffset;
			// How many bytes we can write to this cluster before crossing a boundary, or before the write request is completed
			size_t bytesToWriteToCluster =
					MIN( (size_t)( endOffset - currentOffset ), COW_DATA_CLUSTER_SIZE - inClusterOffset );
			/////////////////////////
			// lock for the half block probably needed
			if ( currentOffset % DNBD3_BLOCK_SIZE != 0
					&& !checkBit( metaBlock->bitfield, (int)( inClusterOffset / DNBD3_BLOCK_SIZE ) ) ) {
				// Block has not been written locally before, and write does not start on block boundary.
				// Need to fetch the first couple bytes of the block from remote before writing the block to disk.
				size_t writeSize = MIN( bytesToWriteToCluster, DNBD3_BLOCK_SIZE - ( (size_t)currentOffset % DNBD3_BLOCK_SIZE ) );
				const char *sbuf = cowRequest->writeBuffer + ( ( currentOffset - offset ) );
				padBlockFromRemote( req, currentOffset, cowRequest, sbuf, writeSize, metaBlock, (off_t)inClusterOffset );
				// Skip past the part handled asynchronously and re-evaluate the cluster
				currentOffset += writeSize;
				continue;
			}
			size_t endPaddedSize = 0; // In case we need to skip over a pending pad request to remote
			if ( ( currentOffset + bytesToWriteToCluster ) % DNBD3_BLOCK_SIZE != 0
					&& metadata->originalImageSize > currentOffset + bytesToWriteToCluster ) {
				// Write request does not end on block boundary, and ends before end of image
				// End offset of this write
				off_t clusterEndOffset = currentOffset + bytesToWriteToCluster;
				// Start of last block of write, i.e. start of the last, incomplete block
				off_t lastBlockStartOffset = clusterEndOffset - ( clusterEndOffset % DNBD3_BLOCK_SIZE );
				// Where that last block starts relative to its cluster
				off_t inClusterBlockOffset = lastBlockStartOffset - clusterAbsoluteStartOffset;
				if ( !checkBit( metaBlock->bitfield, (int)( inClusterBlockOffset / DNBD3_BLOCK_SIZE ) ) ) {
					// Block indeed not modified before, need to fetch
					const char *sbuf = cowRequest->writeBuffer + ( ( lastBlockStartOffset - offset ) );
					padBlockFromRemote( req, lastBlockStartOffset, cowRequest, sbuf, clusterEndOffset - lastBlockStartOffset, metaBlock,
							inClusterBlockOffset );
					// Shrink the direct write so it stops before the padded tail
					bytesToWriteToCluster -= clusterEndOffset - lastBlockStartOffset;
					endPaddedSize = clusterEndOffset - lastBlockStartOffset;
				}
			}
			// Direct write of the aligned middle (and possibly start/end) part
			writeData( cowRequest->writeBuffer + ( ( currentOffset - offset ) ), (ssize_t)bytesToWriteToCluster,
					bytesToWriteToCluster, &cowRequest->errorCode, &cowRequest->bytesWorkedOn, metaBlock, inClusterOffset );
			currentOffset += bytesToWriteToCluster;
			// Account for skipped-over bytes
			currentOffset += endPaddedSize;
			l2Index++;
		}
		l1Index++;
		l2Index = 0;
	}
	// Drop our own reference; if no async padding is in flight, reply now
	if ( atomic_fetch_sub( &cowRequest->workCounter, 1 ) == 1 ) {
		finishWriteRequest( req, cowRequest );
	}
}
/**
 * @brief Request data, that is not available locally, via the network.
 * The read completes asynchronously in readRemoteData; on submission failure
 * the request is failed with EIO and cleaned up here if no other sub-requests
 * are still pending.
 *
 * @param req fuse_req_t
 * @param offset from the start of the file
 * @param size of data to request
 * @param buffer into which the data is to be written (points into cowRequest->readBuffer)
 * @param cowRequest enclosing read request; its workCounter is increased by one
 *        here and decreased again when the sub-request completes or fails
 */
static void readRemote( fuse_req_t req, off_t offset, ssize_t size, char *buffer, cow_request_t *cowRequest )
{
	// Edge case: image size got reduced before, on a non block border. Anything
	// past originalImageSize cannot come from the server, so zero-fill it here.
	if ( offset + size > (long int) metadata->originalImageSize ) {
		size_t padZeroSize = ( offset + size ) - metadata->originalImageSize;
		off_t padZeroOffset = metadata->originalImageSize - offset;
		// The memset below requires the read to start within the original image;
		// previously this asserted offset > 0, which did not guard the actual
		// precondition (see old TODO).
		assert( padZeroOffset >= 0 );
		memset( ( buffer + padZeroOffset ), 0, padZeroSize );
		atomic_fetch_add( &cowRequest->bytesWorkedOn, padZeroSize );
	}
	cow_sub_request_t *sRequest = malloc( sizeof( cow_sub_request_t ) );
	sRequest->callback = readRemoteData;
	sRequest->dRequest.length = (uint32_t)size;
	sRequest->dRequest.offset = offset;
	sRequest->dRequest.fuse_req = req;
	sRequest->cowRequest = cowRequest;
	sRequest->buffer = buffer;
	atomic_fetch_add( &cowRequest->workCounter, 1 );
	if ( !connection_read( &sRequest->dRequest ) ) {
		cowRequest->errorCode = EIO;
		free( sRequest );
		// Only the last pending sub-request may reply and free the shared
		// structures. Previously readBuffer and cowRequest were freed
		// unconditionally here — a use-after-free/double-free when other
		// sub-requests were still in flight — and a success reply was sent
		// despite the error.
		if ( atomic_fetch_sub( &cowRequest->workCounter, 1 ) == 1 ) {
			fuse_reply_err( req, EIO );
			free( cowRequest->readBuffer );
			free( cowRequest );
		}
		return;
	}
}
/**
 * @brief Determines where the data for the given position comes from:
 * local (written to the cow data file before), zero (past the end of the
 * original image), or remote (must be fetched from the dnbd3 server).
 *
 * @param block cluster metadata, or NULL if no l2 table exists for the position
 * @param bitfieldOffset bit index of the DNBD3_BLOCK within the cluster
 * @param offset absolute image offset of the position
 * @return enum dataSource one of local, zero, remote
 */
enum dataSource getBlockDataSource( cow_l2_entry_t *block, off_t bitfieldOffset, off_t offset )
{
	// Locally modified blocks always win
	if ( block != NULL && checkBit( block->bitfield, (int)bitfieldOffset ) ) {
		return local;
	}
	// Past the original image end there is nothing upstream — reads as zeroes
	return ( offset >= (off_t)metadata->originalImageSize ) ? zero : remote;
}
/**
 * @brief Reads data at the given offset. The requested range is scanned
 * block-by-block and split into maximal runs with the same data source
 * (local cow file, remote server, or zero past the original image end);
 * each run is then served in one operation. Remote runs complete
 * asynchronously; the fuse reply is sent once the work counter drops to
 * zero, either here or in readRemoteData.
 *
 * @param req fuse_req_t
 * @param size of data to read
 * @param offset offset where the read starts.
 */
void cowfile_read( fuse_req_t req, size_t size, off_t offset )
{
	cow_request_t *cowRequest = malloc( sizeof( cow_request_t ) );
	cowRequest->fuseRequestSize = size;
	cowRequest->bytesWorkedOn = ATOMIC_VAR_INIT( 0 );
	cowRequest->workCounter = ATOMIC_VAR_INIT( 1 );
	cowRequest->errorCode = ATOMIC_VAR_INIT( 0 );
	cowRequest->readBuffer = malloc( size );
	cowRequest->fuseRequestOffset = offset;
	// lastReadOffset .. searchOffset is the current run of same-source data
	off_t lastReadOffset = offset;
	off_t endOffset = offset + size;
	off_t searchOffset = offset;
	int l1Index = offsetToL1Index( offset );
	int l2Index = offsetToL2Index( offset );
	int bitfieldOffset = getBitfieldOffsetBit( offset );
	enum dataSource dataState;
	cow_l2_entry_t *cluster = NULL;
	if ( cow.l1[l1Index] != -1 ) {
		cluster = getL2Entry( l1Index, l2Index );
	}
	bool doRead = false;
	bool firstLoop = true;
	bool updateBlock = false;
	while ( searchOffset < endOffset ) {
		if ( firstLoop ) {
			// Start of a new run: record its source and start offset
			firstLoop = false;
			lastReadOffset = searchOffset;
			// TODO: Why is this only set on first iteration and not for every block/cluster?
			dataState = getBlockDataSource( cluster, bitfieldOffset, searchOffset );
		} else if ( getBlockDataSource( cluster, bitfieldOffset, searchOffset ) != dataState ) {
			// Source changed: flush the run accumulated so far. firstLoop is set
			// again below, so the run is re-evaluated from the current position.
			// TODO So data source changed, but we don't update the dataState var... How can this possibly work?
			doRead = true;
		} else {
			// Same source: extend the run by one DNBD3_BLOCK
			bitfieldOffset++;
		}
		if ( bitfieldOffset >= COW_BITFIELD_SIZE * 8 ) {
			// Advance to next cluster in current l2 table
			bitfieldOffset = 0;
			l2Index++;
			if ( l2Index >= COW_L2_TABLE_SIZE ) {
				// Advance to next l1 entry, reset l2 index
				l2Index = 0;
				l1Index++;
			}
			// Also set flag that we need to update the 'cluster' struct at the end of this iteration
			// TODO: Why do we update all the values above, but not the cluster struct? We access those
			// variables in the code below, so we have updated offset and index, but operate on the
			// old cluster struct. How does that make sense?
			updateBlock = true;
			// Local runs must not span clusters: their on-disk data is not contiguous
			if ( dataState == local ) {
				doRead = true;
			}
		}
		// Recompute the absolute offset from the (advanced) indices — i.e. the
		// exclusive end of the current run, one block past the last one examined
		// TODO ??? As stated above, this is using the updated values, so isn't this the next
		// offset tather than original offset?
		searchOffset = DNBD3_BLOCK_SIZE * ( bitfieldOffset ) + l2Index * COW_DATA_CLUSTER_SIZE
				+ l1Index * COW_FULL_L2_TABLE_DATA_SIZE;
		if ( doRead || searchOffset >= endOffset ) {
			// Flush the run [lastReadOffset, min(searchOffset, endOffset))
			ssize_t sizeToRead = MIN( searchOffset, endOffset );
			if ( dataState == remote ) {
				if ( sizeToRead > (ssize_t) metadata->originalImageSize ) {
					//pad rest with 0
					memset( cowRequest->readBuffer
							+ ( ( lastReadOffset - offset ) + ( metadata->originalImageSize - offset ) ),
							0, sizeToRead - metadata->originalImageSize );
					atomic_fetch_add( &cowRequest->bytesWorkedOn, sizeToRead - metadata->originalImageSize );
					sizeToRead = metadata->originalImageSize;
				}
				// sizeToRead was an absolute end offset; convert to a length
				sizeToRead -= lastReadOffset;
				readRemote(
						req, lastReadOffset, sizeToRead, cowRequest->readBuffer + ( lastReadOffset - offset ), cowRequest );
			} else if ( dataState == zero ) {
				sizeToRead -= lastReadOffset;
				memset( cowRequest->readBuffer + ( lastReadOffset - offset ), 0, sizeToRead );
				atomic_fetch_add( &cowRequest->bytesWorkedOn, sizeToRead );
			} else {
				sizeToRead -= lastReadOffset;
				// Compute the offset in the data file where the read starts
				off_t localRead =
						cluster->offset + ( ( lastReadOffset % COW_FULL_L2_TABLE_DATA_SIZE ) % COW_DATA_CLUSTER_SIZE );
				// NOTE(review): on a short pread the retry below re-reads from the
				// same buffer position and file offset — TODO confirm intended
				ssize_t totalBytesRead = 0;
				while ( totalBytesRead < sizeToRead ) {
					ssize_t bytesRead =
							pread( cow.fhd, cowRequest->readBuffer + ( lastReadOffset - offset ), sizeToRead, localRead );
					if ( bytesRead == -1 ) {
						cowRequest->errorCode = errno;
						goto fail;
					} else if ( bytesRead <= 0 ) {
						cowRequest->errorCode = EIO;
						goto fail;
					}
					totalBytesRead += bytesRead;
				}
				atomic_fetch_add( &cowRequest->bytesWorkedOn, totalBytesRead );
			}
			// Start a fresh run at the current position
			lastReadOffset = searchOffset;
			doRead = false;
			firstLoop = true;
		}
		if ( updateBlock ) {
			// Cluster boundary was crossed above: refresh the cluster pointer
			if ( cow.l1[l1Index] != -1 ) {
				cluster = getL2Entry( l1Index, l2Index );
			} else {
				cluster = NULL;
			}
			updateBlock = false;
		}
	}
fail:;
	// Drop our own reference; reply here only if no remote reads are in flight
	if ( atomic_fetch_sub( &cowRequest->workCounter, 1 ) == 1 ) {
		if ( cowRequest->errorCode != 0 || cowRequest->bytesWorkedOn < size ) {
			logadd( LOG_ERROR, "incomplete read or I/O error (errno=%d)", cowRequest->errorCode );
			fuse_reply_err( req, cowRequest->errorCode != 0 ? cowRequest->errorCode : EIO );
		} else {
			fuse_reply_buf( req, cowRequest->readBuffer, cowRequest->bytesWorkedOn );
		}
		free( cowRequest->readBuffer );
		free( cowRequest );
	}
}
/**
 * @brief Stops the StatUpdater and CowUploader threads
 * and waits for them to finish, then cleans up curl.
 */
void cowfile_close()
{
	uploadLoop = false;
	// Stat updater thread only exists if stat output was enabled
	if ( statFile || statStdout ) {
		pthread_join( tidStatUpdater, NULL );
	}
	pthread_join( tidCowUploader, NULL );
	if ( curl != NULL ) {
		// Per libcurl docs, all easy handles must be cleaned up before
		// curl_global_cleanup(); the previous order was reversed.
		curl_easy_cleanup( curl );
		curl_global_cleanup();
	}
}