/*
* cow.c
*
* Created on: 22.08.2018
* Author: michael
*/
#include "cow.h"
#include <stdlib.h>
#include <sys/mman.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <unistd.h>
#include <pthread.h>
/*
 * Bit helpers for bitmaps stored as arrays of 32-bit words: bit k lives in
 * word k/32 at position k%32.
 *
 * Both arguments are fully parenthesized so expressions such as
 * SetBit(map, base + 1) address the intended bit, and the mask is built from
 * an unsigned 1 so that bit 31 does not shift into the sign bit of an int.
 * Note that k is evaluated twice; do not pass expressions with side effects.
 */
#define ClearBit(A,k) ( (A)[((k)/32)] &= ~(1u << ((k)%32)) )
#define TestBit(A,k) ( (A)[((k)/32)] & (1u << ((k)%32)) )
#define SetBit(A,k) ( (A)[((k)/32)] |= (1u << ((k)%32)) )
// Format version; create_cow_file() writes it as the first field of the file header.
const unsigned int version = 1;
// Upper bound used to size the block pointer table (one entry per 1 MiB).
uint64_t MaxImageSizeInBytes=1099511627776; //1 Tebibyte in byte
// mmap'ed table inside the COW file: entry i holds the file offset of big
// block i, or 0 while that big block has no local copy.
off_t *filePointers;
// Not referenced by the code in this file section.
uint64_t imageSubBlockCount;
// Number of entries in filePointers (MaxImageSizeInBytes / 1 MiB).
size_t imageBlockCount;
// File descriptor of the open COW file; shared by all read/write helpers.
int fh;
// Computed by create_cow_file(): page-aligned offset after the pointer table.
off_t dataStart;
// When true, extra diagnostics are printed to stdout.
bool debug = true;
// Size in bytes of the remote image, as passed to create_cow_file()/load_cow_file();
// used to clip reads that are forwarded to the remote image.
uint64_t remoteImageSize;
/*
 * Singly linked list of cow_request entries, chained through each request's
 * `next` member and appended to at the tail.
 */
typedef struct {
cow_request *head; // first request in the list, NULL while the list is empty
cow_request *tail; // last request in the list; new entries are linked after it
} cow_requests_queue;
// Number of slots in the pool of reusable signals.
#define SIGPOOLSIZE 6
// Requests that were granted access right away or after their blockers finished.
static cow_requests_queue cowRequestsactive;
// Requests that overlap an active request and wait to be signalled.
static cow_requests_queue cowRequestsQueued;
// Held by getAccess()/closeAcccess() while they inspect or modify the two queues.
static pthread_spinlock_t requestsQueueLock;
// Held by signalGet()/signalPut() while they touch signalPool.
static pthread_spinlock_t sigLock;
// Idle signals available for reuse; a NULL entry marks an empty slot.
static dnbd3_signal_t *signalPool[SIGPOOLSIZE];
// One-past-the-end pointer of signalPool, used as the loop bound when scanning it.
static dnbd3_signal_t **sigEnd = signalPool + SIGPOOLSIZE;
static void signalInit()
{
pthread_spin_init( &sigLock, PTHREAD_PROCESS_PRIVATE );
for ( size_t i = 0; i < SIGPOOLSIZE; ++i ) {
signalPool[i] = NULL;
}
}
static inline dnbd3_signal_t *signalGet()
{
pthread_spin_lock( &sigLock );
for ( dnbd3_signal_t **it = signalPool; it < sigEnd; ++it ) {
if ( *it != NULL ) {
dnbd3_signal_t *ret = *it;
*it = NULL;
pthread_spin_unlock( &sigLock );
return ret;
}
}
pthread_spin_unlock( &sigLock );
return signal_newBlocking();
}
/*
 * Gives a signal back after use.
 *
 * The signal is parked in the first empty pool slot (scanned in index order
 * under sigLock). If every slot is occupied, the signal is released with
 * signal_close() once the lock has been dropped.
 */
static inline void signalPut(dnbd3_signal_t *signal)
{
	int parked = 0;

	pthread_spin_lock( &sigLock );
	for ( size_t i = 0; i < SIGPOOLSIZE && !parked; ++i ) {
		if ( signalPool[i] == NULL ) {
			signalPool[i] = signal;
			parked = 1;
		}
	}
	pthread_spin_unlock( &sigLock );

	if ( !parked ) {
		signal_close( signal );
	}
}
/*
 * Appends `request` behind the current tail of `queue`.
 *
 * request->next is reset to NULL and, for a non-empty queue, the old tail
 * node is linked to the new request.
 *
 * NOTE(review): `queue` is received by value, so the head/tail assignments
 * below only change this function's private copy; the queue object the
 * caller passed in keeps its previous head and tail. Only the writes that go
 * through node pointers (request->next, tail->next) are visible outside.
 */
