summaryrefslogblamecommitdiffstats
path: root/src/fuse/cowfile.c
blob: 8e816a2a74426f0259e42e562e783b1c5df12b06 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
                    











                             

                                                                                          

                                         








                                                  
                             

                                                                         















                                         
                                                           

                                     
                       
   
                                           
 
                                                             


   
                                                           

                                     
                       
   
                                           
 
                                                                                         







                                                                        
                                                
























                                                                              
                                    


















                                                                                       
                                    




















                                                                                                



                                                                                                  


                             
                                                            



















































                                                                                               
                                                    



                                                                                     
                                           



















                                                                                                  
                                               





                                                                                                    
                                                          
                                                                        

                                                                                                               
                                                         





                                                                                                                             
 
                                              
                                                                           


                                                                                                          









                                                              














































































                                                                                                      

                                                                                                                        

























                                                                                                     

                          
                           
                                           
                                       
                                    
                
                               




                                               






                                                                                           

                                      




                                                      




                                                 



                                                                                             

                                                                                  
















                                                                                                                          
                                                                                                          
 
                                                                                    
                                  

                                                                                 

                                 

                                                                                           

                                                                            
                                                                                                           



































                                                                                                                             
                                                                                                                    
































                                                                                         

                                                                                       




































                                                                                                                           
                                                                                                   




                           
                                                         
 
                                                                                   




                                                                  
                                                



                                                                                                                                           
                                      


                                                                   
                                  















                                                                                 
                                        

                                                                                         




                                                                                                          
                 


                                                                                           
                                                    
                                                          
                         









                                                                                                                    
                                 











                                                                                                                                                                                                          




                                     
                                                                           



















                                                                                                      
                                                                                                                  
                                            

                                                                                         

                                         

                                                                                                   


























                                                                                                                         
                    




                                                     
                                                                          





































                                                                                                                               
                                            




























                                                                                                              
                      
   

                                                                             





















                                                                                                    
                                                                        

                                              
                                                                                                                







                                                                                                          

                                                                                                           




                                                                                                           






                                                                                                  


                                                                          






















                                                                                        


                                                      









                                                                  
                                   






















                                                                                                                   
                                                                                


                                                                            
                                                                                


























                                                                                                     

                                                                                                    



















                                                                                                           

                                                                










                                                                                                                  


                                                                                           








                                                                       
                                                                                                                



                                                                







                                                                                     
                                                                                        


                                                                           
                                       
                                                                                               


                                                                                   
         








                                                                                                                                      


                                                                 

                                                                                               
                                                                                            
 
                                                                             


                                                                                                                 
                                                                                      
                                           
                                           


                                                                                                                                             

                                                 
                                         


                                                                                                                                             




                                                   

                                                                                                     




                                          
                                                                        



                                                       
                                                          
 
                                                                                                        



                    

                                                          
   


                           
   
                                                             
 
                                                                           






                                               
                                                                                      
   
                  
   
                                        

                                                

                                                               






                                                                                                    
                                                   






                                                  
                                                           



                                                              
                    







                                                                           


                                                                                                        






                                                                           
                                                                                 





                                                                     

                                                                                                                 


                                                                                                                      
                                                                                                           








                                                                                        



                                                                                                          


                                                                                                            
                                                                           
 




                                                                                         
                                                   


                                                                                                      
 

                                                                                                                    

                       

                                                                                                  
                                              
                                                    



                                          
 

                                                                                                            

                                          


                                                        


                                                                                                
                                 




























                                                                                                    



                                                                                                           

                                                                   

                                                                                              

















                                                                                              
                                           


                                                           



                                                            

                           
                                                  
                                                   






                                                                                                   
                                                                            
                                                                                                    

                                                                                                                   
                                                                                                                 


                                                                      
                                                                                                                                    









                                                                                                        



                                                                               
                                          
                                                                                                                                         
                                                          



                                                             


                                     



                                                                                                          


                                         
                                                                               

                                                                                                  



                                                             









                                                 
                                            









                                                                                          
                                                         





                                        

                                                       
                                             

                                                 

                                               

                                                                                    
 

                                                                                                                                    
 



                                                                                                                                           



                                                                  



                                                                                                                                                 
                                                                                                            

                                                                                                                                         


                                         


















                                                                                                                                                            

                                 

                                                                                                                                                
 

                                                               


                                                       
                                  
                 

                            













                                                                       
                                                                                                                                                                 



                                                                                                             
                                                                                                                               

                                                                                     


                                                                                                                 












                                                                            
                                                                       

                                                                             
                                                                                                                   















                                                                                                 
                                                                                               






























                                                                                  


                                                            
                                  
                                       
 

                                                         








                                                      



                                                                                                                            





                                                                
                                                                      
                                           




                                                                           
                         



                                                                                                                   




                                                   




                                                                                                      


                                                                            
                                                                                           

















                                                                                                                                                     
                                                                                                                                               





















                                                                                                                                                      

                                                                         
                                
                                               





                                                                     


                                                                                                              
                        

























                                                                                                 
#include "cowfile.h"
#include "main.h"
#include "connection.h"

#include <dnbd3/shared/log.h>
#include <sys/mman.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <curl/curl.h>

#define UUID_STRLEN 36

extern void image_ll_getattr( fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi );

static const int CURRENT_COW_VERSION = 1;

static bool statStdout;
static bool statFile;
static pthread_t tidCowUploader;
static pthread_t tidStatUpdater;
static char *cowServerAddress;
static CURL *curl;
static cowfile_metadata_header_t *metadata = NULL;
static atomic_uint_fast64_t bytesUploaded;
static uint64_t totalBlocksUploaded = 0;
static int activeUploads = 0;
atomic_bool uploadLoop = true; // Keep upload loop running?
atomic_bool uploadLoopDone = false; // Upload loop has finished all work?

static struct cow
{
	pthread_mutex_t l2CreateLock;
	int fhm;
	int fhd;
	int fhs;
	char *metadata_mmap;
	l1 *l1;
	l2 *firstL2;
	size_t maxImageSize;
	size_t l1Size; //size of l1 array

} cow;

/**
 * @brief Computes the l1 index for an absolute file offset
 * 
 * @param offset absolute file offset
 * @return int l1 index
 */
static int offsetToL1Index( size_t offset )
{
	return (int)( offset / COW_FULL_L2_TABLE_DATA_SIZE );
}

/**
 * @brief Computes the l2 index for an absolute file offset
 * 
 * @param offset absolute file offset
 * @return int l2 index
 */
static int offsetToL2Index( size_t offset )
{
	return (int)( ( offset % COW_FULL_L2_TABLE_DATA_SIZE ) / COW_DATA_CLUSTER_SIZE );
}

/**
 * @brief Computes the bit in the bitfield from the absolute file offset
 * 
 * @param offset absolute file offset
 * @return int bit(0-319) in the bitfield
 */
static int getBitfieldOffsetBit( size_t offset )
{
	return (int)( offset / DNBD3_BLOCK_SIZE ) % ( COW_BITFIELD_SIZE * 8 );
}

/**
 * @brief Sets the specified bits in the specified range threadsafe to 1.
 * 
 * @param byte of a bitfield
 * @param from start bit
 * @param to end bit
 * @param value set bits to 1 or 0
 */
static void setBits( atomic_char *byte, int from, int to, bool value )
{
	char mask = (char)( ( 255 >> ( 7 - ( to - from ) ) ) << from );
	if ( value ) {
		atomic_fetch_or( byte, mask );
	} else {
		atomic_fetch_and( byte, ~mask );
	}
}

/**
 * @brief Sets the specified bits in the specified range threadsafe to 1.
 * 
 * @param bitfield of a cow_l2_entry
 * @param from start bit
 * @param to end bit
 * @param value set bits to 1 or 0
 */
static void setBitsInBitfield( atomic_char *bitfield, int from, int to, bool value )
{
	assert( from >= 0 || to < COW_BITFIELD_SIZE * 8 );
	int start = from / 8;
	int end = to / 8;

	for ( int i = start; i <= end; i++ ) {
		setBits( ( bitfield + i ), from - i * 8, MIN( 7, to - i * 8 ), value );
		from = ( i + 1 ) * 8;
	}
}

/**
 * @brief Checks if the n bit of a bit field is 0 or 1.
 * 
 * @param bitfield of a cow_l2_entry
 * @param n the bit which should be checked
 */
static bool checkBit( atomic_char *bitfield, int n )
{
	return ( atomic_load( ( bitfield + ( n / 8 ) ) ) >> ( n % 8 ) ) & 1;
}


