summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsr2013-07-26 18:42:52 +0200
committersr2013-07-26 18:42:52 +0200
commit8b65d18653bb7a5c7aba714de0767a1e93ef78c1 (patch)
tree207212bdedaa918c2dc84005fe54826db40b26a6
parentWork in progress: uplink (diff)
downloaddnbd3-8b65d18653bb7a5c7aba714de0767a1e93ef78c1.tar.gz
dnbd3-8b65d18653bb7a5c7aba714de0767a1e93ef78c1.tar.xz
dnbd3-8b65d18653bb7a5c7aba714de0767a1e93ef78c1.zip
[SERVER] Still working on the uplink... Almost there
-rw-r--r--CMakeLists.txt10
-rw-r--r--LOCKS2
-rw-r--r--src/server/globals.h24
-rw-r--r--src/server/image.c10
-rw-r--r--src/server/net.c166
-rw-r--r--src/server/server.c12
-rw-r--r--src/server/uplink.c190
-rw-r--r--src/server/uplink.h4
-rw-r--r--src/types.h4
9 files changed, 253 insertions, 169 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index aaba077..0b4b43d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -4,12 +4,16 @@
PROJECT(dnbd3)
CMAKE_MINIMUM_REQUIRED(VERSION 2.8.0)
+IF (CMAKE_BUILD_TYPE STREQUAL "")
+ SET(CMAKE_BUILD_TYPE Debug)
+ENDIF()
+
+message( "Build Type selected: ${CMAKE_BUILD_TYPE}" )
-SET(CMAKE_BUILD_TYPE Debug)
SET(CMAKE_C_FLAGS_DEBUG "-std=c99 -O0 -g -Wall -Wno-unused-result -D_GNU_SOURCE -D_DEBUG -Wno-multichar")
-SET(CMAKE_C_FLAGS_RELEASE "-std=c99 -O2 -Wno-unused-result -D_GNU_SOURCE -Wno-multichar")
+SET(CMAKE_C_FLAGS_RELEASE "-std=c99 -O2 -Wno-unused-result -D_GNU_SOURCE -DNDEBUG -Wno-multichar")
SET(CMAKE_CXX_FLAGS_DEBUG "-std=c99 -O0 -g -Wall -Wno-unused-result -D_GNU_SOURCE -D_DEBUG")
-SET(CMAKE_CXX_FLAGS_RELEASE "-std=c99 -O2 -Wno-unused-result -D_GNU_SOURCE" )
+SET(CMAKE_CXX_FLAGS_RELEASE "-std=c99 -O2 -Wno-unused-result -D_GNU_SOURCE -DNDEBUG" )
ADD_DEFINITIONS(-DIPC_TCP)
ADD_DEFINITIONS(-D_FILE_OFFSET_BITS=64)
diff --git a/LOCKS b/LOCKS
index 61ce937..c542904 100644
--- a/LOCKS
+++ b/LOCKS
@@ -14,9 +14,9 @@ _clients_lock
_clients[].lock
_images_lock
_images[].lock
+uplink.lock
_alts_lock
-
If you need to lock multiple clients at once,
lock the client with the lowest array index first.
diff --git a/src/server/globals.h b/src/server/globals.h
index 106531e..7f47288 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -10,22 +10,26 @@
typedef struct _dnbd3_connection dnbd3_connection_t;
typedef struct _dnbd3_image dnbd3_image_t;
+typedef struct _dnbd3_client dnbd3_client_t;
// Slot is free, can be used.
// Must only be set in uplink_handle_receive() or uplink_remove_client()
#define ULR_FREE 0
-// Slot is occupied, reply has not yet been received, matching request can safely rely on reuse.
+// Slot has been filled with a request that hasn't been sent to the upstream server yet, matching request can safely rely on reuse.
// Must only be set in uplink_request()
-#define ULR_PENDING 1
+#define ULR_NEW 1
+// Slot is occupied, reply has not yet been received, matching request can safely rely on reuse.
+// Must only be set in uplink_mainloop()
+#define ULR_PENDING 2
// Slot is being processed, do not consider for hop on.
// Must only be set in uplink_handle_receive()
-#define ULR_PROCESSING 2
+#define ULR_PROCESSING 3
typedef struct
{
uint64_t handle; // Client defined handle to pass back in reply
uint64_t from; // First byte offset of requested block (ie. 4096)
volatile uint32_t to; // Last byte + 1 of requested block (ie. 8192, if request len is 4096, resulting in bytes 4096-8191)
- volatile int socket; // Socket to send reply to
+ dnbd3_client_t * volatile client; // Client to send reply to
volatile int status; // status of this entry: ULR_*
} dnbd3_queued_request_t;
@@ -38,9 +42,9 @@ struct _dnbd3_connection
int fd; // socket fd to remote server
int signal; // write end of pipe used to wake up the process
pthread_t thread; // thread holding the connection
- pthread_spinlock_t lock; // lock for synchronization on request queue etc.
+ pthread_spinlock_t queueLock; // lock for synchronization on request queue etc.
dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE];
- volatile int queuelen; // length of queue
+ volatile int queueLen; // length of queue
dnbd3_image_t *image; // image that this uplink is used for do not call get/release for this pointer
dnbd3_host_t currentServer; // Current server we're connected to
volatile int rttTestResult; // RTT_*
@@ -48,7 +52,7 @@ struct _dnbd3_connection
int betterFd; // Active connection to better server, ready to use
uint8_t *recvBuffer; // Buffer for receiving payload
int recvBufferLen; // Len of ^^
- volatile int shutdown; // bool to signal thread to stop
+ volatile int shutdown; // bool to signal thread to stop, must only be set from uplink_shutdown()
};
typedef struct
@@ -99,7 +103,7 @@ struct _dnbd3_image
pthread_spinlock_t lock;
};
-typedef struct
+struct _dnbd3_client
{
int sock;
dnbd3_host_t host;
@@ -107,8 +111,8 @@ typedef struct
pthread_t thread;
dnbd3_image_t *image;
pthread_spinlock_t lock;
- //GSList *sendqueue; // list of dnbd3_binstring_t*
-} dnbd3_client_t;
+ pthread_mutex_t sendMutex;
+};
// #######################################################
diff --git a/src/server/image.c b/src/server/image.c
index abecda5..2a24092 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -106,6 +106,7 @@ void image_update_cachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, c
while ( pos < end ) {
const int block = pos / HASH_BLOCK_SIZE;
// TODO: Actually queue the hash block for checking as soon as there's a worker for that
+ (void)block;
pos += HASH_BLOCK_SIZE;
}
}
@@ -250,7 +251,7 @@ dnbd3_image_t* image_free(dnbd3_image_t *image)
free( image->crc32 );
free( image->path );
free( image->lower_name );
- image->uplink = uplink_shutdown( image->uplink );
+ uplink_shutdown( image );
if ( image->cacheFd != -1 ) close( image->cacheFd );
spin_destroy( &image->lock );
//
@@ -482,6 +483,10 @@ static int image_try_load(char *base, char *path)
} else if ( existing->crc32 != NULL && crc32list != NULL
&& memcmp( existing->crc32, crc32list, sizeof(uint32_t) * hashBlocks ) != 0 ) {
memlogf( "[WARNING] CRC32 list of image '%s' has changed.", path );
+ } else if ( existing->crc32 == NULL && crc32list != NULL ) {
+ memlogf( "[INFO] Found CRC-32 list for already loaded image, adding...", path );
+ existing->crc32 = crc32list;
+ crc32list = NULL;
} else {
function_return = TRUE;
goto load_error;
@@ -515,7 +520,8 @@ static int image_try_load(char *base, char *path)
free( image->cache_map );
image->cache_map = NULL;
image->working = TRUE;
- } else {
+ }
+ if ( image->cache_map != NULL ) {
image->working = FALSE;
image->cacheFd = open( path, O_WRONLY );
if ( image->cacheFd < 0 ) {
diff --git a/src/server/net.c b/src/server/net.c
index 16b202f..1383454 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -27,6 +27,7 @@
#include <fcntl.h>
#include <sys/sendfile.h>
#include <sys/types.h>
+#include <assert.h>
#include "sockhelper.h"
#include "helper.h"
@@ -123,15 +124,7 @@ void *net_client_handler(void *dnbd3_client)
serialized_buffer_t payload;
char *image_name;
uint16_t rid, client_version;
-
- /*
- char map_x, bit_mask;
- uint64_t map_y;
- uint64_t todo_size = 0;
- uint64_t todo_offset = 0;
- uint64_t cur_offset = 0;
- uint64_t last_offset = 0;
- */
+ uint64_t start, end;
dnbd3_server_entry_t server_list[NUMBER_SERVERS];
@@ -175,7 +168,6 @@ void *net_client_handler(void *dnbd3_client)
if ( send_reply( client->sock, &reply, &payload ) ) {
client->image = image;
if ( !client->is_server ) image->atime = time( NULL );
-
bOk = TRUE;
}
}
@@ -216,109 +208,85 @@ void *net_client_handler(void *dnbd3_client)
break;
}
+ if ( request.size != 0 && image->cache_map != NULL ) {
+ // This is a proxyed image, check if we need to relay the request...
+ start = request.offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ end = (request.offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ int isCached = TRUE;
+ spin_lock( &image->lock );
+ // Check again as we only aquired the lock just now
+ if ( image->cache_map != NULL ) {
+ // First byte
+ uint64_t pos = start;
+ const uint64_t firstByte = start >> 15;
+ const uint64_t lastByte = (end - 1) >> 15;
+ do {
+ const int map_x = (pos >> 12) & 7; // mod 8
+ const uint8_t bit_mask = 0b00000001 << map_x;
+ if ( (image->cache_map[firstByte] & bit_mask) == 0 ) {
+ isCached = FALSE;
+ break;
+ }
+ pos += DNBD3_BLOCK_SIZE;
+ } while ( firstByte == (pos >> 15) );
+ // Middle - quick checking
+ if ( isCached ) {
+ pos = firstByte + 1;
+ while ( pos < lastByte ) {
+ if ( image->cache_map[pos] != 0xff ) {
+ isCached = FALSE;
+ break;
+ }
+ }
+ }
+ // Last byte
+ if ( isCached ) {
+ pos = lastByte << 15;
+ while ( pos < end ) {
+ assert( lastByte == (pos >> 15) );
+ const int map_x = (pos >> 12) & 7; // mod 8
+ const uint8_t bit_mask = 0b00000001 << map_x;
+ if ( (image->cache_map[lastByte] & bit_mask) == 0 ) {
+ isCached = FALSE;
+ break;
+ }
+ pos += DNBD3_BLOCK_SIZE;
+ }
+ }
+ }
+ spin_unlock( &image->lock );
+ if ( !isCached ) {
+ if ( !uplink_request( client, request.handle, request.offset, request.size ) ) {
+ printf( "[DEBUG] Could not relay uncached request to upstream proxy\n" );
+ goto exit_client_cleanup;
+ }
+ break; // DONE
+ }
+ }
+
reply.cmd = CMD_GET_BLOCK;
reply.size = request.size;
reply.handle = request.handle;
fixup_reply( reply );
- if ( send( client->sock, &reply, sizeof(dnbd3_reply_t), MSG_MORE ) != sizeof(dnbd3_reply_t) ) {
+ pthread_mutex_lock( &client->sendMutex );
+ // Send reply header
+ if ( send( client->sock, &reply, sizeof(dnbd3_reply_t), (request.size == 0 ? 0 : MSG_MORE) ) != sizeof(dnbd3_reply_t) ) {
+ pthread_mutex_unlock( &client->sendMutex );
printf( "[DEBUG] Sending CMD_GET_BLOCK header failed\n" );
goto exit_client_cleanup;
}
- if ( request.size == 0 ) { // Request for 0 bytes, done after sending header
- send( client->sock, &reply, 0, 0 ); // Since we used MSG_MORE above...
- break;
- }
-
- // no cache map means image is complete
- if ( image->cache_map == NULL ) {
+ if ( request.size != 0 ) {
+ // Send payload if request length > 0
const ssize_t ret = sendfile( client->sock, image_file, (off_t *)&request.offset, request.size );
if ( ret != request.size ) {
+ pthread_mutex_unlock( &client->sendMutex );
printf( "[ERROR] sendfile failed (image to net %d/%d)\n", (int)ret, (int)request.size );
goto exit_client_cleanup;
}
- break;
}
-
- printf( "[DEBUG] Caching/Proxying not implemented yet!\n" );
- goto exit_client_cleanup;
-
- /*
-
- // caching is on
- dirty = 0;
- todo_size = 0;
- todo_offset = request.offset;
- cur_offset = request.offset;
- last_offset = request.offset + request.size;
-
- // first make sure the whole requested part is in the local cache file
- while(cur_offset < last_offset)
- {
- map_y = cur_offset >> 15; // div 32768
- map_x = (cur_offset >> 12) & 7; // (X div 4096) mod 8
- bit_mask = 0b00000001 << (map_x);
-
- cur_offset += 4096;
-
- if ((image->cache_map[map_y] & bit_mask) != 0) // cache hit
- {
- if (todo_size != 0) // fetch missing chunks
- {
- lseek(image_cache, todo_offset, SEEK_SET);
- if (sendfile(image_cache, image_file, (off_t *) &todo_offset, todo_size) != todo_size)
- {
- if (image->file == NULL)
- printf("[ERROR] Device was closed when local copy was incomplete.");
- printf("[ERROR] sendfile failed (copy to cache 1)\n");
- goto exit_client_cleanup;
- }
- todo_size = 0;
- dirty = 1;
- }
- todo_offset = cur_offset;
- }
- else
- {
- todo_size += 4096;
- }
- }
-
- // whole request was missing
- if (todo_size != 0)
- {
- lseek(image_cache, todo_offset, SEEK_SET);
- if (sendfile(image_cache, image_file, (off_t *) &todo_offset, todo_size) != todo_size)
- {
- printf("[ERROR] sendfile failed (copy to cache 2)\n");
- goto exit_client_cleanup;
- }
- dirty = 1;
- }
-
- if (dirty) // cache map needs to be updated as something was missing locally
- {
- // set 1 in cache map for whole request
- cur_offset = request.offset;
- while(cur_offset < last_offset)
- {
- map_y = cur_offset >> 15;
- map_x = (cur_offset >> 12) & 7; // mod 8
- bit_mask = 0b00000001 << (map_x);
- image->cache_map[map_y] |= bit_mask;
- cur_offset += 4096;
- }
- }
-
- // send data to client
- if (sendfile(client->sock, image_cache, (off_t *) &request.offset, request.size) != request.size)
- {
- memlogf("[ERROR] sendfile failed (cache to net)\n");
- close(client->sock);
- client->sock = -1;
- }
- */
+ pthread_mutex_unlock( &client->sendMutex );
break;
case CMD_GET_SERVERS:
@@ -377,5 +345,5 @@ void *net_client_handler(void *dnbd3_client)
if ( image_file != -1 ) close( image_file );
dnbd3_remove_client( client );
client = dnbd3_free_client( client );
- pthread_exit( (void *)0 );
+ return NULL ;
}
diff --git a/src/server/server.c b/src/server/server.c
index 575d9c4..cc7a76a 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -28,6 +28,7 @@
#include <sys/ioctl.h>
#include <stdint.h>
#include <unistd.h>
+#include <assert.h>
#include "../types.h"
#include "../version.h"
@@ -147,7 +148,7 @@ int main(int argc, char *argv[])
static const struct option longOpts[] = { { "file", required_argument, NULL, 'f' }, { "delay", required_argument, NULL, 'd' }, {
"nodaemon", no_argument, NULL, 'n' }, { "reload", no_argument, NULL, 'r' }, { "stop", no_argument, NULL, 's' }, { "info",
no_argument, NULL, 'i' }, { "help", no_argument, NULL, 'H' }, { "version", no_argument, NULL, 'v' }, { "crc", required_argument,
- NULL, 'crc4' }, { 0, 0, 0, 0 } };
+ NULL, 'crc4' }, { "assert", no_argument, NULL, 'asrt' }, { 0, 0, 0, 0 } };
opt = getopt_long( argc, argv, optString, longOpts, &longIndex );
@@ -187,6 +188,11 @@ int main(int argc, char *argv[])
break;
case 'crc4':
return image_generate_crc_file( optarg ) ? 0 : EXIT_FAILURE;
+ case 'asrt':
+ printf("Testing a failing assertion:\n");
+ assert( 4 == 5 );
+ printf("Assertion 4 == 5 seems to hold. ;-)\n");
+ return EXIT_SUCCESS;
}
opt = getopt_long( argc, argv, optString, longOpts, &longIndex );
}
@@ -312,6 +318,7 @@ dnbd3_client_t* dnbd3_init_client(struct sockaddr_storage *client, int fd)
}
dnbd3_client->sock = fd;
spin_init( &dnbd3_client->lock, PTHREAD_PROCESS_PRIVATE );
+ pthread_mutex_init( &dnbd3_client->sendMutex, NULL );
return dnbd3_client;
}
@@ -352,6 +359,9 @@ dnbd3_client_t* dnbd3_free_client(dnbd3_client_t *client)
client->image = NULL;
spin_unlock( &client->lock );
spin_destroy( &client->lock );
+ pthread_mutex_lock( &client->sendMutex );
+ pthread_mutex_unlock( &client->sendMutex );
+ pthread_mutex_destroy( &client->sendMutex );
free( client );
return NULL ;
}
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 0116fda..ab23b70 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -18,6 +18,7 @@ int _num_alts = 0;
pthread_spinlock_t _alts_lock;
static void* uplink_mainloop(void *data);
+static void uplink_send_requests(dnbd3_connection_t *link, int newOnly);
static void uplink_handle_receive(dnbd3_connection_t *link);
/**
@@ -103,14 +104,14 @@ int uplink_init(dnbd3_image_t *image)
}
link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) );
link->image = image;
- link->queuelen = 0;
+ link->queueLen = 0;
link->fd = -1;
link->signal = -1;
link->betterFd = -1;
link->rttTestResult = RTT_IDLE;
link->recvBufferLen = 0;
link->shutdown = FALSE;
- spin_init( &link->lock, PTHREAD_PROCESS_PRIVATE );
+ spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE );
if ( 0 != pthread_create( &(link->thread), NULL, &uplink_mainloop, (void *)(uintptr_t)link ) ) {
memlogf( "[ERROR] Could not start thread for new client." );
goto failure;
@@ -124,15 +125,61 @@ int uplink_init(dnbd3_image_t *image)
return FALSE;
}
-dnbd3_connection_t* uplink_shutdown(dnbd3_connection_t *uplink)
+void uplink_shutdown(dnbd3_image_t *image)
{
- assert( uplink != NULL );
- if ( uplink->shutdown ) return NULL ;
+ assert( image != NULL );
+ if ( image->uplink == NULL || image->uplink->shutdown ) return;
+ dnbd3_connection_t * const uplink = image->uplink;
+ image->uplink = NULL;
uplink->shutdown = TRUE;
- if ( uplink->signal != -1 ) write( uplink->signal, uplink, 1 );
+ if ( uplink->signal != -1 ) write( uplink->signal, "", 1 );
pthread_join( uplink->thread, NULL );
+ spin_lock( &uplink->queueLock );
+ spin_unlock( &uplink->queueLock );
+ spin_destroy( &uplink->queueLock );
free( uplink );
- return NULL ;
+}
+
+/**
+ * Request a chunk of data through an uplink server
+ */
+int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length)
+{
+ if ( client == NULL || client->image == NULL || client->image->uplink == NULL ) return FALSE;
+ dnbd3_connection_t * const uplink = client->image->uplink;
+ int foundExisting = FALSE; // Is there a pending request that is a superset of our range?
+ int i;
+ int freeSlot = -1;
+ const uint64_t end = start + length;
+
+ spin_lock( &uplink->queueLock );
+ for (i = 0; i < uplink->queueLen; ++i) {
+ if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) freeSlot = i;
+ if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue;
+ if ( uplink->queue[i].from <= start && uplink->queue[i].to >= end ) {
+ foundExisting = TRUE;
+ break;
+ }
+ }
+ if ( freeSlot == -1 ) {
+ if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) {
+ spin_unlock( &uplink->queueLock );
+ memlogf( "[WARNING] Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." );
+ return FALSE;
+ }
+ freeSlot = uplink->queueLen++;
+ }
+ uplink->queue[freeSlot].from = start;
+ uplink->queue[freeSlot].to = end;
+ uplink->queue[freeSlot].handle = handle;
+ uplink->queue[freeSlot].client = client;
+ uplink->queue[freeSlot].status = (foundExisting ? ULR_PENDING : ULR_NEW);
+ spin_unlock( &uplink->queueLock );
+
+ if ( !foundExisting ) {
+ write( uplink->signal, "", 1 );
+ }
+ return TRUE;
}
/**
@@ -151,7 +198,7 @@ static void* uplink_mainloop(void *data)
char buffer[100];
//
assert( link != NULL );
- assert( link->queuelen == 0 );
+ assert( link->queueLen == 0 );
//
fdEpoll = epoll_create( 2 );
if ( fdEpoll == -1 ) {
@@ -177,24 +224,6 @@ static void* uplink_mainloop(void *data)
}
}
while ( !_shutdown && !link->shutdown ) {
- if ( link->rttTestResult == RTT_DOCHANGE ) {
- link->rttTestResult = RTT_IDLE;
- // The rttTest worker thread has finished our request.
- // And says it's better to switch to another server
- if ( link->fd != -1 ) close( link->fd );
- link->fd = link->betterFd;
- link->betterFd = -1;
- link->currentServer = link->betterServer;
- memset( &ev, 0, sizeof(ev) );
- ev.events = EPOLLIN;
- ev.data.fd = link->fd;
- if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) {
- memlogf( "[WARNING] adding uplink to epoll set failed" );
- goto cleanup;
- }
- // The rtt worker already did the handshake for our image, so there's nothing
- // more to do here
- }
// epoll()
if ( link->fd == -1 ) {
waitTime = 1500;
@@ -215,11 +244,14 @@ static void* uplink_mainloop(void *data)
memlogf( "[WARNING] epoll error on signal-pipe!" );
goto cleanup;
}
- close( events[i].data.fd );
if ( events[i].data.fd == link->fd ) {
link->fd = -1;
+ close( events[i].data.fd );
printf( "[DEBUG] Uplink gone away, panic!\n" );
nextAltCheck = 0;
+ } else {
+ printf( "[DEBUG] Error on unknown FD in uplink epoll" );
+ close( events[i].data.fd );
}
continue;
}
@@ -227,6 +259,9 @@ static void* uplink_mainloop(void *data)
if ( events[i].data.fd == fdPipe ) {
while ( read( fdPipe, buffer, sizeof buffer ) > 0 ) {
} // Throw data away, this is just used for waking this thread up
+ if ( link->fd != -1 ) {
+ uplink_send_requests( link, TRUE );
+ }
} else if ( events[i].data.fd == link->fd ) {
uplink_handle_receive( link );
if ( link->fd == -1 ) nextAltCheck = 0;
@@ -235,17 +270,71 @@ static void* uplink_mainloop(void *data)
close( events[i].data.fd );
}
}
+ // Done handling epoll sockets
+ // Check if server switch is in order
+ if ( link->rttTestResult == RTT_DOCHANGE ) {
+ link->rttTestResult = RTT_IDLE;
+ // The rttTest worker thread has finished our request.
+ // And says it's better to switch to another server
+ const int fd = link->fd;
+ link->fd = link->betterFd;
+ if ( fd != -1 ) close( fd );
+ // Re-send all pending requests
+ uplink_send_requests( link, FALSE );
+ link->betterFd = -1;
+ link->currentServer = link->betterServer;
+ memset( &ev, 0, sizeof(ev) );
+ ev.events = EPOLLIN;
+ ev.data.fd = link->fd;
+ if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) {
+ memlogf( "[WARNING] adding uplink to epoll set failed" );
+ goto cleanup;
+ }
+ nextAltCheck = time( NULL ) + altCheckInterval;
+ // The rtt worker already did the handshake for our image, so there's nothing
+ // more to do here
+ }
}
cleanup: ;
- if ( link->fd != -1 ) close( link->fd );
+ const int fd = link->fd;
+ const int signal = link->signal;
link->fd = -1;
- if ( link->signal != -1 ) close( link->signal );
link->signal = -1;
+ if ( fd != -1 ) close( fd );
+ if ( signal != -1 ) close( signal );
if ( fdPipe != -1 ) close( fdPipe );
if ( fdEpoll != -1 ) close( fdEpoll );
return NULL ;
}
+static void uplink_send_requests(dnbd3_connection_t *link, int newOnly)
+{
+ // Scan for new requests
+ int j;
+ dnbd3_request_t request;
+ request.magic = dnbd3_packet_magic;
+ spin_lock( &link->queueLock );
+ for (j = 0; j < link->queueLen; ++j) {
+ if ( link->queue[j].status != ULR_NEW && (newOnly || link->queue[j].status != ULR_PENDING) ) continue;
+ link->queue[j].status = ULR_PENDING;
+ request.handle = link->queue[j].handle;
+ request.cmd = CMD_GET_BLOCK;
+ request.offset = link->queue[j].from;
+ request.size = link->queue[j].to - link->queue[j].from;
+ spin_unlock( &link->queueLock );
+ fixup_request( request );
+ const int ret = write( link->fd, &request, sizeof request );
+ if ( ret != sizeof(request) ) {
+ // Non-critical - if the connection dropped or the server was changed
+ // the thread will re-send this request as soon as the connection
+ // is reestablished.
+ printf( "[DEBUG] Error sending request to uplink server!" );
+ }
+ spin_lock( &link->queueLock );
+ }
+ spin_unlock( &link->queueLock );
+}
+
/**
* Receive data from uplink server and process/dispatch
* Locks on: link.lock, indirectly on images[].lock
@@ -279,12 +368,20 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
done += ret;
}
// Payload read completely
- // 1) Figure out which clients are interested in it
const uint64_t start = reply.handle;
const uint64_t end = reply.handle + reply.size;
+ // 1) Write to cache file
+ assert( link->image->cacheFd != -1 );
+ if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) {
+ memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path );
+ } else {
+ ret = (int)write( link->image->cacheFd, link->recvBuffer, reply.size );
+ if ( ret > 0 ) image_update_cachemap( link->image, start, start + ret, TRUE );
+ }
+ // 2) Figure out which clients are interested in it
struct iovec iov[2];
- spin_lock( &link->lock );
- for (i = 0; i < link->queuelen; ++i) {
+ spin_lock( &link->queueLock );
+ for (i = 0; i < link->queueLen; ++i) {
dnbd3_queued_request_t * const req = &link->queue[i];
assert( req->status != ULR_PROCESSING );
if ( req->status != ULR_PENDING ) continue;
@@ -292,19 +389,9 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
req->status = ULR_PROCESSING;
}
}
- spin_unlock( &link->lock );
- // 2) Write to cache file
- assert( link->image->cacheFd != -1 );
- if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) {
- memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path );
- } else {
- ret = (int)write( link->image->cacheFd, link->recvBuffer, reply.size );
- if ( ret > 0 ) image_update_cachemap( link->image, start, start + ret, TRUE);
- }
// 3) Send to interested clients
reply.magic = dnbd3_packet_magic; // !! re-using reply struct - do not read from it after here
- spin_lock( &link->lock );
- for (i = link->queuelen - 1; i >= 0; --i) {
+ for (i = link->queueLen - 1; i >= 0; --i) {
dnbd3_queued_request_t * const req = &link->queue[i];
if ( req->status != ULR_PROCESSING ) continue;
assert( req->from >= start && req->to <= end );
@@ -316,18 +403,21 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
iov[1].iov_base = link->recvBuffer + (req->from - start);
iov[1].iov_len = reply.size;
fixup_reply( reply );
- spin_unlock( &link->lock );
+ spin_unlock( &link->queueLock );
// send: Don't care about errors here, let the client
- // connection thread deal with it if something goes wrong here
- writev( req->socket, iov, 2 );
- spin_lock( &link->lock );
+ // connection thread deal with it if something goes wrong
+ pthread_mutex_lock( &req->client->sendMutex );
+ writev( req->client->sock, iov, 2 );
+ pthread_mutex_unlock( &req->client->sendMutex );
+ spin_lock( &link->queueLock );
req->status = ULR_FREE;
- if ( i > 20 && i == link->queuelen - 1 ) link->queuelen--;
+ if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--;
}
- spin_unlock( &link->lock );
+ spin_unlock( &link->queueLock );
return;
error_cleanup: ;
- close( link->fd );
+ const int fd = link->fd;
link->fd = -1;
+ if ( fd != -1 ) close( fd );
return;
}
diff --git a/src/server/uplink.h b/src/server/uplink.h
index 82412b4..f6917be 100644
--- a/src/server/uplink.h
+++ b/src/server/uplink.h
@@ -14,6 +14,8 @@ int uplink_net_closeness(dnbd3_host_t *host1, dnbd3_host_t *host2);
int uplink_init(dnbd3_image_t *image);
-dnbd3_connection_t* uplink_shutdown( dnbd3_connection_t *uplink);
+int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length);
+
+void uplink_shutdown(dnbd3_image_t *image);
#endif /* UPLINK_H_ */
diff --git a/src/types.h b/src/types.h
index 56f328b..8800b53 100644
--- a/src/types.h
+++ b/src/types.h
@@ -27,10 +27,10 @@
#endif
#ifndef TRUE
-#define TRUE (1)
+#define TRUE 1
#endif
#ifndef FALSE
-#define FALSE (0)
+#define FALSE 0
#endif
#ifndef MIN