summaryrefslogblamecommitdiffstats
path: root/src/fuse/connection.c
blob: 8f28c09f0362bdc7039798d9747cbc85d4f68ab3 (plain) (tree)
1
2
3
4
5
6
7
8
9

                       
                            
                               
                               
                                 
                          
 
                   



                    

                  
                     
 
               
                                   

                           
                                 

                                                                                            
                     
 


                      

                                       
                                                             
                               
 
                           





                                
                           



                      


               

                                  
                                    

                                   
             
 
                    
                            




                             
                      
               



                                                 




                                                      
                                                          
 
                            
                             
                              
                                                            
                                
                                                       
 

                                                            
 

                           
 



                                                                                   
                    



                                          
                                   
                        
                                               

                                         
                                                   



                                                              
                                       







                                                                                              






                                                                                                         
                                          
                                                               
                                                                              
                          





                                                                                                   








                                                                                                 
                         

                                                       
                                           



                                                                                                                       
                                         



                                                                                                  



                                                                                                                                        
                                                                                                                  
                                

                                                                                                     
                                                                                         
                                                                  

                                                        



                                                                                                                  
                                                                      



                                                                    
                                                    






                                              
                 
                                   
                                                  


                                           
                                     


                          
































                                                                                                                              




                                  

                                            
                                                
                                                    
                                  
                                        
                                                                                                                      

                                                                 
                                                                      
                                                              
                                    

                 
                                                      




                       


                                                                                 

                                         
                                    








                                                         

 
                                                            

                
                               


                                                                                                                    

                                
                 




                                                 







                                                                                            

                                                    


                                        


                                                                                                

                                    
                                                                  
                           
                          
                                                 
                                       
                                                         



                                                  
                 

                                                                                                              






                                                 






                                        



                                                        
                                         
 
                               



                                                                       

                                                                    
                                                                                                 
                                  
                 


                                                                 

                                                                                               


                                                                                                                               
                                                  
                                 

                                                

                                                                                                          
                                                                                                           
                                                                   
                                                  


                                                          
                                                         
                                                               
                         









                                                                                                            








                                                                                                                                 

                 
                                                                           

                                                                              
                                                    

                                                                            

                                            
                                                      
         
                                                      



                                                                      
 



                                                                




                                                                                         
                                                                                      
                                                        
                                                                                                                         


                                                   
                                                           
                                    
                                                     
                                        
                                         
                                          
                                                                                                            





                                                                                            
                                             

                                                                    


                                                                   
                                                                                   




                                                                                                             
                                                           








                                                                                        

                        







                                                                  
                                                                                                
                                                 
                         
                 
                                                      


                                                                
                                                                                

                                      
                                                                                         
                                                                                                                                  
                                                                                                              



                                            


                                                                                    
                                                              
                                                       









                                                                      




































                                                                                                                          


                                   

                            
                              
                          
                                         


                                        
                      
                                                   
 















                                                                                                                
                                                                                               


                                                                 

                                                                                                 

                                 
                                                   





                                                  
                          
                                                                                
                                   

                                                                                                 

                                                                              
                                                                           


                                                                                                                      
                                                                                 

                                  
                                                           

                                                                                                                                             


                                                                                        

                                                                                                    

                                  
                                                                               
                                                                      

                                  


                                                                                                  

                                  





















                                                                                                         
                               






                                                      







                                                              


                                                                                   
                                                

                                                                     

                                                                                


                                                  



                                        
                         
      


                                      


                                                           



                                                                                                          

                                                             
                                               
                                                                     


                                                                                                 
                                                                                                                                                





                                                                   





                                                                    
                                                                                                                  
                                                                                                                                   







                                                                                                               
                                              
                         
                                                                                                                       




                                                            
                                                                     
                                       










                                                           
                                             

















                                                                       
                            



                                                                                                                                                  
                                                      




                                                                                                      
                                          


                                                            



                                                                            
                                             
                                                                                                                                          


                                                                                                                  
                                                                      



                                                              

 

















                                                                                                            

                                                      
                        

                                  

                                                                                                    
                                     
                                    



                    
                                                  




                                            
                                                                           








                                                        
                                                           

                                            
                                                                           





                                                                                       

                                                               










                                                          



















                                                                       
 
#include "connection.h"
#include "helper.h"
#include "../clientconfig.h"
#include "../shared/protocol.h"
#include "../shared/fdsignal.h"
#include "../shared/sockhelper.h"
#include "../shared/log.h"

#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <inttypes.h>

/* Constants */
// Size of the buffer holding one host name from the init string. A macro (not a static const)
// so that `char host[SHORTBUF]` is a fixed-size array instead of a variable length array.
#define SHORTBUF ((size_t)100)
// Number of slots in the altservers/newservers tables
#define MAX_ALTS (16)
// Only alt servers at an index below this are considered for switching over
#define MAX_ALTS_ACTIVE (5)
// Max. number of addresses taken when resolving a single host name from the init string
#define MAX_HOSTS_PER_ADDRESS (2)
// If a server wasn't reachable this many times, we slowly start skipping it on measurements
static const int FAIL_BACKOFF_START_COUNT = 8;
// Number of RTT samples kept per alt server
#define RTT_COUNT (4)

/* Module variables */

// Init guard
// - connectionInitDone: set by connection_init() once a server connection was established
// - threadInitDone: set by connection_initThreads() as soon as it runs, even if starting the threads fails
// - mutexInit: serializes connection_init(), connection_initThreads() and connection_close()
// - keepRunning: cleared by connection_close(); the worker threads leave their loops once it is false
// NOTE(review): keepRunning is written under mutexInit but read by the worker threads without
// any lock - consider making it an atomic_bool.
static bool connectionInitDone = false;
static bool threadInitDone = false;
static pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER;
static bool keepRunning = true;