static void enqueueCowRequest(cow_requests_queue queue,cow_request *request)
{
	request->next = NULL;
	if ( queue.head != NULL ) {
		// Non-empty: chain the new request behind the last node.
		queue.tail->next = request;
	} else {
		// Empty: the new request is also the first node.
		queue.head = request;
	}
	queue.tail = request;
}
/*
 * Searches `queue` for `request` and unlinks it.
 *
 * Returns the unlinked node (identical to `request`), or NULL when the
 * request is not part of the list.
 *
 * NOTE(review): `queue` is received by value. Unlinking a node that has a
 * predecessor is visible to the caller (it rewrites the predecessor's
 * `next`), but updates of queue.head / queue.tail only touch the local copy.
 */
static cow_request* removeCowRequest(cow_requests_queue queue,cow_request *request)
{
	cow_request *before = NULL;
	cow_request *node = queue.head;

	// Walk until the request is found or the list ends.
	while ( node != NULL && node != request ) {
		before = node;
		node = node->next;
	}
	if ( node == NULL ) {
		return NULL;
	}

	if ( before == NULL ) {
		queue.head = node->next;
	} else {
		before->next = node->next;
	}
	if ( queue.tail == node ) {
		queue.tail = before;
	}
	return node;
}
/*
 * Stores `size` in the image-size field of the COW file header.
 *
 * The field sits directly behind the leading `unsigned int` version field,
 * matching the order in which create_cow_file() writes the header.
 * The file offset of `fh` is left right behind the field.
 * Failures are reported on stdout; the function has no return value.
 */
void writeImageSizeToFile(uint64_t size){
	if ( lseek( fh, sizeof(unsigned int), SEEK_SET ) == (off_t)-1 ) {
		printf( "Could not seek to image size field in COW file.\n%s\n", strerror( errno ) );
		return;
	}
	const ssize_t written = write( fh, &size, sizeof(uint64_t) );
	if ( written < 0 ) {
		printf( "Could not write image size to COW file.\n%s\n", strerror( errno ) );
	} else if ( (size_t)written != sizeof(uint64_t) ) {
		printf( "Could not write image size to COW file: short write.\n" );
	}
}
/*
 * Writes exactly `len` bytes from `buf` to `fd`, retrying after partial
 * writes and EINTR. Returns false if the data could not be written fully.
 */
static bool cowWriteAll(int fd, const void *buf, size_t len)
{
	const char *cursor = buf;
	while ( len > 0 ) {
		const ssize_t n = write( fd, cursor, len );
		if ( n < 0 ) {
			if ( errno == EINTR ) {
				continue;
			}
			return false;
		}
		if ( n == 0 ) {
			return false;
		}
		cursor += n;
		len -= (size_t)n;
	}
	return true;
}

/*
 * Creates (or truncates) the COW file at `cow_path` and initialises the
 * module state for it.
 *
 * File layout produced here:
 *   - header: version (unsigned int), imageSize (uint64_t), length of the
 *     image name (unsigned int), the image name without terminator, and the
 *     system page size (int)
 *   - at the next multiple of 8192 bytes: a table of off_t, one entry per
 *     1 MiB of MaxImageSizeInBytes, mapped into memory as `filePointers`
 *     and initialised to 0 ("big block not present")
 *
 * @param cow_path   path of the COW file to create
 * @param image_Name name of the remote image, stored in the header
 * @param imageSize  size of the remote image in bytes
 * @return true on success; false if the file could not be created, written,
 *         extended or mapped (the descriptor is closed again in that case)
 */
