diff options
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r-- | src/server/uplink.c | 1529 |
1 files changed, 881 insertions, 648 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c index 682b986..8a83124 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -3,10 +3,13 @@ #include "locks.h" #include "image.h" #include "altservers.h" -#include "../shared/sockhelper.h" -#include "../shared/protocol.h" -#include "../shared/timing.h" -#include "../shared/crc32.h" +#include "net.h" +#include <dnbd3/shared/sockhelper.h> +#include <dnbd3/shared/protocol.h> +#include <dnbd3/shared/timing.h> +#include <dnbd3/shared/crc32.h> +#include "threadpool.h" +#include "reference.h" #include <assert.h> #include <inttypes.h> @@ -15,25 +18,35 @@ #include <unistd.h> #include <stdatomic.h> +static const uint8_t HOP_FLAG_BGR = 0x80; +static const uint8_t HOP_FLAG_PREFETCH = 0x40; #define FILE_BYTES_PER_MAP_BYTE ( DNBD3_BLOCK_SIZE * 8 ) #define MAP_BYTES_PER_HASH_BLOCK (int)( HASH_BLOCK_SIZE / FILE_BYTES_PER_MAP_BYTE ) #define MAP_INDEX_HASH_START_MASK ( ~(int)( MAP_BYTES_PER_HASH_BLOCK - 1 ) ) -#define REP_NONE ( (uint64_t)0xffffffffffffffff ) - static atomic_uint_fast64_t totalBytesReceived = 0; +typedef struct { + uint64_t start, end, handle; +} req_t; + +static void cancelAllRequests(dnbd3_uplink_t *uplink); +static void freeUplinkStruct(ref *ref); static void* uplink_mainloop(void *data); -static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly); -static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const int lastBlockIndex); -static void uplink_handleReceive(dnbd3_connection_t *link); -static int uplink_sendKeepalive(const int fd); -static void uplink_addCrc32(dnbd3_connection_t *uplink); -static void uplink_sendReplicationRequest(dnbd3_connection_t *link); -static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force); -static bool uplink_saveCacheMap(dnbd3_connection_t *link); -static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link); -static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew); +static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly); +static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex); +static void handleReceive(dnbd3_uplink_t *uplink); +static bool sendKeepalive(dnbd3_uplink_t *uplink); +static void requestCrc32List(dnbd3_uplink_t *uplink); +static bool sendReplicationRequest(dnbd3_uplink_t *uplink); +static bool reopenCacheFd(dnbd3_uplink_t *uplink, const bool force); +static bool connectionShouldShutdown(dnbd3_uplink_t *uplink); +static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew); +static int numWantedReplicationRequests(dnbd3_uplink_t *uplink); +static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle); +static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops); + +#define assert_uplink_thread() assert( pthread_equal( uplink->thread, pthread_self() ) ) // ############ uplink connection handling @@ -54,56 +67,73 @@ uint64_t uplink_getTotalBytesReceived() bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version) { if ( !_isProxy || _shutdown ) return false; - dnbd3_connection_t *link = NULL; assert( image != NULL ); + if ( sock == -1 && !altservers_imageHasAltServers( image->name ) ) + return false; // Nothing to do mutex_lock( &image->lock ); - if ( image->uplink != NULL && !image->uplink->shutdown ) { + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink != NULL ) { mutex_unlock( &image->lock ); - if ( sock >= 0 ) close( sock ); - return true; // There's already an uplink, so should we consider this success or failure? + if ( sock != -1 ) { + close( sock ); + } + ref_put( &uplink->reference ); + return true; // There's already an uplink } - if ( image->cache_map == NULL ) { + if ( image->ref_cacheMap == NULL ) { logadd( LOG_WARNING, "Uplink was requested for image %s, but it is already complete", image->name ); goto failure; } - link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) ); - mutex_init( &link->queueLock ); - mutex_init( &link->rttLock ); - mutex_init( &link->sendMutex ); - link->image = image; - link->bytesReceived = 0; - link->idleTime = 0; - link->queueLen = 0; - mutex_lock( &link->sendMutex ); - link->fd = -1; - mutex_unlock( &link->sendMutex ); - link->cacheFd = -1; - link->signal = NULL; - link->replicationHandle = REP_NONE; - mutex_lock( &link->rttLock ); - link->cycleDetected = false; - if ( sock >= 0 ) { - link->betterFd = sock; - link->betterServer = *host; - link->rttTestResult = RTT_DOCHANGE; - link->betterVersion = version; + uplink = calloc( 1, sizeof(dnbd3_uplink_t) ); + // Start with one reference for the uplink thread. We'll return it when the thread finishes + ref_init( &uplink->reference, freeUplinkStruct, 1 ); + mutex_init( &uplink->queueLock, LOCK_UPLINK_QUEUE ); + mutex_init( &uplink->rttLock, LOCK_UPLINK_RTT ); + mutex_init( &uplink->sendMutex, LOCK_UPLINK_SEND ); + uplink->image = image; + uplink->bytesReceived = 0; + uplink->bytesReceivedLastSave = 0; + uplink->idleTime = SERVER_UPLINK_IDLE_TIMEOUT - 90; + uplink->queue = NULL; + uplink->queueLen = 0; + uplink->cacheFd = -1; + uplink->signal = signal_new(); + if ( uplink->signal == NULL ) { + logadd( LOG_WARNING, "Error creating signal. Uplink unavailable." ); + goto failure; + } + mutex_lock( &uplink->rttLock ); + mutex_lock( &uplink->sendMutex ); + uplink->current.fd = -1; + mutex_unlock( &uplink->sendMutex ); + uplink->cycleDetected = false; + image->problem.uplink = true; + image->problem.write = true; + image->problem.queue = false; + if ( sock != -1 ) { + uplink->better.fd = sock; + int index = altservers_hostToIndex( host ); + uplink->better.index = index == -1 ? 0 : index; // Prevent invalid array access + uplink->rttTestResult = RTT_DOCHANGE; + uplink->better.version = version; } else { - link->betterFd = -1; - link->rttTestResult = RTT_IDLE; + uplink->better.fd = -1; + uplink->rttTestResult = RTT_IDLE; } - mutex_unlock( &link->rttLock ); - link->recvBufferLen = 0; - link->shutdown = false; - if ( 0 != thread_create( &(link->thread), NULL, &uplink_mainloop, (void *)link ) ) { + mutex_unlock( &uplink->rttLock ); + uplink->recvBufferLen = 0; + uplink->shutdown = false; + if ( 0 != thread_create( &(uplink->thread), NULL, &uplink_mainloop, (void *)uplink ) ) { logadd( LOG_ERROR, "Could not start thread for new uplink." ); goto failure; } + ref_setref( &image->uplinkref, &uplink->reference ); mutex_unlock( &image->lock ); return true; failure: ; - if ( link != NULL ) { - free( link ); - link = image->uplink = NULL; + if ( uplink != NULL ) { + image->users++; // Expected by freeUplinkStruct() + ref_put( &uplink->reference ); // The ref for the uplink thread that never was } mutex_unlock( &image->lock ); return false; @@ -114,201 +144,398 @@ failure: ; * Calling it multiple times, even concurrently, will * not break anything. */ -void uplink_shutdown(dnbd3_image_t *image) +bool uplink_shutdown(dnbd3_image_t *image) { - bool join = false; - pthread_t thread; assert( image != NULL ); mutex_lock( &image->lock ); - if ( image->uplink == NULL ) { + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink == NULL ) { mutex_unlock( &image->lock ); - return; + return true; } - dnbd3_connection_t * const uplink = image->uplink; mutex_lock( &uplink->queueLock ); - if ( !uplink->shutdown ) { - uplink->shutdown = true; + bool exp = false; + if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) { + image->users++; // Prevent free while uplink shuts down signal_call( uplink->signal ); - thread = uplink->thread; - join = true; + } else { + logadd( LOG_ERROR, "This will never happen. '%s:%d'", PIMG(image) ); } + cancelAllRequests( uplink ); + ref_setref( &image->uplinkref, NULL ); mutex_unlock( &uplink->queueLock ); - bool wait = image->uplink != NULL; + bool retval = ( exp && image->users == 0 ); + ref_put( &uplink->reference ); mutex_unlock( &image->lock ); - if ( join ) thread_join( thread, NULL ); - while ( wait ) { - usleep( 5000 ); - mutex_lock( &image->lock ); - wait = image->uplink != NULL && image->uplink->shutdown; - mutex_unlock( &image->lock ); + return retval; +} + +/** + * Cancel all requests of this uplink. + * HOLD QUEUE LOCK WHILE CALLING + */ +static void cancelAllRequests(dnbd3_uplink_t *uplink) +{ + dnbd3_queue_entry_t *it = uplink->queue; + while ( it != NULL ) { + dnbd3_queue_client_t *cit = it->clients; + while ( cit != NULL ) { + (*cit->callback)( cit->data, cit->handle, 0, 0, NULL ); + dnbd3_queue_client_t *next = cit->next; + free( cit ); + cit = next; + } + dnbd3_queue_entry_t *next = it->next; + free( it ); + it = next; } + uplink->queue = NULL; + uplink->queueLen = 0; + uplink->image->problem.queue = false; +} + +static void freeUplinkStruct(ref *ref) +{ + dnbd3_uplink_t *uplink = container_of(ref, dnbd3_uplink_t, reference); + logadd( LOG_DEBUG1, "Freeing uplink for '%s:%d'", PIMG(uplink->image) ); + assert( uplink->queueLen == 0 ); + if ( uplink->signal != NULL ) { + signal_close( uplink->signal ); + } + if ( uplink->current.fd != -1 ) { + close( uplink->current.fd ); + uplink->current.fd = -1; + } + if ( uplink->better.fd != -1 ) { + close( uplink->better.fd ); + uplink->better.fd = -1; + } + mutex_destroy( &uplink->queueLock ); + mutex_destroy( &uplink->rttLock ); + mutex_destroy( &uplink->sendMutex ); + free( uplink->recvBuffer ); + uplink->recvBuffer = NULL; + if ( uplink->cacheFd != -1 ) { + close( uplink->cacheFd ); + } + // Finally let go of image. It was acquired either in uplink_shutdown or in the cleanup code + // of the uplink thread, depending on who set the uplink->shutdown flag. (Or uplink_init if that failed) + image_release( uplink->image ); + free( uplink ); // !!! } /** * Remove given client from uplink request queue * Locks on: uplink.queueLock */ -void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) +void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback callback) { mutex_lock( &uplink->queueLock ); - for (int i = uplink->queueLen - 1; i >= 0; --i) { - if ( uplink->queue[i].client == client ) { - uplink->queue[i].client = NULL; - uplink->queue[i].status = ULR_FREE; + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + for ( dnbd3_queue_client_t **cit = &it->clients; *cit != NULL; ) { + if ( (**cit).data == data && (**cit).callback == callback ) { + (*(**cit).callback)( (**cit).data, (**cit).handle, 0, 0, NULL ); + dnbd3_queue_client_t *entry = *cit; + *cit = (**cit).next; + free( entry ); + } else { + cit = &(**cit).next; + } } - if ( uplink->queue[i].client == NULL && uplink->queueLen == i + 1 ) uplink->queueLen--; } mutex_unlock( &uplink->queueLock ); } /** - * Request a chunk of data through an uplink server - * Locks on: image.lock, uplink.queueLock + * Called from a client (proxy) connection to request a missing part of the image. + * The caller has made sure that the range is actually missing. */ -bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) +bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) { - if ( client == NULL || client->image == NULL ) return false; - if ( length > (uint32_t)_maxPayload ) { - logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length ); + assert( client != NULL && callback != NULL ); + if ( ( hops & 0x3f ) > 60 ) { // This is just silly + logadd( LOG_WARNING, "Refusing to relay a request that has > 60 hops" ); return false; } - mutex_lock( &client->image->lock ); - if ( client->image->uplink == NULL ) { - mutex_unlock( &client->image->lock ); - logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); + dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref ); + if ( unlikely( uplink == NULL ) ) { + uplink_init( client->image, -1, NULL, -1 ); + uplink = ref_get_uplink( &client->image->uplinkref ); + if ( uplink == NULL ) { + logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); + return false; + } + } + // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain + // This might be a false positive if there are multiple instances running on the same host (IP) + bool ret; + if ( hops > 1 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) { + uplink->cycleDetected = true; + signal_call( uplink->signal ); + logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); + ret = false; + } else { + ret = uplink_requestInternal( uplink, (void*)client, callback, handle, start, length, hops ); + } + ref_put( &uplink->reference ); + return ret; +} + +/** + * Called by integrated fuse module + */ +bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback, + uint64_t handle, uint64_t start, uint32_t length) +{ + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( unlikely( uplink == NULL ) ) { + uplink_init( image, -1, NULL, -1 ); + uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink == NULL ) { + logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); + return false; + } + } + bool ret = uplink_requestInternal( uplink, data, callback, handle, start, length, 0 ); + ref_put( &uplink->reference ); + return ret; +} + +static void extendRequest(uint64_t start, uint64_t *end, const dnbd3_image_t *image, uint32_t wanted) +{ + uint32_t length = (uint32_t)( *end - start ); + if ( length >= wanted ) + return; + length = wanted; + if ( unlikely( _backgroundReplication == BGR_HASHBLOCK + && *end / HASH_BLOCK_SIZE != (start + length) / HASH_BLOCK_SIZE ) ) { + // Don't extend across hash-block border in this mode + *end = ( start + length ) & ~( HASH_BLOCK_SIZE - 1 ); + } else { + *end = start + length; + } + if ( unlikely( *end > image->virtualFilesize ) ) { + *end = image->virtualFilesize; + } + *end = ( *end + DNBD3_BLOCK_SIZE - 1 ) & ~( DNBD3_BLOCK_SIZE - 1 ); + //logadd( LOG_DEBUG2, "Extended %"PRIx64" from %"PRIx64" to %"PRIx64, start, end, req.end ); +} + +static bool requestBlock(dnbd3_uplink_t *uplink, req_t *req, uint8_t hops) +{ + if ( uplink->current.fd == -1 ) + return false; + return dnbd3_get_block( uplink->current.fd, req->start, + (uint32_t)( req->end - req->start ), req->handle, + COND_HOPCOUNT( uplink->current.version, hops ) ); +} + +/** + * Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL. + * If callback is NULL, this is assumed to be a background replication request. + * Locks on: uplink.queueLock, uplink.sendMutex + */ +static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, + uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) +{ + assert( uplink != NULL ); + assert( data == NULL || callback != NULL ); + if ( ( hops & HOP_FLAG_BGR ) // This is a background replication request + && _backgroundReplication != BGR_FULL ) { // Deny if we're not doing BGR + // TODO: Allow BGR_HASHBLOCK too, but only if hash block isn't completely empty + logadd( LOG_DEBUG2, "Dopping client because of BGR policy" ); return false; } - dnbd3_connection_t * const uplink = client->image->uplink; if ( uplink->shutdown ) { - mutex_unlock( &client->image->lock ); logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" ); return false; } - // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain - // This might be a false positive if there are multiple instances running on the same host (IP) - if ( hops != 0 && isSameAddress( &uplink->currentServer, &client->host ) ) { - mutex_unlock( &client->image->lock ); - logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); - mutex_lock( &uplink->rttLock ); - uplink->cycleDetected = true; - mutex_unlock( &uplink->rttLock ); - signal_call( uplink->signal ); + if ( length > (uint32_t)_maxPayload ) { + logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload", + length ); return false; } - int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise - int existingType = -1; // ULR_* type of existing request - int i; - int freeSlot = -1; - bool requestLoop = false; - const uint64_t end = start + length; + hops++; + if ( callback == NULL ) { + // Set upper-most bit for replication requests that we fire + // In client mode, at least set prefetch flag to prevent prefetch cascading + hops |= (uint8_t)( _pretendClient ? HOP_FLAG_PREFETCH : HOP_FLAG_BGR ); + } + req_t req, preReq; + dnbd3_queue_entry_t *request = NULL, *last = NULL, *pre = NULL; + bool isNew; + const uint64_t end = start + length; + req.start = start & ~(DNBD3_BLOCK_SIZE - 1); + req.end = end; + /* Don't do this -- this breaks matching of prefetch jobs, since they'd + * be misaligned, and the next client request wouldn't match anything. + * To improve this, we need to be able to attach a queue_client to multiple queue_entries + * and then serve it once all the queue_entries are done (atomic_int in queue_client). + * But currently we directly send the receive buffer's content to the queue_client after + * receiving the payload, as this will also work when the local cache is borked (we just + * tunnel though the traffic). One could argue that this mode of operation is nonsense, + * and we should just drop all affected clients. Then as a next step, don't serve the + * clients form the receive buffer, but just issue a normal sendfile() call after writing + * the received data to the local cache. + */ + if ( callback != NULL && _minRequestSize != 0 ) { + // Not background replication request, extend request size + extendRequest( req.start, &req.end, uplink->image, _minRequestSize ); + } + req.end = (req.end + DNBD3_BLOCK_SIZE - 1) & ~(DNBD3_BLOCK_SIZE - 1); + // Critical section - work with the queue mutex_lock( &uplink->queueLock ); - mutex_unlock( &client->image->lock ); - for (i = 0; i < uplink->queueLen; ++i) { - if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) { - freeSlot = i; - continue; + if ( uplink->shutdown ) { // Check again after locking to prevent lost requests + goto fail_lock; + } + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + if ( it->from <= start && it->to >= end ) { + // Matching range, attach + request = it; + break; } - if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue; - if ( uplink->queue[i].from <= start && uplink->queue[i].to >= end ) { - if ( hops > uplink->queue[i].hopCount && uplink->queue[i].from == start && uplink->queue[i].to == end ) { - requestLoop = true; - break; - } - if ( foundExisting == -1 || existingType == ULR_PENDING ) { - foundExisting = i; - existingType = uplink->queue[i].status; - if ( freeSlot != -1 ) break; - } + if ( it->next == NULL ) { + // Not matching, last in list, remember + last = it; + break; } } - if ( requestLoop ) { - mutex_unlock( &uplink->queueLock ); - logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops ); - mutex_lock( &uplink->rttLock ); - uplink->cycleDetected = true; - mutex_unlock( &uplink->rttLock ); - signal_call( uplink->signal ); - return false; + dnbd3_queue_client_t **c = NULL; + if ( request == NULL ) { + // No existing request to attach to + if ( uplink->queueLen >= UPLINK_MAX_QUEUE ) { + logadd( LOG_WARNING, + "Uplink queue is full, consider increasing UPLINK_MAX_QUEUE. Dropping client..." ); + goto fail_lock; + } + uplink->queueLen++; + if ( uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { + uplink->image->problem.queue = true; + } + request = malloc( sizeof(*request) ); + if ( last == NULL ) { + uplink->queue = request; + } else { + last->next = request; + } + request->next = NULL; + request->handle = ++uplink->queueId; + request->from = req.start; + request->to = req.end; +#ifdef DEBUG + timing_get( &request->entered ); +#endif + request->hopCount = hops; + request->sent = true; // Optimistic; would be set to false on failure + if ( callback == NULL ) { + // BGR + request->clients = NULL; + } else { + c = &request->clients; + } + isNew = true; + } else if ( callback == NULL ) { + // Replication request that maches existing request. Do nothing + isNew = false; + } else { + // Existing request. Check if potential cycle + if ( hops > request->hopCount && request->from == start && request->to == end ) { + logadd( LOG_DEBUG1, "Request cycle detected on uplink for %s:%d", PIMG(uplink->image) ); + goto fail_lock; + } + // Count number if clients, get tail of list + int count = 0; + c = &request->clients; + while ( *c != NULL ) { + c = &(**c).next; + if ( ++count >= UPLINK_MAX_CLIENTS_PER_REQUEST ) { + logadd( LOG_DEBUG2, "Won't accept more than %d clients per request, dropping client", count ); + goto fail_lock; + } + } + isNew = false; } - if ( freeSlot == -1 ) { - if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) { - mutex_unlock( &uplink->queueLock ); - logadd( LOG_WARNING, "Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." ); - return false; + // Prefetch immediately, without unlocking the list - the old approach of + // async prefetching in another thread was sometimes so slow that we'd process + // another request from the same client before the prefetch job would execute. + if ( callback != NULL && ( isNew || request->clients == NULL || request->clients->data == data ) + && !( hops & (HOP_FLAG_BGR | HOP_FLAG_PREFETCH) ) // No cascading of prefetches + && end == request->to && length <= _maxPrefetch ) { + // Only if this is a client request, and the !! end boundary matches exactly !! + // (See above for reason why) + // - We neither check the local cache, nor other pending requests. Worth it? + // Complexity vs. probability + preReq.start = end; + preReq.end = end; + extendRequest( preReq.start, &preReq.end, uplink->image, MIN( length * 3, _maxPrefetch ) ); + if ( preReq.start < preReq.end ) { + //logadd( LOG_DEBUG2, "Prefetching @ %"PRIx64" - %"PRIx64, preReq.start, preReq.end ); + uplink->queueLen++; + pre = malloc( sizeof(*pre) ); + pre->next = request->next; + request->next = pre; + pre->handle = preReq.handle = ++uplink->queueId; + pre->from = preReq.start; + pre->to = preReq.end; + pre->hopCount = hops | HOP_FLAG_PREFETCH; + pre->sent = true; // Optimistic; would be set to false on failure + pre->clients = NULL; +#ifdef DEBUG + timing_get( &pre->entered ); +#endif } - freeSlot = uplink->queueLen++; } - // Do not send request to uplink server if we have a matching pending request AND the request either has the - // status ULR_NEW OR we found a free slot with LOWER index than the one we attach to. Otherwise - // explicitly send this request to the uplink server. The second condition mentioned here is to prevent - // a race condition where the reply for the outstanding request already arrived and the uplink thread - // is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might - // already have passed the index of the free slot we determined, but not reached the existing request we just found above. - if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1; // -1 means "send request" -#ifdef _DEBUG - if ( foundExisting != -1 ) { - logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, existingType == ULR_NEW ? "ULR_NEW" : "ULR_PENDING", foundExisting, freeSlot ); - logadd( LOG_DEBUG2, "Original %" PRIu64 "-%" PRIu64 " (%p)\n" - "New %" PRIu64 "-%" PRIu64 " (%p)\n", - uplink->queue[foundExisting].from, uplink->queue[foundExisting].to, (void*)uplink->queue[foundExisting].client, - start, end, (void*)client ); + // // // // + // Copy data - need this after unlocking + req.handle = request->handle; + if ( callback != NULL ) { + assert( c != NULL ); + *c = malloc( sizeof( *request->clients ) ); + (**c).next = NULL; + (**c).handle = handle; + (**c).from = start; + (**c).to = end; + (**c).data = data; + (**c).callback = callback; } -#endif - // Fill structure - uplink->queue[freeSlot].from = start; - uplink->queue[freeSlot].to = end; - uplink->queue[freeSlot].handle = handle; - uplink->queue[freeSlot].client = client; - //int old = uplink->queue[freeSlot].status; - uplink->queue[freeSlot].status = (foundExisting == -1 ? ULR_NEW : ULR_PENDING); - uplink->queue[freeSlot].hopCount = hops; -#ifdef _DEBUG - timing_get( &uplink->queue[freeSlot].entered ); - //logadd( LOG_DEBUG2 %p] Inserting request at slot %d, was %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 "\n", (void*)uplink, freeSlot, old, uplink->queue[freeSlot].status, uplink->queue[freeSlot, ".handle, start, end ); -#endif mutex_unlock( &uplink->queueLock ); + // End queue critical section + if ( pre == NULL && !isNew ) + return true; // Nothing to do - if ( foundExisting != -1 ) - return true; // Attached to pending request, do nothing - - // See if we can fire away the request - if ( mutex_trylock( &uplink->sendMutex ) != 0 ) { - logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" ); - } else { - if ( uplink->fd == -1 ) { - mutex_unlock( &uplink->sendMutex ); - logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" ); - } else { - const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); - const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); - if ( hops < 200 ) ++hops; - const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) ); - mutex_unlock( &uplink->sendMutex ); - if ( !ret ) { - logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" ); - } else { - mutex_lock( &uplink->queueLock ); - if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client && uplink->queue[freeSlot].status == ULR_NEW ) { - uplink->queue[freeSlot].status = ULR_PENDING; - logadd( LOG_DEBUG2, "Succesful direct uplink request" ); - } else { - logadd( LOG_DEBUG2, "Weird queue update fail for direct uplink request" ); - } - mutex_unlock( &uplink->queueLock ); - return true; - } - // Fall through to waking up sender thread - } + // Fire away the request(s) + mutex_lock( &uplink->sendMutex ); + bool ret1 = true; + bool ret2 = true; + if ( isNew ) { + ret1 = requestBlock( uplink, &req, hops ); + } + if ( pre != NULL ) { + ret2 = requestBlock( uplink, &preReq, hops | HOP_FLAG_PREFETCH ); + } + if ( !ret1 || !ret2 ) { // Set with send locked + uplink->image->problem.uplink = true; + } + mutex_unlock( &uplink->sendMutex ); + // markRequestUnsend locks the queue, would violate locking order with send mutex + if ( !ret1 ) { + markRequestUnsent( uplink, req.handle ); + logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing (%"PRIu64")", req.handle ); + } + if ( !ret2 ) { + markRequestUnsent( uplink, preReq.handle ); } - if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed - if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { - logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); - } + if ( ( !ret1 || !ret2 ) && signal_call( uplink->signal ) == SIGNAL_ERROR ) { + logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); } return true; + +fail_lock: + mutex_unlock( &uplink->queueLock ); + return false; } /** @@ -321,52 +548,47 @@ static void* uplink_mainloop(void *data) #define EV_SOCKET (1) #define EV_COUNT (2) struct pollfd events[EV_COUNT]; - dnbd3_connection_t * const link = (dnbd3_connection_t*)data; - int numSocks, i, waitTime; + dnbd3_uplink_t * const uplink = (dnbd3_uplink_t*)data; + int numSocks, waitTime; int altCheckInterval = SERVER_RTT_INTERVAL_INIT; + int rttTestResult; uint32_t discoverFailCount = 0; - uint32_t unsavedSeconds = 0; ticks nextAltCheck, lastKeepalive; char buffer[200]; memset( events, 0, sizeof(events) ); timing_get( &nextAltCheck ); lastKeepalive = nextAltCheck; // - assert( link != NULL ); + assert( uplink != NULL ); setThreadName( "idle-uplink" ); + thread_detach( uplink->thread ); blockNoncriticalSignals(); // Make sure file is open for writing - if ( !uplink_reopenCacheFd( link, false ) ) { + if ( !reopenCacheFd( uplink, false ) ) { // It might have failed - still offer proxy mode, we just can't cache - logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", link->image->path, errno ); + logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", uplink->image->path, errno ); } // - link->signal = signal_new(); - if ( link->signal == NULL ) { - logadd( LOG_WARNING, "error creating signal. Uplink unavailable." ); - goto cleanup; - } events[EV_SIGNAL].events = POLLIN; - events[EV_SIGNAL].fd = signal_getWaitFd( link->signal ); + events[EV_SIGNAL].fd = signal_getWaitFd( uplink->signal ); events[EV_SOCKET].fd = -1; - while ( !_shutdown && !link->shutdown ) { + if ( uplink->rttTestResult != RTT_DOCHANGE ) { + altservers_findUplink( uplink ); // In case we didn't kickstart + } + while ( !_shutdown && !uplink->shutdown ) { // poll() - mutex_lock( &link->rttLock ); - waitTime = link->rttTestResult == RTT_DOCHANGE ? 0 : -1; - mutex_unlock( &link->rttLock ); - if ( waitTime == 0 ) { - // Nothing - } else if ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) { - waitTime = 1000; + if ( uplink->rttTestResult == RTT_DOCHANGE ) { + // 0 means poll, since we're about to change the server + waitTime = 0; } else { declare_now; waitTime = (int)timing_diffMs( &now, &nextAltCheck ); if ( waitTime < 100 ) waitTime = 100; - if ( waitTime > 5000 ) waitTime = 5000; + else if ( waitTime > 10000 ) waitTime = 10000; } - events[EV_SOCKET].fd = link->fd; + events[EV_SOCKET].fd = uplink->current.fd; numSocks = poll( events, EV_COUNT, waitTime ); - if ( _shutdown || link->shutdown ) goto cleanup; + if ( _shutdown || uplink->shutdown ) goto cleanup; if ( numSocks == -1 ) { // Error? if ( errno == EINTR ) continue; logadd( LOG_DEBUG1, "poll() error %d", (int)errno ); @@ -374,40 +596,41 @@ static void* uplink_mainloop(void *data) continue; } // Check if server switch is in order - mutex_lock( &link->rttLock ); - if ( link->rttTestResult != RTT_DOCHANGE ) { - mutex_unlock( &link->rttLock ); - } else { - link->rttTestResult = RTT_IDLE; + if ( unlikely( uplink->rttTestResult == RTT_DOCHANGE ) ) { + mutex_lock( &uplink->rttLock ); + assert( uplink->rttTestResult == RTT_DOCHANGE ); + uplink->rttTestResult = RTT_IDLE; // The rttTest worker thread has finished our request. // And says it's better to switch to another server - const int fd = link->fd; - mutex_lock( &link->sendMutex ); - link->fd = link->betterFd; - mutex_unlock( &link->sendMutex ); - link->betterFd = -1; - link->currentServer = link->betterServer; - link->version = link->betterVersion; - link->cycleDetected = false; - mutex_unlock( &link->rttLock ); + const int fd = uplink->current.fd; + mutex_lock( &uplink->sendMutex ); + uplink->current = uplink->better; + mutex_unlock( &uplink->sendMutex ); + uplink->better.fd = -1; + uplink->cycleDetected = false; + mutex_unlock( &uplink->rttLock ); discoverFailCount = 0; if ( fd != -1 ) close( fd ); - link->replicationHandle = REP_NONE; - link->image->working = true; - link->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received + uplink->image->problem.uplink = false; + uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received buffer[0] = '@'; - if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) { - logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", link->image->name, buffer + 1 ); + if ( altservers_toString( uplink->current.index, buffer + 1, sizeof(buffer) - 1 ) ) { + logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", uplink->image->name, buffer + 1 ); setThreadName( buffer ); } // If we don't have a crc32 list yet, see if the new server has one - if ( link->image->crc32 == NULL ) { - uplink_addCrc32( link ); + if ( uplink->image->crc32 == NULL ) { + requestCrc32List( uplink ); } // Re-send all pending requests - uplink_sendRequests( link, false ); - uplink_sendReplicationRequest( link ); + sendQueuedRequests( uplink, false ); + sendReplicationRequest( uplink ); events[EV_SOCKET].events = POLLIN | POLLRDHUP; + if ( uplink->image->problem.uplink ) { + // Some of the requests above must have failed again already :-( + logadd( LOG_DEBUG1, "Newly established uplink connection failed during getCRC or sendRequests" ); + connectionFailed( uplink, true ); + } timing_gets( &nextAltCheck, altCheckInterval ); // The rtt worker already did the handshake for our image, so there's nothing // more to do here @@ -415,206 +638,187 @@ static void* uplink_mainloop(void *data) // Check events // Signal if ( (events[EV_SIGNAL].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) { + uplink->image->problem.uplink = true; logadd( LOG_WARNING, "poll error on signal in uplink_mainloop!" ); goto cleanup; } else if ( (events[EV_SIGNAL].revents & POLLIN) ) { // signal triggered -> pending requests - if ( signal_clear( link->signal ) == SIGNAL_ERROR ) { - logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", link->image->name ); + if ( signal_clear( uplink->signal ) == SIGNAL_ERROR ) { + logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", uplink->image->name ); } - if ( link->fd != -1 ) { + if ( uplink->current.fd != -1 ) { // Uplink seems fine, relay requests to it... - uplink_sendRequests( link, true ); - } else { // No uplink; maybe it was shutdown since it was idle for too long - link->idleTime = 0; + sendQueuedRequests( uplink, true ); + } else if ( uplink->queueLen != 0 ) { // No uplink; maybe it was shutdown since it was idle for too long + uplink->idleTime = 0; } } // Uplink socket if ( (events[EV_SOCKET].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) { - uplink_connectionFailed( link, true ); - logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" ); + connectionFailed( uplink, true ); + logadd( LOG_DEBUG1, "Uplink gone away, panic! (revents=%d)\n", (int)events[EV_SOCKET].revents ); setThreadName( "panic-uplink" ); } else if ( (events[EV_SOCKET].revents & POLLIN) ) { - uplink_handleReceive( link ); - if ( _shutdown || link->shutdown ) goto cleanup; + handleReceive( uplink ); + if ( _shutdown || uplink->shutdown ) goto cleanup; } declare_now; uint32_t timepassed = timing_diff( &lastKeepalive, &now ); - if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) { + if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL + || ( timepassed >= 2 && uplink->idleTime < _bgrWindowSize ) ) { lastKeepalive = now; - link->idleTime += timepassed; - unsavedSeconds += timepassed; - if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && link->idleTime >= 20 && link->idleTime <= 70 ) ) { - // fsync/save every 4 minutes, or every 60 seconds if link is idle - unsavedSeconds = 0; - uplink_saveCacheMap( link ); - } + uplink->idleTime += timepassed; // Keep-alive - if ( link->fd != -1 && link->replicationHandle == REP_NONE ) { - // Send keep-alive if nothing is happening - if ( uplink_sendKeepalive( link->fd ) ) { - // Re-trigger periodically, in case it requires a minimum user count - uplink_sendReplicationRequest( link ); - } else { - uplink_connectionFailed( link, true ); - logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" ); - setThreadName( "panic-uplink" ); + if ( uplink->current.fd != -1 && uplink->queueLen < _bgrWindowSize ) { + // Send keep-alive if nothing is happening, and try to trigger background rep. + if ( !sendKeepalive( uplink ) || !sendReplicationRequest( uplink ) ) { + connectionFailed( uplink, true ); + logadd( LOG_DEBUG1, "Error sending keep-alive/BGR, panic!\n" ); } } - // Don't keep link established if we're idle for too much - if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) { - mutex_lock( &link->sendMutex ); - close( link->fd ); - link->fd = events[EV_SOCKET].fd = -1; - mutex_unlock( &link->sendMutex ); - link->cycleDetected = false; - if ( link->recvBufferLen != 0 ) { - link->recvBufferLen = 0; - free( link->recvBuffer ); - link->recvBuffer = NULL; - } - logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", link->image->name, (int)link->image->rid ); - setThreadName( "idle-uplink" ); + // Don't keep uplink established if we're idle for too much + if ( connectionShouldShutdown( uplink ) ) { + logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", PIMG(uplink->image) ); + goto cleanup; } } // See if we should trigger an RTT measurement - mutex_lock( &link->rttLock ); - const int rttTestResult = link->rttTestResult; - mutex_unlock( &link->rttLock ); + rttTestResult = uplink->rttTestResult; if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) { - if ( timing_reached( &nextAltCheck, &now ) || ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) || link->cycleDetected ) { + if ( timing_reached( &nextAltCheck, &now ) || ( uplink->current.fd == -1 && discoverFailCount == 0 ) || uplink->cycleDetected ) { // It seems it's time for a check - if ( image_isComplete( link->image ) ) { + if ( image_isComplete( uplink->image ) ) { // Quit work if image is complete - logadd( LOG_INFO, "Replication of %s complete.", link->image->name ); + logadd( LOG_INFO, "Replication of %s complete.", uplink->image->name ); setThreadName( "finished-uplink" ); + uplink->image->problem.uplink = false; goto cleanup; - } else if ( !uplink_connectionShouldShutdown( link ) ) { + } else { // Not complete - do measurement - altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous) - if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) { - link->nextReplicationIndex = 0; + altservers_findUplinkAsync( uplink ); // This will set RTT_INPROGRESS (synchronous) + if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) { + uplink->nextReplicationIndex = 0; } } altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_INTERVAL_MAX); timing_set( &nextAltCheck, &now, altCheckInterval ); } } else if ( rttTestResult == RTT_NOT_REACHABLE ) { - mutex_lock( &link->rttLock ); - link->rttTestResult = RTT_IDLE; - mutex_unlock( &link->rttLock ); - discoverFailCount++; - timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) ); + if ( atomic_compare_exchange_strong( &uplink->rttTestResult, &rttTestResult, RTT_IDLE ) ) { + discoverFailCount++; + if ( uplink->current.fd == -1 ) { + uplink->cycleDetected = false; + } + } + timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_MAX_UNREACH) ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED ); } -#ifdef _DEBUG - if ( link->fd != -1 && !link->shutdown ) { +#ifdef DEBUG + if ( uplink->current.fd != -1 && !uplink->shutdown ) { bool resend = false; ticks deadline; timing_set( &deadline, &now, -10 ); - mutex_lock( &link->queueLock ); - for (i = 0; i < link->queueLen; ++i) { - if ( link->queue[i].status != ULR_FREE && timing_reached( &link->queue[i].entered, &deadline ) ) { - snprintf( buffer, sizeof(buffer), "[DEBUG %p] Starving request slot %d detected:\n" - "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)link, i, link->queue[i].client->image->name, - link->queue[i].from, link->queue[i].to, link->queue[i].status ); - link->queue[i].entered = now; -#ifdef _DEBUG_RESEND_STARVING - link->queue[i].status = ULR_NEW; + mutex_lock( &uplink->queueLock ); + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + if ( timing_reached( &it->entered, &deadline ) ) { + logadd( LOG_WARNING, "Starving request detected:" + " (from %" PRIu64 " to %" PRIu64 ", sent: %d) %s:%d", + it->from, it->to, (int)it->sent, PIMG(uplink->image) ); + it->entered = now; +#ifdef DEBUG_RESEND_STARVING + it->sent = false; resend = true; #endif - mutex_unlock( &link->queueLock ); - logadd( LOG_WARNING, "%s", buffer ); - mutex_lock( &link->queueLock ); } } - mutex_unlock( &link->queueLock ); - if ( resend ) - uplink_sendRequests( link, true ); + mutex_unlock( &uplink->queueLock ); + if ( resend ) { + sendQueuedRequests( uplink, true ); + } } #endif } - cleanup: ; - altservers_removeUplink( link ); - uplink_saveCacheMap( link ); - mutex_lock( &link->image->lock ); - if ( link->image->uplink == link ) { - link->image->uplink = NULL; - } - mutex_lock( &link->queueLock ); - const int fd = link->fd; - const dnbd3_signal_t* signal = link->signal; - mutex_lock( &link->sendMutex ); - link->fd = -1; - mutex_unlock( &link->sendMutex ); - link->signal = NULL; - if ( !link->shutdown ) { - link->shutdown = true; - thread_detach( link->thread ); +cleanup: ; + dnbd3_image_t *image = uplink->image; + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache != NULL ) { + cache->dirty = true; // Force writeout of cache map + ref_put( &cache->reference ); } - // Do not access link->image after unlocking, since we set - // image->uplink to NULL. Acquire with image_lock first, - // like done below when checking whether to re-init uplink - mutex_unlock( &link->image->lock ); - mutex_unlock( &link->queueLock ); - if ( fd != -1 ) close( fd ); - if ( signal != NULL ) signal_close( signal ); - // Wait for the RTT check to finish/fail if it's in progress - while ( link->rttTestResult == RTT_INPROGRESS ) - usleep( 10000 ); - if ( link->betterFd != -1 ) { - close( link->betterFd ); + mutex_lock( &image->lock ); + bool exp = false; + if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) { + image->users++; // We set the flag - hold onto image } - mutex_destroy( &link->queueLock ); - mutex_destroy( &link->rttLock ); - mutex_destroy( &link->sendMutex ); - free( link->recvBuffer ); - link->recvBuffer = NULL; - if ( link->cacheFd != -1 ) { - close( link->cacheFd ); + dnbd3_uplink_t *current = ref_get_uplink( &image->uplinkref ); + if ( current == uplink ) { // Set NULL if it's still us... + mutex_lock( &uplink->queueLock ); + cancelAllRequests( uplink ); + mutex_unlock( &uplink->queueLock ); + ref_setref( &image->uplinkref, NULL ); } - dnbd3_image_t *image = image_lock( link->image ); - free( link ); // !!! - if ( image != NULL ) { - if ( !_shutdown && image->cache_map != NULL ) { - // Ingegrity checker must have found something in the meantime - uplink_init( image, -1, NULL, 0 ); - } - image_release( image ); + if ( current != NULL ) { // Decrease ref in any case + ref_put( ¤t->reference ); } + mutex_unlock( &image->lock ); + // Finally as the thread is done, decrease our own ref that we initialized with + ref_put( &uplink->reference ); return NULL ; } -static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly) +/** + * Only called from uplink thread. + */ +static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly) { - // Scan for new requests - int j; - mutex_lock( &link->queueLock ); - for (j = 0; j < link->queueLen; ++j) { - if ( link->queue[j].status != ULR_NEW && (newOnly || link->queue[j].status != ULR_PENDING) ) continue; - link->queue[j].status = ULR_PENDING; - uint8_t hops = link->queue[j].hopCount; - const uint64_t reqStart = link->queue[j].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); - const uint32_t reqSize = (uint32_t)(((link->queue[j].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); - /* - logadd( LOG_DEBUG2, "[%p] Sending slot %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 " (%" PRIu64 "-%" PRIu64 ")", - (void*)link, j, link->queue[j].status, link->queue[j].handle, link->queue[j].from, link->queue[j].to, reqStart, reqStart+reqSize ); - */ - mutex_unlock( &link->queueLock ); - if ( hops < 200 ) ++hops; - mutex_lock( &link->sendMutex ); - const bool ret = dnbd3_get_block( link->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( link->version, hops ) ); - mutex_unlock( &link->sendMutex ); - if ( !ret ) { - // Non-critical - if the connection dropped or the server was changed - // the thread will re-send this request as soon as the connection - // is reestablished. - logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" ); - altservers_serverFailed( &link->currentServer ); - return; + assert_uplink_thread(); + // Scan for new requests, or optionally, (re)send all + // Build a buffer, so if there aren't too many requests, we can send them after + // unlocking the queue again. Otherwise we need flushes during iteration, which + // is no ideal, but in that case the uplink is probably overwhelmed anyways. + // Try 125 as that's exactly 300bytes, usually 2*MTU. +#define MAX_RESEND_BATCH 125 + dnbd3_request_t reqs[MAX_RESEND_BATCH]; + int count = 0; + mutex_lock( &uplink->queueLock ); + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + if ( newOnly && it->sent ) + continue; + it->sent = true; + dnbd3_request_t *hdr = &reqs[count++]; + hdr->magic = dnbd3_packet_magic; + hdr->cmd = CMD_GET_BLOCK; + hdr->size = (uint32_t)( it->to - it->from ); + hdr->offset = it->from; // Offset first, then hops! (union) + hdr->hops = COND_HOPCOUNT( uplink->current.version, it->hopCount ); + hdr->handle = it->handle; + fixup_request( *hdr ); + if ( count == MAX_RESEND_BATCH ) { + bool ok = false; + logadd( LOG_DEBUG2, "BLOCKING resend of %d", count ); + count = 0; + mutex_lock( &uplink->sendMutex ); + if ( uplink->current.fd != -1 ) { + ok = ( sock_sendAll( uplink->current.fd, reqs, DNBD3_REQUEST_SIZE * MAX_RESEND_BATCH, 3 ) + == DNBD3_REQUEST_SIZE * MAX_RESEND_BATCH ); + } + mutex_unlock( &uplink->sendMutex ); + if ( !ok ) { + uplink->image->problem.uplink = true; + break; + } } - mutex_lock( &link->queueLock ); } - mutex_unlock( &link->queueLock ); + mutex_unlock( &uplink->queueLock ); + if ( count != 0 ) { + mutex_lock( &uplink->sendMutex ); + if ( uplink->current.fd != -1 ) { + uplink->image->problem.uplink = + ( sock_sendAll( uplink->current.fd, reqs, DNBD3_REQUEST_SIZE * count, 3 ) + != DNBD3_REQUEST_SIZE * count ); + } + mutex_unlock( &uplink->sendMutex ); + } +#undef MAX_RESEND_BATCH } /** @@ -626,90 +830,118 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly) * server. This means we might request data we already have, but it makes * the code simpler. Worst case would be only one bit is zero, which means * 4kb are missing, but we will request 32kb. + * + * Only called form uplink thread, so current.fd is assumed to be valid. + * + * @return false if sending request failed, true otherwise (i.e. not necessary/disabled) */ -static void uplink_sendReplicationRequest(dnbd3_connection_t *link) +static bool sendReplicationRequest(dnbd3_uplink_t *uplink) { - if ( link == NULL || link->fd == -1 ) return; - if ( _backgroundReplication == BGR_DISABLED || link->cacheFd == -1 ) return; // Don't do background replication - if ( link->nextReplicationIndex == -1 || link->replicationHandle != REP_NONE ) - return; - dnbd3_image_t * const image = link->image; - if ( image->virtualFilesize < DNBD3_BLOCK_SIZE ) return; - mutex_lock( &image->lock ); - if ( image == NULL || image->cache_map == NULL || image->users < _bgrMinClients ) { - // No cache map (=image complete), or replication pending, or not enough users, do nothing - mutex_unlock( &image->lock ); - return; + assert_uplink_thread(); + if ( uplink->current.fd == -1 ) + return false; // Should never be called in this state, consider send error + if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 ) + return true; // Don't do background replication + if ( uplink->nextReplicationIndex == -1 ) + return true; // No more blocks to replicate + dnbd3_image_t * const image = uplink->image; + if ( image->users < _bgrMinClients ) + return true; // Not enough active users + const int numNewRequests = numWantedReplicationRequests( uplink ); + if ( numNewRequests <= 0 ) + return true; // Already sufficient amount of requests on the wire + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache == NULL ) { + // No cache map (=image complete) + return true; } const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); const int lastBlockIndex = mapBytes - 1; - int endByte; - if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks - endByte = link->nextReplicationIndex + mapBytes; - } else { // Hashblock based: Only look for match in current hash block - endByte = ( link->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK; - if ( endByte > mapBytes ) { - endByte = mapBytes; + for ( int bc = 0; bc < numNewRequests; ++bc ) { + int endByte; + if ( UPLINK_MAX_QUEUE - uplink->queueLen < 10 ) + break; // Don't overload queue + if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks + endByte = uplink->nextReplicationIndex + mapBytes; + } else { // Hashblock based: Only look for match in current hash block + endByte = ( uplink->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK; + if ( endByte > mapBytes ) { + endByte = mapBytes; + } } - } - int replicationIndex = -1; - for ( int j = link->nextReplicationIndex; j < endByte; ++j ) { - const int i = j % ( mapBytes ); // Wrap around for BGR_FULL - if ( image->cache_map[i] != 0xff && ( i != lastBlockIndex || !link->replicatedLastBlock ) ) { - // Found incomplete one - replicationIndex = i; + atomic_thread_fence( memory_order_acquire ); + int replicationIndex = -1; + for ( int j = uplink->nextReplicationIndex; j < endByte; ++j ) { + const int i = j % ( mapBytes ); // Wrap around for BGR_FULL + if ( atomic_load_explicit( &cache->map[i], memory_order_relaxed ) != 0xff + && ( i != lastBlockIndex || !uplink->replicatedLastBlock ) ) { + // Found incomplete one + replicationIndex = i; + break; + } + } + if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) { + // Nothing left in current block, find next one + replicationIndex = findNextIncompleteHashBlock( uplink, endByte ); + } + if ( replicationIndex == -1 ) { + // Replication might be complete, uplink_mainloop should take care.... + uplink->nextReplicationIndex = -1; break; } + const uint64_t handle = ++uplink->queueId; + const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE; + uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE ); + // Extend the default 32k request size if _minRequestSize is > 32k + for ( size_t extra = 1; extra < ( _minRequestSize / FILE_BYTES_PER_MAP_BYTE ) + && offset + size < image->virtualFilesize + && _backgroundReplication == BGR_FULL; ++extra ) { + if ( atomic_load_explicit( &cache->map[replicationIndex+1], memory_order_relaxed ) == 0xff ) + break; // Hit complete 32k block, stop here + replicationIndex++; + size += (uint32_t)MIN( image->virtualFilesize - offset - size, FILE_BYTES_PER_MAP_BYTE ); + } + if ( !uplink_requestInternal( uplink, NULL, NULL, handle, offset, size, 0 ) ) { + logadd( LOG_DEBUG1, "Error sending background replication request to uplink server (%s:%d)", + PIMG(uplink->image) ); + ref_put( &cache->reference ); + return false; + } + if ( replicationIndex == lastBlockIndex ) { + uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks + } + uplink->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter + if ( _backgroundReplication == BGR_HASHBLOCK + && uplink->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) { + // Just crossed a hash block boundary, look for new candidate starting at this very index + uplink->nextReplicationIndex = findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex ); + if ( uplink->nextReplicationIndex == -1 ) + break; + } } - mutex_unlock( &image->lock ); - if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) { - // Nothing left in current block, find next one - replicationIndex = uplink_findNextIncompleteHashBlock( link, endByte ); - } - if ( replicationIndex == -1 ) { - // Replication might be complete, uplink_mainloop should take care.... - link->nextReplicationIndex = -1; - return; - } - const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE; - link->replicationHandle = offset; - const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE ); - mutex_lock( &link->sendMutex ); - bool sendOk = dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) ); - mutex_unlock( &link->sendMutex ); - if ( !sendOk ) { - logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" ); - return; - } - if ( replicationIndex == lastBlockIndex ) { - link->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks - } - link->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter - if ( _backgroundReplication == BGR_HASHBLOCK - && link->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) { - // Just crossed a hash block boundary, look for new candidate starting at this very index - link->nextReplicationIndex = uplink_findNextIncompleteHashBlock( link, link->nextReplicationIndex ); - } + ref_put( &cache->reference ); + return true; } /** - * find next index into cache_map that corresponds to the beginning + * find next index into cache map that corresponds to the beginning * of a hash block which is neither completely empty nor completely * replicated yet. Returns -1 if no match. */ -static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const int startMapIndex) +static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMapIndex) { int retval = -1; - mutex_lock( &link->image->lock ); - const int mapBytes = IMGSIZE_TO_MAPBYTES( link->image->virtualFilesize ); - const uint8_t *cache_map = link->image->cache_map; - if ( cache_map != NULL ) { - int j; + dnbd3_cache_map_t *cache = ref_get_cachemap( uplink->image ); + if ( cache != NULL ) { + const int mapBytes = IMGSIZE_TO_MAPBYTES( uplink->image->virtualFilesize ); const int start = ( startMapIndex & MAP_INDEX_HASH_START_MASK ); + atomic_thread_fence( memory_order_acquire ); + int j; for (j = 0; j < mapBytes; ++j) { const int i = ( start + j ) % mapBytes; - const bool isFull = cache_map[i] == 0xff || ( i + 1 == mapBytes && link->replicatedLastBlock ); - const bool isEmpty = cache_map[i] == 0; + const uint8_t b = atomic_load_explicit( &cache->map[i], memory_order_relaxed ); + const bool isFull = b == 0xff || ( i + 1 == mapBytes && uplink->replicatedLastBlock ); + const bool isEmpty = b == 0; if ( !isEmpty && !isFull ) { // Neither full nor empty, replicate if ( retval == -1 ) { @@ -736,74 +968,97 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const in retval = -1; } } - mutex_unlock( &link->image->lock ); + ref_put( &cache->reference ); return retval; } /** * Receive data from uplink server and process/dispatch - * Locks on: link.lock, images[].lock + * Locks on: uplink.lock, images[].lock + * Only called from uplink thread, so current.fd is assumed to be valid. */ -static void uplink_handleReceive(dnbd3_connection_t *link) +static void handleReceive(dnbd3_uplink_t *uplink) { - dnbd3_reply_t inReply, outReply; - int ret, i; + dnbd3_reply_t inReply; + int ret; + assert_uplink_thread(); + assert( uplink->queueLen >= 0 ); for (;;) { - ret = dnbd3_read_reply( link->fd, &inReply, false ); - if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !link->shutdown ) ) continue; + ret = dnbd3_read_reply( uplink->current.fd, &inReply, false ); + if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue; if ( ret == REPLY_AGAIN ) break; if ( unlikely( ret == REPLY_CLOSED ) ) { - logadd( LOG_INFO, "Uplink: Remote host hung up (%s)", link->image->path ); + logadd( LOG_INFO, "Uplink: Remote host hung up (%s:%d)", PIMG(uplink->image) ); goto error_cleanup; } if ( unlikely( ret == REPLY_WRONGMAGIC ) ) { - logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path ); + logadd( LOG_WARNING, "Uplink server's packet did not start with dnbd3_packet_magic (%s:%d)", PIMG(uplink->image) ); goto error_cleanup; } if ( unlikely( ret != REPLY_OK ) ) { - logadd( LOG_INFO, "Uplink: Connection error %d (%s)", ret, link->image->path ); + logadd( LOG_INFO, "Uplink: Connection error %d (%s:%d)", ret, PIMG(uplink->image) ); goto error_cleanup; } if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) { - logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s", inReply.size, link->image->path ); + logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s:%d", inReply.size, PIMG(uplink->image) ); goto error_cleanup; } - if ( unlikely( link->recvBufferLen < inReply.size ) ) { - link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536); - link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen ); - if ( link->recvBuffer == NULL ) { + if ( unlikely( uplink->recvBufferLen < inReply.size ) ) { + uplink->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536); + uplink->recvBuffer = realloc( uplink->recvBuffer, uplink->recvBufferLen ); + if ( uplink->recvBuffer == NULL ) { logadd( LOG_ERROR, "Out of memory when trying to allocate receive buffer for uplink" ); exit( 1 ); } } - if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) { - logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path ); + if ( unlikely( (uint32_t)sock_recv( uplink->current.fd, uplink->recvBuffer, inReply.size ) != inReply.size ) ) { + logadd( LOG_INFO, "Lost connection to uplink server of %s:%d (payload)", PIMG(uplink->image) ); goto error_cleanup; } // Payload read completely // Bail out if we're not interested - if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) ) continue; + if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) ) + continue; // Is a legit block reply - struct iovec iov[2]; - const uint64_t start = inReply.handle; - const uint64_t end = inReply.handle + inReply.size; totalBytesReceived += inReply.size; - link->bytesReceived += inReply.size; + uplink->bytesReceived += inReply.size; + // Get entry from queue + dnbd3_queue_entry_t *entry; + mutex_lock( &uplink->queueLock ); + for ( entry = uplink->queue; entry != NULL; entry = entry->next ) { + if ( entry->handle == inReply.handle ) + break; + } + if ( entry == NULL ) { + mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock! + logadd( LOG_DEBUG1, "Received block reply on uplink, but handle %"PRIu64" is unknown (%s:%d)", + inReply.handle, PIMG(uplink->image) ); + continue; + } + const uint64_t start = entry->from; + const uint64_t end = entry->to; + mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock! + // We don't remove the entry from the list here yet, to slightly increase the chance of other + // clients attaching to this request while we write the data to disk + if ( end - start != inReply.size ) { + logadd( LOG_WARNING, "Received payload length does not match! (is: %"PRIu32", expect: %u, %s:%d)", + inReply.size, (unsigned int)( end - start ), PIMG(uplink->image) ); + } // 1) Write to cache file - if ( unlikely( link->cacheFd == -1 ) ) { - uplink_reopenCacheFd( link, false ); + if ( unlikely( uplink->cacheFd == -1 ) ) { + reopenCacheFd( uplink, false ); } - if ( likely( link->cacheFd != -1 ) ) { + if ( likely( uplink->cacheFd != -1 ) ) { int err = 0; bool tryAgain = true; // Allow one retry in case we run out of space or the write fd became invalid uint32_t done = 0; ret = 0; while ( done < inReply.size ) { - ret = (int)pwrite( link->cacheFd, link->recvBuffer + done, inReply.size - done, start + done ); + ret = (int)pwrite( uplink->cacheFd, uplink->recvBuffer + done, inReply.size - done, start + done ); if ( unlikely( ret == -1 ) ) { err = errno; - if ( err == EINTR ) continue; + if ( err == EINTR && !_shutdown ) continue; if ( err == ENOSPC || err == EDQUOT ) { // try to free 256MiB if ( !tryAgain || !image_ensureDiskSpaceLocked( 256ull * 1024 * 1024, true ) ) break; @@ -811,150 +1066,135 @@ static void uplink_handleReceive(dnbd3_connection_t *link) continue; // Success, retry write } if ( err == EBADF || err == EINVAL || err == EIO ) { - if ( !tryAgain || !uplink_reopenCacheFd( link, true ) ) + uplink->image->problem.write = true; + if ( !tryAgain || !reopenCacheFd( uplink, true ) ) break; tryAgain = false; continue; // Write handle to image successfully re-opened, try again } - logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d", link->image->name, (int)link->image->rid, err ); + logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d", + PIMG(uplink->image), err ); break; } if ( unlikely( ret <= 0 || (uint32_t)ret > inReply.size - done ) ) { - logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d", ret, link->image->name, (int)link->image->rid ); + logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d", + ret, PIMG(uplink->image) ); break; } done += (uint32_t)ret; } if ( likely( done > 0 ) ) { - image_updateCachemap( link->image, start, start + done, true ); + image_updateCachemap( uplink->image, start, start + done, true ); } if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) { logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.", - link->image->name, (int)link->image->rid, err ); + PIMG(uplink->image), err ); } } - // 2) Figure out which clients are interested in it - mutex_lock( &link->queueLock ); - for (i = 0; i < link->queueLen; ++i) { - dnbd3_queued_request_t * const req = &link->queue[i]; - assert( req->status != ULR_PROCESSING ); - if ( req->status != ULR_PENDING && req->status != ULR_NEW ) continue; - assert( req->client != NULL ); - if ( req->from >= start && req->to <= end ) { // Match :-) - req->status = ULR_PROCESSING; + bool found = false; + dnbd3_queue_entry_t **it; + mutex_lock( &uplink->queueLock ); + for ( it = &uplink->queue; *it != NULL; it = &(**it).next ) { + if ( *it == entry && entry->handle == inReply.handle ) { // ABA check + assert( found == false ); + *it = (**it).next; + found = true; + uplink->queueLen--; + break; } } - // 3) Send to interested clients - iterate backwards so request collaboration works, and - // so we can decrease queueLen on the fly while iterating. Should you ever change this to start - // from 0, you also need to change the "attach to existing request"-logic in uplink_request() - outReply.magic = dnbd3_packet_magic; - bool served = false; - for ( i = link->queueLen - 1; i >= 0; --i ) { - dnbd3_queued_request_t * const req = &link->queue[i]; - if ( req->status == ULR_PROCESSING ) { - size_t bytesSent = 0; - assert( req->from >= start && req->to <= end ); - dnbd3_client_t * const client = req->client; - outReply.cmd = CMD_GET_BLOCK; - outReply.handle = req->handle; - outReply.size = (uint32_t)( req->to - req->from ); - iov[0].iov_base = &outReply; - iov[0].iov_len = sizeof outReply; - iov[1].iov_base = link->recvBuffer + (req->from - start); - iov[1].iov_len = outReply.size; - fixup_reply( outReply ); - req->status = ULR_FREE; - req->client = NULL; - served = true; - mutex_lock( &client->sendMutex ); - mutex_unlock( &link->queueLock ); - if ( client->sock != -1 ) { - ssize_t sent = writev( client->sock, iov, 2 ); - if ( sent > (ssize_t)sizeof outReply ) { - bytesSent = (size_t)sent - sizeof outReply; - } - } - mutex_unlock( &client->sendMutex ); - if ( bytesSent != 0 ) { - client->bytesSent += bytesSent; - } - mutex_lock( &link->queueLock ); - } - if ( req->status == ULR_FREE && i == link->queueLen - 1 ) link->queueLen--; + if ( uplink->queueLen < SERVER_UPLINK_QUEUELEN_THRES ) { + uplink->image->problem.queue = false; } - mutex_unlock( &link->queueLock ); -#ifdef _DEBUG - if ( !served && start != link->replicationHandle ) { - logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end ); + mutex_unlock( &uplink->queueLock ); + if ( !found ) { + logadd( LOG_DEBUG1, "Replication request vanished from queue after writing to disk (%s:%d)", + PIMG(uplink->image) ); + continue; } -#endif - if ( start == link->replicationHandle ) { - // Was our background replication - link->replicationHandle = REP_NONE; - // Try to remove from fs cache if no client was interested in this data - if ( !served && link->cacheFd != -1 ) { - posix_fadvise( link->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED ); - } + dnbd3_queue_client_t *next; + for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) { + assert( c->from >= start && c->to <= end ); + (*c->callback)( c->data, c->handle, c->from, (uint32_t)( c->to - c->from ), + (const char*)( uplink->recvBuffer + (c->from - start) ) ); + next = c->next; + free( c ); } - if ( served ) { + if ( entry->clients != NULL ) { // Was some client -- reset idle counter - link->idleTime = 0; + uplink->idleTime = 0; // Re-enable replication if disabled - if ( link->nextReplicationIndex == -1 ) { - link->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK; + if ( uplink->nextReplicationIndex == -1 ) { + uplink->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK; + } + } else { + if ( uplink->cacheFd != -1 ) { + // Try to remove from fs cache if no client was interested in this data + posix_fadvise( uplink->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED ); } } + free( entry ); + } // main receive loop + // Trigger background replication if applicable + if ( !sendReplicationRequest( uplink ) ) { + goto error_cleanup; } - if ( link->replicationHandle == REP_NONE ) { - mutex_lock( &link->queueLock ); - const bool rep = ( link->queueLen == 0 ); - mutex_unlock( &link->queueLock ); - if ( rep ) uplink_sendReplicationRequest( link ); - } + // Normal end return; // Error handling from failed receive or message parsing - error_cleanup: ; - uplink_connectionFailed( link, true ); +error_cleanup: ; + connectionFailed( uplink, true ); } -static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew) +/** + * Only call from uplink thread + */ +static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew) { - if ( link->fd == -1 ) + assert_uplink_thread(); + if ( uplink->current.fd == -1 ) return; - altservers_serverFailed( &link->currentServer ); - mutex_lock( &link->sendMutex ); - close( link->fd ); - link->fd = -1; - mutex_unlock( &link->sendMutex ); - link->replicationHandle = REP_NONE; - if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) { - link->nextReplicationIndex = 0; + setThreadName( "panic-uplink" ); + altservers_serverFailed( uplink->current.index ); + mutex_lock( &uplink->sendMutex ); + uplink->image->problem.uplink = true; + close( uplink->current.fd ); + uplink->current.fd = -1; + mutex_unlock( &uplink->sendMutex ); + if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) { + uplink->nextReplicationIndex = 0; } if ( !findNew ) return; - mutex_lock( &link->rttLock ); - bool bail = link->rttTestResult == RTT_INPROGRESS || link->betterFd != -1; - mutex_unlock( &link->rttLock ); + mutex_lock( &uplink->rttLock ); + bool bail = uplink->rttTestResult == RTT_INPROGRESS || uplink->better.fd != -1; + mutex_unlock( &uplink->rttLock ); if ( bail ) return; - altservers_findUplink( link ); + altservers_findUplinkAsync( uplink ); } /** - * Send keep alive request to server + * Send keep alive request to server. + * Called from uplink thread, current.fd must be valid. */ -static int uplink_sendKeepalive(const int fd) +static bool sendKeepalive(dnbd3_uplink_t *uplink) { - static dnbd3_request_t request = { 0 }; - if ( request.magic == 0 ) { - request.magic = dnbd3_packet_magic; - request.cmd = CMD_KEEPALIVE; - fixup_request( request ); - } - return send( fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request); + static const dnbd3_request_t request = { .magic = dnbd3_packet_magic, .cmd = net_order_16( CMD_KEEPALIVE ) }; + assert_uplink_thread(); + mutex_lock( &uplink->sendMutex ); + bool sendOk = send( uplink->current.fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request); + mutex_unlock( &uplink->sendMutex ); + return sendOk; } -static void uplink_addCrc32(dnbd3_connection_t *uplink) +/** + * Request crclist from uplink. + * Called from uplink thread, current.fd must be valid. + * FIXME This is broken as it could happen that another message arrives after sending + * the request. Refactor, split and move receive into general receive handler. + */ +static void requestCrc32List(dnbd3_uplink_t *uplink) { dnbd3_image_t *image = uplink->image; if ( image == NULL || image->virtualFilesize == 0 ) return; @@ -962,7 +1202,10 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink) uint32_t masterCrc; uint32_t *buffer = malloc( bytes ); mutex_lock( &uplink->sendMutex ); - bool sendOk = dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes ); + bool sendOk = dnbd3_get_crc32( uplink->current.fd, &masterCrc, buffer, &bytes ); + if ( !sendOk ) { + uplink->image->problem.uplink = true; + } mutex_unlock( &uplink->sendMutex ); if ( !sendOk || bytes == 0 ) { free( buffer ); @@ -972,7 +1215,7 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink) lists_crc = crc32( lists_crc, (uint8_t*)buffer, bytes ); lists_crc = net_order_32( lists_crc ); if ( lists_crc != masterCrc ) { - logadd( LOG_WARNING, "Received corrupted crc32 list from uplink server (%s)!", uplink->image->name ); + logadd( LOG_WARNING, "Received corrupted crc32 list from uplink server (%s:%d)!", PIMG(uplink->image) ); free( buffer ); return; } @@ -982,10 +1225,14 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink) char path[len]; snprintf( path, len, "%s.crc", uplink->image->path ); const int fd = open( path, O_WRONLY | O_CREAT, 0644 ); - if ( fd >= 0 ) { - write( fd, &masterCrc, sizeof(uint32_t) ); - write( fd, buffer, bytes ); + if ( fd != -1 ) { + ssize_t ret = write( fd, &masterCrc, sizeof(masterCrc) ); + ret += write( fd, buffer, bytes ); close( fd ); + if ( (size_t)ret != sizeof(masterCrc) + bytes ) { + unlink( path ); + logadd( LOG_WARNING, "Could not write crc32 file for %s:%d", PIMG(uplink->image) ); + } } } @@ -997,91 +1244,77 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink) * it will be closed first. Otherwise, nothing will happen and true will be returned * immediately. */ -static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force) +static bool reopenCacheFd(dnbd3_uplink_t *uplink, const bool force) { - if ( link->cacheFd != -1 ) { + if ( uplink->cacheFd != -1 ) { if ( !force ) return true; - close( link->cacheFd ); + close( uplink->cacheFd ); } - link->cacheFd = open( link->image->path, O_WRONLY | O_CREAT, 0644 ); - return link->cacheFd != -1; + uplink->cacheFd = open( uplink->image->path, O_WRONLY | O_CREAT, 0644 ); + uplink->image->problem.write = uplink->cacheFd == -1; + return uplink->cacheFd != -1; } /** - * Saves the cache map of the given image. - * Return true on success. - * Locks on: imageListLock, image.lock + * Returns true if the uplink has been idle for some time (apart from + * background replication, if it is set to hashblock, or if it has + * a minimum number of active clients configured that is not currently + * reached) */ -static bool uplink_saveCacheMap(dnbd3_connection_t *link) +static bool connectionShouldShutdown(dnbd3_uplink_t *uplink) { - dnbd3_image_t *image = link->image; - assert( image != NULL ); - - if ( link->cacheFd != -1 ) { - if ( fsync( link->cacheFd ) == -1 ) { - // A failing fsync means we have no guarantee that any data - // since the last fsync (or open if none) has been saved. Apart - // from keeping the cache_map from the last successful fsync - // around and restoring it there isn't much we can do to recover - // a consistent state. Bail out. - logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d", image->path, errno ); - logadd( LOG_ERROR, "Bailing out immediately" ); - exit( 1 ); - } - } - - if ( image->cache_map == NULL ) return true; - logadd( LOG_DEBUG2, "Saving cache map of %s:%d", image->name, (int)image->rid ); - mutex_lock( &image->lock ); - // Lock and get a copy of the cache map, as it could be freed by another thread that is just about to - // figure out that this image's cache copy is complete - if ( image->cache_map == NULL || image->virtualFilesize < DNBD3_BLOCK_SIZE ) { - mutex_unlock( &image->lock ); - return true; - } - const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize); - uint8_t *map = malloc( size ); - memcpy( map, image->cache_map, size ); - // Unlock. Use path and cacheFd without locking. path should never change after initialization of the image, - // cacheFd is owned by the uplink thread and we don't want to hold a spinlock during I/O - mutex_unlock( &image->lock ); - assert( image->path != NULL ); - char mapfile[strlen( image->path ) + 4 + 1]; - strcpy( mapfile, image->path ); - strcat( mapfile, ".map" ); + return ( uplink->idleTime > SERVER_UPLINK_IDLE_TIMEOUT + && ( _backgroundReplication != BGR_FULL || _bgrMinClients > uplink->image->users ) ); +} - int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 ); - if ( fd == -1 ) { - const int err = errno; - free( map ); - logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile ); +bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len) +{ + int current; + mutex_lock( &uplink->rttLock ); + current = uplink->current.fd == -1 ? -1 : uplink->current.index; + mutex_unlock( &uplink->rttLock ); + if ( current == -1 ) return false; - } + return altservers_toString( current, buffer, len ); +} - size_t done = 0; - while ( done < size ) { - const ssize_t ret = write( fd, map, size - done ); - if ( ret == -1 ) { - if ( errno == EINTR ) continue; - logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile ); - break; - } - if ( ret <= 0 ) { - logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile ); +/** + * Get number of replication requests that should be sent right now to + * meet the configured bgrWindowSize. Returns 0 if any client requests + * are pending. + * This applies a sort of "slow start" in case the uplink was recently + * dealing with actual client requests, in that the uplink's idle time + * (in seconds) is an upper bound for the number returned, so we don't + * saturate the uplink with loads of requests right away, in case that + * client triggers more requests to the uplink server. + */ +static int numWantedReplicationRequests(dnbd3_uplink_t *uplink) +{ + int ret = MIN( _bgrWindowSize, uplink->idleTime + 1 ); + if ( uplink->queueLen == 0 ) + return ret; + mutex_lock( &uplink->queueLock ); + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + if ( it->clients == NULL ) { + ret--; + } else { + ret = 0; // Do not allow BGR if client requests are being handled break; } - done += (size_t)ret; - } - if ( fsync( fd ) == -1 ) { - logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno ); } - close( fd ); - free( map ); - return true; + mutex_unlock( &uplink->queueLock ); + return ret; } -static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link) +static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle) { - return ( link->idleTime > SERVER_UPLINK_IDLE_TIMEOUT && _backgroundReplication != BGR_FULL ); + mutex_lock( &uplink->queueLock ); + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + if ( it->handle == handle ) { + it->sent = false; + break; + } + } + mutex_unlock( &uplink->queueLock ); } |