From 8f020a152a80853780db2998daf797e3ae842606 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Fri, 23 Aug 2013 18:44:57 +0200 Subject: [SERVER] WIP: On-the-fly image cloning --- LOCKS | 1 + src/config.h | 2 + src/server/altservers.c | 72 ++++++++-------------------- src/server/image.c | 124 ++++++++++++++++++++++++++++++++++++++++++++++-- src/server/protocol.h | 102 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 244 insertions(+), 57 deletions(-) create mode 100644 src/server/protocol.h diff --git a/LOCKS b/LOCKS index 9662d74..9eb9622 100644 --- a/LOCKS +++ b/LOCKS @@ -13,6 +13,7 @@ have to be aquired if you must hold multiple locks: _clients_lock _clients[].lock integrityQueueLock +remoteCloneLock _images_lock _images[].lock uplink.queueLock diff --git a/src/config.h b/src/config.h index 00b92e9..7a22c01 100644 --- a/src/config.h +++ b/src/config.h @@ -36,6 +36,8 @@ #define SERVER_RTT_DELAY_INIT 5 #define SERVER_RTT_DELAY_MAX 15 +#define SERVER_REMOTE_IMAGE_CHECK_CACHETIME 600 // 10 minutes + // +++++ Network +++++ // Default port #define PORT 5003 diff --git a/src/server/altservers.c b/src/server/altservers.c index da41857..9d5f77c 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -14,7 +14,7 @@ #include #include #include -#include "../serialize.h" +#include "protocol.h" static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS]; static pthread_spinlock_t pendingLock; @@ -277,20 +277,14 @@ static void *altserver_main(void *data) struct epoll_event ev, events[MAXEVENTS]; int readPipe = -1, fdEpoll = -1; int numSocks, ret, itLink, itAlt, numAlts; - int found, len; + int found; char buffer[DNBD3_BLOCK_SIZE ]; - dnbd3_host_t servers[ALTS + 1]; - dnbd3_request_t request; dnbd3_reply_t reply; + dnbd3_host_t servers[ALTS + 1]; serialized_buffer_t serialized; - struct iovec iov[2]; struct timespec start, end; setThreadName( "altserver-check" ); - // Make valgrind happy - memset( &reply, 0, sizeof(reply) ); - memset( &request, 0, sizeof(request) ); - request.magic = dnbd3_packet_magic; // Init spinlock spin_init( &pendingLock, PTHREAD_PROCESS_PRIVATE ); // Init waiting links queue @@ -370,71 +364,43 @@ static void *altserver_main(void *data) int sock = sock_connect( &servers[itAlt], 750, 1250 ); if ( sock < 0 ) continue; // Select image ++++++++++++++++++++++++++++++ - serializer_reset_write( &serialized ); - serializer_put_uint16( &serialized, PROTOCOL_VERSION ); - serializer_put_string( &serialized, uplink->image->lower_name ); - serializer_put_uint16( &serialized, uplink->image->rid ); - serializer_put_uint8( &serialized, 1 ); // isServer = TRUE - len = serializer_get_written_length( &serialized ); - request.cmd = CMD_SELECT_IMAGE; - request.size = len; - fixup_request( request ); - iov[0].iov_base = &request; - iov[0].iov_len = sizeof(request); - iov[1].iov_base = &serialized; - iov[1].iov_len = len; - if ( writev( sock, iov, 2 ) != len + sizeof(request) ) { + if ( !dnbd3_select_image( sock, uplink->image->lower_name, uplink->image->rid, FLAGS8_SERVER ) ) { goto server_failed; } // See if selecting the image succeeded ++++++++++++++++++++++++++++++ - if ( recv( sock, &reply, sizeof(reply), MSG_WAITALL ) != sizeof(reply) ) { - goto server_failed; - } - // check reply header - fixup_reply( reply ); - if ( reply.cmd != CMD_SELECT_IMAGE || reply.size < 3 || reply.size > MAX_PAYLOAD || reply.magic != dnbd3_packet_magic ) { + uint16_t protocolVersion, rid; + uint64_t imageSize; + char *name; + if ( !dnbd3_select_image_reply( &serialized, sock, &protocolVersion, &name, &rid, &imageSize ) ) { goto server_failed; } - // Not found - // receive reply payload - if ( recv( sock, &serialized, reply.size, MSG_WAITALL ) != reply.size ) { - ERROR_GOTO_VA( server_failed, "[ERROR] Cold not read CMD_SELECT_IMAGE payload (%s)", uplink->image->lower_name ); - } - // handle/check reply payload - serializer_reset_read( &serialized, reply.size ); - const uint16_t protocol_version = serializer_get_uint16( &serialized ); - if ( protocol_version < MIN_SUPPORTED_SERVER ) goto server_failed; - const char *name = serializer_get_string( &serialized ); + if ( protocolVersion < MIN_SUPPORTED_SERVER ) goto server_failed; if ( name == NULL || strcmp( name, uplink->image->lower_name ) != 0 ) { ERROR_GOTO_VA( server_failed, "[ERROR] Server offers image '%s', requested '%s'", name, uplink->image->lower_name ); } - const uint16_t rid = serializer_get_uint16( &serialized ); if ( rid != uplink->image->rid ) { ERROR_GOTO_VA( server_failed, "[ERROR] Server provides rid %d, requested was %d (%s)", (int)rid, (int)uplink->image->rid, uplink->image->lower_name ); } - const uint64_t image_size = serializer_get_uint64( &serialized ); - if ( image_size != uplink->image->filesize ) { + if ( imageSize != uplink->image->filesize ) { ERROR_GOTO_VA( server_failed, "[ERROR] Remote size: %" PRIu64 ", expected: %" PRIu64 " (%s)", - image_size, uplink->image->filesize, uplink->image->lower_name ); + imageSize, uplink->image->filesize, uplink->image->lower_name ); } // Request random block ++++++++++++++++++++++++++++++ - request.cmd = CMD_GET_BLOCK; - request.offset = (((uint64_t)start.tv_nsec | (uint64_t)rand()) * DNBD3_BLOCK_SIZE ) % uplink->image->filesize; - request.size = DNBD3_BLOCK_SIZE; fixup_request( request ); - if ( send( sock, &request, sizeof(request), 0 ) != sizeof(request) ) ERROR_GOTO_VA( server_failed, - "[ERROR] Could not request random block for %s", uplink->image->lower_name ); + if ( !dnbd3_get_block( sock, + (((uint64_t)start.tv_nsec | (uint64_t)rand()) * DNBD3_BLOCK_SIZE )% uplink->image->filesize, + DNBD3_BLOCK_SIZE) ) { + ERROR_GOTO_VA( server_failed, "[ERROR] Could not request random block for %s", uplink->image->lower_name ); + } // See if requesting the block succeeded ++++++++++++++++++++++ - const int retlen = recv( sock, &reply, sizeof(reply), MSG_WAITALL ); - if ( retlen != sizeof(reply) ) { + if ( !dnbd3_get_reply( sock, &reply ) ) { char buf[100] = { 0 }; host_to_string( &servers[itAlt], buf, 100 ); - ERROR_GOTO_VA( server_failed, "[ERROR] Received corrupted reply header (%d, %s) after CMD_GET_BLOCK (%s)", - retlen, buf, uplink->image->lower_name ); + ERROR_GOTO_VA( server_failed, "[ERROR] Received corrupted reply header (%s) after CMD_GET_BLOCK (%s)", + buf, uplink->image->lower_name ); } // check reply header - fixup_reply( reply ); if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) { ERROR_GOTO_VA( server_failed, "[ERROR] Reply to random block request is %d bytes for %s", reply.size, uplink->image->lower_name ); diff --git a/src/server/image.c b/src/server/image.c index ccfd60e..b81d15a 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -4,6 +4,8 @@ #include "uplink.h" #include "locks.h" #include "integrity.h" +#include "protocol.h" +#include "sockhelper.h" #include #include @@ -15,6 +17,7 @@ #include #include #include +#include // ########################################## @@ -22,12 +25,25 @@ dnbd3_image_t *_images[SERVER_MAX_IMAGES]; int _num_images = 0; pthread_spinlock_t _images_lock; +static pthread_mutex_t remoteCloneLock = PTHREAD_MUTEX_INITIALIZER; +#define NAMELEN 500 +#define CACHELEN 100 +typedef struct +{ + char name[NAMELEN]; + uint16_t rid; + time_t deadline; +} imagecache; +static imagecache remoteCloneCache[CACHELEN]; +static int remoteCloneCacheIndex = -1; + // ########################################## static int image_isHashBlockComplete(uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize); static int image_load_all_internal(char *base, char *path); static int image_try_load(char *base, char *path); static int64_t image_pad(const char *path, const int64_t currentSize); +static int image_clone(int sock, dnbd3_host_t *server, char *name, uint16_t revision, uint64_t imageSize); // ########################################## @@ -588,7 +604,9 @@ static int image_try_load(char *base, char *path) memlogf( "[INFO] Found CRC-32 list for already loaded image, adding...", path ); existing->crc32 = crc32list; crc32list = NULL; - } else { + function_return = TRUE; + goto load_error; + } else { // Nothing changed about the existing image, so do nothing function_return = TRUE; goto load_error; } @@ -729,6 +747,104 @@ int image_create(char *image, int revision, uint64_t size) return FALSE; } +/** + * Does the same as image_get, but if the image is not found locally, + * it will try to clone it from an authoritative dnbd3 server and return the + * image. If the return value is not NULL, image_release needs to be called + * on the image at some point. + * Locks on: remoteCloneLock, _images_lock, _images[].lock + */ +dnbd3_image_t* image_getOrClone(char *name, uint16_t revision) +{ + // TODO: Simply return image_get if no authoritative servers are configured + int i; + const size_t len = strlen( name ); + // Sanity check + if ( len == 0 || name[len - 1] == '/' || name[0] == '/' ) return NULL ; + // Already existing locally? + dnbd3_image_t *image = image_get( name, revision ); + if ( image != NULL ) return image; + // Doesn't exist, try remote if not already tried it recently + if ( remoteCloneCacheIndex == -1 ) { + remoteCloneCacheIndex = 0; + memset( remoteCloneCache, 0, sizeof(remoteCloneCache) ); + } + const time_t now = time( NULL ); + + char *cmpname = name; + if ( len >= NAMELEN ) cmpname += 1 + len - NAMELEN; + pthread_mutex_lock( &remoteCloneLock ); + for (i = 0; i < CACHELEN; ++i) { + if ( remoteCloneCache[i].rid != revision || strcmp( cmpname, remoteCloneCache[i].name ) != 0 ) continue; + if ( remoteCloneCache[i].deadline < now ) { + remoteCloneCache[i].name[0] = '\0'; + continue; + } + pthread_mutex_unlock( &remoteCloneLock ); // Was recently checked... + return NULL ; + } + // Re-check to prevent two clients at the same time triggering this + image = image_get( name, revision ); + if ( image != NULL ) { + pthread_mutex_unlock( &remoteCloneLock ); + return image; + } + // Reaching this point means we should contact an authority server + serialized_buffer_t serialized; + // Mark as recently checked + remoteCloneCacheIndex = (remoteCloneCacheIndex + 1) % CACHELEN; + remoteCloneCache[remoteCloneCacheIndex].deadline = now + SERVER_REMOTE_IMAGE_CHECK_CACHETIME; + snprintf( remoteCloneCache[remoteCloneCacheIndex].name, NAMELEN, "%s", cmpname ); + remoteCloneCache[remoteCloneCacheIndex].rid = revision; + for (;;) { + dnbd3_host_t server; // TODO: Get server :-) + int sock = sock_connect( &server, 500, 1500 ); + if ( sock < 0 ) continue; + if ( !dnbd3_select_image( sock, name, revision, FLAGS8_SERVER ) ) goto server_fail; + uint16_t remoteVersion, remoteRid; + uint64_t remoteImageSize; + char *remoteName; + if ( !dnbd3_select_image_reply( &serialized, sock, &remoteVersion, &remoteName, &remoteRid, &remoteImageSize ) ) goto server_fail; + if ( remoteVersion < MIN_SUPPORTED_SERVER ) goto server_fail; + if ( revision != 0 && remoteVersion != revision ) goto server_fail; + if ( remoteImageSize < DNBD3_BLOCK_SIZE || remoteName == NULL || strcmp( name, remoteName ) != 0 ) goto server_fail; + image_clone( sock, &server, name, remoteRid, remoteImageSize ); + break; + server_fail: ; + close( sock ); + } + pthread_mutex_unlock( &remoteCloneLock ); + return image_get( name, revision ); +} + +static int image_clone(int sock, dnbd3_host_t *server, char *name, uint16_t revision, uint64_t imageSize) +{ + // Allocate disk space and create cache map + if ( !image_create( name, revision, imageSize ) ) return FALSE; + // CRC32 + const size_t len = strlen( _basePath ) + strlen( name ) + 20; + char crcFile[len]; + snprintf( crcFile, len, "%s/%s.r%d.crc", _basePath, name, (int)revision ); + if ( !file_exists( crcFile ) ) { + // Get crc32list from remote server + size_t crc32len = IMGSIZE_TO_HASHBLOCKS(imageSize) * sizeof(uint32_t); + uint8_t *crc32 = malloc( crc32len ); + if ( !dnbd3_get_crc32( sock, crc32, &crc32len ) ) { + free( crc32 ); + return FALSE; + } + if ( crc32len > 0 ) { + int fd = open( crcFile, O_WRONLY | O_CREAT, 0640 ); + write( fd, crc32, crc32len ); + close( fd ); + } + free( crc32 ); + } + // HACK: Chop of ".crc" to get the image file name + crcFile[strlen( crcFile ) - 4] = '\0'; + return image_try_load( _basePath, crcFile ); +} + /** * Generate the crc32 block list file for the given file. * This function wants a plain file name instead of a dnbd3_image_t, @@ -741,7 +857,7 @@ int image_generateCrcFile(char *image) printf( "Could not open %s.\n", image ); return FALSE; } - // force size to be multiple of DNBD3_BLOCK_SIZE +// force size to be multiple of DNBD3_BLOCK_SIZE int64_t fileLen = lseek( fdImage, 0, SEEK_END ); if ( fileLen <= 0 ) { printf( "Error seeking to end, or file is empty.\n" ); @@ -778,7 +894,7 @@ int image_generateCrcFile(char *image) close( fdImage ); return FALSE; } - // CRC of all CRCs goes first. Don't know it yet, write 4 bytes dummy data. +// CRC of all CRCs goes first. Don't know it yet, write 4 bytes dummy data. if ( write( fdCrc, crcFile, 4 ) != 4 ) { printf( "Write error\n" ); close( fdImage ); @@ -829,7 +945,7 @@ int image_generateCrcFile(char *image) close( fdImage ); printf( "done!\nGenerating master-crc..." ); fflush( stdout ); - // File is written - read again to calc master crc +// File is written - read again to calc master crc if ( lseek( fdCrc, 4, SEEK_SET ) != 4 ) { printf( "Could not seek to beginning of crc list in file\n" ); close( fdCrc ); diff --git a/src/server/protocol.h b/src/server/protocol.h new file mode 100644 index 0000000..96856f8 --- /dev/null +++ b/src/server/protocol.h @@ -0,0 +1,102 @@ +#ifndef _PROTOCOL_H_ +#define _PROTOCOL_H_ + +#include "../types.h" +#include "../serialize.h" + +#define FLAGS8_SERVER 1 + +static inline int dnbd3_get_reply(int sock, dnbd3_reply_t *reply) +{ + if ( recv( sock, reply, sizeof(*reply), MSG_WAITALL ) != sizeof(*reply) ) { + return FALSE; + } + fixup_reply( *reply ); + if ( reply->magic != dnbd3_packet_magic ) return FALSE; + return TRUE; +} + +static inline int dnbd3_select_image(int sock, char *lower_name, uint16_t rid, uint8_t flags8) +{ + serialized_buffer_t serialized; + dnbd3_request_t request; + struct iovec iov[2]; + serializer_reset_write( &serialized ); + serializer_put_uint16( &serialized, PROTOCOL_VERSION ); + serializer_put_string( &serialized, lower_name ); + serializer_put_uint16( &serialized, rid ); + serializer_put_uint8( &serialized, flags8 ); + const ssize_t len = serializer_get_written_length( &serialized ); + request.magic = dnbd3_packet_magic; + request.cmd = CMD_SELECT_IMAGE; + request.size = len; +#ifdef _DEBUG + request.handle = 0; + request.offset = 0; +#endif + fixup_request( request ); + iov[0].iov_base = &request; + iov[0].iov_len = sizeof(request); + iov[1].iov_base = &serialized; + iov[1].iov_len = len; + return writev( sock, iov, 2 ) == len + sizeof(request); +} + +static inline int dnbd3_get_block(int sock, uint64_t offset, uint32_t size) +{ + dnbd3_request_t request; + request.magic = dnbd3_packet_magic; + request.handle = 0; + request.cmd = CMD_GET_BLOCK; + request.offset = offset; + request.size = size; + fixup_request( request ); + return send( sock, &request, sizeof(request), 0 ) == sizeof(request); +} + +static inline int dnbd3_get_crc32(int sock, uint8_t *buffer, size_t *bufferLen) +{ + dnbd3_request_t request; + dnbd3_reply_t reply; + request.magic = dnbd3_packet_magic; + request.handle = 0; + request.cmd = CMD_GET_CRC32; + request.offset = 0; + request.size = 0; + fixup_request( request ); + if ( send( sock, &request, sizeof(request), 0 ) != sizeof(request) ) return FALSE; + if ( !dnbd3_get_reply( sock, &reply ) ) return FALSE; + if ( reply.cmd != CMD_GET_CRC32 || reply.size > *bufferLen ) return FALSE; + *bufferLen = reply.size; + if ( reply.size == 0 ) return TRUE; + return recv( sock, buffer, reply.size, 0 ) == (int)reply.size; +} + +/** + * Pass a full serialized_buffer_t and a socket fd. Parsed data will be returned in further arguments. + * Note that all strings will point into the passed buffer, so there's no need to free them. + */ +static inline int dnbd3_select_image_reply(serialized_buffer_t *buffer, int sock, uint16_t *protocol_version, char **name, uint16_t *rid, + uint64_t *imageSize) +{ + dnbd3_reply_t reply; + if ( !dnbd3_get_reply( sock, &reply ) ) { + return FALSE; + } + if ( reply.cmd != CMD_SELECT_IMAGE || reply.size < 3 || reply.size > MAX_PAYLOAD ) { + return FALSE; + } +// receive reply payload + if ( recv( sock, buffer, reply.size, MSG_WAITALL ) != reply.size ) { + return FALSE; + } +// handle/check reply payload + serializer_reset_read( buffer, reply.size ); + *protocol_version = serializer_get_uint16( buffer ); + *name = serializer_get_string( buffer ); + *rid = serializer_get_uint16( buffer ); + *imageSize = serializer_get_uint64( buffer ); + return TRUE; +} + +#endif -- cgit v1.2.3-55-g7522