summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorSimon Rettberg2020-03-03 12:21:01 +0100
committerSimon Rettberg2020-03-03 12:21:01 +0100
commit26c1ad7af0f5749c5343a5823b9c8cece885ce84 (patch)
tree0fe45f629560edb47bd86c7dc78b69715348b600 /src/server/uplink.c
parent[SERVER] altservers: Fix missing index mapping (replication) (diff)
downloaddnbd3-26c1ad7af0f5749c5343a5823b9c8cece885ce84.tar.gz
dnbd3-26c1ad7af0f5749c5343a5823b9c8cece885ce84.tar.xz
dnbd3-26c1ad7af0f5749c5343a5823b9c8cece885ce84.zip
[SERVER] Remove "working" flag, introduce fine-grained flags
Tracking the "working" state of images using one boolean is insufficient regarding the different ways in which providing an image can fail. Introduce separate flags for different conditions, like "file not readable", "file not writable", "no uplink server available", "file content has changed".
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c114
1 files changed, 76 insertions, 38 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index f39e633..aba53ba 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -53,9 +53,9 @@ static void* uplink_mainloop(void *data);
static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly);
static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex);
static void uplink_handleReceive(dnbd3_uplink_t *uplink);
-static int uplink_sendKeepalive(const int fd);
+static bool uplink_sendKeepalive(dnbd3_uplink_t *uplink);
static void uplink_addCrc32(dnbd3_uplink_t *uplink);
-static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink);
+static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink);
static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force);
static bool uplink_saveCacheMap(dnbd3_uplink_t *uplink);
static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink);
@@ -117,6 +117,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
uplink->current.fd = -1;
mutex_unlock( &uplink->sendMutex );
uplink->cycleDetected = false;
+ image->problem.uplink = true;
if ( sock != -1 ) {
uplink->better.fd = sock;
int index = altservers_hostToIndex( host );
@@ -371,6 +372,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" );
} else {
if ( unlikely( uplink->current.fd == -1 ) ) {
+ uplink->image->problem.uplink = true;
mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
} else {
@@ -378,12 +380,14 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
if ( hops < 200 ) ++hops;
const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) );
- mutex_unlock( &uplink->sendMutex );
if ( unlikely( !ret ) ) {
+ uplink->image->problem.uplink = true;
+ mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" );
} else {
// Direct send succeeded, update queue entry from NEW to PENDING, so the request won't be sent again
int state;
+ mutex_unlock( &uplink->sendMutex );
mutex_lock( &uplink->queueLock );
if ( !uplink->shutdown && uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) {
state = uplink->queue[freeSlot].status;
@@ -460,9 +464,9 @@ static void* uplink_mainloop(void *data)
}
while ( !_shutdown && !uplink->shutdown ) {
// poll()
- waitTime = uplink->rttTestResult == RTT_DOCHANGE ? 0 : -1;
- if ( waitTime == 0 ) {
+ if ( uplink->rttTestResult == RTT_DOCHANGE ) {
// 0 means poll, since we're about to change the server
+ waitTime = 0;
} else {
declare_now;
waitTime = (int)timing_diffMs( &now, &nextAltCheck );
@@ -495,7 +499,7 @@ static void* uplink_mainloop(void *data)
discoverFailCount = 0;
if ( fd != -1 ) close( fd );
uplink->replicationHandle = REP_NONE;
- uplink->image->working = true;
+ uplink->image->problem.uplink = false;
uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
buffer[0] = '@';
if ( altservers_toString( uplink->current.index, buffer + 1, sizeof(buffer) - 1 ) ) {
@@ -510,6 +514,11 @@ static void* uplink_mainloop(void *data)
uplink_sendRequests( uplink, false );
uplink_sendReplicationRequest( uplink );
events[EV_SOCKET].events = POLLIN | POLLRDHUP;
+ if ( uplink->image->problem.uplink ) {
+ // Some of the requests above must have failed again already :-(
+ logadd( LOG_DEBUG1, "Newly established uplink connection failed during getCRC or sendRequests" );
+ uplink_connectionFailed( uplink, true );
+ }
timing_gets( &nextAltCheck, altCheckInterval );
// The rtt worker already did the handshake for our image, so there's nothing
// more to do here
@@ -517,6 +526,7 @@ static void* uplink_mainloop(void *data)
// Check events
// Signal
if ( (events[EV_SIGNAL].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) {
+ uplink->image->problem.uplink = true;
logadd( LOG_WARNING, "poll error on signal in uplink_mainloop!" );
goto cleanup;
} else if ( (events[EV_SIGNAL].revents & POLLIN) ) {
@@ -553,14 +563,10 @@ static void* uplink_mainloop(void *data)
}
// Keep-alive
if ( uplink->current.fd != -1 && uplink->replicationHandle == REP_NONE ) {
- // Send keep-alive if nothing is happening
- if ( uplink_sendKeepalive( uplink->current.fd ) ) {
- // Re-trigger periodically, in case it requires a minimum user count
- uplink_sendReplicationRequest( uplink );
- } else {
+ // Send keep-alive if nothing is happening, and try to trigger background rep.
+ if ( !uplink_sendKeepalive( uplink ) || !uplink_sendReplicationRequest( uplink ) ) {
uplink_connectionFailed( uplink, true );
- logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" );
- setThreadName( "panic-uplink" );
+ logadd( LOG_DEBUG1, "Error sending keep-alive/BGR, panic!\n" );
}
}
// Don't keep uplink established if we're idle for too much
@@ -578,6 +584,7 @@ static void* uplink_mainloop(void *data)
// Quit work if image is complete
logadd( LOG_INFO, "Replication of %s complete.", uplink->image->name );
setThreadName( "finished-uplink" );
+ uplink->image->problem.uplink = false;
goto cleanup;
} else {
// Not complete - do measurement
@@ -592,10 +599,6 @@ static void* uplink_mainloop(void *data)
} else if ( rttTestResult == RTT_NOT_REACHABLE ) {
if ( atomic_compare_exchange_strong( &uplink->rttTestResult, &rttTestResult, RTT_IDLE ) ) {
discoverFailCount++;
- if ( uplink->image->working && uplink->current.fd == -1 && discoverFailCount > (SERVER_RTT_MAX_UNREACH / 2) ) {
- logadd( LOG_DEBUG1, "Disabling %s:%d since no uplink is available", uplink->image->name, (int)uplink->image->rid );
- uplink->image->working = false;
- }
if ( uplink->current.fd == -1 ) {
uplink->cycleDetected = false;
}
@@ -624,8 +627,9 @@ static void* uplink_mainloop(void *data)
}
}
mutex_unlock( &uplink->queueLock );
- if ( resend )
+ if ( resend ) {
uplink_sendRequests( uplink, true );
+ }
}
#endif
}
@@ -653,6 +657,9 @@ static void* uplink_mainloop(void *data)
return NULL ;
}
+/**
+ * Only called from uplink thread.
+ */
static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
{
// Scan for new requests
@@ -672,13 +679,15 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
if ( hops < 200 ) ++hops;
mutex_lock( &uplink->sendMutex );
const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) );
- mutex_unlock( &uplink->sendMutex );
- if ( !ret ) {
+ if ( likely( ret ) ) {
+ mutex_unlock( &uplink->sendMutex );
+ } else {
// Non-critical - if the connection dropped or the server was changed
// the thread will re-send this request as soon as the connection
// is reestablished.
+ uplink->image->problem.uplink = true;
+ mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" );
- altservers_serverFailed( uplink->current.index );
return;
}
mutex_lock( &uplink->queueLock );
@@ -695,21 +704,27 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
* server. This means we might request data we already have, but it makes
* the code simpler. Worst case would be only one bit is zero, which means
* 4kb are missing, but we will request 32kb.
+ *
+ * Only called form uplink thread, so current.fd is assumed to be valid.
+ *
+ * @return false if sending request failed, true otherwise (i.e. not necessary/disabled)
*/
-static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
+static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
{
- if ( uplink == NULL || uplink->current.fd == -1 ) return;
- if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 ) return; // Don't do background replication
+ if ( uplink->current.fd == -1 )
+ return false; // Should never be called in this state, consider send error
+ if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 )
+ return true; // Don't do background replication
if ( uplink->nextReplicationIndex == -1 || uplink->replicationHandle != REP_NONE )
- return; // Already a replication request on the wire, or no more blocks to replicate
+ return true; // Already a replication request on the wire, or no more blocks to replicate
dnbd3_image_t * const image = uplink->image;
- if ( image->virtualFilesize < DNBD3_BLOCK_SIZE ) return;
- if ( image->users < _bgrMinClients ) return; // Not enough active users
+ if ( image->users < _bgrMinClients )
+ return true; // Not enough active users
dnbd3_cache_map_t *cache = ref_get_cachemap( image );
- if ( cache == NULL || image->users < _bgrMinClients ) {
+ if ( cache == NULL || image->users ) {
// No cache map (=image complete)
ref_put( &cache->reference );
- return;
+ return true;
}
const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
const int lastBlockIndex = mapBytes - 1;
@@ -741,17 +756,20 @@ static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
if ( replicationIndex == -1 ) {
// Replication might be complete, uplink_mainloop should take care....
uplink->nextReplicationIndex = -1;
- return;
+ return true;
}
const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
uplink->replicationHandle = offset;
const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
mutex_lock( &uplink->sendMutex );
bool sendOk = dnbd3_get_block( uplink->current.fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->current.version, 1 ) );
- mutex_unlock( &uplink->sendMutex );
- if ( !sendOk ) {
+ if ( likely( sendOk ) ) {
+ mutex_unlock( &uplink->sendMutex );
+ } else {
+ uplink->image->problem.uplink = true;
+ mutex_unlock( &uplink->sendMutex );
logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" );
- return;
+ return false;
}
if ( replicationIndex == lastBlockIndex ) {
uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks
@@ -762,6 +780,7 @@ static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
// Just crossed a hash block boundary, look for new candidate starting at this very index
uplink->nextReplicationIndex = uplink_findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex );
}
+ return true;
}
/**
@@ -816,6 +835,7 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int
/**
* Receive data from uplink server and process/dispatch
* Locks on: uplink.lock, images[].lock
+ * Only called from uplink thread, so current.fd is assumed to be valid.
*/
static void uplink_handleReceive(dnbd3_uplink_t *uplink)
{
@@ -990,11 +1010,14 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
mutex_lock( &uplink->queueLock );
const bool rep = ( uplink->queueLen == 0 );
mutex_unlock( &uplink->queueLock );
- if ( rep ) uplink_sendReplicationRequest( uplink );
+ if ( rep ) {
+ if ( !uplink_sendReplicationRequest( uplink ) )
+ goto error_cleanup;
+ }
}
return;
// Error handling from failed receive or message parsing
- error_cleanup: ;
+error_cleanup: ;
uplink_connectionFailed( uplink, true );
}
@@ -1005,8 +1028,10 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
{
if ( uplink->current.fd == -1 )
return;
+ setThreadName( "panic-uplink" );
altservers_serverFailed( uplink->current.index );
mutex_lock( &uplink->sendMutex );
+ uplink->image->problem.uplink = true;
close( uplink->current.fd );
uplink->current.fd = -1;
mutex_unlock( &uplink->sendMutex );
@@ -1025,14 +1050,24 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
}
/**
- * Send keep alive request to server
+ * Send keep alive request to server.
+ * Called from uplink thread, current.fd must be valid.
*/
-static int uplink_sendKeepalive(const int fd)
+static bool uplink_sendKeepalive(dnbd3_uplink_t *uplink)
{
static const dnbd3_request_t request = { .magic = dnbd3_packet_magic, .cmd = net_order_16( CMD_KEEPALIVE ) };
- return send( fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request);
+ mutex_lock( &uplink->sendMutex );
+ bool sendOk = send( uplink->current.fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request);
+ mutex_unlock( &uplink->sendMutex );
+ return sendOk;
}
+/**
+ * Request crclist from uplink.
+ * Called from uplink thread, current.fd must be valid.
+ * FIXME This is broken as it could happen that another message arrives after sending
+ * the request. Refactor, split and move receive into general receive handler.
+ */
static void uplink_addCrc32(dnbd3_uplink_t *uplink)
{
dnbd3_image_t *image = uplink->image;
@@ -1042,6 +1077,9 @@ static void uplink_addCrc32(dnbd3_uplink_t *uplink)
uint32_t *buffer = malloc( bytes );
mutex_lock( &uplink->sendMutex );
bool sendOk = dnbd3_get_crc32( uplink->current.fd, &masterCrc, buffer, &bytes );
+ if ( !sendOk ) {
+ uplink->image->problem.uplink = true;
+ }
mutex_unlock( &uplink->sendMutex );
if ( !sendOk || bytes == 0 ) {
free( buffer );