summaryrefslogblamecommitdiffstats
path: root/src/server/uplink.c
blob: 8a83124dfb55c124cf58c3939db070559c14b529 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
                   
                   
                  
                  
                       
                



                                    
                       
                      
 
                   
                     
                  
                 
                   
                      
 

                                              



                                                                                   
                                                   
 



                                    
                                                      
                                       
                                         








                                                                                         

                                                                       
                                                                                                                                                                 
 

                                                                                        

                                          

                         

 

                                       
                                            

 




                                                         
                                                                                 
 
                                                   
                                

                                                                          
                                   

                                                                     
                                             



                                              
                                                         
         
                                            
                                                                                                                    

                             

                                                                                                   
                                                            




                                                            
                                          
                                                           
                             
                             
                             




                                                                                    
                                       


                                           
                                      
                                     

                                     
                           
                                         

                                                                                               
                                                     
                                                 
                
                                       
                                                 
         


                                         
                                                                                                
                                                                              

                             
                                                            
                                     
                    
          
                               
                                                                 
                                                                                              
         
                                     
                     

 

                                   
                                                     
                      
   
                                          
 
                                
                                   

                                                                     
                                             
                            
         
                                         

                                                                                
                                                                       
                                              
                
                                                                                    
         

                                              
                                           
                                                   
                                      
                                     
                      







                                                     



                                                        
                                                                               


                                                               
                 


                                                     
         
                             
                             
                                             

 
                                      

                                                                              
                                                                                
                                        


                                               















                                            
                                                                                                    
                                                                                                                
                                       
                              


   
                                                
                             
   
                                                                                     
 
                                         
                                                                                    
                                                                                  

                                                                                                


                                                                   

                                                    
                         

                 
                                           

 



                                                                                  
                                                                                                                                           
 
                                                     

                                                                                        

                             


                                                                             
                                                                     


                                                                                        
                 
         
                                                                                                          
                                                                                                       
                 
                                                                                                            
                                             
                                              
                                                                           







                                                                                                             




                                                                               














                                                                                              




























                                                                                                     

                                                                                                
                                                                               

                                               

                                                                                                


                                                   





                                                                                                




                                                                                           

                                                                                                                   
                             

         
               


                                                                                           
                                                                                       


                          




                                                                       
                                                                                








                                                                                                 

                                                         


                                                                                     























                                                                                                                           
                                       
                 


                                                                        
                 









                                                     
            
                                                
      




                                                                                     
                        
                                              
                 








                                                                                                                
                 








                                                                                                                              
                 





                                                                                                        
                                                                                                       
















                                                                                                              
                                                                 























                                                                                         
 
                                   
                                         





                                                          
                                                                                 

                                                       
                                                     



                                                                                         
                                                        
                                                                                                                   
         

                                                           
         
 

                                                                                            
         
                    
 

                                           
                     

 



                                                                     
                                        
 



                                       
                                                              
                               
                                                        
                          
                                       
                                          
                         
                                            
                                    
                                     
          
                                 
                                       
                                        
                                  
                                             
                                                
                                                                                     
                                                                                                                                                                
         
          
                                          
                                                                  
                                  


                                                                               
                                                   
                         
                                                              
                                                                               
                                     



                                                                             
                                                                      
                 
                                                          
                                                              
                                                                  





                                                                            
                                                     


                                                                          
                                                         

                                                                              
                                                          
                                                         
                                                         
                                                           
                                               

                                                         

                                                    
                                                              
                                                                                                                                               
                                        
                                                                                                             
                                                                                                                           

                                                        
                                                                                           
                                                             
                                                           
                         
                                                       

                                                            
                                                                      


                                                                                                                                 
                                                                 
                         
                                                                       


                                                                                                     


                                                                                                 
                                                             



                                                                                          

                                                                                                                                   
                         
                                                         
                                                                             
                                                                   
                                                                                                                                
                                                     

                         

                                                                                                 
                                                         
                                                                                                                        
                                                        
                                                                    
                                                
                                                                          
                 
                            
                                                                          

                                                                                               
                                            
                                                       
                                     
                                                                                              
                                                                                                              

                                                                                                      
                                                                                                       
                                 
                         
                                                                                   
                                                                   
                                                                                                                 
                                             
                         
                 
                                                              
                                                      
                                                                                     
                                                                                                                                                         
                                                                 
                                                                          
                                                                         
                                                                                                               
                                                                           
                                                                              
                                                     
                                        
                                                                        
                                                                                                                           

                                                                                                                         
                                         
                                 
                                                                                                      
                                                                                    
                         
                                                                  

                                                                                                                   


                                                                      
                         
                                                                                                                                                        
                 
            
                                                                      
                                            

                                                           
                                                         





                                                                                                               
                            
                                                         
                                                      
      

                                 
                                                           
                                       
                                                                   
                         

                 
         
          
                                             




                                                                   
                                   


                                                                                
         





                                                                      
         

                                                            
         


                                                                                       


                     


                                  
                                                                    
 
                               







                                                                                       
                                         






                                                                                    

                                                                           
                                                                                   










                                                                                                                         
                                                           



                                                                     
                 
         
                                           









                                                                                                         

 
   







                                                                                  



                                                                                        
   
                                                          
 
                               



                                                                                          

                                                           
                                                    

                                                       


                                                                                 
                                                             
                              
                                                 
                            
         

                                                                           

                                                       

                                                               






                                                                                                                          
                 












                                                                                                      
                                                                                          



                                                                                              
                              
                 
                                                          










                                                                                                                    
                                                                                               











                                                                                                                                                            
                                                                                                                           


                                                                 
         
                                     
                    


   
                                                                   
                                                                   

                                          
                                                                                       

                        


                                                                                           
                                                                                

                                                            

                                                               


                                                                                                              
                                                    







                                                                                          

                                                                      



                                                                                               
                                               









                                                                                           
                                     
                      


   
                                                       
                                       
                                                                        
   
                                                 
 
                              
                
                               
                                        
                  
                                                                              
                                                                                                           
                                                
                                                        
                                                                                                       
                                           
                 
                                                            
                                                                                                                                           

                                           
                                                    
                                                                                                            

                                           
                                                                         
                                                                                                                                                           

                                           
 



                                                                                                  


                                                                                                                       
                 
                                                                                                                                
                                                                                                                       
                                           


                                                   

                                                               
                                         
                                                   
                                                      





















                                                                                                                          
                                         
                                                          
                                                       
                 
                                                        
                                    
                                                                                                                           
                                          
                                
                                                       
                                                                                                                                   
                                                              
                                                    
                                                                                   
                                                                               
                                                                     

                                                                                                                                     

                                                                                 
                                                                                            
                                                                                    
                                                                                                  



                                                                                                                    

                                                                                                               

                                              
                                                                                                    

                                                                                                               
                                              
                                 

                                                      
                                                   
                                                                                                 

                                                                                                         
                                                                                                                            
                                                                           

                         

                                         
                                                 






                                                                                             
                         
                 


                                                                        
                                                   



                                                                                                                    
                 

                                                                                       
                                                                   

                                                                                                   

                                       
                 
                                               
                                                                
                                             
                                                            

                                                                                                                                    
                         




                                                                                                           
                 


                                                       
                                                  
                                   
         
                     

                                                                
                
                                         

 


                               
                                                                  
 
                               
                                       
                       
                                        
                                                         
                                         
                                             

                                    
                                           

                                                                                         


                       
                                       
                                                                                       
                                         

                       
                                             


   

                                                       
   
                                                 
 
                                                                                                                     
                               



                                                                                                             
 
 





                                                                                     
                                                    

                                             

                                                                                          

                                           
                                         
                                                                                        


                                                     
                                           
                                      


                               

                                                                
                                              
                                       
                                                                                                                        




                                               


                                                              
                                                              


                                                                         
                            

                                                                 
                                                                                                           
                 
         
 
 
   






                                                                                    
                                                                   
 
                                      
                                          
                                         
         
                                                                                
                                                             
                                     

 





                                                                      
                                                            
 

                                                                                                             

 









                                                                           



                                                                      





                                                                      






























                                                                                         
#include "uplink.h"
#include "helper.h"
#include "locks.h"
#include "image.h"
#include "altservers.h"
#include "net.h"
#include <dnbd3/shared/sockhelper.h>
#include <dnbd3/shared/protocol.h>
#include <dnbd3/shared/timing.h>
#include <dnbd3/shared/crc32.h>
#include "threadpool.h"
#include "reference.h"

#include <assert.h>
#include <inttypes.h>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>
#include <stdatomic.h>

static const uint8_t HOP_FLAG_BGR = 0x80;
static const uint8_t HOP_FLAG_PREFETCH = 0x40;
#define FILE_BYTES_PER_MAP_BYTE ( DNBD3_BLOCK_SIZE * 8 )
#define MAP_BYTES_PER_HASH_BLOCK (int)( HASH_BLOCK_SIZE / FILE_BYTES_PER_MAP_BYTE )
#define MAP_INDEX_HASH_START_MASK ( ~(int)( MAP_BYTES_PER_HASH_BLOCK - 1 ) )

