#include "altservers.h"
#include "locks.h"
#include "helper.h"
#include "image.h"
#include "fileutil.h"
#include "../shared/protocol.h"
#include "../shared/timing.h"
#include "../serverconfig.h"
#include <assert.h>
#include <inttypes.h>
#include <jansson.h>
// Queue of uplinks that requested an RTT check. A NULL entry is a free slot.
// Entries are added by altservers_findUplink and cleared by altservers_main
// or altservers_removeUplink.
static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS];
static pthread_spinlock_t pendingLockWrite; // Lock for adding something to pending. (NULL -> nonNULL)
static pthread_mutex_t pendingLockConsume = PTHREAD_MUTEX_INITIALIZER; // Lock for removing something (nonNULL -> NULL)
// Signal used to wake up the altservers thread. It is created by
// altservers_main itself and set back to NULL when that thread exits.
static dnbd3_signal_t* runSignal = NULL;
// Table of known alt servers. A slot with host.type == 0 is treated as empty.
static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS];
// Number of leading slots of altServers that are in use (may include empty ones)
static int numAltServers = 0;
// Guards altServers and numAltServers
static pthread_spinlock_t altServersLock;
// Set to true at the end of altservers_init; checked by altservers_shutdown
static bool initDone = false;
// Handle of the thread running altservers_main
static pthread_t altThread;
static void *altservers_main(void *data);
static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const unsigned int rtt);
/**
 * Initialise this module: seed the PRNG, set up both spinlocks, clear the
 * alt-server table and the queue of pending RTT checks, and finally start
 * the background thread running altservers_main.
 * Terminates the process if the thread cannot be created.
 */
void altservers_init()
{
	srand( (unsigned int)time( NULL ) );
	// Init spinlocks
	spin_init( &pendingLockWrite, PTHREAD_PROCESS_PRIVATE );
	spin_init( &altServersLock, PTHREAD_PROCESS_PRIVATE );
	memset( altServers, 0, sizeof(altServers) );
	// Init waiting links queue. This must happen before the worker thread is
	// started, since the thread reads this array and the reset below is done
	// without holding any lock.
	for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
		pending[i] = NULL;
	}
	// Start the worker last, so it never sees partially initialised state
	if ( 0 != thread_create( &altThread, NULL, &altservers_main, (void *)NULL ) ) {
		logadd( LOG_ERROR, "Could not start altservers connector thread" );
		exit( EXIT_FAILURE );
	}
	initDone = true;
}
/**
 * Wake up the altservers thread and wait for it to terminate.
 * Does nothing if altservers_init never completed.
 * Note that this function does not set _shutdown itself; the thread only
 * leaves its main loop once that flag is set.
 */
void altservers_shutdown()
{
	if ( initDone ) {
		// Wake the thread so it notices the shutdown request, then wait for it
		signal_call( runSignal );
		thread_join( altThread, NULL );
	}
}
/**
 * Line callback passed to file_loadLineBased by altservers_load.
 * argv[0] is the server address, optionally preceded by any mix of spaces,
 * tabs, '-' (marks the server as private) and '+' (marks the server as
 * client-only). A line whose very first character is '#' is ignored.
 * argv[1], if present, is a free-form comment.
 * data points to an int counter that is incremented for every server that
 * was added successfully.
 */
static void addalt(int argc, char **argv, void *data)
{
	if ( argc < 1 || argv == NULL || argv[0] == NULL ) return;
	if ( argv[0][0] == '#' ) return;
	bool isPrivate = false;
	bool isClientOnly = false;
	char *shost = argv[0];
	// Trim left and scan for "-" / "+" prefixes
	for ( ; *shost != '\0'; ++shost ) {
		if ( *shost == '-' ) isPrivate = true;
		else if ( *shost == '+' ) isClientOnly = true;
		else if ( *shost != ' ' && *shost != '\t' ) break;
	}
	dnbd3_host_t host;
	if ( !parse_address( shost, &host ) ) {
		logadd( LOG_WARNING, "Invalid entry in alt-servers file ignored: '%s'", shost );
		return;
	}
	// Use a local for the default comment instead of writing into the
	// caller's argv, which is only guaranteed to hold argc entries.
	const char *comment = ( argc > 1 && argv[1] != NULL ) ? argv[1] : "";
	if ( altservers_add( &host, comment, isPrivate, isClientOnly ) && data != NULL ) {
		(*(int*)data)++;
	}
}
/**
 * Load the list of alt servers from the file "alt-servers" inside the
 * configuration directory (_configDir). Each line is handed to addalt.
 * Returns the number of servers added, or -1 if the file name could not
 * be allocated.
 */