bool create_cow_file(char *cow_path, char *image_Name,uint64_t imageSize){
	remoteImageSize = imageSize;
	if((fh = open (cow_path, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR))==-1){
		puts( "Could not create COW File. Bye.\n" );
		return false;
	}

	const unsigned int nameLength = (unsigned int) strlen( image_Name );
	const int pageSize = getpagesize();
	if ( !cowWriteAll( fh, &version, sizeof(unsigned int) )
			|| !cowWriteAll( fh, &imageSize, sizeof(uint64_t) )
			|| !cowWriteAll( fh, &nameLength, sizeof(unsigned int) )
			|| !cowWriteAll( fh, image_Name, nameLength )
			|| !cowWriteAll( fh, &pageSize, sizeof(int) ) ) {
		// Report before close(), which may overwrite errno.
		printf( "Error writing COW File header.\n%s\nBye.\n ", strerror( errno ) );
		close( fh );
		return false;
	}

	imageBlockCount = (MaxImageSizeInBytes/(1024*1024));
	const size_t tableBytes = imageBlockCount * sizeof(off_t);

	off_t mmapStart = lseek( fh, 0L, SEEK_CUR );
	if ( mmapStart == (off_t)-1 ) {
		printf( "Error expanding file for mmap.Bye.\n " );
		close( fh );
		return false;
	}
	// Align the table to 8192 bytes, a multiple of the common page sizes,
	// so the offset is acceptable to mmap().
	const int maxPageSize = 8192;
	mmapStart = ((mmapStart+maxPageSize-1)/maxPageSize)*maxPageSize;

	// mmapStart is an absolute file offset, so seek from the file start.
	// Writing one byte behind the table makes the file large enough for
	// the whole mapping.
	dataStart = lseek( fh, mmapStart + (off_t)tableBytes, SEEK_SET );
	if ( dataStart == (off_t)-1 || !cowWriteAll( fh, "", 1 ) ) {
		printf( "Error expanding file for mmap.Bye.\n " );
		close( fh );
		return false;
	}
	dataStart = ((dataStart+pageSize-1)/pageSize)*pageSize;

	filePointers = mmap( NULL, tableBytes, PROT_READ | PROT_WRITE, MAP_SHARED, fh, mmapStart );
	if ( filePointers == MAP_FAILED ) {
		// Report before close(), which may overwrite errno.
		printf( "Error creating mmap in COW File.\n%s\nBye.\n ", strerror( errno ) );
		close( fh );
		return false;
	}
	for ( size_t i = 0; i < imageBlockCount; i++ ) {
		filePointers[i] = 0;
	}

	if(debug){
		printf("Creating COW File\n");
		printf("Version: %u\n",version);
		printf("ImageName: %s\n",image_Name);
		printf("Size: %llu\n", (unsigned long long)imageSize);
		printf("Blocks: %zu\n",imageBlockCount);
		printf("mmap start: %lld\n",(long long)mmapStart);
		printf("End: %lld\n",(long long)lseek(fh,0L,SEEK_CUR));
	}

	signalInit();
	if(pthread_spin_init( &requestsQueueLock, PTHREAD_PROCESS_PRIVATE ) != 0){
		printf("Spinlock init failure");
	}
	return true;
}
/*
 * Reads exactly `len` bytes from `fd` into `buf`, retrying after partial
 * reads and EINTR. Returns false on error or premature end of file.
 */
static bool cowReadAll(int fd, void *buf, size_t len)
{
	char *cursor = buf;
	while ( len > 0 ) {
		const ssize_t n = read( fd, cursor, len );
		if ( n < 0 ) {
			if ( errno == EINTR ) {
				continue;
			}
			return false;
		}
		if ( n == 0 ) {
			return false;
		}
		cursor += n;
		len -= (size_t)n;
	}
	return true;
}

/*
 * Opens an existing COW file and initialises the module state from it.
 *
 * The header is read in the order create_cow_file() writes it (version,
 * image size, name length, name, page size). The image size found in the
 * header is passed to setImagesize(), and the block pointer table that
 * starts at the next multiple of 8192 bytes is mapped as `filePointers`.
 *
 * The file is opened read-write because the write path stores into both the
 * file and the mapped table. If that is not permitted, the function falls
 * back to a read-only descriptor and mapping.
 *
 * @param cow_path  path of the COW file
 * @param imageSize size of the remote image in bytes
 * @return true on success; false if the file cannot be opened, its header is
 *         truncated or implausible, or the table cannot be mapped
 */
bool load_cow_file(char *cow_path, uint64_t imageSize){
	remoteImageSize = imageSize;

	int mapProt = PROT_READ | PROT_WRITE;
	fh = open( cow_path, O_RDWR );
	if ( fh == -1 ) {
		// Fall back to read-only access.
		mapProt = PROT_READ;
		fh = open( cow_path, O_RDONLY );
	}
	if ( fh == -1 ) {
		printf( "Could not open Cow File.\n" );
		return false;
	}

	unsigned int fileVersion = 0;
	uint64_t size = 0;
	int l = 0;
	if ( !cowReadAll( fh, &fileVersion, sizeof(unsigned int) )
			|| !cowReadAll( fh, &size, sizeof(uint64_t) )
			|| !cowReadAll( fh, &l, sizeof(int) )
			|| l < 0 ) {
		printf( "Could not read Cow File header.\n" );
		close( fh );
		return false;
	}

	char *buffer = malloc( (size_t)l + 1 );
	if ( buffer == NULL ) {
		printf( "Could not read Cow File header.\n" );
		close( fh );
		return false;
	}
	int pageSize = 0;
	if ( !cowReadAll( fh, buffer, (size_t)l ) || !cowReadAll( fh, &pageSize, sizeof(int) ) ) {
		printf( "Could not read Cow File header.\n" );
		free( buffer );
		close( fh );
		return false;
	}
	buffer[l]='\0';

	if(debug){
		printf("Version: %u\n",fileVersion);
		printf("länge: %i \n",l);
		printf("Image Name: %s\n",buffer);
		printf("Size: %llu\n", (unsigned long long)size);
		printf("pageSize: %i\n", (pageSize));
	}
	free(buffer);
	//TODO Image Validation
	setImagesize(size);

	off_t mmapStart = lseek( fh, 0L, SEEK_CUR );
	if ( mmapStart == (off_t)-1 ) {
		printf( "Error creating mmap in COW File.\n%s\nBye.\n ", strerror( errno ) );
		close( fh );
		return false;
	}
	// Same 8192-byte alignment rule as in create_cow_file().
	const int maxPageSize = 8192;
	mmapStart= ((mmapStart+maxPageSize-1)/maxPageSize)*maxPageSize;
	imageBlockCount = (MaxImageSizeInBytes/(1024*1024));
	filePointers = mmap( NULL, imageBlockCount*sizeof(off_t), mapProt, MAP_SHARED, fh, mmapStart );
	if(filePointers == MAP_FAILED ){
		printf("Error creating mmap in COW File.\n%s\nBye.\n ", strerror(errno));
		close(fh);
		return false;
	}

	signalInit();
	if(pthread_spin_init( &requestsQueueLock, PTHREAD_PROCESS_PRIVATE ) != 0){
		printf("Spinlock init failure");
	}
	return true;
}
/*
 * Appends storage for big block `id` to the COW file and records its
 * position in filePointers[id].
 *
 * A big block starts at the next multiple of 4096 after the current end of
 * the file and consists of one 4096-byte page holding the 256-bit state
 * bitmap (all bits clear) followed by 256 zero-filled small blocks of
 * 4096 bytes each.
 *
 * The zero pages are written from a small static buffer; a 1 MiB array on
 * the stack could overflow the stack of a worker thread.
 *
 * @param id index of the big block inside filePointers
 * @return true if the whole block was written, false on any I/O error
 *         (filePointers[id] is already set to the block start in that case)
 */