// List of pending requests: connection_read() enqueues each request before sending it, and the
// receive thread removes it again when the matching block reply arrives.
// The lock is only initialized in connection_initThreads().
static struct {
	dnbd3_async_t *head;
	dnbd3_async_t *tail;
	pthread_spinlock_t lock;
} requests;

// Image that was selected on the server in connection_init()
static struct {
	char *name; // strdup()ed copy of the name returned by the server
	uint16_t rid;
	uint64_t size;
} image;

// State of the current server connection
static struct {
	int sockFd; // -1 while there is no usable connection
	pthread_mutex_t sendMutex; // serializes sending on sockFd and changes to sockFd; initialized in connection_initThreads()
	dnbd3_signal_t* panicSignal; // signalled when the connection is lost; the background thread waits on it
	dnbd3_host_t currentServer; // address of the connected server (type 0 if it could not be determined)
	uint64_t startupTime; // nowMilli() timestamp taken when connection_init() succeeded
} connection;

// Known alt servers
typedef struct _alt_server {
	dnbd3_host_t host; // host.type == 0 marks an unused slot
	int consecutiveFails; // shown as "Unreachable" in connection_printStats()
	int rtt; // microseconds (connection_printStats() shows values > 5000 as ms)
	int rtts[RTT_COUNT]; // recent RTT samples; presumably a ring buffer indexed by rttIndex - TODO confirm in probeAltServers()
	int rttIndex;
	int bestCount;
} alt_server_t;

// Slots at index >= MAX_ALTS_ACTIVE are not considered for switching over
static alt_server_t altservers[MAX_ALTS];
// Alt servers received via CMD_GET_SERVERS by the receive thread; merged into
// altservers and cleared again by addAltServers()
static dnbd3_server_entry_t newservers[MAX_ALTS];
// Protects altservers and newservers; initialized in connection_initThreads()
static pthread_spinlock_t altLock;

/* Static methods */

// Forward declarations. Parameterless functions use (void): before C23 an empty
// parameter list declares a function without a prototype, disabling argument checking.

static void* connection_receiveThreadMain(void *sock);
static void* connection_backgroundThread(void *something);

static void addAltServers(void);
static void sortAltServers(void);
static void probeAltServers(void);
static void switchConnection(int sockFd, alt_server_t *srv);
static void requestAltServers(void);
static bool throwDataAway(int sockFd, uint32_t amount);

static void enqueueRequest(dnbd3_async_t *request);
static dnbd3_async_t* removeRequest(dnbd3_async_t *request);

static uint64_t nowMilli(void);
static uint64_t nowMicro(void);

/**
 * Connect to one of the servers listed in @p hosts and select the requested image on it.
 *
 * All resolved addresses are stored in altservers. Each address is tried with a short
 * timeout first, followed by up to 5 extra rounds with a longer timeout. The first
 * server that accepts the image selection becomes the current connection.
 *
 * @param hosts      space separated list of host names/addresses; each is resolved to at most
 *                   MAX_HOSTS_PER_ADDRESS addresses, at most MAX_ALTS addresses in total
 * @param lowerImage name of the image to select
 * @param rid        requested revision; 0 accepts whatever revision the server returns
 * @return true if this call established the connection; false if no server was usable,
 *         the module was already initialized, connection_close() was called or hosts is NULL
 */
bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid)
{
	int sock = -1;
	char host[SHORTBUF];
	size_t hlen;
	serialized_buffer_t buffer;
	uint16_t remoteVersion, remoteRid;
	char *remoteName;
	uint64_t remoteSize;
	struct sockaddr_storage sa;
	socklen_t salen;
	poll_list_t *cons = sock_newPollList();

	pthread_mutex_lock( &mutexInit );
	if ( !connectionInitDone && keepRunning && hosts != NULL ) {
		dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS];
		const char *current = hosts, *end = NULL;
		int altIndex = 0;
		memset( altservers, 0, sizeof altservers );
		connection.sockFd = -1;
		do {
			// Get next host from string
			while ( *current == ' ' ) current++;
			if ( *current == '\0' )
				break; // Only (trailing) spaces left, nothing to resolve
			end = strchr( current, ' ' );
			// Token length plus room for the terminating NUL; overlong names get truncated
			size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1);
			if ( len > SHORTBUF ) len = SHORTBUF;
			snprintf( host, len, "%s", current );
			int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS );
			for ( int i = 0; i < newHosts; ++i ) {
				if ( altIndex >= MAX_ALTS )
					break;
				altservers[altIndex].host = tempHosts[i];
				altIndex += 1;
			}
			// end == NULL means this was the last token; don't compute NULL + 1 (undefined behavior)
			if ( end != NULL ) current = end + 1;
		} while ( end != NULL && altIndex < MAX_ALTS );
		logadd( LOG_INFO, "Got %d servers from init call", altIndex );
		// Connect
		for ( int i = 0; i < altIndex + 5; ++i ) {
			if ( i >= altIndex ) {
				// Additional iteration - no corresponding slot in altservers, this
				// is just so we can make a final calls with longer timeout
				sock = sock_multiConnect( cons, NULL, 400, 1000 );
				if ( sock == -2 ) {
					logadd( LOG_ERROR, "Could not connect to any host" );
					sock = -1;
					break;
				}
			} else {
				if ( altservers[i].host.type == 0 )
					continue;
				// Try to connect - 100ms timeout
				sock = sock_multiConnect( cons, &altservers[i].host, 100, 1000 );
			}
			if ( sock == -2 || sock == -1 )
				continue;
			salen = sizeof(sa);
			if ( getpeername( sock, (struct sockaddr*)&sa, &salen ) == -1 ) {
				logadd( LOG_ERROR, "getpeername on successful connection failed!? (errno=%d)", errno );
				close( sock );
				sock = -1;
				continue;
			}
			hlen = sock_printable( (struct sockaddr*)&sa, salen, host, sizeof(host) );
			logadd( LOG_INFO, "Connected to %.*s", (int)hlen, host );
			if ( !dnbd3_select_image( sock, lowerImage, rid, 0 ) ) {
				logadd( LOG_ERROR, "Could not send select image" );
			} else if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) {
				logadd( LOG_ERROR, "Could not read select image reply (%d)", errno );
			} else if ( rid != 0 && rid != remoteRid ) {
				logadd( LOG_ERROR, "rid mismatch (want: %d, got: %d)", (int)rid, (int)remoteRid );
			} else {
				logadd( LOG_INFO, "Requested: '%s:%d'", lowerImage, (int)rid );
				logadd( LOG_INFO, "Returned:  '%s:%d'", remoteName, (int)remoteRid );
				sock_setTimeout( sock, SOCKET_KEEPALIVE_TIMEOUT * 1000 );
				image.name = strdup( remoteName );
				image.rid = remoteRid;
				image.size = remoteSize;
				if ( !sock_sockaddrToDnbd3( (struct sockaddr*)&sa, &connection.currentServer ) ) {
					logadd( LOG_ERROR, "sockaddr to dnbd3_host_t failed!?" );
					connection.currentServer.type = 0;
				}
				connection.panicSignal = signal_new();
				connection.startupTime = nowMilli();
				connection.sockFd = sock;
				requests.head = NULL;
				requests.tail = NULL;
				requestAltServers();
				break;
			}
			// Failed
			if ( sock != -1 ) {
				close( sock );
				sock = -1;
			}
		}
		if ( sock != -1 ) {
			connectionInitDone = true;
		}
	}
	pthread_mutex_unlock( &mutexInit );
	sock_destroyPollList( cons );
	return sock != -1;
}