static atomic_uint_fast64_t totalBytesReceived = 0;

/**
 * Byte range plus handle describing a single block request
 * that is about to be sent to the uplink server (see requestBlock()).
 */
typedef struct {
	uint64_t start;  // Offset of the first requested byte
	uint64_t end;    // Offset just past the last requested byte (size is end - start)
	uint64_t handle; // Handle that is sent along with the request
} req_t;

static void cancelAllRequests(dnbd3_uplink_t *uplink);
static void freeUplinkStruct(ref *ref);
static void* uplink_mainloop(void *data);
static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly);
static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex);
static void handleReceive(dnbd3_uplink_t *uplink);
static bool sendKeepalive(dnbd3_uplink_t *uplink);
static void requestCrc32List(dnbd3_uplink_t *uplink);
static bool sendReplicationRequest(dnbd3_uplink_t *uplink);
static bool reopenCacheFd(dnbd3_uplink_t *uplink, const bool force);
static bool connectionShouldShutdown(dnbd3_uplink_t *uplink);
static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew);
static int numWantedReplicationRequests(dnbd3_uplink_t *uplink);
static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle);
static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops);

#define assert_uplink_thread() assert( pthread_equal( uplink->thread, pthread_self() ) )

// ############ uplink connection handling

/**
 * Initialization hook of the uplink module.
 * There is currently nothing to set up, so the body is empty.
 */
void uplink_globalsInit()
{
}

uint64_t uplink_getTotalBytesReceived()
{
	return (uint64_t)totalBytesReceived;
}

/**
 * Create and initialize an uplink instance for the given
 * image. Uplinks run in their own thread.
 *
 * If sock is not -1, it is taken as an already established connection:
 * host is used to look up the alt-server index, version is stored as the
 * connection's protocol version, and the new uplink is told to switch to
 * that connection right away (RTT_DOCHANGE).
 *
 * Returns true if the image already has an uplink (sock is closed in that
 * case) or if a new uplink was created and its thread started.
 * Returns false if we're not a proxy, are shutting down, have neither a
 * socket nor alt servers for the image, the image has no cache map
 * (i.e. is complete), or if setting up the uplink failed. If starting the
 * thread fails, sock is closed as part of freeing the uplink struct; in
 * the other failure cases sock is left untouched.
 * Locks on: _images[].lock
 */
bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version)
{
	if ( !_isProxy || _shutdown ) return false;
	assert( image != NULL );
	if ( sock == -1 && !altservers_imageHasAltServers( image->name ) )
		return false; // Nothing to do
	mutex_lock( &image->lock );
	dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
	if ( uplink != NULL ) {
		mutex_unlock( &image->lock );
		if ( sock != -1 ) {
			close( sock );
		}
		ref_put( &uplink->reference );
		return true; // There's already an uplink
	}
	if ( image->ref_cacheMap == NULL ) {
		logadd( LOG_WARNING, "Uplink was requested for image %s, but it is already complete", image->name );
		goto failure;
	}
	uplink = calloc( 1, sizeof(*uplink) );
	if ( uplink == NULL ) {
		// uplink is NULL, so the failure path only unlocks and returns
		logadd( LOG_ERROR, "Could not allocate memory for new uplink." );
		goto failure;
	}
	// Start with one reference for the uplink thread. We'll return it when the thread finishes
	ref_init( &uplink->reference, freeUplinkStruct, 1 );
	mutex_init( &uplink->queueLock, LOCK_UPLINK_QUEUE );
	mutex_init( &uplink->rttLock, LOCK_UPLINK_RTT );
	mutex_init( &uplink->sendMutex, LOCK_UPLINK_SEND );
	// calloc() left every descriptor at 0, which is a valid fd. Mark them all as
	// unused before anything below can jump to the failure path, because
	// freeUplinkStruct() closes every descriptor that is not -1.
	uplink->current.fd = -1;
	uplink->better.fd = -1;
	uplink->cacheFd = -1;
	uplink->image = image;
	uplink->bytesReceived = 0;
	uplink->bytesReceivedLastSave = 0;
	uplink->idleTime = SERVER_UPLINK_IDLE_TIMEOUT - 90;
	uplink->queue = NULL;
	uplink->queueLen = 0;
	uplink->signal = signal_new();
	if ( uplink->signal == NULL ) {
		logadd( LOG_WARNING, "Error creating signal. Uplink unavailable." );
		goto failure;
	}
	mutex_lock( &uplink->rttLock );
	mutex_lock( &uplink->sendMutex );
	uplink->current.fd = -1;
	mutex_unlock( &uplink->sendMutex );
	uplink->cycleDetected = false;
	image->problem.uplink = true;
	image->problem.write = true;
	image->problem.queue = false;
	if ( sock != -1 ) {
		// Kickstart: hand the given connection to the uplink thread as "better" server
		uplink->better.fd = sock;
		int index = altservers_hostToIndex( host );
		uplink->better.index = index == -1 ? 0 : index; // Prevent invalid array access
		uplink->rttTestResult = RTT_DOCHANGE;
		uplink->better.version = version;
	} else {
		uplink->better.fd = -1;
		uplink->rttTestResult = RTT_IDLE;
	}
	mutex_unlock( &uplink->rttLock );
	uplink->recvBufferLen = 0;
	uplink->shutdown = false;
	if ( 0 != thread_create( &(uplink->thread), NULL, &uplink_mainloop, (void *)uplink ) ) {
		logadd( LOG_ERROR, "Could not start thread for new uplink." );
		goto failure;
	}
	ref_setref( &image->uplinkref, &uplink->reference );
	mutex_unlock( &image->lock );
	return true;
failure: ;
	if ( uplink != NULL ) {
		image->users++; // Expected by freeUplinkStruct()
		ref_put( &uplink->reference ); // The ref for the uplink thread that never was
	}
	mutex_unlock( &image->lock );
	return false;
}

/**
 * Flag the uplink of the given image for shutdown, cancel all of its
 * queued requests and detach it from the image.
 * Returns true if the image has no uplink, or if the uplink was already
 * flagged for shutdown and image->users is 0; false otherwise.
 * Locks on image.lock, uplink.queueLock
 * Calling it multiple times, even concurrently, will
 * not break anything.
 */
bool uplink_shutdown(dnbd3_image_t *image)
{
	assert( image != NULL );
	mutex_lock( &image->lock );
	dnbd3_uplink_t * const ul = ref_get_uplink( &image->uplinkref );
	if ( ul == NULL ) {
		// No uplink attached, nothing to shut down
		mutex_unlock( &image->lock );
		return true;
	}
	bool expected = false;
	mutex_lock( &ul->queueLock );
	const bool flagSetByUs = atomic_compare_exchange_strong( &ul->shutdown, &expected, true );
	if ( !flagSetByUs ) {
		logadd( LOG_ERROR, "This will never happen. '%s:%d'", PIMG(image) );
	} else {
		image->users++; // Prevent free while uplink shuts down
		signal_call( ul->signal );
	}
	cancelAllRequests( ul );
	ref_setref( &image->uplinkref, NULL );
	mutex_unlock( &ul->queueLock );
	// 'expected' is only true if the flag had already been set before this call
	const bool result = expected && image->users == 0;
	ref_put( &ul->reference );
	mutex_unlock( &image->lock );
	return result;
}

/**
 * Drop every pending request of this uplink. Each attached client's
 * callback is invoked with zeroed offset/length and a NULL buffer before
 * the client is freed; afterwards the queue is empty.
 * HOLD QUEUE LOCK WHILE CALLING
 */
static void cancelAllRequests(dnbd3_uplink_t *uplink)
{
	dnbd3_queue_entry_t *entry = uplink->queue;
	while ( entry != NULL ) {
		// Notify and release all clients attached to this entry
		for ( dnbd3_queue_client_t *client = entry->clients, *nextClient; client != NULL; client = nextClient ) {
			client->callback( client->data, client->handle, 0, 0, NULL );
			nextClient = client->next;
			free( client );
		}
		dnbd3_queue_entry_t * const nextEntry = entry->next;
		free( entry );
		entry = nextEntry;
	}
	uplink->queueLen = 0;
	uplink->queue = NULL;
	uplink->image->problem.queue = false;
}

/**
 * Release callback of the uplink's reference (registered via ref_init()
 * in uplink_init()). Closes the signal and all descriptors that are still
 * open, destroys the mutexes, frees the receive buffer, releases the
 * image and finally frees the uplink struct itself.
 */
