summaryrefslogtreecommitdiffstats
path: root/src/server/net.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/net.c')
-rw-r--r--src/server/net.c174
1 files changed, 91 insertions, 83 deletions
diff --git a/src/server/net.c b/src/server/net.c
index aba4e7d..eb51d29 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -3,7 +3,7 @@
*
* Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de>
*
- * This file may be licensed under the terms of of the
+ * This file may be licensed under the terms of the
* GNU General Public License Version 2 (the ``GPL'').
*
* Software distributed under the License is distributed
@@ -26,10 +26,10 @@
#include "altservers.h"
#include "reference.h"
-#include "../shared/sockhelper.h"
-#include "../shared/timing.h"
-#include "../shared/protocol.h"
-#include "../serialize.h"
+#include <dnbd3/shared/sockhelper.h>
+#include <dnbd3/shared/timing.h>
+#include <dnbd3/shared/protocol.h>
+#include <dnbd3/shared/serialize.h>
#include <assert.h>
@@ -58,11 +58,12 @@ static atomic_uint_fast64_t totalBytesSent = 0;
static bool addToList(dnbd3_client_t *client);
static void removeFromList(dnbd3_client_t *client);
static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client);
+static void uplinkCallback(void *data, uint64_t handle, uint64_t start, uint32_t length, const char *buffer);
static inline bool recv_request_header(int sock, dnbd3_request_t *request)
{
ssize_t ret, fails = 0;
-#ifdef AFL_MODE
+#ifdef DNBD3_SERVER_AFL
sock = 0;
#endif
// Read request header from socket
@@ -89,7 +90,7 @@ static inline bool recv_request_header(int sock, dnbd3_request_t *request)
static inline bool recv_request_payload(int sock, uint32_t size, serialized_buffer_t *payload)
{
-#ifdef AFL_MODE
+#ifdef DNBD3_SERVER_AFL
sock = 0;
#endif
if ( size == 0 ) {
@@ -113,7 +114,7 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff
* Send reply with optional payload. payload can be null. The caller has to
* acquire the sendMutex first.
*/
-static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload)
+static inline bool send_reply(int sock, dnbd3_reply_t *reply, const void *payload)
{
const uint32_t size = reply->size;
fixup_reply( *reply );
@@ -159,7 +160,7 @@ void* net_handleNewConnection(void *clientPtr)
// Await data from client. Since this is a fresh connection, we expect data right away
sock_setTimeout( client->sock, _clientTimeout );
do {
-#ifdef AFL_MODE
+#ifdef DNBD3_SERVER_AFL
const int ret = (int)recv( 0, &request, sizeof(request), MSG_WAITALL );
#else
const int ret = (int)recv( client->sock, &request, sizeof(request), MSG_WAITALL );
@@ -197,6 +198,7 @@ void* net_handleNewConnection(void *clientPtr)
client->hostName[HOSTNAMELEN-1] = '\0';
mutex_unlock( &client->lock );
client->bytesSent = 0;
+ client->relayedCount = 0;
if ( !addToList( client ) ) {
freeClientStruct( client );
@@ -207,6 +209,7 @@ void* net_handleNewConnection(void *clientPtr)
dnbd3_reply_t reply;
dnbd3_image_t *image = NULL;
+ dnbd3_cache_map_t *cache = NULL;
int image_file = -1;
int num;
@@ -215,7 +218,6 @@ void* net_handleNewConnection(void *clientPtr)
serialized_buffer_t payload;
uint16_t rid, client_version;
- uint64_t start, end;
dnbd3_server_entry_t server_list[NUMBER_SERVERS];
@@ -262,22 +264,24 @@ void* net_handleNewConnection(void *clientPtr)
atomic_thread_fence( memory_order_release );
if ( unlikely( image == NULL ) ) {
//logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
- } else if ( unlikely( !image->working ) ) {
+ } else if ( unlikely( image->problem.read || image->problem.changed ) ) {
logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n",
client->hostName, image_name, (int)rid );
} else {
// Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable
bOk = true;
if ( image->ref_cacheMap != NULL ) {
- dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
- if ( uplink != NULL && ( uplink->cacheFd == -1 || uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) ) {
+ if ( image->problem.queue || image->problem.write ) {
bOk = ( rand() % 4 ) == 1;
}
- if ( bOk && uplink != NULL && uplink->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this
- usleep( 100000 ); // server gets a penalty and is less likely to be selected
- }
- if ( uplink != NULL ) {
- ref_put( &uplink->reference );
+ if ( bOk ) {
+ if ( image->problem.write ) { // Wait 100ms if local caching is not working so this
+ usleep( 100000 ); // server gets a penalty and is less likely to be selected
+ }
+ if ( image->problem.uplink ) {
+ // Penaltize depending on completeness, if no uplink is available
+ usleep( ( 100 - image->completenessEstimate ) * 100 );
+ }
}
}
if ( bOk ) {
@@ -286,6 +290,7 @@ void* net_handleNewConnection(void *clientPtr)
if ( !client->isServer ) {
// Only update immediately if this is a client. Servers are handled on disconnect.
timing_get( &image->atime );
+ image->accessed = true;
}
mutex_unlock( &image->lock );
serializer_reset_write( &payload );
@@ -313,9 +318,8 @@ void* net_handleNewConnection(void *clientPtr)
// client handling mainloop
while ( recv_request_header( client->sock, &request ) ) {
if ( _shutdown ) break;
- switch ( request.cmd ) {
+ if ( likely ( request.cmd == CMD_GET_BLOCK ) ) {
- case CMD_GET_BLOCK:;
const uint64_t offset = request.offset_small; // Copy to full uint64 to prevent repeated masking
reply.handle = request.handle;
if ( unlikely( offset >= image->virtualFilesize ) ) {
@@ -324,7 +328,7 @@ void* net_handleNewConnection(void *clientPtr)
reply.size = 0;
reply.cmd = CMD_ERROR;
send_reply( client->sock, &reply, NULL );
- break;
+ continue;
}
if ( unlikely( offset + request.size > image->virtualFilesize ) ) {
// Sanity check
@@ -332,63 +336,36 @@ void* net_handleNewConnection(void *clientPtr)
reply.size = 0;
reply.cmd = CMD_ERROR;
send_reply( client->sock, &reply, NULL );
- break;
+ continue;
}
- dnbd3_cache_map_t *cache;
- if ( request.size != 0 && ( cache = ref_get_cachemap( image ) ) != NULL ) {
+ if ( cache == NULL ) {
+ cache = ref_get_cachemap( image );
+ }
+
+ if ( request.size != 0 && cache != NULL ) {
// This is a proxyed image, check if we need to relay the request...
- start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
- end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
- bool isCached = true;
- const uint64_t firstByteInMap = start >> 15;
- const uint64_t lastByteInMap = (end - 1) >> 15;
- uint64_t pos;
- uint8_t b;
- atomic_thread_fence( memory_order_acquire );
- // Middle - quick checking
- if ( isCached ) {
- for ( pos = firstByteInMap + 1; pos < lastByteInMap; ++pos ) {
- if ( atomic_load_explicit( &cache->map[pos], memory_order_relaxed ) != 0xff ) {
- isCached = false;
- break;
- }
- }
- }
- // First byte
- if ( isCached ) {
- b = atomic_load_explicit( &cache->map[firstByteInMap], memory_order_relaxed );
- for ( pos = start; firstByteInMap == (pos >> 15) && pos < end; pos += DNBD3_BLOCK_SIZE ) {
- const int map_x = (pos >> 12) & 7; // mod 8
- const uint8_t bit_mask = (uint8_t)( 1 << map_x );
- if ( (b & bit_mask) == 0 ) {
- isCached = false;
- break;
+ const uint64_t start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ const uint64_t end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ if ( !image_isRangeCachedUnsafe( cache, start, end ) ) {
+ if ( unlikely( client->relayedCount > 250 ) ) {
+ logadd( LOG_DEBUG1, "Client is overloading uplink; throttling" );
+ for ( int i = 0; i < 100 && client->relayedCount > 200; ++i ) {
+ usleep( 10000 );
}
- }
- }
- // Last byte - only check if request spans multiple bytes in cache map
- if ( isCached && firstByteInMap != lastByteInMap ) {
- b = atomic_load_explicit( &cache->map[lastByteInMap], memory_order_relaxed );
- for ( pos = lastByteInMap << 15; pos < end; pos += DNBD3_BLOCK_SIZE ) {
- assert( lastByteInMap == (pos >> 15) );
- const int map_x = (pos >> 12) & 7; // mod 8
- const uint8_t bit_mask = (uint8_t)( 1 << map_x );
- if ( (b & bit_mask) == 0 ) {
- isCached = false;
- break;
+ if ( client->relayedCount > 250 ) {
+ logadd( LOG_WARNING, "Could not lower client's uplink backlog; dropping client" );
+ goto exit_client_cleanup;
}
}
- }
- ref_put( &cache->reference );
- if ( !isCached ) {
- if ( !uplink_request( client, request.handle, offset, request.size, request.hops ) ) {
- logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy, disabling image %s:%d",
+ client->relayedCount++;
+ if ( !uplink_requestClient( client, &uplinkCallback, request.handle, offset, request.size, request.hops ) ) {
+ client->relayedCount--;
+ logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy for image %s:%d",
client->hostName, image->name, image->rid );
- image->working = false;
goto exit_client_cleanup;
}
- break; // DONE, exit request.cmd switch
+ continue; // Reply arrives on uplink some time later, handle next request now
}
}
@@ -419,7 +396,7 @@ void* net_handleNewConnection(void *clientPtr)
// TODO: Should we consider EOPNOTSUPP on BSD for sendfile and fallback to read/write?
// Linux would set EINVAL or ENOSYS instead, which it unfortunately also does for a couple of other failures :/
// read/write would kill performance anyways so a fallback would probably be of little use either way.
-#ifdef AFL_MODE
+#ifdef DNBD3_SERVER_AFL
char buf[1000];
size_t cnt = realBytes - done;
if ( cnt > 1000 ) {
@@ -456,7 +433,7 @@ void* net_handleNewConnection(void *clientPtr)
}
if ( err == EBADF || err == EFAULT || err == EINVAL || err == EIO ) {
logadd( LOG_INFO, "Disabling %s:%d", image->name, image->rid );
- image->working = false;
+ image->problem.read = true;
}
}
goto exit_client_cleanup;
@@ -473,7 +450,16 @@ void* net_handleNewConnection(void *clientPtr)
if ( lock ) mutex_unlock( &client->sendMutex );
// Global per-client counter
client->bytesSent += request.size; // Increase counter for statistics.
- break;
+ continue;
+ }
+ // Any other command
+ // Release cache map every now and then, in case the image was replicated
+ // entirely. Will be re-grabbed on next CMD_GET_BLOCK otherwise.
+ if ( cache != NULL ) {
+ ref_put( &cache->reference );
+ cache = NULL;
+ }
+ switch ( request.cmd ) {
case CMD_GET_SERVERS:
// Build list of known working alt servers
@@ -522,9 +508,9 @@ set_name: ;
logadd( LOG_ERROR, "Unknown command from client %s: %d", client->hostName, (int)request.cmd );
break;
- }
- }
- }
+ } // end switch
+ } // end loop
+ } // end bOk
exit_client_cleanup: ;
// First remove from list, then add to counter to prevent race condition
removeFromList( client );
@@ -533,8 +519,12 @@ exit_client_cleanup: ;
if ( image != NULL && client->bytesSent > DNBD3_BLOCK_SIZE * 10 ) {
mutex_lock( &image->lock );
timing_get( &image->atime );
+ image->accessed = true;
mutex_unlock( &image->lock );
}
+ if ( cache != NULL ) {
+ ref_put( &cache->reference );
+ }
freeClientStruct( client ); // This will also call image_release on client->image
return NULL ;
fail_preadd: ;
@@ -695,9 +685,21 @@ static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client)
if ( client->image != NULL ) {
dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref );
if ( uplink != NULL ) {
- uplink_removeClient( uplink, client );
+ if ( client->relayedCount != 0 ) {
+ uplink_removeEntry( uplink, client, &uplinkCallback );
+ }
ref_put( &uplink->reference );
}
+ if ( client->relayedCount != 0 ) {
+ logadd( LOG_DEBUG1, "Client has relayedCount == %"PRIu8" on disconnect..", client->relayedCount );
+ int i;
+ for ( i = 0; i < 1000 && client->relayedCount != 0; ++i ) {
+ usleep( 10000 );
+ }
+ if ( client->relayedCount != 0 ) {
+ logadd( LOG_WARNING, "Client relayedCount still %"PRIu8" after sleeping!", client->relayedCount );
+ }
+ }
}
mutex_lock( &client->sendMutex );
if ( client->sock != -1 ) {
@@ -739,15 +741,21 @@ static bool addToList(dnbd3_client_t *client)
return true;
}
-void net_sendReply(dnbd3_client_t *client, uint16_t cmd, uint64_t handle)
+static void uplinkCallback(void *data, uint64_t handle, uint64_t start UNUSED, uint32_t length, const char *buffer)
{
- dnbd3_reply_t reply;
- reply.magic = dnbd3_packet_magic;
- reply.cmd = cmd;
- reply.handle = handle;
- reply.size = 0;
+ dnbd3_client_t *client = (dnbd3_client_t*)data;
+ dnbd3_reply_t reply = {
+ .magic = dnbd3_packet_magic,
+ .cmd = buffer == NULL ? CMD_ERROR : CMD_GET_BLOCK,
+ .handle = handle,
+ .size = length,
+ };
mutex_lock( &client->sendMutex );
- send_reply( client->sock, &reply, NULL );
+ send_reply( client->sock, &reply, buffer );
+ if ( buffer == NULL ) {
+ shutdown( client->sock, SHUT_RDWR );
+ }
+ client->relayedCount--;
mutex_unlock( &client->sendMutex );
}