/**
 * Initialize the locks and start the receive and background threads.
 * Must be called after a successful connection_init(); can only succeed once.
 *
 * @return true if both threads were started. On failure the connection is torn down.
 */
bool connection_initThreads()
{
	pthread_mutex_lock( &mutexInit );
	if ( !keepRunning || !connectionInitDone || threadInitDone || connection.sockFd == -1 ) {
		pthread_mutex_unlock( &mutexInit );
		return false;
	}
	bool success = true;
	bool receiverStarted = false;
	pthread_t thread;
	threadInitDone = true;
	logadd( LOG_DEBUG1, "Initializing stuff" );
	if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0
			|| pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0
			|| pthread_spin_init( &altLock, PTHREAD_PROCESS_PRIVATE ) != 0 ) {
		logadd( LOG_ERROR, "Mutex or spinlock init failure" );
		success = false;
	} else if ( pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)connection.sockFd ) != 0 ) {
		logadd( LOG_ERROR, "Could not create receive thread" );
		success = false;
	} else {
		receiverStarted = true;
		if ( pthread_create( &thread, NULL, &connection_backgroundThread, NULL ) != 0 ) {
			logadd( LOG_ERROR, "Could not create background thread" );
			success = false;
		}
	}
	if ( !success ) {
		if ( receiverStarted ) {
			// The receive thread owns the fd now and closes it when it exits. Closing it here too
			// would be a double close (possibly hitting a reused fd number), so only shut it down
			// to kick the receiver out of its blocking read.
			pthread_mutex_lock( &connection.sendMutex );
			shutdown( connection.sockFd, SHUT_RDWR );
			connection.sockFd = -1;
			pthread_mutex_unlock( &connection.sendMutex );
		} else {
			close( connection.sockFd );
			connection.sockFd = -1;
		}
	}
	pthread_mutex_unlock( &mutexInit );
	return success;
}

/**
 * Size of the image as reported by the server's select-image reply in connection_init().
 * Stays 0 until a connection was established (image lives in zero-initialized static storage).
 */
uint64_t connection_getImageSize()
{
	const uint64_t imageSize = image.size;
	return imageSize;
}

/**
 * Queue @p request and, if there currently is a connection, send the block request for it.
 * The request's address is used as the protocol handle, so the receive thread can find it again.
 * If sending fails, the connection is shut down and panicSignal is raised. The request stays
 * queued in that case.
 *
 * @return false if connection_init() never succeeded, true otherwise (the request was queued)
 */
bool connection_read(dnbd3_async_t *request)
{
	if ( !connectionInitDone ) return false;
	bool sendFailed = false;
	pthread_mutex_lock( &connection.sendMutex );
	enqueueRequest( request );
	// Pointer -> integer via uintptr_t; a direct cast warns/truncates where pointers aren't 64 bit
	if ( connection.sockFd != -1
			&& !dnbd3_get_block( connection.sockFd, request->offset, request->length, (uint64_t)(uintptr_t)request, 0 ) ) {
		shutdown( connection.sockFd, SHUT_RDWR );
		connection.sockFd = -1;
		sendFailed = true;
	}
	pthread_mutex_unlock( &connection.sendMutex );
	if ( sendFailed ) {
		// Raised outside the mutex, like before, so the woken thread doesn't block on it
		signal_call( connection.panicSignal );
	}
	return true;
}

void connection_close()
{
	if ( keepRunning ) {
		logadd( LOG_INFO, "Tearing down dnbd3 connections and workers" );
	}
	pthread_mutex_lock( &mutexInit );
	keepRunning = false;
	if ( !connectionInitDone ) {
		pthread_mutex_unlock( &mutexInit );
		return;
	}
	pthread_mutex_unlock( &mutexInit );
	pthread_mutex_lock( &connection.sendMutex );
	if ( connection.sockFd != -1 ) {
		shutdown( connection.sockFd, SHUT_RDWR );
	}
	pthread_mutex_unlock( &connection.sendMutex );
}

/**
 * Write a human readable summary of the image and all known alt servers to @p buffer.
 *
 * Each alt server line starts with a marker character: '*' for the current server, '-' for servers
 * not considered for switching (index >= MAX_ALTS_ACTIVE) and ' ' otherwise.
 *
 * @param buffer destination buffer
 * @param len    size of @p buffer in bytes
 * @return number of bytes used; returns @p len if the output was truncated
 */