static void freeUplinkStruct(ref *ref)
{
	dnbd3_uplink_t * const ul = container_of(ref, dnbd3_uplink_t, reference);
	logadd( LOG_DEBUG1, "Freeing uplink for '%s:%d'", PIMG(ul->image) );
	assert( ul->queueLen == 0 );
	if ( ul->signal != NULL ) {
		signal_close( ul->signal );
	}
	// Connections: the active one, and a pending replacement, if any
	if ( ul->current.fd != -1 ) {
		close( ul->current.fd );
		ul->current.fd = -1;
	}
	if ( ul->better.fd != -1 ) {
		close( ul->better.fd );
		ul->better.fd = -1;
	}
	mutex_destroy( &ul->queueLock );
	mutex_destroy( &ul->rttLock );
	mutex_destroy( &ul->sendMutex );
	free( ul->recvBuffer );
	ul->recvBuffer = NULL;
	if ( ul->cacheFd != -1 ) {
		close( ul->cacheFd );
	}
	// Give back the image user that was taken by whoever set the shutdown flag
	// (uplink_shutdown or the uplink thread's cleanup code), or by uplink_init
	// if setting up the uplink failed.
	image_release( ul->image );
	free( ul ); // !!!
}

/**
 * Remove given client from uplink request queue. Every queued client
 * matching both data and callback gets its callback invoked with zeroed
 * offset/length and a NULL buffer, and is then unlinked and freed.
 * The queue entries themselves are left in place.
 * Locks on: uplink.queueLock
 */
void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback callback)
{
	mutex_lock( &uplink->queueLock );
	dnbd3_queue_entry_t *entry = uplink->queue;
	while ( entry != NULL ) {
		// 'link' always points to the pointer that references the current client
		dnbd3_queue_client_t **link = &entry->clients;
		while ( *link != NULL ) {
			dnbd3_queue_client_t * const client = *link;
			if ( client->data != data || client->callback != callback ) {
				link = &client->next;
				continue;
			}
			client->callback( client->data, client->handle, 0, 0, NULL );
			*link = client->next;
			free( client );
		}
		entry = entry->next;
	}
	mutex_unlock( &uplink->queueLock );
}

/**
 * Called from a client (proxy) connection to request a missing part of the image.
 * The caller has made sure that the range is actually missing.
 * Creates the image's uplink on demand. Returns false if the hop count is
 * too high, no uplink is available, a proxy cycle is suspected, or the
 * request could not be queued.
 */
bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
{
	assert( client != NULL && callback != NULL );
	const int hopCount = hops & 0x3f; // Lower six bits hold the actual count
	if ( hopCount > 60 ) { // This is just silly
		logadd( LOG_WARNING, "Refusing to relay a request that has > 60 hops" );
		return false;
	}
	dnbd3_uplink_t *ul = ref_get_uplink( &client->image->uplinkref );
	if ( unlikely( ul == NULL ) ) {
		// No uplink yet, try to bring one up and look again
		uplink_init( client->image, -1, NULL, -1 );
		ul = ref_get_uplink( &client->image->uplinkref );
	}
	if ( ul == NULL ) {
		logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
		return false;
	}
	// If the requesting client is the very host our uplink is connected to, assume
	// a circular proxy chain. Multiple instances running on the same host (IP)
	// can make this a false positive.
	const bool sameHost = hops > 1
			&& isSameAddress( altservers_indexToHost( ul->current.index ), &client->host );
	bool queued;
	if ( !sameHost ) {
		queued = uplink_requestInternal( ul, (void*)client, callback, handle, start, length, hops );
	} else {
		ul->cycleDetected = true;
		signal_call( ul->signal );
		logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
		queued = false;
	}
	ref_put( &ul->reference );
	return queued;
}

/**
 * Called by integrated fuse module.
 * Queues a request for the given range on the image's uplink, creating
 * the uplink on demand. The request is passed on with a hop count of 0.
 * Returns false if no uplink is available or the request was refused.
 */
bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback,
		uint64_t handle, uint64_t start, uint32_t length)
{
	dnbd3_uplink_t *ul = ref_get_uplink( &image->uplinkref );
	if ( unlikely( ul == NULL ) ) {
		// No uplink yet, try to bring one up and look again
		uplink_init( image, -1, NULL, -1 );
		ul = ref_get_uplink( &image->uplinkref );
	}
	if ( ul == NULL ) {
		logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
		return false;
	}
	const bool queued = uplink_requestInternal( ul, data, callback, handle, start, length, 0 );
	ref_put( &ul->reference );
	return queued;
}

/**
 * Grow the range [start, *end) so it spans at least 'wanted' bytes.
 * Nothing happens if it is already that large. The new end is capped at
 * the image's virtual file size and then rounded up to the next multiple
 * of DNBD3_BLOCK_SIZE. In BGR_HASHBLOCK mode the range will not be
 * extended across a hash block border.
 */
static void extendRequest(uint64_t start, uint64_t *end, const dnbd3_image_t *image, uint32_t wanted)
{
	const uint32_t current = (uint32_t)( *end - start );
	if ( current >= wanted )
		return; // Big enough already
	const uint64_t target = start + wanted;
	uint64_t newEnd = target;
	if ( unlikely( _backgroundReplication == BGR_HASHBLOCK
			&& *end / HASH_BLOCK_SIZE != target / HASH_BLOCK_SIZE ) ) {
		// Would cross a hash block border, cut off at start of the hash block
		// that contains the target
		newEnd = target & ~( HASH_BLOCK_SIZE - 1 );
	}
	if ( unlikely( newEnd > image->virtualFilesize ) ) {
		newEnd = image->virtualFilesize;
	}
	*end = ( newEnd + DNBD3_BLOCK_SIZE - 1 ) & ~( DNBD3_BLOCK_SIZE - 1 );
}

/**
 * Send a block request for the range described by req over the uplink's
 * current connection. Returns false if there is no connection, otherwise
 * whatever dnbd3_get_block() returns.
 */
static bool requestBlock(dnbd3_uplink_t *uplink, req_t *req, uint8_t hops)
{
	if ( uplink->current.fd != -1 ) {
		const uint32_t size = (uint32_t)( req->end - req->start );
		return dnbd3_get_block( uplink->current.fd, req->start, size, req->handle,
				COND_HOPCOUNT( uplink->current.version, hops ) );
	}
	return false;
}

/**
 * Request a chunk of data through an uplink server.
 * If callback is NULL, this is assumed to be a background replication request;
 * in that case no client is attached to the queue entry. Otherwise a client
 * entry holding data/callback/handle is attached to either an existing queue
 * entry that covers the requested range, or to a newly created one.
 * For new client requests, a follow-up prefetch request may be queued too.
 *
 * @param uplink uplink to send the request through, must not be NULL
 * @param data opaque pointer that is stored with the client entry
 * @param callback client callback, NULL for background replication
 * @param handle handle that is stored with the client entry
 * @param start offset of first requested byte
 * @param length number of requested bytes, must not exceed _maxPayload
 * @param hops hop count (plus flags) of the incoming request
 * @return false if the request was refused (BGR policy, uplink shutting down,
 *    length too large, queue full, suspected cycle, too many clients on one
 *    request); true once the request is queued, even if sending it right away
 *    failed - it is then marked as unsent and the uplink thread is signalled.
 * Locks on: uplink.queueLock, uplink.sendMutex
 */