bool createBigBlock(unsigned long id){
	enum { PAGE_BYTES = 4096, SMALL_BLOCKS = 256 };
	static const char zeroPage[PAGE_BYTES] = {0};

	//goto File end and then to next 4096 Block
	off_t blockStart = lseek( fh, 0, SEEK_END );
	if ( blockStart == (off_t)-1 ) {
		printf( "Error creating big block.\n%s\n", strerror( errno ) );
		return false;
	}
	blockStart = ((blockStart+PAGE_BYTES-1)/PAGE_BYTES)*PAGE_BYTES;
	filePointers[id] = blockStart;
	if ( lseek( fh, blockStart, SEEK_SET ) == (off_t)-1 ) {
		printf( "Error creating big block.\n%s\n", strerror( errno ) );
		return false;
	}

	// One bitmap page plus the data pages, written back to back.
	for ( int page = 0; page < 1 + SMALL_BLOCKS; ++page ) {
		size_t done = 0;
		while ( done < sizeof(zeroPage) ) {
			const ssize_t n = write( fh, zeroPage + done, sizeof(zeroPage) - done );
			if ( n < 0 && errno == EINTR ) {
				continue;
			}
			if ( n <= 0 ) {
				printf( "Error creating big block.\n%s\n", n < 0 ? strerror( errno ) : "short write" );
				return false;
			}
			done += (size_t)n;
		}
	}
	return true;
}
/*
 * Shutdown hook: closes the COW file descriptor `fh`.
 * The result of close() is intentionally not evaluated.
 */
void onClose(){
	(void) close( fh );
}
/*
 * Looks for requests in `queue` whose byte range [offset, end] overlaps the
 * range of `request` and records the dependency in both directions:
 *   - the overlapping (older) request gets `request` added to its
 *     `dependencies`, i.e. the requests to wake when it finishes
 *   - `request` gets the older request added to its `myDependencies`, i.e.
 *     the requests it has to wait for; closeAcccess() clears exactly these
 *     entries by comparing them with the finishing request
 *
 * Each list has room for 6 entries; when a list is full an error is printed
 * and the entry is dropped.
 *
 * The caller is expected to hold requestsQueueLock.
 *
 * @return true if at least one overlapping request was found
 */
bool queckQueuesForDependencies(cow_request *request,cow_requests_queue queue){
	enum { MAX_DEPENDENCIES = 6 };
	bool foundDependencie = false;

	for ( cow_request *iterator = queue.head; iterator != NULL; iterator = iterator->next ) {
		const bool disjoint = ( request->end < iterator->offset ) || ( request->offset > iterator->end );
		if ( disjoint ) {
			continue;
		}
		foundDependencie = true;

		// Tell the older request whom to wake up when it is done.
		int slot = 0;
		while ( slot < MAX_DEPENDENCIES && iterator->dependencies[slot] != NULL ) {
			slot++;
		}
		if ( slot == MAX_DEPENDENCIES ) {
			printf("Error to much requests in queue");
		} else {
			iterator->dependencies[slot] = request;
		}

		// Remember the older request as a blocker of the new one.
		slot = 0;
		while ( slot < MAX_DEPENDENCIES && request->myDependencies[slot] != NULL ) {
			slot++;
		}
		if ( slot == MAX_DEPENDENCIES ) {
			printf("Error to much requests in queue");
		} else {
			request->myDependencies[slot] = iterator;
		}
	}
	return foundDependencie;
}
/*
 * Registers an access to the byte range [offset, offset + size) and blocks
 * until no earlier overlapping request is active.
 *
 * The range is widened to whole 4096-byte blocks: `offset` is rounded down
 * and `end` is the last byte of the block containing the final byte.
 * If the range overlaps a request in the active queue, the new request is
 * put into the waiting queue and the caller sleeps on the request's signal
 * (retrying after every 5000 ms timeout) until closeAcccess() wakes it.
 * Otherwise the request is added to the active queue immediately.
 *
 * The signal obtained from signalGet() is handed back with signalPut() on
 * both paths; signalGet() may create a new signal when the pool is empty,
 * so keeping it on the non-waiting path would leak one per request.
 *
 * NOTE(review): the request lives on this function's stack and is returned
 * by value, while its address is passed to the queue functions. Any pointer
 * a queue keeps to it dangles once this function returns; the queue helpers
 * currently receive the queue by value, so confirm the intended ownership
 * before relying on the queues.
 *
 * @return the request descriptor to pass to closeAcccess() afterwards
 */
