summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2013-11-13 22:28:10 +0100
committerSimon Rettberg2013-11-13 22:28:10 +0100
commit19b16581573019a79166ce30376f7b6da6885730 (patch)
treef8ebbbe5da6d7e8e9873f5051d26f0600e1e1741
parent[SERVER] Increase RTT check delay for uplinks that failed too many times to s... (diff)
downloaddnbd3-19b16581573019a79166ce30376f7b6da6885730.tar.gz
dnbd3-19b16581573019a79166ce30376f7b6da6885730.tar.xz
dnbd3-19b16581573019a79166ce30376f7b6da6885730.zip
[SERVER] Automatic replication of images that are not complete on proxy. Speed probably needs tweaking for different link speeds etc.
-rw-r--r--src/server/globals.h2
-rw-r--r--src/server/uplink.c56
2 files changed, 56 insertions, 2 deletions
diff --git a/src/server/globals.h b/src/server/globals.h
index 1243eb8..e44b26d 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -55,6 +55,8 @@ struct _dnbd3_connection
uint8_t *recvBuffer; // Buffer for receiving payload
int recvBufferLen; // Len of ^^
volatile int shutdown; // bool to signal thread to stop, must only be set from uplink_shutdown()
+ int replicatedLastBlock; // bool telling if the last block has been replicated yet
+ time_t lastReplication; // timestamp of when last replication requests were sent
};
typedef struct
diff --git a/src/server/uplink.c b/src/server/uplink.c
index d19c1d9..b38be45 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -27,6 +27,7 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly);
static void uplink_handle_receive(dnbd3_connection_t *link);
static int uplink_send_keepalive(const int fd);
static void uplink_addCrc32(dnbd3_connection_t *uplink);
+static void uplink_sendReplicationRequest(dnbd3_connection_t *link);
// ############ uplink connection handling
@@ -87,11 +88,11 @@ void uplink_shutdown(dnbd3_image_t *image)
image->uplink = NULL;
uplink->shutdown = TRUE;
spin_unlock( &uplink->queueLock );
- spin_destroy( &uplink->queueLock );
if ( uplink->signal != -1 ) write( uplink->signal, "", 1 );
if ( uplink->image != NULL ) {
pthread_join( uplink->thread, NULL );
}
+ spin_destroy( &uplink->queueLock );
free( uplink->recvBuffer );
free( uplink );
}
@@ -271,6 +272,7 @@ static void* uplink_mainloop(void *data)
}
// Check all events
for (i = 0; i < numSocks; ++i) {
+ // Check for errors....
if ( (events[i].events & (EPOLLERR | EPOLLHUP)) || !(events[i].events & EPOLLIN) ) {
if ( events[i].data.fd == link->signal ) {
memlogf( "[WARNING] epoll error on signal-pipe!" );
@@ -289,6 +291,7 @@ static void* uplink_mainloop(void *data)
}
// No error, handle normally
if ( events[i].data.fd == link->signal ) {
+ // Event on the signal fd -> a client requests data
int ret;
do {
ret = read( link->signal, buffer, sizeof buffer );
@@ -303,6 +306,7 @@ static void* uplink_mainloop(void *data)
}
}
if ( link->fd != -1 ) {
+ // Uplink seems fine, relay requests to it...
uplink_send_requests( link, TRUE );
}
} else if ( events[i].data.fd == link->fd ) {
@@ -315,6 +319,8 @@ static void* uplink_mainloop(void *data)
}
}
// Done handling epoll sockets
+ // Replicate missing blocks from the image so the proxy will eventually have a full copy
+ uplink_sendReplicationRequest( link );
// See if we should trigger an RTT measurement
if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) {
const time_t now = time( NULL );
@@ -414,7 +420,7 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly)
// Non-critical - if the connection dropped or the server was changed
// the thread will re-send this request as soon as the connection
// is reestablished.
- printf( "[DEBUG] Error sending request to uplink server!\n" );
+ printf( "[DEBUG] Error forwarding request to uplink server!\n" );
altservers_serverFailed( &link->currentServer );
return;
}
@@ -424,6 +430,52 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly)
}
/**
+ * Sent a block request to an uplink server without really having
+ * any client that needs that data. This will be used for background replication
+ */
+static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
+{
+ if ( link == NULL || link->fd == -1 ) return;
+ dnbd3_image_t * const image = link->image;
+ if ( image == NULL || image->cache_map == NULL || image->filesize < DNBD3_BLOCK_SIZE ) return;
+ const time_t now = time( NULL );
+ if ( now <= link->lastReplication + 1 ) return;
+ link->lastReplication = now;
+ spin_lock( &image->lock );
+ if ( image == NULL || image->cache_map == NULL ) {
+ spin_unlock( &image->lock );
+ return;
+ }
+ dnbd3_request_t request;
+ request.magic = dnbd3_packet_magic;
+ int sent = 0;
+ const size_t len = IMGSIZE_TO_MAPBYTES( image->filesize ) - 1;
+ for (int i = 0; i <= len; ++i) {
+ if ( image->cache_map == NULL || link->fd == -1 || sent > 20 ) break;
+ if ( image->cache_map[i] == 0xff || (i == len && link->replicatedLastBlock) ) continue;
+ if ( i == len ) link->replicatedLastBlock = TRUE;
+ spin_unlock( &image->lock );
+ // Unlocked - do not break or continue here...
+ ++sent;
+ request.cmd = CMD_GET_BLOCK;
+ request.offset = request.handle = (uint64_t)i * DNBD3_BLOCK_SIZE * (uint64_t)8;
+ request.size = DNBD3_BLOCK_SIZE * (uint64_t)8;
+ if ( request.offset + request.size > image->filesize ) {
+ request.size = image->filesize - request.offset;
+ }
+ fixup_request( request );
+ const int ret = send( link->fd, &request, sizeof request, MSG_NOSIGNAL );
+ if ( ret != sizeof(request) ) {
+ printf( "[DEBUG] Error sending background replication request to uplink server!\n" );
+ return;
+ }
+ // Lock again...
+ spin_lock( &image->lock );
+ }
+ spin_unlock( &image->lock );
+}
+
+/**
* Receive data from uplink server and process/dispatch
* Locks on: link.lock, indirectly on images[].lock
*/