static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback,
		uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
{
	assert( uplink != NULL );
	assert( data == NULL || callback != NULL );
	if ( ( hops & HOP_FLAG_BGR ) // This is a background replication request
			&& _backgroundReplication != BGR_FULL ) { // Deny if we're not doing BGR
		// TODO: Allow BGR_HASHBLOCK too, but only if hash block isn't completely empty
		logadd( LOG_DEBUG2, "Dopping client because of BGR policy" );
		return false;
	}
	if ( uplink->shutdown ) {
		logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" );
		return false;
	}
	if ( length > (uint32_t)_maxPayload ) {
		logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload",
				length );
		return false;
	}

	// We are one more hop on the way of this request
	hops++;
	if ( callback == NULL ) {
		// Set upper-most bit for replication requests that we fire
		// In client mode, at least set prefetch flag to prevent prefetch cascading
		hops |= (uint8_t)( _pretendClient ? HOP_FLAG_PREFETCH : HOP_FLAG_BGR );
	}

	req_t req, preReq;
	dnbd3_queue_entry_t *request = NULL, *last = NULL, *pre = NULL;
	bool isNew;
	const uint64_t end = start + length;
	// Round start down to block size; end gets rounded up further below
	req.start = start & ~(DNBD3_BLOCK_SIZE - 1);
	req.end = end;
	/* Don't do this  -- this breaks matching of prefetch jobs, since they'd
	 * be misaligned, and the next client request wouldn't match anything.
	 * To improve this, we need to be able to attach a queue_client to multiple queue_entries
	 * and then serve it once all the queue_entries are done (atomic_int in queue_client).
	 * But currently we directly send the receive buffer's content to the queue_client after
	 * receiving the payload, as this will also work when the local cache is borked (we just
	 * tunnel through the traffic). One could argue that this mode of operation is nonsense,
	 * and we should just drop all affected clients. Then as a next step, don't serve the
	 * clients from the receive buffer, but just issue a normal sendfile() call after writing
	 * the received data to the local cache.
	 */
	if ( callback != NULL && _minRequestSize != 0 ) {
		// Not background replication request, extend request size
		extendRequest( req.start, &req.end, uplink->image, _minRequestSize );
	}
	req.end = (req.end + DNBD3_BLOCK_SIZE - 1) & ~(DNBD3_BLOCK_SIZE - 1);
	// Critical section - work with the queue
	mutex_lock( &uplink->queueLock );
	if ( uplink->shutdown ) { // Check again after locking to prevent lost requests
		goto fail_lock;
	}
	// Look for a queued request that fully covers the wanted range; while at it,
	// remember the tail of the list in case we need to append a new entry
	for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
		if ( it->from <= start && it->to >= end ) {
			// Matching range, attach
			request = it;
			break;
		}
		if ( it->next == NULL ) {
			// Not matching, last in list, remember
			last = it;
			break;
		}
	}
	// Will point to the location where the new client entry gets linked in
	dnbd3_queue_client_t **c = NULL;
	if ( request == NULL ) {
		// No existing request to attach to
		if ( uplink->queueLen >= UPLINK_MAX_QUEUE ) {
			logadd( LOG_WARNING,
					"Uplink queue is full, consider increasing UPLINK_MAX_QUEUE. Dropping client..." );
			goto fail_lock;
		}
		uplink->queueLen++;
		if ( uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
			uplink->image->problem.queue = true;
		}
		// NOTE(review): result of malloc() is not checked before use
		request = malloc( sizeof(*request) );
		if ( last == NULL ) {
			uplink->queue = request;
		} else {
			last->next = request;
		}
		request->next = NULL;
		request->handle = ++uplink->queueId;
		request->from = req.start;
		request->to = req.end;
#ifdef DEBUG
		timing_get( &request->entered );
#endif
		request->hopCount = hops;
		request->sent = true; // Optimistic; would be set to false on failure
		if ( callback == NULL ) {
			// BGR
			request->clients = NULL;
		} else {
			// request->clients gets assigned further down, through c
			c = &request->clients;
		}
		isNew = true;
	} else if ( callback == NULL ) {
		// Replication request that matches existing request. Do nothing
		isNew = false;
	} else {
		// Existing request. Check if potential cycle
		if ( hops > request->hopCount && request->from == start && request->to == end ) {
			logadd( LOG_DEBUG1, "Request cycle detected on uplink for %s:%d", PIMG(uplink->image) );
			goto fail_lock;
		}
		// Count number of clients, get tail of list
		int count = 0;
		c = &request->clients;
		while ( *c != NULL ) {
			c = &(**c).next;
			if ( ++count >= UPLINK_MAX_CLIENTS_PER_REQUEST ) {
				logadd( LOG_DEBUG2, "Won't accept more than %d clients per request, dropping client", count );
				goto fail_lock;
			}
		}
		isNew = false;
	}
	// Prefetch immediately, without unlocking the list - the old approach of
	// async prefetching in another thread was sometimes so slow that we'd process
	// another request from the same client before the prefetch job would execute.
	// (isNew is checked first, so request->clients is not read for a new entry,
	// where it has not been assigned yet.)
	if ( callback != NULL && ( isNew || request->clients == NULL || request->clients->data == data )
			&& !( hops & (HOP_FLAG_BGR | HOP_FLAG_PREFETCH) ) // No cascading of prefetches
			&& end == request->to && length <= _maxPrefetch ) {
		// Only if this is a client request, and the !! end boundary matches exactly !!
		// (See above for reason why)
		// - We neither check the local cache, nor other pending requests. Worth it?
		// Complexity vs. probability
		preReq.start = end;
		preReq.end = end;
		extendRequest( preReq.start, &preReq.end, uplink->image, MIN( length * 3, _maxPrefetch ) );
		if ( preReq.start < preReq.end ) {
			//logadd( LOG_DEBUG2, "Prefetching @ %"PRIx64" - %"PRIx64, preReq.start, preReq.end );
			// NOTE(review): queueLen is not checked against UPLINK_MAX_QUEUE here,
			// and the result of malloc() is not checked before use
			uplink->queueLen++;
			pre = malloc( sizeof(*pre) );
			// Link the prefetch entry in right behind the request it belongs to
			pre->next = request->next;
			request->next = pre;
			pre->handle = preReq.handle = ++uplink->queueId;
			pre->from = preReq.start;
			pre->to = preReq.end;
			pre->hopCount = hops | HOP_FLAG_PREFETCH;
			pre->sent = true; // Optimistic; would be set to false on failure
			pre->clients = NULL;
#ifdef DEBUG
			timing_get( &pre->entered );
#endif
		}
	}
	// // // //
	// Copy data - need this after unlocking
	req.handle = request->handle;
	if ( callback != NULL ) {
		assert( c != NULL );
		// Append the client to the request's client list
		// NOTE(review): result of malloc() is not checked before use
		*c = malloc( sizeof( *request->clients ) );
		(**c).next = NULL;
		(**c).handle = handle;
		(**c).from = start;
		(**c).to = end;
		(**c).data = data;
		(**c).callback = callback;
	}
	mutex_unlock( &uplink->queueLock );
	// End queue critical section
	if ( pre == NULL && !isNew )
		return true; // Nothing to do

	// Fire away the request(s)
	mutex_lock( &uplink->sendMutex );
	bool ret1 = true;
	bool ret2 = true;
	if ( isNew ) {
		ret1 = requestBlock( uplink, &req, hops );
	}
	if ( pre != NULL ) {
		ret2 = requestBlock( uplink, &preReq, hops | HOP_FLAG_PREFETCH );
	}
	if ( !ret1 || !ret2 ) { // Set with send locked
		uplink->image->problem.uplink = true;
	}
	mutex_unlock( &uplink->sendMutex );
	// markRequestUnsent locks the queue, would violate locking order with send mutex
	if ( !ret1 ) {
		markRequestUnsent( uplink, req.handle );
		logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing (%"PRIu64")", req.handle );
	}
	if ( !ret2 ) {
		markRequestUnsent( uplink, preReq.handle );
	}

	// Sending failed for at least one request - wake up the uplink thread
	if ( ( !ret1 || !ret2 ) && signal_call( uplink->signal ) == SIGNAL_ERROR ) {
		logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
	}
	return true;

fail_lock:
	mutex_unlock( &uplink->queueLock );
	return false;
}

/**
 * Uplink thread. Started by uplink_init() with the uplink struct as argument.
 * Waits on the uplink's signal and its current server socket, switches over
 * to a better server when the RTT test result asks for it, sends queued
 * requests, handles incoming data, sends keep-alives/background replication
 * requests and triggers RTT measurements.
 * On exit, pending requests are cancelled, the uplink is detached from its
 * image (if it is still the image's uplink), and the thread's reference
 * to the uplink struct is dropped.
 * Always returns NULL.
 * Locks on: image.lock, uplink.queueLock, uplink.rttLock, uplink.sendMutex
 */