int altservers_load()
{
	char *path = NULL;
	int added = 0;
	const int len = asprintf( &path, "%s/%s", _configDir, "alt-servers" );
	if ( len == -1 ) return -1;
	// One mandatory field (address), one optional field (comment)
	file_loadLineBased( path, 1, 2, &addalt, (void*)&added );
	free( path );
	logadd( LOG_DEBUG1, "Added %d alt servers\n", added );
	return added;
}
/**
 * Add a server to the list of known alt servers.
 * host: address of the server, copied into the table
 * comment: optional description, may be NULL; truncated to COMMENT_LENGTH
 * isPrivate / isClientOnly: flags stored alongside the server
 * Returns true if the server was added, false if host is NULL, the same
 * address/port is already in the list, or the table is full.
 */
bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate, const int isClientOnly)
{
	if ( host == NULL ) return false;
	int i, freeSlot = -1;
	spin_lock( &altServersLock );
	for (i = 0; i < numAltServers; ++i) {
		if ( isSameAddressPort( &altServers[i].host, host ) ) {
			// Already known
			spin_unlock( &altServersLock );
			return false;
		} else if ( freeSlot == -1 && altServers[i].host.type == 0 ) {
			// Remember first empty slot within the used range
			freeSlot = i;
		}
	}
	if ( freeSlot == -1 ) {
		// No hole found, append at the end if there is room left
		if ( numAltServers >= SERVER_MAX_ALTS ) {
			logadd( LOG_WARNING, "Cannot add another alt server, maximum of %d already reached.", (int)SERVER_MAX_ALTS );
			spin_unlock( &altServersLock );
			return false;
		}
		freeSlot = numAltServers++;
	}
	dnbd3_alt_server_t * const slot = &altServers[freeSlot];
	// Wipe the slot first: a reused slot must not inherit the comment, RTT
	// history or failure statistics of the server that occupied it before.
	memset( slot, 0, sizeof(*slot) );
	slot->host = *host;
	slot->isPrivate = isPrivate;
	slot->isClientOnly = isClientOnly;
	if ( comment != NULL ) snprintf( slot->comment, COMMENT_LENGTH, "%s", comment );
	spin_unlock( &altServersLock );
	return true;
}
/**
 * Queue the given uplink for an RTT measurement by the altservers thread.
 * If the uplink is already queued, or no free queue slot is left, nothing
 * is queued (the latter case is logged).
 * ONLY called from the passed uplink's main thread
 */
void altservers_findUplink(dnbd3_connection_t *uplink)
{
	// betterFd != -1 would mean the uplink is supposed to switch to another
	// server. Since the uplink thread itself is the caller, it cannot be
	// asking for a new measurement while a switch is still outstanding.
	assert( uplink->betterFd == -1 );
	spin_lock( &pendingLockWrite );
	// An RTT measurement might already be in progress for this uplink;
	// if it is still in the queue there is nothing to do.
	if ( uplink->rttTestResult == RTT_INPROGRESS ) {
		bool queued = false;
		for (int idx = 0; idx < SERVER_MAX_PENDING_ALT_CHECKS && !queued; ++idx) {
			queued = ( pending[idx] == uplink );
		}
		if ( queued ) {
			spin_unlock( &pendingLockWrite );
			return;
		}
	}
	// Look for the first free slot
	int slot = 0;
	while ( slot < SERVER_MAX_PENDING_ALT_CHECKS && pending[slot] != NULL ) {
		++slot;
	}
	if ( slot == SERVER_MAX_PENDING_ALT_CHECKS ) {
		// Queue is full
		spin_unlock( &pendingLockWrite );
		logadd( LOG_WARNING, "No more free RTT measurement slots, ignoring a request..." );
		return;
	}
	pending[slot] = uplink;
	uplink->rttTestResult = RTT_INPROGRESS;
	spin_unlock( &pendingLockWrite );
	signal_call( runSignal ); // Wake altservers thread up
}
/**
 * The given uplink is about to disappear, so drop it from the queue of
 * pending RTT checks. Its rttTestResult is set to RTT_NOT_REACHABLE for
 * every queue entry that referenced it.
 * Takes pendingLockConsume, which altservers_main holds while it is
 * working on a queue entry.
 */
