/* * cow.c * * Created on: 22.08.2018 * Author: michael */ #include "cow.h" #include #include #include #include #include #include #include #include #include #include #define ClearBit(A,k) ( A[(k/32)] &= ~(1 << (k%32)) ) #define TestBit(A,k) ( A[(k/32)] & (1 << (k%32)) ) #define SetBit(A,k) ( A[(k/32)] |= (1 << (k%32)) ) const unsigned int version = 1; uint64_t MaxImageSizeInBytes=1099511627776; //1 Tebibyte in byte off_t *filePointers; uint64_t imageSubBlockCount; size_t imageBlockCount; int fh; off_t dataStart; bool debug = true; uint64_t remoteImageSize; typedef struct { cow_request *head; cow_request *tail; } cow_requests_queue; #define SIGPOOLSIZE 6 static cow_requests_queue cowRequestsactive; static cow_requests_queue cowRequestsQueued; static pthread_spinlock_t requestsQueueLock; static pthread_spinlock_t sigLock; static dnbd3_signal_t *signalPool[SIGPOOLSIZE]; static dnbd3_signal_t **sigEnd = signalPool + SIGPOOLSIZE; static void signalInit() { pthread_spin_init( &sigLock, PTHREAD_PROCESS_PRIVATE ); for ( size_t i = 0; i < SIGPOOLSIZE; ++i ) { signalPool[i] = NULL; } } static inline dnbd3_signal_t *signalGet() { pthread_spin_lock( &sigLock ); for ( dnbd3_signal_t **it = signalPool; it < sigEnd; ++it ) { if ( *it != NULL ) { dnbd3_signal_t *ret = *it; *it = NULL; pthread_spin_unlock( &sigLock ); return ret; } } pthread_spin_unlock( &sigLock ); return signal_newBlocking(); } static inline void signalPut(dnbd3_signal_t *signal) { pthread_spin_lock( &sigLock ); for ( dnbd3_signal_t **it = signalPool; it < sigEnd; ++it ) { if ( *it == NULL ) { *it = signal; pthread_spin_unlock( &sigLock ); return; } } pthread_spin_unlock( &sigLock ); signal_close( signal ); } static void enqueueCowRequest(cow_requests_queue queue,cow_request *request) { request->next = NULL; if ( queue.head == NULL ) { queue.head = queue.tail = request; } else { queue.tail->next = request; queue.tail = request; } } static cow_request* removeCowRequest(cow_requests_queue queue,cow_request *request) { cow_request *iterator, *prev = NULL; for ( iterator = queue.head; iterator != NULL; iterator = iterator->next ) { if ( iterator == request ) { // Found it, break! if ( prev != NULL ) { prev->next = iterator->next; } else { queue.head = iterator->next; } if ( queue.tail == iterator ) { queue.tail = prev; } break; } prev = iterator; } return iterator; } bool create_cow_file(char *cow_path, char *image_Name,uint64_t imageSize){ remoteImageSize =(size_t) imageSize; if((fh = open (cow_path, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR))==-1){ puts( "Could not create COW File. Bye.\n" ); return false; } write(fh,&version,sizeof(unsigned int)); int nameLenght =(unsigned int) strlen(image_Name); write(fh,&nameLenght,sizeof(unsigned int)); write(fh,image_Name,sizeof(char)*strlen(image_Name)); write(fh,&imageSize,sizeof(uint64_t)); int pageSize = getpagesize(); write(fh,&pageSize,sizeof(int)); imageBlockCount = (MaxImageSizeInBytes/(1024*1024)); off_t mmapStart = lseek(fh, 0L, SEEK_CUR); int maxPageSize = 8192; //compute next fitting multiple of getpagesize() mmapStart= ((mmapStart+maxPageSize-1)/maxPageSize)*maxPageSize; //write(fh,&mmapStart,sizeof(mmapStart)); dataStart = lseek(fh, mmapStart+imageBlockCount*sizeof(off_t), SEEK_CUR); if(dataStart ==-1){ close(fh); printf("Error expanding file for mmap.Bye.\n "); return false; } write(fh, "", 1); dataStart = ((dataStart+pageSize-1)/pageSize)*pageSize; filePointers = mmap(NULL,imageBlockCount*sizeof(off_t), PROT_READ | PROT_WRITE, MAP_SHARED,fh, mmapStart); if(filePointers == MAP_FAILED ){ close(fh); printf("Error creating mmap in COW File.\n%s\nBye.\n ", strerror(errno)); return false; } for(unsigned int i = 0; i < imageBlockCount;i++){ filePointers[i]=0; } if(debug){ printf("Creating COW File\n"); printf("Version: %u\n",version); printf("ImageName: %s\n",image_Name); printf("Size: %ld\n", (long)imageSize); printf("Blocks: %i\n",imageBlockCount); printf("mmap start: %i\n",mmapStart); printf("End: %lu\n",lseek(fh,0L,SEEK_CUR)); } signalInit(); if(pthread_spin_init( &requestsQueueLock, PTHREAD_PROCESS_PRIVATE ) != 0){ printf("Spinlock init failure"); } return true; } bool createBigBlock(unsigned long id){ //goto File end and then to next pagesize off_t blockStart = lseek(fh,0, SEEK_END); int pageSize = getpagesize(); blockStart= ((blockStart+pageSize-1)/pageSize)*pageSize; filePointers[id] = blockStart; // go to next Page size off_t currentFilePos= lseek(fh,blockStart, SEEK_SET); int blockState[8] = {0}; write(fh,&blockState,sizeof(int)*8); // go to next Page size currentFilePos = (((currentFilePos+sizeof(int)*8)+pageSize-1)/pageSize)*pageSize; currentFilePos = lseek(fh,currentFilePos, SEEK_SET); char data[256*4096] = {0}; write(fh,&data,sizeof(char)*4096*256); return true; } void onClose(){ close(fh); } bool queckQueuesForDependencies(cow_request *request,cow_requests_queue queue){ bool foundDependencie = false; if(queue.head !=NULL){ cow_request *iterator = NULL; for ( iterator = queue.head; iterator != NULL; iterator = iterator->next ) { if(!(((request->end)offset)||(request->offset >(iterator->end)))){ foundDependencie = true; int i=0; while(i<=6) { if(i==6){ printf("Error to much requests in queue"); break; }else if(iterator->dependencies[i]==NULL){ iterator->dependencies[i] = request; break; } i++; } i=0; while(i<=6) { if(i==6){ printf("Error to much requests in queue"); break; }else if(request->myDependencies[i]==NULL){ request->myDependencies[i]=request; break; } i++; } } } } return foundDependencie; } cow_request getAccess(off_t offset,size_t size){ // TODO LOCK THIS ON BLOCKSIZE cow_request request; request.offset = (offset-(offset % 4096)); request.end =(offset+(off_t)size)+( (offset+(off_t)size)% 4096)-1; request.signal = signalGet(); for (int i = 0; i < 6; i++){ request.dependencies[i] = NULL; } for (int i = 0; i < 6; i++){ request.myDependencies[i] = NULL; } pthread_spin_lock( &requestsQueueLock ); if(queckQueuesForDependencies(&request, cowRequestsactive)){ queckQueuesForDependencies(&request, cowRequestsQueued); enqueueCowRequest(cowRequestsQueued,&request); pthread_spin_unlock( &requestsQueueLock ); int ret = signal_wait( request.signal, 200000 ); if(ret<0){ //TODO printf("Error CowRequest timed out"); } signalPut(request.signal); }else{ enqueueCowRequest(cowRequestsactive,&request); pthread_spin_unlock( &requestsQueueLock ); } return request; // CHECK AKTIVE -> not enque start /* pthread_mutex_lock(&bigBlockLockArray_mutex); if( (!TestBit(bigBlockLockArray,id1)) &&(!TestBit(bigBlockLockArray,id2))){ SetBit(bigBlockLockArray,id1); SetBit(bigBlockLockArray,id2); pthread_mutex_unlock(&bigBlockLockArray_mutex); }else{ cow_request request; request.bigBlockStartId = id1; request.bigBlockEndId = id2; request.signal = signalGet(); enqueueCowRequest(&request); pthread_mutex_unlock(&bigBlockLockArray_mutex); int ret = signal_wait( request.signal, 200000 ); if(ret<0){ //TODO printf("Error CowRequest timed out"); } signalPut(request.signal); } */ } void closeAcccess(cow_request *request){ pthread_spin_lock( &requestsQueueLock ); removeCowRequest(cowRequestsactive,request); for (int i = 0;i< 6; i++){ cow_request *otherRequest = request->dependencies[i]; if(otherRequest != NULL){ bool canStart=true; for (int j = 0;j< 6; j++){ if(otherRequest->myDependencies[j]==otherRequest){ otherRequest->myDependencies[j]=NULL; }else if(otherRequest->myDependencies[j]!=NULL){ canStart=false; } } if(canStart){ removeCowRequest(cowRequestsQueued,otherRequest); enqueueCowRequest(cowRequestsactive,otherRequest); signal_call(otherRequest->signal); } } request->dependencies[i] = NULL; } pthread_spin_unlock( &requestsQueueLock ); /* pthread_mutex_lock(&bigBlockLockArray_mutex); pthread_spin_lock( &cow_requests.lock ); ClearBit(bigBlockLockArray,id1); ClearBit(bigBlockLockArray,id2); if(cow_requests.head !=NULL){ cow_request *iterator,*prev = NULL; for ( iterator = cow_requests.head; iterator != NULL; iterator = iterator->next ) { if((iterator->bigBlockStartId == id1 )||(iterator->bigBlockEndId == id1 )){ if((!TestBit(bigBlockLockArray,iterator->bigBlockStartId))&&(!TestBit(bigBlockLockArray,iterator->bigBlockEndId))){ SetBit(bigBlockLockArray,iterator->bigBlockStartId); SetBit(bigBlockLockArray,iterator->bigBlockEndId); removeFromQueueUnsafe(iterator, prev); signal_call(iterator->signal); if(id1 == id2){ pthread_spin_unlock( &cow_requests.lock); pthread_mutex_unlock(&bigBlockLockArray_mutex); return; }else{ id1 = id2; } } }else if((iterator->bigBlockStartId == id2 )||(iterator->bigBlockEndId == id2 )){ if((!TestBit(bigBlockLockArray,iterator->bigBlockStartId))&&(!TestBit(bigBlockLockArray,iterator->bigBlockEndId))){ removeFromQueueUnsafe(iterator, prev); SetBit(bigBlockLockArray,iterator->bigBlockStartId); SetBit(bigBlockLockArray,iterator->bigBlockEndId); signal_call(iterator->signal); if(id1 == id2){ pthread_spin_unlock( &cow_requests.lock ); pthread_mutex_unlock(&bigBlockLockArray_mutex); return; }else{ id2 = id1; } } } prev = iterator; //iterator++; } } pthread_spin_unlock( &cow_requests.lock ); pthread_mutex_unlock(&bigBlockLockArray_mutex); */ } int write_cow(const char *data, size_t size, off_t offset) { size_t totalWrittenBytes = 0; int writtenBytes = 0; size_t sizeToBigBlock = 0; off_t bigBlockOffset = 0; off_t bigBlockStart = 0; unsigned long bigBlockStartId = (offset)/(4096*256); unsigned long bigBlockId = bigBlockStartId; cow_request request = getAccess(offset,size); while(totalWrittenBytes < size){ bigBlockStart = (bigBlockId*(4096*256)); bigBlockOffset =(offset+writtenBytes)- bigBlockStart; // how much i can write in this block sizeToBigBlock = 4096*256 - (bigBlockOffset -bigBlockStart); if((size-writtenBytes)< sizeToBigBlock){ sizeToBigBlock = size-writtenBytes; } writtenBytes = writeToBigBlock(bigBlockId,data+totalWrittenBytes,bigBlockOffset,sizeToBigBlock); totalWrittenBytes +=writtenBytes; bigBlockId++; } closeAcccess(&request); return writtenBytes; } /* * Writes Data in a bigblock, offset reltive to start of bigblock * */ int writeToBigBlock(unsigned long bigBlockId,const char *data, off_t offset,size_t size){ int writtenBytes = 0; if(filePointers[bigBlockId]==0){ createBigBlock(bigBlockId); } int firstSmallBlock = getSmallBlockId(offset); int lastSmallBlock = getSmallBlockId(offset+size); if(firstSmallBlock>255||lastSmallBlock>255){ printf("Error SmallBLock > 255"); } lseek(fh,filePointers[bigBlockId],SEEK_SET); int blockState[8] ; read(fh,&blockState,sizeof(int)*8); lseek(fh,filePointers[bigBlockId]+4096+offset,SEEK_SET); //If not on Block border and don't have this block, get the data. if((offset % 4096 !=0 )&&(!TestBit(blockState,firstSmallBlock))){ char *startData = malloc(offset % 4096); off_t tmp =offset+(bigBlockId*(4096*256)); tmp = tmp - (tmp % 4096); image_read_internal(startData, offset % 4096, tmp); write(fh,startData,offset % 4096); free(startData); } writtenBytes +=(int)write(fh,data,size); //If not on Block border and don't have this block, get the data. if(((offset+size) % 4096 !=0 )&&(!TestBit(blockState,lastSmallBlock))){ size_t sizeToAppend= 4096-((((size_t)offset)+size)% 4096); //TODO TEST -1? off_t offsetToAppend = bigBlockId*256*4096+((off_t)offset)+size; char *startData = calloc(sizeToAppend,1); if((((size_t)offsetToAppend)+sizeToAppend) > remoteImageSize ){ sizeToAppend = remoteImageSize-offsetToAppend; } if(((size_t)offsetToAppend)< remoteImageSize){ image_read_internal(startData,sizeToAppend, offsetToAppend); } //lseek(fh,filePointers[bigBlockId]+4096+offset,SEEK_SET); write(fh,startData,sizeToAppend); free(startData); /* //size_t tmpSize = (4096-((offset+size) % 4096)); size_t tmpSize = (((size_t)offset)+size); tmpSize= tmpSize %4096; tmpSize = 4096-tmpSize; off_t tmp =(offset+size-tmpSize)+(bigBlockId*(4096*256)); if(((size_t)tmp) < remoteImageSize ){ char *startData = calloc(tmpSize,1); size_t remoteReadSize = tmpSize; if ( (tmp + tmpSize) > remoteImageSize ) { remoteReadSize = remoteImageSize - tmp; } image_read_internal(startData,tmpSize, remoteReadSize); lseek(fh,filePointers[bigBlockId]+4096+offset,SEEK_SET); write(fh,startData,tmpSize); free(startData); } */ } for (long i = firstSmallBlock; i < lastSmallBlock;i++ ){ SetBit(blockState,i); } lseek(fh,filePointers[bigBlockId],SEEK_SET); write(fh,&blockState,sizeof(int)*8); return writtenBytes; } int getSmallBlockId(off_t offset){ return (int)(offset/4096) % 256; } int cow_read(char *buf, size_t size, off_t offset) { unsigned long bigBlockStartId = (offset)/(4096*256); unsigned long bigBlockEndId = (offset+size)/(4096*256); unsigned long bigBlockId = bigBlockStartId; cow_request request =getAccess(offset,size); size_t bigBlockStart = (bigBlockId*(4096*256)); size_t bigBlockOffset =offset- bigBlockStart; // how much i can write from this block //TODO IS THIS RIGHT? size_t sizeToBigBlock = ((4096*256) - (bigBlockOffset)); if(sizeToBigBlock>size){ sizeToBigBlock = size; } int bytesRead = readBigBlock(bigBlockStartId, buf,sizeToBigBlock, bigBlockOffset); if(bigBlockStartId != bigBlockEndId && (size-sizeToBigBlock)>0){ bytesRead +=readBigBlock(bigBlockEndId, buf+sizeToBigBlock,(size-sizeToBigBlock), 0); } closeAcccess(&request); return bytesRead; } int readBigBlock(long bigBlockId ,char *buf,size_t size, off_t offset){ // If block isn't local if(filePointers[bigBlockId] == 0){ return image_read_internal(buf, size, (offset+(bigBlockId*(4096*256)))); } int blockState[8]; lseek(fh,filePointers[bigBlockId],SEEK_SET); read(fh,&blockState,sizeof(int)*8); int block = getSmallBlockId(offset); int endBlock = getSmallBlockId(offset+size-1); int startBlock; char *curBuf = buf; size_t readBytes = 0; while(readBytes < size){ startBlock=block; if(!TestBit(blockState,block)){ while(!TestBit(blockState,block)&& block!= endBlock){ block++; if(block >255){ printf("ERROR SmallBlack id > 255"); } } off_t startOffset = startBlock*4096; if(startOffset < offset){ startOffset = offset; } size_t sizeToRead = ((block+1)*4096)-startOffset; if(sizeToRead>size-readBytes){ sizeToRead= size-readBytes; } size_t sizeToRemoteRead = sizeToRead; if(sizeToRead+startOffset>remoteImageSize){ sizeToRemoteRead = remoteImageSize-startOffset; } //request data over network //TODO Check if offset compution is correct startOffset = startOffset+(bigBlockId*4096*256); readBytes +=image_read_internal(curBuf, sizeToRemoteRead, startOffset); // If on File End fill up rest with 0 if(sizeToRead+startOffset>remoteImageSize){ for(size_t i = 0; i<(sizeToRead-sizeToRemoteRead); i++){ curBuf[i] = 0; } } curBuf +=sizeToRead; }else{ while(TestBit(blockState,block)&& block!= endBlock){ block++; if(block >255){ printf("ERROR SmallBlack id > 255"); } } off_t startOffset = startBlock*4096; if(startOffset < offset){ startOffset = offset; } size_t sizeToRead = ((block+1)*4096)-startOffset; if(sizeToRead>size-readBytes){ sizeToRead= size-readBytes; } //read Data local lseek(fh,filePointers[bigBlockId]+4096+startOffset,SEEK_SET); readBytes += read(fh,curBuf,sizeToRead); curBuf +=readBytes; } } return (int) readBytes; } /* void test(){ create_cow_file("cow.bin","sampleImage",10000000); printf("====Test=====\n"); unsigned int versionAfterRead ; lseek(fh,0, SEEK_SET); read(fh,&versionAfterRead,sizeof(unsigned int)); unsigned int l; read(fh,&l,sizeof(unsigned int)); char *buffer = malloc(sizeof(char)*(l+1)); read(fh,buffer,l*sizeof(char)); buffer[l]='\0'; uint64_t size; read(fh,&size,sizeof(uint64_t)); int pageSize; read(fh,&pageSize,sizeof(int)); printf("Version: %u\n",versionAfterRead); printf("länge: %i \n",l); printf("Image Name: %s\n",buffer); printf("Size: %ld\n", (long)size); printf("pageSize: %i\n", pageSize); free(buffer); //int imageSubBlockCount = (int)( size + 4095 ) / 4096; //int imageBlockCount= (imageSubBlockCount+255)/256; //for(int i = 0; i < imageBlockCount;i++){ // printf("Pointer: %i Value: %ld \n",i, (long)filePointers[i]); //} char *smpledata = "1111111"; write_cow(smpledata, 7, 310); createBigBlock(0); createBigBlock(5); createBigBlock(2); for(int i = 0; i < imageBlockCount;i++){ printf("Pointer: %i Value: %ld \n",i, (long)filePointers[i]); } puts("success"); onClose(); } */