From e4dec3562e6cab27e1a3f40165e4c0d9d0bf05c9 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 28 Jul 2020 17:49:17 +0200 Subject: [SERVER] Add FUSE mode Still needs some cleanup and optimizations, variable naming sucks, comments, etc. --- src/server/uplink.c | 146 ++++++++++++++++++++++++---------------------------- 1 file changed, 67 insertions(+), 79 deletions(-) (limited to 'src/server/uplink.c') diff --git a/src/server/uplink.c b/src/server/uplink.c index bf6f32e..cb4f956 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -38,6 +38,7 @@ static bool connectionShouldShutdown(dnbd3_uplink_t *uplink); static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew); static int numWantedReplicationRequests(dnbd3_uplink_t *uplink); static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle); +static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops); static void *prefetchForClient(void *data); typedef struct { @@ -180,8 +181,7 @@ static void cancelAllRequests(dnbd3_uplink_t *uplink) while ( it != NULL ) { dnbd3_queue_client_t *cit = it->clients; while ( cit != NULL ) { - net_sendReply( cit->client, CMD_ERROR, cit->handle ); - cit->client->relayedCount--; + (*cit->callback)( cit->data, cit->handle, 0, 0, NULL ); dnbd3_queue_client_t *next = cit->next; free( cit ); cit = next; @@ -229,15 +229,13 @@ static void freeUplinkStruct(ref *ref) * Remove given client from uplink request queue * Locks on: uplink.queueLock */ -void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client) +void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback callback) { - if ( client->relayedCount == 0 ) - return; mutex_lock( &uplink->queueLock ); for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { for ( dnbd3_queue_client_t **cit = &it->clients; *cit != NULL; ) { - if ( (**cit).client == client ) { - --client->relayedCount; + if ( (**cit).data == data && (**cit).callback == callback ) { + (*(**cit).callback)( (**cit).data, (**cit).handle, 0, 0, NULL ); dnbd3_queue_client_t *entry = *cit; *cit = (**cit).next; free( entry ); @@ -247,63 +245,78 @@ void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client) } } mutex_unlock( &uplink->queueLock ); - if ( unlikely( client->relayedCount != 0 ) ) { - logadd( LOG_DEBUG1, "Client has relayedCount == %"PRIu8" on disconnect..", client->relayedCount ); - int i; - for ( i = 0; i < 1000 && client->relayedCount != 0; ++i ) { - usleep( 10000 ); - } - if ( client->relayedCount != 0 ) { - logadd( LOG_WARNING, "Client relayedCount still %"PRIu8" after sleeping!", client->relayedCount ); - } - } } -/** - * Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL. - * If client is NULL, this is assumed to be a background replication request. - * Locks on: uplink.queueLock, uplink.sendMutex - */ -bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) +bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) { - bool getUplink = ( uplink == NULL ); - assert( client != NULL || uplink != NULL ); - if ( hops++ > 200 ) { // This is just silly + assert( client != NULL && callback != NULL ); + if ( hops > 200 ) { // This is just silly logadd( LOG_WARNING, "Refusing to relay a request that has > 200 hops" ); return false; } - if ( length > (uint32_t)_maxPayload ) { - logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length ); - return false; - } - if ( getUplink ) { + dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref ); + if ( unlikely( uplink == NULL ) ) { + uplink_init( client->image, -1, NULL, -1 ); uplink = ref_get_uplink( &client->image->uplinkref ); - if ( unlikely( uplink == NULL ) ) { - uplink_init( client->image, -1, NULL, -1 ); - uplink = ref_get_uplink( &client->image->uplinkref ); - if ( uplink == NULL ) { - logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); - return false; - } + if ( uplink == NULL ) { + logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); + return false; } } - if ( uplink->shutdown ) { - logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" ); - goto fail_ref; - } // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain // This might be a false positive if there are multiple instances running on the same host (IP) + bool ret; if ( client != NULL && hops > 1 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) { uplink->cycleDetected = true; signal_call( uplink->signal ); logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); - goto fail_ref; + ret = false; + } else { + ret = uplink_requestInternal( uplink, (void*)client, callback, handle, start, length, hops ); + } + ref_put( &uplink->reference ); + return ret; +} + +bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length) +{ + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( unlikely( uplink == NULL ) ) { + uplink_init( image, -1, NULL, -1 ); + uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink == NULL ) { + logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); + return false; + } + } + bool ret = uplink_requestInternal( uplink, data, callback, handle, start, length, 0 ); + ref_put( &uplink->reference ); + return ret; +} + +/** + * Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL. + * If client is NULL, this is assumed to be a background replication request. + * Locks on: uplink.queueLock, uplink.sendMutex + */ +static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) +{ + assert( uplink != NULL ); + assert( data == NULL || callback != NULL ); + if ( uplink->shutdown ) { + logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" ); + return false; + } + if ( length > (uint32_t)_maxPayload ) { + logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload", length ); + return false; } struct { uint64_t handle, start, end; } req; + hops++; do { const uint64_t end = start + length; dnbd3_queue_entry_t *request = NULL, *last = NULL; @@ -350,14 +363,14 @@ bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t han #endif request->hopCount = hops; request->sent = true; // Optimistic; would be set to false on failure - if ( client == NULL ) { + if ( callback == NULL ) { // BGR request->clients = NULL; } else { c = &request->clients; } isNew = true; - } else if ( client == NULL ) { + } else if ( callback == NULL ) { // Replication request that maches existing request. Do nothing isNew = false; } else { @@ -381,14 +394,14 @@ bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t han req.handle = request->handle; req.start = request->from; req.end = request->to; - if ( client != NULL ) { + if ( callback != NULL ) { *c = malloc( sizeof( *request->clients ) ); (**c).next = NULL; (**c).handle = handle; (**c).from = start; (**c).to = end; - (**c).client = client; - client->relayedCount++; + (**c).data = data; + (**c).callback = callback; } mutex_unlock( &uplink->queueLock ); @@ -426,7 +439,7 @@ bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t han } success_ref: - if ( client != NULL ) { + if ( callback != NULL ) { // Was from client -- potential prefetch // Same size as this request, but consider end of image... uint32_t len = (uint32_t)MIN( uplink->image->virtualFilesize - req.end, @@ -442,16 +455,9 @@ success_ref: threadpool_run( &prefetchForClient, (void*)job, "PREFETCH" ); } } - if ( getUplink ) { - ref_put( &uplink->reference ); - } return true; fail_lock: mutex_unlock( &uplink->queueLock ); -fail_ref: - if ( getUplink ) { - ref_put( &uplink->reference ); - } return false; } @@ -461,7 +467,7 @@ static void *prefetchForClient(void *data) dnbd3_cache_map_t *cache = ref_get_cachemap( job->uplink->image ); if ( cache != NULL ) { if ( !image_isRangeCachedUnsafe( cache, job->start, job->start + job->length ) ) { - uplink_request( job->uplink, NULL, ++job->uplink->queueId, job->start, job->length, 0 ); + uplink_requestInternal( job->uplink, NULL, NULL, ++job->uplink->queueId, job->start, job->length, 0 ); } ref_put( &cache->reference ); } @@ -824,7 +830,7 @@ static bool sendReplicationRequest(dnbd3_uplink_t *uplink) const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE; const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE ); const uint64_t handle = ++uplink->queueId; - if ( !uplink_request( uplink, NULL, handle, offset, size, 0 ) ) { + if ( !uplink_requestInternal( uplink, NULL, NULL, handle, offset, size, 0 ) ) { logadd( LOG_DEBUG1, "Error sending background replication request to uplink server (%s:%d)", PIMG(uplink->image) ); ref_put( &cache->reference ); @@ -902,7 +908,7 @@ static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMa */ static void handleReceive(dnbd3_uplink_t *uplink) { - dnbd3_reply_t inReply, outReply; + dnbd3_reply_t inReply; int ret; assert_uplink_thread(); for (;;) { @@ -967,7 +973,6 @@ static void handleReceive(dnbd3_uplink_t *uplink) logadd( LOG_WARNING, "Received payload length does not match! (is: %"PRIu32", expect: %u, %s:%d)", inReply.size, (unsigned int)( end - start ), PIMG(uplink->image) ); } - struct iovec iov[2]; // 1) Write to cache file if ( unlikely( uplink->cacheFd == -1 ) ) { reopenCacheFd( uplink, false ); @@ -1035,28 +1040,11 @@ static void handleReceive(dnbd3_uplink_t *uplink) PIMG(uplink->image) ); continue; } - outReply.magic = dnbd3_packet_magic; dnbd3_queue_client_t *next; for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) { assert( c->from >= start && c->to <= end ); - dnbd3_client_t * const client = c->client; - outReply.cmd = CMD_GET_BLOCK; - outReply.handle = c->handle; - outReply.size = (uint32_t)( c->to - c->from ); - iov[0].iov_base = &outReply; - iov[0].iov_len = sizeof outReply; - iov[1].iov_base = uplink->recvBuffer + (c->from - start); - iov[1].iov_len = outReply.size; - fixup_reply( outReply ); - mutex_lock( &client->sendMutex ); - if ( client->sock != -1 ) { - ssize_t sent = writev( client->sock, iov, 2 ); - if ( sent > (ssize_t)sizeof outReply ) { - client->bytesSent += (size_t)sent - sizeof outReply; - } - } - mutex_unlock( &client->sendMutex ); - client->relayedCount--; + (*c->callback)( c->data, c->handle, c->from, (uint32_t)( c->to - c->from ), + (const char*)( uplink->recvBuffer + (c->from - start) ) ); next = c->next; free( c ); } -- cgit v1.2.3-55-g7522