author    Simon Rettberg  2026-01-20 17:24:35 +0100
committer Simon Rettberg  2026-01-20 17:24:35 +0100
commit    e84313f6192f3533070feea09f739265d9ec38d0 (patch)
tree      48638acda8171f265fd041d9de6fd99b86e7521b
parent    github: Use github.com/gregkh/linux instead of git.kernel.org (diff)
[SERVER] Speed up replication (HEAD)

Handling block replies on uplink connections is now parallelized:
- write to disk
- relay data to clients
- request next chunk (when BGR is active)
These steps now happen in parallel.
-rw-r--r--  src/server/globals.h |  10
-rw-r--r--  src/server/locks.h   |   1
-rw-r--r--  src/server/net.c     |  11
-rw-r--r--  src/server/uplink.c  | 384
4 files changed, 233 insertions(+), 173 deletions(-)
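For context before the patch body: for every block reply, handleReceive() now bumps a per-reply job counter, hands the disk write and the client relay to the thread pool, sends the next replication request itself, and then sleeps on the new asyncHandleCond until both workers have checked in. Below is a minimal, self-contained sketch of that counter-plus-condvar join using plain pthreads; the worker body and names are illustrative stand-ins for threadpool_run() dispatching handleReceiveSaveToDisk / handleReceiveSendToClients.

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <unistd.h>

/* Shared per-reply state -- mirrors struct reply_async_data from the
 * patch, minus the dnbd3-specific pointers. */
struct job_ctx {
	pthread_mutex_t lock;
	pthread_cond_t cond;
	atomic_int jobs;
};

/* Worker: do the work, then decrement the counter and wake the
 * dispatcher once the last job is done -- the same signalling as in
 * handleReceiveSaveToDisk/handleReceiveSendToClients. */
static void* worker(void *param)
{
	struct job_ctx *ctx = param;
	usleep( 1000 ); /* stands in for pwrite() resp. the client callbacks */
	pthread_mutex_lock( &ctx->lock );
	if ( --ctx->jobs == 0 ) {
		pthread_cond_signal( &ctx->cond );
	}
	pthread_mutex_unlock( &ctx->lock );
	return NULL;
}

int main(void)
{
	struct job_ctx ctx = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0 };
	pthread_t t[2];
	/* Fan out: count each job before dispatching it */
	for ( int i = 0; i < 2; ++i ) {
		ctx.jobs++;
		pthread_create( &t[i], NULL, &worker, &ctx );
	}
	/* ...the dispatcher can send the next BGR request here... */
	/* Join: wait until both workers signalled completion */
	pthread_mutex_lock( &ctx.lock );
	while ( ctx.jobs > 0 ) {
		pthread_cond_wait( &ctx.cond, &ctx.lock );
	}
	pthread_mutex_unlock( &ctx.lock );
	puts( "both jobs done; the queue entry can be freed" );
	pthread_join( t[0], NULL );
	pthread_join( t[1], NULL );
	return 0;
}

Because the counter is bumped before each dispatch, a worker that finishes instantly cannot drive it to zero while the second job is still about to be queued.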
diff --git a/src/server/globals.h b/src/server/globals.h
index 900b86d..1a5678f 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -92,6 +92,8 @@ struct _dnbd3_uplink
pthread_t thread; // thread holding the connection
pthread_mutex_t sendMutex; // For locking socket while sending
pthread_mutex_t queueLock; // lock for synchronization on request queue etc.
+ pthread_mutex_t asyncHandleMutex; // async handling of received reply
+ pthread_cond_t asyncHandleCond; // condition for async receive handler
dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer
pthread_mutex_t rttLock; // When accessing rttTestResult, betterFd or betterServer
atomic_int rttTestResult; // RTT_*
@@ -105,7 +107,7 @@ struct _dnbd3_uplink
// If BGR == BGR_HASHBLOCK, -1 means "currently no incomplete block"
atomic_uint_fast64_t bytesReceived; // Number of bytes received by the uplink since startup.
atomic_uint_fast64_t bytesReceivedLastSave; // Number of bytes received when we last saved the cache map
- int queueLen; // length of queue
+	int queueLen; // length of queue (slots; either BGR (no client at all) or one or more clients)
int idleTime; // How many seconds the uplink was idle (apart from keep-alives)
dnbd3_queue_entry_t *queue;
atomic_uint_fast32_t queueId;
@@ -169,15 +171,15 @@ struct _dnbd3_client
{
#define HOSTNAMELEN (48)
atomic_uint_fast64_t bytesSent; // Byte counter for this client.
- dnbd3_image_t * _Atomic image; // Image in use by this client, or NULL during handshake
+ _Atomic(dnbd3_image_t *) image; // Image in use by this client, or NULL during handshake
+ pthread_t thread;
int sock;
- _Atomic uint8_t relayedCount; // How many requests are in-flight to the uplink server
+ _Atomic(uint8_t) relayedCount; // How many requests are in-flight to the uplink server
bool isServer; // true if a server in proxy mode, false if real client
dnbd3_host_t host;
char hostName[HOSTNAMELEN]; // inet_ntop version of host
pthread_mutex_t sendMutex; // Held while writing to sock if image is incomplete (since uplink uses socket too)
pthread_mutex_t lock;
- pthread_t thread;
};
// #######################################################
diff --git a/src/server/locks.h b/src/server/locks.h
index 3b04caa..ef2da1b 100644
--- a/src/server/locks.h
+++ b/src/server/locks.h
@@ -18,6 +18,7 @@
#define LOCK_IMAGE_LIST 150
#define LOCK_IMAGE 160
#define LOCK_UPLINK_QUEUE 170
+#define LOCK_UPLINK_ASYNC_HANDLE 175
#define LOCK_ALT_SERVER_LIST 180
#define LOCK_CLIENT_SEND 190
#define LOCK_UPLINK_RTT 200
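Note on the new constant: the numeric values in locks.h read like ranks in a global lock hierarchy, and 175 slots the async-handle mutex between LOCK_UPLINK_QUEUE (170) and LOCK_ALT_SERVER_LIST (180). Assuming the debug build enforces ascending acquisition order (the actual checking lives in locks.c and is not shown in this diff), a simplified sketch of how such rank checking catches ordering violations:

#include <assert.h>
#include <pthread.h>

#define LOCK_UPLINK_QUEUE        170
#define LOCK_UPLINK_ASYNC_HANDLE 175 /* the rank added by this commit */

typedef struct { pthread_mutex_t mtx; int rank; } ranked_mutex_t;

/* Hypothetical, simplified checker: per-thread stack of held ranks */
static _Thread_local int held[8];
static _Thread_local int depth;

static void ranked_lock(ranked_mutex_t *m)
{
	/* Ranks must strictly increase; taking 170 while holding 175 asserts,
	 * which rules out lock-order cycles (and thus those deadlocks). */
	assert( depth == 0 || m->rank > held[depth - 1] );
	pthread_mutex_lock( &m->mtx );
	held[depth++] = m->rank;
}

static void ranked_unlock(ranked_mutex_t *m)
{
	depth--; /* assumes LIFO release, for brevity */
	pthread_mutex_unlock( &m->mtx );
}

int main(void)
{
	ranked_mutex_t queue = { PTHREAD_MUTEX_INITIALIZER, LOCK_UPLINK_QUEUE };
	ranked_mutex_t async = { PTHREAD_MUTEX_INITIALIZER, LOCK_UPLINK_ASYNC_HANDLE };
	ranked_lock( &queue ); /* rank 170 */
	ranked_lock( &async ); /* rank 175 > 170: fine */
	/* ranked_lock( &queue ) here would trip the assert */
	ranked_unlock( &async );
	ranked_unlock( &queue );
	return 0;
}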
diff --git a/src/server/net.c b/src/server/net.c
index f2f63b8..0f17e49 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -680,15 +680,9 @@ static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client)
}
ref_put( &uplink->reference );
}
- if ( client->relayedCount != 0 ) {
+ while ( client->relayedCount != 0 ) {
logadd( LOG_DEBUG1, "Client has relayedCount == %"PRIu8" on disconnect..", client->relayedCount );
- int i;
- for ( i = 0; i < 1000 && client->relayedCount != 0; ++i ) {
- usleep( 10000 );
- }
- if ( client->relayedCount != 0 ) {
- logadd( LOG_WARNING, "Client relayedCount still %"PRIu8" after sleeping!", client->relayedCount );
- }
+ sleep( 1 );
}
}
mutex_lock( &client->sendMutex );
@@ -748,4 +742,3 @@ static void uplinkCallback(void *data, uint64_t handle, uint64_t start UNUSED, u
mutex_unlock( &client->sendMutex );
client->relayedCount--;
}
-
diff --git a/src/server/uplink.c b/src/server/uplink.c
index e05d27c..19cd330 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -35,10 +35,11 @@ static void freeUplinkStruct(ref *ref);
static void* uplink_mainloop(void *data);
static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly);
static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex);
+static void switchToNewServer(dnbd3_uplink_t *uplink);
static void handleReceive(dnbd3_uplink_t *uplink);
static bool sendKeepalive(dnbd3_uplink_t *uplink);
static void requestCrc32List(dnbd3_uplink_t *uplink);
-static bool sendReplicationRequest(dnbd3_uplink_t *uplink);
+static int sendReplicationRequest(dnbd3_uplink_t *uplink);
static bool reopenCacheFd(dnbd3_uplink_t *uplink, const bool force);
static bool connectionShouldShutdown(dnbd3_uplink_t *uplink);
static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew);
@@ -88,6 +89,8 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
// Start with one reference for the uplink thread. We'll return it when the thread finishes
ref_init( &uplink->reference, freeUplinkStruct, 1 );
mutex_init( &uplink->queueLock, LOCK_UPLINK_QUEUE );
+ mutex_init( &uplink->asyncHandleMutex, LOCK_UPLINK_ASYNC_HANDLE );
+ pthread_cond_init( &uplink->asyncHandleCond, NULL );
mutex_init( &uplink->rttLock, LOCK_UPLINK_RTT );
mutex_init( &uplink->sendMutex, LOCK_UPLINK_SEND );
uplink->image = image;
@@ -204,22 +207,21 @@ static void freeUplinkStruct(ref *ref)
}
if ( uplink->current.fd != -1 ) {
close( uplink->current.fd );
- uplink->current.fd = -1;
}
if ( uplink->better.fd != -1 ) {
close( uplink->better.fd );
- uplink->better.fd = -1;
}
+ pthread_cond_destroy( &uplink->asyncHandleCond );
+ mutex_destroy( &uplink->asyncHandleMutex );
mutex_destroy( &uplink->queueLock );
mutex_destroy( &uplink->rttLock );
mutex_destroy( &uplink->sendMutex );
free( uplink->recvBuffer );
- uplink->recvBuffer = NULL;
if ( uplink->cacheFd != -1 ) {
close( uplink->cacheFd );
}
// Finally let go of image. It was acquired either in uplink_shutdown or in the cleanup code
- // of the uplink thread, depending on who set the uplink->shutdown flag. (Or uplink_init if that failed)
+ // of the uplink thread, depending on who set the uplink->shutdown flag. (Or in uplink_init if that failed)
image_release( uplink->image );
free( uplink ); // !!!
}
@@ -282,7 +284,7 @@ bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint
}
/**
- * Called by integrated fuse module
+ * Called by the integrated fuse module and the iSCSI handler.
*/
bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback,
uint64_t handle, uint64_t start, uint32_t length)
@@ -433,6 +435,7 @@ static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_ca
// BGR
request->clients = NULL;
} else {
+ // Actual client
c = &request->clients;
}
isNew = true;
@@ -572,6 +575,7 @@ static void* uplink_mainloop(void *data)
events[EV_SIGNAL].events = POLLIN;
events[EV_SIGNAL].fd = signal_getWaitFd( uplink->signal );
events[EV_SOCKET].fd = -1;
+ events[EV_SOCKET].events = POLLIN | POLLRDHUP;
if ( uplink->rttTestResult != RTT_DOCHANGE ) {
altservers_findUplink( uplink ); // In case we didn't kickstart
}
@@ -597,40 +601,13 @@ static void* uplink_mainloop(void *data)
}
// Check if server switch is in order
if ( unlikely( uplink->rttTestResult == RTT_DOCHANGE ) ) {
- mutex_lock( &uplink->rttLock );
- assert( uplink->rttTestResult == RTT_DOCHANGE );
- uplink->rttTestResult = RTT_IDLE;
- // The rttTest worker thread has finished our request.
- // And says it's better to switch to another server
- const int fd = uplink->current.fd;
- mutex_lock( &uplink->sendMutex );
- uplink->current = uplink->better;
- mutex_unlock( &uplink->sendMutex );
- uplink->better.fd = -1;
- uplink->cycleDetected = false;
- mutex_unlock( &uplink->rttLock );
discoverFailCount = 0;
- if ( fd != -1 ) close( fd );
- uplink->image->problem.uplink = false;
- uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
+ switchToNewServer( uplink );
buffer[0] = '@';
if ( altservers_toString( uplink->current.index, buffer + 1, sizeof(buffer) - 1 ) ) {
logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", uplink->image->name, buffer + 1 );
setThreadName( buffer );
}
- // If we don't have a crc32 list yet, see if the new server has one
- if ( uplink->image->crc32 == NULL ) {
- requestCrc32List( uplink );
- }
- // Re-send all pending requests
- sendQueuedRequests( uplink, false );
- sendReplicationRequest( uplink );
- events[EV_SOCKET].events = POLLIN | POLLRDHUP;
- if ( uplink->image->problem.uplink ) {
- // Some of the requests above must have failed again already :-(
- logadd( LOG_DEBUG1, "Newly established uplink connection failed during getCRC or sendRequests" );
- connectionFailed( uplink, true );
- }
timing_gets( &nextAltCheck, altCheckInterval );
// The rtt worker already did the handshake for our image, so there's nothing
// more to do here
@@ -641,7 +618,8 @@ static void* uplink_mainloop(void *data)
uplink->image->problem.uplink = true;
logadd( LOG_WARNING, "poll error on signal in uplink_mainloop!" );
goto cleanup;
- } else if ( (events[EV_SIGNAL].revents & POLLIN) ) {
+ }
+ if ( (events[EV_SIGNAL].revents & POLLIN) ) {
// signal triggered -> pending requests
if ( signal_clear( uplink->signal ) == SIGNAL_ERROR ) {
logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", uplink->image->name );
@@ -662,20 +640,26 @@ static void* uplink_mainloop(void *data)
handleReceive( uplink );
if ( _shutdown || uplink->shutdown ) goto cleanup;
}
+ // Measure time since we sent last keepalive
declare_now;
uint32_t timepassed = timing_diff( &lastKeepalive, &now );
- if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL
- || ( timepassed >= 2 && uplink->idleTime < _bgrWindowSize ) ) {
+ if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) {
+ int numSent = 0; // Number of replication requests sent
lastKeepalive = now;
uplink->idleTime += timepassed;
- // Keep-alive
+ // Background replication
if ( uplink->current.fd != -1 && uplink->queueLen < _bgrWindowSize ) {
- // Send keep-alive if nothing is happening, and try to trigger background rep.
- if ( !sendKeepalive( uplink ) || !sendReplicationRequest( uplink ) ) {
+ numSent = sendReplicationRequest( uplink );
+ if ( numSent == -1 ) {
connectionFailed( uplink, true );
- logadd( LOG_DEBUG1, "Error sending keep-alive/BGR, panic!\n" );
+ logadd( LOG_DEBUG1, "Error sending BGR, image %s:%d", PIMG(uplink->image) );
}
}
+ // No BGR requests sent, send keep alive instead
+ if ( numSent == 0 && uplink->current.fd != -1 && !sendKeepalive( uplink ) ) {
+ connectionFailed( uplink, true );
+ logadd( LOG_DEBUG1, "Error sending keepalive, image %s:%d", PIMG(uplink->image) );
+ }
// Don't keep the uplink established if we've been idle for too long
if ( connectionShouldShutdown( uplink ) ) {
logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", PIMG(uplink->image) );
@@ -736,7 +720,7 @@ static void* uplink_mainloop(void *data)
}
}
#endif
- }
+ } // Mainloop end
cleanup: ;
dnbd3_image_t *image = uplink->image;
dnbd3_cache_map_t *cache = ref_get_cachemap( image );
@@ -775,8 +759,8 @@ static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly)
// Build a buffer, so if there aren't too many requests, we can send them after
// unlocking the queue again. Otherwise we need flushes during iteration, which
// is not ideal, but in that case the uplink is probably overwhelmed anyway.
- // Try 125 as that's exactly 3000 bytes, usually 2*MTU.
-#define MAX_RESEND_BATCH 125
+ // Try 120 as that should fit in two TCP segments on a standard 1500MTU setup.
+#define MAX_RESEND_BATCH 120
dnbd3_request_t reqs[MAX_RESEND_BATCH];
int count = 0;
mutex_lock( &uplink->queueLock );
@@ -821,6 +805,39 @@ static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly)
#undef MAX_RESEND_BATCH
}
+static void switchToNewServer(dnbd3_uplink_t *uplink)
+{
+ mutex_lock( &uplink->rttLock );
+ assert( uplink->rttTestResult == RTT_DOCHANGE );
+ uplink->rttTestResult = RTT_IDLE;
+ // The rttTest worker thread has finished our request.
+ // And says it's better to switch to another server
+ const int fd = uplink->current.fd;
+ mutex_lock( &uplink->sendMutex );
+ uplink->current = uplink->better;
+ mutex_unlock( &uplink->sendMutex );
+ uplink->better.fd = -1;
+ uplink->cycleDetected = false;
+ mutex_unlock( &uplink->rttLock );
+ if ( fd != -1 ) {
+ close( fd );
+ }
+ uplink->image->problem.uplink = false;
+ uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
+ // If we don't have a crc32 list yet, see if the new server has one
+ if ( uplink->image->crc32 == NULL ) {
+ requestCrc32List( uplink );
+ }
+ // Re-send all pending requests
+ sendQueuedRequests( uplink, false );
+ sendReplicationRequest( uplink );
+ if ( uplink->image->problem.uplink ) {
+ // Some of the requests above must have failed again already :-(
+ logadd( LOG_DEBUG1, "Newly established uplink connection failed during getCRC or sendRequests" );
+ connectionFailed( uplink, true );
+ }
+}
+
/**
* Send a block request to an uplink server without really having
* any client that needs that data. This will be used for background replication.
@@ -833,27 +850,27 @@ static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly)
*
* Only called from uplink thread, so current.fd is assumed to be valid.
*
- * @return false if sending request failed, true otherwise (i.e. not necessary/disabled)
+ * @return -1 if sending request failed, number of actually sent requests otherwise (can be 0 if not necessary/disabled)
*/
-static bool sendReplicationRequest(dnbd3_uplink_t *uplink)
+static int sendReplicationRequest(dnbd3_uplink_t *uplink)
{
assert_uplink_thread();
if ( uplink->current.fd == -1 )
- return false; // Should never be called in this state, consider send error
+ return -1; // Should never be called in this state, consider send error
if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 )
- return true; // Don't do background replication
+ return 0; // Don't do background replication
if ( uplink->nextReplicationIndex == -1 )
- return true; // No more blocks to replicate
+ return 0; // No more blocks to replicate
dnbd3_image_t * const image = uplink->image;
if ( image->users < _bgrMinClients )
- return true; // Not enough active users
+ return 0; // Not enough active users
const int numNewRequests = numWantedReplicationRequests( uplink );
if ( numNewRequests <= 0 )
- return true; // Already sufficient amount of requests on the wire
+ return 0; // Already sufficient amount of requests on the wire
dnbd3_cache_map_t *cache = ref_get_cachemap( image );
if ( cache == NULL ) {
// No cache map (=image complete)
- return true;
+ return 0;
}
const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
const int lastBlockIndex = mapBytes - 1;
@@ -905,7 +922,7 @@ static bool sendReplicationRequest(dnbd3_uplink_t *uplink)
logadd( LOG_DEBUG1, "Error sending background replication request to uplink server (%s:%d)",
PIMG(uplink->image) );
ref_put( &cache->reference );
- return false;
+ return -1;
}
if ( replicationIndex == lastBlockIndex ) {
uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks
@@ -920,7 +937,7 @@ static bool sendReplicationRequest(dnbd3_uplink_t *uplink)
}
}
ref_put( &cache->reference );
- return true;
+ return numNewRequests;
}
/**
@@ -972,6 +989,16 @@ static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMa
return retval;
}
+struct reply_async_data {
+ dnbd3_uplink_t *uplink;
+ dnbd3_reply_t *inReply;
+ dnbd3_queue_entry_t *entry;
+ atomic_int jobs;
+};
+
+static void* handleReceiveSaveToDisk(void* param);
+static void* handleReceiveSendToClients(void* param);
+
/**
* Receive data from uplink server and process/dispatch
* Locks on: uplink.lock, images[].lock
@@ -980,13 +1007,20 @@ static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMa
static void handleReceive(dnbd3_uplink_t *uplink)
{
dnbd3_reply_t inReply;
+ struct reply_async_data tparams = {
+ .uplink = uplink,
+ .inReply = &inReply,
+ };
int ret;
+ bool bailout = false;
assert_uplink_thread();
assert( uplink->queueLen >= 0 );
- for (;;) {
+ while ( !bailout ) {
ret = dnbd3_read_reply( uplink->current.fd, &inReply, false );
- if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue;
- if ( ret == REPLY_AGAIN ) break;
+ if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) )
+ continue;
+ if ( ret == REPLY_AGAIN )
+ break;
if ( unlikely( ret == REPLY_CLOSED ) ) {
logadd( LOG_INFO, "Uplink: Remote host hung up (%s:%d)", PIMG(uplink->image) );
goto error_cleanup;
@@ -1023,129 +1057,158 @@ static void handleReceive(dnbd3_uplink_t *uplink)
// Is a legit block reply
totalBytesReceived += inReply.size;
uplink->bytesReceived += inReply.size;
- // Get entry from queue
- dnbd3_queue_entry_t *entry;
+ // Get entry from queue and remove
+ dnbd3_queue_entry_t *entry = NULL;
mutex_lock( &uplink->queueLock );
- for ( entry = uplink->queue; entry != NULL; entry = entry->next ) {
- if ( entry->handle == inReply.handle )
+ for ( dnbd3_queue_entry_t **it = &uplink->queue; *it != NULL; it = &(**it).next ) {
+ if ( (**it).handle == inReply.handle ) {
+ entry = *it;
+ *it = (**it).next;
+ uplink->queueLen--;
break;
+ }
}
if ( entry == NULL ) {
- mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock!
+ mutex_unlock( &uplink->queueLock );
logadd( LOG_DEBUG1, "Received block reply on uplink, but handle %"PRIu64" is unknown (%s:%d)",
inReply.handle, PIMG(uplink->image) );
continue;
}
- const uint64_t start = entry->from;
- const uint64_t end = entry->to;
- mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock!
+ tparams.entry = entry;
+ if ( uplink->queueLen < SERVER_UPLINK_QUEUELEN_THRES ) {
+ uplink->image->problem.queue = false;
+ }
+ mutex_unlock( &uplink->queueLock );
-	// We don't remove the entry from the list here yet, to slightly increase the chance of other
-	// clients attaching to this request while we write the data to disk
- if ( end - start != inReply.size ) {
+ if ( entry->to - entry->from != inReply.size ) {
logadd( LOG_WARNING, "Received payload length does not match! (is: %"PRIu32", expect: %u, %s:%d)",
- inReply.size, (unsigned int)( end - start ), PIMG(uplink->image) );
+ inReply.size, (unsigned int)( entry->to - entry->from ), PIMG(uplink->image) );
}
+ tparams.jobs = 0;
// 1) Write to cache file
if ( unlikely( uplink->cacheFd == -1 ) ) {
reopenCacheFd( uplink, false );
}
if ( likely( uplink->cacheFd != -1 ) ) {
- int err = 0;
- bool tryAgain = true; // Allow one retry in case we run out of space or the write fd became invalid
- uint32_t done = 0;
- ret = 0;
- while ( done < inReply.size ) {
- ret = (int)pwrite( uplink->cacheFd, uplink->recvBuffer + done, inReply.size - done, start + done );
- if ( unlikely( ret == -1 ) ) {
- err = errno;
- if ( err == EINTR && !_shutdown ) continue;
- if ( err == ENOSPC || err == EDQUOT ) {
- // try to free 256MiB
- if ( !tryAgain || !image_ensureDiskSpaceLocked( 256ull * 1024 * 1024, true ) ) break;
- tryAgain = false;
- continue; // Success, retry write
- }
- if ( err == EBADF || err == EINVAL || err == EIO ) {
- uplink->image->problem.write = true;
- if ( !tryAgain || !reopenCacheFd( uplink, true ) )
- break;
- tryAgain = false;
- continue; // Write handle to image successfully re-opened, try again
- }
- logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d",
- PIMG(uplink->image), err );
- break;
- }
- if ( unlikely( ret <= 0 || (uint32_t)ret > inReply.size - done ) ) {
- logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d",
- ret, PIMG(uplink->image) );
- break;
- }
- done += (uint32_t)ret;
- }
- if ( likely( done > 0 ) ) {
- image_updateCachemap( uplink->image, start, start + done, true );
- }
- if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) {
- logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.",
- PIMG(uplink->image), err );
- }
- }
- bool found = false;
- dnbd3_queue_entry_t **it;
- mutex_lock( &uplink->queueLock );
- for ( it = &uplink->queue; *it != NULL; it = &(**it).next ) {
- if ( *it == entry && entry->handle == inReply.handle ) { // ABA check
- assert( found == false );
- *it = (**it).next;
- found = true;
- uplink->queueLen--;
- break;
- }
- }
- if ( uplink->queueLen < SERVER_UPLINK_QUEUELEN_THRES ) {
- uplink->image->problem.queue = false;
+ tparams.jobs++;
+ threadpool_run( &handleReceiveSaveToDisk, &tparams, NULL );
}
- mutex_unlock( &uplink->queueLock );
- if ( !found ) {
- logadd( LOG_DEBUG1, "Replication request vanished from queue after writing to disk (%s:%d)",
- PIMG(uplink->image) );
- continue;
+ // 2) Send to any waiting clients
+ if ( entry->clients != NULL ) {
+ tparams.jobs++;
+ threadpool_run( &handleReceiveSendToClients, &tparams, NULL );
}
- dnbd3_queue_client_t *next;
- for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) {
- assert( c->from >= start && c->to <= end );
- (*c->callback)( c->data, c->handle, c->from, (uint32_t)( c->to - c->from ),
- (const char*)( uplink->recvBuffer + (c->from - start) ) );
- next = c->next;
- free( c );
+ // 3) Trigger more background replication if applicable
+ if ( sendReplicationRequest( uplink ) == -1 ) {
+ bailout = true;
}
- if ( entry->clients != NULL ) {
- // Was some client -- reset idle counter
- uplink->idleTime = 0;
- // Re-enable replication if disabled
- if ( uplink->nextReplicationIndex == -1 ) {
- uplink->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK;
- }
- } else {
- if ( uplink->cacheFd != -1 ) {
- // Try to remove from fs cache if no client was interested in this data
- posix_fadvise( uplink->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED );
- }
+ // 4) Wait for both jobs
+ mutex_lock( &uplink->asyncHandleMutex );
+ while ( tparams.jobs > 0 ) {
+ pthread_cond_wait( &uplink->asyncHandleCond, &uplink->asyncHandleMutex );
}
+ mutex_unlock( &uplink->asyncHandleMutex );
free( entry );
} // main receive loop
- // Trigger background replication if applicable
- if ( !sendReplicationRequest( uplink ) ) {
- goto error_cleanup;
- }
- // Normal end
- return;
- // Error handling from failed receive or message parsing
-error_cleanup: ;
+ if ( !bailout ) {
+ // Try one more time to send more replication requests
+ sendReplicationRequest( uplink );
+ return;
+ }
+ // Error occurred, cleanup
+error_cleanup:
connectionFailed( uplink, true );
}
+static void* handleReceiveSaveToDisk(void* param)
+{
+ struct reply_async_data *tparams = (struct reply_async_data*)param;
+ dnbd3_reply_t *inReply = tparams->inReply;
+ dnbd3_uplink_t *uplink = tparams->uplink;
+ dnbd3_queue_entry_t *entry = tparams->entry;
+ int err = 0;
+ int ret = 0;
+ bool tryAgain = true; // Allow one retry in case we run out of space or the write-fd became invalid
+ uint32_t done = 0;
+
+ while ( done < inReply->size ) {
+ ret = (int)pwrite( uplink->cacheFd, uplink->recvBuffer + done, inReply->size - done,
+ entry->from + done );
+ if ( unlikely( ret == -1 ) ) {
+ err = errno;
+ if ( err == EINTR && !_shutdown ) continue;
+ if ( err == ENOSPC || err == EDQUOT ) {
+ // try to free 256MiB
+ if ( !tryAgain || !image_ensureDiskSpaceLocked( 256ull * 1024 * 1024, true ) ) break;
+ tryAgain = false;
+ continue; // Success, retry write
+ }
+ if ( err == EBADF || err == EINVAL || err == EIO ) {
+ uplink->image->problem.write = true;
+ if ( !tryAgain || !reopenCacheFd( uplink, true ) )
+ break;
+ tryAgain = false;
+ continue; // Write-handle to image successfully re-opened, try again
+ }
+ logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d",
+ PIMG(uplink->image), err );
+ break;
+ }
+ if ( unlikely( ret <= 0 || (uint32_t)ret > inReply->size - done ) ) {
+ logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d",
+ ret, PIMG(uplink->image) );
+ break;
+ }
+ done += (uint32_t)ret;
+ }
+ if ( likely( done > 0 ) ) {
+ image_updateCachemap( uplink->image, entry->from, entry->from + done, true );
+ }
+ if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) {
+ logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.",
+ PIMG(uplink->image), err );
+ }
+ if ( uplink->cacheFd != -1 ) {
+ // Try to remove from fs cache; the data was just written and is unlikely to be read back soon
+ posix_fadvise( uplink->cacheFd, entry->from, inReply->size, POSIX_FADV_DONTNEED );
+ }
+ mutex_lock( &uplink->asyncHandleMutex );
+ if ( --tparams->jobs == 0 ) {
+ pthread_cond_signal( &uplink->asyncHandleCond );
+ }
+ mutex_unlock( &uplink->asyncHandleMutex );
+ return NULL;
+}
+
+static void* handleReceiveSendToClients(void* param)
+{
+ struct reply_async_data *tparams = (struct reply_async_data*)param;
+ dnbd3_uplink_t *uplink = tparams->uplink;
+ dnbd3_queue_entry_t *entry = tparams->entry;
+ dnbd3_queue_client_t *next;
+
+ for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) {
+ assert( c->from >= entry->from && c->to <= entry->to );
+ (*c->callback)( c->data, c->handle, c->from, (uint32_t)( c->to - c->from ),
+ (const char*)( uplink->recvBuffer + (c->from - entry->from) ) );
+ next = c->next;
+ free( c );
+ }
+ // Was some client -- reset idle counter
+ uplink->idleTime = 0;
+ // Re-enable replication if disabled
+ if ( uplink->nextReplicationIndex == -1 ) {
+ uplink->nextReplicationIndex = (int)( entry->from / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK;
+ }
+ mutex_lock( &uplink->asyncHandleMutex );
+ if ( --tparams->jobs == 0 ) {
+ pthread_cond_signal( &uplink->asyncHandleCond );
+ }
+ mutex_unlock( &uplink->asyncHandleMutex );
+ return NULL;
+}
+
/**
* Only call from uplink thread
*/
@@ -1283,14 +1346,14 @@ bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len)
* meet the configured bgrWindowSize. Returns 0 if any client requests
* are pending.
* This applies a sort of "slow start" in case the uplink was recently
- * dealing with actual client requests, in that the uplink's idle time
- * (in seconds) is an upper bound for the number returned, so we don't
- * saturate the uplink with loads of requests right away, in case that
- * client triggers more requests to the uplink server.
+ * dealing with actual client requests. In that case the uplink's idle
+ * time (in seconds) is an upper bound for the number returned. So we
+ * don't saturate the uplink with loads of requests right away, in case
+ * that client triggers more requests to the uplink server.
*/
static int numWantedReplicationRequests(dnbd3_uplink_t *uplink)
{
- int ret = MIN( _bgrWindowSize, uplink->idleTime + 1 );
+ int ret = MIN( _bgrWindowSize, uplink->idleTime );
if ( uplink->queueLen == 0 )
return ret;
mutex_lock( &uplink->queueLock );
@@ -1303,7 +1366,8 @@ static int numWantedReplicationRequests(dnbd3_uplink_t *uplink)
}
}
mutex_unlock( &uplink->queueLock );
- return ret;
+
+ return MAX( 0, ret );
}
static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle)
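
Appendix to the queue rework in handleReceive(): the new dnbd3_queue_entry_t **it loop finds and unlinks the matching entry in a single pass while queueLock is held, which is what made the old ABA re-check after the disk write unnecessary. A stand-alone sketch of that pointer-to-pointer idiom, with a hypothetical node type standing in for dnbd3_queue_entry_t:

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* Generic stand-in for dnbd3_queue_entry_t */
typedef struct node {
	uint64_t handle;
	struct node *next;
} node_t;

/* Unlink and return the entry with the given handle, or NULL.
 * Because `it` points at the link itself (first at the head pointer,
 * then at each next field), removing the head needs no special case,
 * no prev pointer -- and the entry is gone the moment it is found. */
static node_t* take_entry(node_t **head, uint64_t handle)
{
	for ( node_t **it = head; *it != NULL; it = &(*it)->next ) {
		if ( (*it)->handle == handle ) {
			node_t *entry = *it;
			*it = entry->next; /* unlink in place */
			return entry;
		}
	}
	return NULL;
}

int main(void)
{
	node_t c = { 3, NULL }, b = { 2, &c }, a = { 1, &b };
	node_t *head = &a;
	node_t *hit = take_entry( &head, 2 );
	printf( "removed %llu, new head %llu\n",
			(unsigned long long)( hit ? hit->handle : 0 ),
			(unsigned long long)head->handle );
	return 0;
}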