/*
* cow.c
*
* Created on: 22.08.2018
* Author: Michael Scherle
*/
#include "cow.h"
#include "../types.h"
#include <stdlib.h>
#include <sys/mman.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <inttypes.h>
#include "../shared/log.h"
/*
 * Bitmap helpers for arrays of 32-bit words: bit k lives in word k / 32 at
 * position k % 32. Arguments are fully parenthesised so expressions such as
 * `block + 1` index the intended bit, and the mask is built from an unsigned
 * constant so bit 31 does not shift into the sign bit of an int.
 * Note: k is evaluated twice, so do not pass expressions with side effects.
 */
#define ClearBit(A,k) ( (A)[(k) / 32] &= ~( 1u << ( (k) % 32 ) ) )
#define TestBit(A,k) ( (A)[(k) / 32] & ( 1u << ( (k) % 32 ) ) )
#define SetBit(A,k) ( (A)[(k) / 32] |= ( 1u << ( (k) % 32 ) ) )
// Format version stored in the metadata header by createFile().
const unsigned int version = 1;
// File descriptor of the COW file; assigned by createFile()/loadFile().
int fh;
uint64_t MaxImageSizeInBytes = 1099511627776; //1 Tebibyte in byte
// mmap'ed table inside the COW file with one entry per big block (256 * 4096 bytes).
// An entry holds the file offset of that big block; 0 means it is not stored locally.
off_t *filePointers;
// Number of entries in filePointers (MaxImageSizeInBytes / 1 MiB).
size_t imageBlockCount;
// When true, diagnostic output is printed via printf.
bool debug = true;
// Image size handed to createFile()/loadFile(); used to clamp reads from the remote image.
uint64_t remoteImageSize;
// Header that is written at offset 0 of the COW file.
cow_metadata metadata;
// Singly linked list of requests (linked through cow_request::next).
typedef struct {
cow_request *head;
cow_request *tail;
} cow_requests_queue;
// Number of slots in the signal pool below.
#define SIGPOOLSIZE 6
// Requests that currently hold access, and requests waiting for an overlapping one.
static cow_requests_queue cowRequestsactive;
static cow_requests_queue cowRequestsQueued;
// Guards the two request queues.
static pthread_spinlock_t requestsQueueLock;
// Guards signalPool.
static pthread_spinlock_t sigLock;
// Pool of reusable signals; a NULL slot is empty.
static dnbd3_signal_t *signalPool[SIGPOOLSIZE];
// One-past-the-end pointer of signalPool, used as iteration bound.
static dnbd3_signal_t **sigEnd = signalPool + SIGPOOLSIZE;
/**
 * Initialise the signal pool: set up its spinlock and mark every slot empty.
 */
static void signalInit()
{
    size_t slot = 0;

    pthread_spin_init( &sigLock, PTHREAD_PROCESS_PRIVATE );
    while ( slot < SIGPOOLSIZE ) {
        signalPool[slot++] = NULL;
    }
}
/**
 * Take a signal out of the pool.
 *
 * The first non-empty slot is emptied and its signal returned. If the pool
 * holds no signal, a new one is created with signal_newBlocking().
 */
static inline dnbd3_signal_t *signalGet()
{
    dnbd3_signal_t *pooled = NULL;

    pthread_spin_lock( &sigLock );
    for ( dnbd3_signal_t **slot = signalPool; slot < sigEnd && pooled == NULL; ++slot ) {
        if ( *slot == NULL ) {
            continue;
        }
        pooled = *slot;
        *slot = NULL;
    }
    pthread_spin_unlock( &sigLock );

    if ( pooled != NULL ) {
        return pooled;
    }
    return signal_newBlocking();
}
/**
 * Hand a signal back to the pool.
 *
 * The signal is stored in the first empty slot. If every slot is occupied,
 * the signal is closed with signal_close() instead.
 */
static inline void signalPut(dnbd3_signal_t *signal)
{
    dnbd3_signal_t **slot = signalPool;
    int stored;

    pthread_spin_lock( &sigLock );
    while ( slot < sigEnd && *slot != NULL ) {
        ++slot;
    }
    stored = ( slot < sigEnd );
    if ( stored ) {
        *slot = signal;
    }
    pthread_spin_unlock( &sigLock );

    if ( !stored ) {
        signal_close( signal );
    }
}
/**
 * Append a request to the end of a request queue.
 *
 * NOTE(review): `queue` is received by value, so the head/tail assignments
 * below change only this function's copy and are not visible in the caller's
 * queue object. Only the `next` link of an existing tail node is changed in
 * shared memory. Confirm whether a pointer parameter was intended.
 */