size_t connection_printStats(char *buffer, const size_t len)
{
	int ret;
	size_t remaining = len;
	if ( remaining > 0 ) {
		// image.name is NULL until connection_init() succeeded; %s with NULL is undefined
		ret = snprintf( buffer, remaining, "Image:    %s\nRevision: %d\n\nCurrent connection time: %ds\n\n",
				image.name != NULL ? image.name : "(none)", (int)image.rid,
				(int)( (nowMilli() - connection.startupTime) / 1000 ) );
		if ( ret < 0 ) {
			ret = 0;
		}
		if ( (size_t)ret >= remaining ) {
			return len;
		}
		remaining -= ret;
		buffer += ret;
	}
	int i = -1;
	pthread_spin_lock( &altLock );
	while ( remaining > 3 && ++i < MAX_ALTS ) {
		if ( altservers[i].host.type == 0 )
			continue;
		if ( isSameAddressPort( &connection.currentServer, &altservers[i].host ) ) {
			*buffer++ = '*';
		} else if ( i >= MAX_ALTS_ACTIVE ) {
			*buffer++ = '-';
		} else {
			*buffer++ = ' ';
		}
		remaining -= 1; // The marker consumed one byte; pass the reduced size on below
		const size_t addrlen = sock_printHost( &altservers[i].host, buffer, remaining );
		if ( addrlen >= remaining ) {
			// Didn't fit (or an untruncated length was reported) - treat like snprintf truncation
			remaining = 0;
			break;
		}
		remaining -= addrlen;
		buffer += addrlen;
		if ( remaining < 3 )
			break;
		int width = addrlen >= 35 ? 0 : 35 - (int)addrlen;
		const char *unit;
		int value;
		if ( altservers[i].rtt > 5000 ) {
			unit = "ms   ";
			value = altservers[i].rtt / 1000;
		} else {
			unit = "µs";
			value = altservers[i].rtt;
			width += 3;
		}
		ret = snprintf( buffer, remaining, "% *d %s   Unreachable: % 4d   BestCount: % 4d\n",
				width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount );
		if ( ret < 0 ) {
			ret = 0;
		}
		if ( (size_t)ret >= remaining ) {
			remaining = 0;
			break;
		}
		remaining -= ret;
		buffer += ret;
	}
	pthread_spin_unlock( &altLock );
	return len - remaining;
}

static void* connection_receiveThreadMain(void *sockPtr)
{
	int sockFd = (int)(size_t)sockPtr;
	dnbd3_reply_t reply;
	pthread_detach( pthread_self() );

	while ( keepRunning ) {
		int ret;
		do {
			ret = dnbd3_read_reply( sockFd, &reply, true );
			if ( ret == REPLY_OK ) break;
		} while ( ret == REPLY_INTR || ret == REPLY_AGAIN );
		if ( ret != REPLY_OK ) {
			logadd( LOG_DEBUG1, "Error receiving reply on receiveThread (%d)", ret );
			goto fail;
		}

		if ( reply.cmd == CMD_GET_BLOCK ) {
			// Get block reply. find matching request
			dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle );
			if ( request == NULL ) {
				logadd( LOG_WARNING, "WARNING BUG ALERT SOMETHING: Got block reply with no matching request" );
				if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) {
					logadd( LOG_DEBUG1, "....and choked on reply payload" );
					goto fail;
				}
			} else {
				// Found a match
				const ssize_t ret = sock_recv( sockFd, request->buffer, request->length );
				if ( ret != (ssize_t)request->length ) {
					logadd( LOG_DEBUG1, "receiving payload for a block reply failed" );
					connection_read( request );
					goto fail;
				}
				// Success, wake up caller
				request->success = true;
				request->finished = true;
				signal_call( request->signal );
			}
		} else if ( reply.cmd == CMD_GET_SERVERS ) {
			// List of known alt servers
			dnbd3_server_entry_t entries[MAX_ALTS];
			const int count = MIN( reply.size / sizeof(dnbd3_server_entry_t), MAX_ALTS );
			const size_t relevantSize = sizeof(dnbd3_server_entry_t) * count;
			if ( sock_recv( sockFd, entries, relevantSize ) != (ssize_t)relevantSize
					|| !throwDataAway( sockFd, reply.size - (uint32_t)relevantSize ) ) {
				logadd( LOG_DEBUG1, "Error receiving list of alt servers." );
				goto fail;
			}
			pthread_spin_lock( &altLock );
			memcpy( newservers, entries, relevantSize );
			pthread_spin_unlock( &altLock );
		} else {
			// TODO: Handle the others?
			if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) {
				logadd( LOG_DEBUG1, "Could not throw %d bytes away on CMD %d", (int)reply.size, (int)reply.cmd );
				goto fail;
			}
		}
	}
	logadd( LOG_DEBUG1, "Aus der Schleife rausgeflogen! ARRRRRRRRRR" );
fail:;
	// Make sure noone is trying to use the socket for sending by locking,
	pthread_mutex_lock( &connection.sendMutex );
	// then just set the fd to -1, but only if it's the same fd as ours,
	// as someone could have established a new connection already
	if ( connection.sockFd == sockFd ) {
		connection.sockFd = -1;
		signal_call( connection.panicSignal );
	}
	pthread_mutex_unlock( &connection.sendMutex );
	// As we're the only reader, it's safe to close the socket now
	close( sockFd );
	return NULL;
}

/**
 * Background worker. Until keepRunning is cleared it:
 * - merges, sorts and probes alt servers every TIMER_INTERVAL_PROBE_* seconds, and immediately
 *   when the connection is lost (sockFd == -1);
 * - sends a keepalive packet every TIMER_INTERVAL_KEEPALIVE_PACKET seconds.
 * Between those it sleeps on connection.panicSignal.
 */
