summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2019-08-18 21:59:26 +0200
committerSimon Rettberg2019-08-18 21:59:26 +0200
commit1d2295131020688b5a688286ce8c53d6bb7abdb8 (patch)
treef161e25b92a23ad7aebfafb482b265a4c0cc948b
parent[SERVER] uplink: More consistent type/variable naming (diff)
downloaddnbd3-1d2295131020688b5a688286ce8c53d6bb7abdb8.tar.gz
dnbd3-1d2295131020688b5a688286ce8c53d6bb7abdb8.tar.xz
dnbd3-1d2295131020688b5a688286ce8c53d6bb7abdb8.zip
[SERVER] Add struct representing active connection to uplink server
-rw-r--r--src/server/altservers.c30
-rw-r--r--src/server/globals.h14
-rw-r--r--src/server/image.c2
-rw-r--r--src/server/integrity.c2
-rw-r--r--src/server/uplink.c78
5 files changed, 60 insertions, 66 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index 1001981..fbe10a8 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -125,14 +125,14 @@ void altservers_findUplink(dnbd3_uplink_t *uplink)
{
if ( uplink->shutdown )
return;
- if ( uplink->fd != -1 && numAltServers <= 1 )
+ if ( uplink->current.fd != -1 && numAltServers <= 1 )
return;
int i;
// if betterFd != -1 it means the uplink is supposed to switch to another
// server. As this function here is called by the uplink thread, it can
// never be that the uplink is supposed to switch, but instead calls
// this function.
- assert( uplink->betterFd == -1 );
+ assert( uplink->better.fd == -1 );
// it is however possible that an RTT measurement is currently in progress,
// so check for that case and do nothing if one is in progress
// XXX As this function is only ever called by the image's uplink thread,
@@ -457,9 +457,9 @@ static void *altservers_main(void *data UNUSED)
if ( uplink == NULL )
continue;
// First, get 4 alt servers
- numAlts = altservers_getListForUplink( servers, ALTS, uplink->fd == -1 );
+ numAlts = altservers_getListForUplink( servers, ALTS, uplink->current.fd == -1 );
// If we're already connected and only got one server anyways, there isn't much to do
- if ( numAlts <= 1 && uplink->fd != -1 ) {
+ if ( numAlts <= 1 && uplink->current.fd != -1 ) {
uplink->rttTestResult = RTT_DONTCHANGE;
continue;
}
@@ -475,15 +475,15 @@ static void *altservers_main(void *data UNUSED)
}
LOG( LOG_DEBUG2, "[%d] Running alt check", itLink );
assert( uplink->rttTestResult == RTT_INPROGRESS );
- if ( uplink->fd != -1 ) {
+ if ( uplink->current.fd != -1 ) {
// Add current server if not already in list
found = false;
for (itAlt = 0; itAlt < numAlts; ++itAlt) {
- if ( !isSameAddressPort( &uplink->currentServer, &servers[itAlt] ) ) continue;
+ if ( !isSameAddressPort( &uplink->current.host, &servers[itAlt] ) ) continue;
found = true;
break;
}
- if ( !found ) servers[numAlts++] = uplink->currentServer;
+ if ( !found ) servers[numAlts++] = uplink->current.host;
}
// Test them all
int bestSock = -1;
@@ -537,7 +537,7 @@ static void *altservers_main(void *data UNUSED)
// Measurement done - everything fine so far
mutex_lock( &altServersLock );
mutex_lock( &uplink->rttLock );
- const bool isCurrent = isSameAddressPort( &servers[itAlt], &uplink->currentServer );
+ const bool isCurrent = isSameAddressPort( &servers[itAlt], &uplink->current.host );
// Penaltize rtt if this was a cycle; this will treat this server with lower priority
// in the near future too, so we prevent alternating between two servers that are both
// part of a cycle and have the lowest latency.
@@ -547,9 +547,9 @@ static void *altservers_main(void *data UNUSED)
unsigned int avg = altservers_updateRtt( &servers[itAlt], rtt );
mutex_unlock( &altServersLock );
// If a cycle was detected, or we lost connection to the current (last) server, penaltize it one time
- if ( ( uplink->cycleDetected || uplink->fd == -1 ) && isCurrent ) avg = (avg * 2) + 50000;
+ if ( ( uplink->cycleDetected || uplink->current.fd == -1 ) && isCurrent ) avg = (avg * 2) + 50000;
mutex_unlock( &uplink->rttLock );
- if ( uplink->fd != -1 && isCurrent ) {
+ if ( uplink->current.fd != -1 && isCurrent ) {
// Was measuring current server
currentRtt = avg;
close( sock );
@@ -574,18 +574,18 @@ static void *altservers_main(void *data UNUSED)
close( sock );
}
// Done testing all servers. See if we should switch
- if ( bestSock != -1 && (uplink->fd == -1 || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) {
+ if ( bestSock != -1 && (uplink->current.fd == -1 || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) {
// yep
- if ( currentRtt > 10000000 || uplink->fd == -1 ) {
+ if ( currentRtt > 10000000 || uplink->current.fd == -1 ) {
LOG( LOG_DEBUG1, "Change - best: %luµs, current: -", bestRtt );
} else {
LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt );
}
sock_setTimeout( bestSock, _uplinkTimeout );
mutex_lock( &uplink->rttLock );
- uplink->betterFd = bestSock;
- uplink->betterServer = servers[bestIndex];
- uplink->betterVersion = bestProtocolVersion;
+ uplink->better.fd = bestSock;
+ uplink->better.host = servers[bestIndex];
+ uplink->better.version = bestProtocolVersion;
uplink->rttTestResult = RTT_DOCHANGE;
mutex_unlock( &uplink->rttLock );
signal_call( uplink->signal );
diff --git a/src/server/globals.h b/src/server/globals.h
index 0371e33..659e5a2 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -31,9 +31,9 @@ typedef struct
} dnbd3_queued_request_t;
typedef struct {
- int fd;
- int version;
- dnbd3_host_t host;
+ int fd; // Socket fd for this connection
+ int version; // Protocol version of remote server
+ dnbd3_host_t host; // IP/Port of remote server
} dnbd3_server_connection_t;
#define RTT_IDLE 0 // Not in progress
@@ -43,20 +43,16 @@ typedef struct {
#define RTT_NOT_REACHABLE 4 // No uplink was reachable
struct _dnbd3_uplink
{
- int fd; // socket fd to remote server
- int version; // remote server protocol version
+ dnbd3_server_connection_t current; // Currently active connection; fd == -1 means disconnected
+ dnbd3_server_connection_t better; // Better connection as found by altserver worker; fd == -1 means none
dnbd3_signal_t* signal; // used to wake up the process
pthread_t thread; // thread holding the connection
pthread_mutex_t sendMutex; // For locking socket while sending
pthread_mutex_t queueLock; // lock for synchronization on request queue etc.
dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer
- dnbd3_host_t currentServer; // Current server we're connected to
pthread_mutex_t rttLock; // When accessing rttTestResult, betterFd or betterServer
int rttTestResult; // RTT_*
int cacheFd; // used to write to the image, in case it is relayed. ONLY USE FROM UPLINK THREAD!
- int betterVersion; // protocol version of better server
- int betterFd; // Active connection to better server, ready to use
- dnbd3_host_t betterServer; // The better server
uint8_t *recvBuffer; // Buffer for receiving payload
uint32_t recvBufferLen; // Len of ^^
atomic_bool shutdown; // signal this thread to stop, must only be set from uplink_shutdown() or cleanup in uplink_mainloop()
diff --git a/src/server/image.c b/src/server/image.c
index 4a65ed3..d250715 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -1508,7 +1508,7 @@ json_t* image_getListAsJson()
uplinkName[0] = '\0';
} else {
bytesReceived = image->uplink->bytesReceived;
- if ( image->uplink->fd == -1 || !host_to_string( &image->uplink->currentServer, uplinkName, sizeof(uplinkName) ) ) {
+ if ( image->uplink->current.fd == -1 || !host_to_string( &image->uplink->current.host, uplinkName, sizeof(uplinkName) ) ) {
uplinkName[0] = '\0';
}
}
diff --git a/src/server/integrity.c b/src/server/integrity.c
index c52d17b..3d1ac9b 100644
--- a/src/server/integrity.c
+++ b/src/server/integrity.c
@@ -240,7 +240,7 @@ static void* integrity_main(void * data UNUSED)
if ( !foundCorrupted ) {
mutex_lock( &image->lock );
if ( image->uplink != NULL ) { // TODO: image_determineWorkingState() helper?
- image->working = image->uplink->fd != -1 && image->readFd != -1;
+ image->working = image->uplink->current.fd != -1 && image->readFd != -1;
}
mutex_unlock( &image->lock );
}
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 7d66b21..e21e28c 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -97,7 +97,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
uplink->idleTime = 0;
uplink->queueLen = 0;
mutex_lock( &uplink->sendMutex );
- uplink->fd = -1;
+ uplink->current.fd = -1;
mutex_unlock( &uplink->sendMutex );
uplink->cacheFd = -1;
uplink->signal = NULL;
@@ -105,12 +105,12 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
mutex_lock( &uplink->rttLock );
uplink->cycleDetected = false;
if ( sock >= 0 ) {
- uplink->betterFd = sock;
- uplink->betterServer = *host;
+ uplink->better.fd = sock;
+ uplink->better.host = *host;
uplink->rttTestResult = RTT_DOCHANGE;
- uplink->betterVersion = version;
+ uplink->better.version = version;
} else {
- uplink->betterFd = -1;
+ uplink->better.fd = -1;
uplink->rttTestResult = RTT_IDLE;
}
mutex_unlock( &uplink->rttLock );
@@ -211,7 +211,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
}
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
// This might be a false positive if there are multiple instances running on the same host (IP)
- if ( hops != 0 && isSameAddress( &uplink->currentServer, &client->host ) ) {
+ if ( hops != 0 && isSameAddress( &uplink->current.host, &client->host ) ) {
mutex_unlock( &client->image->lock );
logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
mutex_lock( &uplink->rttLock );
@@ -315,14 +315,14 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
if ( mutex_trylock( &uplink->sendMutex ) != 0 ) {
logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" );
} else {
- if ( uplink->fd == -1 ) {
+ if ( uplink->current.fd == -1 ) {
mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
} else {
const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
if ( hops < 200 ) ++hops;
- const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) );
+ const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) );
mutex_unlock( &uplink->sendMutex );
if ( !ret ) {
logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" );
@@ -405,7 +405,7 @@ static void* uplink_mainloop(void *data)
mutex_unlock( &uplink->rttLock );
if ( waitTime == 0 ) {
// Nothing
- } else if ( uplink->fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) {
+ } else if ( uplink->current.fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) {
waitTime = 1000;
} else {
declare_now;
@@ -413,7 +413,7 @@ static void* uplink_mainloop(void *data)
if ( waitTime < 100 ) waitTime = 100;
if ( waitTime > 5000 ) waitTime = 5000;
}
- events[EV_SOCKET].fd = uplink->fd;
+ events[EV_SOCKET].fd = uplink->current.fd;
numSocks = poll( events, EV_COUNT, waitTime );
if ( _shutdown || uplink->shutdown ) goto cleanup;
if ( numSocks == -1 ) { // Error?
@@ -430,13 +430,11 @@ static void* uplink_mainloop(void *data)
uplink->rttTestResult = RTT_IDLE;
// The rttTest worker thread has finished our request.
// And says it's better to switch to another server
- const int fd = uplink->fd;
+ const int fd = uplink->current.fd;
mutex_lock( &uplink->sendMutex );
- uplink->fd = uplink->betterFd;
+ uplink->current = uplink->better;
mutex_unlock( &uplink->sendMutex );
- uplink->betterFd = -1;
- uplink->currentServer = uplink->betterServer;
- uplink->version = uplink->betterVersion;
+ uplink->better.fd = -1;
uplink->cycleDetected = false;
mutex_unlock( &uplink->rttLock );
discoverFailCount = 0;
@@ -445,7 +443,7 @@ static void* uplink_mainloop(void *data)
uplink->image->working = true;
uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
buffer[0] = '@';
- if ( host_to_string( &uplink->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) {
+ if ( host_to_string( &uplink->current.host, buffer + 1, sizeof(buffer) - 1 ) ) {
logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", uplink->image->name, buffer + 1 );
setThreadName( buffer );
}
@@ -471,7 +469,7 @@ static void* uplink_mainloop(void *data)
if ( signal_clear( uplink->signal ) == SIGNAL_ERROR ) {
logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", uplink->image->name );
}
- if ( uplink->fd != -1 ) {
+ if ( uplink->current.fd != -1 ) {
// Uplink seems fine, relay requests to it...
uplink_sendRequests( uplink, true );
} else { // No uplink; maybe it was shutdown since it was idle for too long
@@ -499,9 +497,9 @@ static void* uplink_mainloop(void *data)
uplink_saveCacheMap( uplink );
}
// Keep-alive
- if ( uplink->fd != -1 && uplink->replicationHandle == REP_NONE ) {
+ if ( uplink->current.fd != -1 && uplink->replicationHandle == REP_NONE ) {
// Send keep-alive if nothing is happening
- if ( uplink_sendKeepalive( uplink->fd ) ) {
+ if ( uplink_sendKeepalive( uplink->current.fd ) ) {
// Re-trigger periodically, in case it requires a minimum user count
uplink_sendReplicationRequest( uplink );
} else {
@@ -511,10 +509,10 @@ static void* uplink_mainloop(void *data)
}
}
// Don't keep uplink established if we're idle for too much
- if ( uplink->fd != -1 && uplink_connectionShouldShutdown( uplink ) ) {
+ if ( uplink->current.fd != -1 && uplink_connectionShouldShutdown( uplink ) ) {
mutex_lock( &uplink->sendMutex );
- close( uplink->fd );
- uplink->fd = events[EV_SOCKET].fd = -1;
+ close( uplink->current.fd );
+ uplink->current.fd = events[EV_SOCKET].fd = -1;
mutex_unlock( &uplink->sendMutex );
uplink->cycleDetected = false;
if ( uplink->recvBufferLen != 0 ) {
@@ -531,7 +529,7 @@ static void* uplink_mainloop(void *data)
const int rttTestResult = uplink->rttTestResult;
mutex_unlock( &uplink->rttLock );
if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) {
- if ( timing_reached( &nextAltCheck, &now ) || ( uplink->fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) || uplink->cycleDetected ) {
+ if ( timing_reached( &nextAltCheck, &now ) || ( uplink->current.fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) || uplink->cycleDetected ) {
// It seems it's time for a check
if ( image_isComplete( uplink->image ) ) {
// Quit work if image is complete
@@ -556,7 +554,7 @@ static void* uplink_mainloop(void *data)
timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) );
}
#ifdef _DEBUG
- if ( uplink->fd != -1 && !uplink->shutdown ) {
+ if ( uplink->current.fd != -1 && !uplink->shutdown ) {
bool resend = false;
ticks deadline;
timing_set( &deadline, &now, -10 );
@@ -594,10 +592,10 @@ static void* uplink_mainloop(void *data)
uplink->image->uplink = NULL;
}
mutex_lock( &uplink->queueLock );
- const int fd = uplink->fd;
+ const int fd = uplink->current.fd;
const dnbd3_signal_t* signal = uplink->signal;
mutex_lock( &uplink->sendMutex );
- uplink->fd = -1;
+ uplink->current.fd = -1;
mutex_unlock( &uplink->sendMutex );
uplink->signal = NULL;
// Do not access uplink->image after unlocking, since we set
@@ -610,8 +608,8 @@ static void* uplink_mainloop(void *data)
// Wait for the RTT check to finish/fail if it's in progress
while ( uplink->rttTestResult == RTT_INPROGRESS )
usleep( 10000 );
- if ( uplink->betterFd != -1 ) {
- close( uplink->betterFd );
+ if ( uplink->better.fd != -1 ) {
+ close( uplink->better.fd );
}
mutex_destroy( &uplink->queueLock );
mutex_destroy( &uplink->rttLock );
@@ -651,14 +649,14 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
mutex_unlock( &uplink->queueLock );
if ( hops < 200 ) ++hops;
mutex_lock( &uplink->sendMutex );
- const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) );
+ const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) );
mutex_unlock( &uplink->sendMutex );
if ( !ret ) {
// Non-critical - if the connection dropped or the server was changed
// the thread will re-send this request as soon as the connection
// is reestablished.
logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" );
- altservers_serverFailed( &uplink->currentServer );
+ altservers_serverFailed( &uplink->current.host );
return;
}
mutex_lock( &uplink->queueLock );
@@ -678,7 +676,7 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
*/
static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
{
- if ( uplink == NULL || uplink->fd == -1 ) return;
+ if ( uplink == NULL || uplink->current.fd == -1 ) return;
if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 ) return; // Don't do background replication
if ( uplink->nextReplicationIndex == -1 || uplink->replicationHandle != REP_NONE )
return;
@@ -724,7 +722,7 @@ static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
uplink->replicationHandle = offset;
const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
mutex_lock( &uplink->sendMutex );
- bool sendOk = dnbd3_get_block( uplink->fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->version, 1 ) );
+ bool sendOk = dnbd3_get_block( uplink->current.fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->current.version, 1 ) );
mutex_unlock( &uplink->sendMutex );
if ( !sendOk ) {
logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" );
@@ -798,7 +796,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
dnbd3_reply_t inReply, outReply;
int ret, i;
for (;;) {
- ret = dnbd3_read_reply( uplink->fd, &inReply, false );
+ ret = dnbd3_read_reply( uplink->current.fd, &inReply, false );
if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue;
if ( ret == REPLY_AGAIN ) break;
if ( unlikely( ret == REPLY_CLOSED ) ) {
@@ -826,7 +824,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
exit( 1 );
}
}
- if ( unlikely( (uint32_t)sock_recv( uplink->fd, uplink->recvBuffer, inReply.size ) != inReply.size ) ) {
+ if ( unlikely( (uint32_t)sock_recv( uplink->current.fd, uplink->recvBuffer, inReply.size ) != inReply.size ) ) {
logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", uplink->image->path );
goto error_cleanup;
}
@@ -973,12 +971,12 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
{
- if ( uplink->fd == -1 )
+ if ( uplink->current.fd == -1 )
return;
- altservers_serverFailed( &uplink->currentServer );
+ altservers_serverFailed( &uplink->current.host );
mutex_lock( &uplink->sendMutex );
- close( uplink->fd );
- uplink->fd = -1;
+ close( uplink->current.fd );
+ uplink->current.fd = -1;
mutex_unlock( &uplink->sendMutex );
uplink->replicationHandle = REP_NONE;
if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) {
@@ -987,7 +985,7 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
if ( !findNew )
return;
mutex_lock( &uplink->rttLock );
- bool bail = uplink->rttTestResult == RTT_INPROGRESS || uplink->betterFd != -1;
+ bool bail = uplink->rttTestResult == RTT_INPROGRESS || uplink->better.fd != -1;
mutex_unlock( &uplink->rttLock );
if ( bail )
return;
@@ -1016,7 +1014,7 @@ static void uplink_addCrc32(dnbd3_uplink_t *uplink)
uint32_t masterCrc;
uint32_t *buffer = malloc( bytes );
mutex_lock( &uplink->sendMutex );
- bool sendOk = dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes );
+ bool sendOk = dnbd3_get_crc32( uplink->current.fd, &masterCrc, buffer, &bytes );
mutex_unlock( &uplink->sendMutex );
if ( !sendOk || bytes == 0 ) {
free( buffer );