static void enqueueCowRequest(cow_requests_queue queue,cow_request *request)
{
    request->next = NULL;

    if ( queue.head != NULL ) {
        // Non-empty: link behind the current tail.
        queue.tail->next = request;
        queue.tail = request;
        return;
    }

    // Empty: the request becomes both head and tail.
    queue.head = request;
    queue.tail = request;
}
/**
 * Unlink a request from a request queue.
 *
 * Returns the request if it was found in the list, NULL otherwise.
 *
 * NOTE(review): `queue` is received by value, so updates to queue.head and
 * queue.tail affect only the local copy; only the predecessor's `next` link
 * is changed in shared memory. Confirm whether a pointer parameter was
 * intended.
 */
static cow_request* removeCowRequest(cow_requests_queue queue,cow_request *request)
{
    cow_request *before = NULL;
    cow_request *node = queue.head;

    // Walk the list until the request or the end is reached.
    while ( node != NULL && node != request ) {
        before = node;
        node = node->next;
    }
    if ( node == NULL ) {
        return NULL;
    }

    if ( before == NULL ) {
        queue.head = node->next;
    } else {
        before->next = node->next;
    }
    if ( queue.tail == node ) {
        queue.tail = before;
    }
    return node;
}
/**
 * Store a new image size in the metadata header and rewrite the header at
 * offset 0 of the COW file.
 *
 * The size is converted with net_order_64() before it is stored. A failed or
 * short write is reported on stdout; the function has no return value, so the
 * caller is not informed.
 *
 * @param size new image size in bytes (host byte order)
 */
void cow_writeImageSizeToFile(uint64_t size)
{
    if ( debug ) {
        printf("ImageSize Changed to %"PRIu64"\n", size);
    }
    metadata.imageSize = net_order_64( size );

    const ssize_t written = pwrite( fh, &metadata, sizeof(cow_metadata), 0 );
    if ( written != (ssize_t) sizeof(cow_metadata) ) {
        printf( "Error writing image size to Cow File: %s\n",
                written < 0 ? strerror( errno ) : "short write" );
    }
}
/**
 * Open the COW file for an image.
 *
 * An existing file at cow_path is loaded unless overWrite is set; otherwise a
 * new file is created (truncating any existing one).
 *
 * @return the value returned by loadFile() or createFile()
 */
uint64_t cow_init(char *cow_path, char *image_Name, uint64_t imageSize, bool overWrite){
    const int fileExists = ( access( cow_path, F_OK ) != -1 );

    if ( fileExists && !overWrite ) {
        return loadFile( cow_path, image_Name, imageSize );
    }
    return createFile( cow_path, image_Name, imageSize );
}
/**
 * Create (or truncate) the COW file and initialise its layout:
 * metadata header, image name, then - aligned to 8192 bytes - the big block
 * pointer table, which is mapped into memory as filePointers.
 *
 * Every write is checked. On any failure the file descriptor is closed, the
 * error is printed and 0 is returned.
 *
 * @param cow_path   path of the COW file
 * @param image_Name image name stored behind the header (without terminator)
 * @param imageSize  size of the remote image in bytes
 * @return imageSize on success, 0 on failure
 */