void altservers_removeUplink(dnbd3_connection_t *uplink)
{
	pthread_mutex_lock( &pendingLockConsume );
	spin_lock( &pendingLockWrite );
	for (int slot = 0; slot < SERVER_MAX_PENDING_ALT_CHECKS; ++slot) {
		if ( pending[slot] != uplink ) continue;
		uplink->rttTestResult = RTT_NOT_REACHABLE;
		pending[slot] = NULL;
	}
	spin_unlock( &pendingLockWrite );
	pthread_mutex_unlock( &pendingLockConsume );
}
/**
 * Fill output with up to <size> known alt servers, best candidates first.
 * A server's score is its network closeness to <host> (see
 * altservers_netCloseness) minus its current fail count; servers of a
 * different address family get a strongly negative score instead.
 * Private servers are never included, so this is the list that may be
 * handed to a client.
 * Returns the number of entries written to output.
 */
int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size)
{
	if ( host == NULL || host->type == 0 || numAltServers == 0 || output == NULL || size <= 0 ) return 0;
	int scores[size]; // scores[k] belongs to output[k]; kept in descending order
	int count = 0;
	spin_lock( &altServersLock );
	if ( size > numAltServers ) size = numAltServers;
	for (int srvIdx = 0; srvIdx < numAltServers; ++srvIdx) {
		dnbd3_alt_server_t * const srv = &altServers[srvIdx];
		if ( srv->host.type == 0 ) continue; // Slot is empty
		if ( srv->isPrivate ) continue; // Do not tell clients about private servers
		int score;
		if ( host->type == srv->host.type ) {
			score = altservers_netCloseness( host, &srv->host ) - srv->numFails;
		} else {
			score = -( srv->numFails + 128 ); // Wrong address family
		}
		// Find insert position: behind all entries scoring at least as high
		int pos = 0;
		while ( pos < count && score <= scores[pos] ) {
			++pos;
		}
		if ( pos >= size ) continue; // List is full and this one is no better
		if ( pos < count && pos + 1 < size ) {
			// Inserting in the middle, shift the tail down by one
			memmove( &output[pos + 1], &output[pos], sizeof(dnbd3_server_entry_t) * (size - pos - 1) );
			memmove( &scores[pos + 1], &scores[pos], sizeof(int) * (size - pos - 1) );
		}
		if ( count < size ) {
			count++;
		}
		output[pos].host = srv->host;
		output[pos].failures = 0;
		scores[pos] = score;
	}
	spin_unlock( &altServersLock );
	return count;
}
/**
 * Fill output with up to <size> alt servers suitable as uplink servers.
 * Private servers are included, "client only" servers are skipped, and if
 * _proxyPrivateOnly is set, only private servers are considered.
 * On every call the first table entry is swapped with a randomly chosen
 * other entry, so the order of results varies between calls.
 * Servers without failures are listed first, followed by servers that have
 * failed. Unless <emergency> is set, servers that failed more than
 * SERVER_BAD_UPLINK_THRES times and whose last failure is less than
 * SERVER_BAD_UPLINK_IGNORE ago are left out, and every failed server that
 * gets listed has its fail counter decreased by one.
 * Returns the number of entries written to output.
 */