static void* connection_backgroundThread(void *something UNUSED)
{
	uint64_t nextKeepalive = 0;
	uint64_t nextRttCheck = 0;

	while ( keepRunning ) {
		uint64_t now = nowMilli();
		if ( now < nextKeepalive && now < nextRttCheck ) {
			int waitTime = (int)( MIN( nextKeepalive, nextRttCheck ) - now );
			int waitRes = signal_wait( connection.panicSignal, waitTime );
			if ( waitRes == SIGNAL_ERROR ) {
				logadd( LOG_WARNING, "Error waiting on signal in background thread! Errno = %d", errno );
			}
			// connection_close() shutting down the socket can wake us (via the receive thread);
			// don't start probing/reconnecting during teardown
			if ( !keepRunning )
				break;
			// The wait may have lasted long; schedule the next checks from the actual wake-up time
			now = nowMilli();
		}
		// Woken up, see what we have to do
		const bool panic = connection.sockFd == -1;
		// Check alt servers
		if ( panic || now >= nextRttCheck ) {
			addAltServers();
			sortAltServers();
			probeAltServers();
			if ( panic || connection.startupTime + ( STARTUP_MODE_DURATION * 1000ull ) > now ) {
				nextRttCheck = now + TIMER_INTERVAL_PROBE_STARTUP * 1000ull;
			} else {
				nextRttCheck = now + TIMER_INTERVAL_PROBE_NORMAL * 1000ull;
			}
		}
		// Send keepalive packet
		if ( now >= nextKeepalive ) {
			pthread_mutex_lock( &connection.sendMutex );
			if ( connection.sockFd != -1 ) {
				dnbd3_request_t request;
				request.magic = dnbd3_packet_magic;
				request.cmd = CMD_KEEPALIVE;
				request.handle = request.offset = request.size = 0;
				fixup_request( request );
				ssize_t ret = sock_sendAll( connection.sockFd, &request, sizeof request, 2 );
				if ( (size_t)ret != sizeof request ) {
					// Connection is dead; the receive thread closes the fd, we just probe right away
					shutdown( connection.sockFd, SHUT_RDWR );
					connection.sockFd = -1;
					nextRttCheck = now;
				}
			}
			pthread_mutex_unlock( &connection.sendMutex );
			nextKeepalive = now + TIMER_INTERVAL_KEEPALIVE_PACKET * 1000ull;
		}
	}
	return NULL;
}

// Private quick helpers

/**
 * Merge the servers collected in newservers[] into altservers[], then clear newservers[].
 *
 * Servers whose address is already present in altservers[] are ignored. A new server
 * goes into the first free slot (host.type == 0). If there is none, it replaces the
 * known server with the most consecutive failures, but only if that server has failed
 * more than FAIL_BACKOFF_START_COUNT times in a row. Otherwise the new server is dropped.
 * A newly added entry starts with fresh statistics.
 *
 * Runs entirely under altLock.
 */
static void addAltServers()
{
	pthread_spin_lock( &altLock );
	for ( int nIdx = 0; nIdx < MAX_ALTS; ++nIdx ) {
		if ( newservers[nIdx].host.type == 0 )
			continue;
		// Got a new alt server, see if it's already known
		for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) {
			if ( isSameAddress( &newservers[nIdx].host, &altservers[eIdx].host ) ) {
				goto skip_server;
			}
		}
		// Not known yet, add - find a free slot, or else the worst repeatedly-failing server
		int slot = -1;
		for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) {
			if ( altservers[eIdx].host.type == 0 ) {
				slot = eIdx; // free - bail out and use this one
				break;
			}
			// Replace an existing alt-server that failed recently if we got no more slots.
			// The first qualifying server must be accepted while slot is still -1,
			// otherwise no replacement candidate could ever be chosen.
			if ( altservers[eIdx].consecutiveFails > FAIL_BACKOFF_START_COUNT
					&& ( slot == -1 || altservers[slot].consecutiveFails < altservers[eIdx].consecutiveFails ) ) {
				slot = eIdx;
			}
		}
		if ( slot != -1 ) {
			char txt[200];
			sock_printHost( &newservers[nIdx].host, txt, sizeof(txt) );
			logadd( LOG_DEBUG1, "new server %s in slot %d", txt, slot );
			altservers[slot].consecutiveFails = 0;
			altservers[slot].bestCount = 0;
			altservers[slot].rtts[0] = RTT_UNREACHABLE;
			altservers[slot].rttIndex = 1;
			altservers[slot].host = newservers[nIdx].host;
		}
skip_server:;
	}
	memset( newservers, 0, sizeof(newservers) );
	pthread_spin_unlock( &altLock );
}

/**
 * Promote healthy spare servers into the active set.
 *
 * Slots [0, MAX_ALTS_ACTIVE) hold the servers considered for switching over; the
 * remaining slots are spares. Each populated spare without recent failures
 * (consecutiveFails not > 0) is swapped with the next active slot that is either
 * empty or has failed more than FAIL_BACKOFF_START_COUNT times in a row.
 * - The promoted server gets fresh statistics.
 * - The entry moved into the spare slot gets consecutiveFails = FAIL_BACKOFF_START_COUNT * 4.
 *
 * Runs under altLock.
 */