uint64_t createFile(char *cow_path, char *image_Name, uint64_t imageSize)
{
    const char *failure = NULL;
    int err = 0;

    remoteImageSize = imageSize;
    if ( ( fh = open( cow_path, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR ) ) == -1 ) {
        logadd( LOG_ERROR, "Could not create COW File. Bye.\n" );
        return 0;
    }

    const size_t nameLen = strlen( image_Name );
    const int nameLenght = (int) nameLen;
    const int pageSize = getpagesize();
    metadata.version = version;
    metadata.imageSize = imageSize;
    metadata.nameLenght = nameLenght;
    metadata.pageSize = pageSize;
    fixup_cow_metadata(metadata);

    if ( write( fh, &metadata, sizeof(metadata) ) != (ssize_t) sizeof(metadata)
            || write( fh, image_Name, nameLen ) != (ssize_t) nameLen ) {
        err = errno;
        failure = "Error writing header to COW File.";
        goto fail;
    }

    imageBlockCount = ( MaxImageSizeInBytes / ( 1024 * 1024 ) );
    const size_t tableSize = imageBlockCount * sizeof( uint64_t );

    off_t mmapStart = lseek( fh, 0L, SEEK_CUR );
    if ( mmapStart == (off_t) -1 ) {
        err = errno;
        failure = "Error seeking in COW File.";
        goto fail;
    }
    // Round up to a multiple of 8192 so the mmap offset is page aligned for
    // page sizes up to 8192 bytes.
    const int maxPageSize = 8192;
    mmapStart = ( ( mmapStart + maxPageSize - 1 ) / maxPageSize ) * maxPageSize;

    // Writing a single byte behind the table extends the file so the whole
    // table is backed by the file before it is mapped.
    if ( pwrite( fh, "", 1, mmapStart + (off_t) tableSize ) != 1 ) {
        err = errno;
        failure = "Error extending COW File.";
        goto fail;
    }

    filePointers = mmap( NULL, tableSize, PROT_READ | PROT_WRITE, MAP_SHARED, fh, mmapStart );
    if ( filePointers == MAP_FAILED ) {
        // Capture errno before close() can overwrite it.
        err = errno;
        failure = "Error creating mmap in COW File.";
        goto fail;
    }
    for ( size_t i = 0; i < imageBlockCount; i++ ) {
        filePointers[i] = 0;
    }

    if ( debug ) {
        printf( "Creating COW File\n" );
        printf( "Version: %u\n", version );
        printf( "ImageName: %s\n", image_Name );
        printf( "Size: %"PRIu64"\n", imageSize );
        printf( "Blocks: %zu\n", imageBlockCount );
        printf( "mmap start: %"PRIu64"\n", (uint64_t) mmapStart );
        printf( "End: %"PRId64"\n", (int64_t) lseek( fh, 0L, SEEK_CUR ) );
    }

    signalInit();
    if ( pthread_spin_init( &requestsQueueLock, PTHREAD_PROCESS_PRIVATE ) != 0 ) {
        printf( "Spinlock init failure" );
    }
    return imageSize;

fail:
    close( fh );
    printf( "%s\n%s\nBye.\n ", failure, strerror( err ) );
    return 0;
}
/**
 * Open an existing COW file, verify that it belongs to image_Name and map its
 * big block pointer table as filePointers.
 *
 * The image name is stored in the file without a terminator. It is only read
 * when its stored length equals strlen(image_Name), which bounds the
 * allocation by a trusted value and lets the buffer be terminated before it
 * is printed or compared.
 *
 * @param cow_path   path of the COW file
 * @param image_Name expected image name
 * @param imageSize  size of the remote image in bytes
 * @return image size stored in the COW file, 0 on failure or name mismatch
 */