/**
 * @brief Implementation of CURLOPT_WRITEFUNCTION , this function will be called when
 * the server sends back data.
 * for more details see: https://curl.se/libcurl/c/CURLOPT_WRITEFUNCTION .html
 *  
 * @param buffer that contains the response data from the server
 * @param itemSize size of one item
 * @param nitems number of items
 * @param response userdata which will later contain the uuid
 * @return size_t size that have been read
 */
size_t curlCallbackCreateSession( char *buffer, size_t itemSize, size_t nitems, void *response )
{
	uint64_t done = strlen( response );
	uint64_t bytes = itemSize * nitems;
	if ( done + bytes > UUID_STRLEN ) {
		logadd( LOG_INFO, "strlen(response): %"PRIu64" bytes: %"PRIu64"\n", done, bytes );
		return bytes;
	}

	strncat( response, buffer, UUID_STRLEN - done + 1 );
	return bytes;
}

/**
 * @brief Create a Session with the cow server and gets the session guid.
 * 
 * @param imageName 
 * @param version of the original Image
 */
bool createSession( const char *imageName, uint16_t version )
{
	CURLcode res;
	char url[COW_URL_STRING_SIZE];
	snprintf( url, COW_URL_STRING_SIZE, COW_API_CREATE, cowServerAddress );
	logadd( LOG_INFO, "COW_API_CREATE URL: %s", url );
	curl_easy_setopt( curl, CURLOPT_POST, 1L );
	curl_easy_setopt( curl, CURLOPT_URL, url );

	curl_mime *mime;
	curl_mimepart *part;
	mime = curl_mime_init( curl );
	part = curl_mime_addpart( mime );
	curl_mime_name( part, "imageName" );
	curl_mime_data( part, imageName, CURL_ZERO_TERMINATED );
	part = curl_mime_addpart( mime );
	curl_mime_name( part, "version" );
	char buf[sizeof( int ) * 3 + 2];
	snprintf( buf, sizeof buf, "%d", version );
	curl_mime_data( part, buf, CURL_ZERO_TERMINATED );

	part = curl_mime_addpart( mime );
	curl_mime_name( part, "bitfieldSize" );
	snprintf( buf, sizeof buf, "%d", metadata->bitfieldSize );
	curl_mime_data( part, buf, CURL_ZERO_TERMINATED );

	curl_easy_setopt( curl, CURLOPT_MIMEPOST, mime );

	metadata->uuid[0] = '\0';
	curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, curlCallbackCreateSession );
	curl_easy_setopt( curl, CURLOPT_WRITEDATA, &metadata->uuid );

	res = curl_easy_perform( curl );
	curl_mime_free( mime );

	/* Check for errors */
	if ( res != CURLE_OK ) {
		logadd( LOG_ERROR, "COW_API_CREATE  failed: %s\n", curl_easy_strerror( res ) );
		return false;
	}

	long http_code = 0;
	curl_easy_getinfo( curl, CURLINFO_RESPONSE_CODE, &http_code );
	if ( http_code < 200 || http_code >= 300 ) {
		logadd( LOG_ERROR, "COW_API_CREATE  failed http: %ld\n", http_code );
		return false;
	}
	curl_easy_reset( curl );
	metadata->uuid[UUID_STRLEN] = '\0';
	logadd( LOG_DEBUG1, "Cow session started, guid: %s\n", metadata->uuid );
	return true;
}

/**
 * @brief Implementation of CURLOPT_READFUNCTION, this function will first send the bit field and
 * then the block data in one bitstream. this function is usually called multiple times per block,
 * because the buffer is usually not large for one block and its bitfield.
 * for more details see: https://curl.se/libcurl/c/CURLOPT_READFUNCTION.html
 * 
 * @param ptr to the buffer
 * @param size of one element in buffer
 * @param nmemb number of elements in buffer
 * @param userdata from CURLOPT_READFUNCTION
 * @return size_t size written in buffer
 */
size_t curlReadCallbackUploadBlock( char *ptr, size_t size, size_t nmemb, void *userdata )
{
	cow_curl_read_upload_t *uploadBlock = (cow_curl_read_upload_t *)userdata;
	size_t len = 0;
	// Check if we're still in the bitfield
	if ( uploadBlock->position < (size_t)metadata->bitfieldSize ) {
		size_t lenCpy = MIN( metadata->bitfieldSize - uploadBlock->position, size * nmemb );
		memcpy( ptr, uploadBlock->block->bitfield + uploadBlock->position, lenCpy );
		uploadBlock->position += lenCpy;
		len += lenCpy;
	}
	// No elseif here, might just have crossed over...
	if ( uploadBlock->position >= (size_t)metadata->bitfieldSize ) {
		ssize_t wantRead = (ssize_t)MIN(
				COW_DATA_CLUSTER_SIZE - ( uploadBlock->position - ( metadata->bitfieldSize ) ),
				( size * nmemb ) - len );
		off_t inClusterOffset = uploadBlock->position - metadata->bitfieldSize;
		ssize_t lengthRead = pread( cow.fhd, ( ptr + len ), wantRead, uploadBlock->block->offset + inClusterOffset );
		if ( lengthRead == -1 ) {
			logadd( LOG_ERROR, "Upload: Reading from COW file failed with errno %d", errno );
			return CURL_READFUNC_ABORT;
		}

		if ( wantRead > lengthRead ) {
			// fill up since last block may not be a full block
			memset( ptr + len + lengthRead, 0, wantRead - lengthRead );
			// TODO what about partial read? We should know how much data there actually is...
			lengthRead = wantRead;
		}
		uploadBlock->position += lengthRead;
		len += lengthRead;
	}
	return len;
}


/**
 * @brief Requests the merging of the image on the cow server.
 */
bool mergeRequest()
{
	CURLcode res;
	curl_easy_setopt( curl, CURLOPT_POST, 1L );

	char url[COW_URL_STRING_SIZE];
	snprintf( url, COW_URL_STRING_SIZE, COW_API_START_MERGE, cowServerAddress );
	curl_easy_setopt( curl, CURLOPT_URL, url );


	curl_mime *mime;
	curl_mimepart *part;
	mime = curl_mime_init( curl );

	part = curl_mime_addpart( mime );
	curl_mime_name( part, "guid" );
	curl_mime_data( part, metadata->uuid, CURL_ZERO_TERMINATED );

	part = curl_mime_addpart( mime );
	curl_mime_name( part, "originalFileSize" );
	char buf[21];
	snprintf( buf, sizeof buf, "%" PRIu64, metadata->originalImageSize );
	curl_mime_data( part, buf, CURL_ZERO_TERMINATED );

	part = curl_mime_addpart( mime );
	curl_mime_name( part, "newFileSize" );
	snprintf( buf, sizeof buf, "%" PRIu64, metadata->imageSize );
	curl_mime_data( part, buf, CURL_ZERO_TERMINATED );

	curl_easy_setopt( curl, CURLOPT_MIMEPOST, mime );


	res = curl_easy_perform( curl );
	if ( res != CURLE_OK ) {
		logadd( LOG_WARNING, "COW_API_START_MERGE  failed: %s\n", curl_easy_strerror( res ) );
		curl_easy_reset( curl );
		return false;
	}
	long http_code = 0;
	curl_easy_getinfo( curl, CURLINFO_RESPONSE_CODE, &http_code );
	if ( http_code != 200 ) {
		logadd( LOG_WARNING, "COW_API_START_MERGE  failed http: %ld\n", http_code );
		curl_easy_reset( curl );
		return false;
	}
	curl_easy_reset( curl );
	curl_mime_free( mime );
	return true;
}

/**
 * @brief Wrapper for mergeRequest so if its fails it will be tried again.
 * 
 */
void startMerge()
{
	int fails = 0;
	bool success = false;
	success = mergeRequest();
	while ( fails <= 5 && !success ) {
		fails++;
		logadd( LOG_WARNING, "Trying again. %i/5", fails );
		mergeRequest();
	}
}

/**
 * @brief Implementation of the CURLOPT_XFERINFOFUNCTION.
 * For more infos see: https://curl.se/libcurl/c/CURLOPT_XFERINFOFUNCTION.html
 * 
 * Each active transfer callbacks this function.
 * This function computes the uploaded bytes between each call and adds it to
 * bytesUploaded, which is used to compute the kb/s uploaded over all transfers.
 * 
 * @param clientp 
 * @param ulNow number of bytes uploaded by this transfer so far.
 * @return int always returns 0 to continue the callbacks.
 */
int progress_callback( void *clientp, __attribute__((unused)) curl_off_t dlTotal,
		__attribute__((unused)) curl_off_t dlNow, __attribute__((unused)) curl_off_t ulTotal, curl_off_t ulNow )
{
	CURL *eh = (CURL *)clientp;
	cow_curl_read_upload_t *curlUploadBlock;
	CURLcode res;
	res = curl_easy_getinfo( eh, CURLINFO_PRIVATE, &curlUploadBlock );
	if ( res != CURLE_OK ) {
		logadd( LOG_ERROR, "ERROR" );
		return 0;
	}
	bytesUploaded += ( ulNow - curlUploadBlock->ulLast );
	curlUploadBlock->ulLast = ulNow;
	return 0;
}