static void sortAltServers()
{
	int target = 0; // next active slot to examine; only ever moves forward
	pthread_spin_lock( &altLock );
	for ( int spareIdx = MAX_ALTS_ACTIVE; spareIdx < MAX_ALTS; ++spareIdx ) {
		alt_server_t * const spareSlot = &altservers[spareIdx];
		// Only a populated spare without recent failures is worth promoting
		if ( spareSlot->host.type == 0 || spareSlot->consecutiveFails > 0 )
			continue;
		// Skip active slots holding a server that is doing fine
		for ( ; target < MAX_ALTS_ACTIVE; ++target ) {
			const alt_server_t * const candidate = &altservers[target];
			if ( candidate->host.type == 0 || candidate->consecutiveFails > FAIL_BACKOFF_START_COUNT )
				break;
		}
		if ( target >= MAX_ALTS_ACTIVE )
			break; // every active slot is healthy, nothing left to replace
		alt_server_t * const activeSlot = &altservers[target];
		const dnbd3_host_t promotedHost = spareSlot->host;
		// The spare slot receives the failing (or empty) active entry
		spareSlot->host = activeSlot->host;
		spareSlot->consecutiveFails = FAIL_BACKOFF_START_COUNT * 4;
		spareSlot->bestCount = 0;
		spareSlot->rtts[0] = RTT_UNREACHABLE;
		spareSlot->rttIndex = 1;
		// The active slot receives the healthy spare with clean statistics
		activeSlot->host = promotedHost;
		activeSlot->consecutiveFails = 0;
		activeSlot->bestCount = 0;
		activeSlot->rtts[0] = RTT_UNREACHABLE;
		activeSlot->rttIndex = 1;
	}
	pthread_spin_unlock( &altLock );
}

/**
 * Probe the known alt servers and switch the connection if appropriate.
 *
 * Each probe connects, selects the image, requests one block and measures the time
 * the whole exchange takes.
 *
 * Panic mode (no active connection, connection.sockFd == -1):
 * - All MAX_ALTS slots are probed, with no failure back-off.
 * - If a request is pending, the probe asks for the block of the first queued
 *   request, so a successful probe also completes that request.
 * - The first server that answers correctly becomes the new connection.
 *
 * Normal mode:
 * - Only the first MAX_ALTS_ACTIVE slots are probed. Servers with many consecutive
 *   failures are probed with decreasing probability.
 * - Each probed server's averaged RTT is updated.
 * - We switch to the fastest server if it beats the current one by the configured
 *   thresholds, or if it has been the fastest repeatedly.
 */
static void probeAltServers()
{
	serialized_buffer_t buffer;
	// Zeroed so the failure log below never prints an uninitialized reply.size
	dnbd3_reply_t reply = { 0 };
	int bestIndex = -1;
	int currentIndex = -1;
	int bestSock = -1;
	int currentRtt = RTT_UNREACHABLE;
	uint16_t remoteRid, remoteProto;
	uint64_t remoteSize;
	char *remoteName;
	bool doSwitch;
	const bool panic = connection.sockFd == -1;

	// Block to request while probing; in panic mode reuse the first pending request
	uint64_t testOffset = 0;
	uint32_t testLength = RTT_BLOCK_SIZE;
	dnbd3_async_t *request = NULL;
	if ( panic ) {
		pthread_spin_lock( &requests.lock );
		if ( requests.head != NULL ) {
			request = requests.head;
			testOffset = requests.head->offset;
			testLength = requests.head->length;
		}
		pthread_spin_unlock( &requests.lock );
		if ( testOffset != 0 ) {
			logadd( LOG_DEBUG1, "Panic with pending %" PRIu64 ":%" PRIu32, testOffset, testLength );
		}
	}

	for ( int altIndex = 0; altIndex < (panic ? MAX_ALTS : MAX_ALTS_ACTIVE); ++altIndex ) {
		alt_server_t * const srv = &altservers[altIndex];
		if ( srv->host.type == 0 )
			continue;
		// Back off from servers that failed repeatedly: the more failures, the rarer the probe
		if ( !panic && srv->consecutiveFails > FAIL_BACKOFF_START_COUNT
				&& rand() % srv->consecutiveFails >= FAIL_BACKOFF_START_COUNT ) {
			continue;
		}
		// Advance the ring index into rtts[] (RTT_COUNT entries, see the average below).
		// Wrap *before* the index can equal RTT_COUNT, otherwise the stores into
		// srv->rtts[srv->rttIndex] further down would write past the end of the array.
		if ( ++srv->rttIndex >= RTT_COUNT ) {
			srv->rttIndex = 0;
		}
		// Probe
		const uint64_t start = nowMicro();
		errno = 0;
		int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 );
		if ( sock == -1 ) {
			logadd( LOG_DEBUG1, "Could not connect for probing. errno = %d", errno );
			goto fail;
		}
		if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) {
			logadd( LOG_DEBUG1, "probe: select_image failed" );
			goto fail;
		}
		if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize )) {
			logadd( LOG_DEBUG1, "probe: select image reply failed" );
			goto fail;
		}
		if ( remoteProto < MIN_SUPPORTED_SERVER ) {
			logadd( LOG_WARNING, "Unsupported remote version (local: %d, remote: %d)", (int)PROTOCOL_VERSION, (int)remoteProto );
			srv->consecutiveFails += 10;
			goto fail;
		}
		if ( remoteRid != image.rid || strcmp( remoteName, image.name ) != 0 ) {
			logadd( LOG_WARNING, "Remote rid or name mismatch (got '%s')", remoteName );
			srv->consecutiveFails += 10;
			goto fail;
		}
		if ( !dnbd3_get_block( sock, testOffset, testLength, 0, 0 ) ) {
			logadd( LOG_DEBUG1, "-> block request fail" );
			goto fail;
		}
		int a = 111;
		if ( !(a = dnbd3_get_reply( sock, &reply )) || reply.size != testLength ) {
			logadd( LOG_DEBUG1, "<- get block reply fail %d %d", a, (int)reply.size );
			goto fail;
		}
		if ( request != NULL && removeRequest( request ) != NULL ) {
			// Request successfully removed from queue
			const ssize_t ret = sock_recv( sock, request->buffer, request->length );
			if ( ret != (ssize_t)request->length ) {
				logadd( LOG_DEBUG1, "[RTT] receiving payload for a block reply failed" );
				// Failure, add to queue again
				connection_read( request );
				goto fail;
			}
			// Success, wake up caller
			logadd( LOG_DEBUG1, "[RTT] Successful direct probe" );
			request->success = true;
			request->finished = true;
			signal_call( request->signal );
		} else {
			// Wasn't a request that's in our request queue
			if ( !throwDataAway( sock, testLength ) ) {
				logadd( LOG_DEBUG1, "<- get block reply payload fail" );
				goto fail;
			}
		}

		// Yay, success
		// Panic mode? Just switch to server
		if ( panic ) {
			switchConnection( sock, srv );
			return;
		}
		// Non-panic mode:
		// Update stats of server
		const uint64_t end = nowMicro();
		srv->consecutiveFails = 0;
		srv->rtts[srv->rttIndex] = (int)(end - start);
		srv->rtt = 0;
		for ( int i = 0; i < RTT_COUNT; ++i ) {
			srv->rtt += srv->rtts[i];
		}
		srv->rtt /= RTT_COUNT;
		// Remember rtt if this server matches the current one
		if ( isSameAddressPort( &srv->host, &connection.currentServer ) ) {
			currentRtt = srv->rtt;
			currentIndex = altIndex;
		}
		// Keep socket open if this is currently the best one
		if ( bestIndex == -1 || altservers[bestIndex].rtt > srv->rtt ) {
			bestIndex = altIndex;
			if ( bestSock != -1 ) {
				close( bestSock );
			}
			bestSock = sock;
		} else {
			close( sock );
		}
		continue;