uint64_t loadFile( char *cow_path, char *image_Name, uint64_t imageSize )
{
    char *imageName = NULL;

    remoteImageSize = imageSize;
    if ( ( fh = open( cow_path, O_RDWR, S_IRUSR | S_IWUSR ) ) == -1 ) {
        logadd( LOG_ERROR, "Could not load COW File. Bye.\n" );
        return 0;
    }

    if ( read( fh, &metadata, sizeof( cow_metadata ) ) != (ssize_t) sizeof( cow_metadata ) ) {
        logadd( LOG_ERROR, "Could not read COW File header.\n" );
        goto fail;
    }
    // Convert the header to host byte order while it is inspected.
    fixup_cow_metadata( metadata );

    if ( debug ) {
        printf( "Version: %u\n", metadata.version );
        printf( "länge: %i \n", metadata.nameLenght);
        printf( "Size: %ld\n", (long) metadata.imageSize );
        printf( "pageSize: %i\n", metadata.pageSize );
    }

    const size_t expectedLen = strlen( image_Name );
    if ( (uint64_t) metadata.nameLenght != (uint64_t) expectedLen ) {
        logadd( LOG_ERROR, "Wrong COW File for this Image.\n" );
        goto fail;
    }

    imageName = malloc( expectedLen + 1 );
    if ( imageName == NULL ) {
        logadd( LOG_ERROR, "Out of memory while loading COW File.\n" );
        goto fail;
    }
    if ( read( fh, imageName, expectedLen ) != (ssize_t) expectedLen ) {
        logadd( LOG_ERROR, "Could not read image name from COW File.\n" );
        goto fail;
    }
    imageName[expectedLen] = '\0';

    if ( debug ) {
        printf( "Image Name: %s\n", imageName );
    }
    if ( strcmp( image_Name, imageName ) != 0 ) {
        logadd( LOG_ERROR, "Wrong COW File for this Image.\n" );
        goto fail;
    }
    free( imageName );
    imageName = NULL;

    // The pointer table starts at the next multiple of 8192 behind the name.
    off_t mmapStart = lseek( fh, 0L, SEEK_CUR );
    if ( mmapStart == (off_t) -1 ) {
        logadd( LOG_ERROR, "Could not seek in COW File.\n" );
        goto fail;
    }
    const int maxPageSize = 8192;
    mmapStart = ( ( mmapStart + maxPageSize - 1 ) / maxPageSize ) * maxPageSize;
    if ( debug ) {
        printf( "mmap start: %"PRIu64"\n", (uint64_t) mmapStart );
    }

    imageBlockCount = ( MaxImageSizeInBytes / ( 1024 * 1024 ) );
    filePointers = mmap( NULL, imageBlockCount * sizeof( uint64_t ), PROT_READ | PROT_WRITE, MAP_SHARED, fh, mmapStart );
    if ( filePointers == MAP_FAILED ) {
        printf( "Error creating mmap in COW File.\n%s\nBye.\n", strerror( errno ) );
        goto fail;
    }

    signalInit();
    if ( pthread_spin_init( &requestsQueueLock, PTHREAD_PROCESS_PRIVATE ) != 0 ) {
        printf( "Spinlock init failure" );
    }

    const uint64_t imageSizeCow = metadata.imageSize;
    // Second fixup call is kept from the original code; presumably it restores
    // the header to its on-disk byte order, since cow_writeImageSizeToFile()
    // writes the struct back verbatim - verify against the macro definition.
    fixup_cow_metadata( metadata );
    return imageSizeCow;

fail:
    free( imageName );
    close( fh );
    return 0;
}
/**
 * Append a new, empty big block to the COW file and register it in
 * filePointers[id].
 *
 * Layout of a big block, starting at a 4096-aligned file offset:
 *   - 8 ints of block state (one bit per 4096-byte small block), zeroed
 *   - at +4096: 256 * 4096 bytes of data, zeroed
 *
 * The zero buffer is heap allocated (1 MiB would be a large stack object) and
 * the table entry is only published once both writes succeeded.
 *
 * @param id index of the big block within the image
 * @return true on success, false if the block could not be written
 */
bool createBigBlock(unsigned long id)
{
    const size_t dataSize = 256 * 4096;

    // Go to the end of the file and round up to the next 4096-byte boundary.
    off_t blockStart = lseek( fh, 0, SEEK_END );
    if ( blockStart == (off_t) -1 ) {
        printf( "Error seeking in Cow File: %s\n", strerror( errno ) );
        return false;
    }
    blockStart = ( ( blockStart + 4096 - 1 ) / 4096 ) * 4096;

    int blockState[8] = {0};
    if ( pwrite( fh, blockState, sizeof( blockState ), blockStart ) != (ssize_t) sizeof( blockState ) ) {
        printf( "Error writing block state to Cow File: %s\n", strerror( errno ) );
        return false;
    }

    // The data area starts one 4096-byte page behind the block state.
    const off_t dataStart = blockStart + 4096;
    char *zeros = calloc( dataSize, 1 );
    if ( zeros == NULL ) {
        printf( "Error allocating buffer for new Cow block\n" );
        return false;
    }
    const ssize_t written = pwrite( fh, zeros, dataSize, dataStart );
    const int err = errno;
    free( zeros );
    if ( written != (ssize_t) dataSize ) {
        printf( "Error writing block data to Cow File: %s\n",
                written < 0 ? strerror( err ) : "short write" );
        return false;
    }

    filePointers[id] = (uint64_t) blockStart;
    return true;
}
/**
 * Close the COW file descriptor. The result of close() is not evaluated.
 */