/**
 * @brief Updates the status to the stdout/statfile depending on the startup parameters.
 * 
 * @param inQueue Blocks that have changes old enough to be uploaded.
 * @param modified Blocks that have been changed but whose changes are not old enough to be uploaded.
 * @param idle Blocks that do not contain changes that have not yet been uploaded.
 * @param speedBuffer ptr to char array that contains the current upload speed.
 */

void updateCowStatsFile( uint64_t inQueue, uint64_t modified, uint64_t idle, char *speedBuffer )
{
	char buffer[300];
	const char *state;

	if ( uploadLoop ) {
		state = "backgroundUpload";
	} else if ( !uploadLoopDone ) {
		state = "uploading";
	} else {
		state = "done";
	}

	int len = snprintf( buffer, 300,
			"state=%s\n"
			"inQueue=%" PRIu64 "\n"
			"modifiedClusters=%" PRIu64 "\n"
			"idleClusters=%" PRIu64 "\n"
			"totalClustersUploaded=%" PRIu64 "\n"
			"activeUploads=:%i\n"
			"%s%s",
			state, inQueue, modified, idle, totalBlocksUploaded, activeUploads,
			COW_SHOW_UL_SPEED ? "ulspeed=" : "",
			speedBuffer );

	if ( len == -1 ) {
		logadd( LOG_ERROR, "snprintf error" );
		return;
	}

	if ( statStdout ) {
		logadd( LOG_INFO, "%s", buffer );
	}

	if ( statFile ) {
		// Pad with a bunch of newlines so we don't change the file size all the time
		ssize_t extra = MIN( 20, sizeof(buffer) - len - 1 );
		memset( buffer + len, '\n', extra );
		if ( pwrite( cow.fhs, buffer, len + extra, 43 ) != len ) {
			logadd( LOG_WARNING, "Could not update cow status file" );
		}
#ifdef COW_DUMP_BLOCK_UPLOADS
		if ( !uploadLoop && uploadLoopDone ) {
			dumpBlockUploads();
		}
#endif
	}
}
int cmpfunc( const void *a, const void *b )
{
	return (int)( ( (cow_block_upload_statistics_t *)b )->uploads - ( (cow_block_upload_statistics_t *)a )->uploads );
}
/**
 * @brief Writes all block numbers sorted by the number of uploads into the statsfile.
 * 
 */
void dumpBlockUploads()
{
	long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE );

	cow_block_upload_statistics_t blockUploads[l1MaxOffset * COW_L2_TABLE_SIZE];
	uint64_t currentBlock = 0;
	for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) {
		if ( cow.l1[l1Index] == -1 ) {
			continue;
		}
		for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) {
			cow_l2_entry_t *block = ( cow.firstL2[cow.l1[l1Index]] + l2Index );

			blockUploads[currentBlock].uploads = block->uploads;
			blockUploads[currentBlock].blocknumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index );
			currentBlock++;
		}
	}
	qsort( blockUploads, currentBlock, sizeof( cow_block_upload_statistics_t ), cmpfunc );
	lseek( cow.fhs, 0, SEEK_END );

	dprintf( cow.fhs, "\n\nblocknumber: uploads\n==Block Upload Dump===\n" );
	for ( uint64_t i = 0; i < currentBlock; i++ ) {
		dprintf( cow.fhs, "%" PRIu64 ": %" PRIu64 " \n", blockUploads[i].blocknumber, blockUploads[i].uploads );
	}
}

/**
 * @brief Starts the upload of a given block.
 * 
 * @param cm Curl_multi
 * @param curlUploadBlock containing the data for the block to upload.
 */
bool addUpload( CURLM *cm, cow_curl_read_upload_t *curlUploadBlock, struct curl_slist *headers )
{
	CURL *eh = curl_easy_init();

	char url[COW_URL_STRING_SIZE];

	snprintf( url, COW_URL_STRING_SIZE, COW_API_UPDATE, cowServerAddress, metadata->uuid, curlUploadBlock->blocknumber );

	curl_easy_setopt( eh, CURLOPT_URL, url );
	curl_easy_setopt( eh, CURLOPT_POST, 1L );
	curl_easy_setopt( eh, CURLOPT_READFUNCTION, curlReadCallbackUploadBlock );
	curl_easy_setopt( eh, CURLOPT_READDATA, (void *)curlUploadBlock );
	curl_easy_setopt( eh, CURLOPT_PRIVATE, (void *)curlUploadBlock );
	// min upload speed of 1kb/s over 10 sec otherwise the upload is canceled.
	curl_easy_setopt( eh, CURLOPT_LOW_SPEED_TIME, 10L );
	curl_easy_setopt( eh, CURLOPT_LOW_SPEED_LIMIT, 1000L );

	curl_easy_setopt(
			eh, CURLOPT_POSTFIELDSIZE_LARGE, (long)( metadata->bitfieldSize + COW_DATA_CLUSTER_SIZE ) );
	if ( COW_SHOW_UL_SPEED ) {
		curlUploadBlock->ulLast = 0;
		curl_easy_setopt( eh, CURLOPT_NOPROGRESS, 0L );
		curl_easy_setopt( eh, CURLOPT_XFERINFOFUNCTION, progress_callback );
		curl_easy_setopt( eh, CURLOPT_XFERINFODATA, eh );
	}
	curl_easy_setopt( eh, CURLOPT_HTTPHEADER, headers );
	curl_multi_add_handle( cm, eh );

	return true;
}

/**
 * @brief After an upload completes, either successful or unsuccessful this
 * function cleans everything up. If unsuccessful and there are some tries left
 * retries to upload the block.
 * 
 * @param cm Curl_multi
 * @param msg CURLMsg
 * @return true returned if the upload was successful or retries are still possible.
 * @return false returned if the upload was unsuccessful.
 */
bool finishUpload( CURLM *cm, CURLMsg *msg, struct curl_slist *headers )
{
	bool status = true;
	cow_curl_read_upload_t *curlUploadBlock;
	CURLcode res;
	CURLcode res2;
	res = curl_easy_getinfo( msg->easy_handle, CURLINFO_PRIVATE, &curlUploadBlock );

	long http_code = 0;
	res2 = curl_easy_getinfo( msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_code );

	if ( res != CURLE_OK || res2 != CURLE_OK || http_code < 200 || http_code >= 300
			|| msg->msg != CURLMSG_DONE ) {
		curlUploadBlock->fails++;
		logadd( LOG_ERROR, "COW_API_UPDATE  failed %i/5: %s\n", curlUploadBlock->fails,
				curl_easy_strerror( msg->data.result ) );
		if ( curlUploadBlock->fails <= 5 ) {
			addUpload( cm, curlUploadBlock, headers );
			goto CLEANUP;
		}
		free( curlUploadBlock );
		status = false;
		goto CLEANUP;
	}

	// everything went ok, update timeChanged
	atomic_compare_exchange_strong( &curlUploadBlock->block->timeChanged, &curlUploadBlock->time, 0 );

	curlUploadBlock->block->uploads++;

	totalBlocksUploaded++;
	free( curlUploadBlock );
CLEANUP:
	curl_multi_remove_handle( cm, msg->easy_handle );
	curl_easy_cleanup( msg->easy_handle );
	return status;
}

/**
 * @brief 
 * 
 * @param cm Curl_multi
 * @param activeUploads ptr to integer which holds the number of current uploads
 * @param breakIfNotMax will return as soon as there are not all upload slots used, so they can be filled up.
 * @param foregroundUpload used to determine the number of max uploads. If true COW_MAX_PARALLEL_UPLOADS will be the limit,
 * else COW_MAX_PARALLEL_BACKGROUND_UPLOADS.
 * @return true returned if all upload's were successful 
 * @return false returned if  one ore more upload's failed.
 */
bool MessageHandler(
		CURLM *cm,  bool breakIfNotMax, bool foregroundUpload, struct curl_slist *headers )
{
	CURLMsg *msg;
	int msgsLeft = -1;
	bool status = true;
	do {
		curl_multi_perform( cm, &activeUploads );

		while ( ( msg = curl_multi_info_read( cm, &msgsLeft ) ) != NULL ) {
			if ( !finishUpload( cm, msg, headers ) ) {
				status = false;
			}
		}
		if ( breakIfNotMax
				&& activeUploads
						< ( foregroundUpload ? COW_MAX_PARALLEL_UPLOADS : COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) ) {
			break;
		}
		// ony wait if there are active uploads
		if ( activeUploads ) {
			curl_multi_wait( cm, NULL, 0, 1000, NULL );
		}

	} while ( activeUploads );
	return status;
}