int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency)
{
	if ( size <= 0 ) return 0;
	ticks now;
	timing_get( &now );
	int count = 0;
	spin_lock( &altServersLock );
	// Flip first server in list with a random one every time this is called
	if ( numAltServers > 1 ) {
		int other;
		do {
			other = rand() % numAltServers;
		} while ( other == 0 );
		const dnbd3_alt_server_t first = altServers[0];
		altServers[0] = altServers[other];
		altServers[other] = first;
	}
	if ( size > numAltServers ) size = numAltServers;
	// Pass 0 collects servers with no failures, pass 1 those that failed
	// (not too many times). Stop as soon as the output is full.
	for (int pass = 0; pass < 2 && count < size; ++pass) {
		for (int idx = 0; idx < numAltServers && count < size; ++idx) {
			dnbd3_alt_server_t * const srv = &altServers[idx];
			if ( srv->host.type == 0 ) continue; // Slot is empty
			if ( _proxyPrivateOnly && !srv->isPrivate ) continue; // Config says private alt-servers only
			if ( srv->isClientOnly ) continue;
			if ( pass == 0 ) {
				if ( srv->numFails > 0 ) continue;
			} else {
				if ( srv->numFails == 0 ) continue; // Already added in first pass
				if ( !emergency && srv->numFails > SERVER_BAD_UPLINK_THRES // server failed X times in a row
						&& timing_diff( &srv->lastFail, &now ) < SERVER_BAD_UPLINK_IGNORE ) continue; // and not too long ago
				if ( !emergency ) srv->numFails--;
			}
			// Server seems ok, include in output
			output[count++] = srv->host;
		}
	}
	spin_unlock( &altServersLock );
	return count;
}
json_t* altservers_toJson()
{
json_t *list = json_array();
spin_lock( &altServersLock );
char host[100];
const int count = numAltServers;
dnbd3_alt_server_t src[count];
memcpy( src, altServers, sizeof(src) );
spin_unlock( &altServersLock );
for (int i = 0; i < count; ++i) {
json_t *rtts = json_array();
for (int j = 0; j < SERVER_RTT_PROBES; ++j) {
json_array_append_new( rtts, json_integer( src[i].rtt[ (j + src[i].rttIndex + 1) % SERVER_RTT_PROBES ] ) );
}
sock_printHost( &src[i].host, host, sizeof(host) );
json_t *server = json_pack( "{ss,ss,so,sb,sb,si}",
"comment", src[i].comment,
"host", host,
"rtt", rtts,
"isPrivate", (int)src[i].isPrivate,
"isClientOnly", (int)src[i].isClientOnly,
"numFails", src[i].numFails
);
json_array_append_new( list, server );
}
return list;
}
/**
* Update rtt history of given server - returns the new average for that server
*/
static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const unsigned int rtt)
{
unsigned int avg = rtt;
int i;
spin_lock( &altServersLock );
for (i = 0; i < numAltServers; ++i) {
if ( !isSameAddressPort( host, &altServers[i].host ) ) continue;
altServers[i].rtt[++altServers[i].rttIndex % SERVER_RTT_PROBES] = rtt;
#if SERVER_RTT_PROBES == 5
avg = (altServers[i].rtt[0] + altServers[i].rtt[1] + altServers[i].rtt[2]
+ altServers[i].rtt[3] + altServers[i].rtt[4]) / SERVER_RTT_PROBES;
#else
#warning You might want to change the code in altservers_update_rtt if you changed SERVER_RTT_PROBES
avg = 0;
for (int j = 0; j < SERVER_RTT_PROBES; ++j) {
avg += altServers[i].rtt[j];
}
avg /= SERVER_RTT_PROBES;
#endif
// If we got a new rtt value, server must be working
if ( altServers[i].numFails > 0 ) {
altServers[i].numFails--;
}
break;
}
spin_unlock( &altServersLock );
return avg;
}
/**
 * Determine how close two addresses are to each other by counting how many
 * leading groups of 4 bits (nibbles) of the two addresses are identical.
 * 4 address bytes are compared for HOST_IP4, 16 bytes for any other type.
 * Return: Closeness - higher number means closer; -1 if either pointer is
 * NULL or the two hosts are of different type.
 */