void onClose()
{
    (void) close( fh );
}
/**
 * Check a new request against every request in a queue and record a
 * dependency for each byte range that overlaps.
 *
 * For every overlapping queued request Q:
 *   - `request` is stored in the first free slot of Q->dependencies
 *     (requests that wait for Q), and
 *   - Q is stored in the first free slot of request->myDependencies
 *     (requests that `request` waits for). closeAcccess() clears these slots
 *     by comparing them against the request being closed, so the slot must
 *     hold Q, not `request` itself.
 *
 * Each list has 6 slots; if one is full an error is printed and the
 * dependency is not recorded.
 *
 * @param request the new request (offset/end are inclusive bounds)
 * @param queue   queue to check against (only read)
 * @return true if at least one overlapping request was found
 */
bool queckAndUpdateQueues(cow_request *request, cow_requests_queue queue)
{
    bool foundDependencie = false;

    for ( cow_request *iterator = queue.head; iterator != NULL; iterator = iterator->next ) {
        const bool disjoint = ( request->end < iterator->offset ) || ( request->offset > iterator->end );
        if ( disjoint ) {
            continue;
        }
        foundDependencie = true;

        int i = 0;
        while ( i < 6 && iterator->dependencies[i] != NULL ) {
            i++;
        }
        if ( i == 6 ) {
            printf( "Error to much requests in queue" );
        } else {
            iterator->dependencies[i] = request;
        }

        i = 0;
        while ( i < 6 && request->myDependencies[i] != NULL ) {
            i++;
        }
        if ( i == 6 ) {
            printf( "Error to much requests in queue" );
        } else {
            request->myDependencies[i] = iterator;
        }
    }
    return foundDependencie;
}
/**
 * Register a request for the byte range [offset, offset + size) and wait
 * until no overlapping request is active.
 *
 * The range is widened to whole 4096-byte blocks: `offset` is rounded down to
 * a block start and `end` is the last byte of the block containing the final
 * byte (inclusive bound).
 *
 * If an active request overlaps, the request is queued and the calling thread
 * waits on the request's signal (retrying after each 5000 ms timeout or
 * error). In both paths the signal is handed back to the pool before
 * returning, since it is only needed while waiting.
 *
 * NOTE(review): the queues receive the address of this function's local
 * `request`, while the caller gets a copy of the struct - verify the lifetime
 * before relying on queue contents.
 *
 * @return the request descriptor, to be passed to closeAcccess()
 */
cow_request getAccess(off_t offset, size_t size)
{
    cow_request request;
    const off_t endExclusive = offset + (off_t) size;

    request.offset = offset - ( offset % 4096 );
    request.end = ( ( endExclusive + 4096 - 1 ) / 4096 ) * 4096 - 1;
    request.signal = signalGet();
    for ( int i = 0; i < 6; i++ ) {
        request.dependencies[i] = NULL;
        request.myDependencies[i] = NULL;
    }

    pthread_spin_lock( &requestsQueueLock );
    if ( queckAndUpdateQueues( &request, cowRequestsactive ) ) {
        queckAndUpdateQueues( &request, cowRequestsQueued );
        enqueueCowRequest( cowRequestsQueued, &request );
        pthread_spin_unlock( &requestsQueueLock );

        int ret = -1;
        while ( ret < 0 ) {
            // Assign to the loop variable; a fresh declaration here would
            // shadow it and the loop could never terminate.
            ret = signal_wait( request.signal, 5000 );
            if ( ret < 0 ) {
                printf( "Error Cow Request timed out" );
            }
        }
    } else {
        enqueueCowRequest( cowRequestsactive, &request );
        pthread_spin_unlock( &requestsQueueLock );
    }
    // Nobody signals a request once it is active, so the signal can go back
    // to the pool on both paths.
    signalPut( request.signal );
    return request;
}
/**
 * Release a request obtained from getAccess().
 *
 * Under the queue lock the request is removed from the active queue. For each
 * request recorded in request->dependencies, the slot in that request's
 * myDependencies list which refers to `request` is cleared. If no other entry
 * remains there, that request is moved from the queued to the active queue
 * and its signal is called.
 */