/**
 * @brief loops through all blocks and uploads them.
 * 
 * @param ignoreMinUploadDelay If true uploads all blocks that have changes while
 * ignoring COW_MIN_UPLOAD_DELAY
 * @param cm Curl_multi
 * @return true if all blocks uploaded successful
 * @return false if one ore more blocks failed to upload
 */
bool uploaderLoop( bool ignoreMinUploadDelay, CURLM *cm )
{
	bool success = true;
	struct curl_slist *headers = NULL;
	const time_t now = time( NULL );
	headers = curl_slist_append( headers, "Content-Type: application/octet-stream" );

	long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE );
	// Iterate over all blocks, L1 first
	for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) {
		if ( cow.l1[l1Index] == -1 ) {
			continue; // Not allocated
		}
		// Now all L2 blocks
		for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) {
			cow_l2_entry_t *block = ( cow.firstL2[cow.l1[l1Index]] + l2Index );
			if ( block->offset == -1 ) {
				continue; // Not allocated
			}
			if ( block->timeChanged == 0 ) {
				continue; // Not changed
			}
			if ( !ignoreMinUploadDelay && ( now - block->timeChanged < COW_MIN_UPLOAD_DELAY ) ) {
				continue; // Last change not old enough
			}
			// Run curl mainloop at least one, but keep doing so while max concurrent uploads is reached
			do {
				if ( !MessageHandler( cm, true, ignoreMinUploadDelay, headers ) ) {
					success = false;
				}
			} while ( ( activeUploads >= ( ignoreMinUploadDelay ? COW_MAX_PARALLEL_UPLOADS
																				 : COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) )
					&& activeUploads > 0 );
			cow_curl_read_upload_t *b = malloc( sizeof( cow_curl_read_upload_t ) );
			b->block = block;
			b->blocknumber = ( l1Index * COW_L2_TABLE_SIZE + l2Index );
			b->fails = 0;
			b->position = 0;
			b->time = block->timeChanged;
			addUpload( cm, b, headers );
			if ( !ignoreMinUploadDelay && !uploadLoop ) {
				goto DONE;
			}
		}
	}
DONE:
	while ( activeUploads > 0 ) {
		MessageHandler( cm, false, ignoreMinUploadDelay, headers );
	}
	curl_slist_free_all( headers );
	return success;
}


/**
 * @brief Computes the data for the status to the stdout/statfile every COW_STATS_UPDATE_TIME seconds.
 * 
 */

void *cowfile_statUpdater( __attribute__( ( unused ) ) void *something )
{
	uint64_t lastUpdateTime = time( NULL );

	while ( !uploadLoopDone ) {
		sleep( COW_STATS_UPDATE_TIME );
		int modified = 0;
		int inQueue = 0;
		int idle = 0;
		long unsigned int l1MaxOffset = 1 + ( ( metadata->imageSize - 1 ) / COW_FULL_L2_TABLE_DATA_SIZE );
		uint64_t now = time( NULL );
		for ( long unsigned int l1Index = 0; l1Index < l1MaxOffset; l1Index++ ) {
			if ( cow.l1[l1Index] == -1 ) {
				continue;
			}
			for ( int l2Index = 0; l2Index < COW_L2_TABLE_SIZE; l2Index++ ) {
				cow_l2_entry_t *block = ( cow.firstL2[cow.l1[l1Index]] + l2Index );
				if ( block->offset == -1 ) {
					continue;
				}
				if ( block->timeChanged != 0 ) {
					if ( !uploadLoop || now > block->timeChanged + COW_MIN_UPLOAD_DELAY ) {
						inQueue++;
					} else {
						modified++;
					}
				} else {
					idle++;
				}
			}
		}
		char speedBuffer[20];

		if ( COW_SHOW_UL_SPEED ) {
			now = time( NULL );
			uint64_t bytes = atomic_exchange( &bytesUploaded, 0 );
			snprintf( speedBuffer, 20, "%.2f", (double)( ( bytes ) / ( 1 + now - lastUpdateTime ) / 1000 ) );

			lastUpdateTime = now;
		}


		updateCowStatsFile( inQueue, modified, idle, speedBuffer );
	}
	return NULL;
}

/**
 * @brief main loop for blockupload in the background
 */
static void *uploaderThreadMain( __attribute__((unused)) void *something )
{
	CURLM *cm;

	cm = curl_multi_init();
	curl_multi_setopt(
			cm, CURLMOPT_MAXCONNECTS, (long)MAX( COW_MAX_PARALLEL_UPLOADS, COW_MAX_PARALLEL_BACKGROUND_UPLOADS ) );


	while ( uploadLoop ) {
		uploaderLoop( false, cm );
		sleep( 2 );
	}
	logadd( LOG_DEBUG1, "start uploading the remaining blocks." );

	// force the upload of all remaining blocks because the user dismounted the image
	if ( !uploaderLoop( true, cm ) ) {
		logadd( LOG_ERROR, "one or more blocks failed to upload" );
		curl_multi_cleanup( cm );
		uploadLoopDone = true;
		return NULL;
	}
	uploadLoopDone = true;
	curl_multi_cleanup( cm );
	logadd( LOG_DEBUG1, "all blocks uploaded" );
	if ( cow_merge_after_upload ) {
		startMerge();
		logadd( LOG_DEBUG1, "Requesting merge." );
	}
	return NULL;
}

/**
 * @brief Create a Cow Stats File  an inserts the session guid
 * 
 * @param path where the file is created
 * @return true 
 * @return false if failed to create or to write into the file
 */
static bool createCowStatsFile( char *path )
{
	char pathStatus[strlen( path ) + 12];

	snprintf( pathStatus, strlen( path ) + 12, "%s%s", path, "/status.txt" );

	char buffer[100];
	int len = snprintf( buffer, 100, "uuid=%s\nstate: active\n", metadata->uuid );
	if ( statStdout ) {
		logadd( LOG_INFO, "%s", buffer );
	}
	if ( statFile ) {
		if ( ( cow.fhs = open( pathStatus, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR ) ) == -1 ) {
			logadd( LOG_ERROR, "Could not create cow status file. Bye.\n" );
			return false;
		}

		if ( pwrite( cow.fhs, buffer, len, 0 ) != len ) {
			logadd( LOG_ERROR, "Could not write to cow status file. Bye.\n" );
			return false;
		}
	}
	return true;
}

/**
 * @brief initializes the cow functionality, creates the data & meta file.
 * 
 * @param path where the files should be stored
 * @param image_Name name of the original file/image
 * @param imageSizePtr
 */
