From 47c28e25055cfa3c4c37035cc7888d0b87392dc1 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 22 Dec 2014 19:46:42 +0100 Subject: [SERVER] Improve replication and reconnecting behaviour --- src/server/uplink.c | 56 +++++++++++++++++++++++++++-------------------------- 1 file changed, 29 insertions(+), 27 deletions(-) (limited to 'src/server/uplink.c') diff --git a/src/server/uplink.c b/src/server/uplink.c index 6c604c1..2821a27 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -256,6 +256,7 @@ static void* uplink_mainloop(void *data) link->replicationHandle = 0; // Re-send all pending requests uplink_send_requests( link, FALSE ); + uplink_sendReplicationRequest( link ); link->image->working = TRUE; buffer[0] = '@'; if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) { @@ -338,8 +339,8 @@ static void* uplink_mainloop(void *data) const time_t now = time( NULL ); if ( now + SERVER_RTT_DELAY_FAILED < nextAltCheck ) { // This probably means the system time was changed - handle this case properly by capping the timeout - nextAltCheck = now + SERVER_RTT_DELAY_FAILED; - } else if ( now >= nextAltCheck ) { + nextAltCheck = now + SERVER_RTT_DELAY_FAILED / 2; + } else if ( now >= nextAltCheck || link->fd == -1 ) { // It seems it's time for a check if ( image_isComplete( link->image ) ) { // Quit work if image is complete @@ -381,8 +382,8 @@ static void* uplink_mainloop(void *data) snprintf( buffer, sizeof(buffer), "[DEBUG WARNING] Starving request detected:\n" "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", link->queue[i].client->image->lower_name, link->queue[i].from, link->queue[i].to, link->queue[i].status ); - link->queue[i].status = ULR_NEW; - resend = TRUE; + //link->queue[i].status = ULR_NEW; + //resend = TRUE; spin_unlock( &link->queueLock ); printf("%s", buffer); spin_lock( &link->queueLock ); @@ -508,7 +509,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link) ret = recv( link->fd, &inReply, sizeof inReply, MSG_DONTWAIT | MSG_NOSIGNAL ); if ( ret < 0 ) { const int err = errno; - if ( err == EAGAIN || err == EWOULDBLOCK || err == EINTR ) return; // OK cases + if ( err == EAGAIN || err == EWOULDBLOCK || err == EINTR ) break; // OK cases goto error_cleanup; } if ( ret == 0 ) { @@ -547,7 +548,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link) } // Payload read completely // Bail out if we're not interested - if ( inReply.cmd != CMD_GET_BLOCK ) return; + if ( inReply.cmd != CMD_GET_BLOCK ) continue; // Is a legit block reply const uint64_t start = inReply.handle; const uint64_t end = inReply.handle + inReply.size; @@ -576,31 +577,32 @@ static void uplink_handle_receive(dnbd3_connection_t *link) outReply.magic = dnbd3_packet_magic; for (i = link->queueLen - 1; i >= 0; --i) { dnbd3_queued_request_t * const req = &link->queue[i]; - if ( req->status != ULR_PROCESSING ) continue; - assert( req->from >= start && req->to <= end ); - dnbd3_client_t * const client = req->client; - outReply.cmd = CMD_GET_BLOCK; - outReply.handle = req->handle; - outReply.size = req->to - req->from; - iov[0].iov_base = &outReply; - iov[0].iov_len = sizeof outReply; - iov[1].iov_base = link->recvBuffer + (req->from - start); - iov[1].iov_len = outReply.size; - fixup_reply( outReply ); - req->status = ULR_FREE; - pthread_mutex_lock( &client->sendMutex ); - spin_unlock( &link->queueLock ); - writev( client->sock, iov, 2 ); - pthread_mutex_unlock( &client->sendMutex ); - spin_lock( &link->queueLock ); - if ( i == link->queueLen - 1 ) link->queueLen--; + if ( req->status == ULR_PROCESSING ) { + assert( req->from >= start && req->to <= end ); + dnbd3_client_t * const client = req->client; + outReply.cmd = CMD_GET_BLOCK; + outReply.handle = req->handle; + outReply.size = req->to - req->from; + iov[0].iov_base = &outReply; + iov[0].iov_len = sizeof outReply; + iov[1].iov_base = link->recvBuffer + (req->from - start); + iov[1].iov_len = outReply.size; + fixup_reply( outReply ); + req->status = ULR_FREE; + pthread_mutex_lock( &client->sendMutex ); + spin_unlock( &link->queueLock ); + writev( client->sock, iov, 2 ); + pthread_mutex_unlock( &client->sendMutex ); + spin_lock( &link->queueLock ); + } + if ( req->status == ULR_FREE && i == link->queueLen - 1 ) link->queueLen--; } spin_unlock( &link->queueLock ); if ( start == link->replicationHandle ) link->replicationHandle = 0; } - if ( link->queueLen == 0 ) { - uplink_sendReplicationRequest( link ); - } + if ( link->queueLen == 0 ) uplink_sendReplicationRequest( link ); + return; + // Error handling from failed receive or message parsing error_cleanup: ; altservers_serverFailed( &link->currentServer ); const int fd = link->fd; -- cgit v1.2.3-55-g7522