From 47c28e25055cfa3c4c37035cc7888d0b87392dc1 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 22 Dec 2014 19:46:42 +0100 Subject: [SERVER] Improve replication and reconnecting behaviour --- src/server/altservers.c | 9 ++++---- src/server/altservers.h | 2 +- src/server/globals.h | 2 +- src/server/image.c | 2 +- src/server/net.c | 3 ++- src/server/uplink.c | 56 +++++++++++++++++++++++++------------------------ 6 files changed, 38 insertions(+), 36 deletions(-) diff --git a/src/server/altservers.c b/src/server/altservers.c index 25d9bc9..7923f1a 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -226,7 +226,7 @@ int altservers_getMatching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int * This function is suited for finding uplink servers as * it includes private servers and ignores any "client only" servers */ -int altservers_get(dnbd3_host_t *output, int size) +int altservers_get(dnbd3_host_t *output, int size, int emergency) { if ( size <= 0 ) return 0; int count = 0, i; @@ -245,10 +245,10 @@ int altservers_get(dnbd3_host_t *output, int size) if ( altServers[i].host.type == 0 ) continue; // Slot is empty if ( _proxyPrivateOnly && !altServers[i].isPrivate ) continue; // Config says to consider private alt-servers only? ignore! if ( altServers[i].isClientOnly ) continue; - if ( altServers[i].numFails > SERVER_MAX_UPLINK_FAILS // server failed X times in a row + if ( !emergency && altServers[i].numFails > SERVER_MAX_UPLINK_FAILS // server failed X times in a row && now - altServers[i].lastFail > SERVER_BAD_UPLINK_IGNORE ) continue; // and last fail was not too long ago? ignore! // server seems ok, include in output and reset its fail counter - altServers[i].numFails = 0; + if ( !emergency ) altServers[i].numFails = 0; output[count++] = altServers[i].host; if ( count >= size ) break; } @@ -395,7 +395,6 @@ static void *altservers_main(void *data) // Empty pipe do { ret = read( readPipe, buffer, sizeof buffer ); - if ( ret > 0 ) printf("*********** altserver thread woke up\n"); } while ( ret == sizeof buffer ); // Throw data away, this is just used for waking this thread up if ( ret == 0 ) { memlogf( "[WARNING] Signal pipe of alservers_main closed! Things will break!" ); @@ -425,7 +424,7 @@ static void *altservers_main(void *data) } assert( uplink->rttTestResult == RTT_INPROGRESS ); // Now get 4 alt servers - numAlts = altservers_get( servers, ALTS ); + numAlts = altservers_get( servers, ALTS, uplink->fd == -1 ); if ( uplink->fd != -1 ) { // Add current server if not already in list found = FALSE; diff --git a/src/server/altservers.h b/src/server/altservers.h index 13b0685..459c546 100644 --- a/src/server/altservers.h +++ b/src/server/altservers.h @@ -17,7 +17,7 @@ void altservers_removeUplink(dnbd3_connection_t *uplink); int altservers_getMatching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size); -int altservers_get(dnbd3_host_t *output, int size); +int altservers_get(dnbd3_host_t *output, int size, int emergency); int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2); diff --git a/src/server/globals.h b/src/server/globals.h index c215916..037cd08 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -41,7 +41,7 @@ typedef struct #define RTT_NOT_REACHABLE 4 // No uplink was reachable struct _dnbd3_connection { - int fd; // socket fd to remote server + volatile int fd; // socket fd to remote server int signal; // eventfd used to wake up the process pthread_t thread; // thread holding the connection pthread_spinlock_t queueLock; // lock for synchronization on request queue etc. diff --git a/src/server/image.c b/src/server/image.c index 78b907c..d6fad22 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -901,7 +901,7 @@ dnbd3_image_t* image_getOrClone(char *name, uint16_t revision) dnbd3_host_t servers[4]; int uplinkSock = -1; dnbd3_host_t *uplinkServer = NULL; - const int count = altservers_get( servers, 4 ); + const int count = altservers_get( servers, 4, FALSE ); uint16_t remoteVersion, remoteRid; uint64_t remoteImageSize; for (i = 0; i < count; ++i) { diff --git a/src/server/net.c b/src/server/net.c index 5ea2e36..7d7ecb4 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -306,7 +306,8 @@ void *net_client_handler(void *dnbd3_client) const ssize_t ret = sendfile( client->sock, image_file, (off_t *)&request.offset, request.size ); if ( ret <= 0 ) { pthread_mutex_unlock( &client->sendMutex ); - printf( "[ERROR] sendfile failed (image to net. ret=%d, sent %d/%d, errno=%d)\n", (int)ret, (int)done, (int)request.size, (int)errno ); + printf( "[ERROR] sendfile failed (image to net. ret=%d, sent %d/%d, errno=%d)\n", + (int)ret, (int)done, (int)request.size, (int)errno ); goto exit_client_cleanup; } done += ret; diff --git a/src/server/uplink.c b/src/server/uplink.c index 6c604c1..2821a27 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -256,6 +256,7 @@ static void* uplink_mainloop(void *data) link->replicationHandle = 0; // Re-send all pending requests uplink_send_requests( link, FALSE ); + uplink_sendReplicationRequest( link ); link->image->working = TRUE; buffer[0] = '@'; if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) { @@ -338,8 +339,8 @@ static void* uplink_mainloop(void *data) const time_t now = time( NULL ); if ( now + SERVER_RTT_DELAY_FAILED < nextAltCheck ) { // This probably means the system time was changed - handle this case properly by capping the timeout - nextAltCheck = now + SERVER_RTT_DELAY_FAILED; - } else if ( now >= nextAltCheck ) { + nextAltCheck = now + SERVER_RTT_DELAY_FAILED / 2; + } else if ( now >= nextAltCheck || link->fd == -1 ) { // It seems it's time for a check if ( image_isComplete( link->image ) ) { // Quit work if image is complete @@ -381,8 +382,8 @@ static void* uplink_mainloop(void *data) snprintf( buffer, sizeof(buffer), "[DEBUG WARNING] Starving request detected:\n" "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", link->queue[i].client->image->lower_name, link->queue[i].from, link->queue[i].to, link->queue[i].status ); - link->queue[i].status = ULR_NEW; - resend = TRUE; + //link->queue[i].status = ULR_NEW; + //resend = TRUE; spin_unlock( &link->queueLock ); printf("%s", buffer); spin_lock( &link->queueLock ); @@ -508,7 +509,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link) ret = recv( link->fd, &inReply, sizeof inReply, MSG_DONTWAIT | MSG_NOSIGNAL ); if ( ret < 0 ) { const int err = errno; - if ( err == EAGAIN || err == EWOULDBLOCK || err == EINTR ) return; // OK cases + if ( err == EAGAIN || err == EWOULDBLOCK || err == EINTR ) break; // OK cases goto error_cleanup; } if ( ret == 0 ) { @@ -547,7 +548,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link) } // Payload read completely // Bail out if we're not interested - if ( inReply.cmd != CMD_GET_BLOCK ) return; + if ( inReply.cmd != CMD_GET_BLOCK ) continue; // Is a legit block reply const uint64_t start = inReply.handle; const uint64_t end = inReply.handle + inReply.size; @@ -576,31 +577,32 @@ static void uplink_handle_receive(dnbd3_connection_t *link) outReply.magic = dnbd3_packet_magic; for (i = link->queueLen - 1; i >= 0; --i) { dnbd3_queued_request_t * const req = &link->queue[i]; - if ( req->status != ULR_PROCESSING ) continue; - assert( req->from >= start && req->to <= end ); - dnbd3_client_t * const client = req->client; - outReply.cmd = CMD_GET_BLOCK; - outReply.handle = req->handle; - outReply.size = req->to - req->from; - iov[0].iov_base = &outReply; - iov[0].iov_len = sizeof outReply; - iov[1].iov_base = link->recvBuffer + (req->from - start); - iov[1].iov_len = outReply.size; - fixup_reply( outReply ); - req->status = ULR_FREE; - pthread_mutex_lock( &client->sendMutex ); - spin_unlock( &link->queueLock ); - writev( client->sock, iov, 2 ); - pthread_mutex_unlock( &client->sendMutex ); - spin_lock( &link->queueLock ); - if ( i == link->queueLen - 1 ) link->queueLen--; + if ( req->status == ULR_PROCESSING ) { + assert( req->from >= start && req->to <= end ); + dnbd3_client_t * const client = req->client; + outReply.cmd = CMD_GET_BLOCK; + outReply.handle = req->handle; + outReply.size = req->to - req->from; + iov[0].iov_base = &outReply; + iov[0].iov_len = sizeof outReply; + iov[1].iov_base = link->recvBuffer + (req->from - start); + iov[1].iov_len = outReply.size; + fixup_reply( outReply ); + req->status = ULR_FREE; + pthread_mutex_lock( &client->sendMutex ); + spin_unlock( &link->queueLock ); + writev( client->sock, iov, 2 ); + pthread_mutex_unlock( &client->sendMutex ); + spin_lock( &link->queueLock ); + } + if ( req->status == ULR_FREE && i == link->queueLen - 1 ) link->queueLen--; } spin_unlock( &link->queueLock ); if ( start == link->replicationHandle ) link->replicationHandle = 0; } - if ( link->queueLen == 0 ) { - uplink_sendReplicationRequest( link ); - } + if ( link->queueLen == 0 ) uplink_sendReplicationRequest( link ); + return; + // Error handling from failed receive or message parsing error_cleanup: ; altservers_serverFailed( &link->currentServer ); const int fd = link->fd; -- cgit v1.2.3-55-g7522