diff options
Diffstat (limited to 'src/server/net.c')
-rw-r--r-- | src/server/net.c | 174 |
1 files changed, 91 insertions, 83 deletions
diff --git a/src/server/net.c b/src/server/net.c index aba4e7d..eb51d29 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -3,7 +3,7 @@ * * Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de> * - * This file may be licensed under the terms of of the + * This file may be licensed under the terms of the * GNU General Public License Version 2 (the ``GPL''). * * Software distributed under the License is distributed @@ -26,10 +26,10 @@ #include "altservers.h" #include "reference.h" -#include "../shared/sockhelper.h" -#include "../shared/timing.h" -#include "../shared/protocol.h" -#include "../serialize.h" +#include <dnbd3/shared/sockhelper.h> +#include <dnbd3/shared/timing.h> +#include <dnbd3/shared/protocol.h> +#include <dnbd3/shared/serialize.h> #include <assert.h> @@ -58,11 +58,12 @@ static atomic_uint_fast64_t totalBytesSent = 0; static bool addToList(dnbd3_client_t *client); static void removeFromList(dnbd3_client_t *client); static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client); +static void uplinkCallback(void *data, uint64_t handle, uint64_t start, uint32_t length, const char *buffer); static inline bool recv_request_header(int sock, dnbd3_request_t *request) { ssize_t ret, fails = 0; -#ifdef AFL_MODE +#ifdef DNBD3_SERVER_AFL sock = 0; #endif // Read request header from socket @@ -89,7 +90,7 @@ static inline bool recv_request_header(int sock, dnbd3_request_t *request) static inline bool recv_request_payload(int sock, uint32_t size, serialized_buffer_t *payload) { -#ifdef AFL_MODE +#ifdef DNBD3_SERVER_AFL sock = 0; #endif if ( size == 0 ) { @@ -113,7 +114,7 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff * Send reply with optional payload. payload can be null. The caller has to * acquire the sendMutex first. */ -static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload) +static inline bool send_reply(int sock, dnbd3_reply_t *reply, const void *payload) { const uint32_t size = reply->size; fixup_reply( *reply ); @@ -159,7 +160,7 @@ void* net_handleNewConnection(void *clientPtr) // Await data from client. Since this is a fresh connection, we expect data right away sock_setTimeout( client->sock, _clientTimeout ); do { -#ifdef AFL_MODE +#ifdef DNBD3_SERVER_AFL const int ret = (int)recv( 0, &request, sizeof(request), MSG_WAITALL ); #else const int ret = (int)recv( client->sock, &request, sizeof(request), MSG_WAITALL ); @@ -197,6 +198,7 @@ void* net_handleNewConnection(void *clientPtr) client->hostName[HOSTNAMELEN-1] = '\0'; mutex_unlock( &client->lock ); client->bytesSent = 0; + client->relayedCount = 0; if ( !addToList( client ) ) { freeClientStruct( client ); @@ -207,6 +209,7 @@ void* net_handleNewConnection(void *clientPtr) dnbd3_reply_t reply; dnbd3_image_t *image = NULL; + dnbd3_cache_map_t *cache = NULL; int image_file = -1; int num; @@ -215,7 +218,6 @@ void* net_handleNewConnection(void *clientPtr) serialized_buffer_t payload; uint16_t rid, client_version; - uint64_t start, end; dnbd3_server_entry_t server_list[NUMBER_SERVERS]; @@ -262,22 +264,24 @@ void* net_handleNewConnection(void *clientPtr) atomic_thread_fence( memory_order_release ); if ( unlikely( image == NULL ) ) { //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); - } else if ( unlikely( !image->working ) ) { + } else if ( unlikely( image->problem.read || image->problem.changed ) ) { logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n", client->hostName, image_name, (int)rid ); } else { // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable bOk = true; if ( image->ref_cacheMap != NULL ) { - dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); - if ( uplink != NULL && ( uplink->cacheFd == -1 || uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) ) { + if ( image->problem.queue || image->problem.write ) { bOk = ( rand() % 4 ) == 1; } - if ( bOk && uplink != NULL && uplink->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this - usleep( 100000 ); // server gets a penalty and is less likely to be selected - } - if ( uplink != NULL ) { - ref_put( &uplink->reference ); + if ( bOk ) { + if ( image->problem.write ) { // Wait 100ms if local caching is not working so this + usleep( 100000 ); // server gets a penalty and is less likely to be selected + } + if ( image->problem.uplink ) { + // Penaltize depending on completeness, if no uplink is available + usleep( ( 100 - image->completenessEstimate ) * 100 ); + } } } if ( bOk ) { @@ -286,6 +290,7 @@ void* net_handleNewConnection(void *clientPtr) if ( !client->isServer ) { // Only update immediately if this is a client. Servers are handled on disconnect. timing_get( &image->atime ); + image->accessed = true; } mutex_unlock( &image->lock ); serializer_reset_write( &payload ); @@ -313,9 +318,8 @@ void* net_handleNewConnection(void *clientPtr) // client handling mainloop while ( recv_request_header( client->sock, &request ) ) { if ( _shutdown ) break; - switch ( request.cmd ) { + if ( likely ( request.cmd == CMD_GET_BLOCK ) ) { - case CMD_GET_BLOCK:; const uint64_t offset = request.offset_small; // Copy to full uint64 to prevent repeated masking reply.handle = request.handle; if ( unlikely( offset >= image->virtualFilesize ) ) { @@ -324,7 +328,7 @@ void* net_handleNewConnection(void *clientPtr) reply.size = 0; reply.cmd = CMD_ERROR; send_reply( client->sock, &reply, NULL ); - break; + continue; } if ( unlikely( offset + request.size > image->virtualFilesize ) ) { // Sanity check @@ -332,63 +336,36 @@ void* net_handleNewConnection(void *clientPtr) reply.size = 0; reply.cmd = CMD_ERROR; send_reply( client->sock, &reply, NULL ); - break; + continue; } - dnbd3_cache_map_t *cache; - if ( request.size != 0 && ( cache = ref_get_cachemap( image ) ) != NULL ) { + if ( cache == NULL ) { + cache = ref_get_cachemap( image ); + } + + if ( request.size != 0 && cache != NULL ) { // This is a proxyed image, check if we need to relay the request... - start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); - end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); - bool isCached = true; - const uint64_t firstByteInMap = start >> 15; - const uint64_t lastByteInMap = (end - 1) >> 15; - uint64_t pos; - uint8_t b; - atomic_thread_fence( memory_order_acquire ); - // Middle - quick checking - if ( isCached ) { - for ( pos = firstByteInMap + 1; pos < lastByteInMap; ++pos ) { - if ( atomic_load_explicit( &cache->map[pos], memory_order_relaxed ) != 0xff ) { - isCached = false; - break; - } - } - } - // First byte - if ( isCached ) { - b = atomic_load_explicit( &cache->map[firstByteInMap], memory_order_relaxed ); - for ( pos = start; firstByteInMap == (pos >> 15) && pos < end; pos += DNBD3_BLOCK_SIZE ) { - const int map_x = (pos >> 12) & 7; // mod 8 - const uint8_t bit_mask = (uint8_t)( 1 << map_x ); - if ( (b & bit_mask) == 0 ) { - isCached = false; - break; + const uint64_t start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + const uint64_t end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + if ( !image_isRangeCachedUnsafe( cache, start, end ) ) { + if ( unlikely( client->relayedCount > 250 ) ) { + logadd( LOG_DEBUG1, "Client is overloading uplink; throttling" ); + for ( int i = 0; i < 100 && client->relayedCount > 200; ++i ) { + usleep( 10000 ); } - } - } - // Last byte - only check if request spans multiple bytes in cache map - if ( isCached && firstByteInMap != lastByteInMap ) { - b = atomic_load_explicit( &cache->map[lastByteInMap], memory_order_relaxed ); - for ( pos = lastByteInMap << 15; pos < end; pos += DNBD3_BLOCK_SIZE ) { - assert( lastByteInMap == (pos >> 15) ); - const int map_x = (pos >> 12) & 7; // mod 8 - const uint8_t bit_mask = (uint8_t)( 1 << map_x ); - if ( (b & bit_mask) == 0 ) { - isCached = false; - break; + if ( client->relayedCount > 250 ) { + logadd( LOG_WARNING, "Could not lower client's uplink backlog; dropping client" ); + goto exit_client_cleanup; } } - } - ref_put( &cache->reference ); - if ( !isCached ) { - if ( !uplink_request( client, request.handle, offset, request.size, request.hops ) ) { - logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy, disabling image %s:%d", + client->relayedCount++; + if ( !uplink_requestClient( client, &uplinkCallback, request.handle, offset, request.size, request.hops ) ) { + client->relayedCount--; + logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy for image %s:%d", client->hostName, image->name, image->rid ); - image->working = false; goto exit_client_cleanup; } - break; // DONE, exit request.cmd switch + continue; // Reply arrives on uplink some time later, handle next request now } } @@ -419,7 +396,7 @@ void* net_handleNewConnection(void *clientPtr) // TODO: Should we consider EOPNOTSUPP on BSD for sendfile and fallback to read/write? // Linux would set EINVAL or ENOSYS instead, which it unfortunately also does for a couple of other failures :/ // read/write would kill performance anyways so a fallback would probably be of little use either way. -#ifdef AFL_MODE +#ifdef DNBD3_SERVER_AFL char buf[1000]; size_t cnt = realBytes - done; if ( cnt > 1000 ) { @@ -456,7 +433,7 @@ void* net_handleNewConnection(void *clientPtr) } if ( err == EBADF || err == EFAULT || err == EINVAL || err == EIO ) { logadd( LOG_INFO, "Disabling %s:%d", image->name, image->rid ); - image->working = false; + image->problem.read = true; } } goto exit_client_cleanup; @@ -473,7 +450,16 @@ void* net_handleNewConnection(void *clientPtr) if ( lock ) mutex_unlock( &client->sendMutex ); // Global per-client counter client->bytesSent += request.size; // Increase counter for statistics. - break; + continue; + } + // Any other command + // Release cache map every now and then, in case the image was replicated + // entirely. Will be re-grabbed on next CMD_GET_BLOCK otherwise. + if ( cache != NULL ) { + ref_put( &cache->reference ); + cache = NULL; + } + switch ( request.cmd ) { case CMD_GET_SERVERS: // Build list of known working alt servers @@ -522,9 +508,9 @@ set_name: ; logadd( LOG_ERROR, "Unknown command from client %s: %d", client->hostName, (int)request.cmd ); break; - } - } - } + } // end switch + } // end loop + } // end bOk exit_client_cleanup: ; // First remove from list, then add to counter to prevent race condition removeFromList( client ); @@ -533,8 +519,12 @@ exit_client_cleanup: ; if ( image != NULL && client->bytesSent > DNBD3_BLOCK_SIZE * 10 ) { mutex_lock( &image->lock ); timing_get( &image->atime ); + image->accessed = true; mutex_unlock( &image->lock ); } + if ( cache != NULL ) { + ref_put( &cache->reference ); + } freeClientStruct( client ); // This will also call image_release on client->image return NULL ; fail_preadd: ; @@ -695,9 +685,21 @@ static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client) if ( client->image != NULL ) { dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref ); if ( uplink != NULL ) { - uplink_removeClient( uplink, client ); + if ( client->relayedCount != 0 ) { + uplink_removeEntry( uplink, client, &uplinkCallback ); + } ref_put( &uplink->reference ); } + if ( client->relayedCount != 0 ) { + logadd( LOG_DEBUG1, "Client has relayedCount == %"PRIu8" on disconnect..", client->relayedCount ); + int i; + for ( i = 0; i < 1000 && client->relayedCount != 0; ++i ) { + usleep( 10000 ); + } + if ( client->relayedCount != 0 ) { + logadd( LOG_WARNING, "Client relayedCount still %"PRIu8" after sleeping!", client->relayedCount ); + } + } } mutex_lock( &client->sendMutex ); if ( client->sock != -1 ) { @@ -739,15 +741,21 @@ static bool addToList(dnbd3_client_t *client) return true; } -void net_sendReply(dnbd3_client_t *client, uint16_t cmd, uint64_t handle) +static void uplinkCallback(void *data, uint64_t handle, uint64_t start UNUSED, uint32_t length, const char *buffer) { - dnbd3_reply_t reply; - reply.magic = dnbd3_packet_magic; - reply.cmd = cmd; - reply.handle = handle; - reply.size = 0; + dnbd3_client_t *client = (dnbd3_client_t*)data; + dnbd3_reply_t reply = { + .magic = dnbd3_packet_magic, + .cmd = buffer == NULL ? CMD_ERROR : CMD_GET_BLOCK, + .handle = handle, + .size = length, + }; mutex_lock( &client->sendMutex ); - send_reply( client->sock, &reply, NULL ); + send_reply( client->sock, &reply, buffer ); + if ( buffer == NULL ) { + shutdown( client->sock, SHUT_RDWR ); + } + client->relayedCount--; mutex_unlock( &client->sendMutex ); } |