summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorSimon Rettberg2014-12-22 19:46:42 +0100
committerSimon Rettberg2014-12-22 19:46:42 +0100
commit47c28e25055cfa3c4c37035cc7888d0b87392dc1 (patch)
treef3213a89b18ddca597106af892f54bc429fc7689 /src/server/uplink.c
parent[SERVER] Configurable client timeout, adaptive replication speed (to be teste... (diff)
downloaddnbd3-47c28e25055cfa3c4c37035cc7888d0b87392dc1.tar.gz
dnbd3-47c28e25055cfa3c4c37035cc7888d0b87392dc1.tar.xz
dnbd3-47c28e25055cfa3c4c37035cc7888d0b87392dc1.zip
[SERVER] Improve replication and reconnecting behaviour
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c56
1 files changed, 29 insertions, 27 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 6c604c1..2821a27 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -256,6 +256,7 @@ static void* uplink_mainloop(void *data)
link->replicationHandle = 0;
// Re-send all pending requests
uplink_send_requests( link, FALSE );
+ uplink_sendReplicationRequest( link );
link->image->working = TRUE;
buffer[0] = '@';
if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) {
@@ -338,8 +339,8 @@ static void* uplink_mainloop(void *data)
const time_t now = time( NULL );
if ( now + SERVER_RTT_DELAY_FAILED < nextAltCheck ) {
// This probably means the system time was changed - handle this case properly by capping the timeout
- nextAltCheck = now + SERVER_RTT_DELAY_FAILED;
- } else if ( now >= nextAltCheck ) {
+ nextAltCheck = now + SERVER_RTT_DELAY_FAILED / 2;
+ } else if ( now >= nextAltCheck || link->fd == -1 ) {
// It seems it's time for a check
if ( image_isComplete( link->image ) ) {
// Quit work if image is complete
@@ -381,8 +382,8 @@ static void* uplink_mainloop(void *data)
snprintf( buffer, sizeof(buffer), "[DEBUG WARNING] Starving request detected:\n"
"%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", link->queue[i].client->image->lower_name,
link->queue[i].from, link->queue[i].to, link->queue[i].status );
- link->queue[i].status = ULR_NEW;
- resend = TRUE;
+ //link->queue[i].status = ULR_NEW;
+ //resend = TRUE;
spin_unlock( &link->queueLock );
printf("%s", buffer);
spin_lock( &link->queueLock );
@@ -508,7 +509,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
ret = recv( link->fd, &inReply, sizeof inReply, MSG_DONTWAIT | MSG_NOSIGNAL );
if ( ret < 0 ) {
const int err = errno;
- if ( err == EAGAIN || err == EWOULDBLOCK || err == EINTR ) return; // OK cases
+ if ( err == EAGAIN || err == EWOULDBLOCK || err == EINTR ) break; // OK cases
goto error_cleanup;
}
if ( ret == 0 ) {
@@ -547,7 +548,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
}
// Payload read completely
// Bail out if we're not interested
- if ( inReply.cmd != CMD_GET_BLOCK ) return;
+ if ( inReply.cmd != CMD_GET_BLOCK ) continue;
// Is a legit block reply
const uint64_t start = inReply.handle;
const uint64_t end = inReply.handle + inReply.size;
@@ -576,31 +577,32 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
outReply.magic = dnbd3_packet_magic;
for (i = link->queueLen - 1; i >= 0; --i) {
dnbd3_queued_request_t * const req = &link->queue[i];
- if ( req->status != ULR_PROCESSING ) continue;
- assert( req->from >= start && req->to <= end );
- dnbd3_client_t * const client = req->client;
- outReply.cmd = CMD_GET_BLOCK;
- outReply.handle = req->handle;
- outReply.size = req->to - req->from;
- iov[0].iov_base = &outReply;
- iov[0].iov_len = sizeof outReply;
- iov[1].iov_base = link->recvBuffer + (req->from - start);
- iov[1].iov_len = outReply.size;
- fixup_reply( outReply );
- req->status = ULR_FREE;
- pthread_mutex_lock( &client->sendMutex );
- spin_unlock( &link->queueLock );
- writev( client->sock, iov, 2 );
- pthread_mutex_unlock( &client->sendMutex );
- spin_lock( &link->queueLock );
- if ( i == link->queueLen - 1 ) link->queueLen--;
+ if ( req->status == ULR_PROCESSING ) {
+ assert( req->from >= start && req->to <= end );
+ dnbd3_client_t * const client = req->client;
+ outReply.cmd = CMD_GET_BLOCK;
+ outReply.handle = req->handle;
+ outReply.size = req->to - req->from;
+ iov[0].iov_base = &outReply;
+ iov[0].iov_len = sizeof outReply;
+ iov[1].iov_base = link->recvBuffer + (req->from - start);
+ iov[1].iov_len = outReply.size;
+ fixup_reply( outReply );
+ req->status = ULR_FREE;
+ pthread_mutex_lock( &client->sendMutex );
+ spin_unlock( &link->queueLock );
+ writev( client->sock, iov, 2 );
+ pthread_mutex_unlock( &client->sendMutex );
+ spin_lock( &link->queueLock );
+ }
+ if ( req->status == ULR_FREE && i == link->queueLen - 1 ) link->queueLen--;
}
spin_unlock( &link->queueLock );
if ( start == link->replicationHandle ) link->replicationHandle = 0;
}
- if ( link->queueLen == 0 ) {
- uplink_sendReplicationRequest( link );
- }
+ if ( link->queueLen == 0 ) uplink_sendReplicationRequest( link );
+ return;
+ // Error handling from failed receive or message parsing
error_cleanup: ;
altservers_serverFailed( &link->currentServer );
const int fd = link->fd;