void closeAcccess(cow_request *request)
{
    pthread_spin_lock( &requestsQueueLock );
    removeCowRequest( cowRequestsactive, request );

    for ( int slot = 0; slot < 6; slot++ ) {
        cow_request *waiter = request->dependencies[slot];

        if ( waiter != NULL ) {
            int stillBlockedBy = 0;

            for ( int k = 0; k < 6; k++ ) {
                if ( waiter->myDependencies[k] == request ) {
                    waiter->myDependencies[k] = NULL;
                } else if ( waiter->myDependencies[k] != NULL ) {
                    stillBlockedBy++;
                }
            }
            if ( stillBlockedBy == 0 ) {
                removeCowRequest( cowRequestsQueued, waiter );
                enqueueCowRequest( cowRequestsactive, waiter );
                signal_call( waiter->signal );
            }
        }
        request->dependencies[slot] = NULL;
    }

    pthread_spin_unlock(&requestsQueueLock);
}
/**
 * Write `size` bytes from `data` to the image at `offset`, storing them in
 * the COW file.
 *
 * The range is split at big block boundaries (256 * 4096 bytes) and each part
 * is passed to writeToBigBlock(). The big block and the offset inside it are
 * derived from the current position on every iteration, so a short write
 * continues at the right place. If a part cannot be written at all, the loop
 * stops rather than spinning.
 *
 * @return number of bytes written (less than size if an error occurred)
 */
int cow_write(const char *data, size_t size, off_t offset)
{
    const off_t bigBlockSize = 4096 * 256;
    size_t writtenBytes = 0;

    cow_request request = getAccess( offset, size );

    while ( writtenBytes < size ) {
        const off_t position = offset + (off_t) writtenBytes;
        const unsigned long bigBlockId = (unsigned long) ( position / bigBlockSize );
        const off_t bigBlockOffset = position % bigBlockSize;

        // How much fits into this big block.
        size_t sizeToBigBlock = (size_t) ( bigBlockSize - bigBlockOffset );
        if ( ( size - writtenBytes ) < sizeToBigBlock ) {
            sizeToBigBlock = size - writtenBytes;
        }

        const int written = writeToBigBlock( bigBlockId, data + writtenBytes, bigBlockOffset, sizeToBigBlock );
        if ( written <= 0 ) {
            printf( "Error on writing to Cow File, aborting write after %zu bytes\n", writtenBytes );
            break;
        }
        writtenBytes += (size_t) written;
    }

    closeAcccess( &request );
    return (int) writtenBytes;
}
/*
 * Fills a gap inside a small block of the COW file with the matching bytes of
 * the remote image. Bytes beyond remoteImageSize stay zero.
 *
 * fileOffset:  position in the COW file where the gap starts
 * imageOffset: position in the image where the gap starts
 * length:      number of bytes to fill
 *
 * Returns 1 on success, 0 on failure.
 */
static int fillGapFromImage(off_t fileOffset, off_t imageOffset, size_t length)
{
    char *gap = calloc( length, 1 );
    if ( gap == NULL ) {
        printf( "Error allocating buffer for Cow File padding\n" );
        return 0;
    }

    if ( ( (uint64_t) imageOffset ) < remoteImageSize ) {
        size_t remoteLength = length;
        if ( ( (uint64_t) imageOffset ) + ( (uint64_t) length ) > remoteImageSize ) {
            remoteLength = (size_t) ( remoteImageSize - (uint64_t) imageOffset );
        }
        // presumably a negative result signals an error - TODO confirm
        if ( imageReadInternal( gap, remoteLength, imageOffset ) < 0 ) {
            printf( "Error on reading padding data from image\n" );
            free( gap );
            return 0;
        }
    }

    // Always write the full gap; only the amount read from the image is
    // clamped, never the position or length inside the COW file.
    const ssize_t written = pwrite( fh, gap, length, fileOffset );
    free( gap );
    if ( written != (ssize_t) length ) {
        printf( "Error on writing padding to Cow File\n" );
        return 0;
    }
    return 1;
}

/*
 * Writes data into a big block; offset is relative to the start of the big
 * block. The big block is created first if it does not exist locally.
 *
 * If the write does not start or end on a 4096-byte boundary and the affected
 * small block is not stored locally yet, the rest of that small block is
 * filled from the remote image, so that every small block marked as local is
 * complete. The small blocks are marked as local only after the data has been
 * written successfully.
 *
 * Returns the number of bytes written, 0 on failure.
 */