bool cowfile_init( char *path, const char *image_Name, uint16_t imageVersion,
		atomic_uint_fast64_t **imageSizePtr,
		char *serverAddress, bool sStdout, bool sfile )
{
	statStdout = sStdout;
	statFile = sfile;
	char pathMeta[strlen( path ) + 6];
	char pathData[strlen( path ) + 6];

	snprintf( pathMeta, strlen( path ) + 6, "%s%s", path, "/meta" );
	snprintf( pathData, strlen( path ) + 6, "%s%s", path, "/data" );

	if ( ( cow.fhm = open( pathMeta, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR ) ) == -1 ) {
		logadd( LOG_ERROR, "Could not create cow meta file. Bye.\n %s \n", pathMeta );
		return false;
	}

	if ( ( cow.fhd = open( pathData, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR ) ) == -1 ) {
		logadd( LOG_ERROR, "Could not create cow data file. Bye.\n" );
		return false;
	}

	int maxPageSize = 8192;

	size_t metaDataSizeHeader = sizeof( cowfile_metadata_header_t );

	cow.maxImageSize = COW_MAX_IMAGE_SIZE;
	cow.l1Size = ( ( cow.maxImageSize + COW_FULL_L2_TABLE_DATA_SIZE - 1LL ) / COW_FULL_L2_TABLE_DATA_SIZE );

	// size of l1 array + number of l2's * size of l2
	size_t metadata_size = cow.l1Size * sizeof( l1 ) + cow.l1Size * sizeof( l2 );

	// compute next fitting multiple of getpagesize()
	size_t meta_data_start = ( ( metaDataSizeHeader + maxPageSize - 1 ) / maxPageSize ) * maxPageSize;

	size_t metadataFileSize = meta_data_start + metadata_size;
	if ( ftruncate( cow.fhm, metadataFileSize ) != 0 ) {
		logadd( LOG_ERROR, "Could not set file size of meta data file (errno=%d). Bye.\n", errno );
		return false;
	}

	cow.metadata_mmap = mmap( NULL, metadataFileSize, PROT_READ | PROT_WRITE, MAP_SHARED, cow.fhm, 0 );

	if ( cow.metadata_mmap == MAP_FAILED ) {
		logadd( LOG_ERROR, "Error while mapping mmap:\n%s \n Bye.\n", strerror( errno ) );
		return false;
	}

	metadata = (cowfile_metadata_header_t *)( cow.metadata_mmap );
	metadata->magicValue = COW_FILE_META_MAGIC_VALUE;
	metadata->version = CURRENT_COW_VERSION;
	metadata->dataFileSize = ATOMIC_VAR_INIT( COW_DATA_CLUSTER_SIZE );
	metadata->metadataFileSize = ATOMIC_VAR_INIT( metadataFileSize );
	metadata->blocksize = DNBD3_BLOCK_SIZE;
	metadata->originalImageSize = **imageSizePtr;
	metadata->imageSize = metadata->originalImageSize;
	metadata->creationTime = time( NULL );
	*imageSizePtr = &metadata->imageSize;
	metadata->metaDataStart = meta_data_start;
	metadata->bitfieldSize = COW_BITFIELD_SIZE;
	metadata->maxImageSize = cow.maxImageSize;
	snprintf( metadata->imageName, 200, "%s", image_Name );
	cow.l1 = (l1 *)( cow.metadata_mmap + meta_data_start );
	metadata->nextL2 = 0;

	for ( size_t i = 0; i < cow.l1Size; i++ ) {
		cow.l1[i] = -1;
	}
	cow.firstL2 = (l2 *)( ( (char *)cow.l1 ) + cow.l1Size );

	// write header to data file
	uint64_t header = COW_FILE_DATA_MAGIC_VALUE;
	if ( pwrite( cow.fhd, &header, sizeof( uint64_t ), 0 ) != sizeof( uint64_t ) ) {
		logadd( LOG_ERROR, "Could not write header to cow data file. Bye.\n" );
		return false;
	}

	pthread_mutex_init( &cow.l2CreateLock, NULL );

	cowServerAddress = serverAddress;
	curl_global_init( CURL_GLOBAL_ALL );
	curl = curl_easy_init();
	if ( !curl ) {
		logadd( LOG_ERROR, "Error on curl init. Bye.\n" );
		return false;
	}
	if ( !createSession( image_Name, imageVersion ) ) {
		return false;
	}
	createCowStatsFile( path );
	return true;
}

/**
 * @brief loads an existing cow state from the meta & data files
 * 
 * @param path where the meta & data file is located 
 * @param imageSizePtr 
 */
bool cowfile_load( char *path, atomic_uint_fast64_t **imageSizePtr, char *serverAddress, bool sStdout, bool sFile )
{
	statStdout = sStdout;
	statFile = sFile;
	cowServerAddress = serverAddress;
	curl_global_init( CURL_GLOBAL_ALL );
	curl = curl_easy_init();
	char pathMeta[strlen( path ) + 6];
	char pathData[strlen( path ) + 6];

	snprintf( pathMeta, strlen( path ) + 6, "%s%s", path, "/meta" );
	snprintf( pathData, strlen( path ) + 6, "%s%s", path, "/data" );


	if ( ( cow.fhm = open( pathMeta, O_RDWR, S_IRUSR | S_IWUSR ) ) == -1 ) {
		logadd( LOG_ERROR, "Could not open cow meta file. Bye.\n" );
		return false;
	}
	if ( ( cow.fhd = open( pathData, O_RDWR, S_IRUSR | S_IWUSR ) ) == -1 ) {
		logadd( LOG_ERROR, "Could not open cow data file. Bye.\n" );
		return false;
	}

	cowfile_metadata_header_t header;
	{
		size_t sizeToRead = sizeof( cowfile_metadata_header_t );
		size_t readBytes = 0;
		while ( readBytes < sizeToRead ) {
			ssize_t bytes = pread( cow.fhm, ( ( &header ) + readBytes ), sizeToRead, 0 );
			if ( bytes <= 0 ) {
				logadd( LOG_ERROR, "Error while reading meta file header. Bye.\n" );
				return false;
			}
			readBytes += bytes;
		}


		if ( header.magicValue != COW_FILE_META_MAGIC_VALUE ) {
			if ( __builtin_bswap64( header.magicValue ) == COW_FILE_META_MAGIC_VALUE ) {
				logadd( LOG_ERROR, "cow meta file of wrong endianess. Bye.\n" );
				return false;
			}
			logadd( LOG_ERROR, "cow meta file of unkown format. Bye.\n" );
			return false;
		}
		struct stat st;
		fstat( cow.fhm, &st );
		if ( st.st_size < (off_t)( header.metaDataStart + header.nextL2 * sizeof( l2 ) ) ) {
			logadd( LOG_ERROR, "cow meta file to small. Bye.\n" );
			return false;
		}
	}
	{
		uint64_t magicValueDataFile;
		if ( pread( cow.fhd, &magicValueDataFile, sizeof( uint64_t ), 0 ) != sizeof( uint64_t ) ) {
			logadd( LOG_ERROR, "Error while reading cow data file, wrong file?. Bye.\n" );
			return false;
		}

		if ( magicValueDataFile != COW_FILE_DATA_MAGIC_VALUE ) {
			if ( __builtin_bswap64( magicValueDataFile ) == COW_FILE_DATA_MAGIC_VALUE ) {
				logadd( LOG_ERROR, "cow data file of wrong endianess. Bye.\n" );
				return false;
			}
			logadd( LOG_ERROR, "cow data file of unkown format. Bye.\n" );
			return false;
		}
		struct stat st;
		fstat( cow.fhd, &st );
		if ( (off_t)header.dataFileSize > st.st_size ) {
			logadd( LOG_ERROR, "cow data file to small. Bye.\n" );
			return false;
		}
	}

	cow.metadata_mmap = mmap( NULL, header.metadataFileSize, PROT_READ | PROT_WRITE, MAP_SHARED, cow.fhm, 0 );

	if ( cow.metadata_mmap == MAP_FAILED ) {
		logadd( LOG_ERROR, "Error while mapping mmap:\n%s \n Bye.\n", strerror( errno ) );
		return false;
	}
	if ( header.version != CURRENT_COW_VERSION ) {
		logadd( LOG_ERROR, "Error wrong file version got: %i expected: %i. Bye.\n",
				metadata->version, CURRENT_COW_VERSION );
		return false;
	}


	metadata = (cowfile_metadata_header_t *)( cow.metadata_mmap );

	*imageSizePtr = &metadata->imageSize;
	cow.l1 = (l1 *)( cow.metadata_mmap + metadata->metaDataStart );
	cow.maxImageSize = metadata->maxImageSize;
	cow.l1Size = ( ( cow.maxImageSize + COW_FULL_L2_TABLE_DATA_SIZE - 1LL ) / COW_FULL_L2_TABLE_DATA_SIZE );

	cow.firstL2 = (l2 *)( ( (char *)cow.l1 ) + cow.l1Size );
	pthread_mutex_init( &cow.l2CreateLock, NULL );
	createCowStatsFile( path );
	return true;
}
/**
 * @brief Starts the cow BackgroundThreads which are needed for stats and data upload
 * 
 */
bool cowfile_startBackgroundThreads() {

	if( pthread_create( &tidCowUploader, NULL, &uploaderThreadMain, NULL ) != 0  ) {
		logadd( LOG_ERROR, "Could not create cow uploader thread");
		return false;
	}
	if ( statFile || statStdout ) {
		if(pthread_create( &tidStatUpdater, NULL, &cowfile_statUpdater, NULL ) != 0 ) {
			logadd( LOG_ERROR, "Could not create stat updater thread");
			return false;
		}
	}
	return true;
}

/**
 * @brief writes the given data in the data file 
 * 
 * @param buffer containing the data
 * @param size of the buffer
 * @param netSize which actually contributes to the fuse write request (can be different from size if partial full blocks are written)
 * @param cowRequest  <---- !???? TODO
 * @param block block being written to
 * @param inClusterOffset offset in this cluster to be written to
 */