static void* uplink_mainloop(void *data)
{
// Indexes into the events array passed to poll()
#define EV_SIGNAL (0)
#define EV_SOCKET (1)
#define EV_COUNT  (2)
	struct pollfd events[EV_COUNT];
	dnbd3_uplink_t * const uplink = (dnbd3_uplink_t*)data;
	int numSocks, waitTime;
	int altCheckInterval = SERVER_RTT_INTERVAL_INIT;
	int rttTestResult;
	uint32_t discoverFailCount = 0;
	ticks nextAltCheck, lastKeepalive;
	char buffer[200];
	memset( events, 0, sizeof(events) );
	timing_get( &nextAltCheck );
	lastKeepalive = nextAltCheck;
	//
	assert( uplink != NULL );
	setThreadName( "idle-uplink" );
	thread_detach( uplink->thread );
	blockNoncriticalSignals();
	// Make sure file is open for writing
	if ( !reopenCacheFd( uplink, false ) ) {
		// It might have failed - still offer proxy mode, we just can't cache
		logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", uplink->image->path, errno );
	}
	//
	events[EV_SIGNAL].events = POLLIN;
	events[EV_SIGNAL].fd = signal_getWaitFd( uplink->signal );
	events[EV_SOCKET].fd = -1;
	if ( uplink->rttTestResult != RTT_DOCHANGE ) {
		altservers_findUplink( uplink ); // In case we didn't kickstart
	}
	while ( !_shutdown && !uplink->shutdown ) {
		// poll()
		if ( uplink->rttTestResult == RTT_DOCHANGE ) {
			// 0 means poll, since we're about to change the server
			waitTime = 0;
		} else {
			// Wait until the next alt server check is due, clamped to 100ms..10s
			declare_now;
			waitTime = (int)timing_diffMs( &now, &nextAltCheck );
			if ( waitTime < 100 ) waitTime = 100;
			else if ( waitTime > 10000 ) waitTime = 10000;
		}
		events[EV_SOCKET].fd = uplink->current.fd;
		numSocks = poll( events, EV_COUNT, waitTime );
		if ( _shutdown || uplink->shutdown ) goto cleanup;
		if ( numSocks == -1 ) { // Error?
			if ( errno == EINTR ) continue;
			logadd( LOG_DEBUG1, "poll() error %d", (int)errno );
			usleep( 10000 );
			continue;
		}
		// Check if server switch is in order
		if ( unlikely( uplink->rttTestResult == RTT_DOCHANGE ) ) {
			mutex_lock( &uplink->rttLock );
			assert( uplink->rttTestResult == RTT_DOCHANGE );
			uplink->rttTestResult = RTT_IDLE;
			// The rttTest worker thread has finished our request.
			// And says it's better to switch to another server
			const int fd = uplink->current.fd;
			mutex_lock( &uplink->sendMutex );
			uplink->current = uplink->better;
			mutex_unlock( &uplink->sendMutex );
			uplink->better.fd = -1;
			uplink->cycleDetected = false;
			mutex_unlock( &uplink->rttLock );
			discoverFailCount = 0;
			// Close the old connection, if there was one
			if ( fd != -1 ) close( fd );
			uplink->image->problem.uplink = false;
			uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
			// Thread name becomes '@' followed by the server's string representation
			buffer[0] = '@';
			if ( altservers_toString( uplink->current.index, buffer + 1, sizeof(buffer) - 1 ) ) {
				logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", uplink->image->name, buffer + 1 );
				setThreadName( buffer );
			}
			// If we don't have a crc32 list yet, see if the new server has one
			if ( uplink->image->crc32 == NULL ) {
				requestCrc32List( uplink );
			}
			// Re-send all pending requests
			sendQueuedRequests( uplink, false );
			sendReplicationRequest( uplink );
			events[EV_SOCKET].events = POLLIN | POLLRDHUP;
			if ( uplink->image->problem.uplink ) {
				// Some of the requests above must have failed again already :-(
				logadd( LOG_DEBUG1, "Newly established uplink connection failed during getCRC or sendRequests" );
				connectionFailed( uplink, true );
			}
			timing_gets( &nextAltCheck, altCheckInterval );
			// The rtt worker already did the handshake for our image, so there's nothing
			// more to do here
		}
		// Check events
		// Signal
		if ( (events[EV_SIGNAL].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) {
			uplink->image->problem.uplink = true;
			logadd( LOG_WARNING, "poll error on signal in uplink_mainloop!" );
			goto cleanup;
		} else if ( (events[EV_SIGNAL].revents & POLLIN) ) {
			// signal triggered -> pending requests
			if ( signal_clear( uplink->signal ) == SIGNAL_ERROR ) {
				logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", uplink->image->name );
			}
			if ( uplink->current.fd != -1 ) {
				// Uplink seems fine, relay requests to it...
				sendQueuedRequests( uplink, true );
			} else if ( uplink->queueLen != 0 ) { // No uplink; maybe it was shutdown since it was idle for too long
				uplink->idleTime = 0;
			}
		}
		// Uplink socket
		if ( (events[EV_SOCKET].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) {
			connectionFailed( uplink, true );
			logadd( LOG_DEBUG1, "Uplink gone away, panic! (revents=%d)\n", (int)events[EV_SOCKET].revents );
			setThreadName( "panic-uplink" );
		} else if ( (events[EV_SOCKET].revents & POLLIN) ) {
			handleReceive( uplink );
			if ( _shutdown || uplink->shutdown ) goto cleanup;
		}
		declare_now;
		uint32_t timepassed = timing_diff( &lastKeepalive, &now );
		// Run this either at the regular keep-alive interval, or already after
		// two time units as long as idleTime is below _bgrWindowSize
		if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL
				|| ( timepassed >= 2 && uplink->idleTime < _bgrWindowSize ) ) {
			lastKeepalive = now;
			uplink->idleTime += timepassed;
			// Keep-alive
			if ( uplink->current.fd != -1 && uplink->queueLen < _bgrWindowSize ) {
				// Send keep-alive if nothing is happening, and try to trigger background rep.
				if ( !sendKeepalive( uplink ) || !sendReplicationRequest( uplink ) ) {
					connectionFailed( uplink, true );
					logadd( LOG_DEBUG1, "Error sending keep-alive/BGR, panic!\n" );
				}
			}
			// Don't keep uplink established if we're idle for too much
			if ( connectionShouldShutdown( uplink ) ) {
				logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", PIMG(uplink->image) );
				goto cleanup;
			}
		}
		// See if we should trigger an RTT measurement
		rttTestResult = uplink->rttTestResult;
		if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) {
			// Check is due if the timer expired, if we have no connection and discovery
			// hasn't failed since the last switch, or if a proxy cycle was detected
			if ( timing_reached( &nextAltCheck, &now ) || ( uplink->current.fd == -1 && discoverFailCount == 0 ) || uplink->cycleDetected ) {
				// It seems it's time for a check
				if ( image_isComplete( uplink->image ) ) {
					// Quit work if image is complete
					logadd( LOG_INFO, "Replication of %s complete.", uplink->image->name );
					setThreadName( "finished-uplink" );
					uplink->image->problem.uplink = false;
					goto cleanup;
				} else {
					// Not complete - do measurement
					altservers_findUplinkAsync( uplink ); // This will set RTT_INPROGRESS (synchronous)
					if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) {
						uplink->nextReplicationIndex = 0;
					}
				}
				// Slowly increase the interval between checks, up to the maximum
				altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_INTERVAL_MAX);
				timing_set( &nextAltCheck, &now, altCheckInterval );
			}
		} else if ( rttTestResult == RTT_NOT_REACHABLE ) {
			// Reset to idle; only count the failure if the state didn't change in the meantime
			if ( atomic_compare_exchange_strong( &uplink->rttTestResult, &rttTestResult, RTT_IDLE ) ) {
				discoverFailCount++;
				if ( uplink->current.fd == -1 ) {
					uplink->cycleDetected = false;
				}
			}
			timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_MAX_UNREACH) ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED );
		}
#ifdef DEBUG
		// Debug builds only: warn about queued requests that have been pending for
		// too long, and optionally mark them for re-sending
		if ( uplink->current.fd != -1 && !uplink->shutdown ) {
			bool resend = false;
			ticks deadline;
			timing_set( &deadline, &now, -10 );
			mutex_lock( &uplink->queueLock );
			for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
				if ( timing_reached( &it->entered, &deadline ) ) {
					logadd( LOG_WARNING, "Starving request detected:"
							" (from %" PRIu64 " to %" PRIu64 ", sent: %d) %s:%d",
							it->from, it->to, (int)it->sent, PIMG(uplink->image) );
					it->entered = now;
#ifdef DEBUG_RESEND_STARVING
					it->sent = false;
					resend = true;
#endif
				}
			}
			mutex_unlock( &uplink->queueLock );
			if ( resend ) {
				sendQueuedRequests( uplink, true );
			}
		}
#endif
	}
cleanup: ;
	dnbd3_image_t *image = uplink->image;
	dnbd3_cache_map_t *cache = ref_get_cachemap( image );
	if ( cache != NULL ) {
		cache->dirty = true; // Force writeout of cache map
		ref_put( &cache->reference );
	}
	mutex_lock( &image->lock );
	// If nobody else flagged this uplink for shutdown yet, do it now
	bool exp = false;
	if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) {
		image->users++; // We set the flag - hold onto image
	}
	dnbd3_uplink_t *current = ref_get_uplink( &image->uplinkref );
	if ( current == uplink ) { // Set NULL if it's still us...
		mutex_lock( &uplink->queueLock );
		cancelAllRequests( uplink );
		mutex_unlock( &uplink->queueLock );
		ref_setref( &image->uplinkref, NULL );
	}
	if ( current != NULL ) { // Decrease ref in any case
		ref_put( &current->reference );
	}
	mutex_unlock( &image->lock );
	// Finally as the thread is done, decrease our own ref that we initialized with
	ref_put( &uplink->reference );
	return NULL ;
}

/**
 * Send queued requests to the current uplink server. With newOnly set,
 * only entries not yet marked as sent are sent, otherwise every entry
 * in the queue is (re)sent. All handled entries are marked as sent.
 * Sets or clears image->problem.uplink depending on the send result.
 * Only called from uplink thread.
 * Locks on: uplink.queueLock, uplink.sendMutex
 */
