From c4d2c6c6753fd2c41c3db1877c5c46613dc45510 Mon Sep 17 00:00:00 2001 From: sr Date: Thu, 25 Jul 2013 23:44:18 +0200 Subject: Work in progress: uplink --- src/config.h | 2 +- src/server/globals.h | 1 + src/server/helper.h | 13 ------------ src/server/image.c | 58 +++++++++++++++++++++++++++++++++++++++++++++++++--- src/server/image.h | 15 ++++++++++++++ src/server/uplink.c | 44 ++++++++++++++++++++++++++++++--------- src/server/uplink.h | 2 +- 7 files changed, 107 insertions(+), 28 deletions(-) diff --git a/src/config.h b/src/config.h index f208b68..3c2f166 100644 --- a/src/config.h +++ b/src/config.h @@ -79,7 +79,7 @@ // +++++ Block Device +++++ #define KERNEL_SECTOR_SIZE 512 -#define DNBD3_BLOCK_SIZE 4096 +#define DNBD3_BLOCK_SIZE 4096 // NEVER CHANGE THIS OR THE WORLD WILL END! #define NUMBER_DEVICES 8 #define DEFAULT_READ_AHEAD_KB 512 diff --git a/src/server/globals.h b/src/server/globals.h index 3e5af8b..106531e 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -48,6 +48,7 @@ struct _dnbd3_connection int betterFd; // Active connection to better server, ready to use uint8_t *recvBuffer; // Buffer for receiving payload int recvBufferLen; // Len of ^^ + volatile int shutdown; // bool to signal thread to stop }; typedef struct diff --git a/src/server/helper.h b/src/server/helper.h index a9a8e79..daa6695 100644 --- a/src/server/helper.h +++ b/src/server/helper.h @@ -92,17 +92,4 @@ static inline int strend(char *string, char *suffix) return strcmp(string + len1 - len2, suffix) == 0; } -// one byte in the map covers 8 4kib blocks, so 32kib per byte -// "+ (1 << 15) - 1" is required to account for the last bit of -// the image that is smaller than 32kib -// this would be the case whenever the image file size is not a -// multiple of 32kib (= the number of blocks is not divisible by 8) -// ie: if the image is 49152 bytes and you do 49152 >> 15 you get 1, -// but you actually need 2 bytes to have a complete cache map -#define IMGSIZE_TO_MAPBYTES(bytes) ((int)(((bytes) + (1 << 15) - 1) >> 15)) - -// calculate number of hash blocks in file. One hash block is 16MiB -#define HASH_BLOCK_SIZE ((int64_t)(1 << 24)) -#define IMGSIZE_TO_HASHBLOCKS(bytes) ((int)(((bytes) + HASH_BLOCK_SIZE - 1) / HASH_BLOCK_SIZE)) - #endif diff --git a/src/server/image.c b/src/server/image.c index 4bc26b4..abecda5 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -62,10 +62,59 @@ int image_is_complete(dnbd3_image_t *image) } return complete; } +/** + * Update cache-map of given image for the given byte range + * Locks on: images[].lock + */ +void image_update_cachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, const int set) +{ + assert( image != NULL ); + // This should always be block borders due to how the protocol works, but better be safe + // than accidentally mark blocks as cached when they really aren't entirely cached. + end &= ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + start = (start + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + int dirty = FALSE; + int pos = start; + spin_lock( &image->lock ); + if ( image->cache_map == NULL ) { + // Image seems already complete + printf( "[DEBUG] image_update_cachemap with no cache_map: %s", image->path ); + spin_unlock( &image->lock ); + return; + } + while ( pos < end ) { + const int map_y = pos >> 15; + const int map_x = (pos >> 12) & 7; // mod 8 + const uint8_t bit_mask = 0b00000001 << map_x; + if ( set ) { + if ( (image->cache_map[map_y] & bit_mask) == 0 ) dirty = TRUE; + image->cache_map[map_y] |= bit_mask; + } else { + image->cache_map[map_y] &= ~bit_mask; + } + pos += DNBD3_BLOCK_SIZE; + } + spin_unlock( &image->lock ); + if ( dirty ) { + // If dirty is set, at least one of the blocks was not cached before, so queue all hash blocks + // for checking, even though this might lead to checking some hash block again, if it was + // already complete and the block range spanned at least two hash blocks. + // First set start and end to borders of hash blocks + start &= ~(uint64_t)(HASH_BLOCK_SIZE - 1); + end = (end + HASH_BLOCK_SIZE - 1) & ~(uint64_t)(HASH_BLOCK_SIZE - 1); + pos = start; + while ( pos < end ) { + const int block = pos / HASH_BLOCK_SIZE; + // TODO: Actually queue the hash block for checking as soon as there's a worker for that + pos += HASH_BLOCK_SIZE; + } + } +} /** * Saves the cache map of the given image. * Return TRUE on success. + * DOES NOT lock */ int image_save_cache_map(dnbd3_image_t *image) { @@ -201,7 +250,8 @@ dnbd3_image_t* image_free(dnbd3_image_t *image) free( image->crc32 ); free( image->path ); free( image->lower_name ); - uplink_shutdown( image->uplink ); + image->uplink = uplink_shutdown( image->uplink ); + if ( image->cacheFd != -1 ) close( image->cacheFd ); spin_destroy( &image->lock ); // memset( image, 0, sizeof(dnbd3_image_t) ); @@ -451,6 +501,7 @@ static int image_try_load(char *base, char *path) image->filesize = fileSize; image->rid = revision; image->users = 0; + image->cacheFd = -1; if ( stat( path, &st ) == 0 ) { image->atime = st.st_mtime; } else { @@ -468,6 +519,7 @@ static int image_try_load(char *base, char *path) image->working = FALSE; image->cacheFd = open( path, O_WRONLY ); if ( image->cacheFd < 0 ) { + image->cacheFd = -1; memlogf( "[ERROR] Could not open incomplete image %s for writing!", path ); image = image_free( image ); goto load_error; @@ -530,7 +582,7 @@ int image_generate_crc_file(char *image) close( fdImage ); return FALSE; } -// CRC of all CRCs goes first. Don't know it yet, write 4 bytes dummy data. + // CRC of all CRCs goes first. Don't know it yet, write 4 bytes dummy data. if ( write( fdCrc, crcFile, 4 ) != 4 ) { printf( "Write error\n" ); close( fdImage ); @@ -581,7 +633,7 @@ int image_generate_crc_file(char *image) close( fdImage ); printf( "done!\nGenerating master-crc..." ); fflush( stdout ); -// File is written - read again to calc master crc + // File is written - read again to calc master crc if ( lseek( fdCrc, 4, SEEK_SET ) != 4 ) { printf( "Could not seek to beginning of crc list in file\n" ); close( fdCrc ); diff --git a/src/server/image.h b/src/server/image.h index 7a4e72e..6910395 100644 --- a/src/server/image.h +++ b/src/server/image.h @@ -10,6 +10,8 @@ extern pthread_spinlock_t _images_lock; int image_is_complete(dnbd3_image_t *image); +void image_update_cachemap( dnbd3_image_t *image, uint64_t start, uint64_t end, const int set ); + int image_save_cache_map(dnbd3_image_t *image); dnbd3_image_t* image_get(char *name, uint16_t revision); @@ -23,4 +25,17 @@ int image_load_all(char *path); int image_generate_crc_file(char *image); +// one byte in the map covers 8 4kib blocks, so 32kib per byte +// "+ (1 << 15) - 1" is required to account for the last bit of +// the image that is smaller than 32kib +// this would be the case whenever the image file size is not a +// multiple of 32kib (= the number of blocks is not divisible by 8) +// ie: if the image is 49152 bytes and you do 49152 >> 15 you get 1, +// but you actually need 2 bytes to have a complete cache map +#define IMGSIZE_TO_MAPBYTES(bytes) ((int)(((bytes) + (1 << 15) - 1) >> 15)) + +// calculate number of hash blocks in file. One hash block is 16MiB +#define HASH_BLOCK_SIZE ((int64_t)(1 << 24)) +#define IMGSIZE_TO_HASHBLOCKS(bytes) ((int)(((bytes) + HASH_BLOCK_SIZE - 1) / HASH_BLOCK_SIZE)) + #endif diff --git a/src/server/uplink.c b/src/server/uplink.c index 92cf944..0116fda 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -2,6 +2,7 @@ #include "locks.h" #include "memlog.h" #include "sockhelper.h" +#include "image.h" #include #include #include @@ -108,6 +109,7 @@ int uplink_init(dnbd3_image_t *image) link->betterFd = -1; link->rttTestResult = RTT_IDLE; link->recvBufferLen = 0; + link->shutdown = FALSE; spin_init( &link->lock, PTHREAD_PROCESS_PRIVATE ); if ( 0 != pthread_create( &(link->thread), NULL, &uplink_mainloop, (void *)(uintptr_t)link ) ) { memlogf( "[ERROR] Could not start thread for new client." ); @@ -122,13 +124,21 @@ int uplink_init(dnbd3_image_t *image) return FALSE; } -void uplink_shutdown(dnbd3_connection_t *uplink) +dnbd3_connection_t* uplink_shutdown(dnbd3_connection_t *uplink) { assert( uplink != NULL ); - if ( uplink->fd != -1 ) close( uplink->fd ); + if ( uplink->shutdown ) return NULL ; + uplink->shutdown = TRUE; + if ( uplink->signal != -1 ) write( uplink->signal, uplink, 1 ); pthread_join( uplink->thread, NULL ); + free( uplink ); + return NULL ; } +/** + * Uplink thread. + * Locks are irrelevant as this is never called from another function + */ static void* uplink_mainloop(void *data) { const int MAXEVENTS = 3; @@ -166,7 +176,7 @@ static void* uplink_mainloop(void *data) goto cleanup; } } - while ( !_shutdown ) { + while ( !_shutdown && !link->shutdown ) { if ( link->rttTestResult == RTT_DOCHANGE ) { link->rttTestResult = RTT_IDLE; // The rttTest worker thread has finished our request. @@ -193,6 +203,7 @@ static void* uplink_mainloop(void *data) if ( waitTime < 1500 ) waitTime = 1500; } numSocks = epoll_wait( fdEpoll, events, MAXEVENTS, waitTime ); + if ( _shutdown || link->shutdown ) break; if ( numSocks < 0 ) { // Error? memlogf( "[DEBUG] epoll_wait() error %d", (int)errno); usleep( 10000 ); @@ -235,18 +246,22 @@ static void* uplink_mainloop(void *data) return NULL ; } +/** + * Receive data from uplink server and process/dispatch + * Locks on: link.lock, indirectly on images[].lock + */ static void uplink_handle_receive(dnbd3_connection_t *link) { dnbd3_reply_t reply; int ret, i; ret = recv( link->fd, &reply, sizeof reply, MSG_WAITALL ); if ( ret != sizeof reply ) { - memlogf( "[INFO] Lost connection to uplink server." ); + memlogf( "[INFO] Lost connection to uplink server for %s", link->image->path ); goto error_cleanup; } fixup_reply( reply ); if ( reply.size > 9000000 ) { - memlogf( "[WARNING] Pure evil: Uplink server sent too much payload!" ); + memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path ); goto error_cleanup; } if ( link->recvBufferLen < reply.size ) { @@ -258,19 +273,16 @@ static void uplink_handle_receive(dnbd3_connection_t *link) while ( done < reply.size ) { ret = recv( link->fd, link->recvBuffer + done, reply.size - done, 0 ); if ( ret <= 0 ) { - memlogf( "[INFO] Lost connection to uplink server" ); + memlogf( "[INFO] Lost connection to uplink server of", link->image->path ); goto error_cleanup; } done += ret; } // Payload read completely - // 1) Write to cache file - assert( link->image->cacheFd != -1 ); - // 2) Figure out which clients are interested in it + // 1) Figure out which clients are interested in it const uint64_t start = reply.handle; const uint64_t end = reply.handle + reply.size; struct iovec iov[2]; - reply.magic = dnbd3_packet_magic; spin_lock( &link->lock ); for (i = 0; i < link->queuelen; ++i) { dnbd3_queued_request_t * const req = &link->queue[i]; @@ -280,6 +292,18 @@ static void uplink_handle_receive(dnbd3_connection_t *link) req->status = ULR_PROCESSING; } } + spin_unlock( &link->lock ); + // 2) Write to cache file + assert( link->image->cacheFd != -1 ); + if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) { + memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path ); + } else { + ret = (int)write( link->image->cacheFd, link->recvBuffer, reply.size ); + if ( ret > 0 ) image_update_cachemap( link->image, start, start + ret, TRUE); + } + // 3) Send to interested clients + reply.magic = dnbd3_packet_magic; // !! re-using reply struct - do not read from it after here + spin_lock( &link->lock ); for (i = link->queuelen - 1; i >= 0; --i) { dnbd3_queued_request_t * const req = &link->queue[i]; if ( req->status != ULR_PROCESSING ) continue; diff --git a/src/server/uplink.h b/src/server/uplink.h index 7c9878e..82412b4 100644 --- a/src/server/uplink.h +++ b/src/server/uplink.h @@ -14,6 +14,6 @@ int uplink_net_closeness(dnbd3_host_t *host1, dnbd3_host_t *host2); int uplink_init(dnbd3_image_t *image); -void uplink_shutdown( dnbd3_connection_t *uplink); +dnbd3_connection_t* uplink_shutdown( dnbd3_connection_t *uplink); #endif /* UPLINK_H_ */ -- cgit v1.2.3-55-g7522