int writeToBigBlock(unsigned long bigBlockId, const char *data, off_t offset,size_t size)
{
    if ( size == 0 ) {
        return 0;
    }
    if ( offset < 0 || ( (uint64_t) offset ) + ( (uint64_t) size ) > (uint64_t) ( 4096 * 256 ) ) {
        printf( "Error SmallBLock > 255" );
        return 0;
    }
    if ( filePointers[bigBlockId] == 0 ) {
        if ( !createBigBlock( bigBlockId ) || filePointers[bigBlockId] == 0 ) {
            printf( "Error on creating block %lu in Cow File\n", bigBlockId );
            return 0;
        }
    }

    const off_t stateStart = (off_t) filePointers[bigBlockId];
    // The data area starts 4096 bytes behind the block state.
    const off_t dataStart = stateStart + 4096;
    const off_t imageStart = (off_t) bigBlockId * ( 4096 * 256 );
    const off_t endOffset = offset + (off_t) size;
    const int firstSmallBlock = getSmallBlockId( offset );
    const int lastSmallBlock = getSmallBlockId( endOffset - 1 );

    int blockState[8];
    if ( pread( fh, blockState, sizeof( blockState ), stateStart ) != (ssize_t) sizeof( blockState ) ) {
        printf( "Error on reading block state from Cow File\n" );
        return 0;
    }

    // If not on a block border and the first small block isn't local, fetch
    // the bytes in front of the written range.
    const size_t headGap = (size_t) ( offset % 4096 );
    if ( headGap != 0 && !TestBit( blockState, firstSmallBlock ) ) {
        const off_t gapStart = offset - (off_t) headGap;
        if ( !fillGapFromImage( dataStart + gapStart, imageStart + gapStart, headGap ) ) {
            return 0;
        }
    }

    const ssize_t written = pwrite( fh, data, size, dataStart + offset );
    if ( written != (ssize_t) size ) {
        printf( "Error on writing to Cow File, size: %zu offset: %"PRIu64"\n Error: %s \n", size,
                (uint64_t) ( dataStart + offset ), written < 0 ? strerror( errno ) : "short write" );
        return 0;
    }

    // If not on a block border and the last small block isn't local, fetch
    // the bytes behind the written range.
    const size_t tailUsed = (size_t) ( endOffset % 4096 );
    if ( tailUsed != 0 && !TestBit( blockState, lastSmallBlock ) ) {
        if ( !fillGapFromImage( dataStart + endOffset, imageStart + endOffset, 4096 - tailUsed ) ) {
            return 0;
        }
    }

    for ( int i = firstSmallBlock; i <= lastSmallBlock; i++ ) {
        SetBit( blockState, i );
    }
    if ( pwrite( fh, blockState, sizeof( blockState ), stateStart ) != (ssize_t) sizeof( blockState ) ) {
        printf( "Error on writing block state to Cow File\n" );
        return 0;
    }
    return (int) written;
}
/**
 * Index (0..255 for non-negative offsets) of the 4096-byte small block that
 * contains `offset` within its big block of 256 small blocks.
 *
 * The modulo is applied before narrowing to int, so large offsets are not
 * truncated first.
 */
int getSmallBlockId(off_t offset)
{
    return (int) ( ( offset / 4096 ) % 256 );
}
/**
 * Read `size` bytes of the image at `offset` into `buf`, taking data from the
 * COW file where it is stored locally and from the remote image otherwise.
 *
 * The range is split at big block boundaries (256 * 4096 bytes) and every
 * affected big block is read with readBigBlock(), including blocks that lie
 * between the first and the last one. Reading stops at the first part that
 * fails or comes back short.
 *
 * @return sum of the byte counts reported by readBigBlock(), or the negative
 *         value of the first call if that one already failed
 */
