From a1dd0acdbdd6a9b70f9d7aa447e323f2072c650a Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 12 Aug 2013 18:04:39 +0200 Subject: [SERVER] Improve proxy mode, implement integrity check in proxy mode --- LOCKS | 1 + src/config.h | 2 +- src/server/altservers.c | 17 +++- src/server/globals.h | 1 + src/server/helper.c | 5 +- src/server/image.c | 96 ++++++++++++++---- src/server/image.h | 4 + src/server/integrity.c | 151 ++++++++++++++++++++++++++++ src/server/integrity.h | 12 +++ src/server/net.c | 12 ++- src/server/server.c | 14 ++- src/server/uplink.c | 262 ++++++++++++++++++++++++++++++------------------ src/server/uplink.h | 3 +- 13 files changed, 449 insertions(+), 131 deletions(-) create mode 100644 src/server/integrity.c create mode 100644 src/server/integrity.h diff --git a/LOCKS b/LOCKS index 1f7f22b..9662d74 100644 --- a/LOCKS +++ b/LOCKS @@ -12,6 +12,7 @@ This is a list of used locks, in the order they have to be aquired if you must hold multiple locks: _clients_lock _clients[].lock +integrityQueueLock _images_lock _images[].lock uplink.queueLock diff --git a/src/config.h b/src/config.h index 6ad99dd..00b92e9 100644 --- a/src/config.h +++ b/src/config.h @@ -29,7 +29,7 @@ #define SERVER_MAX_IMAGES 5000 #define SERVER_MAX_ALTS 1000 #define SERVER_MAX_UPLINK_QUEUE 1000 -#define SERVER_MAX_PENDING_ALT_CHECKS 100 +#define SERVER_MAX_PENDING_ALT_CHECKS 50 // +++++ Other magic constants #define SERVER_RTT_PROBES 5 diff --git a/src/server/altservers.c b/src/server/altservers.c index 89c724f..f255b58 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -107,9 +107,16 @@ int altservers_add(dnbd3_host_t *host, const char *comment) */ void altserver_find_uplink(dnbd3_connection_t *uplink) { - if ( uplink->rttTestResult == RTT_INPROGRESS ) return; + int i; spin_lock( &pendingLock ); - for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { + if ( uplink->rttTestResult == RTT_INPROGRESS ) { + for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { + if ( pending[i] != uplink ) continue; + spin_unlock( &pendingLock ); + return; + } + } + for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { if ( pending[i] != NULL ) continue; pending[i] = uplink; uplink->rttTestResult = RTT_INPROGRESS; @@ -190,8 +197,8 @@ int altservers_get(dnbd3_host_t *output, int size) { int count = 0, i, j, num; spin_lock( &_alts_lock ); - if ( size <= _num_alts ) { - for (i = 0; i < size; ++i) { + if ( size >= _num_alts ) { + for (i = 0; i < _num_alts; ++i) { if ( _alt_servers[i].host.type == 0 ) continue; output[count++] = _alt_servers[i].host; } @@ -426,7 +433,7 @@ static void *altserver_main(void *data) // Measurement done - everything fine so far const unsigned int rtt = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_nsec - start.tv_nsec) / 1000; // µs const unsigned int avg = altservers_update_rtt( &servers[itAlt], rtt ); - if ( is_same_server( &servers[itAlt], &uplink->currentServer ) ) { + if ( uplink->fd != -1 && is_same_server( &servers[itAlt], &uplink->currentServer ) ) { currentRtt = avg; close( sock ); } else if ( avg < bestRtt ) { diff --git a/src/server/globals.h b/src/server/globals.h index baa13f3..2159021 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -31,6 +31,7 @@ typedef struct volatile uint64_t to; // Last byte + 1 of requested block (ie. 8192, if request len is 4096, resulting in bytes 4096-8191) dnbd3_client_t * volatile client; // Client to send reply to volatile int status; // status of this entry: ULR_* + time_t entered; // When this request entered the queue (for debugging) } dnbd3_queued_request_t; #define RTT_IDLE 0 // Not in progress diff --git a/src/server/helper.c b/src/server/helper.c index 65239ea..24ee0fb 100644 --- a/src/server/helper.c +++ b/src/server/helper.c @@ -23,6 +23,7 @@ char parse_address(char *string, dnbd3_host_t *host) struct in_addr v4; struct in6_addr v6; + memset( host, 0, sizeof(*host) ); // Try IPv4 without port if ( 1 == inet_pton( AF_INET, string, &v4 ) ) { host->type = AF_INET; @@ -83,11 +84,11 @@ char host_to_string(const dnbd3_host_t *host, char *target, size_t targetlen) if ( targetlen < 10 ) return FALSE; if ( host->type == AF_INET6 ) { *target++ = '['; - inet_ntop( AF_INET6, host->addr, target, targetlen - 9 ); + inet_ntop( AF_INET6, host->addr, target, targetlen - 10 ); target += strlen( target ); *target++ = ']'; } else if ( host->type == AF_INET ) { - inet_ntop( AF_INET, host->addr, target, targetlen - 7 ); + inet_ntop( AF_INET, host->addr, target, targetlen - 8 ); target += strlen( target ); } else { snprintf( target, targetlen, "", (int)host->type ); diff --git a/src/server/image.c b/src/server/image.c index 1393d5b..9be9827 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -3,6 +3,7 @@ #include "memlog.h" #include "uplink.h" #include "locks.h" +#include "integrity.h" #include #include @@ -23,9 +24,9 @@ pthread_spinlock_t _images_lock; // ########################################## +static int image_isHashBlockComplete(uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize); static int image_load_all_internal(char *base, char *path); static int image_try_load(char *base, char *path); -static int image_check_blocks_crc32(int fd, uint32_t *crc32list, int *blocks); static int64_t image_pad(const char *path, const int64_t currentSize); // ########################################## @@ -66,6 +67,7 @@ int image_isComplete(dnbd3_image_t *image) } /** * Update cache-map of given image for the given byte range + * start (inclusive) - end (exclusive) * Locks on: images[].lock */ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, const int set) @@ -76,12 +78,12 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co end &= ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); start = (uint64_t)(start + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); int dirty = FALSE; - int pos = start; + uint64_t pos = start; spin_lock( &image->lock ); if ( image->cache_map == NULL ) { // Image seems already complete - printf( "[DEBUG] image_update_cachemap with no cache_map: %s", image->path ); spin_unlock( &image->lock ); + printf( "[DEBUG] image_updateCachemap with no cache_map: %s", image->path ); return; } while ( pos < end ) { @@ -96,8 +98,7 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co } pos += DNBD3_BLOCK_SIZE; } - spin_unlock( &image->lock ); - if ( set && dirty ) { + if ( dirty && image->crc32 != NULL ) { // If dirty is set, at least one of the blocks was not cached before, so queue all hash blocks // for checking, even though this might lead to checking some hash block again, if it was // already complete and the block range spanned at least two hash blocks. @@ -106,12 +107,17 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co end = (end + HASH_BLOCK_SIZE - 1) & ~(uint64_t)(HASH_BLOCK_SIZE - 1); pos = start; while ( pos < end ) { + if ( image->cache_map == NULL ) break; const int block = pos / HASH_BLOCK_SIZE; - // TODO: Actually queue the hash block for checking as soon as there's a worker for that - (void)block; + if ( image_isHashBlockComplete( image->cache_map, block, image->filesize ) ) { + spin_unlock( &image->lock ); + integrity_check( image, block ); + spin_lock( &image->lock ); + } pos += HASH_BLOCK_SIZE; } } + spin_unlock( &image->lock ); } /** @@ -163,15 +169,17 @@ int image_saveCacheMap(dnbd3_image_t *image) spin_lock( &image->lock ); image->users--; spin_unlock( &image->lock ); + free( map ); return FALSE; } - write( fd, map, ((image->filesize + (1 << 15) - 1) >> 15) * sizeof(char) ); + write( fd, map, size ); if ( image->cacheFd != -1 ) { fsync( image->cacheFd ); } fsync( fd ); close( fd ); + free( map ); spin_lock( &image->lock ); image->users--; @@ -226,6 +234,29 @@ dnbd3_image_t* image_get(char *name, uint16_t revision) return candidate; // Success :-) } +/** + * Lock the image by increasing its users count + * Returns the image on success, NULL if it is not found in the image list + * Every call to image_lock() needs to be followed by a call to image_release() at some point. + * Locks on: _images_lock, _images[].lock + */ +dnbd3_image_t* image_lock(dnbd3_image_t *image) +{ + int i; + spin_lock( &_images_lock ); + for (i = 0; i < _num_images; ++i) { + if ( _images[i] == image ) { + spin_lock( &image->lock ); + spin_unlock( &_images_lock ); + image->users++; + spin_unlock( &image->lock ); + return image; + } + } + spin_unlock( &_images_lock ); + return NULL ; +} + /** * Release given image. This will decrease the reference counter of the image. * If the usage counter reaches 0 and the image is not in the images array @@ -337,6 +368,27 @@ int image_loadAll(char *path) return image_load_all_internal( path, path ); } +static int image_isHashBlockComplete(uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize) +{ + if ( cacheMap == NULL ) return TRUE; + const uint32_t end = (block + 1) * HASH_BLOCK_SIZE; + if ( end <= fileSize ) { + for (uint64_t mapPos = block * HASH_BLOCK_SIZE; mapPos < end; mapPos += (DNBD3_BLOCK_SIZE * 8)) { + if ( cacheMap[mapPos / (DNBD3_BLOCK_SIZE * 8)] != 0xff ) { + return FALSE; + } + } + } else { + for (uint64_t mapPos = block * HASH_BLOCK_SIZE; mapPos < fileSize; mapPos += DNBD3_BLOCK_SIZE ) { + const int map_y = mapPos >> 15; + const int map_x = (mapPos >> 12) & 7; // mod 8 + const int mask = 1 << map_x; + if ( (cacheMap[map_y] & mask) == 0 ) return FALSE; + } + } + return TRUE; +} + /** * Load all images in the given path recursively, * consider bash the base path that is to be cut off @@ -512,12 +564,15 @@ static int image_try_load(char *base, char *path) // This checks the first block and two random blocks (which might accidentally be the same) // for corruption via the known crc32 list. This is very sloppy and is merely supposed // to detect accidental corruption due to broken dnbd3-proxy functionality or file system - // corruption. If the image size is not a multiple of the hash block size, do not take the - // last block into consideration. It would always fail. - int blcks = hashBlocks; - if ( fileSize % HASH_BLOCK_SIZE != 0 ) blcks--; - int blocks[] = { 0, rand() % blcks, rand() % blcks, -1 }; - if ( !image_check_blocks_crc32( fdImage, crc32list, blocks ) ) { + // corruption. + int blocks[4], index = 0, block; // = { 0, rand() % blcks, rand() % blcks, -1 }; + if ( image_isHashBlockComplete( cache_map, 0, fileSize ) ) blocks[index++] = 0; + block = rand() % hashBlocks; + if ( image_isHashBlockComplete( cache_map, block, fileSize ) ) blocks[index++] = block; + block = rand() % hashBlocks; + if ( image_isHashBlockComplete( cache_map, block, fileSize ) ) blocks[index++] = block; + blocks[index] = -1; + if ( !image_checkBlocksCrc32( fdImage, crc32list, blocks, fileSize ) ) { memlogf( "[ERROR] Quick integrity check for '%s' failed.", path ); goto load_error; } @@ -809,19 +864,21 @@ int image_generateCrcFile(char *image) /** * Check the CRC-32 of the given blocks. The array blocks is of variable length. * !! pass -1 as the last block so the function knows when to stop !! + * Returns TRUE or FALSE */ -static int image_check_blocks_crc32(int fd, uint32_t *crc32list, int *blocks) +int image_checkBlocksCrc32(int fd, uint32_t *crc32list, const int *blocks, const uint64_t fileSize) { char buffer[40000]; while ( *blocks != -1 ) { - if ( lseek( fd, *blocks * HASH_BLOCK_SIZE, SEEK_SET ) != *blocks * HASH_BLOCK_SIZE ) { + if ( lseek( fd, (int64_t)*blocks * HASH_BLOCK_SIZE, SEEK_SET ) != (int64_t)*blocks * HASH_BLOCK_SIZE ) { memlogf( "Seek error" ); return FALSE; } uint32_t crc = crc32( 0L, Z_NULL, 0 ); int bytes = 0; - while ( bytes < HASH_BLOCK_SIZE ) { - const int n = MIN(sizeof(buffer), HASH_BLOCK_SIZE - bytes); + const int bytesToGo = MIN(HASH_BLOCK_SIZE, fileSize - ((int64_t)*blocks * HASH_BLOCK_SIZE)); + while ( bytes < bytesToGo ) { + const int n = MIN(sizeof(buffer), bytesToGo - bytes); const int r = read( fd, buffer, n ); if ( r <= 0 ) { memlogf( "Read error" ); @@ -848,9 +905,6 @@ static int64_t image_pad(const char *path, const int64_t currentSize) int success = FALSE; if ( tmpFd < 0 ) { memlogf( "[WARNING] Can't open image for writing, can't fix %s", path ); - } else if ( lseek( tmpFd, 0, SEEK_CUR ) != currentSize ) { - const int64_t cur = lseek( tmpFd, 0, SEEK_CUR ); - memlogf( "[WARNING] File size of %s changed when told to extend. (is: %" PRIi64 ", should: %" PRIi64 ")", path, cur, currentSize ); } else if ( lseek( tmpFd, currentSize, SEEK_SET ) != currentSize ) { memlogf( "[WARNING] lseek() failed, can't fix %s", path ); } else if ( write( tmpFd, buffer, missing ) != missing ) { diff --git a/src/server/image.h b/src/server/image.h index e1eddd3..c59ae9e 100644 --- a/src/server/image.h +++ b/src/server/image.h @@ -18,8 +18,12 @@ int image_saveCacheMap(dnbd3_image_t *image); dnbd3_image_t* image_get(char *name, uint16_t revision); +dnbd3_image_t* image_lock(dnbd3_image_t *image); + void image_release(dnbd3_image_t *image); +int image_checkBlocksCrc32(int fd, uint32_t *crc32list, const int *blocks, const uint64_t fileSize); + void image_killUplinks(); dnbd3_image_t* image_free(dnbd3_image_t *image); diff --git a/src/server/integrity.c b/src/server/integrity.c new file mode 100644 index 0000000..875b62d --- /dev/null +++ b/src/server/integrity.c @@ -0,0 +1,151 @@ +#include "integrity.h" + +#include "locks.h" +#include "image.h" +#include "globals.h" +#include "memlog.h" + +#include +#include +#include +#include +#include +#include +#include + +#define CHECK_QUEUE_SIZE 100 + +typedef struct +{ + dnbd3_image_t * volatile image; + int volatile block; +} queue_entry; + +static pthread_t thread; +static queue_entry checkQueue[CHECK_QUEUE_SIZE]; +static pthread_mutex_t integrityQueueLock; +static pthread_cond_t queueSignal; +static int queueLen = -1; +static int bRunning = FALSE; + +static void* integrity_main(void *data); + +/** + * Initialize the integrity check thread + */ +void integrity_init() +{ + assert( queueLen == -1 ); + pthread_mutex_init( &integrityQueueLock, NULL ); + pthread_cond_init( &queueSignal, NULL ); + if ( 0 != pthread_create( &thread, NULL, &integrity_main, (void *)NULL ) ) { + memlogf( "[WARNING] Could not start integrity check thread. Corrupted images will not be detected." ); + return; + } + queueLen = 0; +} + +void integrity_shutdown() +{ + assert( queueLen != -1 ); + printf( "[DEBUG] Shutting down integrity checker...\n" ); + pthread_mutex_lock( &integrityQueueLock ); + pthread_cond_signal( &queueSignal ); + pthread_mutex_unlock( &integrityQueueLock ); + pthread_join( thread, NULL ); + while ( bRunning ) + usleep( 10000 ); + pthread_mutex_destroy( &integrityQueueLock ); + pthread_cond_destroy( &queueSignal ); + printf( "[DEBUG] Integrity checker exited normally.\n" ); +} + +/** + * Schedule an integrity check on the given image for the given hash block. + * It is not checked whether the block is completely cached locally, so + * make sure it is before calling, otherwise it will result in falsely + * detected corruption. + */ +void integrity_check(dnbd3_image_t *image, int block) +{ + printf( "Queueing %d of %s\n", block, image->lower_name ); + int i, freeSlot = -1; + pthread_mutex_lock( &integrityQueueLock ); + for (i = 0; i < queueLen; ++i) { + if ( freeSlot == -1 && checkQueue[i].image == NULL ) { + freeSlot = i; + } else if ( checkQueue[i].image == image && checkQueue[i].block == block ) { + pthread_mutex_unlock( &integrityQueueLock ); + return; + } + } + if ( freeSlot == -1 ) { + if ( queueLen >= CHECK_QUEUE_SIZE ) { + pthread_mutex_unlock( &integrityQueueLock ); + printf( "[DEBUG] Check queue full, discarding check request...\n" ); + return; + } + freeSlot = queueLen++; + } + printf( "In slot %d\n", freeSlot ); + checkQueue[freeSlot].image = image; + checkQueue[freeSlot].block = block; + pthread_cond_signal( &queueSignal ); + pthread_mutex_unlock( &integrityQueueLock ); +} + +static void* integrity_main(void *data) +{ + bRunning = TRUE; + int i; + uint8_t *buffer = NULL; + size_t bufferSize = 0; + pthread_mutex_lock( &integrityQueueLock ); + while ( !_shutdown ) { + for (i = queueLen - 1; i >= 0; --i) { + if ( checkQueue[i].image == NULL ) continue; + dnbd3_image_t * const image = image_lock( checkQueue[i].image ); + checkQueue[i].image = NULL; + if ( image == NULL ) continue; + // We have the image. Call image_release() some time + if ( i + 1 == queueLen ) queueLen--; + spin_lock( &image->lock ); + if ( image->crc32 != NULL && image->filesize != 0 ) { + int const blocks[2] = { checkQueue[i].block, -1 }; + pthread_mutex_unlock( &integrityQueueLock ); + const uint64_t fileSize = image->filesize; + const size_t required = IMGSIZE_TO_HASHBLOCKS(image->filesize) * sizeof(uint32_t); + if ( required > bufferSize ) { + bufferSize = required; + if ( buffer != NULL ) free( buffer ); + buffer = malloc( bufferSize ); + } + memcpy( buffer, image->crc32, required ); + spin_unlock( &image->lock ); + int fd = open( image->path, O_RDONLY ); + if ( fd >= 0 ) { + if ( image_checkBlocksCrc32( fd, (uint32_t*)buffer, blocks, fileSize ) ) { + printf( "[DEBUG] CRC check of block %d for %s succeeded :-)\n", blocks[0], image->lower_name ); + } else { + memlogf( "[WARNING] Hash check for block %d of %s failed!", blocks[0], image->lower_name ); + image_updateCachemap( image, blocks[0] * HASH_BLOCK_SIZE, (blocks[0] + 1) * HASH_BLOCK_SIZE, FALSE ); + } + close( fd ); + } + pthread_mutex_lock( &integrityQueueLock ); + } else { + spin_unlock( &image->lock ); + } + // Release :-) + image_release( image ); + } + if ( queueLen == 0 ) { + pthread_cond_wait( &queueSignal, &integrityQueueLock ); + printf( "Queue woke up. %d jobs pending...\n", queueLen ); + } + } + pthread_mutex_unlock( &integrityQueueLock ); + if ( buffer != NULL ) free( buffer ); + bRunning = FALSE; + return NULL ; +} diff --git a/src/server/integrity.h b/src/server/integrity.h new file mode 100644 index 0000000..c3c2b44 --- /dev/null +++ b/src/server/integrity.h @@ -0,0 +1,12 @@ +#ifndef _INTEGRITY_H_ +#define _INTEGRITY_H_ + +#include "globals.h" + +void integrity_init(); + +void integrity_shutdown(); + +void integrity_check(dnbd3_image_t *image, int block); + +#endif /* INTEGRITY_H_ */ diff --git a/src/server/net.c b/src/server/net.c index 8309672..4af577e 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -28,6 +28,8 @@ #include #include #include +#include +#include #include "sockhelper.h" #include "helper.h" @@ -47,7 +49,8 @@ static inline char recv_request_header(int sock, dnbd3_request_t *request) // Read request header from socket if ( (ret = recv( sock, request, sizeof(*request), MSG_WAITALL )) != sizeof(*request) ) { if ( ret == 0 ) return FALSE; - printf( "[DEBUG] Error receiving request: Could not read message header (%d/%d)\n", ret, (int)sizeof(*request) ); + const int err = errno; + printf( "[DEBUG] Error receiving request: Could not read message header (%d/%d, e=%d)\n", ret, (int)sizeof(*request), err ); return FALSE; } // Make sure all bytes are in the right order (endianness) @@ -126,6 +129,7 @@ void *net_client_handler(void *dnbd3_client) char *image_name; uint16_t rid, client_version; uint64_t start, end; + char buffer[100]; dnbd3_server_entry_t server_list[NUMBER_SERVERS]; @@ -153,7 +157,7 @@ void *net_client_handler(void *dnbd3_client) } else { image = image_get( image_name, rid ); if ( image == NULL ) { - printf( "[DEBUG] Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); + //printf( "[DEBUG] Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); } else if ( !image->working ) { printf( "[DEBUG] Client requested non-working image '%s' (rid:%d), rejected\n", image_name, (int)rid ); } else { @@ -185,6 +189,9 @@ void *net_client_handler(void *dnbd3_client) } else if ( !client->is_server && _clientPenalty != 0 ) { usleep( _clientPenalty ); } + if ( host_to_string( &client->host, buffer, sizeof buffer ) ) { + printf( "[DEBUG] Client %s gets %s\n", buffer, image_name ); + } // client handling mainloop while ( recv_request_header( client->sock, &request ) ) { switch ( request.cmd ) { @@ -214,6 +221,7 @@ void *net_client_handler(void *dnbd3_client) send_reply( client->sock, &reply, NULL ); break; } + //printf( "Request - size: %" PRIu32 ", offset: %" PRIu64 "\n", request.size, request.offset ); if ( request.size != 0 && image->cache_map != NULL ) { // This is a proxyed image, check if we need to relay the request... diff --git a/src/server/server.c b/src/server/server.c index 7571cbc..d2238cc 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,7 @@ #include "altservers.h" #include "memlog.h" #include "globals.h" +#include "integrity.h" #define MAX_SERVER_SOCKETS 50 // Assume there will be no more than 50 sockets the server will listen on static int sockets[MAX_SERVER_SOCKETS], socket_count = 0; @@ -117,6 +119,9 @@ void dnbd3_cleanup() // Terminate all uplinks image_killUplinks(); + // Terminate integrity checker + integrity_shutdown(); + // Clean up clients spin_lock( &_clients_lock ); for (i = 0; i < _num_clients; ++i) { @@ -258,6 +263,7 @@ int main(int argc, char *argv[]) spin_init( &_clients_lock, PTHREAD_PROCESS_PRIVATE ); spin_init( &_images_lock, PTHREAD_PROCESS_PRIVATE ); altserver_init(); + integrity_init(); memlogf( "DNBD3 server starting.... Machine type: " ENDIAN_MODE ); if ( altservers_load() < 0 ) { @@ -311,7 +317,8 @@ int main(int argc, char *argv[]) len = sizeof(client); fd = accept_any( sockets, socket_count, &client, &len ); if ( fd < 0 ) { - memlogf( "[ERROR] Client accept failure" ); + const int err = errno; + memlogf( "[ERROR] Client accept failure (err=%d)", err ); usleep( 10000 ); // 10ms continue; } @@ -404,7 +411,10 @@ dnbd3_client_t* dnbd3_free_client(dnbd3_client_t *client) spin_lock( &client->lock ); if ( client->sock >= 0 ) close( client->sock ); client->sock = -1; - if ( client->image != NULL ) image_release( client->image ); + if ( client->image != NULL ) { + if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client ); + image_release( client->image ); + } client->image = NULL; spin_unlock( &client->lock ); spin_destroy( &client->lock ); diff --git a/src/server/uplink.c b/src/server/uplink.c index 4dbe75a..6d05f94 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -19,6 +20,7 @@ static void* uplink_mainloop(void *data); static void uplink_send_requests(dnbd3_connection_t *link, int newOnly); static void uplink_handle_receive(dnbd3_connection_t *link); +static int uplink_send_keepalive(const int fd); // ############ uplink connection handling @@ -78,6 +80,24 @@ void uplink_shutdown(dnbd3_image_t *image) free( uplink ); } +/** + * Remove given client from uplink request queue + */ +void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) +{ + spin_lock( &uplink->queueLock ); + for (int i = 0; i < uplink->queueLen; ++i) { + if ( uplink->queue[i].client == client ) { + // Lock on the send mutex as the uplink thread might just be writing to the client + pthread_mutex_lock( &client->sendMutex ); + uplink->queue[i].client = NULL; + uplink->queue[i].status = ULR_FREE; + pthread_mutex_unlock( &client->sendMutex ); + } + } + spin_unlock( &uplink->queueLock ); +} + /** * Request a chunk of data through an uplink server */ @@ -118,10 +138,14 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint uplink->queue[freeSlot].handle = handle; uplink->queue[freeSlot].client = client; uplink->queue[freeSlot].status = (foundExisting ? ULR_PENDING : ULR_NEW); +#ifdef _DEBUG + uplink->queue[freeSlot].entered = time( NULL ); +#endif spin_unlock( &uplink->queueLock ); if ( !foundExisting ) { - write( uplink->signal, "", 1 ); + static uint64_t counter = 1; + write( uplink->signal, &counter, sizeof(uint64_t) ); } return TRUE; } @@ -135,7 +159,7 @@ static void* uplink_mainloop(void *data) const int MAXEVENTS = 3; struct epoll_event ev, events[MAXEVENTS]; dnbd3_connection_t *link = (dnbd3_connection_t*)data; - int fdEpoll = -1, fdPipe = -1; + int fdEpoll = -1; int numSocks, i, waitTime; int altCheckInterval = SERVER_RTT_DELAY_INIT; int bFree = FALSE; @@ -151,27 +175,24 @@ static void* uplink_mainloop(void *data) goto cleanup; } { - int pipes[2]; - if ( pipe( pipes ) < 0 ) { + link->signal = eventfd( 0, EFD_NONBLOCK ); + if ( link->signal < 0 ) { memlogf( "[WARNING] error creating pipe. Uplink unavailable." ); goto cleanup; } - sock_set_nonblock( pipes[0] ); - sock_set_nonblock( pipes[1] ); - fdPipe = pipes[0]; - link->signal = pipes[1]; memset( &ev, 0, sizeof(ev) ); ev.events = EPOLLIN; - ev.data.fd = fdPipe; - if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, fdPipe, &ev ) < 0 ) { - memlogf( "[WARNING] adding signal-pipe to epoll set failed" ); + ev.data.fd = link->signal; + if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->signal, &ev ) < 0 ) { + memlogf( "[WARNING] adding eventfd to epoll set failed" ); goto cleanup; } } while ( !_shutdown && !link->shutdown ) { // epoll() if ( link->fd == -1 ) { - waitTime = 1500; + waitTime = 2000; + nextAltCheck = 0; } else { waitTime = (time( NULL ) - nextAltCheck) * 1000; if ( waitTime < 1500 ) waitTime = 1500; @@ -185,7 +206,7 @@ static void* uplink_mainloop(void *data) } for (i = 0; i < numSocks; ++i) { // Check all events if ( (events[i].events & (EPOLLERR | EPOLLHUP)) || !(events[i].events & EPOLLIN) ) { - if ( events[i].data.fd == fdPipe ) { + if ( events[i].data.fd == link->signal ) { memlogf( "[WARNING] epoll error on signal-pipe!" ); goto cleanup; } @@ -195,23 +216,23 @@ static void* uplink_mainloop(void *data) printf( "[DEBUG] Uplink gone away, panic!\n" ); nextAltCheck = 0; } else { - printf( "[DEBUG] Error on unknown FD in uplink epoll" ); + printf( "[DEBUG] Error on unknown FD in uplink epoll\n" ); close( events[i].data.fd ); } continue; } // No error, handle normally - if ( events[i].data.fd == fdPipe ) { + if ( events[i].data.fd == link->signal ) { int ret; do { - ret = read( fdPipe, buffer, sizeof buffer ); + ret = read( link->signal, buffer, sizeof buffer ); } while ( ret > 0 ); // Throw data away, this is just used for waking this thread up if ( ret == 0 ) { - memlogf( "[WARNING] Signal pipe of uplink for %s closed! Things will break!", link->image->lower_name ); + memlogf( "[WARNING] Eventfd of uplink for %s closed! Things will break!", link->image->lower_name ); } ret = errno; if ( ret != EAGAIN && ret != EWOULDBLOCK && ret != EBUSY && ret != EINTR ) { - memlogf( "[WARNING] Errno %d on pipe-read on uplink for %s! Things will break!", ret, link->image->lower_name ); + memlogf( "[WARNING] Errno %d on eventfd on uplink for %s! Things will break!", ret, link->image->lower_name ); } if ( link->fd != -1 ) { uplink_send_requests( link, TRUE ); @@ -263,8 +284,9 @@ static void* uplink_mainloop(void *data) // It seems it's time for a check if ( image_isComplete( link->image ) ) { // Quit work if image is complete + memlogf( "[INFO] Replication of %s complete.", link->image->lower_name ); if ( spin_trylock( &link->image->lock ) == 0 ) { - image_markComplete(link->image); + image_markComplete( link->image ); link->image->uplink = NULL; link->shutdown = TRUE; free( link->recvBuffer ); @@ -280,11 +302,34 @@ static void* uplink_mainloop(void *data) } else { // Not complete- do measurement altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous) + // Also send a keepalive packet to the currently connected server + if ( link->fd != -1 ) { + if ( !uplink_send_keepalive( link->fd ) ) { + printf( "[DEBUG] Error sending keep-alive to uplink\n" ); + const int fd = link->fd; + link->fd = -1; + close( fd ); + } + } } altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX); nextAltCheck = now + altCheckInterval; } } +#ifdef _DEBUG + if ( link->fd != -1 ) { + time_t deadline = time( NULL ) - 10; + spin_lock( &link->queueLock ); + for (i = 0; i < link->queueLen; ++i) { + if ( link->queue[i].status != ULR_FREE && link->queue[i].entered < deadline ) { + printf( "[DEBUG WARNING] Starving request detected:\n" + "%s\n(from %" PRIu64 " to %" PRIu64 "\n", link->queue[i].client->image->lower_name, link->queue[i].from, + link->queue[i].to ); + } + } + spin_unlock( &link->queueLock ); + } +#endif } cleanup: ; const int fd = link->fd; @@ -293,7 +338,6 @@ static void* uplink_mainloop(void *data) link->signal = -1; if ( fd != -1 ) close( fd ); if ( signal != -1 ) close( signal ); - if ( fdPipe != -1 ) close( fdPipe ); if ( fdEpoll != -1 ) close( fdEpoll ); // Wait for the RTT check to finish/fail if it's in progress while ( link->rttTestResult == RTT_INPROGRESS ) @@ -324,7 +368,7 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly) // Non-critical - if the connection dropped or the server was changed // the thread will re-send this request as soon as the connection // is reestablished. - printf( "[DEBUG] Error sending request to uplink server!" ); + printf( "[DEBUG] Error sending request to uplink server!\n" ); } spin_lock( &link->queueLock ); } @@ -337,90 +381,114 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly) */ static void uplink_handle_receive(dnbd3_connection_t *link) { - dnbd3_reply_t reply; + dnbd3_reply_t inReply, outReply; int ret, i; - ret = recv( link->fd, &reply, sizeof reply, MSG_WAITALL ); - if ( ret != sizeof reply ) { - memlogf( "[INFO] Lost connection to uplink server for %s", link->image->path ); - goto error_cleanup; - } - fixup_reply( reply ); - if ( reply.magic != dnbd3_packet_magic ) { - memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path ); - goto error_cleanup; - } - if ( reply.size > 9000000 ) { - memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path ); - goto error_cleanup; - } - if ( link->recvBufferLen < reply.size ) { - if ( link->recvBuffer != NULL ) free( link->recvBuffer ); - link->recvBufferLen = MIN(9000000, reply.size + 8192); - link->recvBuffer = malloc( link->recvBufferLen ); - } - uint32_t done = 0; - while ( done < reply.size ) { - ret = recv( link->fd, link->recvBuffer + done, reply.size - done, 0 ); - if ( ret <= 0 ) { - memlogf( "[INFO] Lost connection to uplink server of", link->image->path ); + for (;;) { + ret = recv( link->fd, &inReply, sizeof inReply, MSG_DONTWAIT ); + if ( ret < 0 ) { + const int err = errno; + if ( err == EAGAIN || err == EWOULDBLOCK || err == EINTR ) return; // OK cases goto error_cleanup; } - done += ret; - } - // Payload read completely - // Bail out if we're not interested - if ( reply.cmd != CMD_GET_BLOCK ) return; - // Is a legit block reply - const uint64_t start = reply.handle; - const uint64_t end = reply.handle + reply.size; - // 1) Write to cache file - assert( link->image->cacheFd != -1 ); - if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) { - memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path ); - } else { - ret = (int)write( link->image->cacheFd, link->recvBuffer, reply.size ); - if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, TRUE ); - } - // 2) Figure out which clients are interested in it - struct iovec iov[2]; - spin_lock( &link->queueLock ); - for (i = 0; i < link->queueLen; ++i) { - dnbd3_queued_request_t * const req = &link->queue[i]; - assert( req->status != ULR_PROCESSING ); - if ( req->status != ULR_PENDING ) continue; - if ( req->from >= start && req->to <= end ) { // Match :-) - req->status = ULR_PROCESSING; + if ( ret == 0 ) { + memlogf( "[INFO] Uplink: Remote host hung up (%s)", link->image->path ); + goto error_cleanup; } - } - // 3) Send to interested clients - reply.magic = dnbd3_packet_magic; // !! re-using reply struct - do not read from it after here - for (i = link->queueLen - 1; i >= 0; --i) { - dnbd3_queued_request_t * const req = &link->queue[i]; - if ( req->status != ULR_PROCESSING ) continue; - assert( req->from >= start && req->to <= end ); - reply.cmd = CMD_GET_BLOCK; - reply.handle = req->handle; - reply.size = req->to - req->from; - iov[0].iov_base = &reply; - iov[0].iov_len = sizeof reply; - iov[1].iov_base = link->recvBuffer + (req->from - start); - iov[1].iov_len = reply.size; - fixup_reply( reply ); - spin_unlock( &link->queueLock ); - // send: Don't care about errors here, let the client - // connection thread deal with it if something goes wrong - pthread_mutex_lock( &req->client->sendMutex ); - writev( req->client->sock, iov, 2 ); - pthread_mutex_unlock( &req->client->sendMutex ); + if ( ret != sizeof inReply ) ret += recv( link->fd, &inReply + ret, sizeof(inReply) - ret, MSG_WAITALL ); + if ( ret != sizeof inReply ) { + const int err = errno; + memlogf( "[INFO] Lost connection to uplink server for %s (header %d/%d, e=%d)", link->image->path, ret, (int)sizeof(inReply), + err ); + goto error_cleanup; + } + fixup_reply( inReply ); + if ( inReply.magic != dnbd3_packet_magic ) { + memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path ); + goto error_cleanup; + } + if ( inReply.size > 9000000 ) { + memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path ); + goto error_cleanup; + } + if ( link->recvBufferLen < inReply.size ) { + if ( link->recvBuffer != NULL ) free( link->recvBuffer ); + link->recvBufferLen = MIN(9000000, inReply.size + 8192); + link->recvBuffer = malloc( link->recvBufferLen ); + } + uint32_t done = 0; + while ( done < inReply.size ) { + ret = recv( link->fd, link->recvBuffer + done, inReply.size - done, 0 ); + if ( ret <= 0 ) { + memlogf( "[INFO] Lost connection to uplink server of %s (payload)", link->image->path ); + goto error_cleanup; + } + done += ret; + } + // Payload read completely + // Bail out if we're not interested + if ( inReply.cmd != CMD_GET_BLOCK ) return; + // Is a legit block reply + const uint64_t start = inReply.handle; + const uint64_t end = inReply.handle + inReply.size; + // 1) Write to cache file + assert( link->image->cacheFd != -1 ); + if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) { + memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path ); + } else { + ret = (int)write( link->image->cacheFd, link->recvBuffer, inReply.size ); + if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, TRUE ); + } + // 2) Figure out which clients are interested in it + struct iovec iov[2]; spin_lock( &link->queueLock ); - req->status = ULR_FREE; - if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--; + for (i = 0; i < link->queueLen; ++i) { + dnbd3_queued_request_t * const req = &link->queue[i]; + assert( req->status != ULR_PROCESSING ); + if ( req->status != ULR_PENDING ) continue; + if ( req->from >= start && req->to <= end ) { // Match :-) + req->status = ULR_PROCESSING; + } + } + // 3) Send to interested clients + outReply.magic = dnbd3_packet_magic; + for (i = link->queueLen - 1; i >= 0; --i) { + dnbd3_queued_request_t * const req = &link->queue[i]; + if ( req->status != ULR_PROCESSING ) continue; + assert( req->from >= start && req->to <= end ); + outReply.cmd = CMD_GET_BLOCK; + outReply.handle = req->handle; + outReply.size = req->to - req->from; + iov[0].iov_base = &outReply; + iov[0].iov_len = sizeof outReply; + iov[1].iov_base = link->recvBuffer + (req->from - start); + iov[1].iov_len = outReply.size; + fixup_reply( outReply ); + pthread_mutex_lock( &req->client->sendMutex ); + spin_unlock( &link->queueLock ); + writev( req->client->sock, iov, 2 ); + pthread_mutex_unlock( &req->client->sendMutex ); + spin_lock( &link->queueLock ); + if ( req->status == ULR_PROCESSING ) req->status = ULR_FREE; // Might have changed in the meantime + if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--; + } + spin_unlock( &link->queueLock ); } - spin_unlock( &link->queueLock ); - return; error_cleanup: ; const int fd = link->fd; link->fd = -1; if ( fd != -1 ) close( fd ); - return; +} + +/** + * Send keep alive request to server + */ +static int uplink_send_keepalive(const int fd) +{ + static dnbd3_request_t request = { 0, 0, 0, 0, 0 }; + if ( request.magic == 0 ) { + request.magic = dnbd3_packet_magic; + request.cmd = CMD_KEEPALIVE; + fixup_request( request ); + } + return send( fd, &request, sizeof(request), 0 ) == sizeof(request); } diff --git a/src/server/uplink.h b/src/server/uplink.h index cb1d4e7..b2d24a6 100644 --- a/src/server/uplink.h +++ b/src/server/uplink.h @@ -4,9 +4,10 @@ #include "../types.h" #include "globals.h" - int uplink_init(dnbd3_image_t *image); +void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client); + int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length); void uplink_shutdown(dnbd3_image_t *image); -- cgit v1.2.3-55-g7522