fail:;
		if ( sock != -1 ) {
			close( sock );
		}
		srv->rtts[srv->rttIndex] = RTT_UNREACHABLE;
		srv->consecutiveFails += 1;
	}
	doSwitch = false;
	if ( bestIndex != -1 ) {
		// Time-sensitive switch decision: If a server was best for some consecutive measurements,
		// we switch no matter how small the difference to the current server is
		pthread_spin_lock( &altLock );
		for ( int i = 0; i < MAX_ALTS_ACTIVE; ++i ) {
			if ( i == bestIndex ) {
				if ( altservers[i].bestCount < 50 ) {
					altservers[i].bestCount += 2;
				}
				// Switch with increasing probability the higher the bestCount is
				if ( altservers[i].bestCount > 12 && altservers[i].rtt < currentRtt && altservers[i].bestCount > rand() % 50 ) {
					doSwitch = true;
				}
			} else if ( altservers[i].bestCount > 0 ) {
				altservers[i].bestCount--;
			}
		}
		// Let failures of the spare servers slowly decay
		for ( int i = MAX_ALTS_ACTIVE; i < MAX_ALTS; ++i ) {
			if ( altservers[i].consecutiveFails > 0 ) {
				altservers[i].consecutiveFails--;
			}
		}
		pthread_spin_unlock( &altLock );
		// This takes care of the situation where two servers alternate being the best server all the time
		if ( doSwitch && currentIndex != -1 && altservers[bestIndex].bestCount - altservers[currentIndex].bestCount < 8 ) {
			doSwitch = false;
		}
		// Regular logic: Apply threshold when considering switch
		if ( !doSwitch ) {
			doSwitch = currentRtt > altservers[bestIndex].rtt + RTT_ABSOLUTE_THRESHOLD
					|| RTT_THRESHOLD_FACTOR(currentRtt) > altservers[bestIndex].rtt + 1000;
		}
	}
	// Switch if a better server was found
	if ( doSwitch ) {
		logadd( LOG_INFO, "Current: %dµs, best: %dµs. Will switch!", currentRtt, altservers[bestIndex].rtt );
		for ( int i = 0; i < MAX_ALTS; ++i ) {
			if ( i != bestIndex ) {
				altservers[i].bestCount = 0;
			}
		}
		switchConnection( bestSock, &altservers[bestIndex] );
	} else if ( bestIndex != -1 ) {
		// No switch
		close( bestSock );
	}
}

/**
 * Make sockFd the active server connection.
 *
 * Steps:
 * 1. Shut down the previous connection, if any.
 * 2. Take over the pending request queue and ask the new server for its alt servers.
 * 3. Start a receive thread for sockFd.
 * 4. Re-queue and re-send every pending request.
 *
 * If the socket turns out to be dead (getpeername fails) or the receive thread cannot
 * be created, the connection is marked lost (sockFd = -1), sockFd is closed and
 * connection.panicSignal is raised. In the thread-creation failure case the taken-over
 * requests are put back in the queue but not sent.
 *
 * @param sockFd connected socket to the new server; ownership passes to this function
 *               (it goes to the receive thread, or is closed here on failure)
 * @param srv    alt-server entry describing the new server
 */
