summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2013-08-12 18:04:39 +0200
committerSimon Rettberg2013-08-12 18:04:39 +0200
commita1dd0acdbdd6a9b70f9d7aa447e323f2072c650a (patch)
treeea67a4cc5c3dd003184bb13ccfaf3fa90c42720c
parentI'm stupid (diff)
downloaddnbd3-a1dd0acdbdd6a9b70f9d7aa447e323f2072c650a.tar.gz
dnbd3-a1dd0acdbdd6a9b70f9d7aa447e323f2072c650a.tar.xz
dnbd3-a1dd0acdbdd6a9b70f9d7aa447e323f2072c650a.zip
[SERVER] Improve proxy mode, implement integrity check in proxy mode
-rw-r--r--LOCKS1
-rw-r--r--src/config.h2
-rw-r--r--src/server/altservers.c17
-rw-r--r--src/server/globals.h1
-rw-r--r--src/server/helper.c5
-rw-r--r--src/server/image.c96
-rw-r--r--src/server/image.h4
-rw-r--r--src/server/integrity.c151
-rw-r--r--src/server/integrity.h12
-rw-r--r--src/server/net.c12
-rw-r--r--src/server/server.c14
-rw-r--r--src/server/uplink.c262
-rw-r--r--src/server/uplink.h3
13 files changed, 449 insertions, 131 deletions
diff --git a/LOCKS b/LOCKS
index 1f7f22b..9662d74 100644
--- a/LOCKS
+++ b/LOCKS
@@ -12,6 +12,7 @@ This is a list of used locks, in the order they
have to be aquired if you must hold multiple locks:
_clients_lock
_clients[].lock
+integrityQueueLock
_images_lock
_images[].lock
uplink.queueLock
diff --git a/src/config.h b/src/config.h
index 6ad99dd..00b92e9 100644
--- a/src/config.h
+++ b/src/config.h
@@ -29,7 +29,7 @@
#define SERVER_MAX_IMAGES 5000
#define SERVER_MAX_ALTS 1000
#define SERVER_MAX_UPLINK_QUEUE 1000
-#define SERVER_MAX_PENDING_ALT_CHECKS 100
+#define SERVER_MAX_PENDING_ALT_CHECKS 50
// +++++ Other magic constants
#define SERVER_RTT_PROBES 5
diff --git a/src/server/altservers.c b/src/server/altservers.c
index 89c724f..f255b58 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -107,9 +107,16 @@ int altservers_add(dnbd3_host_t *host, const char *comment)
*/
void altserver_find_uplink(dnbd3_connection_t *uplink)
{
- if ( uplink->rttTestResult == RTT_INPROGRESS ) return;
+ int i;
spin_lock( &pendingLock );
- for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
+ if ( uplink->rttTestResult == RTT_INPROGRESS ) {
+ for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
+ if ( pending[i] != uplink ) continue;
+ spin_unlock( &pendingLock );
+ return;
+ }
+ }
+ for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
if ( pending[i] != NULL ) continue;
pending[i] = uplink;
uplink->rttTestResult = RTT_INPROGRESS;
@@ -190,8 +197,8 @@ int altservers_get(dnbd3_host_t *output, int size)
{
int count = 0, i, j, num;
spin_lock( &_alts_lock );
- if ( size <= _num_alts ) {
- for (i = 0; i < size; ++i) {
+ if ( size >= _num_alts ) {
+ for (i = 0; i < _num_alts; ++i) {
if ( _alt_servers[i].host.type == 0 ) continue;
output[count++] = _alt_servers[i].host;
}
@@ -426,7 +433,7 @@ static void *altserver_main(void *data)
// Measurement done - everything fine so far
const unsigned int rtt = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_nsec - start.tv_nsec) / 1000; // µs
const unsigned int avg = altservers_update_rtt( &servers[itAlt], rtt );
- if ( is_same_server( &servers[itAlt], &uplink->currentServer ) ) {
+ if ( uplink->fd != -1 && is_same_server( &servers[itAlt], &uplink->currentServer ) ) {
currentRtt = avg;
close( sock );
} else if ( avg < bestRtt ) {
diff --git a/src/server/globals.h b/src/server/globals.h
index baa13f3..2159021 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -31,6 +31,7 @@ typedef struct
volatile uint64_t to; // Last byte + 1 of requested block (ie. 8192, if request len is 4096, resulting in bytes 4096-8191)
dnbd3_client_t * volatile client; // Client to send reply to
volatile int status; // status of this entry: ULR_*
+ time_t entered; // When this request entered the queue (for debugging)
} dnbd3_queued_request_t;
#define RTT_IDLE 0 // Not in progress
diff --git a/src/server/helper.c b/src/server/helper.c
index 65239ea..24ee0fb 100644
--- a/src/server/helper.c
+++ b/src/server/helper.c
@@ -23,6 +23,7 @@ char parse_address(char *string, dnbd3_host_t *host)
struct in_addr v4;
struct in6_addr v6;
+ memset( host, 0, sizeof(*host) );
// Try IPv4 without port
if ( 1 == inet_pton( AF_INET, string, &v4 ) ) {
host->type = AF_INET;
@@ -83,11 +84,11 @@ char host_to_string(const dnbd3_host_t *host, char *target, size_t targetlen)
if ( targetlen < 10 ) return FALSE;
if ( host->type == AF_INET6 ) {
*target++ = '[';
- inet_ntop( AF_INET6, host->addr, target, targetlen - 9 );
+ inet_ntop( AF_INET6, host->addr, target, targetlen - 10 );
target += strlen( target );
*target++ = ']';
} else if ( host->type == AF_INET ) {
- inet_ntop( AF_INET, host->addr, target, targetlen - 7 );
+ inet_ntop( AF_INET, host->addr, target, targetlen - 8 );
target += strlen( target );
} else {
snprintf( target, targetlen, "<?addrtype=%d>", (int)host->type );
diff --git a/src/server/image.c b/src/server/image.c
index 1393d5b..9be9827 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -3,6 +3,7 @@
#include "memlog.h"
#include "uplink.h"
#include "locks.h"
+#include "integrity.h"
#include <assert.h>
#include <stdio.h>
@@ -23,9 +24,9 @@ pthread_spinlock_t _images_lock;
// ##########################################
+static int image_isHashBlockComplete(uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize);
static int image_load_all_internal(char *base, char *path);
static int image_try_load(char *base, char *path);
-static int image_check_blocks_crc32(int fd, uint32_t *crc32list, int *blocks);
static int64_t image_pad(const char *path, const int64_t currentSize);
// ##########################################
@@ -66,6 +67,7 @@ int image_isComplete(dnbd3_image_t *image)
}
/**
* Update cache-map of given image for the given byte range
+ * start (inclusive) - end (exclusive)
* Locks on: images[].lock
*/
void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, const int set)
@@ -76,12 +78,12 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co
end &= ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
start = (uint64_t)(start + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
int dirty = FALSE;
- int pos = start;
+ uint64_t pos = start;
spin_lock( &image->lock );
if ( image->cache_map == NULL ) {
// Image seems already complete
- printf( "[DEBUG] image_update_cachemap with no cache_map: %s", image->path );
spin_unlock( &image->lock );
+ printf( "[DEBUG] image_updateCachemap with no cache_map: %s", image->path );
return;
}
while ( pos < end ) {
@@ -96,8 +98,7 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co
}
pos += DNBD3_BLOCK_SIZE;
}
- spin_unlock( &image->lock );
- if ( set && dirty ) {
+ if ( dirty && image->crc32 != NULL ) {
// If dirty is set, at least one of the blocks was not cached before, so queue all hash blocks
// for checking, even though this might lead to checking some hash block again, if it was
// already complete and the block range spanned at least two hash blocks.
@@ -106,12 +107,17 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co
end = (end + HASH_BLOCK_SIZE - 1) & ~(uint64_t)(HASH_BLOCK_SIZE - 1);
pos = start;
while ( pos < end ) {
+ if ( image->cache_map == NULL ) break;
const int block = pos / HASH_BLOCK_SIZE;
- // TODO: Actually queue the hash block for checking as soon as there's a worker for that
- (void)block;
+ if ( image_isHashBlockComplete( image->cache_map, block, image->filesize ) ) {
+ spin_unlock( &image->lock );
+ integrity_check( image, block );
+ spin_lock( &image->lock );
+ }
pos += HASH_BLOCK_SIZE;
}
}
+ spin_unlock( &image->lock );
}
/**
@@ -163,15 +169,17 @@ int image_saveCacheMap(dnbd3_image_t *image)
spin_lock( &image->lock );
image->users--;
spin_unlock( &image->lock );
+ free( map );
return FALSE;
}
- write( fd, map, ((image->filesize + (1 << 15) - 1) >> 15) * sizeof(char) );
+ write( fd, map, size );
if ( image->cacheFd != -1 ) {
fsync( image->cacheFd );
}
fsync( fd );
close( fd );
+ free( map );
spin_lock( &image->lock );
image->users--;
@@ -227,6 +235,29 @@ dnbd3_image_t* image_get(char *name, uint16_t revision)
}
/**
+ * Lock the image by increasing its users count
+ * Returns the image on success, NULL if it is not found in the image list
+ * Every call to image_lock() needs to be followed by a call to image_release() at some point.
+ * Locks on: _images_lock, _images[].lock
+ */
+dnbd3_image_t* image_lock(dnbd3_image_t *image)
+{
+	int idx = 0;
+	spin_lock( &_images_lock );
+	// Scan the global image list for this exact pointer
+	while ( idx < _num_images && _images[idx] != image ) {
+		++idx;
+	}
+	if ( idx >= _num_images ) {
+		// Not (or no longer) part of the list, so there is nothing to pin
+		spin_unlock( &_images_lock );
+		return NULL ;
+	}
+	// Take the image's own lock before letting go of the list lock,
+	// then account for the new user
+	spin_lock( &image->lock );
+	spin_unlock( &_images_lock );
+	image->users++;
+	spin_unlock( &image->lock );
+	return image;
+}
+
+/**
* Release given image. This will decrease the reference counter of the image.
* If the usage counter reaches 0 and the image is not in the images array
* anymore, the image will be freed
@@ -337,6 +368,27 @@ int image_loadAll(char *path)
return image_load_all_internal( path, path );
}
+/**
+ * Check whether all blocks belonging to the given hash block are marked
+ * as cached in the given cache map.
+ * cacheMap - cache bitmap of the image; NULL is treated as "complete"
+ * block - index of the hash block (HASH_BLOCK_SIZE bytes each)
+ * fileSize - size of the image in bytes
+ * Returns TRUE if no block of the hash block is missing, FALSE otherwise
+ * NOTE(review): the bit arithmetic below (>> 15, >> 12) assumes
+ * DNBD3_BLOCK_SIZE is 4096 and that HASH_BLOCK_SIZE is a multiple of
+ * DNBD3_BLOCK_SIZE * 8 - confirm against the definitions.
+ */
+static int image_isHashBlockComplete(uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize)
+{
+	if ( cacheMap == NULL ) return TRUE;
+	// Byte offsets have to be 64 bit. With a 32 bit 'end' the value wraps for
+	// large images, the loop below never runs and an incomplete hash block
+	// would be reported as complete.
+	const uint64_t start = block * HASH_BLOCK_SIZE;
+	const uint64_t end = start + HASH_BLOCK_SIZE;
+	if ( end <= fileSize ) {
+		// Hash block lies completely inside the image: compare whole map bytes (8 blocks each)
+		for (uint64_t mapPos = start; mapPos < end; mapPos += (DNBD3_BLOCK_SIZE * 8)) {
+			if ( cacheMap[mapPos / (DNBD3_BLOCK_SIZE * 8)] != 0xff ) {
+				return FALSE;
+			}
+		}
+	} else {
+		// Hash block reaches past the end of the image: test bit by bit up to fileSize
+		for (uint64_t mapPos = start; mapPos < fileSize; mapPos += DNBD3_BLOCK_SIZE ) {
+			const uint64_t map_y = mapPos >> 15; // byte in the map; kept 64 bit so it cannot truncate
+			const int map_x = (mapPos >> 12) & 7; // mod 8
+			const int mask = 1 << map_x;
+			if ( (cacheMap[map_y] & mask) == 0 ) return FALSE;
+		}
+	}
+	return TRUE;
+}
+
/**
* Load all images in the given path recursively,
* consider bash the base path that is to be cut off
@@ -512,12 +564,15 @@ static int image_try_load(char *base, char *path)
// This checks the first block and two random blocks (which might accidentally be the same)
// for corruption via the known crc32 list. This is very sloppy and is merely supposed
// to detect accidental corruption due to broken dnbd3-proxy functionality or file system
- // corruption. If the image size is not a multiple of the hash block size, do not take the
- // last block into consideration. It would always fail.
- int blcks = hashBlocks;
- if ( fileSize % HASH_BLOCK_SIZE != 0 ) blcks--;
- int blocks[] = { 0, rand() % blcks, rand() % blcks, -1 };
- if ( !image_check_blocks_crc32( fdImage, crc32list, blocks ) ) {
+ // corruption.
+ int blocks[4], index = 0, block; // = { 0, rand() % blcks, rand() % blcks, -1 };
+ if ( image_isHashBlockComplete( cache_map, 0, fileSize ) ) blocks[index++] = 0;
+ block = rand() % hashBlocks;
+ if ( image_isHashBlockComplete( cache_map, block, fileSize ) ) blocks[index++] = block;
+ block = rand() % hashBlocks;
+ if ( image_isHashBlockComplete( cache_map, block, fileSize ) ) blocks[index++] = block;
+ blocks[index] = -1;
+ if ( !image_checkBlocksCrc32( fdImage, crc32list, blocks, fileSize ) ) {
memlogf( "[ERROR] Quick integrity check for '%s' failed.", path );
goto load_error;
}
@@ -809,19 +864,21 @@ int image_generateCrcFile(char *image)
/**
* Check the CRC-32 of the given blocks. The array blocks is of variable length.
* !! pass -1 as the last block so the function knows when to stop !!
+ * Returns TRUE or FALSE
*/
-static int image_check_blocks_crc32(int fd, uint32_t *crc32list, int *blocks)
+int image_checkBlocksCrc32(int fd, uint32_t *crc32list, const int *blocks, const uint64_t fileSize)
{
char buffer[40000];
while ( *blocks != -1 ) {
- if ( lseek( fd, *blocks * HASH_BLOCK_SIZE, SEEK_SET ) != *blocks * HASH_BLOCK_SIZE ) {
+ if ( lseek( fd, (int64_t)*blocks * HASH_BLOCK_SIZE, SEEK_SET ) != (int64_t)*blocks * HASH_BLOCK_SIZE ) {
memlogf( "Seek error" );
return FALSE;
}
uint32_t crc = crc32( 0L, Z_NULL, 0 );
int bytes = 0;
- while ( bytes < HASH_BLOCK_SIZE ) {
- const int n = MIN(sizeof(buffer), HASH_BLOCK_SIZE - bytes);
+ const int bytesToGo = MIN(HASH_BLOCK_SIZE, fileSize - ((int64_t)*blocks * HASH_BLOCK_SIZE));
+ while ( bytes < bytesToGo ) {
+ const int n = MIN(sizeof(buffer), bytesToGo - bytes);
const int r = read( fd, buffer, n );
if ( r <= 0 ) {
memlogf( "Read error" );
@@ -848,9 +905,6 @@ static int64_t image_pad(const char *path, const int64_t currentSize)
int success = FALSE;
if ( tmpFd < 0 ) {
memlogf( "[WARNING] Can't open image for writing, can't fix %s", path );
- } else if ( lseek( tmpFd, 0, SEEK_CUR ) != currentSize ) {
- const int64_t cur = lseek( tmpFd, 0, SEEK_CUR );
- memlogf( "[WARNING] File size of %s changed when told to extend. (is: %" PRIi64 ", should: %" PRIi64 ")", path, cur, currentSize );
} else if ( lseek( tmpFd, currentSize, SEEK_SET ) != currentSize ) {
memlogf( "[WARNING] lseek() failed, can't fix %s", path );
} else if ( write( tmpFd, buffer, missing ) != missing ) {
diff --git a/src/server/image.h b/src/server/image.h
index e1eddd3..c59ae9e 100644
--- a/src/server/image.h
+++ b/src/server/image.h
@@ -18,8 +18,12 @@ int image_saveCacheMap(dnbd3_image_t *image);
dnbd3_image_t* image_get(char *name, uint16_t revision);
+dnbd3_image_t* image_lock(dnbd3_image_t *image);
+
void image_release(dnbd3_image_t *image);
+int image_checkBlocksCrc32(int fd, uint32_t *crc32list, const int *blocks, const uint64_t fileSize);
+
void image_killUplinks();
dnbd3_image_t* image_free(dnbd3_image_t *image);
diff --git a/src/server/integrity.c b/src/server/integrity.c
new file mode 100644
index 0000000..875b62d
--- /dev/null
+++ b/src/server/integrity.c
@@ -0,0 +1,151 @@
+#include "integrity.h"
+
+#include "locks.h"
+#include "image.h"
+#include "globals.h"
+#include "memlog.h"
+
+#include <pthread.h>
+#include <stdlib.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+#include <assert.h>
+
+// Capacity of checkQueue; integrity_check() discards requests once all slots are in use
+#define CHECK_QUEUE_SIZE 100
+
+// One pending check request
+typedef struct
+{
+ dnbd3_image_t * volatile image; // image to check; NULL marks the slot as free
+ int volatile block; // index of the hash block to check
+} queue_entry;
+
+// Worker thread running integrity_main(), created by integrity_init()
+static pthread_t thread;
+// Pending check requests; only the first queueLen slots are looked at
+static queue_entry checkQueue[CHECK_QUEUE_SIZE];
+// Taken around accesses to checkQueue and queueLen in integrity_check() and integrity_main()
+static pthread_mutex_t integrityQueueLock;
+// Signalled by integrity_check() after queueing a request and by integrity_shutdown()
+static pthread_cond_t queueSignal;
+// -1 until integrity_init() has started the worker, afterwards the number of slots in use
+static int queueLen = -1;
+// TRUE from entry into integrity_main() until just before it returns
+static int bRunning = FALSE;
+
+static void* integrity_main(void *data);
+
+/**
+ * Initialize the integrity check thread.
+ * Sets up the queue lock and condition variable and starts the worker.
+ * If the worker cannot be started, a warning is logged and queueLen is
+ * left at -1, i.e. the checker stays unavailable.
+ * Asserts that it has not been initialized successfully before.
+ */
+void integrity_init()
+{
+	assert( queueLen == -1 );
+	pthread_mutex_init( &integrityQueueLock, NULL );
+	pthread_cond_init( &queueSignal, NULL );
+	// Mark the queue as usable before the worker exists. The worker only goes to
+	// sleep when queueLen is 0, so if it ever saw -1 it would spin with the
+	// queue mutex held until this assignment happened.
+	queueLen = 0;
+	if ( 0 != pthread_create( &thread, NULL, &integrity_main, (void *)NULL ) ) {
+		queueLen = -1; // No worker, keep the "not initialized" marker
+		memlogf( "[WARNING] Could not start integrity check thread. Corrupted images will not be detected." );
+		return;
+	}
+}
+
+/**
+ * Stop the integrity check thread and destroy its lock and condition variable.
+ * This only wakes the worker up; the worker leaves its loop when _shutdown is
+ * set, so presumably the caller has to set _shutdown beforehand (TODO confirm).
+ * Does nothing if the checker was never started.
+ */
+void integrity_shutdown()
+{
+	if ( queueLen == -1 ) {
+		// integrity_init() logs a warning and carries on if the thread could not be
+		// created, so this is a legitimate state: there is no thread to join.
+		return;
+	}
+	printf( "[DEBUG] Shutting down integrity checker...\n" );
+	// Wake the worker in case it is sleeping on an empty queue
+	pthread_mutex_lock( &integrityQueueLock );
+	pthread_cond_signal( &queueSignal );
+	pthread_mutex_unlock( &integrityQueueLock );
+	pthread_join( thread, NULL );
+	// Fallback in case the join failed; bRunning is cleared right before the worker returns
+	while ( bRunning )
+		usleep( 10000 );
+	pthread_mutex_destroy( &integrityQueueLock );
+	pthread_cond_destroy( &queueSignal );
+	printf( "[DEBUG] Integrity checker exited normally.\n" );
+}
+
+/**
+ * Schedule an integrity check on the given image for the given hash block.
+ * It is not checked whether the block is completely cached locally, so
+ * make sure it is before calling, otherwise it will result in falsely
+ * detected corruption.
+ * A request that is already queued is not added a second time. If the
+ * queue is full, or the checker thread is not running, the request is
+ * dropped.
+ * Locks on: integrityQueueLock
+ */
+void integrity_check(dnbd3_image_t *image, int block)
+{
+	printf( "Queueing %d of %s\n", block, image->lower_name );
+	int i, freeSlot = -1;
+	pthread_mutex_lock( &integrityQueueLock );
+	if ( queueLen < 0 ) {
+		// Checker not running (queueLen is still -1). Going on would make
+		// "freeSlot = queueLen++" yield -1 and write in front of checkQueue.
+		pthread_mutex_unlock( &integrityQueueLock );
+		return;
+	}
+	for (i = 0; i < queueLen; ++i) {
+		if ( freeSlot == -1 && checkQueue[i].image == NULL ) {
+			// Remember first unused slot, but keep scanning for a duplicate
+			freeSlot = i;
+		} else if ( checkQueue[i].image == image && checkQueue[i].block == block ) {
+			// Same request is already pending
+			pthread_mutex_unlock( &integrityQueueLock );
+			return;
+		}
+	}
+	if ( freeSlot == -1 ) {
+		// No hole in the used part of the queue, append if there is room left
+		if ( queueLen >= CHECK_QUEUE_SIZE ) {
+			pthread_mutex_unlock( &integrityQueueLock );
+			printf( "[DEBUG] Check queue full, discarding check request...\n" );
+			return;
+		}
+		freeSlot = queueLen++;
+	}
+	printf( "In slot %d\n", freeSlot );
+	checkQueue[freeSlot].image = image;
+	checkQueue[freeSlot].block = block;
+	pthread_cond_signal( &queueSignal );
+	pthread_mutex_unlock( &integrityQueueLock );
+}
+
+/**
+ * Worker thread of the integrity checker.
+ * Takes queued (image, hash block) requests and checks the block against a
+ * copy of the image's crc32 list using image_checkBlocksCrc32(). If the check
+ * fails, image_updateCachemap() is called with set = FALSE for the byte range
+ * of that hash block (presumably marking it as not cached again).
+ * Runs until _shutdown is set. The data parameter is not used.
+ * Locks on: integrityQueueLock, and via image_lock(): _images_lock, _images[].lock
+ */
+static void* integrity_main(void *data)
+{
+	(void)data;
+	bRunning = TRUE;
+	int i;
+	uint8_t *buffer = NULL;
+	size_t bufferSize = 0;
+	pthread_mutex_lock( &integrityQueueLock );
+	while ( !_shutdown ) {
+		for (i = queueLen - 1; i >= 0; --i) {
+			if ( checkQueue[i].image == NULL ) continue;
+			dnbd3_image_t * const image = image_lock( checkQueue[i].image );
+			checkQueue[i].image = NULL;
+			if ( image == NULL ) continue; // Image is not in the image list (anymore)
+			// We have the image. Call image_release() some time
+			spin_lock( &image->lock );
+			if ( image->crc32 != NULL && image->filesize != 0 ) {
+				// Copy the request before giving up the queue lock, the slot may be reused
+				int const blocks[2] = { checkQueue[i].block, -1 };
+				pthread_mutex_unlock( &integrityQueueLock );
+				const uint64_t fileSize = image->filesize;
+				const size_t required = IMGSIZE_TO_HASHBLOCKS(image->filesize) * sizeof(uint32_t);
+				if ( required > bufferSize ) {
+					free( buffer );
+					buffer = malloc( required );
+					bufferSize = ( buffer == NULL ) ? 0 : required;
+				}
+				if ( buffer == NULL ) {
+					// Allocation failed, skip this request instead of copying to NULL
+					spin_unlock( &image->lock );
+					memlogf( "[WARNING] Out of memory, skipping hash check for block %d of %s", blocks[0], image->lower_name );
+				} else {
+					// Work on a private copy of the crc list so the image lock can be dropped during I/O
+					memcpy( buffer, image->crc32, required );
+					spin_unlock( &image->lock );
+					int fd = open( image->path, O_RDONLY );
+					if ( fd >= 0 ) {
+						if ( image_checkBlocksCrc32( fd, (uint32_t*)buffer, blocks, fileSize ) ) {
+							printf( "[DEBUG] CRC check of block %d for %s succeeded :-)\n", blocks[0], image->lower_name );
+						} else {
+							memlogf( "[WARNING] Hash check for block %d of %s failed!", blocks[0], image->lower_name );
+							// Calculate byte offsets in 64 bit, int arithmetic overflows for large images
+							image_updateCachemap( image, (uint64_t)blocks[0] * HASH_BLOCK_SIZE,
+									((uint64_t)blocks[0] + 1) * HASH_BLOCK_SIZE, FALSE );
+						}
+						close( fd );
+					}
+				}
+				pthread_mutex_lock( &integrityQueueLock );
+			} else {
+				spin_unlock( &image->lock );
+			}
+			// Release :-)
+			image_release( image );
+		}
+		// Drop all unused slots at the end of the queue. Slots can be emptied without
+		// having been the last one at that time (image gone, or a request was appended
+		// while the lock was released above). If queueLen stayed > 0 with nothing left
+		// to do, this thread would never sleep and never release the queue lock.
+		while ( queueLen > 0 && checkQueue[queueLen - 1].image == NULL ) {
+			queueLen--;
+		}
+		if ( queueLen <= 0 ) {
+			pthread_cond_wait( &queueSignal, &integrityQueueLock );
+			printf( "Queue woke up. %d jobs pending...\n", queueLen );
+		}
+	}
+	pthread_mutex_unlock( &integrityQueueLock );
+	free( buffer );
+	bRunning = FALSE;
+	return NULL ;
+}
diff --git a/src/server/integrity.h b/src/server/integrity.h
new file mode 100644
index 0000000..c3c2b44
--- /dev/null
+++ b/src/server/integrity.h
@@ -0,0 +1,12 @@
+#ifndef _INTEGRITY_H_
+#define _INTEGRITY_H_
+
+#include "globals.h"
+
+/**
+ * Initialize the integrity checker: sets up its lock and condition
+ * variable and starts the worker thread. Logs a warning if the thread
+ * cannot be started.
+ */
+void integrity_init();
+
+/**
+ * Wake up and join the worker thread, then destroy the lock and the
+ * condition variable of the integrity checker.
+ */
+void integrity_shutdown();
+
+/**
+ * Queue the given hash block of the given image for a CRC check.
+ * The caller has to make sure the hash block is completely cached,
+ * otherwise corruption will be detected falsely.
+ */
+void integrity_check(dnbd3_image_t *image, int block);
+
+#endif /* _INTEGRITY_H_ */
diff --git a/src/server/net.c b/src/server/net.c
index 8309672..4af577e 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -28,6 +28,8 @@
#include <sys/sendfile.h>
#include <sys/types.h>
#include <assert.h>
+#include <errno.h>
+#include <inttypes.h>
#include "sockhelper.h"
#include "helper.h"
@@ -47,7 +49,8 @@ static inline char recv_request_header(int sock, dnbd3_request_t *request)
// Read request header from socket
if ( (ret = recv( sock, request, sizeof(*request), MSG_WAITALL )) != sizeof(*request) ) {
if ( ret == 0 ) return FALSE;
- printf( "[DEBUG] Error receiving request: Could not read message header (%d/%d)\n", ret, (int)sizeof(*request) );
+ const int err = errno;
+ printf( "[DEBUG] Error receiving request: Could not read message header (%d/%d, e=%d)\n", ret, (int)sizeof(*request), err );
return FALSE;
}
// Make sure all bytes are in the right order (endianness)
@@ -126,6 +129,7 @@ void *net_client_handler(void *dnbd3_client)
char *image_name;
uint16_t rid, client_version;
uint64_t start, end;
+ char buffer[100];
dnbd3_server_entry_t server_list[NUMBER_SERVERS];
@@ -153,7 +157,7 @@ void *net_client_handler(void *dnbd3_client)
} else {
image = image_get( image_name, rid );
if ( image == NULL ) {
- printf( "[DEBUG] Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
+ //printf( "[DEBUG] Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
} else if ( !image->working ) {
printf( "[DEBUG] Client requested non-working image '%s' (rid:%d), rejected\n", image_name, (int)rid );
} else {
@@ -185,6 +189,9 @@ void *net_client_handler(void *dnbd3_client)
} else if ( !client->is_server && _clientPenalty != 0 ) {
usleep( _clientPenalty );
}
+ if ( host_to_string( &client->host, buffer, sizeof buffer ) ) {
+ printf( "[DEBUG] Client %s gets %s\n", buffer, image_name );
+ }
// client handling mainloop
while ( recv_request_header( client->sock, &request ) ) {
switch ( request.cmd ) {
@@ -214,6 +221,7 @@ void *net_client_handler(void *dnbd3_client)
send_reply( client->sock, &reply, NULL );
break;
}
+ //printf( "Request - size: %" PRIu32 ", offset: %" PRIu64 "\n", request.size, request.offset );
if ( request.size != 0 && image->cache_map != NULL ) {
// This is a proxyed image, check if we need to relay the request...
diff --git a/src/server/server.c b/src/server/server.c
index 7571cbc..d2238cc 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -26,6 +26,7 @@
#include <string.h>
#include <fcntl.h>
#include <sys/ioctl.h>
+#include <errno.h>
#include <stdint.h>
#include <unistd.h>
#include <assert.h>
@@ -42,6 +43,7 @@
#include "altservers.h"
#include "memlog.h"
#include "globals.h"
+#include "integrity.h"
#define MAX_SERVER_SOCKETS 50 // Assume there will be no more than 50 sockets the server will listen on
static int sockets[MAX_SERVER_SOCKETS], socket_count = 0;
@@ -117,6 +119,9 @@ void dnbd3_cleanup()
// Terminate all uplinks
image_killUplinks();
+ // Terminate integrity checker
+ integrity_shutdown();
+
// Clean up clients
spin_lock( &_clients_lock );
for (i = 0; i < _num_clients; ++i) {
@@ -258,6 +263,7 @@ int main(int argc, char *argv[])
spin_init( &_clients_lock, PTHREAD_PROCESS_PRIVATE );
spin_init( &_images_lock, PTHREAD_PROCESS_PRIVATE );
altserver_init();
+ integrity_init();
memlogf( "DNBD3 server starting.... Machine type: " ENDIAN_MODE );
if ( altservers_load() < 0 ) {
@@ -311,7 +317,8 @@ int main(int argc, char *argv[])
len = sizeof(client);
fd = accept_any( sockets, socket_count, &client, &len );
if ( fd < 0 ) {
- memlogf( "[ERROR] Client accept failure" );
+ const int err = errno;
+ memlogf( "[ERROR] Client accept failure (err=%d)", err );
usleep( 10000 ); // 10ms
continue;
}
@@ -404,7 +411,10 @@ dnbd3_client_t* dnbd3_free_client(dnbd3_client_t *client)
spin_lock( &client->lock );
if ( client->sock >= 0 ) close( client->sock );
client->sock = -1;
- if ( client->image != NULL ) image_release( client->image );
+ if ( client->image != NULL ) {
+ if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client );
+ image_release( client->image );
+ }
client->image = NULL;
spin_unlock( &client->lock );
spin_destroy( &client->lock );
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 4dbe75a..6d05f94 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -10,6 +10,7 @@
#include <string.h>
#include <sys/epoll.h>
#include <sys/errno.h>
+#include <sys/eventfd.h>
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>
@@ -19,6 +20,7 @@
static void* uplink_mainloop(void *data);
static void uplink_send_requests(dnbd3_connection_t *link, int newOnly);
static void uplink_handle_receive(dnbd3_connection_t *link);
+static int uplink_send_keepalive(const int fd);
// ############ uplink connection handling
@@ -79,6 +81,24 @@ void uplink_shutdown(dnbd3_image_t *image)
}
/**
+ * Remove given client from uplink request queue
+ */
+void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client)
+{
+	int slot;
+	spin_lock( &uplink->queueLock );
+	for (slot = 0; slot < uplink->queueLen; slot++) {
+		// Requests of other clients stay untouched
+		if ( uplink->queue[slot].client != client ) continue;
+		// Detach the request while holding the client's send mutex, as the
+		// uplink thread might just be writing to the client
+		pthread_mutex_lock( &client->sendMutex );
+		uplink->queue[slot].client = NULL;
+		uplink->queue[slot].status = ULR_FREE;
+		pthread_mutex_unlock( &client->sendMutex );
+	}
+	spin_unlock( &uplink->queueLock );
+}
+
+/**
* Request a chunk of data through an uplink server
*/
int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length)
@@ -118,10 +138,14 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint
uplink->queue[freeSlot].handle = handle;
uplink->queue[freeSlot].client = client;
uplink->queue[freeSlot].status = (foundExisting ? ULR_PENDING : ULR_NEW);
+#ifdef _DEBUG
+ uplink->queue[freeSlot].entered = time( NULL );
+#endif
spin_unlock( &uplink->queueLock );
if ( !foundExisting ) {
- write( uplink->signal, "", 1 );
+ static uint64_t counter = 1;
+ write( uplink->signal, &counter, sizeof(uint64_t) );
}
return TRUE;
}
@@ -135,7 +159,7 @@ static void* uplink_mainloop(void *data)
const int MAXEVENTS = 3;
struct epoll_event ev, events[MAXEVENTS];
dnbd3_connection_t *link = (dnbd3_connection_t*)data;
- int fdEpoll = -1, fdPipe = -1;
+ int fdEpoll = -1;
int numSocks, i, waitTime;
int altCheckInterval = SERVER_RTT_DELAY_INIT;
int bFree = FALSE;
@@ -151,27 +175,24 @@ static void* uplink_mainloop(void *data)
goto cleanup;
}
{
- int pipes[2];
- if ( pipe( pipes ) < 0 ) {
+ link->signal = eventfd( 0, EFD_NONBLOCK );
+ if ( link->signal < 0 ) {
memlogf( "[WARNING] error creating pipe. Uplink unavailable." );
goto cleanup;
}
- sock_set_nonblock( pipes[0] );
- sock_set_nonblock( pipes[1] );
- fdPipe = pipes[0];
- link->signal = pipes[1];
memset( &ev, 0, sizeof(ev) );
ev.events = EPOLLIN;
- ev.data.fd = fdPipe;
- if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, fdPipe, &ev ) < 0 ) {
- memlogf( "[WARNING] adding signal-pipe to epoll set failed" );
+ ev.data.fd = link->signal;
+ if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->signal, &ev ) < 0 ) {
+ memlogf( "[WARNING] adding eventfd to epoll set failed" );
goto cleanup;
}
}
while ( !_shutdown && !link->shutdown ) {
// epoll()
if ( link->fd == -1 ) {
- waitTime = 1500;
+ waitTime = 2000;
+ nextAltCheck = 0;
} else {
waitTime = (time( NULL ) - nextAltCheck) * 1000;
if ( waitTime < 1500 ) waitTime = 1500;
@@ -185,7 +206,7 @@ static void* uplink_mainloop(void *data)
}
for (i = 0; i < numSocks; ++i) { // Check all events
if ( (events[i].events & (EPOLLERR | EPOLLHUP)) || !(events[i].events & EPOLLIN) ) {
- if ( events[i].data.fd == fdPipe ) {
+ if ( events[i].data.fd == link->signal ) {
memlogf( "[WARNING] epoll error on signal-pipe!" );
goto cleanup;
}
@@ -195,23 +216,23 @@ static void* uplink_mainloop(void *data)
printf( "[DEBUG] Uplink gone away, panic!\n" );
nextAltCheck = 0;
} else {
- printf( "[DEBUG] Error on unknown FD in uplink epoll" );
+ printf( "[DEBUG] Error on unknown FD in uplink epoll\n" );
close( events[i].data.fd );
}
continue;
}
// No error, handle normally
- if ( events[i].data.fd == fdPipe ) {
+ if ( events[i].data.fd == link->signal ) {
int ret;
do {
- ret = read( fdPipe, buffer, sizeof buffer );
+ ret = read( link->signal, buffer, sizeof buffer );
} while ( ret > 0 ); // Throw data away, this is just used for waking this thread up
if ( ret == 0 ) {
- memlogf( "[WARNING] Signal pipe of uplink for %s closed! Things will break!", link->image->lower_name );
+ memlogf( "[WARNING] Eventfd of uplink for %s closed! Things will break!", link->image->lower_name );
}
ret = errno;
if ( ret != EAGAIN && ret != EWOULDBLOCK && ret != EBUSY && ret != EINTR ) {
- memlogf( "[WARNING] Errno %d on pipe-read on uplink for %s! Things will break!", ret, link->image->lower_name );
+ memlogf( "[WARNING] Errno %d on eventfd on uplink for %s! Things will break!", ret, link->image->lower_name );
}
if ( link->fd != -1 ) {
uplink_send_requests( link, TRUE );
@@ -263,8 +284,9 @@ static void* uplink_mainloop(void *data)
// It seems it's time for a check
if ( image_isComplete( link->image ) ) {
// Quit work if image is complete
+ memlogf( "[INFO] Replication of %s complete.", link->image->lower_name );
if ( spin_trylock( &link->image->lock ) == 0 ) {
- image_markComplete(link->image);
+ image_markComplete( link->image );
link->image->uplink = NULL;
link->shutdown = TRUE;
free( link->recvBuffer );
@@ -280,11 +302,34 @@ static void* uplink_mainloop(void *data)
} else {
// Not complete- do measurement
altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous)
+ // Also send a keepalive packet to the currently connected server
+ if ( link->fd != -1 ) {
+ if ( !uplink_send_keepalive( link->fd ) ) {
+ printf( "[DEBUG] Error sending keep-alive to uplink\n" );
+ const int fd = link->fd;
+ link->fd = -1;
+ close( fd );
+ }
+ }
}
altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX);
nextAltCheck = now + altCheckInterval;
}
}
+#ifdef _DEBUG
+ if ( link->fd != -1 ) {
+ time_t deadline = time( NULL ) - 10;
+ spin_lock( &link->queueLock );
+ for (i = 0; i < link->queueLen; ++i) {
+ if ( link->queue[i].status != ULR_FREE && link->queue[i].entered < deadline ) {
+ printf( "[DEBUG WARNING] Starving request detected:\n"
+ "%s\n(from %" PRIu64 " to %" PRIu64 "\n", link->queue[i].client->image->lower_name, link->queue[i].from,
+ link->queue[i].to );
+ }
+ }
+ spin_unlock( &link->queueLock );
+ }
+#endif
}
cleanup: ;
const int fd = link->fd;
@@ -293,7 +338,6 @@ static void* uplink_mainloop(void *data)
link->signal = -1;
if ( fd != -1 ) close( fd );
if ( signal != -1 ) close( signal );
- if ( fdPipe != -1 ) close( fdPipe );
if ( fdEpoll != -1 ) close( fdEpoll );
// Wait for the RTT check to finish/fail if it's in progress
while ( link->rttTestResult == RTT_INPROGRESS )
@@ -324,7 +368,7 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly)
// Non-critical - if the connection dropped or the server was changed
// the thread will re-send this request as soon as the connection
// is reestablished.
- printf( "[DEBUG] Error sending request to uplink server!" );
+ printf( "[DEBUG] Error sending request to uplink server!\n" );
}
spin_lock( &link->queueLock );
}
@@ -337,90 +381,114 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly)
*/
static void uplink_handle_receive(dnbd3_connection_t *link)
{
- dnbd3_reply_t reply;
+ dnbd3_reply_t inReply, outReply;
int ret, i;
- ret = recv( link->fd, &reply, sizeof reply, MSG_WAITALL );
- if ( ret != sizeof reply ) {
- memlogf( "[INFO] Lost connection to uplink server for %s", link->image->path );
- goto error_cleanup;
- }
- fixup_reply( reply );
- if ( reply.magic != dnbd3_packet_magic ) {
- memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path );
- goto error_cleanup;
- }
- if ( reply.size > 9000000 ) {
- memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path );
- goto error_cleanup;
- }
- if ( link->recvBufferLen < reply.size ) {
- if ( link->recvBuffer != NULL ) free( link->recvBuffer );
- link->recvBufferLen = MIN(9000000, reply.size + 8192);
- link->recvBuffer = malloc( link->recvBufferLen );
- }
- uint32_t done = 0;
- while ( done < reply.size ) {
- ret = recv( link->fd, link->recvBuffer + done, reply.size - done, 0 );
- if ( ret <= 0 ) {
- memlogf( "[INFO] Lost connection to uplink server of", link->image->path );
+ for (;;) {
+ ret = recv( link->fd, &inReply, sizeof inReply, MSG_DONTWAIT );
+ if ( ret < 0 ) {
+ const int err = errno;
+ if ( err == EAGAIN || err == EWOULDBLOCK || err == EINTR ) return; // OK cases
goto error_cleanup;
}
- done += ret;
- }
- // Payload read completely
- // Bail out if we're not interested
- if ( reply.cmd != CMD_GET_BLOCK ) return;
- // Is a legit block reply
- const uint64_t start = reply.handle;
- const uint64_t end = reply.handle + reply.size;
- // 1) Write to cache file
- assert( link->image->cacheFd != -1 );
- if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) {
- memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path );
- } else {
- ret = (int)write( link->image->cacheFd, link->recvBuffer, reply.size );
- if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, TRUE );
- }
- // 2) Figure out which clients are interested in it
- struct iovec iov[2];
- spin_lock( &link->queueLock );
- for (i = 0; i < link->queueLen; ++i) {
- dnbd3_queued_request_t * const req = &link->queue[i];
- assert( req->status != ULR_PROCESSING );
- if ( req->status != ULR_PENDING ) continue;
- if ( req->from >= start && req->to <= end ) { // Match :-)
- req->status = ULR_PROCESSING;
+ if ( ret == 0 ) {
+ memlogf( "[INFO] Uplink: Remote host hung up (%s)", link->image->path );
+ goto error_cleanup;
}
- }
- // 3) Send to interested clients
- reply.magic = dnbd3_packet_magic; // !! re-using reply struct - do not read from it after here
- for (i = link->queueLen - 1; i >= 0; --i) {
- dnbd3_queued_request_t * const req = &link->queue[i];
- if ( req->status != ULR_PROCESSING ) continue;
- assert( req->from >= start && req->to <= end );
- reply.cmd = CMD_GET_BLOCK;
- reply.handle = req->handle;
- reply.size = req->to - req->from;
- iov[0].iov_base = &reply;
- iov[0].iov_len = sizeof reply;
- iov[1].iov_base = link->recvBuffer + (req->from - start);
- iov[1].iov_len = reply.size;
- fixup_reply( reply );
- spin_unlock( &link->queueLock );
- // send: Don't care about errors here, let the client
- // connection thread deal with it if something goes wrong
- pthread_mutex_lock( &req->client->sendMutex );
- writev( req->client->sock, iov, 2 );
- pthread_mutex_unlock( &req->client->sendMutex );
+ if ( ret != sizeof inReply ) ret += recv( link->fd, &inReply + ret, sizeof(inReply) - ret, MSG_WAITALL );
+ if ( ret != sizeof inReply ) {
+ const int err = errno;
+ memlogf( "[INFO] Lost connection to uplink server for %s (header %d/%d, e=%d)", link->image->path, ret, (int)sizeof(inReply),
+ err );
+ goto error_cleanup;
+ }
+ fixup_reply( inReply );
+ if ( inReply.magic != dnbd3_packet_magic ) {
+ memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path );
+ goto error_cleanup;
+ }
+ if ( inReply.size > 9000000 ) {
+ memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path );
+ goto error_cleanup;
+ }
+ if ( link->recvBufferLen < inReply.size ) {
+ if ( link->recvBuffer != NULL ) free( link->recvBuffer );
+ link->recvBufferLen = MIN(9000000, inReply.size + 8192);
+ link->recvBuffer = malloc( link->recvBufferLen );
+ }
+ uint32_t done = 0;
+ while ( done < inReply.size ) {
+ ret = recv( link->fd, link->recvBuffer + done, inReply.size - done, 0 );
+ if ( ret <= 0 ) {
+ memlogf( "[INFO] Lost connection to uplink server of %s (payload)", link->image->path );
+ goto error_cleanup;
+ }
+ done += ret;
+ }
+ // Payload read completely
+ // Bail out if we're not interested
+ if ( inReply.cmd != CMD_GET_BLOCK ) return;
+ // Is a legit block reply
+ const uint64_t start = inReply.handle;
+ const uint64_t end = inReply.handle + inReply.size;
+ // 1) Write to cache file
+ assert( link->image->cacheFd != -1 );
+ if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) {
+ memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path );
+ } else {
+ ret = (int)write( link->image->cacheFd, link->recvBuffer, inReply.size );
+ if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, TRUE );
+ }
+ // 2) Figure out which clients are interested in it
+ struct iovec iov[2];
spin_lock( &link->queueLock );
- req->status = ULR_FREE;
- if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--;
+ for (i = 0; i < link->queueLen; ++i) {
+ dnbd3_queued_request_t * const req = &link->queue[i];
+ assert( req->status != ULR_PROCESSING );
+ if ( req->status != ULR_PENDING ) continue;
+ if ( req->from >= start && req->to <= end ) { // Match :-)
+ req->status = ULR_PROCESSING;
+ }
+ }
+ // 3) Send to interested clients
+ outReply.magic = dnbd3_packet_magic;
+ for (i = link->queueLen - 1; i >= 0; --i) {
+ dnbd3_queued_request_t * const req = &link->queue[i];
+ if ( req->status != ULR_PROCESSING ) continue;
+ assert( req->from >= start && req->to <= end );
+ outReply.cmd = CMD_GET_BLOCK;
+ outReply.handle = req->handle;
+ outReply.size = req->to - req->from;
+ iov[0].iov_base = &outReply;
+ iov[0].iov_len = sizeof outReply;
+ iov[1].iov_base = link->recvBuffer + (req->from - start);
+ iov[1].iov_len = outReply.size;
+ fixup_reply( outReply );
+ pthread_mutex_lock( &req->client->sendMutex );
+ spin_unlock( &link->queueLock );
+ writev( req->client->sock, iov, 2 );
+ pthread_mutex_unlock( &req->client->sendMutex );
+ spin_lock( &link->queueLock );
+ if ( req->status == ULR_PROCESSING ) req->status = ULR_FREE; // Might have changed in the meantime
+ if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--;
+ }
+ spin_unlock( &link->queueLock );
}
- spin_unlock( &link->queueLock );
- return;
error_cleanup: ;
const int fd = link->fd;
link->fd = -1;
if ( fd != -1 ) close( fd );
- return;
+}
+
+/**
+ * Send keep alive request to server
+ */
+static int uplink_send_keepalive(const int fd)
+{
+ static dnbd3_request_t request = { 0, 0, 0, 0, 0 };
+ if ( request.magic == 0 ) {
+ request.magic = dnbd3_packet_magic;
+ request.cmd = CMD_KEEPALIVE;
+ fixup_request( request );
+ }
+ return send( fd, &request, sizeof(request), 0 ) == sizeof(request);
}
diff --git a/src/server/uplink.h b/src/server/uplink.h
index cb1d4e7..b2d24a6 100644
--- a/src/server/uplink.h
+++ b/src/server/uplink.h
@@ -4,9 +4,10 @@
#include "../types.h"
#include "globals.h"
-
int uplink_init(dnbd3_image_t *image);
+void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client);
+
int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length);
void uplink_shutdown(dnbd3_image_t *image);