int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2)
{
	if ( host1 == NULL || host2 == NULL ) return -1;
	if ( host1->type != host2->type ) return -1;
	const int numBytes = ( host1->type == HOST_IP4 ) ? 4 : 16;
	int nibbles = 0;
	for (int idx = 0; idx < numBytes; ++idx, nibbles += 2) {
		// Bits set in diff are the bits in which the two bytes differ
		const int diff = host1->addr[idx] ^ host2->addr[idx];
		if ( ( diff & 0xf0 ) != 0 ) return nibbles; // High nibble differs
		if ( ( diff & 0x0f ) != 0 ) return nibbles + 1; // Only low nibble differs
	}
	return nibbles;
}
/**
* Called if an uplink server failed during normal uplink operation. This unit keeps
* track of how often servers fail, and consider them disabled for some time if they
* fail too many times.
*/
void altservers_serverFailed(const dnbd3_host_t * const host)
{
int i;
int foundIndex = -1, lastOk = -1;
ticks now;
timing_get( &now );
spin_lock( &altServersLock );
for (i = 0; i < numAltServers; ++i) {
if ( foundIndex == -1 ) {
// Looking for the failed server in list
if ( isSameAddressPort( host, &altServers[i].host ) ) {
foundIndex = i;
}
} else if ( altServers[i].host.type != 0 && altServers[i].numFails == 0 ) {
lastOk = i;
}
}
// Do only increase counter if last fail was not too recent. This is
// to prevent the counter from increasing rapidly if many images use the
// same uplink. If there's a network hickup, all uplinks will call this
// function and would increase the counter too quickly, disabling the server.
if ( foundIndex != -1 && timing_diff( &altServers[foundIndex].lastFail, &now ) > SERVER_RTT_DELAY_INIT ) {
altServers[foundIndex].numFails += SERVER_UPLINK_FAIL_INCREASE;
altServers[foundIndex].lastFail = now;
if ( lastOk != -1 ) {
// Make sure non-working servers are put at the end of the list, so they're less likely
// to get picked when testing servers for uplink connections.
const dnbd3_alt_server_t tmp = altServers[foundIndex];
altServers[foundIndex] = altServers[lastOk];
altServers[lastOk] = tmp;
}
}
spin_unlock( &altServersLock );
}
/**
 * Mainloop of this module, executed by the thread started in altservers_init.
 * It will wait for requests by uplinks to find a suitable uplink server for
 * them. If found, it will tell the uplink about the best server found.
 * Currently the RTT history is kept per server and not per uplink, so if many
 * images use the same uplink server, the history will update quite quickly.
 * Needs to be improved some time, ie. by only updating the rtt if the last
 * update was at least X seconds ago.
 *
 * For every queued uplink, each candidate server is measured by timing a
 * connect, an image selection and the transfer of the first block of the
 * image. The result is reported to the uplink through its rttTestResult
 * field (and betterFd/betterServer/betterVersion if it should switch).
 *
 * The loop additionally triggers image_saveAllCacheMaps and, if
 * _closeUnusedFd is set, image_closeUnusedFd at fixed intervals.
 *
 * data is unused. Always returns NULL.
 */