static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly)
{
	assert_uplink_thread();
	// Request headers are collected in a local buffer, so that in the common
	// case of only a few requests, the actual sending can happen after the
	// queue has been unlocked again. If the buffer fills up, it has to be
	// flushed while still holding the queue lock, which is not ideal, but at
	// that point the uplink is probably overwhelmed anyways.
	// 125 headers are presumably about 3000 bytes, i.e. roughly 2*MTU (the
	// original note says "300bytes" - TODO confirm against DNBD3_REQUEST_SIZE).
#define MAX_RESEND_BATCH 125
	dnbd3_request_t batch[MAX_RESEND_BATCH];
	int pending = 0;
	mutex_lock( &uplink->queueLock );
	for ( dnbd3_queue_entry_t *entry = uplink->queue; entry != NULL; entry = entry->next ) {
		const bool skip = newOnly && entry->sent;
		if ( skip )
			continue;
		entry->sent = true;
		dnbd3_request_t * const out = &batch[pending];
		pending++;
		out->magic = dnbd3_packet_magic;
		out->cmd = CMD_GET_BLOCK;
		out->size = (uint32_t)( entry->to - entry->from );
		out->offset = entry->from; // Has to be assigned before hops! (union)
		out->hops = COND_HOPCOUNT( uplink->current.version, entry->hopCount );
		out->handle = entry->handle;
		fixup_request( *out );
		if ( pending < MAX_RESEND_BATCH )
			continue;
		// Buffer is full, flush it while holding the queue lock
		logadd( LOG_DEBUG2, "BLOCKING resend of %d", pending );
		pending = 0;
		bool flushed = false;
		mutex_lock( &uplink->sendMutex );
		if ( uplink->current.fd != -1 ) {
			flushed = ( sock_sendAll( uplink->current.fd, batch, DNBD3_REQUEST_SIZE * MAX_RESEND_BATCH, 3 )
					== DNBD3_REQUEST_SIZE * MAX_RESEND_BATCH );
		}
		mutex_unlock( &uplink->sendMutex );
		if ( !flushed ) {
			uplink->image->problem.uplink = true;
			break;
		}
	}
	mutex_unlock( &uplink->queueLock );
	if ( pending == 0 )
		return;
	// Send whatever is left in the buffer, queue is unlocked by now
	mutex_lock( &uplink->sendMutex );
	if ( uplink->current.fd != -1 ) {
		uplink->image->problem.uplink =
			( sock_sendAll( uplink->current.fd, batch, DNBD3_REQUEST_SIZE * pending, 3 )
				!= DNBD3_REQUEST_SIZE * pending );
	}
	mutex_unlock( &uplink->sendMutex );
#undef MAX_RESEND_BATCH
}

/**
 * Send a block request to an uplink server without really having
 * any client that needs that data. This will be used for background replication.
 *
 * We'll go through the cache map of the image and look for bytes that don't have
 * all bits set. We then request the corresponding 8 blocks of 4kb from the uplink
 * server. This means we might request data we already have, but it makes
 * the code simpler. Worst case would be only one bit is zero, which means
 * 4kb are missing, but we will request 32kb.
 *
 * Only called from uplink thread, so current.fd is assumed to be valid.
 *
 * @return false if sending request failed, true otherwise (i.e. not necessary/disabled)
 */
static bool sendReplicationRequest(dnbd3_uplink_t *uplink)
{
	assert_uplink_thread();
	if ( uplink->current.fd == -1 )
		return false; // Should never be called in this state, consider send error
	if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 )
		return true; // Don't do background replication
	if ( uplink->nextReplicationIndex == -1 )
		return true; // No more blocks to replicate
	dnbd3_image_t * const image = uplink->image;
	if ( image->users < _bgrMinClients )
		return true; // Not enough active users
	const int numNewRequests = numWantedReplicationRequests( uplink );
	if ( numNewRequests <= 0 )
		return true; // Already sufficient amount of requests on the wire
	dnbd3_cache_map_t *cache = ref_get_cachemap( image );
	if ( cache == NULL ) {
		// No cache map (=image complete)
		return true;
	}
	// From here on we hold a reference on the cache map; every exit path below must ref_put it
	const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
	const int lastBlockIndex = mapBytes - 1;
	// Each iteration queues at most one replication request
	for ( int bc = 0; bc < numNewRequests; ++bc ) {
		int endByte;
		if ( UPLINK_MAX_QUEUE - uplink->queueLen < 10 )
			break; // Don't overload queue
		if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks
			// One full round over the map, starting at nextReplicationIndex (wraps via modulo below)
			endByte = uplink->nextReplicationIndex + mapBytes;
		} else { // Hashblock based: Only look for match in current hash block
			endByte = ( uplink->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK;
			if ( endByte > mapBytes ) {
				endByte = mapBytes;
			}
		}
		atomic_thread_fence( memory_order_acquire );
		int replicationIndex = -1;
		for ( int j = uplink->nextReplicationIndex; j < endByte; ++j ) {
			const int i = j % ( mapBytes ); // Wrap around for BGR_FULL
			// The last map byte is skipped once it has been requested (replicatedLastBlock)
			if ( atomic_load_explicit( &cache->map[i], memory_order_relaxed ) != 0xff
					&& ( i != lastBlockIndex || !uplink->replicatedLastBlock ) ) {
				// Found incomplete one
				replicationIndex = i;
				break;
			}
		}
		if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) {
			// Nothing left in current block, find next one
			replicationIndex = findNextIncompleteHashBlock( uplink, endByte );
		}
		if ( replicationIndex == -1 ) {
			// Replication might be complete, uplink_mainloop should take care....
			uplink->nextReplicationIndex = -1;
			break;
		}
		const uint64_t handle = ++uplink->queueId;
		const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
		// Clamp to the end of the image, the last map byte may cover less data
		uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
		// Extend the default 32k request size if _minRequestSize is > 32k
		// (only in BGR_FULL mode, and only while the request does not reach the end of the image)
		for ( size_t extra = 1; extra < ( _minRequestSize / FILE_BYTES_PER_MAP_BYTE )
				&& offset + size < image->virtualFilesize
				&& _backgroundReplication == BGR_FULL; ++extra ) {
			if ( atomic_load_explicit( &cache->map[replicationIndex+1], memory_order_relaxed ) == 0xff )
				break; // Hit complete 32k block, stop here
			replicationIndex++;
			size += (uint32_t)MIN( image->virtualFilesize - offset - size, FILE_BYTES_PER_MAP_BYTE );
		}
		// No client callback/data is passed, this request is for replication only
		if ( !uplink_requestInternal( uplink, NULL, NULL, handle, offset, size, 0 ) ) {
			logadd( LOG_DEBUG1, "Error sending background replication request to uplink server (%s:%d)",
					PIMG(uplink->image) );
			ref_put( &cache->reference );
			return false;
		}
		if ( replicationIndex == lastBlockIndex ) {
			uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks
		}
		uplink->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter
		if ( _backgroundReplication == BGR_HASHBLOCK
				&& uplink->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) {
			// Just crossed a hash block boundary, look for new candidate starting at this very index
			uplink->nextReplicationIndex = findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex );
			if ( uplink->nextReplicationIndex == -1 )
				break;
		}
	}
	ref_put( &cache->reference );
	return true;
}

/**
 * Find next index into cache map that corresponds to the beginning
 * of a hash block which is neither completely empty nor completely
 * replicated yet.
 *
 * The scan starts at the beginning of the hash block that contains
 * startMapIndex and wraps around at the end of the map, so every map
 * byte is looked at once at most.
 *
 * @param uplink uplink whose image's cache map is scanned
 * @param startMapIndex map index at which to start looking; it is
 *        rounded down to the start of its hash block
 * @return map index to continue replication at, or -1 if no match
 *        was found or the image has no cache map
 */
static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMapIndex)
{
	dnbd3_cache_map_t *cache = ref_get_cachemap( uplink->image );
	if ( cache == NULL ) {
		// No cache map, so nothing to scan - and no reference we'd have to release.
		// (Must not touch cache->reference in this case.)
		return -1;
	}
	int retval = -1;
	const int mapBytes = IMGSIZE_TO_MAPBYTES( uplink->image->virtualFilesize );
	const int start = ( startMapIndex & MAP_INDEX_HASH_START_MASK );
	atomic_thread_fence( memory_order_acquire );
	int j;
	for ( j = 0; j < mapBytes; ++j ) {
		const int i = ( start + j ) % mapBytes;
		const uint8_t b = atomic_load_explicit( &cache->map[i], memory_order_relaxed );
		// The very last map byte counts as full once it has been requested
		const bool isFull = b == 0xff || ( i + 1 == mapBytes && uplink->replicatedLastBlock );
		const bool isEmpty = b == 0;
		if ( !isEmpty && !isFull ) {
			// Neither full nor empty, replicate
			if ( retval == -1 ) {
				retval = i;
			}
			break;
		}
		if ( ( i & MAP_INDEX_HASH_START_MASK ) == i ) {
			// Reset state if we just crossed into the next hash chunk
			retval = ( isEmpty ) ? ( i ) : ( -1 );
		} else if ( isFull ) {
			if ( retval != -1 ) {
				// It's a full one, previous one was empty -> replicate
				break;
			}
		} else if ( isEmpty ) {
			if ( retval == -1 ) { // Previous one was full -> replicate
				retval = i;
				break;
			}
		}
	}
	if ( j == mapBytes ) { // Nothing found, loop ran until end
		retval = -1;
	}
	ref_put( &cache->reference );
	return retval;
}