static void writeData( const char *buffer, ssize_t size, size_t netSize, atomic_int *errorCode,
		atomic_size_t *bytesWorkedOn, cow_l2_entry_t *block, off_t inClusterOffset )
{
	// TODO: Assert that size + inClusterOffset <= COW_DATA_CLUSTER_SIZE?
	ssize_t totalBytesWritten = 0;
	while ( totalBytesWritten < size ) {
		ssize_t bytesWritten = pwrite( cow.fhd, ( buffer + totalBytesWritten ), size - totalBytesWritten,
				block->offset + inClusterOffset + totalBytesWritten );
		if ( bytesWritten == -1 ) {
			*errorCode = errno;
			logadd( LOG_ERROR,
					"size:%zu netSize:%zu errorCode:%i bytesWorkedOn:%zu inClusterOffset:%ld block->offset:%ld \n", size,
					netSize, *errorCode, *bytesWorkedOn, inClusterOffset, block->offset );
			break;
		} else if ( bytesWritten == 0 ) {
			*errorCode = EIO;
			logadd( LOG_ERROR,
					"size:%zu netSize:%zu errorCode:%i bytesWorkedOn:%zu inClusterOffset:%ld block->offset:%ld \n", size,
					netSize, *errorCode, *bytesWorkedOn, inClusterOffset, block->offset );
			break;
		}
		totalBytesWritten += bytesWritten;
	}
	atomic_fetch_add( bytesWorkedOn, netSize );
	setBitsInBitfield( block->bitfield, (int)( inClusterOffset / DNBD3_BLOCK_SIZE ),
			(int)( ( inClusterOffset + totalBytesWritten - 1 ) / DNBD3_BLOCK_SIZE ), 1 );

	block->timeChanged = time( NULL );
}

/**
 * @brief Increases the metadata->dataFileSize by COW_DATA_CLUSTER_SIZE.
 * The space is not reserved on disk.
 * 
 * @param block for which the space should be reserved.
 */
static bool allocateMetaBlockData( cow_l2_entry_t *block )
{
	block->offset = (atomic_long)atomic_fetch_add( &metadata->dataFileSize, COW_DATA_CLUSTER_SIZE );
	return true;
}

/**
 * @brief Get the cow_l2_entry_t from l1Index and l2Index.
 * l1 offset must be valid
 * 
 * @param l1Index 
 * @param l2Index 
 * @return cow_l2_entry_t* 
 */
static cow_l2_entry_t *getL2Entry( int l1Index, int l2Index )
{
	cow_l2_entry_t *block = ( cow.firstL2[cow.l1[l1Index]] + l2Index );
	if ( block->offset == -1 ) {
		allocateMetaBlockData( block );
	}
	return block;
}

/**
 * @brief creates an new L2 Block and initializes the containing cow_l2_entry_t blocks
 * 
 * @param l1Index 
 */
static bool createL2Block( int l1Index )
{
	pthread_mutex_lock( &cow.l2CreateLock );
	if ( cow.l1[l1Index] == -1 ) {
		for ( int i = 0; i < COW_L2_TABLE_SIZE; i++ ) {
			cow.firstL2[metadata->nextL2][i].offset = -1;
			cow.firstL2[metadata->nextL2][i].timeChanged = ATOMIC_VAR_INIT( 0 );
			cow.firstL2[metadata->nextL2][i].uploads = ATOMIC_VAR_INIT( 0 );
			for ( int j = 0; j < COW_BITFIELD_SIZE; j++ ) {
				cow.firstL2[metadata->nextL2][i].bitfield[j] = ATOMIC_VAR_INIT( 0 );
			}
		}
		cow.l1[l1Index] = metadata->nextL2;
		metadata->nextL2 += 1;
	}
	pthread_mutex_unlock( &cow.l2CreateLock );
	return true;
}

/**
 * @brief Is called once a fuse write request ist finished.
 * Calls the corrsponding fuse reply depending on the type and
 * success of the request.
 * 
 * @param req fuse_req_t
 * @param cowRequest
 */

static void finishWriteRequest( fuse_req_t req, cow_request_t *cowRequest )
{
	if ( cowRequest->errorCode != 0 ) {
		fuse_reply_err( req, cowRequest->errorCode );

	} else {
		uint64_t oldSize = metadata->imageSize;
		uint64_t ns = MAX( oldSize, cowRequest->bytesWorkedOn + cowRequest->fuseRequestOffset );
		atomic_compare_exchange_strong( &metadata->imageSize, &oldSize, ns );
		fuse_reply_write( req, cowRequest->bytesWorkedOn );
	}
	free( cowRequest );
}

/**
 * @brief Called after the padding data was received from the dnbd3 server.
 * The data from the write request will be combined with the data from the server
 * so that we get a full DNBD3_BLOCK and is then written on the disk.
 * @param sRequest 
 */
static void writePaddedBlock( cow_sub_request_t *sRequest )
{
	//copy write Data
	// TODO Assert that we have enough space in writeBuffer at that offset
	memcpy( ( sRequest->writeBuffer + ( sRequest->inClusterOffset % DNBD3_BLOCK_SIZE ) ), sRequest->writeSrc,
			sRequest->size );
	writeData( sRequest->writeBuffer, DNBD3_BLOCK_SIZE, (ssize_t)sRequest->size, &sRequest->cowRequest->errorCode,
			&sRequest->cowRequest->bytesWorkedOn, sRequest->block,
			( sRequest->inClusterOffset - ( sRequest->inClusterOffset % DNBD3_BLOCK_SIZE ) ) );


	if ( atomic_fetch_sub( &sRequest->cowRequest->workCounter, 1 ) == 1 ) {
		finishWriteRequest( sRequest->dRequest.fuse_req, sRequest->cowRequest );
	}
	free( sRequest );
}

/**
 * @brief If a block does not start or finish on an multiple of DNBD3_BLOCK_SIZE, the blocks need to be
 * padded. If this block is inside the original image size, the padding data will be read from the server.
 * Otherwise it will be padded with 0 since the it must be the block at the end of the image.
 * TODO: Properly document the arguments and what value range they can be, i.e. see below for the 4k case
 * 
 */
static void padBlockFromRemote( fuse_req_t req, off_t offset, cow_request_t *cowRequest, const char *buffer,
		size_t size, cow_l2_entry_t *block, off_t inClusterOffset )
{
	// TODO: Is this *guaranteed* to be the case on the caller site? Add comment to ^
	assert( ( offset % DNBD3_BLOCK_SIZE ) + size <= DNBD3_BLOCK_SIZE );
	if ( offset >= (off_t)metadata->originalImageSize ) {
		// Writing past the end of the image
		inClusterOffset -= inClusterOffset % DNBD3_BLOCK_SIZE;
		char buf[DNBD3_BLOCK_SIZE] = { 0 };
		memcpy( buf + ( offset % DNBD3_BLOCK_SIZE ), buffer, size );
		// At this point we should have a 4k block with user-space data to write, and possibly
		// zero-padding at start and/or end

		writeData( buf, DNBD3_BLOCK_SIZE, (ssize_t)size, &cowRequest->errorCode, &cowRequest->bytesWorkedOn,
				block, inClusterOffset );
		return;
	}
	// Need to fetch padding from upstream
	cow_sub_request_t *sRequest = calloc( sizeof( cow_sub_request_t ) + DNBD3_BLOCK_SIZE, 1 );
	sRequest->callback = writePaddedBlock;
	sRequest->inClusterOffset = inClusterOffset;
	sRequest->block = block;
	sRequest->size = size;
	sRequest->writeSrc = buffer;
	sRequest->cowRequest = cowRequest;

	sRequest->dRequest.length = (uint32_t)MIN( DNBD3_BLOCK_SIZE, metadata->originalImageSize - offset );
	sRequest->dRequest.offset = offset - ( offset % DNBD3_BLOCK_SIZE );
	sRequest->dRequest.fuse_req = req;

	atomic_fetch_add( &cowRequest->workCounter, 1 );
	if ( !connection_read( &sRequest->dRequest ) ) {
		cowRequest->errorCode = EIO;
		if ( atomic_fetch_sub( &sRequest->cowRequest->workCounter, 1 ) == 1 ) {
			finishWriteRequest( sRequest->dRequest.fuse_req, sRequest->cowRequest );
		}
		free( sRequest );
		return;
	}
}

/**
 * @brief Will be called after a dnbd3_async_t is finished.
 * Calls the corrsponding callback function, either writePaddedBlock or readRemoteData
 * depending if the original fuse request was a write or read.
 * 
 */
void cowfile_handleCallback( dnbd3_async_t *request )
{
	cow_sub_request_t *sRequest = container_of( request, cow_sub_request_t, dRequest );
	sRequest->callback( sRequest );
}


/**
 * @brief called once dnbd3_async_t is finished. Increases bytesWorkedOn by the number of bytes
 * this request had. Also checks if it was the last dnbd3_async_t to finish the fuse request, if
 * so replys to fuse and cleans up the request.
 * 
 */
void readRemoteData( cow_sub_request_t *sRequest )
{
	atomic_fetch_add( &sRequest->cowRequest->bytesWorkedOn, sRequest->dRequest.length );

	if ( atomic_fetch_sub( &sRequest->cowRequest->workCounter, 1 ) == 1 ) {
		if ( sRequest->cowRequest->bytesWorkedOn < sRequest->cowRequest->fuseRequestSize ) {
			// TODO: Is this a logic bug somewhere, reagarding accounting?
			// Because connection_read() will always return exactly as many bytes as requested,
			// or simply never finish.
			// Otherwise, we should return EIO...
			logadd( LOG_ERROR, "pad read to small\n" );
		}
		fuse_reply_buf( sRequest->dRequest.fuse_req, sRequest->cowRequest->readBuffer,
				sRequest->cowRequest->bytesWorkedOn );
		free( sRequest->cowRequest->readBuffer );
		free( sRequest->cowRequest );
	}
	free( sRequest );
}

