summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorSimon Rettberg2019-08-18 21:31:56 +0200
committerSimon Rettberg2019-08-18 21:31:56 +0200
commitda0950ad342bae3b40a74bf82dba6c1f82e7eb57 (patch)
tree1c8353153ca7eb9e19de1b78066a080f30eb2b6c /src/server/uplink.c
parent[SERVER] altservers: Don't run check if <= 1 alt server available (diff)
downloaddnbd3-da0950ad342bae3b40a74bf82dba6c1f82e7eb57.tar.gz
dnbd3-da0950ad342bae3b40a74bf82dba6c1f82e7eb57.tar.xz
dnbd3-da0950ad342bae3b40a74bf82dba6c1f82e7eb57.zip
[SERVER] uplink: More consistent type/variable naming
* Change link to uplink everywhere * dnbd3_connection_t -> dnbd3_uplink_t
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c554
1 files changed, 277 insertions, 277 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 9570273..7d66b21 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -46,16 +46,16 @@ static const char *const NAMES_ULR[4] = {
static atomic_uint_fast64_t totalBytesReceived = 0;
static void* uplink_mainloop(void *data);
-static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly);
-static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const int lastBlockIndex);
-static void uplink_handleReceive(dnbd3_connection_t *link);
+static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly);
+static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex);
+static void uplink_handleReceive(dnbd3_uplink_t *uplink);
static int uplink_sendKeepalive(const int fd);
-static void uplink_addCrc32(dnbd3_connection_t *uplink);
-static void uplink_sendReplicationRequest(dnbd3_connection_t *link);
-static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force);
-static bool uplink_saveCacheMap(dnbd3_connection_t *link);
-static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link);
-static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew);
+static void uplink_addCrc32(dnbd3_uplink_t *uplink);
+static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink);
+static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force);
+static bool uplink_saveCacheMap(dnbd3_uplink_t *uplink);
+static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink);
+static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew);
// ############ uplink connection handling
@@ -76,7 +76,7 @@ uint64_t uplink_getTotalBytesReceived()
bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version)
{
if ( !_isProxy || _shutdown ) return false;
- dnbd3_connection_t *link = NULL;
+ dnbd3_uplink_t *uplink = NULL;
assert( image != NULL );
mutex_lock( &image->lock );
if ( image->uplink != NULL && !image->uplink->shutdown ) {
@@ -88,44 +88,44 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
logadd( LOG_WARNING, "Uplink was requested for image %s, but it is already complete", image->name );
goto failure;
}
- link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) );
- mutex_init( &link->queueLock, LOCK_UPLINK_QUEUE );
- mutex_init( &link->rttLock, LOCK_UPLINK_RTT );
- mutex_init( &link->sendMutex, LOCK_UPLINK_SEND );
- link->image = image;
- link->bytesReceived = 0;
- link->idleTime = 0;
- link->queueLen = 0;
- mutex_lock( &link->sendMutex );
- link->fd = -1;
- mutex_unlock( &link->sendMutex );
- link->cacheFd = -1;
- link->signal = NULL;
- link->replicationHandle = REP_NONE;
- mutex_lock( &link->rttLock );
- link->cycleDetected = false;
+ uplink = image->uplink = calloc( 1, sizeof(dnbd3_uplink_t) );
+ mutex_init( &uplink->queueLock, LOCK_UPLINK_QUEUE );
+ mutex_init( &uplink->rttLock, LOCK_UPLINK_RTT );
+ mutex_init( &uplink->sendMutex, LOCK_UPLINK_SEND );
+ uplink->image = image;
+ uplink->bytesReceived = 0;
+ uplink->idleTime = 0;
+ uplink->queueLen = 0;
+ mutex_lock( &uplink->sendMutex );
+ uplink->fd = -1;
+ mutex_unlock( &uplink->sendMutex );
+ uplink->cacheFd = -1;
+ uplink->signal = NULL;
+ uplink->replicationHandle = REP_NONE;
+ mutex_lock( &uplink->rttLock );
+ uplink->cycleDetected = false;
if ( sock >= 0 ) {
- link->betterFd = sock;
- link->betterServer = *host;
- link->rttTestResult = RTT_DOCHANGE;
- link->betterVersion = version;
+ uplink->betterFd = sock;
+ uplink->betterServer = *host;
+ uplink->rttTestResult = RTT_DOCHANGE;
+ uplink->betterVersion = version;
} else {
- link->betterFd = -1;
- link->rttTestResult = RTT_IDLE;
+ uplink->betterFd = -1;
+ uplink->rttTestResult = RTT_IDLE;
}
- mutex_unlock( &link->rttLock );
- link->recvBufferLen = 0;
- link->shutdown = false;
- if ( 0 != thread_create( &(link->thread), NULL, &uplink_mainloop, (void *)link ) ) {
+ mutex_unlock( &uplink->rttLock );
+ uplink->recvBufferLen = 0;
+ uplink->shutdown = false;
+ if ( 0 != thread_create( &(uplink->thread), NULL, &uplink_mainloop, (void *)link ) ) {
logadd( LOG_ERROR, "Could not start thread for new uplink." );
goto failure;
}
mutex_unlock( &image->lock );
return true;
failure: ;
- if ( link != NULL ) {
- free( link );
- link = image->uplink = NULL;
+ if ( uplink != NULL ) {
+ free( uplink );
+ uplink = image->uplink = NULL;
}
mutex_unlock( &image->lock );
return false;
@@ -146,7 +146,7 @@ void uplink_shutdown(dnbd3_image_t *image)
mutex_unlock( &image->lock );
return;
}
- dnbd3_connection_t * const uplink = image->uplink;
+ dnbd3_uplink_t * const uplink = image->uplink;
mutex_lock( &uplink->queueLock );
if ( !uplink->shutdown ) {
uplink->shutdown = true;
@@ -170,7 +170,7 @@ void uplink_shutdown(dnbd3_image_t *image)
* Remove given client from uplink request queue
* Locks on: uplink.queueLock
*/
-void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client)
+void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client)
{
mutex_lock( &uplink->queueLock );
for (int i = uplink->queueLen - 1; i >= 0; --i) {
@@ -203,7 +203,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
return false;
}
- dnbd3_connection_t * const uplink = client->image->uplink;
+ dnbd3_uplink_t * const uplink = client->image->uplink;
if ( uplink->shutdown ) {
mutex_unlock( &client->image->lock );
logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" );
@@ -370,7 +370,7 @@ static void* uplink_mainloop(void *data)
#define EV_SOCKET (1)
#define EV_COUNT (2)
struct pollfd events[EV_COUNT];
- dnbd3_connection_t * const link = (dnbd3_connection_t*)data;
+ dnbd3_uplink_t * const uplink = (dnbd3_uplink_t*)data;
int numSocks, i, waitTime;
int altCheckInterval = SERVER_RTT_INTERVAL_INIT;
uint32_t discoverFailCount = 0;
@@ -381,31 +381,31 @@ static void* uplink_mainloop(void *data)
timing_get( &nextAltCheck );
lastKeepalive = nextAltCheck;
//
- assert( link != NULL );
+ assert( uplink != NULL );
setThreadName( "idle-uplink" );
blockNoncriticalSignals();
// Make sure file is open for writing
- if ( !uplink_reopenCacheFd( link, false ) ) {
+ if ( !uplink_reopenCacheFd( uplink, false ) ) {
// It might have failed - still offer proxy mode, we just can't cache
- logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", link->image->path, errno );
+ logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", uplink->image->path, errno );
}
//
- link->signal = signal_new();
- if ( link->signal == NULL ) {
+ uplink->signal = signal_new();
+ if ( uplink->signal == NULL ) {
logadd( LOG_WARNING, "error creating signal. Uplink unavailable." );
goto cleanup;
}
events[EV_SIGNAL].events = POLLIN;
- events[EV_SIGNAL].fd = signal_getWaitFd( link->signal );
+ events[EV_SIGNAL].fd = signal_getWaitFd( uplink->signal );
events[EV_SOCKET].fd = -1;
- while ( !_shutdown && !link->shutdown ) {
+ while ( !_shutdown && !uplink->shutdown ) {
// poll()
- mutex_lock( &link->rttLock );
- waitTime = link->rttTestResult == RTT_DOCHANGE ? 0 : -1;
- mutex_unlock( &link->rttLock );
+ mutex_lock( &uplink->rttLock );
+ waitTime = uplink->rttTestResult == RTT_DOCHANGE ? 0 : -1;
+ mutex_unlock( &uplink->rttLock );
if ( waitTime == 0 ) {
// Nothing
- } else if ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) {
+ } else if ( uplink->fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) {
waitTime = 1000;
} else {
declare_now;
@@ -413,9 +413,9 @@ static void* uplink_mainloop(void *data)
if ( waitTime < 100 ) waitTime = 100;
if ( waitTime > 5000 ) waitTime = 5000;
}
- events[EV_SOCKET].fd = link->fd;
+ events[EV_SOCKET].fd = uplink->fd;
numSocks = poll( events, EV_COUNT, waitTime );
- if ( _shutdown || link->shutdown ) goto cleanup;
+ if ( _shutdown || uplink->shutdown ) goto cleanup;
if ( numSocks == -1 ) { // Error?
if ( errno == EINTR ) continue;
logadd( LOG_DEBUG1, "poll() error %d", (int)errno );
@@ -423,39 +423,39 @@ static void* uplink_mainloop(void *data)
continue;
}
// Check if server switch is in order
- mutex_lock( &link->rttLock );
- if ( link->rttTestResult != RTT_DOCHANGE ) {
- mutex_unlock( &link->rttLock );
+ mutex_lock( &uplink->rttLock );
+ if ( uplink->rttTestResult != RTT_DOCHANGE ) {
+ mutex_unlock( &uplink->rttLock );
} else {
- link->rttTestResult = RTT_IDLE;
+ uplink->rttTestResult = RTT_IDLE;
// The rttTest worker thread has finished our request.
// And says it's better to switch to another server
- const int fd = link->fd;
- mutex_lock( &link->sendMutex );
- link->fd = link->betterFd;
- mutex_unlock( &link->sendMutex );
- link->betterFd = -1;
- link->currentServer = link->betterServer;
- link->version = link->betterVersion;
- link->cycleDetected = false;
- mutex_unlock( &link->rttLock );
+ const int fd = uplink->fd;
+ mutex_lock( &uplink->sendMutex );
+ uplink->fd = uplink->betterFd;
+ mutex_unlock( &uplink->sendMutex );
+ uplink->betterFd = -1;
+ uplink->currentServer = uplink->betterServer;
+ uplink->version = uplink->betterVersion;
+ uplink->cycleDetected = false;
+ mutex_unlock( &uplink->rttLock );
discoverFailCount = 0;
if ( fd != -1 ) close( fd );
- link->replicationHandle = REP_NONE;
- link->image->working = true;
- link->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
+ uplink->replicationHandle = REP_NONE;
+ uplink->image->working = true;
+ uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
buffer[0] = '@';
- if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) {
- logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", link->image->name, buffer + 1 );
+ if ( host_to_string( &uplink->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) {
+ logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", uplink->image->name, buffer + 1 );
setThreadName( buffer );
}
// If we don't have a crc32 list yet, see if the new server has one
- if ( link->image->crc32 == NULL ) {
- uplink_addCrc32( link );
+ if ( uplink->image->crc32 == NULL ) {
+ uplink_addCrc32( uplink );
}
// Re-send all pending requests
- uplink_sendRequests( link, false );
- uplink_sendReplicationRequest( link );
+ uplink_sendRequests( uplink, false );
+ uplink_sendReplicationRequest( uplink );
events[EV_SOCKET].events = POLLIN | POLLRDHUP;
timing_gets( &nextAltCheck, altCheckInterval );
// The rtt worker already did the handshake for our image, so there's nothing
@@ -468,161 +468,161 @@ static void* uplink_mainloop(void *data)
goto cleanup;
} else if ( (events[EV_SIGNAL].revents & POLLIN) ) {
// signal triggered -> pending requests
- if ( signal_clear( link->signal ) == SIGNAL_ERROR ) {
- logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", link->image->name );
+ if ( signal_clear( uplink->signal ) == SIGNAL_ERROR ) {
+ logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", uplink->image->name );
}
- if ( link->fd != -1 ) {
+ if ( uplink->fd != -1 ) {
// Uplink seems fine, relay requests to it...
- uplink_sendRequests( link, true );
+ uplink_sendRequests( uplink, true );
} else { // No uplink; maybe it was shutdown since it was idle for too long
- link->idleTime = 0;
+ uplink->idleTime = 0;
}
}
// Uplink socket
if ( (events[EV_SOCKET].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) {
- uplink_connectionFailed( link, true );
+ uplink_connectionFailed( uplink, true );
logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" );
setThreadName( "panic-uplink" );
} else if ( (events[EV_SOCKET].revents & POLLIN) ) {
- uplink_handleReceive( link );
- if ( _shutdown || link->shutdown ) goto cleanup;
+ uplink_handleReceive( uplink );
+ if ( _shutdown || uplink->shutdown ) goto cleanup;
}
declare_now;
uint32_t timepassed = timing_diff( &lastKeepalive, &now );
if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) {
lastKeepalive = now;
- link->idleTime += timepassed;
+ uplink->idleTime += timepassed;
unsavedSeconds += timepassed;
- if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && link->idleTime >= 20 && link->idleTime <= 70 ) ) {
- // fsync/save every 4 minutes, or every 60 seconds if link is idle
+ if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && uplink->idleTime >= 20 && uplink->idleTime <= 70 ) ) {
+ // fsync/save every 4 minutes, or every 60 seconds if uplink is idle
unsavedSeconds = 0;
- uplink_saveCacheMap( link );
+ uplink_saveCacheMap( uplink );
}
// Keep-alive
- if ( link->fd != -1 && link->replicationHandle == REP_NONE ) {
+ if ( uplink->fd != -1 && uplink->replicationHandle == REP_NONE ) {
// Send keep-alive if nothing is happening
- if ( uplink_sendKeepalive( link->fd ) ) {
+ if ( uplink_sendKeepalive( uplink->fd ) ) {
// Re-trigger periodically, in case it requires a minimum user count
- uplink_sendReplicationRequest( link );
+ uplink_sendReplicationRequest( uplink );
} else {
- uplink_connectionFailed( link, true );
+ uplink_connectionFailed( uplink, true );
logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" );
setThreadName( "panic-uplink" );
}
}
- // Don't keep link established if we're idle for too much
- if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) {
- mutex_lock( &link->sendMutex );
- close( link->fd );
- link->fd = events[EV_SOCKET].fd = -1;
- mutex_unlock( &link->sendMutex );
- link->cycleDetected = false;
- if ( link->recvBufferLen != 0 ) {
- link->recvBufferLen = 0;
- free( link->recvBuffer );
- link->recvBuffer = NULL;
+ // Don't keep uplink established if we're idle for too much
+ if ( uplink->fd != -1 && uplink_connectionShouldShutdown( uplink ) ) {
+ mutex_lock( &uplink->sendMutex );
+ close( uplink->fd );
+ uplink->fd = events[EV_SOCKET].fd = -1;
+ mutex_unlock( &uplink->sendMutex );
+ uplink->cycleDetected = false;
+ if ( uplink->recvBufferLen != 0 ) {
+ uplink->recvBufferLen = 0;
+ free( uplink->recvBuffer );
+ uplink->recvBuffer = NULL;
}
- logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", link->image->name, (int)link->image->rid );
+ logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", uplink->image->name, (int)uplink->image->rid );
setThreadName( "idle-uplink" );
}
}
// See if we should trigger an RTT measurement
- mutex_lock( &link->rttLock );
- const int rttTestResult = link->rttTestResult;
- mutex_unlock( &link->rttLock );
+ mutex_lock( &uplink->rttLock );
+ const int rttTestResult = uplink->rttTestResult;
+ mutex_unlock( &uplink->rttLock );
if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) {
- if ( timing_reached( &nextAltCheck, &now ) || ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) || link->cycleDetected ) {
+ if ( timing_reached( &nextAltCheck, &now ) || ( uplink->fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) || uplink->cycleDetected ) {
// It seems it's time for a check
- if ( image_isComplete( link->image ) ) {
+ if ( image_isComplete( uplink->image ) ) {
// Quit work if image is complete
- logadd( LOG_INFO, "Replication of %s complete.", link->image->name );
+ logadd( LOG_INFO, "Replication of %s complete.", uplink->image->name );
setThreadName( "finished-uplink" );
goto cleanup;
- } else if ( !uplink_connectionShouldShutdown( link ) ) {
+ } else if ( !uplink_connectionShouldShutdown( uplink ) ) {
// Not complete - do measurement
- altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous)
- if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) {
- link->nextReplicationIndex = 0;
+ altservers_findUplink( uplink ); // This will set RTT_INPROGRESS (synchronous)
+ if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) {
+ uplink->nextReplicationIndex = 0;
}
}
altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_INTERVAL_MAX);
timing_set( &nextAltCheck, &now, altCheckInterval );
}
} else if ( rttTestResult == RTT_NOT_REACHABLE ) {
- mutex_lock( &link->rttLock );
- link->rttTestResult = RTT_IDLE;
- mutex_unlock( &link->rttLock );
+ mutex_lock( &uplink->rttLock );
+ uplink->rttTestResult = RTT_IDLE;
+ mutex_unlock( &uplink->rttLock );
discoverFailCount++;
timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) );
}
#ifdef _DEBUG
- if ( link->fd != -1 && !link->shutdown ) {
+ if ( uplink->fd != -1 && !uplink->shutdown ) {
bool resend = false;
ticks deadline;
timing_set( &deadline, &now, -10 );
- mutex_lock( &link->queueLock );
- for (i = 0; i < link->queueLen; ++i) {
- if ( link->queue[i].status != ULR_FREE && timing_reached( &link->queue[i].entered, &deadline ) ) {
+ mutex_lock( &uplink->queueLock );
+ for (i = 0; i < uplink->queueLen; ++i) {
+ if ( uplink->queue[i].status != ULR_FREE && timing_reached( &uplink->queue[i].entered, &deadline ) ) {
snprintf( buffer, sizeof(buffer), "[DEBUG %p] Starving request slot %d detected:\n"
- "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)link, i, link->queue[i].client->image->name,
- link->queue[i].from, link->queue[i].to, link->queue[i].status );
- link->queue[i].entered = now;
+ "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)link, i, uplink->queue[i].client->image->name,
+ uplink->queue[i].from, uplink->queue[i].to, uplink->queue[i].status );
+ uplink->queue[i].entered = now;
#ifdef _DEBUG_RESEND_STARVING
- link->queue[i].status = ULR_NEW;
+ uplink->queue[i].status = ULR_NEW;
resend = true;
#endif
- mutex_unlock( &link->queueLock );
+ mutex_unlock( &uplink->queueLock );
logadd( LOG_WARNING, "%s", buffer );
- mutex_lock( &link->queueLock );
+ mutex_lock( &uplink->queueLock );
}
}
- mutex_unlock( &link->queueLock );
+ mutex_unlock( &uplink->queueLock );
if ( resend )
- uplink_sendRequests( link, true );
+ uplink_sendRequests( uplink, true );
}
#endif
}
cleanup: ;
- if ( !link->shutdown ) {
- link->shutdown = true;
- thread_detach( link->thread );
+ if ( !uplink->shutdown ) {
+ uplink->shutdown = true;
+ thread_detach( uplink->thread );
}
- altservers_removeUplink( link );
- uplink_saveCacheMap( link );
- mutex_lock( &link->image->lock );
- if ( link->image->uplink == link ) {
- link->image->uplink = NULL;
+ altservers_removeUplink( uplink );
+ uplink_saveCacheMap( uplink );
+ mutex_lock( &uplink->image->lock );
+ if ( uplink->image->uplink == uplink ) {
+ uplink->image->uplink = NULL;
}
- mutex_lock( &link->queueLock );
- const int fd = link->fd;
- const dnbd3_signal_t* signal = link->signal;
- mutex_lock( &link->sendMutex );
- link->fd = -1;
- mutex_unlock( &link->sendMutex );
- link->signal = NULL;
- // Do not access link->image after unlocking, since we set
+ mutex_lock( &uplink->queueLock );
+ const int fd = uplink->fd;
+ const dnbd3_signal_t* signal = uplink->signal;
+ mutex_lock( &uplink->sendMutex );
+ uplink->fd = -1;
+ mutex_unlock( &uplink->sendMutex );
+ uplink->signal = NULL;
+ // Do not access uplink->image after unlocking, since we set
// image->uplink to NULL. Acquire with image_lock first,
// like done below when checking whether to re-init uplink
- mutex_unlock( &link->image->lock );
- mutex_unlock( &link->queueLock );
+ mutex_unlock( &uplink->image->lock );
+ mutex_unlock( &uplink->queueLock );
if ( fd != -1 ) close( fd );
if ( signal != NULL ) signal_close( signal );
// Wait for the RTT check to finish/fail if it's in progress
- while ( link->rttTestResult == RTT_INPROGRESS )
+ while ( uplink->rttTestResult == RTT_INPROGRESS )
usleep( 10000 );
- if ( link->betterFd != -1 ) {
- close( link->betterFd );
+ if ( uplink->betterFd != -1 ) {
+ close( uplink->betterFd );
}
- mutex_destroy( &link->queueLock );
- mutex_destroy( &link->rttLock );
- mutex_destroy( &link->sendMutex );
- free( link->recvBuffer );
- link->recvBuffer = NULL;
- if ( link->cacheFd != -1 ) {
- close( link->cacheFd );
+ mutex_destroy( &uplink->queueLock );
+ mutex_destroy( &uplink->rttLock );
+ mutex_destroy( &uplink->sendMutex );
+ free( uplink->recvBuffer );
+ uplink->recvBuffer = NULL;
+ if ( uplink->cacheFd != -1 ) {
+ close( uplink->cacheFd );
}
- dnbd3_image_t *image = image_lock( link->image );
- free( link ); // !!!
+ dnbd3_image_t *image = image_lock( uplink->image );
+ free( uplink ); // !!!
if ( image != NULL ) {
if ( !_shutdown && image->cache_map != NULL ) {
// Ingegrity checker must have found something in the meantime
@@ -633,37 +633,37 @@ static void* uplink_mainloop(void *data)
return NULL ;
}
-static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly)
+static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
{
// Scan for new requests
int j;
- mutex_lock( &link->queueLock );
- for (j = 0; j < link->queueLen; ++j) {
- if ( link->queue[j].status != ULR_NEW && (newOnly || link->queue[j].status != ULR_PENDING) ) continue;
- link->queue[j].status = ULR_PENDING;
- uint8_t hops = link->queue[j].hopCount;
- const uint64_t reqStart = link->queue[j].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
- const uint32_t reqSize = (uint32_t)(((link->queue[j].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
+ mutex_lock( &uplink->queueLock );
+ for (j = 0; j < uplink->queueLen; ++j) {
+ if ( uplink->queue[j].status != ULR_NEW && (newOnly || uplink->queue[j].status != ULR_PENDING) ) continue;
+ uplink->queue[j].status = ULR_PENDING;
+ uint8_t hops = uplink->queue[j].hopCount;
+ const uint64_t reqStart = uplink->queue[j].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ const uint32_t reqSize = (uint32_t)(((uplink->queue[j].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
/*
logadd( LOG_DEBUG2, "[%p] Sending slot %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 " (%" PRIu64 "-%" PRIu64 ")",
- (void*)link, j, link->queue[j].status, link->queue[j].handle, link->queue[j].from, link->queue[j].to, reqStart, reqStart+reqSize );
+ (void*)link, j, uplink->queue[j].status, uplink->queue[j].handle, uplink->queue[j].from, uplink->queue[j].to, reqStart, reqStart+reqSize );
*/
- mutex_unlock( &link->queueLock );
+ mutex_unlock( &uplink->queueLock );
if ( hops < 200 ) ++hops;
- mutex_lock( &link->sendMutex );
- const bool ret = dnbd3_get_block( link->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( link->version, hops ) );
- mutex_unlock( &link->sendMutex );
+ mutex_lock( &uplink->sendMutex );
+ const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) );
+ mutex_unlock( &uplink->sendMutex );
if ( !ret ) {
// Non-critical - if the connection dropped or the server was changed
// the thread will re-send this request as soon as the connection
// is reestablished.
logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" );
- altservers_serverFailed( &link->currentServer );
+ altservers_serverFailed( &uplink->currentServer );
return;
}
- mutex_lock( &link->queueLock );
+ mutex_lock( &uplink->queueLock );
}
- mutex_unlock( &link->queueLock );
+ mutex_unlock( &uplink->queueLock );
}
/**
@@ -676,13 +676,13 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly)
* the code simpler. Worst case would be only one bit is zero, which means
* 4kb are missing, but we will request 32kb.
*/
-static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
+static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
{
- if ( link == NULL || link->fd == -1 ) return;
- if ( _backgroundReplication == BGR_DISABLED || link->cacheFd == -1 ) return; // Don't do background replication
- if ( link->nextReplicationIndex == -1 || link->replicationHandle != REP_NONE )
+ if ( uplink == NULL || uplink->fd == -1 ) return;
+ if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 ) return; // Don't do background replication
+ if ( uplink->nextReplicationIndex == -1 || uplink->replicationHandle != REP_NONE )
return;
- dnbd3_image_t * const image = link->image;
+ dnbd3_image_t * const image = uplink->image;
if ( image->virtualFilesize < DNBD3_BLOCK_SIZE ) return;
mutex_lock( &image->lock );
if ( image == NULL || image->cache_map == NULL || image->users < _bgrMinClients ) {
@@ -694,17 +694,17 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
const int lastBlockIndex = mapBytes - 1;
int endByte;
if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks
- endByte = link->nextReplicationIndex + mapBytes;
+ endByte = uplink->nextReplicationIndex + mapBytes;
} else { // Hashblock based: Only look for match in current hash block
- endByte = ( link->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK;
+ endByte = ( uplink->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK;
if ( endByte > mapBytes ) {
endByte = mapBytes;
}
}
int replicationIndex = -1;
- for ( int j = link->nextReplicationIndex; j < endByte; ++j ) {
+ for ( int j = uplink->nextReplicationIndex; j < endByte; ++j ) {
const int i = j % ( mapBytes ); // Wrap around for BGR_FULL
- if ( image->cache_map[i] != 0xff && ( i != lastBlockIndex || !link->replicatedLastBlock ) ) {
+ if ( image->cache_map[i] != 0xff && ( i != lastBlockIndex || !uplink->replicatedLastBlock ) ) {
// Found incomplete one
replicationIndex = i;
break;
@@ -713,31 +713,31 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
mutex_unlock( &image->lock );
if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) {
// Nothing left in current block, find next one
- replicationIndex = uplink_findNextIncompleteHashBlock( link, endByte );
+ replicationIndex = uplink_findNextIncompleteHashBlock( uplink, endByte );
}
if ( replicationIndex == -1 ) {
// Replication might be complete, uplink_mainloop should take care....
- link->nextReplicationIndex = -1;
+ uplink->nextReplicationIndex = -1;
return;
}
const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
- link->replicationHandle = offset;
+ uplink->replicationHandle = offset;
const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
- mutex_lock( &link->sendMutex );
- bool sendOk = dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) );
- mutex_unlock( &link->sendMutex );
+ mutex_lock( &uplink->sendMutex );
+ bool sendOk = dnbd3_get_block( uplink->fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->version, 1 ) );
+ mutex_unlock( &uplink->sendMutex );
if ( !sendOk ) {
logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" );
return;
}
if ( replicationIndex == lastBlockIndex ) {
- link->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks
+ uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks
}
- link->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter
+ uplink->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter
if ( _backgroundReplication == BGR_HASHBLOCK
- && link->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) {
+ && uplink->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) {
// Just crossed a hash block boundary, look for new candidate starting at this very index
- link->nextReplicationIndex = uplink_findNextIncompleteHashBlock( link, link->nextReplicationIndex );
+ uplink->nextReplicationIndex = uplink_findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex );
}
}
@@ -746,18 +746,18 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
* of a hash block which is neither completely empty nor completely
* replicated yet. Returns -1 if no match.
*/
-static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const int startMapIndex)
+static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMapIndex)
{
int retval = -1;
- mutex_lock( &link->image->lock );
- const int mapBytes = IMGSIZE_TO_MAPBYTES( link->image->virtualFilesize );
- const uint8_t *cache_map = link->image->cache_map;
+ mutex_lock( &uplink->image->lock );
+ const int mapBytes = IMGSIZE_TO_MAPBYTES( uplink->image->virtualFilesize );
+ const uint8_t *cache_map = uplink->image->cache_map;
if ( cache_map != NULL ) {
int j;
const int start = ( startMapIndex & MAP_INDEX_HASH_START_MASK );
for (j = 0; j < mapBytes; ++j) {
const int i = ( start + j ) % mapBytes;
- const bool isFull = cache_map[i] == 0xff || ( i + 1 == mapBytes && link->replicatedLastBlock );
+ const bool isFull = cache_map[i] == 0xff || ( i + 1 == mapBytes && uplink->replicatedLastBlock );
const bool isEmpty = cache_map[i] == 0;
if ( !isEmpty && !isFull ) {
// Neither full nor empty, replicate
@@ -785,7 +785,7 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const in
retval = -1;
}
}
- mutex_unlock( &link->image->lock );
+ mutex_unlock( &uplink->image->lock );
return retval;
}
@@ -793,41 +793,41 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const in
* Receive data from uplink server and process/dispatch
* Locks on: link.lock, images[].lock
*/
-static void uplink_handleReceive(dnbd3_connection_t *link)
+static void uplink_handleReceive(dnbd3_uplink_t *uplink)
{
dnbd3_reply_t inReply, outReply;
int ret, i;
for (;;) {
- ret = dnbd3_read_reply( link->fd, &inReply, false );
- if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !link->shutdown ) ) continue;
+ ret = dnbd3_read_reply( uplink->fd, &inReply, false );
+ if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue;
if ( ret == REPLY_AGAIN ) break;
if ( unlikely( ret == REPLY_CLOSED ) ) {
- logadd( LOG_INFO, "Uplink: Remote host hung up (%s)", link->image->path );
+ logadd( LOG_INFO, "Uplink: Remote host hung up (%s)", uplink->image->path );
goto error_cleanup;
}
if ( unlikely( ret == REPLY_WRONGMAGIC ) ) {
- logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path );
+ logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s)", uplink->image->path );
goto error_cleanup;
}
if ( unlikely( ret != REPLY_OK ) ) {
- logadd( LOG_INFO, "Uplink: Connection error %d (%s)", ret, link->image->path );
+ logadd( LOG_INFO, "Uplink: Connection error %d (%s)", ret, uplink->image->path );
goto error_cleanup;
}
if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) {
- logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s", inReply.size, link->image->path );
+ logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s", inReply.size, uplink->image->path );
goto error_cleanup;
}
- if ( unlikely( link->recvBufferLen < inReply.size ) ) {
- link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536);
- link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen );
- if ( link->recvBuffer == NULL ) {
+ if ( unlikely( uplink->recvBufferLen < inReply.size ) ) {
+ uplink->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536);
+ uplink->recvBuffer = realloc( uplink->recvBuffer, uplink->recvBufferLen );
+ if ( uplink->recvBuffer == NULL ) {
logadd( LOG_ERROR, "Out of memory when trying to allocate receive buffer for uplink" );
exit( 1 );
}
}
- if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) {
- logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path );
+ if ( unlikely( (uint32_t)sock_recv( uplink->fd, uplink->recvBuffer, inReply.size ) != inReply.size ) ) {
+ logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", uplink->image->path );
goto error_cleanup;
}
// Payload read completely
@@ -838,18 +838,18 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
const uint64_t start = inReply.handle;
const uint64_t end = inReply.handle + inReply.size;
totalBytesReceived += inReply.size;
- link->bytesReceived += inReply.size;
+ uplink->bytesReceived += inReply.size;
// 1) Write to cache file
- if ( unlikely( link->cacheFd == -1 ) ) {
- uplink_reopenCacheFd( link, false );
+ if ( unlikely( uplink->cacheFd == -1 ) ) {
+ uplink_reopenCacheFd( uplink, false );
}
- if ( likely( link->cacheFd != -1 ) ) {
+ if ( likely( uplink->cacheFd != -1 ) ) {
int err = 0;
bool tryAgain = true; // Allow one retry in case we run out of space or the write fd became invalid
uint32_t done = 0;
ret = 0;
while ( done < inReply.size ) {
- ret = (int)pwrite( link->cacheFd, link->recvBuffer + done, inReply.size - done, start + done );
+ ret = (int)pwrite( uplink->cacheFd, uplink->recvBuffer + done, inReply.size - done, start + done );
if ( unlikely( ret == -1 ) ) {
err = errno;
if ( err == EINTR ) continue;
@@ -860,26 +860,26 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
continue; // Success, retry write
}
if ( err == EBADF || err == EINVAL || err == EIO ) {
- if ( !tryAgain || !uplink_reopenCacheFd( link, true ) )
+ if ( !tryAgain || !uplink_reopenCacheFd( uplink, true ) )
break;
tryAgain = false;
continue; // Write handle to image successfully re-opened, try again
}
- logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d", link->image->name, (int)link->image->rid, err );
+ logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d", uplink->image->name, (int)uplink->image->rid, err );
break;
}
if ( unlikely( ret <= 0 || (uint32_t)ret > inReply.size - done ) ) {
- logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d", ret, link->image->name, (int)link->image->rid );
+ logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d", ret, uplink->image->name, (int)uplink->image->rid );
break;
}
done += (uint32_t)ret;
}
if ( likely( done > 0 ) ) {
- image_updateCachemap( link->image, start, start + done, true );
+ image_updateCachemap( uplink->image, start, start + done, true );
}
if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) {
logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.",
- link->image->name, (int)link->image->rid, err );
+ uplink->image->name, (int)uplink->image->rid, err );
}
}
// 2) Figure out which clients are interested in it
@@ -888,9 +888,9 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
// by populating a slot with index greater than the highest matching
// request with ULR_PROCESSING (assuming there is no ULR_PENDING or ULR_NEW
// where it's fine if the index is greater)
- mutex_lock( &link->queueLock );
- for (i = 0; i < link->queueLen; ++i) {
- dnbd3_queued_request_t * const req = &link->queue[i];
+ mutex_lock( &uplink->queueLock );
+ for (i = 0; i < uplink->queueLen; ++i) {
+ dnbd3_queued_request_t * const req = &uplink->queue[i];
assert( req->status != ULR_PROCESSING );
if ( req->status != ULR_PENDING && req->status != ULR_NEW ) continue;
assert( req->client != NULL );
@@ -903,8 +903,8 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
// from 0, you also need to change the "attach to existing request"-logic in uplink_request()
outReply.magic = dnbd3_packet_magic;
bool served = false;
- for ( i = link->queueLen - 1; i >= 0; --i ) {
- dnbd3_queued_request_t * const req = &link->queue[i];
+ for ( i = uplink->queueLen - 1; i >= 0; --i ) {
+ dnbd3_queued_request_t * const req = &uplink->queue[i];
if ( req->status == ULR_PROCESSING ) {
size_t bytesSent = 0;
assert( req->from >= start && req->to <= end );
@@ -914,14 +914,14 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
outReply.size = (uint32_t)( req->to - req->from );
iov[0].iov_base = &outReply;
iov[0].iov_len = sizeof outReply;
- iov[1].iov_base = link->recvBuffer + (req->from - start);
+ iov[1].iov_base = uplink->recvBuffer + (req->from - start);
iov[1].iov_len = outReply.size;
fixup_reply( outReply );
req->status = ULR_FREE;
req->client = NULL;
served = true;
mutex_lock( &client->sendMutex );
- mutex_unlock( &link->queueLock );
+ mutex_unlock( &uplink->queueLock );
if ( client->sock != -1 ) {
ssize_t sent = writev( client->sock, iov, 2 );
if ( sent > (ssize_t)sizeof outReply ) {
@@ -932,66 +932,66 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
client->bytesSent += bytesSent;
}
mutex_unlock( &client->sendMutex );
- mutex_lock( &link->queueLock );
+ mutex_lock( &uplink->queueLock );
}
- if ( req->status == ULR_FREE && i == link->queueLen - 1 ) link->queueLen--;
+ if ( req->status == ULR_FREE && i == uplink->queueLen - 1 ) uplink->queueLen--;
}
- mutex_unlock( &link->queueLock );
+ mutex_unlock( &uplink->queueLock );
#ifdef _DEBUG
- if ( !served && start != link->replicationHandle ) {
- logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end );
+ if ( !served && start != uplink->replicationHandle ) {
+ logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, uplink->image->name, start, end );
}
#endif
- if ( start == link->replicationHandle ) {
+ if ( start == uplink->replicationHandle ) {
// Was our background replication
- link->replicationHandle = REP_NONE;
+ uplink->replicationHandle = REP_NONE;
// Try to remove from fs cache if no client was interested in this data
- if ( !served && link->cacheFd != -1 ) {
- posix_fadvise( link->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED );
+ if ( !served && uplink->cacheFd != -1 ) {
+ posix_fadvise( uplink->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED );
}
}
if ( served ) {
// Was some client -- reset idle counter
- link->idleTime = 0;
+ uplink->idleTime = 0;
// Re-enable replication if disabled
- if ( link->nextReplicationIndex == -1 ) {
- link->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK;
+ if ( uplink->nextReplicationIndex == -1 ) {
+ uplink->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK;
}
}
}
- if ( link->replicationHandle == REP_NONE ) {
- mutex_lock( &link->queueLock );
- const bool rep = ( link->queueLen == 0 );
- mutex_unlock( &link->queueLock );
- if ( rep ) uplink_sendReplicationRequest( link );
+ if ( uplink->replicationHandle == REP_NONE ) {
+ mutex_lock( &uplink->queueLock );
+ const bool rep = ( uplink->queueLen == 0 );
+ mutex_unlock( &uplink->queueLock );
+ if ( rep ) uplink_sendReplicationRequest( uplink );
}
return;
// Error handling from failed receive or message parsing
error_cleanup: ;
- uplink_connectionFailed( link, true );
+ uplink_connectionFailed( uplink, true );
}
-static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew)
+static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
{
- if ( link->fd == -1 )
+ if ( uplink->fd == -1 )
return;
- altservers_serverFailed( &link->currentServer );
- mutex_lock( &link->sendMutex );
- close( link->fd );
- link->fd = -1;
- mutex_unlock( &link->sendMutex );
- link->replicationHandle = REP_NONE;
- if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) {
- link->nextReplicationIndex = 0;
+ altservers_serverFailed( &uplink->currentServer );
+ mutex_lock( &uplink->sendMutex );
+ close( uplink->fd );
+ uplink->fd = -1;
+ mutex_unlock( &uplink->sendMutex );
+ uplink->replicationHandle = REP_NONE;
+ if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) {
+ uplink->nextReplicationIndex = 0;
}
if ( !findNew )
return;
- mutex_lock( &link->rttLock );
- bool bail = link->rttTestResult == RTT_INPROGRESS || link->betterFd != -1;
- mutex_unlock( &link->rttLock );
+ mutex_lock( &uplink->rttLock );
+ bool bail = uplink->rttTestResult == RTT_INPROGRESS || uplink->betterFd != -1;
+ mutex_unlock( &uplink->rttLock );
if ( bail )
return;
- altservers_findUplink( link );
+ altservers_findUplink( uplink );
}
/**
@@ -1008,7 +1008,7 @@ static int uplink_sendKeepalive(const int fd)
return send( fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request);
}
-static void uplink_addCrc32(dnbd3_connection_t *uplink)
+static void uplink_addCrc32(dnbd3_uplink_t *uplink)
{
dnbd3_image_t *image = uplink->image;
if ( image == NULL || image->virtualFilesize == 0 ) return;
@@ -1051,14 +1051,14 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink)
* it will be closed first. Otherwise, nothing will happen and true will be returned
* immediately.
*/
-static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force)
+static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force)
{
- if ( link->cacheFd != -1 ) {
+ if ( uplink->cacheFd != -1 ) {
if ( !force ) return true;
- close( link->cacheFd );
+ close( uplink->cacheFd );
}
- link->cacheFd = open( link->image->path, O_WRONLY | O_CREAT, 0644 );
- return link->cacheFd != -1;
+ uplink->cacheFd = open( uplink->image->path, O_WRONLY | O_CREAT, 0644 );
+ return uplink->cacheFd != -1;
}
/**
@@ -1066,13 +1066,13 @@ static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force)
* Return true on success.
* Locks on: imageListLock, image.lock
*/
-static bool uplink_saveCacheMap(dnbd3_connection_t *link)
+static bool uplink_saveCacheMap(dnbd3_uplink_t *uplink)
{
- dnbd3_image_t *image = link->image;
+ dnbd3_image_t *image = uplink->image;
assert( image != NULL );
- if ( link->cacheFd != -1 ) {
- if ( fsync( link->cacheFd ) == -1 ) {
+ if ( uplink->cacheFd != -1 ) {
+ if ( fsync( uplink->cacheFd ) == -1 ) {
// A failing fsync means we have no guarantee that any data
// since the last fsync (or open if none) has been saved. Apart
// from keeping the cache_map from the last successful fsync
@@ -1134,9 +1134,9 @@ static bool uplink_saveCacheMap(dnbd3_connection_t *link)
return true;
}
-static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link)
+static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink)
{
- return ( link->idleTime > SERVER_UPLINK_IDLE_TIMEOUT
- && ( _backgroundReplication != BGR_FULL || _bgrMinClients > link->image->users ) );
+ return ( uplink->idleTime > SERVER_UPLINK_IDLE_TIMEOUT
+ && ( _backgroundReplication != BGR_FULL || _bgrMinClients > uplink->image->users ) );
}