/**
 * Receive data from uplink server and process/dispatch.
 *
 * Reads replies from current.fd in a loop until no more data is pending
 * (REPLY_AGAIN). For every CMD_GET_BLOCK reply, the payload is written to
 * the cache file (if one can be opened), the cache map is updated, the
 * matching queue entry is removed and all clients attached to that entry
 * get their callback invoked with their slice of the received data.
 * After the loop ended normally, background replication is triggered.
 * On any receive/protocol error, connectionFailed() is called.
 *
 * Locks on: uplink.queueLock directly; functions called from here may
 * take further locks.
 * Only called from uplink thread, so current.fd is assumed to be valid.
 */
static void handleReceive(dnbd3_uplink_t *uplink)
{
	dnbd3_reply_t inReply;
	int ret;
	assert_uplink_thread();
	assert( uplink->queueLen >= 0 );
	for (;;) {
		ret = dnbd3_read_reply( uplink->current.fd, &inReply, false );
		// Interrupted: simply retry, unless we're shutting down, in which case
		// we fall through to the error checks below
		if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue;
		if ( ret == REPLY_AGAIN ) break; // Nothing more to read right now, leave receive loop
		if ( unlikely( ret == REPLY_CLOSED ) ) {
			logadd( LOG_INFO, "Uplink: Remote host hung up (%s:%d)", PIMG(uplink->image) );
			goto error_cleanup;
		}
		if ( unlikely( ret == REPLY_WRONGMAGIC ) ) {
			logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s:%d)", PIMG(uplink->image) );
			goto error_cleanup;
		}
		if ( unlikely( ret != REPLY_OK ) ) {
			logadd( LOG_INFO, "Uplink: Connection error %d (%s:%d)", ret, PIMG(uplink->image) );
			goto error_cleanup;
		}
		if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) {
			logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s:%d", inReply.size, PIMG(uplink->image) );
			goto error_cleanup;
		}

		// Grow receive buffer if required: payload size plus 64KiB of headroom,
		// but never more than _maxPayload
		if ( unlikely( uplink->recvBufferLen < inReply.size ) ) {
			uplink->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536);
			uplink->recvBuffer = realloc( uplink->recvBuffer, uplink->recvBufferLen );
			if ( uplink->recvBuffer == NULL ) {
				// Out of memory is treated as fatal for the whole process
				logadd( LOG_ERROR, "Out of memory when trying to allocate receive buffer for uplink" );
				exit( 1 );
			}
		}
		if ( unlikely( (uint32_t)sock_recv( uplink->current.fd, uplink->recvBuffer, inReply.size ) != inReply.size ) ) {
			logadd( LOG_INFO, "Lost connection to uplink server of %s:%d (payload)", PIMG(uplink->image) );
			goto error_cleanup;
		}
		// Payload read completely
		// Bail out if we're not interested
		if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) )
			continue;
		// Is a legit block reply
		totalBytesReceived += inReply.size;
		uplink->bytesReceived += inReply.size;
		// Get entry from queue
		dnbd3_queue_entry_t *entry;
		mutex_lock( &uplink->queueLock );
		for ( entry = uplink->queue; entry != NULL; entry = entry->next ) {
			if ( entry->handle == inReply.handle )
				break;
		}
		if ( entry == NULL ) {
			mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock!
			logadd( LOG_DEBUG1, "Received block reply on uplink, but handle %"PRIu64" is unknown (%s:%d)",
					inReply.handle, PIMG(uplink->image) );
			continue;
		}
		// Copy the range while holding the lock, these copies are used below
		const uint64_t start = entry->from;
		const uint64_t end = entry->to;
		mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock!
		// We don't remove the entry from the list here yet, to slightly increase the chance of other
		// clients attaching to this request while we write the data to disk
		// NOTE(review): a length mismatch is only logged here; processing continues
		// with inReply.size bytes, while the client callbacks further down compute
		// their slices from the entry's range - confirm this cannot read past the payload
		if ( end - start != inReply.size ) {
			logadd( LOG_WARNING, "Received payload length does not match! (is: %"PRIu32", expect: %u, %s:%d)",
					inReply.size, (unsigned int)( end - start ), PIMG(uplink->image) );
		}
		// 1) Write to cache file
		if ( unlikely( uplink->cacheFd == -1 ) ) {
			reopenCacheFd( uplink, false );
		}
		if ( likely( uplink->cacheFd != -1 ) ) {
			int err = 0;
			bool tryAgain = true; // Allow one retry in case we run out of space or the write fd became invalid
			uint32_t done = 0;
			ret = 0;
			// pwrite might write less than requested, so loop until everything is written
			while ( done < inReply.size ) {
				ret = (int)pwrite( uplink->cacheFd, uplink->recvBuffer + done, inReply.size - done, start + done );
				if ( unlikely( ret == -1 ) ) {
					err = errno;
					if ( err == EINTR && !_shutdown ) continue;
					if ( err == ENOSPC || err == EDQUOT ) {
						// try to free 256MiB
						if ( !tryAgain || !image_ensureDiskSpaceLocked( 256ull * 1024 * 1024, true ) ) break;
						tryAgain = false;
						continue; // Success, retry write
					}
					if ( err == EBADF || err == EINVAL || err == EIO ) {
						uplink->image->problem.write = true;
						if ( !tryAgain || !reopenCacheFd( uplink, true ) )
							break;
						tryAgain = false;
						continue; // Write handle to image successfully re-opened, try again
					}
					logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d",
							PIMG(uplink->image), err );
					break;
				}
				if ( unlikely( ret <= 0 || (uint32_t)ret > inReply.size - done ) ) {
					logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d",
							ret, PIMG(uplink->image) );
					break;
				}
				done += (uint32_t)ret;
			}
			// Only the part that actually made it to disk gets flagged in the cache map
			if ( likely( done > 0 ) ) {
				image_updateCachemap( uplink->image, start, start + done, true );
			}
			// NOTE(review): this branch only logs; cacheFd is not modified here, only
			// inside reopenCacheFd() above - confirm the "disabling caching" wording
			if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) {
				logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.",
						PIMG(uplink->image), err );
			}
		}
		// 2) Remove the entry from the queue. The lock was released in the meantime, so
		// look it up again by pointer and handle instead of trusting the old pointer
		bool found = false;
		dnbd3_queue_entry_t **it;
		mutex_lock( &uplink->queueLock );
		for ( it = &uplink->queue; *it != NULL; it = &(**it).next ) {
			if ( *it == entry && entry->handle == inReply.handle ) { // ABA check
				assert( found == false );
				*it = (**it).next;
				found = true;
				uplink->queueLen--;
				break;
			}
		}
		if ( uplink->queueLen < SERVER_UPLINK_QUEUELEN_THRES ) {
			uplink->image->problem.queue = false;
		}
		mutex_unlock( &uplink->queueLock );
		if ( !found ) {
			logadd( LOG_DEBUG1, "Replication request vanished from queue after writing to disk (%s:%d)",
					PIMG(uplink->image) );
			continue;
		}
		// 3) Entry is unlinked from the queue now. Hand each attached client
		// its slice of the receive buffer, then free the client node
		dnbd3_queue_client_t *next;
		for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) {
			assert( c->from >= start && c->to <= end );
			(*c->callback)( c->data, c->handle, c->from, (uint32_t)( c->to - c->from ),
					(const char*)( uplink->recvBuffer + (c->from - start) ) );
			next = c->next;
			free( c );
		}
		// entry->clients still holds the old list head (nodes were freed above);
		// it is only compared against NULL here, never dereferenced
		if ( entry->clients != NULL ) {
			// Was some client -- reset idle counter
			uplink->idleTime = 0;
			// Re-enable replication if disabled
			if ( uplink->nextReplicationIndex == -1 ) {
				uplink->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK;
			}
		} else {
			if ( uplink->cacheFd != -1 ) {
				// Try to remove from fs cache if no client was interested in this data
				posix_fadvise( uplink->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED );
			}
		}
		free( entry );
	} // main receive loop
	// Trigger background replication if applicable
	if ( !sendReplicationRequest( uplink ) ) {
		goto error_cleanup;
	}
	// Normal end
	return;
	// Error handling from failed receive or message parsing
error_cleanup: ;
	connectionFailed( uplink, true );
}

/**
 * Tear down the current uplink connection after a failure.
 *
 * Does nothing if there is no current connection. Otherwise the current
 * server is reported as failed, the socket is closed under sendMutex and
 * the image's uplink problem flag is set. In BGR_FULL mode, a finished
 * replication run (index -1) is reset to start over at index 0.
 *
 * @param uplink the uplink whose connection failed
 * @param findNew if true, start an asynchronous search for a new uplink
 *        server, unless an RTT test is in progress already or a better
 *        server's socket is waiting to be picked up
 *
 * Only call from uplink thread
 */