cow_request getAccess(off_t offset,size_t size){
	enum { SMALL_BLOCK = 4096, MAX_DEPENDENCIES = 6, WAIT_TIMEOUT_MS = 5000 };
	cow_request request;
	const off_t rawEnd = offset + (off_t)size;

	request.offset = offset - (offset % SMALL_BLOCK);
	// Round the exclusive end up to the next block boundary, then step back
	// one byte to get an inclusive end.
	request.end = ((rawEnd + SMALL_BLOCK - 1) / SMALL_BLOCK) * SMALL_BLOCK - 1;
	request.signal = signalGet();
	for ( int i = 0; i < MAX_DEPENDENCIES; i++ ) {
		request.dependencies[i] = NULL;
		request.myDependencies[i] = NULL;
	}

	pthread_spin_lock( &requestsQueueLock );
	if(queckQueuesForDependencies(&request, cowRequestsactive)){
		queckQueuesForDependencies(&request, cowRequestsQueued);
		enqueueCowRequest(cowRequestsQueued,&request);
		pthread_spin_unlock( &requestsQueueLock );
		int ret = -1;
		while(ret<0){
			// Assign to the loop variable; a fresh declaration here would
			// leave the loop condition unchanged forever.
			ret = signal_wait( request.signal, WAIT_TIMEOUT_MS );
			if(ret<0){
				printf("Error Cow Request timed out");
			}
		}
	}else{
		enqueueCowRequest(cowRequestsactive,&request);
		pthread_spin_unlock( &requestsQueueLock );
	}
	signalPut(request.signal);
	return request;
}
/*
 * Ends the access described by `request` and wakes requests that were
 * waiting for it.
 *
 * Under requestsQueueLock the request is removed from the active queue.
 * For every request listed in request->dependencies, the entry pointing back
 * to `request` is cleared from that request's myDependencies; if no other
 * blocker remains, the waiting request is moved from the waiting queue to
 * the active queue and its signal is triggered. All entries of
 * request->dependencies are reset to NULL.
 *
 * NOTE(review): the queue helpers take the queue by value, so head/tail of
 * the global queues are not updated by these calls.
 */
void closeAcccess(cow_request *request){
	pthread_spin_lock(&requestsQueueLock);
	removeCowRequest(cowRequestsactive, request);

	for ( int slot = 0; slot < 6; ++slot ) {
		cow_request *waiter = request->dependencies[slot];
		request->dependencies[slot] = NULL;
		if ( waiter == NULL ) {
			continue;
		}

		// Drop ourselves from the waiter's blocker list and see whether
		// anything else still holds it back.
		int stillBlocked = 0;
		for ( int k = 0; k < 6; ++k ) {
			cow_request *blocker = waiter->myDependencies[k];
			if ( blocker == request ) {
				waiter->myDependencies[k] = NULL;
			} else if ( blocker != NULL ) {
				stillBlocked = 1;
			}
		}
		if ( stillBlocked ) {
			continue;
		}

		removeCowRequest(cowRequestsQueued, waiter);
		enqueueCowRequest(cowRequestsactive, waiter);
		signal_call(waiter->signal);
	}
	pthread_spin_unlock(&requestsQueueLock);
}
/*
 * Writes `size` bytes from `data` at image offset `offset` into the COW file.
 *
 * The range is split at 1 MiB big-block boundaries and each part is handed
 * to writeToBigBlock(). The big block index is derived from the current
 * position in every iteration, so a short write continues at the correct
 * place. The loop stops as soon as writeToBigBlock() reports an error or
 * makes no progress.
 *
 * When `debug` is set, the range is read back through cow_read() and
 * compared byte-wise with `data`; a mismatch is printed.
 *
 * @return number of bytes written; if nothing could be written, the
 *         non-positive result of the failing writeToBigBlock() call
 */