static void switchConnection(int sockFd, alt_server_t *srv)
{
	pthread_t thread;
	struct sockaddr_storage addr;
	socklen_t addrLen = sizeof(addr);
	char message[200] = "Connection switched to ";
	const size_t len = strlen( message );
	int ret;
	int peerErrno = 0;
	dnbd3_async_t *queue = NULL, *it;

	pthread_mutex_lock( &connection.sendMutex );
	if ( connection.sockFd != -1 ) {
		shutdown( connection.sockFd, SHUT_RDWR );
	}
	ret = getpeername( sockFd, (struct sockaddr*)&addr, &addrLen );
	if ( ret == 0 ) {
		connection.currentServer = srv->host;
		connection.sockFd = sockFd;
		// Take over all pending requests; they are re-sent below
		pthread_spin_lock( &requests.lock );
		queue = requests.head;
		requests.head = requests.tail = NULL;
		pthread_spin_unlock( &requests.lock );
	} else {
		// Save errno right away; the calls below may overwrite it before we log
		peerErrno = errno;
		connection.sockFd = -1;
	}
	requestAltServers();
	pthread_mutex_unlock( &connection.sendMutex );
	if ( ret != 0 ) {
		close( sockFd );
		logadd( LOG_WARNING, "Could not getpeername after connection switch, assuming connection already dead again. (Errno=%d)", peerErrno );
		signal_call( connection.panicSignal );
		return;
	}
	connection.startupTime = nowMilli();
	ret = pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)sockFd );
	if ( ret != 0 ) {
		// Without a receive thread nobody would ever read from (or close) sockFd
		logadd( LOG_WARNING, "Could not create receive thread after connection switch (err=%d), re-entering panic mode", ret );
		pthread_mutex_lock( &connection.sendMutex );
		if ( connection.sockFd == sockFd ) {
			connection.sockFd = -1;
		}
		pthread_mutex_unlock( &connection.sendMutex );
		close( sockFd );
		signal_call( connection.panicSignal );
	} else {
		sock_printable( (struct sockaddr*)&addr, addrLen, message + len, sizeof(message) - len );
		logadd( LOG_INFO, "%s", message );
	}
	// resend queue; if the connection is already gone again the requests just stay queued
	if ( queue != NULL ) {
		pthread_mutex_lock( &connection.sendMutex );
		dnbd3_async_t *next = NULL;
		for ( it = queue; it != NULL; it = next ) {
			logadd( LOG_DEBUG1, "Requeue after server change" );
			next = it->next;
			enqueueRequest( it );
			if ( connection.sockFd != -1 && !dnbd3_get_block( connection.sockFd, it->offset, it->length, (uint64_t)it, 0 ) ) {
				logadd( LOG_WARNING, "Resending pending request failed, re-entering panic mode" );
				shutdown( connection.sockFd, SHUT_RDWR );
				connection.sockFd = -1;
				signal_call( connection.panicSignal );
			}
		}
		pthread_mutex_unlock( &connection.sendMutex );
	}
}

/**
 * Send a CMD_GET_SERVERS request on the active connection. Only the request is sent
 * here; this function does not read the reply. Does nothing if there is no active
 * connection. If sending fails, the connection is shut down and sockFd is set to -1.
 *
 * Does not lock, so get the sendMutex first!
 */
static void requestAltServers()
{
	const int fd = connection.sockFd;
	if ( fd == -1 )
		return;
	dnbd3_request_t request = { 0 };
	request.cmd = CMD_GET_SERVERS;
	request.magic = dnbd3_packet_magic;
	fixup_request( request );
	const ssize_t sent = sock_sendAll( fd, &request, sizeof(request), 2 );
	if ( sent == (ssize_t)sizeof(request) )
		return;
	logadd( LOG_WARNING, "Connection failed while requesting alt server list" );
	shutdown( fd, SHUT_RDWR );
	connection.sockFd = -1;
}

/**
 * Read and discard exactly `amount` bytes from sockFd, in chunks of at most SHORTBUF.
 * @return true if all bytes were consumed, false if sock_recv returned <= 0 first
 */
static bool throwDataAway(int sockFd, uint32_t amount)
{
	char sink[SHORTBUF];
	for ( size_t consumed = 0; consumed < amount; ) {
		const ssize_t got = sock_recv( sockFd, sink, MIN( amount - consumed, SHORTBUF ) );
		if ( got <= 0 ) {
			return false;
		}
		consumed += (size_t)got;
	}
	return true;
}

/**
 * Reset a request's completion state and append it to the tail of the pending
 * request queue (under requests.lock).
 */
static void enqueueRequest(dnbd3_async_t *request)
{
	// Reset per-attempt state before the request becomes visible in the queue
	request->success = false;
	request->finished = false;
	request->next = NULL;
	pthread_spin_lock( &requests.lock );
	if ( requests.head != NULL ) {
		requests.tail->next = request;
	} else {
		requests.head = request;
	}
	requests.tail = request;
	pthread_spin_unlock( &requests.lock );
}

/**
 * Unlink `request` from the pending request queue (under requests.lock).
 * The removed element's own next pointer is left untouched.
 * @return request if it was found and removed, NULL if it was not queued
 */
static dnbd3_async_t* removeRequest(dnbd3_async_t *request)
{
	pthread_spin_lock( &requests.lock );
	dnbd3_async_t *previous = NULL;
	dnbd3_async_t **link = &requests.head;
	// Walk the links until we reach the element or the end of the list
	while ( *link != NULL && *link != request ) {
		previous = *link;
		link = &(*link)->next;
	}
	dnbd3_async_t * const found = *link;
	if ( found != NULL ) {
		*link = found->next;
		if ( requests.tail == found ) {
			requests.tail = previous;
		}
	}
	pthread_spin_unlock( &requests.lock );
	return found;
}

/**
 * Current CLOCK_MONOTONIC_RAW time in milliseconds.
 * @return milliseconds, or 0 if clock_gettime() fails (the error is printed to stdout)
 */
static uint64_t nowMilli()
{
	struct timespec now;
	const int rc = clock_gettime( CLOCK_MONOTONIC_RAW, &now );
	if ( rc == 0 ) {
		const uint64_t fromSeconds = now.tv_sec * 1000ull;
		const uint64_t fromNanos = now.tv_nsec / 1000000ull;
		return fromSeconds + fromNanos;
	}
	printf( "clock_gettime() failed. Errno: %d\n", errno );
	return 0;
}

/**
 * Current CLOCK_MONOTONIC_RAW time in microseconds.
 * @return microseconds, or 0 if clock_gettime() fails (the error is printed to stdout)
 */
static uint64_t nowMicro()
{
	struct timespec now;
	const int rc = clock_gettime( CLOCK_MONOTONIC_RAW, &now );
	if ( rc == 0 ) {
		const uint64_t fromSeconds = now.tv_sec * 1000000ull;
		const uint64_t fromNanos = now.tv_nsec / 1000ull;
		return fromSeconds + fromNanos;
	}
	printf( "clock_gettime() failed. Errno: %d\n", errno );
	return 0;
}