/**
 * @brief changes the imageSize
 * 
 * @param req fuse request
 * @param size new size the image should have
 * @param ino fuse_ino_t
 * @param fi fuse_file_info
 */

void cowfile_setSize( fuse_req_t req, size_t size, fuse_ino_t ino, struct fuse_file_info *fi )
{
	// decrease
	if ( size < metadata->imageSize ) {
		if ( size < metadata->originalImageSize ) {
			metadata->originalImageSize = size;
		}
		// TODO.... so....
		// originalImageSize = smallest we have seen
		// imageSize = current
		// ?

		// increase
	} else if ( size > metadata->imageSize ) {
		off_t offset = metadata->imageSize;
		int l1Index = offsetToL1Index( offset );
		int l2Index = offsetToL2Index( offset );
		int l1EndIndex = offsetToL1Index( size );
		int l2EndIndex = offsetToL2Index( size );
		// special case first block TODO: What is the special case? What is happening here?
		if ( cow.l1[l1Index] != -1 ) {
			cow_l2_entry_t *block = getL2Entry( l1Index, l2Index );
			if ( metadata->imageSize % DNBD3_BLOCK_SIZE != 0 ) {
				off_t inClusterOffset = metadata->imageSize % COW_DATA_CLUSTER_SIZE;
				size_t sizeToWrite = DNBD3_BLOCK_SIZE - ( metadata->imageSize % DNBD3_BLOCK_SIZE );

				if ( checkBit( block->bitfield, (int)( inClusterOffset / DNBD3_BLOCK_SIZE ) ) ) {
					char buf[sizeToWrite];
					memset( buf, 0, sizeToWrite );

					ssize_t bytesWritten = pwrite( cow.fhd, buf, sizeToWrite, block->offset + inClusterOffset );

					if ( bytesWritten < (ssize_t)sizeToWrite ) {
						fuse_reply_err( req, bytesWritten == -1 ? errno : EIO );
						return;
					}
					block->timeChanged = time( NULL );
					offset += sizeToWrite;
				}
			}
			// rest of block set bits 0
			l1Index = offsetToL1Index( offset );
			l2Index = offsetToL2Index( offset );
			block = getL2Entry( l1Index, l2Index );
			off_t inClusterOffset = offset % COW_DATA_CLUSTER_SIZE;
			setBitsInBitfield(
					block->bitfield, (int)( inClusterOffset / DNBD3_BLOCK_SIZE ), ( COW_BITFIELD_SIZE * 8 ) - 1, 0 );
			block->timeChanged = time( NULL );
			l2Index++;
			if ( l2Index >= COW_L2_TABLE_SIZE ) {
				l2Index = 0;
				l1Index++;
			}
		}
		// null all bitfields
		while ( !( l1Index > l1EndIndex || ( l1Index == l1EndIndex && l2EndIndex < l2Index ) ) ) {
			if ( cow.l1[l1Index] == -1 ) {
				l1Index++;
				l2Index = 0;
				continue;
			}

			cow_l2_entry_t *block = getL2Entry( l1Index, l2Index );
			setBitsInBitfield( block->bitfield, 0, ( COW_BITFIELD_SIZE * 8 ) - 1, 0 );
			block->timeChanged = time( NULL );
			l2Index++;
			if ( l2Index >= COW_L2_TABLE_SIZE ) {
				l2Index = 0;
				l1Index++;
			}
		}
	}
	metadata->imageSize = size;
	if ( req != NULL ) {
		image_ll_getattr( req, ino, fi );
	}
}

/**
 * @brief Implementation of a write request.
 * 
 * @param req fuse_req_t
 * @param cowRequest 
 * @param offset Offset where the write starts,
 * @param size Size of the write.
 */
void cowfile_write( fuse_req_t req, cow_request_t *cowRequest, off_t offset, size_t size )
{
	// if beyond end of file, pad with 0
	if ( offset > (off_t)metadata->imageSize ) {
		cowfile_setSize( NULL, offset, 0, NULL );
	}


	off_t currentOffset = offset;
	off_t endOffset = offset + size;

	int l1Index = offsetToL1Index( currentOffset );
	int l2Index = offsetToL2Index( currentOffset );
	while ( currentOffset < endOffset ) {
		if ( cow.l1[l1Index] == -1 ) {
			createL2Block( l1Index );
		}
		//loop over L2 array (metadata)
		while ( currentOffset < endOffset && l2Index < COW_L2_TABLE_SIZE ) {
			cow_l2_entry_t *metaBlock = getL2Entry( l1Index, l2Index );

			// Calc absolute offset in image corresponding to current cluster
			size_t clusterAbsoluteStartOffset = l1Index * COW_FULL_L2_TABLE_DATA_SIZE + l2Index * COW_DATA_CLUSTER_SIZE;

			size_t inClusterOffset = currentOffset - clusterAbsoluteStartOffset;
			// How many bytes we can write to this cluster before crossing a boundary, or before the write request is completed
			size_t bytesToWriteToCluster =
					MIN( (size_t)( endOffset - currentOffset ), COW_DATA_CLUSTER_SIZE - inClusterOffset );

			/////////////////////////
			// lock for the half block probably needed
			if ( currentOffset % DNBD3_BLOCK_SIZE != 0
					&& !checkBit( metaBlock->bitfield, (int)( inClusterOffset / DNBD3_BLOCK_SIZE ) ) ) {
				// Block has not been written locally before, and write does not start on block boundary.
				// Need to fetch the first couple bytes of the block from remote before writing the block to disk.
				size_t writeSize = MIN( bytesToWriteToCluster, DNBD3_BLOCK_SIZE - ( (size_t)currentOffset % DNBD3_BLOCK_SIZE ) );
				const char *sbuf = cowRequest->writeBuffer + ( ( currentOffset - offset ) );
				padBlockFromRemote( req, currentOffset, cowRequest, sbuf, writeSize, metaBlock, (off_t)inClusterOffset );
				currentOffset += writeSize;
				continue;
			}

			size_t endPaddedSize = 0; // In case we need to skip over a pending pad request to remote
			if ( ( currentOffset + bytesToWriteToCluster ) % DNBD3_BLOCK_SIZE != 0
					&& metadata->originalImageSize > currentOffset + bytesToWriteToCluster ) {
				// Write request does not end on block boundary, and ends before end of image
				// End offset of this write
				off_t clusterEndOffset = currentOffset + bytesToWriteToCluster;
				// Start of last block of write, i.e. start of the last, incomplete block
				off_t lastBlockStartOffset = clusterEndOffset - ( clusterEndOffset % DNBD3_BLOCK_SIZE );
				// Where that last block starts relative to its cluster
				off_t inClusterBlockOffset = lastBlockStartOffset - clusterAbsoluteStartOffset;
				if ( !checkBit( metaBlock->bitfield, (int)( inClusterBlockOffset / DNBD3_BLOCK_SIZE ) ) ) {
					// Block indeed not modified before, need to fetch
					const char *sbuf = cowRequest->writeBuffer + ( ( lastBlockStartOffset - offset ) );
					padBlockFromRemote( req, lastBlockStartOffset, cowRequest, sbuf, clusterEndOffset - lastBlockStartOffset, metaBlock,
							inClusterBlockOffset );


					bytesToWriteToCluster -= clusterEndOffset - lastBlockStartOffset;
					endPaddedSize = clusterEndOffset - lastBlockStartOffset;
				}
			}
			writeData( cowRequest->writeBuffer + ( ( currentOffset - offset ) ), (ssize_t)bytesToWriteToCluster,
					bytesToWriteToCluster, &cowRequest->errorCode, &cowRequest->bytesWorkedOn, metaBlock, inClusterOffset );

			currentOffset += bytesToWriteToCluster;
			// Account for skipped-over bytes
			currentOffset += endPaddedSize;


			l2Index++;
		}
		l1Index++;
		l2Index = 0;
	}
	if ( atomic_fetch_sub( &cowRequest->workCounter, 1 ) == 1 ) {
		finishWriteRequest( req, cowRequest );
	}
}


/**
 * @brief Request data, that is not available locally, via the network.
 * 
 * @param req fuse_req_t 
 * @param offset from the start of the file
 * @param size of data to request
 * @param buffer into which the data is to be written
 * @param workCounter workCounter is increased by one and later reduced by one again when the request is completed. TODO There is no such param, but cowRequest..
 */