int cow_read(char *buf, size_t size, off_t offset)
{
    const off_t bigBlockSize = 4096 * 256;
    size_t requested = 0;
    int bytesRead = 0;

    cow_request request = getAccess( offset, size );

    while ( requested < size ) {
        const off_t position = offset + (off_t) requested;
        const long bigBlockId = (long) ( position / bigBlockSize );
        const off_t bigBlockOffset = position % bigBlockSize;

        // How much can be read from this big block.
        size_t sizeToBigBlock = (size_t) ( bigBlockSize - bigBlockOffset );
        if ( sizeToBigBlock > size - requested ) {
            sizeToBigBlock = size - requested;
        }

        const int got = readBigBlock( bigBlockId, buf + requested, sizeToBigBlock, bigBlockOffset );
        if ( got < 0 ) {
            if ( bytesRead == 0 ) {
                bytesRead = got;
            }
            break;
        }
        bytesRead += got;
        if ( (size_t) got < sizeToBigBlock ) {
            break;
        }
        requested += sizeToBigBlock;
    }

    closeAcccess( &request );
    return bytesRead;
}
/**
 * Read `size` bytes starting at `offset` (relative to the start of the big
 * block) into `buf`.
 *
 * If the big block is not stored locally, the whole range is read from the
 * remote image. Otherwise the block state bitmap decides per 4096-byte small
 * block: consecutive small blocks with the same state are handled in one
 * step, local runs are read from the COW file and the others from the remote
 * image. Parts of a remote run beyond remoteImageSize are filled with zeros.
 *
 * @param bigBlockId index of the big block within the image
 * @param buf        destination buffer of at least `size` bytes
 * @param size       number of bytes to read; offset + size must not exceed
 *                   the big block
 * @param offset     start offset inside the big block
 * @return number of bytes placed in buf; less than size if a read failed
 */
int readBigBlock(long bigBlockId, char *buf, size_t size, off_t offset)
{
    const off_t imageStart = (off_t) bigBlockId * ( 4096 * 256 );

    // If block isn't local
    if ( filePointers[bigBlockId] == 0 ) {
        return imageReadInternal( buf, size, offset + imageStart );
    }
    if ( size == 0 ) {
        return 0;
    }

    const off_t stateStart = (off_t) filePointers[bigBlockId];
    // The data area starts 4096 bytes behind the block state.
    const off_t dataStart = stateStart + 4096;

    int blockState[8];
    if ( pread( fh, blockState, sizeof( blockState ), stateStart ) != (ssize_t) sizeof( blockState ) ) {
        printf( "Error on reading block state from COW File.\n" );
        return 0;
    }

    const int endBlock = getSmallBlockId( offset + (off_t) size - 1 );
    int block = getSmallBlockId( offset );
    size_t readBytes = 0;

    while ( readBytes < size && block <= endBlock ) {
        const int isLocal = ( TestBit( blockState, block ) != 0 );

        // Extend the run while the following small block has the same state.
        // The end check comes first so the bitmap is never read past
        // endBlock.
        while ( block != endBlock ) {
            const int next = block + 1;
            if ( ( TestBit( blockState, next ) != 0 ) != isLocal ) {
                break;
            }
            block = next;
        }

        const off_t startOffset = offset + (off_t) readBytes;
        size_t sizeToRead = (size_t) ( (off_t) ( block + 1 ) * 4096 - startOffset );
        if ( sizeToRead > size - readBytes ) {
            sizeToRead = size - readBytes;
        }
        char *dest = buf + readBytes;

        if ( !isLocal ) {
            // Clamp against the image size using the absolute image offset.
            const uint64_t imageOffset = (uint64_t) ( imageStart + startOffset );
            size_t sizeToRemoteRead = sizeToRead;
            if ( imageOffset >= remoteImageSize ) {
                sizeToRemoteRead = 0;
            } else if ( imageOffset + (uint64_t) sizeToRead > remoteImageSize ) {
                sizeToRemoteRead = (size_t) ( remoteImageSize - imageOffset );
            }
            if ( sizeToRemoteRead > 0 ) {
                const int got = imageReadInternal( dest, sizeToRemoteRead, imageStart + startOffset );
                if ( got < 0 ) {
                    break;
                }
                if ( (size_t) got < sizeToRemoteRead ) {
                    readBytes += (size_t) got;
                    break;
                }
            }
            // Everything behind the end of the image reads as zero.
            memset( dest + sizeToRemoteRead, 0, sizeToRead - sizeToRemoteRead );
            readBytes += sizeToRead;
        } else {
            // Read data local
            const ssize_t got = pread( fh, dest, sizeToRead, dataStart + startOffset );
            if ( got > 0 ) {
                readBytes += (size_t) got;
            }
            if ( got < 0 || (size_t) got < sizeToRead ) {
                printf("Error on reading data from COW File. File end reached?");
                break;
            }
        }
        block++;
    }
    return (int) readBytes;
}