summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorSimon Rettberg2014-12-22 15:51:30 +0100
committerSimon Rettberg2014-12-22 15:51:30 +0100
commitcce7cf2c1428d174dd49177358dc52b234e97e5c (patch)
treedb0fa8c046e5c694bde7ff5afdff96c773ad6d64 /src/server/uplink.c
parentget-version.sh will always work in the directory it's placed in (diff)
downloaddnbd3-cce7cf2c1428d174dd49177358dc52b234e97e5c.tar.gz
dnbd3-cce7cf2c1428d174dd49177358dc52b234e97e5c.tar.xz
dnbd3-cce7cf2c1428d174dd49177358dc52b234e97e5c.zip
[SERVER] Configurable client timeout, adaptive replication speed (to be tested against varying bw/latency), retry sendfile call if ret <= len
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c37
1 files changed, 22 insertions, 15 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 0a60ff1..6c604c1 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -55,6 +55,7 @@ int uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host)
link->queueLen = 0;
link->fd = -1;
link->signal = -1;
+ link->replicationHandle = 0;
if ( sock >= 0 ) {
link->betterFd = sock;
link->betterServer = *host;
@@ -250,10 +251,11 @@ static void* uplink_mainloop(void *data)
if ( link->image->crc32 == NULL ) {
uplink_addCrc32( link );
}
- // Re-send all pending requests
- uplink_send_requests( link, FALSE );
link->betterFd = -1;
link->currentServer = link->betterServer;
+ link->replicationHandle = 0;
+ // Re-send all pending requests
+ uplink_send_requests( link, FALSE );
link->image->working = TRUE;
buffer[0] = '@';
if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) {
@@ -331,8 +333,6 @@ static void* uplink_mainloop(void *data)
}
}
// Done handling epoll sockets
- // Replicate missing blocks from the image so the proxy will eventually have a full copy
- uplink_sendReplicationRequest( link );
// See if we should trigger an RTT measurement
if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) {
const time_t now = time( NULL );
@@ -373,6 +373,7 @@ static void* uplink_mainloop(void *data)
}
#ifdef _DEBUG
if ( link->fd != -1 && !link->shutdown ) {
+ int resend = FALSE;
time_t deadline = time( NULL ) - 10;
spin_lock( &link->queueLock );
for (i = 0; i < link->queueLen; ++i) {
@@ -381,12 +382,15 @@ static void* uplink_mainloop(void *data)
"%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", link->queue[i].client->image->lower_name,
link->queue[i].from, link->queue[i].to, link->queue[i].status );
link->queue[i].status = ULR_NEW;
+ resend = TRUE;
spin_unlock( &link->queueLock );
printf("%s", buffer);
spin_lock( &link->queueLock );
}
}
spin_unlock( &link->queueLock );
+ if ( resend )
+ uplink_send_requests( link, TRUE );
}
#endif
}
@@ -458,27 +462,24 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
if ( link == NULL || link->fd == -1 ) return;
dnbd3_image_t * const image = link->image;
if ( image == NULL || image->cache_map == NULL || image->filesize < DNBD3_BLOCK_SIZE ) return;
- const time_t now = time( NULL );
- if ( now <= link->lastReplication + 1 ) return;
- link->lastReplication = now;
spin_lock( &image->lock );
- if ( image == NULL || image->cache_map == NULL ) {
+ if ( image == NULL || image->cache_map == NULL || link->replicationHandle != 0 ) {
spin_unlock( &image->lock );
return;
}
dnbd3_request_t request;
request.magic = dnbd3_packet_magic;
- int sent = 0;
const size_t len = IMGSIZE_TO_MAPBYTES( image->filesize ) - 1;
for (int i = 0; i <= len; ++i) {
- if ( image->cache_map == NULL || link->fd == -1 || sent > 20 ) break;
+ if ( image->cache_map == NULL || link->fd == -1 ) break;
if ( image->cache_map[i] == 0xff || (i == len && link->replicatedLastBlock) ) continue;
if ( i == len ) link->replicatedLastBlock = TRUE;
+ link->replicationHandle = 1; // Prevent race condition
spin_unlock( &image->lock );
// Unlocked - do not break or continue here...
- ++sent;
request.cmd = CMD_GET_BLOCK;
- request.offset = request.handle = (uint64_t)i * DNBD3_BLOCK_SIZE * (uint64_t)8;
+ // Needs to be 8 (bit->byte, bitmap)
+ link->replicationHandle = request.offset = request.handle = (uint64_t)i * DNBD3_BLOCK_SIZE * (uint64_t)8;
request.size = DNBD3_BLOCK_SIZE * (uint64_t)8;
if ( request.offset + request.size > image->filesize ) {
request.size = image->filesize - request.offset;
@@ -489,10 +490,10 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
printf( "[DEBUG] Error sending background replication request to uplink server!\n" );
return;
}
- // Lock again...
- spin_lock( &image->lock );
+ return; // Request was sent, bail out, nothing is locked
}
spin_unlock( &image->lock );
+ // Replication might be complete, uplink_mainloop should take care....
}
/**
@@ -592,15 +593,21 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
writev( client->sock, iov, 2 );
pthread_mutex_unlock( &client->sendMutex );
spin_lock( &link->queueLock );
- if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--;
+ if ( i == link->queueLen - 1 ) link->queueLen--;
}
spin_unlock( &link->queueLock );
+ if ( start == link->replicationHandle ) link->replicationHandle = 0;
+ }
+ if ( link->queueLen == 0 ) {
+ uplink_sendReplicationRequest( link );
}
error_cleanup: ;
altservers_serverFailed( &link->currentServer );
const int fd = link->fd;
link->fd = -1;
+ link->replicationHandle = 0;
if ( fd != -1 ) close( fd );
+ altservers_findUplink( link ); // Can we just call it here?
}
/**