From 19b16581573019a79166ce30376f7b6da6885730 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Wed, 13 Nov 2013 22:28:10 +0100 Subject: [SERVER] Automatic replication of images that are not complete on proxy. Speed probably needs tweaking for different link speeds etc. --- src/server/globals.h | 2 ++ src/server/uplink.c | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/src/server/globals.h b/src/server/globals.h index 1243eb8..e44b26d 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -55,6 +55,8 @@ struct _dnbd3_connection uint8_t *recvBuffer; // Buffer for receiving payload int recvBufferLen; // Len of ^^ volatile int shutdown; // bool to signal thread to stop, must only be set from uplink_shutdown() + int replicatedLastBlock; // bool telling if the last block has been replicated yet + time_t lastReplication; // timestamp of when last replication requests were sent }; typedef struct diff --git a/src/server/uplink.c b/src/server/uplink.c index d19c1d9..b38be45 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -27,6 +27,7 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly); static void uplink_handle_receive(dnbd3_connection_t *link); static int uplink_send_keepalive(const int fd); static void uplink_addCrc32(dnbd3_connection_t *uplink); +static void uplink_sendReplicationRequest(dnbd3_connection_t *link); // ############ uplink connection handling @@ -87,11 +88,11 @@ void uplink_shutdown(dnbd3_image_t *image) image->uplink = NULL; uplink->shutdown = TRUE; spin_unlock( &uplink->queueLock ); - spin_destroy( &uplink->queueLock ); if ( uplink->signal != -1 ) write( uplink->signal, "", 1 ); if ( uplink->image != NULL ) { pthread_join( uplink->thread, NULL ); } + spin_destroy( &uplink->queueLock ); free( uplink->recvBuffer ); free( uplink ); } @@ -271,6 +272,7 @@ static void* uplink_mainloop(void *data) } // Check all events for (i = 0; i < numSocks; ++i) { + // Check for errors.... if ( (events[i].events & (EPOLLERR | EPOLLHUP)) || !(events[i].events & EPOLLIN) ) { if ( events[i].data.fd == link->signal ) { memlogf( "[WARNING] epoll error on signal-pipe!" ); @@ -289,6 +291,7 @@ static void* uplink_mainloop(void *data) } // No error, handle normally if ( events[i].data.fd == link->signal ) { + // Event on the signal fd -> a client requests data int ret; do { ret = read( link->signal, buffer, sizeof buffer ); @@ -303,6 +306,7 @@ static void* uplink_mainloop(void *data) } } if ( link->fd != -1 ) { + // Uplink seems fine, relay requests to it... uplink_send_requests( link, TRUE ); } } else if ( events[i].data.fd == link->fd ) { @@ -315,6 +319,8 @@ static void* uplink_mainloop(void *data) } } // Done handling epoll sockets + // Replicate missing blocks from the image so the proxy will eventually have a full copy + uplink_sendReplicationRequest( link ); // See if we should trigger an RTT measurement if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) { const time_t now = time( NULL ); @@ -414,7 +420,7 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly) // Non-critical - if the connection dropped or the server was changed // the thread will re-send this request as soon as the connection // is reestablished. - printf( "[DEBUG] Error sending request to uplink server!\n" ); + printf( "[DEBUG] Error forwarding request to uplink server!\n" ); altservers_serverFailed( &link->currentServer ); return; } @@ -423,6 +429,52 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly) spin_unlock( &link->queueLock ); } +/** + * Sent a block request to an uplink server without really having + * any client that needs that data. This will be used for background replication + */ +static void uplink_sendReplicationRequest(dnbd3_connection_t *link) +{ + if ( link == NULL || link->fd == -1 ) return; + dnbd3_image_t * const image = link->image; + if ( image == NULL || image->cache_map == NULL || image->filesize < DNBD3_BLOCK_SIZE ) return; + const time_t now = time( NULL ); + if ( now <= link->lastReplication + 1 ) return; + link->lastReplication = now; + spin_lock( &image->lock ); + if ( image == NULL || image->cache_map == NULL ) { + spin_unlock( &image->lock ); + return; + } + dnbd3_request_t request; + request.magic = dnbd3_packet_magic; + int sent = 0; + const size_t len = IMGSIZE_TO_MAPBYTES( image->filesize ) - 1; + for (int i = 0; i <= len; ++i) { + if ( image->cache_map == NULL || link->fd == -1 || sent > 20 ) break; + if ( image->cache_map[i] == 0xff || (i == len && link->replicatedLastBlock) ) continue; + if ( i == len ) link->replicatedLastBlock = TRUE; + spin_unlock( &image->lock ); + // Unlocked - do not break or continue here... + ++sent; + request.cmd = CMD_GET_BLOCK; + request.offset = request.handle = (uint64_t)i * DNBD3_BLOCK_SIZE * (uint64_t)8; + request.size = DNBD3_BLOCK_SIZE * (uint64_t)8; + if ( request.offset + request.size > image->filesize ) { + request.size = image->filesize - request.offset; + } + fixup_request( request ); + const int ret = send( link->fd, &request, sizeof request, MSG_NOSIGNAL ); + if ( ret != sizeof(request) ) { + printf( "[DEBUG] Error sending background replication request to uplink server!\n" ); + return; + } + // Lock again... + spin_lock( &image->lock ); + } + spin_unlock( &image->lock ); +} + /** * Receive data from uplink server and process/dispatch * Locks on: link.lock, indirectly on images[].lock -- cgit v1.2.3-55-g7522