From cce7cf2c1428d174dd49177358dc52b234e97e5c Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 22 Dec 2014 15:51:30 +0100 Subject: [SERVER] Configurable client timeout, adaptive replication speed (to be tested against varying bw/latency), retry sendfile call if ret <= len --- src/server/uplink.c | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) (limited to 'src/server/uplink.c') diff --git a/src/server/uplink.c b/src/server/uplink.c index 0a60ff1..6c604c1 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -55,6 +55,7 @@ int uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) link->queueLen = 0; link->fd = -1; link->signal = -1; + link->replicationHandle = 0; if ( sock >= 0 ) { link->betterFd = sock; link->betterServer = *host; @@ -250,10 +251,11 @@ static void* uplink_mainloop(void *data) if ( link->image->crc32 == NULL ) { uplink_addCrc32( link ); } - // Re-send all pending requests - uplink_send_requests( link, FALSE ); link->betterFd = -1; link->currentServer = link->betterServer; + link->replicationHandle = 0; + // Re-send all pending requests + uplink_send_requests( link, FALSE ); link->image->working = TRUE; buffer[0] = '@'; if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) { @@ -331,8 +333,6 @@ static void* uplink_mainloop(void *data) } } // Done handling epoll sockets - // Replicate missing blocks from the image so the proxy will eventually have a full copy - uplink_sendReplicationRequest( link ); // See if we should trigger an RTT measurement if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) { const time_t now = time( NULL ); @@ -373,6 +373,7 @@ static void* uplink_mainloop(void *data) } #ifdef _DEBUG if ( link->fd != -1 && !link->shutdown ) { + int resend = FALSE; time_t deadline = time( NULL ) - 10; spin_lock( &link->queueLock ); for (i = 0; i < link->queueLen; ++i) { @@ -381,12 +382,15 @@ static void* uplink_mainloop(void *data) "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", link->queue[i].client->image->lower_name, link->queue[i].from, link->queue[i].to, link->queue[i].status ); link->queue[i].status = ULR_NEW; + resend = TRUE; spin_unlock( &link->queueLock ); printf("%s", buffer); spin_lock( &link->queueLock ); } } spin_unlock( &link->queueLock ); + if ( resend ) + uplink_send_requests( link, TRUE ); } #endif } @@ -458,27 +462,24 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link) if ( link == NULL || link->fd == -1 ) return; dnbd3_image_t * const image = link->image; if ( image == NULL || image->cache_map == NULL || image->filesize < DNBD3_BLOCK_SIZE ) return; - const time_t now = time( NULL ); - if ( now <= link->lastReplication + 1 ) return; - link->lastReplication = now; spin_lock( &image->lock ); - if ( image == NULL || image->cache_map == NULL ) { + if ( image == NULL || image->cache_map == NULL || link->replicationHandle != 0 ) { spin_unlock( &image->lock ); return; } dnbd3_request_t request; request.magic = dnbd3_packet_magic; - int sent = 0; const size_t len = IMGSIZE_TO_MAPBYTES( image->filesize ) - 1; for (int i = 0; i <= len; ++i) { - if ( image->cache_map == NULL || link->fd == -1 || sent > 20 ) break; + if ( image->cache_map == NULL || link->fd == -1 ) break; if ( image->cache_map[i] == 0xff || (i == len && link->replicatedLastBlock) ) continue; if ( i == len ) link->replicatedLastBlock = TRUE; + link->replicationHandle = 1; // Prevent race condition spin_unlock( &image->lock ); // Unlocked - do not break or continue here... - ++sent; request.cmd = CMD_GET_BLOCK; - request.offset = request.handle = (uint64_t)i * DNBD3_BLOCK_SIZE * (uint64_t)8; + // Needs to be 8 (bit->byte, bitmap) + link->replicationHandle = request.offset = request.handle = (uint64_t)i * DNBD3_BLOCK_SIZE * (uint64_t)8; request.size = DNBD3_BLOCK_SIZE * (uint64_t)8; if ( request.offset + request.size > image->filesize ) { request.size = image->filesize - request.offset; @@ -489,10 +490,10 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link) printf( "[DEBUG] Error sending background replication request to uplink server!\n" ); return; } - // Lock again... - spin_lock( &image->lock ); + return; // Request was sent, bail out, nothing is locked } spin_unlock( &image->lock ); + // Replication might be complete, uplink_mainloop should take care.... } /** @@ -592,15 +593,21 @@ static void uplink_handle_receive(dnbd3_connection_t *link) writev( client->sock, iov, 2 ); pthread_mutex_unlock( &client->sendMutex ); spin_lock( &link->queueLock ); - if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--; + if ( i == link->queueLen - 1 ) link->queueLen--; } spin_unlock( &link->queueLock ); + if ( start == link->replicationHandle ) link->replicationHandle = 0; + } + if ( link->queueLen == 0 ) { + uplink_sendReplicationRequest( link ); } error_cleanup: ; altservers_serverFailed( &link->currentServer ); const int fd = link->fd; link->fd = -1; + link->replicationHandle = 0; if ( fd != -1 ) close( fd ); + altservers_findUplink( link ); // Can we just call it here? } /** -- cgit v1.2.3-55-g7522