diff options
Diffstat (limited to 'src/server/net.c')
-rw-r--r-- | src/server/net.c | 266 |
1 files changed, 148 insertions, 118 deletions
diff --git a/src/server/net.c b/src/server/net.c index 9abe221..eb51d29 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -3,7 +3,7 @@ * * Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de> * - * This file may be licensed under the terms of of the + * This file may be licensed under the terms of the * GNU General Public License Version 2 (the ``GPL''). * * Software distributed under the License is distributed @@ -24,11 +24,12 @@ #include "locks.h" #include "rpc.h" #include "altservers.h" +#include "reference.h" -#include "../shared/sockhelper.h" -#include "../shared/timing.h" -#include "../shared/protocol.h" -#include "../serialize.h" +#include <dnbd3/shared/sockhelper.h> +#include <dnbd3/shared/timing.h> +#include <dnbd3/shared/protocol.h> +#include <dnbd3/shared/serialize.h> #include <assert.h> @@ -43,6 +44,7 @@ #include <jansson.h> #include <inttypes.h> #include <stdatomic.h> +#include <signal.h> static dnbd3_client_t *_clients[SERVER_MAX_CLIENTS]; static int _num_clients = 0; @@ -56,11 +58,12 @@ static atomic_uint_fast64_t totalBytesSent = 0; static bool addToList(dnbd3_client_t *client); static void removeFromList(dnbd3_client_t *client); static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client); +static void uplinkCallback(void *data, uint64_t handle, uint64_t start, uint32_t length, const char *buffer); static inline bool recv_request_header(int sock, dnbd3_request_t *request) { ssize_t ret, fails = 0; -#ifdef AFL_MODE +#ifdef DNBD3_SERVER_AFL sock = 0; #endif // Read request header from socket @@ -87,7 +90,7 @@ static inline bool recv_request_header(int sock, dnbd3_request_t *request) static inline bool recv_request_payload(int sock, uint32_t size, serialized_buffer_t *payload) { -#ifdef AFL_MODE +#ifdef DNBD3_SERVER_AFL sock = 0; #endif if ( size == 0 ) { @@ -111,7 +114,7 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff * Send reply with optional payload. payload can be null. The caller has to * acquire the sendMutex first. */ -static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload) +static inline bool send_reply(int sock, dnbd3_reply_t *reply, const void *payload) { const uint32_t size = reply->size; fixup_reply( *reply ); @@ -145,18 +148,19 @@ static inline bool sendPadding( const int fd, uint32_t bytes ) void net_init() { - mutex_init( &_clients_lock ); + mutex_init( &_clients_lock, LOCK_CLIENT_LIST ); } void* net_handleNewConnection(void *clientPtr) { dnbd3_client_t * const client = (dnbd3_client_t *)clientPtr; dnbd3_request_t request; + client->thread = pthread_self(); // Await data from client. Since this is a fresh connection, we expect data right away sock_setTimeout( client->sock, _clientTimeout ); do { -#ifdef AFL_MODE +#ifdef DNBD3_SERVER_AFL const int ret = (int)recv( 0, &request, sizeof(request), MSG_WAITALL ); #else const int ret = (int)recv( client->sock, &request, sizeof(request), MSG_WAITALL ); @@ -186,14 +190,15 @@ void* net_handleNewConnection(void *clientPtr) } } while (0); // Fully init client struct - mutex_init( &client->lock ); - mutex_init( &client->sendMutex ); + mutex_init( &client->lock, LOCK_CLIENT ); + mutex_init( &client->sendMutex, LOCK_CLIENT_SEND ); mutex_lock( &client->lock ); host_to_string( &client->host, client->hostName, HOSTNAMELEN ); client->hostName[HOSTNAMELEN-1] = '\0'; mutex_unlock( &client->lock ); client->bytesSent = 0; + client->relayedCount = 0; if ( !addToList( client ) ) { freeClientStruct( client ); @@ -204,6 +209,7 @@ void* net_handleNewConnection(void *clientPtr) dnbd3_reply_t reply; dnbd3_image_t *image = NULL; + dnbd3_cache_map_t *cache = NULL; int image_file = -1; int num; @@ -212,7 +218,6 @@ void* net_handleNewConnection(void *clientPtr) serialized_buffer_t payload; uint16_t rid, client_version; - uint64_t start, end; dnbd3_server_entry_t server_list[NUMBER_SERVERS]; @@ -229,7 +234,7 @@ void* net_handleNewConnection(void *clientPtr) rid = serializer_get_uint16( &payload ); const uint8_t flags = serializer_get_uint8( &payload ); client->isServer = ( flags & FLAGS8_SERVER ); - if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) { + if ( unlikely( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) ) { if ( client_version < MIN_SUPPORTED_CLIENT ) { logadd( LOG_DEBUG1, "Client %s too old", client->hostName ); } else { @@ -243,7 +248,7 @@ void* net_handleNewConnection(void *clientPtr) // We're a proxy, client is another proxy, we don't do BGR, but connecting proxy does... // Reject, as this would basically force this proxy to do BGR too. image = image_get( image_name, rid, true ); - if ( image != NULL && image->cache_map != NULL ) { + if ( image != NULL && image->ref_cacheMap != NULL ) { // Only exception is if the image is complete locally image = image_release( image ); } @@ -255,27 +260,28 @@ void* net_handleNewConnection(void *clientPtr) // No BGR mismatch, but don't lookup if image is unknown locally image = image_get( image_name, rid, true ); } - mutex_lock( &client->lock ); client->image = image; - mutex_unlock( &client->lock ); - if ( image == NULL ) { + atomic_thread_fence( memory_order_release ); + if ( unlikely( image == NULL ) ) { //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); - } else if ( !image->working ) { + } else if ( unlikely( image->problem.read || image->problem.changed ) ) { logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n", client->hostName, image_name, (int)rid ); } else { - bool penalty; // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable bOk = true; - if ( image->cache_map != NULL ) { - mutex_lock( &image->lock ); - if ( image->uplink == NULL || image->uplink->cacheFd == -1 || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { + if ( image->ref_cacheMap != NULL ) { + if ( image->problem.queue || image->problem.write ) { bOk = ( rand() % 4 ) == 1; } - penalty = bOk && image->uplink != NULL && image->uplink->cacheFd == -1; - mutex_unlock( &image->lock ); - if ( penalty ) { // Wait 100ms if local caching is not working so this - usleep( 100000 ); // server gets a penalty and is less likely to be selected + if ( bOk ) { + if ( image->problem.write ) { // Wait 100ms if local caching is not working so this + usleep( 100000 ); // server gets a penalty and is less likely to be selected + } + if ( image->problem.uplink ) { + // Penaltize depending on completeness, if no uplink is available + usleep( ( 100 - image->completenessEstimate ) * 100 ); + } } } if ( bOk ) { @@ -284,6 +290,7 @@ void* net_handleNewConnection(void *clientPtr) if ( !client->isServer ) { // Only update immediately if this is a client. Servers are handled on disconnect. timing_get( &image->atime ); + image->accessed = true; } mutex_unlock( &image->lock ); serializer_reset_write( &payload ); @@ -301,7 +308,7 @@ void* net_handleNewConnection(void *clientPtr) } } - if ( bOk ) { + if ( likely( bOk ) ) { // add artificial delay if applicable if ( client->isServer && _serverPenalty != 0 ) { usleep( _serverPenalty ); @@ -311,95 +318,62 @@ void* net_handleNewConnection(void *clientPtr) // client handling mainloop while ( recv_request_header( client->sock, &request ) ) { if ( _shutdown ) break; - switch ( request.cmd ) { + if ( likely ( request.cmd == CMD_GET_BLOCK ) ) { - case CMD_GET_BLOCK:; const uint64_t offset = request.offset_small; // Copy to full uint64 to prevent repeated masking - if ( offset >= image->virtualFilesize ) { + reply.handle = request.handle; + if ( unlikely( offset >= image->virtualFilesize ) ) { // Sanity check logadd( LOG_WARNING, "Client %s requested non-existent block", client->hostName ); reply.size = 0; reply.cmd = CMD_ERROR; send_reply( client->sock, &reply, NULL ); - break; + continue; } - if ( offset + request.size > image->virtualFilesize ) { + if ( unlikely( offset + request.size > image->virtualFilesize ) ) { // Sanity check logadd( LOG_WARNING, "Client %s requested data block that extends beyond image size", client->hostName ); reply.size = 0; reply.cmd = CMD_ERROR; send_reply( client->sock, &reply, NULL ); - break; + continue; + } + + if ( cache == NULL ) { + cache = ref_get_cachemap( image ); } - if ( request.size != 0 && image->cache_map != NULL ) { + if ( request.size != 0 && cache != NULL ) { // This is a proxyed image, check if we need to relay the request... - start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); - end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); - bool isCached = true; - mutex_lock( &image->lock ); - // Check again as we only aquired the lock just now - if ( image->cache_map != NULL ) { - const uint64_t firstByteInMap = start >> 15; - const uint64_t lastByteInMap = (end - 1) >> 15; - uint64_t pos; - // Middle - quick checking - if ( isCached ) { - pos = firstByteInMap + 1; - while ( pos < lastByteInMap ) { - if ( image->cache_map[pos] != 0xff ) { - isCached = false; - break; - } - ++pos; + const uint64_t start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + const uint64_t end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + if ( !image_isRangeCachedUnsafe( cache, start, end ) ) { + if ( unlikely( client->relayedCount > 250 ) ) { + logadd( LOG_DEBUG1, "Client is overloading uplink; throttling" ); + for ( int i = 0; i < 100 && client->relayedCount > 200; ++i ) { + usleep( 10000 ); } - } - // First byte - if ( isCached ) { - pos = start; - do { - const int map_x = (pos >> 12) & 7; // mod 8 - const uint8_t bit_mask = (uint8_t)( 1 << map_x ); - if ( (image->cache_map[firstByteInMap] & bit_mask) == 0 ) { - isCached = false; - break; - } - pos += DNBD3_BLOCK_SIZE; - } while ( firstByteInMap == (pos >> 15) && pos < end ); - } - // Last byte - only check if request spans multiple bytes in cache map - if ( isCached && firstByteInMap != lastByteInMap ) { - pos = lastByteInMap << 15; - while ( pos < end ) { - assert( lastByteInMap == (pos >> 15) ); - const int map_x = (pos >> 12) & 7; // mod 8 - const uint8_t bit_mask = (uint8_t)( 1 << map_x ); - if ( (image->cache_map[lastByteInMap] & bit_mask) == 0 ) { - isCached = false; - break; - } - pos += DNBD3_BLOCK_SIZE; + if ( client->relayedCount > 250 ) { + logadd( LOG_WARNING, "Could not lower client's uplink backlog; dropping client" ); + goto exit_client_cleanup; } } - } - mutex_unlock( &image->lock ); - if ( !isCached ) { - if ( !uplink_request( client, request.handle, offset, request.size, request.hops ) ) { - logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy, disabling image %s:%d", + client->relayedCount++; + if ( !uplink_requestClient( client, &uplinkCallback, request.handle, offset, request.size, request.hops ) ) { + client->relayedCount--; + logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy for image %s:%d", client->hostName, image->name, image->rid ); - image->working = false; goto exit_client_cleanup; } - break; // DONE, exit request.cmd switch + continue; // Reply arrives on uplink some time later, handle next request now } } reply.cmd = CMD_GET_BLOCK; reply.size = request.size; - reply.handle = request.handle; fixup_reply( reply ); - const bool lock = image->uplink != NULL; + const bool lock = image->uplinkref != NULL; if ( lock ) mutex_lock( &client->sendMutex ); // Send reply header if ( send( client->sock, &reply, sizeof(dnbd3_reply_t), (request.size == 0 ? 0 : MSG_MORE) ) != sizeof(dnbd3_reply_t) ) { @@ -422,7 +396,7 @@ void* net_handleNewConnection(void *clientPtr) // TODO: Should we consider EOPNOTSUPP on BSD for sendfile and fallback to read/write? // Linux would set EINVAL or ENOSYS instead, which it unfortunately also does for a couple of other failures :/ // read/write would kill performance anyways so a fallback would probably be of little use either way. -#ifdef AFL_MODE +#ifdef DNBD3_SERVER_AFL char buf[1000]; size_t cnt = realBytes - done; if ( cnt > 1000 ) { @@ -459,7 +433,7 @@ void* net_handleNewConnection(void *clientPtr) } if ( err == EBADF || err == EFAULT || err == EINVAL || err == EIO ) { logadd( LOG_INFO, "Disabling %s:%d", image->name, image->rid ); - image->working = false; + image->problem.read = true; } } goto exit_client_cleanup; @@ -476,11 +450,20 @@ void* net_handleNewConnection(void *clientPtr) if ( lock ) mutex_unlock( &client->sendMutex ); // Global per-client counter client->bytesSent += request.size; // Increase counter for statistics. - break; + continue; + } + // Any other command + // Release cache map every now and then, in case the image was replicated + // entirely. Will be re-grabbed on next CMD_GET_BLOCK otherwise. + if ( cache != NULL ) { + ref_put( &cache->reference ); + cache = NULL; + } + switch ( request.cmd ) { case CMD_GET_SERVERS: // Build list of known working alt servers - num = altservers_getListForClient( &client->host, server_list, NUMBER_SERVERS ); + num = altservers_getListForClient( client, server_list, NUMBER_SERVERS ); reply.cmd = CMD_GET_SERVERS; reply.size = (uint32_t)( num * sizeof(dnbd3_server_entry_t) ); mutex_lock( &client->sendMutex ); @@ -525,24 +508,27 @@ set_name: ; logadd( LOG_ERROR, "Unknown command from client %s: %d", client->hostName, (int)request.cmd ); break; - } - } - } + } // end switch + } // end loop + } // end bOk exit_client_cleanup: ; // First remove from list, then add to counter to prevent race condition removeFromList( client ); totalBytesSent += client->bytesSent; // Access time, but only if client didn't just probe - if ( image != NULL ) { + if ( image != NULL && client->bytesSent > DNBD3_BLOCK_SIZE * 10 ) { mutex_lock( &image->lock ); - if ( client->bytesSent > DNBD3_BLOCK_SIZE * 10 ) { - timing_get( &image->atime ); - } + timing_get( &image->atime ); + image->accessed = true; mutex_unlock( &image->lock ); } + if ( cache != NULL ) { + ref_put( &cache->reference ); + } freeClientStruct( client ); // This will also call image_release on client->image return NULL ; fail_preadd: ; + // This is before we even initialized any mutex close( client->sock ); free( client ); return NULL; @@ -609,6 +595,12 @@ void net_getStats(int *clientCount, int *serverCount, uint64_t *bytesSent) } bs += client->bytesSent; } + // Do this before unlocking the list, otherwise we might + // account for a client twice if it would disconnect after + // unlocking but before we add the count here. + if ( bytesSent != NULL ) { + *bytesSent = totalBytesSent + bs; + } mutex_unlock( &_clients_lock ); if ( clientCount != NULL ) { *clientCount = cc; @@ -616,9 +608,6 @@ void net_getStats(int *clientCount, int *serverCount, uint64_t *bytesSent) if ( serverCount != NULL ) { *serverCount = sc; } - if ( bytesSent != NULL ) { - *bytesSent = totalBytesSent + bs; - } } void net_disconnectAll() @@ -626,11 +615,10 @@ void net_disconnectAll() int i; mutex_lock( &_clients_lock ); for (i = 0; i < _num_clients; ++i) { - if ( _clients[i] == NULL ) continue; - dnbd3_client_t * const client = _clients[i]; - mutex_lock( &client->lock ); - if ( client->sock >= 0 ) shutdown( client->sock, SHUT_RDWR ); - mutex_unlock( &client->lock ); + if ( _clients[i] == NULL ) + continue; + shutdown( _clients[i]->sock, SHUT_RDWR ); + pthread_kill( _clients[i]->thread, SIGINT ); } mutex_unlock( &_clients_lock ); } @@ -668,11 +656,19 @@ static void removeFromList(dnbd3_client_t *client) { int i; mutex_lock( &_clients_lock ); - for ( i = _num_clients - 1; i >= 0; --i ) { - if ( _clients[i] == client ) { - _clients[i] = NULL; + if ( _num_clients != 0 ) { + for ( i = _num_clients - 1; i >= 0; --i ) { + if ( _clients[i] == client ) { + _clients[i] = NULL; + break; + } + } + if ( i != 0 && i + 1 == _num_clients ) { + do { + i--; + } while ( _clients[i] == NULL && i > 0 ); + _num_clients = i + 1; } - if ( _clients[i] == NULL && i + 1 == _num_clients ) --_num_clients; } mutex_unlock( &_clients_lock ); } @@ -686,17 +682,33 @@ static void removeFromList(dnbd3_client_t *client) static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client) { mutex_lock( &client->lock ); + if ( client->image != NULL ) { + dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref ); + if ( uplink != NULL ) { + if ( client->relayedCount != 0 ) { + uplink_removeEntry( uplink, client, &uplinkCallback ); + } + ref_put( &uplink->reference ); + } + if ( client->relayedCount != 0 ) { + logadd( LOG_DEBUG1, "Client has relayedCount == %"PRIu8" on disconnect..", client->relayedCount ); + int i; + for ( i = 0; i < 1000 && client->relayedCount != 0; ++i ) { + usleep( 10000 ); + } + if ( client->relayedCount != 0 ) { + logadd( LOG_WARNING, "Client relayedCount still %"PRIu8" after sleeping!", client->relayedCount ); + } + } + } mutex_lock( &client->sendMutex ); - if ( client->sock != -1 ) close( client->sock ); + if ( client->sock != -1 ) { + close( client->sock ); + } client->sock = -1; mutex_unlock( &client->sendMutex ); - if ( client->image != NULL ) { - mutex_lock( &client->image->lock ); - if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client ); - mutex_unlock( &client->image->lock ); - client->image = image_release( client->image ); - } mutex_unlock( &client->lock ); + client->image = image_release( client->image ); mutex_destroy( &client->lock ); mutex_destroy( &client->sendMutex ); free( client ); @@ -729,3 +741,21 @@ static bool addToList(dnbd3_client_t *client) return true; } +static void uplinkCallback(void *data, uint64_t handle, uint64_t start UNUSED, uint32_t length, const char *buffer) +{ + dnbd3_client_t *client = (dnbd3_client_t*)data; + dnbd3_reply_t reply = { + .magic = dnbd3_packet_magic, + .cmd = buffer == NULL ? CMD_ERROR : CMD_GET_BLOCK, + .handle = handle, + .size = length, + }; + mutex_lock( &client->sendMutex ); + send_reply( client->sock, &reply, buffer ); + if ( buffer == NULL ) { + shutdown( client->sock, SHUT_RDWR ); + } + client->relayedCount--; + mutex_unlock( &client->sendMutex ); +} + |