From c10ba55bc317ebbac8024bef95703d04e8f3394a Mon Sep 17 00:00:00 2001 From: Michael Scherle Date: Mon, 8 Oct 2018 04:22:20 -0700 Subject: Initial Commit --- src/fuse/cow.c | 673 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 673 insertions(+) create mode 100644 src/fuse/cow.c (limited to 'src/fuse/cow.c') diff --git a/src/fuse/cow.c b/src/fuse/cow.c new file mode 100644 index 0000000..1834ce7 --- /dev/null +++ b/src/fuse/cow.c @@ -0,0 +1,673 @@ +/* + * cow.c + * + * Created on: 22.08.2018 + * Author: michael + */ + +#include "cow.h" + + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#define ClearBit(A,k) ( A[(k/32)] &= ~(1 << (k%32)) ) +#define TestBit(A,k) ( A[(k/32)] & (1 << (k%32)) ) +#define SetBit(A,k) ( A[(k/32)] |= (1 << (k%32)) ) +const unsigned int version = 1; +uint64_t MaxImageSizeInBytes=1099511627776; //1 Tebibyte in byte +off_t *filePointers; +uint64_t imageSubBlockCount; +size_t imageBlockCount; +int fh; +off_t dataStart; +bool debug = true; +uint64_t remoteImageSize; + + + +typedef struct { + cow_request *head; + cow_request *tail; +} cow_requests_queue; + + +#define SIGPOOLSIZE 6 +static cow_requests_queue cowRequestsactive; +static cow_requests_queue cowRequestsQueued; +static pthread_spinlock_t requestsQueueLock; +static pthread_spinlock_t sigLock; +static dnbd3_signal_t *signalPool[SIGPOOLSIZE]; +static dnbd3_signal_t **sigEnd = signalPool + SIGPOOLSIZE; + +static void signalInit() +{ + pthread_spin_init( &sigLock, PTHREAD_PROCESS_PRIVATE ); + for ( size_t i = 0; i < SIGPOOLSIZE; ++i ) { + signalPool[i] = NULL; + } +} +static inline dnbd3_signal_t *signalGet() +{ + pthread_spin_lock( &sigLock ); + for ( dnbd3_signal_t **it = signalPool; it < sigEnd; ++it ) { + if ( *it != NULL ) { + dnbd3_signal_t *ret = *it; + *it = NULL; + pthread_spin_unlock( &sigLock ); + return ret; + } + } + pthread_spin_unlock( &sigLock ); + return signal_newBlocking(); +} +static inline void signalPut(dnbd3_signal_t *signal) +{ + pthread_spin_lock( &sigLock ); + for ( dnbd3_signal_t **it = signalPool; it < sigEnd; ++it ) { + if ( *it == NULL ) { + *it = signal; + pthread_spin_unlock( &sigLock ); + return; + } + } + pthread_spin_unlock( &sigLock ); + signal_close( signal ); +} + +static void enqueueCowRequest(cow_requests_queue queue,cow_request *request) +{ + request->next = NULL; + + if ( queue.head == NULL ) { + queue.head = queue.tail = request; + } else { + queue.tail->next = request; + queue.tail = request; + } +} + +static cow_request* removeCowRequest(cow_requests_queue queue,cow_request *request) +{ + cow_request *iterator, *prev = NULL; + for ( iterator = queue.head; iterator != NULL; iterator = iterator->next ) { + if ( iterator == request ) { + // Found it, break! + if ( prev != NULL ) { + prev->next = iterator->next; + } else { + queue.head = iterator->next; + } + if ( queue.tail == iterator ) { + queue.tail = prev; + } + break; + } + prev = iterator; + } + return iterator; +} + + + +bool create_cow_file(char *cow_path, char *image_Name,uint64_t imageSize){ + + remoteImageSize =(size_t) imageSize; + if((fh = open (cow_path, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR))==-1){ + puts( "Could not create COW File. Bye.\n" ); + return false; + } + + write(fh,&version,sizeof(unsigned int)); + int nameLenght =(unsigned int) strlen(image_Name); + write(fh,&nameLenght,sizeof(unsigned int)); + write(fh,image_Name,sizeof(char)*strlen(image_Name)); + write(fh,&imageSize,sizeof(uint64_t)); + int pageSize = getpagesize(); + write(fh,&pageSize,sizeof(int)); + imageBlockCount = (MaxImageSizeInBytes/(1024*1024)); + + + off_t mmapStart = lseek(fh, 0L, SEEK_CUR); + + int maxPageSize = 8192; + //compute next fitting multiple of getpagesize() + + + mmapStart= ((mmapStart+maxPageSize-1)/maxPageSize)*maxPageSize; + //write(fh,&mmapStart,sizeof(mmapStart)); + + + + dataStart = lseek(fh, mmapStart+imageBlockCount*sizeof(off_t), SEEK_CUR); + if(dataStart ==-1){ + close(fh); + printf("Error expanding file for mmap.Bye.\n "); + return false; + } + write(fh, "", 1); + dataStart = ((dataStart+pageSize-1)/pageSize)*pageSize; + + filePointers = mmap(NULL,imageBlockCount*sizeof(off_t), PROT_READ | PROT_WRITE, MAP_SHARED,fh, mmapStart); + if(filePointers == MAP_FAILED ){ + close(fh); + printf("Error creating mmap in COW File.\n%s\nBye.\n ", strerror(errno)); + return false; + } + + + for(unsigned int i = 0; i < imageBlockCount;i++){ + filePointers[i]=0; + } + + if(debug){ + printf("Creating COW File\n"); + printf("Version: %u\n",version); + printf("ImageName: %s\n",image_Name); + printf("Size: %ld\n", (long)imageSize); + printf("Blocks: %i\n",imageBlockCount); + printf("mmap start: %i\n",mmapStart); + printf("End: %lu\n",lseek(fh,0L,SEEK_CUR)); + } + signalInit(); + if(pthread_spin_init( &requestsQueueLock, PTHREAD_PROCESS_PRIVATE ) != 0){ + printf("Spinlock init failure"); + } + return true; +} + + + + +bool createBigBlock(unsigned long id){ + + //goto File end and then to next pagesize + off_t blockStart = lseek(fh,0, SEEK_END); + int pageSize = getpagesize(); + blockStart= ((blockStart+pageSize-1)/pageSize)*pageSize; + filePointers[id] = blockStart; + // go to next Page size + off_t currentFilePos= lseek(fh,blockStart, SEEK_SET); + int blockState[8] = {0}; + write(fh,&blockState,sizeof(int)*8); + + // go to next Page size + currentFilePos = (((currentFilePos+sizeof(int)*8)+pageSize-1)/pageSize)*pageSize; + currentFilePos = lseek(fh,currentFilePos, SEEK_SET); + char data[256*4096] = {0}; + write(fh,&data,sizeof(char)*4096*256); + + return true; +} + + +void onClose(){ + close(fh); +} + +bool queckQueuesForDependencies(cow_request *request,cow_requests_queue queue){ + bool foundDependencie = false; + if(queue.head !=NULL){ + cow_request *iterator = NULL; + for ( iterator = queue.head; iterator != NULL; iterator = iterator->next ) { + if(!(((request->end)offset)||(request->offset >(iterator->end)))){ + foundDependencie = true; + int i=0; + while(i<=6) + { + if(i==6){ + printf("Error to much requests in queue"); + break; + }else if(iterator->dependencies[i]==NULL){ + iterator->dependencies[i] = request; + break; + } + i++; + } + i=0; + while(i<=6) + { + if(i==6){ + printf("Error to much requests in queue"); + break; + }else if(request->myDependencies[i]==NULL){ + request->myDependencies[i]=request; + break; + } + i++; + } + } + } + } + return foundDependencie; +} + + + +cow_request getAccess(off_t offset,size_t size){ + // TODO LOCK THIS ON BLOCKSIZE + cow_request request; + request.offset = (offset-(offset % 4096)); + request.end =(offset+(off_t)size)+( (offset+(off_t)size)% 4096)-1; + request.signal = signalGet(); + for (int i = 0; i < 6; i++){ + request.dependencies[i] = NULL; + } + for (int i = 0; i < 6; i++){ + request.myDependencies[i] = NULL; + } + pthread_spin_lock( &requestsQueueLock ); + + if(queckQueuesForDependencies(&request, cowRequestsactive)){ + queckQueuesForDependencies(&request, cowRequestsQueued); + enqueueCowRequest(cowRequestsQueued,&request); + pthread_spin_unlock( &requestsQueueLock ); + int ret = signal_wait( request.signal, 200000 ); + if(ret<0){ + //TODO + printf("Error CowRequest timed out"); + } + signalPut(request.signal); + + }else{ + enqueueCowRequest(cowRequestsactive,&request); + pthread_spin_unlock( &requestsQueueLock ); + } + return request; + // CHECK AKTIVE -> not enque start + + + + /* + pthread_mutex_lock(&bigBlockLockArray_mutex); + if( (!TestBit(bigBlockLockArray,id1)) &&(!TestBit(bigBlockLockArray,id2))){ + SetBit(bigBlockLockArray,id1); + SetBit(bigBlockLockArray,id2); + pthread_mutex_unlock(&bigBlockLockArray_mutex); + }else{ + cow_request request; + request.bigBlockStartId = id1; + request.bigBlockEndId = id2; + request.signal = signalGet(); + enqueueCowRequest(&request); + pthread_mutex_unlock(&bigBlockLockArray_mutex); + + int ret = signal_wait( request.signal, 200000 ); + if(ret<0){ + //TODO + printf("Error CowRequest timed out"); + } + signalPut(request.signal); + } + */ +} + + +void closeAcccess(cow_request *request){ + pthread_spin_lock( &requestsQueueLock ); + removeCowRequest(cowRequestsactive,request); + for (int i = 0;i< 6; i++){ + cow_request *otherRequest = request->dependencies[i]; + if(otherRequest != NULL){ + bool canStart=true; + for (int j = 0;j< 6; j++){ + if(otherRequest->myDependencies[j]==otherRequest){ + otherRequest->myDependencies[j]=NULL; + }else if(otherRequest->myDependencies[j]!=NULL){ + canStart=false; + } + + } + if(canStart){ + + removeCowRequest(cowRequestsQueued,otherRequest); + enqueueCowRequest(cowRequestsactive,otherRequest); + signal_call(otherRequest->signal); + } + } + request->dependencies[i] = NULL; + } + pthread_spin_unlock( &requestsQueueLock ); + + + /* + pthread_mutex_lock(&bigBlockLockArray_mutex); + pthread_spin_lock( &cow_requests.lock ); + ClearBit(bigBlockLockArray,id1); + ClearBit(bigBlockLockArray,id2); + if(cow_requests.head !=NULL){ + cow_request *iterator,*prev = NULL; + for ( iterator = cow_requests.head; iterator != NULL; iterator = iterator->next ) { + if((iterator->bigBlockStartId == id1 )||(iterator->bigBlockEndId == id1 )){ + + if((!TestBit(bigBlockLockArray,iterator->bigBlockStartId))&&(!TestBit(bigBlockLockArray,iterator->bigBlockEndId))){ + SetBit(bigBlockLockArray,iterator->bigBlockStartId); + SetBit(bigBlockLockArray,iterator->bigBlockEndId); + removeFromQueueUnsafe(iterator, prev); + signal_call(iterator->signal); + + if(id1 == id2){ + pthread_spin_unlock( &cow_requests.lock); + pthread_mutex_unlock(&bigBlockLockArray_mutex); + return; + }else{ + id1 = id2; + } + } + }else if((iterator->bigBlockStartId == id2 )||(iterator->bigBlockEndId == id2 )){ + if((!TestBit(bigBlockLockArray,iterator->bigBlockStartId))&&(!TestBit(bigBlockLockArray,iterator->bigBlockEndId))){ + removeFromQueueUnsafe(iterator, prev); + SetBit(bigBlockLockArray,iterator->bigBlockStartId); + SetBit(bigBlockLockArray,iterator->bigBlockEndId); + signal_call(iterator->signal); + + if(id1 == id2){ + pthread_spin_unlock( &cow_requests.lock ); + pthread_mutex_unlock(&bigBlockLockArray_mutex); + return; + }else{ + id2 = id1; + } + } + } + prev = iterator; + //iterator++; + } + } + pthread_spin_unlock( &cow_requests.lock ); + pthread_mutex_unlock(&bigBlockLockArray_mutex); + */ +} + + +int write_cow(const char *data, size_t size, off_t offset) { + size_t totalWrittenBytes = 0; + int writtenBytes = 0; + size_t sizeToBigBlock = 0; + off_t bigBlockOffset = 0; + off_t bigBlockStart = 0; + unsigned long bigBlockStartId = (offset)/(4096*256); + unsigned long bigBlockId = bigBlockStartId; + cow_request request = getAccess(offset,size); + while(totalWrittenBytes < size){ + + + + bigBlockStart = (bigBlockId*(4096*256)); + bigBlockOffset =(offset+writtenBytes)- bigBlockStart; + + + // how much i can write in this block + sizeToBigBlock = 4096*256 - (bigBlockOffset -bigBlockStart); + + if((size-writtenBytes)< sizeToBigBlock){ + sizeToBigBlock = size-writtenBytes; + } + + writtenBytes = writeToBigBlock(bigBlockId,data+totalWrittenBytes,bigBlockOffset,sizeToBigBlock); + totalWrittenBytes +=writtenBytes; + bigBlockId++; + } + + closeAcccess(&request); + return writtenBytes; +} + + +/* + * Writes Data in a bigblock, offset reltive to start of bigblock + * + */ + + +int writeToBigBlock(unsigned long bigBlockId,const char *data, off_t offset,size_t size){ + int writtenBytes = 0; + if(filePointers[bigBlockId]==0){ + createBigBlock(bigBlockId); + } + int firstSmallBlock = getSmallBlockId(offset); + int lastSmallBlock = getSmallBlockId(offset+size); + if(firstSmallBlock>255||lastSmallBlock>255){ + printf("Error SmallBLock > 255"); + } + lseek(fh,filePointers[bigBlockId],SEEK_SET); + int blockState[8] ; + read(fh,&blockState,sizeof(int)*8); + lseek(fh,filePointers[bigBlockId]+4096+offset,SEEK_SET); + + //If not on Block border and don't have this block, get the data. + if((offset % 4096 !=0 )&&(!TestBit(blockState,firstSmallBlock))){ + char *startData = malloc(offset % 4096); + + off_t tmp =offset+(bigBlockId*(4096*256)); + tmp = tmp - (tmp % 4096); + image_read_internal(startData, offset % 4096, tmp); + + write(fh,startData,offset % 4096); + free(startData); + + } + + writtenBytes +=(int)write(fh,data,size); + //If not on Block border and don't have this block, get the data. + if(((offset+size) % 4096 !=0 )&&(!TestBit(blockState,lastSmallBlock))){ + + size_t sizeToAppend= 4096-((((size_t)offset)+size)% 4096); + //TODO TEST -1? + off_t offsetToAppend = bigBlockId*256*4096+((off_t)offset)+size; + + char *startData = calloc(sizeToAppend,1); + + + if((((size_t)offsetToAppend)+sizeToAppend) > remoteImageSize ){ + + sizeToAppend = remoteImageSize-offsetToAppend; + } + if(((size_t)offsetToAppend)< remoteImageSize){ + image_read_internal(startData,sizeToAppend, offsetToAppend); + } + //lseek(fh,filePointers[bigBlockId]+4096+offset,SEEK_SET); + write(fh,startData,sizeToAppend); + free(startData); + /* + //size_t tmpSize = (4096-((offset+size) % 4096)); + size_t tmpSize = (((size_t)offset)+size); + tmpSize= tmpSize %4096; + tmpSize = 4096-tmpSize; + + off_t tmp =(offset+size-tmpSize)+(bigBlockId*(4096*256)); + + + if(((size_t)tmp) < remoteImageSize ){ + char *startData = calloc(tmpSize,1); + size_t remoteReadSize = tmpSize; + if ( (tmp + tmpSize) > remoteImageSize ) { + remoteReadSize = remoteImageSize - tmp; + + } + image_read_internal(startData,tmpSize, remoteReadSize); + lseek(fh,filePointers[bigBlockId]+4096+offset,SEEK_SET); + write(fh,startData,tmpSize); + free(startData); + + } + */ + } + + for (long i = firstSmallBlock; i < lastSmallBlock;i++ ){ + SetBit(blockState,i); + } + lseek(fh,filePointers[bigBlockId],SEEK_SET); + write(fh,&blockState,sizeof(int)*8); + return writtenBytes; +} + + + +int getSmallBlockId(off_t offset){ + + return (int)(offset/4096) % 256; +} +int cow_read(char *buf, size_t size, off_t offset) +{ + + unsigned long bigBlockStartId = (offset)/(4096*256); + unsigned long bigBlockEndId = (offset+size)/(4096*256); + unsigned long bigBlockId = bigBlockStartId; + cow_request request =getAccess(offset,size); + size_t bigBlockStart = (bigBlockId*(4096*256)); + size_t bigBlockOffset =offset- bigBlockStart; + // how much i can write from this block + //TODO IS THIS RIGHT? + size_t sizeToBigBlock = ((4096*256) - (bigBlockOffset)); + if(sizeToBigBlock>size){ + sizeToBigBlock = size; + } + + int bytesRead = readBigBlock(bigBlockStartId, buf,sizeToBigBlock, bigBlockOffset); + if(bigBlockStartId != bigBlockEndId && (size-sizeToBigBlock)>0){ + + bytesRead +=readBigBlock(bigBlockEndId, buf+sizeToBigBlock,(size-sizeToBigBlock), 0); + } + closeAcccess(&request); + return bytesRead; +} + + + +int readBigBlock(long bigBlockId ,char *buf,size_t size, off_t offset){ + + // If block isn't local + if(filePointers[bigBlockId] == 0){ + return image_read_internal(buf, size, (offset+(bigBlockId*(4096*256)))); + } + int blockState[8]; + lseek(fh,filePointers[bigBlockId],SEEK_SET); + read(fh,&blockState,sizeof(int)*8); + int block = getSmallBlockId(offset); + int endBlock = getSmallBlockId(offset+size-1); + int startBlock; + char *curBuf = buf; + size_t readBytes = 0; + + + while(readBytes < size){ + startBlock=block; + if(!TestBit(blockState,block)){ + while(!TestBit(blockState,block)&& block!= endBlock){ + block++; + if(block >255){ + printf("ERROR SmallBlack id > 255"); + } + + } + + off_t startOffset = startBlock*4096; + if(startOffset < offset){ + startOffset = offset; + } + size_t sizeToRead = ((block+1)*4096)-startOffset; + if(sizeToRead>size-readBytes){ + sizeToRead= size-readBytes; + } + size_t sizeToRemoteRead = sizeToRead; + if(sizeToRead+startOffset>remoteImageSize){ + sizeToRemoteRead = remoteImageSize-startOffset; + } + //request data over network + + //TODO Check if offset compution is correct + startOffset = startOffset+(bigBlockId*4096*256); + readBytes +=image_read_internal(curBuf, sizeToRemoteRead, startOffset); + // If on File End fill up rest with 0 + if(sizeToRead+startOffset>remoteImageSize){ + for(size_t i = 0; i<(sizeToRead-sizeToRemoteRead); i++){ + curBuf[i] = 0; + } + } + + curBuf +=sizeToRead; + + }else{ + while(TestBit(blockState,block)&& block!= endBlock){ + block++; + if(block >255){ + printf("ERROR SmallBlack id > 255"); + } + } + off_t startOffset = startBlock*4096; + if(startOffset < offset){ + startOffset = offset; + } + size_t sizeToRead = ((block+1)*4096)-startOffset; + if(sizeToRead>size-readBytes){ + sizeToRead= size-readBytes; + } + //read Data local + lseek(fh,filePointers[bigBlockId]+4096+startOffset,SEEK_SET); + readBytes += read(fh,curBuf,sizeToRead); + curBuf +=readBytes; + + } + } + return (int) readBytes; +} +/* +void test(){ + create_cow_file("cow.bin","sampleImage",10000000); + printf("====Test=====\n"); + unsigned int versionAfterRead ; + lseek(fh,0, SEEK_SET); + read(fh,&versionAfterRead,sizeof(unsigned int)); + + unsigned int l; + read(fh,&l,sizeof(unsigned int)); + + char *buffer = malloc(sizeof(char)*(l+1)); + read(fh,buffer,l*sizeof(char)); + buffer[l]='\0'; + uint64_t size; + read(fh,&size,sizeof(uint64_t)); + int pageSize; + read(fh,&pageSize,sizeof(int)); + + + + printf("Version: %u\n",versionAfterRead); + printf("länge: %i \n",l); + printf("Image Name: %s\n",buffer); + printf("Size: %ld\n", (long)size); + printf("pageSize: %i\n", pageSize); + free(buffer); + + + //int imageSubBlockCount = (int)( size + 4095 ) / 4096; + //int imageBlockCount= (imageSubBlockCount+255)/256; + + //for(int i = 0; i < imageBlockCount;i++){ + // printf("Pointer: %i Value: %ld \n",i, (long)filePointers[i]); + //} + + char *smpledata = "1111111"; + write_cow(smpledata, 7, 310); + createBigBlock(0); + createBigBlock(5); + createBigBlock(2); + for(int i = 0; i < imageBlockCount;i++){ + printf("Pointer: %i Value: %ld \n",i, (long)filePointers[i]); + } + puts("success"); + onClose(); + +} +*/ -- cgit v1.2.3-55-g7522