summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorSimon Rettberg2019-08-18 21:59:26 +0200
committerSimon Rettberg2019-08-18 21:59:26 +0200
commit1d2295131020688b5a688286ce8c53d6bb7abdb8 (patch)
treef161e25b92a23ad7aebfafb482b265a4c0cc948b /src/server/uplink.c
parent[SERVER] uplink: More consistent type/variable naming (diff)
downloaddnbd3-1d2295131020688b5a688286ce8c53d6bb7abdb8.tar.gz
dnbd3-1d2295131020688b5a688286ce8c53d6bb7abdb8.tar.xz
dnbd3-1d2295131020688b5a688286ce8c53d6bb7abdb8.zip
[SERVER] Add struct representing active connection to uplink server
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c78
1 files changed, 38 insertions, 40 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 7d66b21..e21e28c 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -97,7 +97,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
uplink->idleTime = 0;
uplink->queueLen = 0;
mutex_lock( &uplink->sendMutex );
- uplink->fd = -1;
+ uplink->current.fd = -1;
mutex_unlock( &uplink->sendMutex );
uplink->cacheFd = -1;
uplink->signal = NULL;
@@ -105,12 +105,12 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
mutex_lock( &uplink->rttLock );
uplink->cycleDetected = false;
if ( sock >= 0 ) {
- uplink->betterFd = sock;
- uplink->betterServer = *host;
+ uplink->better.fd = sock;
+ uplink->better.host = *host;
uplink->rttTestResult = RTT_DOCHANGE;
- uplink->betterVersion = version;
+ uplink->better.version = version;
} else {
- uplink->betterFd = -1;
+ uplink->better.fd = -1;
uplink->rttTestResult = RTT_IDLE;
}
mutex_unlock( &uplink->rttLock );
@@ -211,7 +211,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
}
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
// This might be a false positive if there are multiple instances running on the same host (IP)
- if ( hops != 0 && isSameAddress( &uplink->currentServer, &client->host ) ) {
+ if ( hops != 0 && isSameAddress( &uplink->current.host, &client->host ) ) {
mutex_unlock( &client->image->lock );
logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
mutex_lock( &uplink->rttLock );
@@ -315,14 +315,14 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
if ( mutex_trylock( &uplink->sendMutex ) != 0 ) {
logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" );
} else {
- if ( uplink->fd == -1 ) {
+ if ( uplink->current.fd == -1 ) {
mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
} else {
const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
if ( hops < 200 ) ++hops;
- const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) );
+ const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) );
mutex_unlock( &uplink->sendMutex );
if ( !ret ) {
logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" );
@@ -405,7 +405,7 @@ static void* uplink_mainloop(void *data)
mutex_unlock( &uplink->rttLock );
if ( waitTime == 0 ) {
// Nothing
- } else if ( uplink->fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) {
+ } else if ( uplink->current.fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) {
waitTime = 1000;
} else {
declare_now;
@@ -413,7 +413,7 @@ static void* uplink_mainloop(void *data)
if ( waitTime < 100 ) waitTime = 100;
if ( waitTime > 5000 ) waitTime = 5000;
}
- events[EV_SOCKET].fd = uplink->fd;
+ events[EV_SOCKET].fd = uplink->current.fd;
numSocks = poll( events, EV_COUNT, waitTime );
if ( _shutdown || uplink->shutdown ) goto cleanup;
if ( numSocks == -1 ) { // Error?
@@ -430,13 +430,11 @@ static void* uplink_mainloop(void *data)
uplink->rttTestResult = RTT_IDLE;
// The rttTest worker thread has finished our request.
// And says it's better to switch to another server
- const int fd = uplink->fd;
+ const int fd = uplink->current.fd;
mutex_lock( &uplink->sendMutex );
- uplink->fd = uplink->betterFd;
+ uplink->current = uplink->better;
mutex_unlock( &uplink->sendMutex );
- uplink->betterFd = -1;
- uplink->currentServer = uplink->betterServer;
- uplink->version = uplink->betterVersion;
+ uplink->better.fd = -1;
uplink->cycleDetected = false;
mutex_unlock( &uplink->rttLock );
discoverFailCount = 0;
@@ -445,7 +443,7 @@ static void* uplink_mainloop(void *data)
uplink->image->working = true;
uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
buffer[0] = '@';
- if ( host_to_string( &uplink->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) {
+ if ( host_to_string( &uplink->current.host, buffer + 1, sizeof(buffer) - 1 ) ) {
logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", uplink->image->name, buffer + 1 );
setThreadName( buffer );
}
@@ -471,7 +469,7 @@ static void* uplink_mainloop(void *data)
if ( signal_clear( uplink->signal ) == SIGNAL_ERROR ) {
logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", uplink->image->name );
}
- if ( uplink->fd != -1 ) {
+ if ( uplink->current.fd != -1 ) {
// Uplink seems fine, relay requests to it...
uplink_sendRequests( uplink, true );
} else { // No uplink; maybe it was shutdown since it was idle for too long
@@ -499,9 +497,9 @@ static void* uplink_mainloop(void *data)
uplink_saveCacheMap( uplink );
}
// Keep-alive
- if ( uplink->fd != -1 && uplink->replicationHandle == REP_NONE ) {
+ if ( uplink->current.fd != -1 && uplink->replicationHandle == REP_NONE ) {
// Send keep-alive if nothing is happening
- if ( uplink_sendKeepalive( uplink->fd ) ) {
+ if ( uplink_sendKeepalive( uplink->current.fd ) ) {
// Re-trigger periodically, in case it requires a minimum user count
uplink_sendReplicationRequest( uplink );
} else {
@@ -511,10 +509,10 @@ static void* uplink_mainloop(void *data)
}
}
// Don't keep uplink established if we're idle for too much
- if ( uplink->fd != -1 && uplink_connectionShouldShutdown( uplink ) ) {
+ if ( uplink->current.fd != -1 && uplink_connectionShouldShutdown( uplink ) ) {
mutex_lock( &uplink->sendMutex );
- close( uplink->fd );
- uplink->fd = events[EV_SOCKET].fd = -1;
+ close( uplink->current.fd );
+ uplink->current.fd = events[EV_SOCKET].fd = -1;
mutex_unlock( &uplink->sendMutex );
uplink->cycleDetected = false;
if ( uplink->recvBufferLen != 0 ) {
@@ -531,7 +529,7 @@ static void* uplink_mainloop(void *data)
const int rttTestResult = uplink->rttTestResult;
mutex_unlock( &uplink->rttLock );
if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) {
- if ( timing_reached( &nextAltCheck, &now ) || ( uplink->fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) || uplink->cycleDetected ) {
+ if ( timing_reached( &nextAltCheck, &now ) || ( uplink->current.fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) || uplink->cycleDetected ) {
// It seems it's time for a check
if ( image_isComplete( uplink->image ) ) {
// Quit work if image is complete
@@ -556,7 +554,7 @@ static void* uplink_mainloop(void *data)
timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) );
}
#ifdef _DEBUG
- if ( uplink->fd != -1 && !uplink->shutdown ) {
+ if ( uplink->current.fd != -1 && !uplink->shutdown ) {
bool resend = false;
ticks deadline;
timing_set( &deadline, &now, -10 );
@@ -594,10 +592,10 @@ static void* uplink_mainloop(void *data)
uplink->image->uplink = NULL;
}
mutex_lock( &uplink->queueLock );
- const int fd = uplink->fd;
+ const int fd = uplink->current.fd;
const dnbd3_signal_t* signal = uplink->signal;
mutex_lock( &uplink->sendMutex );
- uplink->fd = -1;
+ uplink->current.fd = -1;
mutex_unlock( &uplink->sendMutex );
uplink->signal = NULL;
// Do not access uplink->image after unlocking, since we set
@@ -610,8 +608,8 @@ static void* uplink_mainloop(void *data)
// Wait for the RTT check to finish/fail if it's in progress
while ( uplink->rttTestResult == RTT_INPROGRESS )
usleep( 10000 );
- if ( uplink->betterFd != -1 ) {
- close( uplink->betterFd );
+ if ( uplink->better.fd != -1 ) {
+ close( uplink->better.fd );
}
mutex_destroy( &uplink->queueLock );
mutex_destroy( &uplink->rttLock );
@@ -651,14 +649,14 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
mutex_unlock( &uplink->queueLock );
if ( hops < 200 ) ++hops;
mutex_lock( &uplink->sendMutex );
- const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) );
+ const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) );
mutex_unlock( &uplink->sendMutex );
if ( !ret ) {
// Non-critical - if the connection dropped or the server was changed
// the thread will re-send this request as soon as the connection
// is reestablished.
logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" );
- altservers_serverFailed( &uplink->currentServer );
+ altservers_serverFailed( &uplink->current.host );
return;
}
mutex_lock( &uplink->queueLock );
@@ -678,7 +676,7 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
*/
static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
{
- if ( uplink == NULL || uplink->fd == -1 ) return;
+ if ( uplink == NULL || uplink->current.fd == -1 ) return;
if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 ) return; // Don't do background replication
if ( uplink->nextReplicationIndex == -1 || uplink->replicationHandle != REP_NONE )
return;
@@ -724,7 +722,7 @@ static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
uplink->replicationHandle = offset;
const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
mutex_lock( &uplink->sendMutex );
- bool sendOk = dnbd3_get_block( uplink->fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->version, 1 ) );
+ bool sendOk = dnbd3_get_block( uplink->current.fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->current.version, 1 ) );
mutex_unlock( &uplink->sendMutex );
if ( !sendOk ) {
logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" );
@@ -798,7 +796,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
dnbd3_reply_t inReply, outReply;
int ret, i;
for (;;) {
- ret = dnbd3_read_reply( uplink->fd, &inReply, false );
+ ret = dnbd3_read_reply( uplink->current.fd, &inReply, false );
if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue;
if ( ret == REPLY_AGAIN ) break;
if ( unlikely( ret == REPLY_CLOSED ) ) {
@@ -826,7 +824,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
exit( 1 );
}
}
- if ( unlikely( (uint32_t)sock_recv( uplink->fd, uplink->recvBuffer, inReply.size ) != inReply.size ) ) {
+ if ( unlikely( (uint32_t)sock_recv( uplink->current.fd, uplink->recvBuffer, inReply.size ) != inReply.size ) ) {
logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", uplink->image->path );
goto error_cleanup;
}
@@ -973,12 +971,12 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
{
- if ( uplink->fd == -1 )
+ if ( uplink->current.fd == -1 )
return;
- altservers_serverFailed( &uplink->currentServer );
+ altservers_serverFailed( &uplink->current.host );
mutex_lock( &uplink->sendMutex );
- close( uplink->fd );
- uplink->fd = -1;
+ close( uplink->current.fd );
+ uplink->current.fd = -1;
mutex_unlock( &uplink->sendMutex );
uplink->replicationHandle = REP_NONE;
if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) {
@@ -987,7 +985,7 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
if ( !findNew )
return;
mutex_lock( &uplink->rttLock );
- bool bail = uplink->rttTestResult == RTT_INPROGRESS || uplink->betterFd != -1;
+ bool bail = uplink->rttTestResult == RTT_INPROGRESS || uplink->better.fd != -1;
mutex_unlock( &uplink->rttLock );
if ( bail )
return;
@@ -1016,7 +1014,7 @@ static void uplink_addCrc32(dnbd3_uplink_t *uplink)
uint32_t masterCrc;
uint32_t *buffer = malloc( bytes );
mutex_lock( &uplink->sendMutex );
- bool sendOk = dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes );
+ bool sendOk = dnbd3_get_crc32( uplink->current.fd, &masterCrc, buffer, &bytes );
mutex_unlock( &uplink->sendMutex );
if ( !sendOk || bytes == 0 ) {
free( buffer );