static void *altservers_main(void *data UNUSED)
{
// Number of candidate servers requested from altservers_getListForUplink per check
const int ALTS = 4;
int ret, itLink, itAlt, numAlts;
bool found;
// Receives the payload of the test block; its contents are never looked at
char buffer[DNBD3_BLOCK_SIZE ];
dnbd3_reply_t reply;
// One extra slot, so the uplink's current server can be appended to the candidates
dnbd3_host_t servers[ALTS + 1];
serialized_buffer_t serialized;
struct timespec start, end;
ticks nextCacheMapSave, nextCloseUnusedFd;
setThreadName( "altserver-check" );
blockNoncriticalSignals();
timing_gets( &nextCacheMapSave, 90 );
timing_gets( &nextCloseUnusedFd, 900 );
// Init signal
runSignal = signal_new();
if ( runSignal == NULL ) {
logadd( LOG_WARNING, "error creating signal object. Uplink feature unavailable." );
goto cleanup;
}
// LOOP
while ( !_shutdown ) {
// Wait 5 seconds max.
ret = signal_wait( runSignal, 5000 );
if ( _shutdown ) goto cleanup;
if ( ret == SIGNAL_ERROR ) {
if ( errno == EAGAIN || errno == EINTR ) continue;
// Any other error is logged; after a short pause the queue is processed anyway
logadd( LOG_WARNING, "Error %d on signal_clear on alservers_main! Things will break!", errno );
usleep( 100000 );
}
// Work your way through the queue
for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) {
spin_lock( &pendingLockWrite );
if ( pending[itLink] == NULL ) {
spin_unlock( &pendingLockWrite );
continue; // Check once before locking, as a mutex is expensive
}
spin_unlock( &pendingLockWrite );
// pendingLockConsume stays locked for the whole measurement of this entry.
// altservers_removeUplink takes the same mutex, so it has to wait until
// this entry has been dealt with before it can take the uplink out of the queue.
pthread_mutex_lock( &pendingLockConsume );
spin_lock( &pendingLockWrite );
dnbd3_connection_t * const uplink = pending[itLink];
spin_unlock( &pendingLockWrite );
if ( uplink == NULL ) { // Check again after locking
pthread_mutex_unlock( &pendingLockConsume );
continue;
}
// NOTE(review): image_lock presumably takes a reference on the image that is
// dropped again by image_release further down - confirm in image.c
dnbd3_image_t * const image = image_lock( uplink->image );
if ( image == NULL ) { // Check again after locking
uplink->rttTestResult = RTT_NOT_REACHABLE;
spin_lock( &pendingLockWrite );
pending[itLink] = NULL;
spin_unlock( &pendingLockWrite );
pthread_mutex_unlock( &pendingLockConsume );
logadd( LOG_DEBUG1, "Image has gone away that was queued for RTT measurement\n" );
continue;
}
logadd( LOG_DEBUG2, "Running altcheck for '%s:%d'", image->name, (int)image->rid );
assert( uplink->rttTestResult == RTT_INPROGRESS );
// Now get 4 alt servers; if the uplink has no connection right now (fd == -1),
// this is requested in "emergency" mode
numAlts = altservers_getListForUplink( servers, ALTS, uplink->fd == -1 );
if ( uplink->fd != -1 ) {
// Add current server if not already in list
found = false;
for (itAlt = 0; itAlt < numAlts; ++itAlt) {
if ( !isSameAddressPort( &uplink->currentServer, &servers[itAlt] ) ) continue;
found = true;
break;
}
if ( !found ) servers[numAlts++] = uplink->currentServer;
}
// Test them all
// bestSock is the still open connection to the best server found so far (other
// than the current one); all other sockets get closed as soon as they were measured
int bestSock = -1;
int bestIndex = -1;
int bestProtocolVersion = -1;
unsigned long bestRtt = RTT_UNREACHABLE;
unsigned long currentRtt = RTT_UNREACHABLE;
for (itAlt = 0; itAlt < numAlts; ++itAlt) {
usleep( 1000 ); // Wait a very short moment for the network to recover (we might be doing lots of measurements...)
// Connect
// The measured time span starts here and ends once the first block has been received
clock_gettime( BEST_CLOCK_SOURCE, &start );
int sock = sock_connect( &servers[itAlt], 750, 1000 );
if ( sock < 0 ) continue;
// Select image ++++++++++++++++++++++++++++++
if ( !dnbd3_select_image( sock, image->name, image->rid, SI_SERVER_FLAGS ) ) {
goto server_failed;
}
// See if selecting the image succeeded ++++++++++++++++++++++++++++++
uint16_t protocolVersion, rid;
uint64_t imageSize;
char *name;
if ( !dnbd3_select_image_reply( &serialized, sock, &protocolVersion, &name, &rid, &imageSize ) ) {
// Not counted as a failure of the server, the socket is just closed
goto server_image_not_available;
}
if ( protocolVersion < MIN_SUPPORTED_SERVER ) goto server_failed;
if ( name == NULL || strcmp( name, image->name ) != 0 ) {
ERROR_GOTO( server_failed, "[RTT] Server offers image '%s', requested '%s'", name, image->name );
}
if ( rid != image->rid ) {
ERROR_GOTO( server_failed, "[RTT] Server provides rid %d, requested was %d (%s)",
(int)rid, (int)image->rid, image->name );
}
if ( imageSize != image->virtualFilesize ) {
ERROR_GOTO( server_failed, "[RTT] Remote size: %" PRIu64 ", expected: %" PRIu64 " (%s)",
imageSize, image->virtualFilesize, image->name );
}
// Request first block (NOT random!) ++++++++++++++++++++++++++++++
if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) {
ERROR_GOTO( server_failed, "[RTT] Could not request first block for %s", image->name );
}
// See if requesting the block succeeded ++++++++++++++++++++++
if ( !dnbd3_get_reply( sock, &reply ) ) {
char buf[100] = { 0 };
host_to_string( &servers[itAlt], buf, 100 );
ERROR_GOTO( server_failed, "[RTT] Received corrupted reply header (%s) after CMD_GET_BLOCK (%s)",
buf, image->name );
}
// check reply header
if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) {
ERROR_GOTO( server_failed, "[RTT] Reply to first block request is %d bytes for %s",
reply.size, image->name );
}
if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) {
ERROR_GOTO( server_failed, "[RTT] Could not read first block payload for %s", image->name );
}
clock_gettime( BEST_CLOCK_SOURCE, &end );
// Measurement done - everything fine so far
spin_lock( &uplink->rttLock );
const bool isCurrent = isSameAddressPort( &servers[itAlt], &uplink->currentServer );
// Penalize rtt if this was a cycle; this will treat this server with lower priority
// in the near future too, so we prevent alternating between two servers that are both
// part of a cycle and have the lowest latency.
// (The penalized value is what gets stored in the server's rtt history below.)
const unsigned int rtt = (unsigned int)((end.tv_sec - start.tv_sec) * 1000000
+ (end.tv_nsec - start.tv_nsec) / 1000
+ ( (isCurrent && uplink->cycleDetected) ? 1000000 : 0 )); // µs
unsigned int avg = altservers_updateRtt( &servers[itAlt], rtt );
// If a cycle was detected, or we lost connection to the current (last) server, penalize it one time
// (this only changes the local average used for the comparison below)
if ( ( uplink->cycleDetected || uplink->fd == -1 ) && isCurrent ) avg = (avg * 2) + 50000;
spin_unlock( &uplink->rttLock );
if ( uplink->fd != -1 && isCurrent ) {
// Was measuring current server
currentRtt = avg;
close( sock );
} else if ( avg < bestRtt ) {
// Was another server, update "best"
// Keep this socket open and close the one of the previous best server
if ( bestSock != -1 ) close( bestSock );
bestSock = sock;
bestRtt = avg;
bestIndex = itAlt;
bestProtocolVersion = protocolVersion;
} else {
// Was too slow, ignore
close( sock );
}
// We're done, call continue
continue;
// Jump here if anything went wrong
// This will cleanup and continue
// server_failed records the failure for this server and then falls through to
// server_image_not_available, which only closes the socket
server_failed: ;
altservers_serverFailed( &servers[itAlt] );
server_image_not_available: ;
close( sock );
}
// Done testing all servers. See if we should switch:
// either the uplink has no connection at all right now, or the best server's
// rtt is below 10000000 and below RTT_THRESHOLD_FACTOR applied to the current rtt
if ( bestSock != -1 && (uplink->fd == -1 || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) {
// yep
logadd( LOG_DEBUG1, "Change @ %s - best: %luµs, current: %luµs\n", image->name, bestRtt, currentRtt );
sock_setTimeout( bestSock, _uplinkTimeout );
spin_lock( &uplink->rttLock );
// bestSock is handed over to the uplink here and thus not closed by this thread
uplink->betterFd = bestSock;
uplink->betterServer = servers[bestIndex];
uplink->betterVersion = bestProtocolVersion;
uplink->rttTestResult = RTT_DOCHANGE;
spin_unlock( &uplink->rttLock );
signal_call( uplink->signal );
} else if ( bestSock == -1 && currentRtt == RTT_UNREACHABLE ) {
// No server was reachable
spin_lock( &uplink->rttLock );
uplink->rttTestResult = RTT_NOT_REACHABLE;
spin_unlock( &uplink->rttLock );
} else {
// nope
if ( bestSock != -1 ) close( bestSock );
spin_lock( &uplink->rttLock );
uplink->rttTestResult = RTT_DONTCHANGE;
uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away
spin_unlock( &uplink->rttLock );
if ( !image->working ) {
image->working = true;
logadd( LOG_DEBUG1, "No better alt server found, enabling '%s:%d' again.", image->name, (int)image->rid );
}
}
image_release( image );
// end of loop over all pending uplinks
// Free the queue slot only now that the measurement is complete
spin_lock( &pendingLockWrite );
pending[itLink] = NULL;
spin_unlock( &pendingLockWrite );
pthread_mutex_unlock( &pendingLockConsume );
}
// Save cache maps of all images if applicable
// TODO: Has nothing to do with alt servers really, maybe move somewhere else?
// NOTE(review): declare_now presumably declares a local 'now' holding the current time - confirm in timing.h
declare_now;
if ( timing_reached( &nextCacheMapSave, &now ) ) {
timing_gets( &nextCacheMapSave, SERVER_CACHE_MAP_SAVE_INTERVAL );
image_saveAllCacheMaps();
}
// TODO: More random crap
if ( _closeUnusedFd && timing_reached( &nextCloseUnusedFd, &now ) ) {
timing_gets( &nextCloseUnusedFd, 900 );
image_closeUnusedFd();
}
}
// Reached on shutdown, or if the signal could not be created
cleanup: ;
if ( runSignal != NULL ) signal_close( runSignal );
runSignal = NULL;
return NULL ;
}