static void readRemote( fuse_req_t req, off_t offset, ssize_t size, char *buffer, cow_request_t *cowRequest )
{
	// edgecase: Image size got reduced before on a non block border
	if ( offset + size > (long int) metadata->originalImageSize ) { // TODO How does this check if it's a non block border?
		size_t padZeroSize = ( offset + size ) - metadata->originalImageSize;
		off_t padZeroOffset = metadata->originalImageSize - offset;
		assert( offset > 0 ); // TODO Should this be padZeroOffset?
		// ... But isn't it possible that offset > originalImageSize, in which case it would be negative?
		memset( ( buffer + padZeroOffset ), 0, padZeroSize );

		atomic_fetch_add( &cowRequest->bytesWorkedOn, padZeroSize );
	}
	cow_sub_request_t *sRequest = malloc( sizeof( cow_sub_request_t ) );
	sRequest->callback = readRemoteData;
	sRequest->dRequest.length = (uint32_t)size;
	sRequest->dRequest.offset = offset;
	sRequest->dRequest.fuse_req = req;
	sRequest->cowRequest = cowRequest;
	sRequest->buffer = buffer;

	atomic_fetch_add( &cowRequest->workCounter, 1 );
	if ( !connection_read( &sRequest->dRequest ) ) {
		cowRequest->errorCode = EIO; // TODO We set an error...
		free( sRequest );
		if ( atomic_fetch_sub( &cowRequest->workCounter, 1 ) == 1 ) {
			// .... but would still report success if this happens to be the last pending sub-request!?
			fuse_reply_buf( req, cowRequest->readBuffer, cowRequest->bytesWorkedOn );
		}
		free( cowRequest->readBuffer );
		free( cowRequest );
		return;
	}
}

/**
 * @brief Get the Block Data Source object
 * 
 * @param block 
 * @param bitfieldOffset 
 * @param offset 
 * @return enum dataSource 
 */
enum dataSource getBlockDataSource( cow_l2_entry_t *block, off_t bitfieldOffset, off_t offset )
{
	if ( block != NULL && checkBit( block->bitfield, (int)bitfieldOffset ) ) {
		return local;
	}
	if ( offset >= (off_t)metadata->originalImageSize ) {
		return zero;
	}
	return remote;
}

/**
 * @brief Reads data at given offset. If the data are available locally,
 * they are read locally, otherwise they are requested remotely.
 * 
 * @param req fuse_req_t
 * @param size of date to read
 * @param offset offset where the read starts.
 * @return uint64_t Number of bytes read.
 */
void cowfile_read( fuse_req_t req, size_t size, off_t offset )
{
	cow_request_t *cowRequest = malloc( sizeof( cow_request_t ) );
	cowRequest->fuseRequestSize = size;
	cowRequest->bytesWorkedOn = ATOMIC_VAR_INIT( 0 );
	cowRequest->workCounter = ATOMIC_VAR_INIT( 1 );
	cowRequest->errorCode = ATOMIC_VAR_INIT( 0 );
	cowRequest->readBuffer = malloc( size );
	cowRequest->fuseRequestOffset = offset;
	off_t lastReadOffset = offset;
	off_t endOffset = offset + size;
	off_t searchOffset = offset;
	int l1Index = offsetToL1Index( offset );
	int l2Index = offsetToL2Index( offset );
	int bitfieldOffset = getBitfieldOffsetBit( offset );
	enum dataSource dataState;
	cow_l2_entry_t *cluster = NULL;

	if ( cow.l1[l1Index] != -1 ) {
		cluster = getL2Entry( l1Index, l2Index );
	}

	bool doRead = false;
	bool firstLoop = true;
	bool updateBlock = false;
	while ( searchOffset < endOffset ) {
		if ( firstLoop ) {
			firstLoop = false;
			lastReadOffset = searchOffset;
			// TODO: Why is this only set on first iteration and not for every block/cluster?
			dataState = getBlockDataSource( cluster, bitfieldOffset, searchOffset );
		} else if ( getBlockDataSource( cluster, bitfieldOffset, searchOffset ) != dataState ) {
			// TODO So data source changed, but we don't update the dataState var... How can this possibly work?
			doRead = true;
		} else {
			bitfieldOffset++;
		}

		if ( bitfieldOffset >= COW_BITFIELD_SIZE * 8 ) {
			// Advance to next cluster in current l2 table
			bitfieldOffset = 0;
			l2Index++;
			if ( l2Index >= COW_L2_TABLE_SIZE ) {
				// Advance to next l1 entry, reset l2 index
				l2Index = 0;
				l1Index++;
			}
			// Also set flag that we need to update the 'cluster' struct at the end of this iteration
			// TODO: Why do we update all the values above, but not the cluster struct? We access those
			// variables in the code below, so we have updated offset and index, but operate on the
			// old cluster struct. How does that make sense?
			updateBlock = true;
			if ( dataState == local ) {
				doRead = true;
			}
		}
		// compute the original file offset from bitfieldOffset, l2Index and l1Index
		// TODO ??? As stated above, this is using the updated values, so isn't this the next
		// offset tather than original offset?
		searchOffset = DNBD3_BLOCK_SIZE * ( bitfieldOffset ) + l2Index * COW_DATA_CLUSTER_SIZE
				+ l1Index * COW_FULL_L2_TABLE_DATA_SIZE;
		if ( doRead || searchOffset >= endOffset ) {
			ssize_t sizeToRead = MIN( searchOffset, endOffset );
			if ( dataState == remote ) {
				if ( sizeToRead > (ssize_t) metadata->originalImageSize ) {
					//pad rest with 0
					memset( cowRequest->readBuffer
									+ ( ( lastReadOffset - offset ) + ( metadata->originalImageSize - offset ) ),
							0, sizeToRead - metadata->originalImageSize );
					atomic_fetch_add( &cowRequest->bytesWorkedOn, sizeToRead - metadata->originalImageSize );
					sizeToRead = metadata->originalImageSize;
				}
				sizeToRead -= lastReadOffset;
				readRemote(
						req, lastReadOffset, sizeToRead, cowRequest->readBuffer + ( lastReadOffset - offset ), cowRequest );
			} else if ( dataState == zero ) {
				sizeToRead -= lastReadOffset;
				memset( cowRequest->readBuffer + ( lastReadOffset - offset ), 0, sizeToRead );
				atomic_fetch_add( &cowRequest->bytesWorkedOn, sizeToRead );
			} else {
				sizeToRead -= lastReadOffset;
				// Compute the offset in the data file where the read starts
				off_t localRead =
						cluster->offset + ( ( lastReadOffset % COW_FULL_L2_TABLE_DATA_SIZE ) % COW_DATA_CLUSTER_SIZE );
				ssize_t totalBytesRead = 0;
				while ( totalBytesRead < sizeToRead ) {
					ssize_t bytesRead =
							pread( cow.fhd, cowRequest->readBuffer + ( lastReadOffset - offset ), sizeToRead, localRead );
					if ( bytesRead == -1 ) {
						cowRequest->errorCode = errno;
						goto fail;
					} else if ( bytesRead <= 0 ) {
						cowRequest->errorCode = EIO;
						goto fail;
					}
					totalBytesRead += bytesRead;
				}

				atomic_fetch_add( &cowRequest->bytesWorkedOn, totalBytesRead );
			}
			lastReadOffset = searchOffset;
			doRead = false;
			firstLoop = true;
		}

		if ( updateBlock ) {
			if ( cow.l1[l1Index] != -1 ) {
				cluster = getL2Entry( l1Index, l2Index );
			} else {
				cluster = NULL;
			}
			updateBlock = false;
		}
	}
fail:;
	if ( atomic_fetch_sub( &cowRequest->workCounter, 1 ) == 1 ) {
		if ( cowRequest->errorCode != 0 || cowRequest->bytesWorkedOn < size ) {
			logadd( LOG_ERROR, "incomplete read or I/O error (errno=%d)", cowRequest->errorCode );
			fuse_reply_err( req, cowRequest->errorCode != 0 ? cowRequest->errorCode : EIO );
		} else {
			fuse_reply_buf( req, cowRequest->readBuffer, cowRequest->bytesWorkedOn );
		}
		free( cowRequest->readBuffer );
		free( cowRequest );
	}
}


/**
 * @brief stops the StatUpdater and CowUploader threads
 * and waits for them to finish, then cleans up curl.
 * 
 */
void cowfile_close()
{
	uploadLoop = false;
	if ( statFile || statStdout ) {
		pthread_join( tidStatUpdater, NULL );
	}
	pthread_join( tidCowUploader, NULL );

	if ( curl ) {
		curl_global_cleanup();
		curl_easy_cleanup( curl );
	}
}