| | | |
|---|---|---|
| author | Simon Rettberg | 2026-01-20 17:24:35 +0100 |
| committer | Simon Rettberg | 2026-01-20 17:24:35 +0100 |
| commit | e84313f6192f3533070feea09f739265d9ec38d0 (patch) | |
| tree | 48638acda8171f265fd041d9de6fd99b86e7521b | |
| parent | github: Use github.com/gregkh/linux instead of git.kernel.org (diff) | |
| download | dnbd3-e84313f6192f3533070feea09f739265d9ec38d0.tar.gz dnbd3-e84313f6192f3533070feea09f739265d9ec38d0.tar.xz dnbd3-e84313f6192f3533070feea09f739265d9ec38d0.zip | |
[SERVER] Speed up replication
Handling of block replies on uplink connections is now parallelized. For each received block, the following steps run concurrently:
- write to disk
- relay data to clients
- request next chunk (when BGR is active)
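These steps are fanned out to worker threads for every received block and joined again before the queue entry is freed, using a job counter guarded by a mutex/condvar pair (`asyncHandleMutex`/`asyncHandleCond` in the patch). Below is a minimal, self-contained sketch of that fan-out/join pattern using plain pthreads instead of the server's threadpool wrapper; the names and setup are illustrative, not taken from the dnbd3 source.

```c
#include <pthread.h>
#include <stdio.h>

/* Shared join state: a counter of outstanding jobs, guarded by a mutex,
 * with a condvar to wake the dispatcher once the counter hits zero. */
struct job_sync {
	pthread_mutex_t lock;
	pthread_cond_t cond;
	int jobs; // jobs still running
};

static void job_done(struct job_sync *s)
{
	pthread_mutex_lock( &s->lock );
	if ( --s->jobs == 0 )
		pthread_cond_signal( &s->cond );
	pthread_mutex_unlock( &s->lock );
}

static void* write_to_disk(void *p)   { puts( "write to disk" );    job_done( p ); return NULL; }
static void* send_to_clients(void *p) { puts( "relay to clients" ); job_done( p ); return NULL; }

int main(void)
{
	struct job_sync s = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 2 };
	pthread_t t1, t2;
	pthread_create( &t1, NULL, &write_to_disk, &s );
	pthread_create( &t2, NULL, &send_to_clients, &s );
	// The dispatcher is free to do more work here, e.g. request the next chunk (BGR)
	pthread_mutex_lock( &s.lock );
	while ( s.jobs > 0 ) { // loop guards against spurious wakeups
		pthread_cond_wait( &s.cond, &s.lock );
	}
	pthread_mutex_unlock( &s.lock );
	pthread_join( t1, NULL );
	pthread_join( t2, NULL );
	return 0;
}
```

The dispatcher may only free shared state (here: the receive buffer and queue entry) after the counter reaches zero, which is why the patched handleReceive() waits on the condition before calling free( entry ).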
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | src/server/globals.h | 10 |
| -rw-r--r-- | src/server/locks.h | 1 |
| -rw-r--r-- | src/server/net.c | 11 |
| -rw-r--r-- | src/server/uplink.c | 384 |

4 files changed, 233 insertions, 173 deletions
diff --git a/src/server/globals.h b/src/server/globals.h
index 900b86d..1a5678f 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -92,6 +92,8 @@ struct _dnbd3_uplink
 	pthread_t thread; // thread holding the connection
 	pthread_mutex_t sendMutex; // For locking socket while sending
 	pthread_mutex_t queueLock; // lock for synchronization on request queue etc.
+	pthread_mutex_t asyncHandleMutex; // async handling of received reply
+	pthread_cond_t asyncHandleCond; // condition for async receive handler
 	dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer
 	pthread_mutex_t rttLock; // When accessing rttTestResult, betterFd or betterServer
 	atomic_int rttTestResult; // RTT_*
@@ -105,7 +107,7 @@ struct _dnbd3_uplink
 	// If BGR == BGR_HASHBLOCK, -1 means "currently no incomplete block"
 	atomic_uint_fast64_t bytesReceived; // Number of bytes received by the uplink since startup.
 	atomic_uint_fast64_t bytesReceivedLastSave; // Number of bytes received when we last saved the cache map
-	int queueLen; // length of queue
+	int queueLen; // length of queue (slots; either BGR (no client at all) or one or more clients)
 	int idleTime; // How many seconds the uplink was idle (apart from keep-alives)
 	dnbd3_queue_entry_t *queue;
 	atomic_uint_fast32_t queueId;
@@ -169,15 +171,15 @@ struct _dnbd3_client
 {
 #define HOSTNAMELEN (48)
 	atomic_uint_fast64_t bytesSent; // Byte counter for this client.
-	dnbd3_image_t * _Atomic image; // Image in use by this client, or NULL during handshake
+	_Atomic(dnbd3_image_t *) image; // Image in use by this client, or NULL during handshake
+	pthread_t thread;
 	int sock;
-	_Atomic uint8_t relayedCount; // How many requests are in-flight to the uplink server
+	_Atomic(uint8_t) relayedCount; // How many requests are in-flight to the uplink server
 	bool isServer; // true if a server in proxy mode, false if real client
 	dnbd3_host_t host;
 	char hostName[HOSTNAMELEN]; // inet_ntop version of host
 	pthread_mutex_t sendMutex; // Held while writing to sock if image is incomplete (since uplink uses socket too)
 	pthread_mutex_t lock;
-	pthread_t thread;
 };
 
 // #######################################################
diff --git a/src/server/locks.h b/src/server/locks.h
index 3b04caa..ef2da1b 100644
--- a/src/server/locks.h
+++ b/src/server/locks.h
@@ -18,6 +18,7 @@
 #define LOCK_IMAGE_LIST 150
 #define LOCK_IMAGE 160
 #define LOCK_UPLINK_QUEUE 170
+#define LOCK_UPLINK_ASYNC_HANDLE 175
 #define LOCK_ALT_SERVER_LIST 180
 #define LOCK_CLIENT_SEND 190
 #define LOCK_UPLINK_RTT 200
diff --git a/src/server/net.c b/src/server/net.c
index f2f63b8..0f17e49 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -680,15 +680,9 @@ static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client)
 		}
 		ref_put( &uplink->reference );
 	}
-	if ( client->relayedCount != 0 ) {
+	while ( client->relayedCount != 0 ) {
 		logadd( LOG_DEBUG1, "Client has relayedCount == %"PRIu8" on disconnect..", client->relayedCount );
-		int i;
-		for ( i = 0; i < 1000 && client->relayedCount != 0; ++i ) {
-			usleep( 10000 );
-		}
-		if ( client->relayedCount != 0 ) {
-			logadd( LOG_WARNING, "Client relayedCount still %"PRIu8" after sleeping!", client->relayedCount );
-		}
+		sleep( 1 );
 	}
 }
 	mutex_lock( &client->sendMutex );
@@ -748,4 +742,3 @@ static void uplinkCallback(void *data, uint64_t handle, uint64_t start UNUSED, u
 	mutex_unlock( &client->sendMutex );
 	client->relayedCount--;
 }
-
diff --git a/src/server/uplink.c b/src/server/uplink.c
index e05d27c..19cd330 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -35,10 +35,11 @@ static void freeUplinkStruct(ref *ref);
 static void* uplink_mainloop(void *data);
 static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly);
 static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex);
+static void switchToNewServer(dnbd3_uplink_t *uplink);
 static void handleReceive(dnbd3_uplink_t *uplink);
 static bool sendKeepalive(dnbd3_uplink_t *uplink);
 static void requestCrc32List(dnbd3_uplink_t *uplink);
-static bool sendReplicationRequest(dnbd3_uplink_t *uplink);
+static int sendReplicationRequest(dnbd3_uplink_t *uplink);
 static bool reopenCacheFd(dnbd3_uplink_t *uplink, const bool force);
 static bool connectionShouldShutdown(dnbd3_uplink_t *uplink);
 static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew);
@@ -88,6 +89,8 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
 	// Start with one reference for the uplink thread. We'll return it when the thread finishes
 	ref_init( &uplink->reference, freeUplinkStruct, 1 );
 	mutex_init( &uplink->queueLock, LOCK_UPLINK_QUEUE );
+	mutex_init( &uplink->asyncHandleMutex, LOCK_UPLINK_ASYNC_HANDLE );
+	pthread_cond_init( &uplink->asyncHandleCond, NULL );
 	mutex_init( &uplink->rttLock, LOCK_UPLINK_RTT );
 	mutex_init( &uplink->sendMutex, LOCK_UPLINK_SEND );
 	uplink->image = image;
@@ -204,22 +207,21 @@ static void freeUplinkStruct(ref *ref)
 	}
 	if ( uplink->current.fd != -1 ) {
 		close( uplink->current.fd );
-		uplink->current.fd = -1;
 	}
 	if ( uplink->better.fd != -1 ) {
 		close( uplink->better.fd );
-		uplink->better.fd = -1;
 	}
+	pthread_cond_destroy( &uplink->asyncHandleCond );
+	mutex_destroy( &uplink->asyncHandleMutex );
 	mutex_destroy( &uplink->queueLock );
 	mutex_destroy( &uplink->rttLock );
 	mutex_destroy( &uplink->sendMutex );
 	free( uplink->recvBuffer );
-	uplink->recvBuffer = NULL;
 	if ( uplink->cacheFd != -1 ) {
 		close( uplink->cacheFd );
 	}
 	// Finally let go of image. It was acquired either in uplink_shutdown or in the cleanup code
-	// of the uplink thread, depending on who set the uplink->shutdown flag. (Or uplink_init if that failed)
+	// of the uplink thread, depending on who set the uplink->shutdown flag. (Or in uplink_init if that failed)
 	image_release( uplink->image );
 	free( uplink ); // !!!
 }
@@ -282,7 +284,7 @@ bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint
 }
 
 /**
- * Called by integrated fuse module
+ * Called by integrated fuse module, and iSCSI handler.
  */
 bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback,
 		uint64_t handle, uint64_t start, uint32_t length)
@@ -433,6 +435,7 @@ static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_ca
 			// BGR
 			request->clients = NULL;
 		} else {
+			// Actual client
 			c = &request->clients;
 		}
 		isNew = true;
@@ -572,6 +575,7 @@ static void* uplink_mainloop(void *data)
 	events[EV_SIGNAL].events = POLLIN;
 	events[EV_SIGNAL].fd = signal_getWaitFd( uplink->signal );
 	events[EV_SOCKET].fd = -1;
+	events[EV_SOCKET].events = POLLIN | POLLRDHUP;
 	if ( uplink->rttTestResult != RTT_DOCHANGE ) {
 		altservers_findUplink( uplink ); // In case we didn't kickstart
 	}
@@ -597,40 +601,13 @@ static void* uplink_mainloop(void *data)
 		}
 		// Check if server switch is in order
 		if ( unlikely( uplink->rttTestResult == RTT_DOCHANGE ) ) {
-			mutex_lock( &uplink->rttLock );
-			assert( uplink->rttTestResult == RTT_DOCHANGE );
-			uplink->rttTestResult = RTT_IDLE;
-			// The rttTest worker thread has finished our request.
-			// And says it's better to switch to another server
-			const int fd = uplink->current.fd;
-			mutex_lock( &uplink->sendMutex );
-			uplink->current = uplink->better;
-			mutex_unlock( &uplink->sendMutex );
-			uplink->better.fd = -1;
-			uplink->cycleDetected = false;
-			mutex_unlock( &uplink->rttLock );
 			discoverFailCount = 0;
-			if ( fd != -1 ) close( fd );
-			uplink->image->problem.uplink = false;
-			uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
+			switchToNewServer( uplink );
 			buffer[0] = '@';
 			if ( altservers_toString( uplink->current.index, buffer + 1, sizeof(buffer) - 1 ) ) {
 				logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", uplink->image->name, buffer + 1 );
 				setThreadName( buffer );
 			}
-			// If we don't have a crc32 list yet, see if the new server has one
-			if ( uplink->image->crc32 == NULL ) {
-				requestCrc32List( uplink );
-			}
-			// Re-send all pending requests
-			sendQueuedRequests( uplink, false );
-			sendReplicationRequest( uplink );
-			events[EV_SOCKET].events = POLLIN | POLLRDHUP;
-			if ( uplink->image->problem.uplink ) {
-				// Some of the requests above must have failed again already :-(
-				logadd( LOG_DEBUG1, "Newly established uplink connection failed during getCRC or sendRequests" );
-				connectionFailed( uplink, true );
-			}
 			timing_gets( &nextAltCheck, altCheckInterval );
 			// The rtt worker already did the handshake for our image, so there's nothing
 			// more to do here
@@ -641,7 +618,8 @@ static void* uplink_mainloop(void *data)
 			uplink->image->problem.uplink = true;
 			logadd( LOG_WARNING, "poll error on signal in uplink_mainloop!" );
 			goto cleanup;
-		} else if ( (events[EV_SIGNAL].revents & POLLIN) ) {
+		}
+		if ( (events[EV_SIGNAL].revents & POLLIN) ) {
 			// signal triggered -> pending requests
 			if ( signal_clear( uplink->signal ) == SIGNAL_ERROR ) {
 				logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", uplink->image->name );
@@ -662,20 +640,26 @@ static void* uplink_mainloop(void *data)
 			handleReceive( uplink );
 			if ( _shutdown || uplink->shutdown ) goto cleanup;
 		}
+		// Measure time since we sent last keepalive
 		declare_now;
 		uint32_t timepassed = timing_diff( &lastKeepalive, &now );
-		if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL
-				|| ( timepassed >= 2 && uplink->idleTime < _bgrWindowSize ) ) {
+		if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) {
+			int numSent = 0; // Number of replication requests sent
 			lastKeepalive = now;
 			uplink->idleTime += timepassed;
-			// Keep-alive
+			// Background replication
 			if ( uplink->current.fd != -1 && uplink->queueLen < _bgrWindowSize ) {
-				// Send keep-alive if nothing is happening, and try to trigger background rep.
-				if ( !sendKeepalive( uplink ) || !sendReplicationRequest( uplink ) ) {
+				numSent = sendReplicationRequest( uplink );
+				if ( numSent == -1 ) {
 					connectionFailed( uplink, true );
-					logadd( LOG_DEBUG1, "Error sending keep-alive/BGR, panic!\n" );
+					logadd( LOG_DEBUG1, "Error sending BGR, image %s:%d", PIMG(uplink->image) );
 				}
 			}
+			// No BGR requests sent, send keep alive instead
+			if ( numSent == 0 && uplink->current.fd != -1 && !sendKeepalive( uplink ) ) {
+				connectionFailed( uplink, true );
+				logadd( LOG_DEBUG1, "Error sending keepalive, image %s:%d", PIMG(uplink->image) );
+			}
 			// Don't keep uplink established if we're idle for too much
 			if ( connectionShouldShutdown( uplink ) ) {
 				logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", PIMG(uplink->image) );
@@ -736,7 +720,7 @@ static void* uplink_mainloop(void *data)
 			}
 		}
 #endif
-	}
+	} // Mainloop end
 	cleanup: ;
 	dnbd3_image_t *image = uplink->image;
 	dnbd3_cache_map_t *cache = ref_get_cachemap( image );
@@ -775,8 +759,8 @@ static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly)
 	// Build a buffer, so if there aren't too many requests, we can send them after
 	// unlocking the queue again. Otherwise we need flushes during iteration, which
 	// is no ideal, but in that case the uplink is probably overwhelmed anyways.
-	// Try 125 as that's exactly 300bytes, usually 2*MTU.
-#define MAX_RESEND_BATCH 125
+	// Try 120 as that should fit in two TCP segments on a standard 1500MTU setup.
+#define MAX_RESEND_BATCH 120
 	dnbd3_request_t reqs[MAX_RESEND_BATCH];
 	int count = 0;
 	mutex_lock( &uplink->queueLock );
@@ -821,6 +805,39 @@ static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly)
 #undef MAX_RESEND_BATCH
 }
 
+static void switchToNewServer(dnbd3_uplink_t *uplink)
+{
+	mutex_lock( &uplink->rttLock );
+	assert( uplink->rttTestResult == RTT_DOCHANGE );
+	uplink->rttTestResult = RTT_IDLE;
+	// The rttTest worker thread has finished our request.
+	// And says it's better to switch to another server
+	const int fd = uplink->current.fd;
+	mutex_lock( &uplink->sendMutex );
+	uplink->current = uplink->better;
+	mutex_unlock( &uplink->sendMutex );
+	uplink->better.fd = -1;
+	uplink->cycleDetected = false;
+	mutex_unlock( &uplink->rttLock );
+	if ( fd != -1 ) {
+		close( fd );
+	}
+	uplink->image->problem.uplink = false;
+	uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
+	// If we don't have a crc32 list yet, see if the new server has one
+	if ( uplink->image->crc32 == NULL ) {
+		requestCrc32List( uplink );
+	}
+	// Re-send all pending requests
+	sendQueuedRequests( uplink, false );
+	sendReplicationRequest( uplink );
+	if ( uplink->image->problem.uplink ) {
+		// Some of the requests above must have failed again already :-(
+		logadd( LOG_DEBUG1, "Newly established uplink connection failed during getCRC or sendRequests" );
+		connectionFailed( uplink, true );
+	}
+}
+
 /**
  * Send a block request to an uplink server without really having
  * any client that needs that data. This will be used for background replication.
@@ -833,27 +850,27 @@ static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly)
  *
  * Only called form uplink thread, so current.fd is assumed to be valid.
  *
- * @return false if sending request failed, true otherwise (i.e. not necessary/disabled)
+ * @return -1 if sending request failed, number of actually sent requests otherwise (can be 0 if not necessary/disabled)
  */
-static bool sendReplicationRequest(dnbd3_uplink_t *uplink)
+static int sendReplicationRequest(dnbd3_uplink_t *uplink)
 {
 	assert_uplink_thread();
 	if ( uplink->current.fd == -1 )
-		return false; // Should never be called in this state, consider send error
+		return -1; // Should never be called in this state, consider send error
 	if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 )
-		return true; // Don't do background replication
+		return 0; // Don't do background replication
 	if ( uplink->nextReplicationIndex == -1 )
-		return true; // No more blocks to replicate
+		return 0; // No more blocks to replicate
 	dnbd3_image_t * const image = uplink->image;
 	if ( image->users < _bgrMinClients )
-		return true; // Not enough active users
+		return 0; // Not enough active users
 	const int numNewRequests = numWantedReplicationRequests( uplink );
 	if ( numNewRequests <= 0 )
-		return true; // Already sufficient amount of requests on the wire
+		return 0; // Already sufficient amount of requests on the wire
 	dnbd3_cache_map_t *cache = ref_get_cachemap( image );
 	if ( cache == NULL ) {
 		// No cache map (=image complete)
-		return true;
+		return 0;
 	}
 	const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize );
 	const int lastBlockIndex = mapBytes - 1;
@@ -905,7 +922,7 @@ static bool sendReplicationRequest(dnbd3_uplink_t *uplink)
 		logadd( LOG_DEBUG1, "Error sending background replication request to uplink server (%s:%d)",
 				PIMG(uplink->image) );
 		ref_put( &cache->reference );
-		return false;
+		return -1;
 	}
 	if ( replicationIndex == lastBlockIndex ) {
 		uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks
@@ -920,7 +937,7 @@ static bool sendReplicationRequest(dnbd3_uplink_t *uplink)
 		}
 	}
 	ref_put( &cache->reference );
-	return true;
+	return numNewRequests;
 }
 
 /**
@@ -972,6 +989,16 @@ static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMa
 	return retval;
 }
 
+struct reply_async_data {
+	dnbd3_uplink_t *uplink;
+	dnbd3_reply_t *inReply;
+	dnbd3_queue_entry_t *entry;
+	atomic_int jobs;
+};
+
+static void* handleReceiveSaveToDisk(void* param);
+static void* handleReceiveSendToClients(void* param);
+
 /**
  * Receive data from uplink server and process/dispatch
  * Locks on: uplink.lock, images[].lock
@@ -980,13 +1007,20 @@ static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMa
 static void handleReceive(dnbd3_uplink_t *uplink)
 {
 	dnbd3_reply_t inReply;
+	struct reply_async_data tparams = {
+		.uplink = uplink,
+		.inReply = &inReply,
+	};
 	int ret;
+	bool bailout = false;
 	assert_uplink_thread();
 	assert( uplink->queueLen >= 0 );
-	for (;;) {
+	while ( !bailout ) {
 		ret = dnbd3_read_reply( uplink->current.fd, &inReply, false );
-		if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue;
-		if ( ret == REPLY_AGAIN ) break;
+		if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) )
+			continue;
+		if ( ret == REPLY_AGAIN )
+			break;
 		if ( unlikely( ret == REPLY_CLOSED ) ) {
 			logadd( LOG_INFO, "Uplink: Remote host hung up (%s:%d)", PIMG(uplink->image) );
 			goto error_cleanup;
@@ -1023,129 +1057,158 @@ static void handleReceive(dnbd3_uplink_t *uplink)
 		// Is a legit block reply
 		totalBytesReceived += inReply.size;
 		uplink->bytesReceived += inReply.size;
-		// Get entry from queue
-		dnbd3_queue_entry_t *entry;
+		// Get entry from queue and remove
+		dnbd3_queue_entry_t *entry = NULL;
 		mutex_lock( &uplink->queueLock );
-		for ( entry = uplink->queue; entry != NULL; entry = entry->next ) {
-			if ( entry->handle == inReply.handle )
+		for ( dnbd3_queue_entry_t **it = &uplink->queue; *it != NULL; it = &(**it).next ) {
+			if ( (**it).handle == inReply.handle ) {
+				entry = *it;
+				*it = (**it).next;
+				uplink->queueLen--;
 				break;
+			}
 		}
 		if ( entry == NULL ) {
-			mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock!
+			mutex_unlock( &uplink->queueLock );
 			logadd( LOG_DEBUG1, "Received block reply on uplink, but handle %"PRIu64" is unknown (%s:%d)",
 					inReply.handle, PIMG(uplink->image) );
 			continue;
 		}
-		const uint64_t start = entry->from;
-		const uint64_t end = entry->to;
-		mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock!
+		tparams.entry = entry;
+		if ( uplink->queueLen < SERVER_UPLINK_QUEUELEN_THRES ) {
+			uplink->image->problem.queue = false;
+		}
+		mutex_unlock( &uplink->queueLock );
 		// We don't remove the entry from the list here yet, to slightly increase the chance of other
 		// clients attaching to this request while we write the data to disk
-		if ( end - start != inReply.size ) {
+		if ( entry->to - entry->from != inReply.size ) {
 			logadd( LOG_WARNING, "Received payload length does not match! (is: %"PRIu32", expect: %u, %s:%d)",
-					inReply.size, (unsigned int)( end - start ), PIMG(uplink->image) );
+					inReply.size, (unsigned int)( entry->to - entry->from ), PIMG(uplink->image) );
 		}
+		tparams.jobs = 0;
 		// 1) Write to cache file
 		if ( unlikely( uplink->cacheFd == -1 ) ) {
 			reopenCacheFd( uplink, false );
 		}
 		if ( likely( uplink->cacheFd != -1 ) ) {
-			int err = 0;
-			bool tryAgain = true; // Allow one retry in case we run out of space or the write fd became invalid
-			uint32_t done = 0;
-			ret = 0;
-			while ( done < inReply.size ) {
-				ret = (int)pwrite( uplink->cacheFd, uplink->recvBuffer + done, inReply.size - done, start + done );
-				if ( unlikely( ret == -1 ) ) {
-					err = errno;
-					if ( err == EINTR && !_shutdown ) continue;
-					if ( err == ENOSPC || err == EDQUOT ) {
-						// try to free 256MiB
-						if ( !tryAgain || !image_ensureDiskSpaceLocked( 256ull * 1024 * 1024, true ) ) break;
-						tryAgain = false;
-						continue; // Success, retry write
-					}
-					if ( err == EBADF || err == EINVAL || err == EIO ) {
-						uplink->image->problem.write = true;
-						if ( !tryAgain || !reopenCacheFd( uplink, true ) )
-							break;
-						tryAgain = false;
-						continue; // Write handle to image successfully re-opened, try again
-					}
-					logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d",
-							PIMG(uplink->image), err );
-					break;
-				}
-				if ( unlikely( ret <= 0 || (uint32_t)ret > inReply.size - done ) ) {
-					logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d",
-							ret, PIMG(uplink->image) );
-					break;
-				}
-				done += (uint32_t)ret;
-			}
-			if ( likely( done > 0 ) ) {
-				image_updateCachemap( uplink->image, start, start + done, true );
-			}
-			if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) {
-				logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.",
-						PIMG(uplink->image), err );
-			}
-		}
-		bool found = false;
-		dnbd3_queue_entry_t **it;
-		mutex_lock( &uplink->queueLock );
-		for ( it = &uplink->queue; *it != NULL; it = &(**it).next ) {
-			if ( *it == entry && entry->handle == inReply.handle ) { // ABA check
-				assert( found == false );
-				*it = (**it).next;
-				found = true;
-				uplink->queueLen--;
-				break;
-			}
-		}
-		if ( uplink->queueLen < SERVER_UPLINK_QUEUELEN_THRES ) {
-			uplink->image->problem.queue = false;
+			tparams.jobs++;
+			threadpool_run( &handleReceiveSaveToDisk, &tparams, NULL );
 		}
-		mutex_unlock( &uplink->queueLock );
-		if ( !found ) {
-			logadd( LOG_DEBUG1, "Replication request vanished from queue after writing to disk (%s:%d)",
-					PIMG(uplink->image) );
-			continue;
+		// 2) Send to any waiting clients
+		if ( entry->clients != NULL ) {
+			tparams.jobs++;
+			threadpool_run( &handleReceiveSendToClients, &tparams, NULL );
 		}
-		dnbd3_queue_client_t *next;
-		for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) {
-			assert( c->from >= start && c->to <= end );
-			(*c->callback)( c->data, c->handle, c->from, (uint32_t)( c->to - c->from ),
-					(const char*)( uplink->recvBuffer + (c->from - start) ) );
-			next = c->next;
-			free( c );
+		// 3) Trigger more background replication if applicable
+		if ( sendReplicationRequest( uplink ) == -1 ) {
+			bailout = true;
 		}
-		if ( entry->clients != NULL ) {
-			// Was some client -- reset idle counter
-			uplink->idleTime = 0;
-			// Re-enable replication if disabled
-			if ( uplink->nextReplicationIndex == -1 ) {
-				uplink->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK;
-			}
-		} else {
-			if ( uplink->cacheFd != -1 ) {
-				// Try to remove from fs cache if no client was interested in this data
-				posix_fadvise( uplink->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED );
-			}
+		// 4) Wait for both jobs
+		mutex_lock( &uplink->asyncHandleMutex );
+		while (tparams.jobs > 0) {
+			pthread_cond_wait( &uplink->asyncHandleCond, &uplink->asyncHandleMutex );
 		}
+		mutex_unlock( &uplink->asyncHandleMutex );
 		free( entry );
 	} // main receive loop
-	// Trigger background replication if applicable
-	if ( !sendReplicationRequest( uplink ) ) {
-		goto error_cleanup;
-	}
-	// Normal end
-	return;
-	// Error handling from failed receive or message parsing
-error_cleanup: ;
+	if ( !bailout ) {
+		// Try one more time to send more replication requests
+		sendReplicationRequest( uplink );
+		return;
+	}
+	// Error occurred, cleanup
+error_cleanup:
 	connectionFailed( uplink, true );
 }
+
+static void* handleReceiveSaveToDisk(void* param)
+{
+	struct reply_async_data *tparams = (struct reply_async_data*)param;
+	dnbd3_reply_t *inReply = tparams->inReply;
+	dnbd3_uplink_t *uplink = tparams->uplink;
+	dnbd3_queue_entry_t *entry = tparams->entry;
+	int err = 0;
+	int ret = 0;
+	bool tryAgain = true; // Allow one retry in case we run out of space or the write-fd became invalid
+	uint32_t done = 0;
+
+	while ( done < inReply->size ) {
+		ret = (int)pwrite( uplink->cacheFd, uplink->recvBuffer + done, inReply->size - done,
+				entry->from + done );
+		if ( unlikely( ret == -1 ) ) {
+			err = errno;
+			if ( err == EINTR && !_shutdown ) continue;
+			if ( err == ENOSPC || err == EDQUOT ) {
+				// try to free 256MiB
+				if ( !tryAgain || !image_ensureDiskSpaceLocked( 256ull * 1024 * 1024, true ) ) break;
+				tryAgain = false;
+				continue; // Success, retry write
+			}
+			if ( err == EBADF || err == EINVAL || err == EIO ) {
+				uplink->image->problem.write = true;
+				if ( !tryAgain || !reopenCacheFd( uplink, true ) )
+					break;
+				tryAgain = false;
+				continue; // Write-handle to image successfully re-opened, try again
+			}
+			logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d",
+					PIMG(uplink->image), err );
+			break;
+		}
+		if ( unlikely( ret <= 0 || (uint32_t)ret > inReply->size - done ) ) {
+			logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d",
+					ret, PIMG(uplink->image) );
+			break;
+		}
+		done += (uint32_t)ret;
+	}
+	if ( likely( done > 0 ) ) {
+		image_updateCachemap( uplink->image, entry->from, entry->from + done, true );
+	}
+	if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) {
+		logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.",
+				PIMG(uplink->image), err );
+	}
+	if ( uplink->cacheFd != -1 ) {
+		// Try to remove from fs cache if no client was interested in this data
+		posix_fadvise( uplink->cacheFd, entry->from, inReply->size, POSIX_FADV_DONTNEED );
+	}
+	mutex_lock( &uplink->asyncHandleMutex );
+	if ( --tparams->jobs == 0 ) {
+		pthread_cond_signal( &uplink->asyncHandleCond );
+	}
+	mutex_unlock( &uplink->asyncHandleMutex );
+	return NULL;
+}
+
+static void* handleReceiveSendToClients(void* param)
+{
+	struct reply_async_data *tparams = (struct reply_async_data*)param;
+	dnbd3_uplink_t *uplink = tparams->uplink;
+	dnbd3_queue_entry_t *entry = tparams->entry;
+	dnbd3_queue_client_t *next;
+
+	for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) {
+		assert( c->from >= entry->from && c->to <= entry->to );
+		(*c->callback)( c->data, c->handle, c->from, (uint32_t)( c->to - c->from ),
+				(const char*)( uplink->recvBuffer + (c->from - entry->from) ) );
+		next = c->next;
+		free( c );
+	}
+	// Was some client -- reset idle counter
+	uplink->idleTime = 0;
+	// Re-enable replication if disabled
+	if ( uplink->nextReplicationIndex == -1 ) {
+		uplink->nextReplicationIndex = (int)( entry->from / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK;
+	}
+	mutex_lock( &uplink->asyncHandleMutex );
+	if ( --tparams->jobs == 0 ) {
+		pthread_cond_signal( &uplink->asyncHandleCond );
+	}
+	mutex_unlock( &uplink->asyncHandleMutex );
+	return NULL;
+}
+
 /**
  * Only call from uplink thread
  */
@@ -1283,14 +1346,14 @@ bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len)
  * meet the configured bgrWindowSize. Returns 0 if any client requests
  * are pending.
  * This applies a sort of "slow start" in case the uplink was recently
- * dealing with actual client requests, in that the uplink's idle time
- * (in seconds) is an upper bound for the number returned, so we don't
- * saturate the uplink with loads of requests right away, in case that
- * client triggers more requests to the uplink server.
+ * dealing with actual client requests. In that case the uplink's idle
+ * time (in seconds) is an upper bound for the number returned. So we
+ * don't saturate the uplink with loads of requests right away, in case
+ * that client triggers more requests to the uplink server.
  */
 static int numWantedReplicationRequests(dnbd3_uplink_t *uplink)
 {
-	int ret = MIN( _bgrWindowSize, uplink->idleTime + 1 );
+	int ret = MIN( _bgrWindowSize, uplink->idleTime );
 	if ( uplink->queueLen == 0 )
 		return ret;
 	mutex_lock( &uplink->queueLock );
@@ -1303,7 +1366,8 @@ static int numWantedReplicationRequests(dnbd3_uplink_t *uplink)
 		}
 	}
 	mutex_unlock( &uplink->queueLock );
-	return ret;
+
+	return MAX( 0, ret );
 }
 
 static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle)
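A side note on the queue handling: handleReceive() now unlinks the matching entry under the queue lock as soon as the reply is identified, using a pointer-to-pointer walk instead of the old find-then-remove second pass with its ABA check. Below is a standalone sketch of that removal idiom, with a simplified stand-in for dnbd3_queue_entry_t; the names are illustrative, not from the dnbd3 source.

```c
#include <stddef.h>
#include <stdint.h>

// Simplified stand-in for dnbd3_queue_entry_t
typedef struct entry {
	uint64_t handle;
	struct entry *next;
} entry_t;

/* Detach and return the entry matching handle, or NULL if absent.
 * 'it' always points at the link that leads to the current node,
 * so unlinking is a single assignment and the list head needs no
 * special case. */
static entry_t* takeEntry(entry_t **head, uint64_t handle)
{
	for ( entry_t **it = head; *it != NULL; it = &(**it).next ) {
		if ( (**it).handle == handle ) {
			entry_t *found = *it;
			*it = (**it).next;
			return found;
		}
	}
	return NULL;
}
```

Removing the entry up front is what lets the worker threads use it without holding the queue lock: once it is off the list, no other client can attach to it, and the receive loop frees it only after both jobs have signaled completion.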