int write_cow(const char *data, size_t size, off_t offset) {
	enum { BIG_BLOCK_BYTES = 4096 * 256 };
	int writtenBytes = 0;

	cow_request request = getAccess(offset,size);

	while ( (size_t)writtenBytes < size ) {
		const off_t position = offset + writtenBytes;
		const unsigned long bigBlockId = (unsigned long)( position / BIG_BLOCK_BYTES );
		const off_t bigBlockOffset = position % BIG_BLOCK_BYTES;
		// how much fits into this big block
		size_t sizeToBigBlock = (size_t)( BIG_BLOCK_BYTES - bigBlockOffset );
		if ( ( size - (size_t)writtenBytes ) < sizeToBigBlock ) {
			sizeToBigBlock = size - (size_t)writtenBytes;
		}
		const int result = writeToBigBlock( bigBlockId, data + writtenBytes, bigBlockOffset, sizeToBigBlock );
		if ( result <= 0 ) {
			if ( writtenBytes == 0 ) {
				writtenBytes = result;
			}
			break;
		}
		writtenBytes += result;
	}

	// Debug-only verification of what was just written.
	// NOTE(review): cow_read() requests access to the same range while this
	// request is still registered; confirm this cannot block on itself.
	if ( debug && size > 0 ) {
		char *tmp = calloc( size, 1 );
		if ( tmp != NULL ) {
			cow_read(tmp, size, offset);
			// memcmp, not strncmp: the payload is binary and may contain 0 bytes.
			if ( memcmp( data, tmp, size ) != 0 ) {
				printf("Error\n");
				printf("%.*s", (int)size, data);
				printf("\n");
				printf("%.*s", (int)size, tmp);
			}
			free(tmp);
		}
	}

	closeAcccess(&request);
	return writtenBytes;
}
/*
 * Writes exactly `len` bytes from `buf` at the current file offset of `fd`,
 * retrying after partial writes and EINTR. Returns 0 on success, -1 on error.
 */
static int bigBlockWriteAll(int fd, const char *buf, size_t len)
{
	while ( len > 0 ) {
		const ssize_t n = write( fd, buf, len );
		if ( n < 0 && errno == EINTR ) {
			continue;
		}
		if ( n <= 0 ) {
			return -1;
		}
		buf += n;
		len -= (size_t)n;
	}
	return 0;
}

/*
 * Fetches up to `len` bytes starting at absolute image offset `absOffset`
 * from the remote image into `dst`. The part of the range that lies behind
 * remoteImageSize is not requested and keeps whatever `dst` already holds
 * (callers pass zeroed buffers). Returns 0 on success, -1 if
 * image_read_internal() reports a negative result.
 */
static int bigBlockFetchRemote(char *dst, size_t len, off_t absOffset)
{
	if ( (uint64_t)absOffset >= remoteImageSize ) {
		return 0;
	}
	if ( remoteImageSize - (uint64_t)absOffset < (uint64_t)len ) {
		len = (size_t)( remoteImageSize - (uint64_t)absOffset );
	}
	// presumably image_read_internal() returns a byte count or a negative
	// value on failure -- TODO confirm against its definition
	return image_read_internal( dst, len, absOffset ) < 0 ? -1 : 0;
}

/*
 * Writes `size` bytes from `data` into big block `bigBlockId`.
 *
 * `offset` is relative to the start of the big block. The big block is
 * created first if filePointers[bigBlockId] is still 0.
 *
 * Small blocks (4096 bytes) are only ever stored completely: when the write
 * starts or ends inside a small block that is not yet marked in the block's
 * state bitmap, the missing head or tail of that small block is fetched from
 * the remote image (zero-filled behind the end of the image) and stored
 * together with the new data. Afterwards all touched small blocks are
 * marked in the bitmap, which is written back to the file.
 *
 * @return number of payload bytes written (`size`), or -1 on any failure
 */
