diff options
-rw-r--r-- | src/fuse/connection.c | 61 |
1 file changed, 49 insertions, 12 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c index e2cebb9..eb65940 100644 --- a/src/fuse/connection.c +++ b/src/fuse/connection.c @@ -58,6 +58,7 @@ typedef struct _alt_server { int rtt; int rtts[RTT_COUNT]; int rttIndex; + int bestCount; } alt_server_t; alt_server_t altservers[MAX_ALTS]; dnbd3_server_entry_t newservers[MAX_ALTS]; @@ -270,8 +271,8 @@ int connection_printStats(char *buffer, const int len) value = altservers[i].rtt; width += 3; } - ret = snprintf( buffer, remaining, "% *d %s Unreachable: % 3d\n", - width, value, unit, altservers[i].consecutiveFails ); + ret = snprintf( buffer, remaining, "% *d %s Unreachable: % 4d BestCount: % 4d\n", + width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount ); remaining -= ret; buffer += ret; } @@ -436,6 +437,7 @@ static void addAltServers() sock_printHost( &newservers[nIdx].host, txt, 200 ); logadd( LOG_DEBUG1, "new server %s in slot %d", txt, slot ); altservers[slot].consecutiveFails = 0; + altservers[slot].bestCount = 0; altservers[slot].rtts[0] = RTT_UNREACHABLE; altservers[slot].rttIndex = 1; altservers[slot].host = newservers[nIdx].host; @@ -451,11 +453,13 @@ static void probeAltServers() serialized_buffer_t buffer; dnbd3_reply_t reply; int bestIndex = -1; + int currentIndex = -1; int bestSock = -1; int currentRtt = RTT_UNREACHABLE; uint16_t remoteRid, remoteProto; uint64_t remoteSize; char *remoteName; + bool doSwitch; const bool panic = connection.sockFd == -1; for ( int altIndex = 0; altIndex < MAX_ALTS; ++altIndex ) { @@ -466,17 +470,18 @@ static void probeAltServers() && srv->consecutiveFails % ( srv->consecutiveFails / 8 ) != 0 ) { continue; } - if (srv->rttIndex >= RTT_COUNT) { + if ( srv->rttIndex >= RTT_COUNT ) { srv->rttIndex = 0; } else { srv->rttIndex += 1; } // Probe const uint64_t start = nowMicro(); + errno = 0; int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 ); if ( sock == -1 ) { - logadd( LOG_WARNING, "Could not crrate socket for probing. 
errno = %d", errno ); - continue; + logadd( LOG_DEBUG1, "Could not connect for probing. errno = %d", errno ); + goto fail; } if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) { logadd( LOG_DEBUG1, "-> select fail" ); @@ -503,7 +508,7 @@ static void probeAltServers() int a = 111, b = 111; if ( !(a = dnbd3_get_reply( sock, &reply )) || reply.size != RTT_BLOCK_SIZE || !(b = throwDataAway( sock, RTT_BLOCK_SIZE )) ) { - logadd( LOG_DEBUG1, "<- block paxload fail %d %d %d", a, (int)reply.size, b ); + logadd( LOG_DEBUG1, "<- block payload fail %d %d %d", a, (int)reply.size, b ); goto fail; } // Yay, success @@ -525,6 +530,7 @@ static void probeAltServers() // Remember rtt if this server matches the current one if ( isSameAddressPort( &srv->host, &connection.currentServer ) ) { currentRtt = srv->rtt; + currentIndex = altIndex; } // Keep socket open if this is currently the best one if ( bestIndex == -1 || altservers[bestIndex].rtt > srv->rtt ) { @@ -538,19 +544,50 @@ static void probeAltServers() } continue; fail:; - close( sock ); + if ( sock != -1 ) { + close( sock ); + } srv->rtts[srv->rttIndex] = RTT_UNREACHABLE; srv->consecutiveFails += 1; } + doSwitch = false; + if ( bestIndex != -1 ) { + // Time-sensitive switch decision: If a server was best for some consecutive measurements, + // we switch no matter how small the difference to the current server is + for ( int i = 0; i < MAX_ALTS; ++i ) { + if ( i == bestIndex ) { + if ( altservers[i].bestCount < 30 ) { + altservers[i].bestCount += 2; + } + // Switch with increasing probability the higher the bestCount is + if ( altservers[i].bestCount > 5 && altservers[i].rtt < currentRtt && altservers[i].bestCount > rand() % 25 ) { + doSwitch = true; + } + } else if ( altservers[i].bestCount > 0 ) { + altservers[i].bestCount--; + } + } + // This takes care of the situation where two servers alternate being the best server all the time + if ( doSwitch && currentIndex != -1 && altservers[bestIndex].bestCount - 
altservers[currentIndex].bestCount < 6 ) { + doSwitch = false; + } + // Regular logic: Apply threshold when considering switch + if ( !doSwitch ) { + doSwitch = currentRtt > altservers[bestIndex].rtt + RTT_ABSOLUTE_THRESHOLD + || RTT_THRESHOLD_FACTOR(currentRtt) > altservers[bestIndex].rtt + 1000; + } + } // Switch if a better server was found - if ( bestIndex != -1 - && ( currentRtt > altservers[bestIndex].rtt + RTT_ABSOLUTE_THRESHOLD - || RTT_THRESHOLD_FACTOR(currentRtt) > altservers[bestIndex].rtt + 1500 ) ) { + if ( doSwitch ) { logadd( LOG_INFO, "Current: %dµs, best: %dµs. Will switch!", currentRtt, altservers[bestIndex].rtt ); + for ( int i = 0; i < MAX_ALTS; ++i ) { + if ( i != bestIndex ) { + altservers[i].bestCount = 0; + } + } switchConnection( bestSock, &altservers[bestIndex] ); } else if ( bestIndex != -1 ) { // No switch - logadd( LOG_DEBUG1, "Current: %dµs, best: %dµs. Will not switch.", currentRtt, altservers[bestIndex].rtt ); close( bestSock ); } } @@ -561,7 +598,7 @@ static void switchConnection(int sockFd, alt_server_t *srv) struct sockaddr_storage addr; socklen_t addrLen = sizeof(addr); char message[200] = "Connection switched to "; - size_t len = strlen(message); + const size_t len = strlen( message ); int ret; dnbd3_async_t *queue, *it; |