static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
{
	assert_uplink_thread();
	if ( uplink->current.fd == -1 )
		return;
	setThreadName( "panic-uplink" );
	altservers_serverFailed( uplink->current.index );
	// Close socket under the send lock
	mutex_lock( &uplink->sendMutex );
	uplink->image->problem.uplink = true;
	close( uplink->current.fd );
	uplink->current.fd = -1;
	mutex_unlock( &uplink->sendMutex );
	if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) {
		uplink->nextReplicationIndex = 0;
	}
	if ( findNew ) {
		mutex_lock( &uplink->rttLock );
		const bool searchPending = uplink->rttTestResult == RTT_INPROGRESS || uplink->better.fd != -1;
		mutex_unlock( &uplink->rttLock );
		if ( !searchPending ) {
			altservers_findUplinkAsync( uplink );
		}
	}
}

/**
 * Send a single keep alive request to the current uplink server.
 * The send is done while holding sendMutex.
 * Called from uplink thread, current.fd must be valid.
 *
 * @return true if the complete request was handed to the socket
 */
static bool sendKeepalive(dnbd3_uplink_t *uplink)
{
	static const dnbd3_request_t keepalive = { .magic = dnbd3_packet_magic, .cmd = net_order_16( CMD_KEEPALIVE ) };
	assert_uplink_thread();
	mutex_lock( &uplink->sendMutex );
	const ssize_t sent = send( uplink->current.fd, &keepalive, sizeof(keepalive), MSG_NOSIGNAL );
	mutex_unlock( &uplink->sendMutex );
	return sent == sizeof(keepalive);
}

/**
 * Request crclist from uplink.
 * Called from uplink thread, current.fd must be valid.
 *
 * The list is requested and received while holding sendMutex. If the
 * received list matches the master crc32 sent along with it, it is
 * attached to the image (image->crc32, image->masterCrc32) and written to
 * "<image path>.crc". Writing that file is best effort; if it cannot be
 * written completely, it is removed again.
 * If the buffer for the list cannot be allocated, nothing is requested.
 *
 * FIXME This is broken as it could happen that another message arrives after sending
 * the request. Refactor, split and move receive into general receive handler.
 */
static void requestCrc32List(dnbd3_uplink_t *uplink)
{
	dnbd3_image_t *image = uplink->image;
	if ( image == NULL || image->virtualFilesize == 0 ) return;
	size_t bytes = IMGSIZE_TO_HASHBLOCKS( image->virtualFilesize ) * sizeof(uint32_t);
	uint32_t masterCrc;
	uint32_t *buffer = malloc( bytes );
	if ( buffer == NULL ) {
		// Don't hand a NULL buffer to the receive function
		logadd( LOG_WARNING, "Out of memory when requesting crc32 list for %s:%d", PIMG(uplink->image) );
		return;
	}
	mutex_lock( &uplink->sendMutex );
	bool sendOk = dnbd3_get_crc32( uplink->current.fd, &masterCrc, buffer, &bytes );
	if ( !sendOk ) {
		uplink->image->problem.uplink = true;
	}
	mutex_unlock( &uplink->sendMutex );
	if ( !sendOk || bytes == 0 ) {
		free( buffer );
		return;
	}
	// Verify the list against the master checksum before accepting it
	uint32_t lists_crc = crc32( 0, NULL, 0 );
	lists_crc = crc32( lists_crc, (uint8_t*)buffer, bytes );
	lists_crc = net_order_32( lists_crc );
	if ( lists_crc != masterCrc ) {
		logadd( LOG_WARNING, "Received corrupted crc32 list from uplink server (%s:%d)!", PIMG(uplink->image) );
		free( buffer );
		return;
	}
	// From here on, the buffer is owned by the image
	uplink->image->masterCrc32 = masterCrc;
	uplink->image->crc32 = buffer;
	const size_t len = strlen( uplink->image->path ) + 30;
	char path[len];
	snprintf( path, len, "%s.crc", uplink->image->path );
	const int fd = open( path, O_WRONLY | O_CREAT, 0644 );
	if ( fd != -1 ) {
		// Check both writes individually, so an error return (-1) of one
		// never gets mixed into the byte count of the other
		const bool written = write( fd, &masterCrc, sizeof(masterCrc) ) == (ssize_t)sizeof(masterCrc)
				&& write( fd, buffer, bytes ) == (ssize_t)bytes;
		close( fd );
		if ( !written ) {
			unlink( path );
			logadd( LOG_WARNING, "Could not write crc32 file for %s:%d", PIMG(uplink->image) );
		}
	}
}

/**
 * (Re)open the image's main image file for writing and store the
 * resulting file descriptor in the uplink's cacheFd member.
 * The image's problem.write flag is updated to reflect the outcome.
 *
 * @param force If cacheFd currently holds a file descriptor (not == -1)
 * and force is true, that descriptor is closed and the file is opened
 * anew. If force is false in that case, nothing happens and true is
 * returned immediately.
 * @return true if cacheFd holds a file descriptor on return, false if
 * opening the file failed
 */
static bool reopenCacheFd(dnbd3_uplink_t *uplink, const bool force)
{
	const bool haveFd = ( uplink->cacheFd != -1 );
	if ( haveFd && !force )
		return true; // Keep what we have
	if ( haveFd ) {
		close( uplink->cacheFd );
	}
	uplink->cacheFd = open( uplink->image->path, O_WRONLY | O_CREAT, 0644 );
	const bool failed = ( uplink->cacheFd == -1 );
	uplink->image->problem.write = failed;
	return !failed;
}

/**
 * Tell whether the uplink is considered idle long enough to be shut down.
 * That is the case if its idle time exceeds SERVER_UPLINK_IDLE_TIMEOUT,
 * and either background replication is not set to full mode, or the
 * image currently has fewer users than the configured minimum number of
 * clients for background replication.
 */
static bool connectionShouldShutdown(dnbd3_uplink_t *uplink)
{
	if ( uplink->idleTime <= SERVER_UPLINK_IDLE_TIMEOUT )
		return false; // Not idle long enough
	if ( _backgroundReplication != BGR_FULL )
		return true; // No full replication that would keep the uplink busy
	return _bgrMinClients > uplink->image->users;
}

/**
 * Write a string representation of the server this uplink is currently
 * connected to into the given buffer.
 *
 * @param uplink uplink to query
 * @param buffer destination buffer
 * @param len size of destination buffer
 * @return false if the uplink has no current connection, otherwise
 * whatever altservers_toString returns
 */
bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len)
{
	int serverIndex = -1;
	// Read fd and index together under the lock
	mutex_lock( &uplink->rttLock );
	if ( uplink->current.fd != -1 ) {
		serverIndex = uplink->current.index;
	}
	mutex_unlock( &uplink->rttLock );
	if ( serverIndex == -1 )
		return false;
	return altservers_toString( serverIndex, buffer, len );
}

/**
 * Determine how many replication requests should be sent right now in
 * order to meet the configured bgrWindowSize.
 *
 * The starting value is the smaller of _bgrWindowSize and the uplink's
 * idle time plus one. The idle time acts as a sort of "slow start": if
 * the uplink was recently busy with actual client requests, we don't
 * saturate it with replication requests right away, in case that client
 * triggers more requests to the uplink server.
 * Every queued request without any client attached is subtracted from
 * that value. As soon as a queued request with a client attached is
 * encountered, 0 is returned.
 *
 * @return number of requests wanted; may be negative, callers should
 * treat any value <= 0 as "none"
 */
static int numWantedReplicationRequests(dnbd3_uplink_t *uplink)
{
	int wanted = MIN( _bgrWindowSize, uplink->idleTime + 1 );
	if ( uplink->queueLen != 0 ) {
		mutex_lock( &uplink->queueLock );
		dnbd3_queue_entry_t *entry = uplink->queue;
		// Walk the leading run of entries that have no client attached
		while ( entry != NULL && entry->clients == NULL ) {
			wanted--;
			entry = entry->next;
		}
		if ( entry != NULL ) {
			// Stopped at an entry with clients: Do not allow BGR if client requests are being handled
			wanted = 0;
		}
		mutex_unlock( &uplink->queueLock );
	}
	return wanted;
}

/**
 * Clear the sent flag of the first queue entry with the given handle.
 * Nothing happens if no entry with that handle is in the queue.
 * Locks on: uplink.queueLock
 */
static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle)
{
	mutex_lock( &uplink->queueLock );
	dnbd3_queue_entry_t *entry = uplink->queue;
	while ( entry != NULL && entry->handle != handle ) {
		entry = entry->next;
	}
	if ( entry != NULL ) {
		entry->sent = false;
	}
	mutex_unlock( &uplink->queueLock );
}