int writeToBigBlock(unsigned long bigBlockId,const char *data, off_t offset,size_t size){
	enum { SMALL_BLOCK = 4096, BIG_BLOCK_BYTES = 4096 * 256 };

	if(filePointers[bigBlockId]==0){
		if ( !createBigBlock(bigBlockId) ) {
			return -1;
		}
	}
	const off_t blockFileStart = filePointers[bigBlockId];
	const off_t bigBlockImageStart = (off_t)bigBlockId * BIG_BLOCK_BYTES;

	const int firstSmallBlock = getSmallBlockId(offset);
	const int lastSmallBlock = getSmallBlockId(offset+size-1);
	if(firstSmallBlock>255||lastSmallBlock>255){
		printf("Error SmallBLock > 255");
	}

	// Load the state bitmap stored in the first page of the big block.
	int blockState[8] ;
	if ( lseek( fh, blockFileStart, SEEK_SET ) == (off_t)-1
			|| read( fh, blockState, sizeof(blockState) ) != (ssize_t)sizeof(blockState) ) {
		printf( "Error reading block state.\n%s\n", strerror( errno ) );
		return -1;
	}

	//If not on Block border and don't have this block, get the data.
	const size_t headLen = (size_t)( offset % SMALL_BLOCK );
	off_t writeStart = blockFileStart + SMALL_BLOCK + offset;
	char *head = NULL;
	if ( headLen != 0 && !TestBit(blockState,firstSmallBlock) ) {
		head = calloc( headLen, 1 );
		if ( head == NULL
				|| bigBlockFetchRemote( head, headLen, bigBlockImageStart + offset - (off_t)headLen ) != 0 ) {
			free( head );
			return -1;
		}
		writeStart -= (off_t)headLen;
	}
	// Position the file only right before writing; head (if any) and
	// payload are stored back to back.
	if ( lseek( fh, writeStart, SEEK_SET ) == (off_t)-1
			|| ( head != NULL && bigBlockWriteAll( fh, head, headLen ) != 0 )
			|| bigBlockWriteAll( fh, data, size ) != 0 ) {
		printf( "Error writing to big block.\n%s\n", strerror( errno ) );
		free( head );
		return -1;
	}
	free( head );

	//If not on Block border and don't have this block, get the data.
	const size_t tailUsed = (size_t)( ( offset + (off_t)size ) % SMALL_BLOCK );
	if ( tailUsed != 0 && !TestBit(blockState,lastSmallBlock) ) {
		const size_t tailLen = SMALL_BLOCK - tailUsed;
		char *tail = calloc( tailLen, 1 );
		if ( tail == NULL
				|| bigBlockFetchRemote( tail, tailLen, bigBlockImageStart + offset + (off_t)size ) != 0
				// The file offset already points right behind the payload.
				|| bigBlockWriteAll( fh, tail, tailLen ) != 0 ) {
			free( tail );
			return -1;
		}
		free( tail );
	}

	// Mark every touched small block as locally present and persist the bitmap.
	for ( int block = firstSmallBlock; block <= lastSmallBlock; block++ ) {
		SetBit(blockState,block);
	}
	if ( lseek( fh, blockFileStart, SEEK_SET ) == (off_t)-1
			|| bigBlockWriteAll( fh, (const char *)blockState, sizeof(blockState) ) != 0 ) {
		printf( "Error writing block state.\n%s\n", strerror( errno ) );
		return -1;
	}
	return (int)size;
}
/*
 * Maps a byte offset to the index of the 4096-byte small block it falls
 * into, counted within a big block of 256 small blocks (result 0..255 for
 * non-negative offsets).
 */
int getSmallBlockId(off_t offset){
	enum { SMALL_BLOCK_BYTES = 4096, SMALL_BLOCKS_PER_BIG = 256 };
	const int absoluteIndex = (int)( offset / SMALL_BLOCK_BYTES );
	return absoluteIndex % SMALL_BLOCKS_PER_BIG;
}
/*
 * Reads `size` bytes at image offset `offset` into `buf`.
 *
 * The range is split at 1 MiB big-block boundaries and every part is read
 * with readBigBlock(), so ranges spanning any number of big blocks are
 * served. The big block index is derived from the current position in each
 * iteration; reading stops when readBigBlock() reports an error or makes no
 * progress.
 *
 * Access to the range is registered with getAccess() for the duration of
 * the read and released with closeAcccess().
 *
 * @return number of bytes placed in `buf`; if nothing could be read, the
 *         non-positive result of the failing readBigBlock() call
 */
int cow_read(char *buf, size_t size, off_t offset)
{
	enum { BIG_BLOCK_BYTES = 4096 * 256 };
	size_t done = 0;
	int firstError = 0;

	cow_request request =getAccess(offset,size);

	while ( done < size ) {
		const off_t position = offset + (off_t)done;
		const long bigBlockId = (long)( position / BIG_BLOCK_BYTES );
		const off_t bigBlockOffset = position % BIG_BLOCK_BYTES;
		// how much can be taken from this big block
		size_t sizeToBigBlock = (size_t)( BIG_BLOCK_BYTES - bigBlockOffset );
		if ( sizeToBigBlock > size - done ) {
			sizeToBigBlock = size - done;
		}
		const int got = readBigBlock( bigBlockId, buf + done, sizeToBigBlock, bigBlockOffset );
		if ( got <= 0 ) {
			firstError = got;
			break;
		}
		done += (size_t)got;
	}

	closeAcccess(&request);
	return done > 0 ? (int)done : firstError;
}
/*
 * Reads `size` bytes from big block `bigBlockId` into `buf`.
 *
 * `offset` is relative to the start of the big block. If the big block has
 * no local copy (filePointers entry is 0) the whole request is forwarded to
 * image_read_internal().
 *
 * Otherwise the block's state bitmap decides per 4096-byte small block where
 * the data comes from. The range is processed in runs of consecutive small
 * blocks that share the same state:
 *   - locally present runs are read from the COW file
 *   - missing runs are read from the remote image; the request is clipped
 *     to remoteImageSize (using the absolute image offset) and whatever the
 *     remote image does not deliver is filled with zeros
 * A run always ends before the first small block with the opposite state,
 * so locally modified blocks are never served from the remote image and
 * vice versa.
 *
 * @return number of bytes placed in `buf` (may be short if the range leaves
 *         the big block or an error occurs midway), or -1 if an error
 *         occurred before any byte was read
 */
int readBigBlock(long bigBlockId ,char *buf,size_t size, off_t offset){
	enum { SMALL_BLOCK = 4096, SMALL_BLOCKS_PER_BIG = 256 };
	const off_t bigBlockImageStart = (off_t)bigBlockId * SMALL_BLOCK * SMALL_BLOCKS_PER_BIG;

	// If block isn't local
	if(filePointers[bigBlockId] == 0){
		return image_read_internal(buf, size, offset + bigBlockImageStart);
	}

	// Load the state bitmap stored in the first page of the big block.
	int blockState[8];
	if ( lseek( fh, filePointers[bigBlockId], SEEK_SET ) == (off_t)-1
			|| read( fh, blockState, sizeof(blockState) ) != (ssize_t)sizeof(blockState) ) {
		printf( "Error reading block state.\n%s\n", strerror( errno ) );
		return -1;
	}

	const off_t endOffset = offset + (off_t)size;
	size_t readBytes = 0;
	int failed = 0;

	while(readBytes < size){
		const off_t position = offset + (off_t)readBytes;
		if ( position < 0 || position / SMALL_BLOCK >= SMALL_BLOCKS_PER_BIG ) {
			printf("ERROR SmallBlack id > 255");
			break;
		}
		const int block = (int)( position / SMALL_BLOCK );
		const int isLocal = TestBit(blockState,block) != 0;

		// Extend the run over following small blocks with the same state
		// that are still part of the requested range.
		int runEnd = block + 1;
		while ( runEnd < SMALL_BLOCKS_PER_BIG
				&& (off_t)runEnd * SMALL_BLOCK < endOffset
				&& ( TestBit(blockState,runEnd) != 0 ) == isLocal ) {
			runEnd++;
		}
		size_t chunk = (size_t)( (off_t)runEnd * SMALL_BLOCK - position );
		if ( chunk > size - readBytes ) {
			chunk = size - readBytes;
		}
		char *dst = buf + readBytes;

		if ( isLocal ) {
			//read Data local
			if ( lseek( fh, filePointers[bigBlockId] + SMALL_BLOCK + position, SEEK_SET ) == (off_t)-1 ) {
				failed = 1;
				break;
			}
			const ssize_t n = read( fh, dst, chunk );
			if ( n <= 0 ) {
				failed = 1;
				break;
			}
			// A short read simply continues at the new position.
			readBytes += (size_t)n;
		} else {
			// Clip the remote request to the size of the remote image.
			const uint64_t absolute = (uint64_t)( position + bigBlockImageStart );
			size_t sizeToRemoteRead = 0;
			if ( absolute < remoteImageSize ) {
				sizeToRemoteRead = chunk;
				if ( remoteImageSize - absolute < (uint64_t)sizeToRemoteRead ) {
					sizeToRemoteRead = (size_t)( remoteImageSize - absolute );
				}
			}
			size_t got = 0;
			if ( sizeToRemoteRead > 0 ) {
				const int result = image_read_internal( dst, sizeToRemoteRead, position + bigBlockImageStart );
				if ( result < 0 ) {
					failed = 1;
					break;
				}
				got = (size_t)result;
				if ( got > sizeToRemoteRead ) {
					got = sizeToRemoteRead;
				}
			}
			// Zero-fill what the remote image did not provide.
			memset( dst + got, 0, chunk - got );
			readBytes += chunk;
		}
	}

	if ( failed && readBytes == 0 ) {
		return -1;
	}
	return (int) readBytes;
}
/*
void test(){
create_cow_file("cow.bin","sampleImage",10000000);
printf("====Test=====\n");
unsigned int versionAfterRead ;
lseek(fh,0, SEEK_SET);
read(fh,&versionAfterRead,sizeof(unsigned int));
unsigned int l;
read(fh,&l,sizeof(unsigned int));
char *buffer = malloc(sizeof(char)*(l+1));
read(fh,buffer,l*sizeof(char));
buffer[l]='\0';
uint64_t size;
read(fh,&size,sizeof(uint64_t));
int pageSize;
read(fh,&pageSize,sizeof(int));
printf("Version: %u\n",versionAfterRead);
printf("länge: %i \n",l);
printf("Image Name: %s\n",buffer);
printf("Size: %ld\n", (long)size);
printf("pageSize: %i\n", pageSize);
free(buffer);
//int imageSubBlockCount = (int)( size + 4095 ) / 4096;
//int imageBlockCount= (imageSubBlockCount+255)/256;
//for(int i = 0; i < imageBlockCount;i++){
// printf("Pointer: %i Value: %ld \n",i, (long)filePointers[i]);
//}
char *smpledata = "1111111";
write_cow(smpledata, 7, 310);
createBigBlock(0);
createBigBlock(5);
createBigBlock(2);
for(int i = 0; i < imageBlockCount;i++){
printf("Pointer: %i Value: %ld \n",i, (long)filePointers[i]);
}
puts("success");
onClose();
}
*/