summary | refs | log | tree | commit | diff | stats
path: root/src/server/net.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/net.c')
-rw-r--r-- src/server/net.c 192
1 files changed, 119 insertions, 73 deletions
diff --git a/src/server/net.c b/src/server/net.c
index ad228f4..a467620 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -85,7 +85,7 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff
logadd( LOG_ERROR, "Called recv_request_payload() for more bytes than the passed buffer could hold!" );
return false;
}
- if ( recv( sock, payload->buffer, size, MSG_WAITALL ) != size ) {
+ if ( sock_recv( sock, payload->buffer, size ) != size ) {
logadd( LOG_DEBUG1, "Could not receive request payload of length %d\n", (int)size );
return false;
}
@@ -94,34 +94,35 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff
return true;
}
+/**
+ * Send reply with optional payload. payload can be null. The caller has to
+ * acquire the sendMutex first.
+ */
static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload)
{
- const unsigned int size = reply->size;
+ const size_t size = reply->size;
fixup_reply( *reply );
- if ( !payload || size == 0 ) {
- if ( send( sock, reply, sizeof(dnbd3_reply_t), 0 ) != sizeof(dnbd3_reply_t) ) {
- logadd( LOG_DEBUG1, "Send failed (header-only)\n" );
- return false;
- }
- } else {
- struct iovec iov[2];
- iov[0].iov_base = reply;
- iov[0].iov_len = sizeof(dnbd3_reply_t);
- iov[1].iov_base = payload;
- iov[1].iov_len = (size_t)size;
- if ( (size_t)writev( sock, iov, 2 ) != sizeof(dnbd3_reply_t) + size ) {
- logadd( LOG_DEBUG1, "Send failed (reply with payload of %u bytes)\n", size );
+ if ( sock_sendAll( sock, reply, sizeof(dnbd3_reply_t), 1 ) != sizeof(dnbd3_reply_t) ) {
+ logadd( LOG_DEBUG1, "Sending reply header to client failed" );
+ return false;
+ }
+ if ( size != 0 && payload != NULL ) {
+ if ( sock_sendAll( sock, payload, size, 1 ) != (ssize_t)size ) {
+ logadd( LOG_DEBUG1, "Sending payload of %u bytes to client failed", size );
return false;
}
}
return true;
}
+/**
+ * Send given amount of null bytes. The caller has to acquire the sendMutex first.
+ */
static inline bool sendPadding( const int fd, uint32_t bytes )
{
ssize_t ret;
while ( bytes >= sizeof(nullbytes) ) {
- ret = sock_sendAll( fd, nullbytes, sizeof(nullbytes), 1 );
+ ret = sock_sendAll( fd, nullbytes, sizeof(nullbytes), 2 );
if ( ret <= 0 )
return false;
bytes -= (uint32_t)ret;
@@ -131,8 +132,9 @@ static inline bool sendPadding( const int fd, uint32_t bytes )
uint64_t net_getTotalBytesSent()
{
+ // reads and writes to 64bit ints are not atomic on x86, so let's be safe and use locking
spin_lock( &statisticsSentLock );
- uint64_t tmp = totalBytesSent;
+ const uint64_t tmp = totalBytesSent;
spin_unlock( &statisticsSentLock );
return tmp;
}
@@ -156,10 +158,10 @@ void *net_client_handler(void *dnbd3_client)
bool hasName = false;
serialized_buffer_t payload;
- char *image_name;
uint16_t rid, client_version;
uint64_t start, end;
- char buffer[100];
+ uint32_t tempBytesSent = 0;
+ char hostPrintable[100];
dnbd3_server_entry_t server_list[NUMBER_SERVERS];
@@ -174,25 +176,38 @@ void *net_client_handler(void *dnbd3_client)
if ( recv_request_header( client->sock, &request ) ) {
if ( request.cmd != CMD_SELECT_IMAGE ) {
logadd( LOG_DEBUG1, "Client sent invalid handshake (%d). Dropping Client\n", (int)request.cmd );
- } else {
- if ( recv_request_payload( client->sock, request.size, &payload ) ) {
- client_version = serializer_get_uint16( &payload );
- image_name = serializer_get_string( &payload );
- rid = serializer_get_uint16( &payload );
- client->isServer = serializer_get_uint8( &payload );
- if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) {
- if ( client_version < MIN_SUPPORTED_CLIENT ) {
- logadd( LOG_DEBUG1, "Client too old\n" );
- } else {
- logadd( LOG_DEBUG1, "Incomplete handshake received\n" );
- }
+ } else if ( recv_request_payload( client->sock, request.size, &payload ) ) {
+ char *image_name;
+ client_version = serializer_get_uint16( &payload );
+ image_name = serializer_get_string( &payload );
+ rid = serializer_get_uint16( &payload );
+ client->isServer = serializer_get_uint8( &payload );
+ if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) {
+ if ( client_version < MIN_SUPPORTED_CLIENT ) {
+ logadd( LOG_DEBUG1, "Client too old\n" );
} else {
- client->image = image = image_getOrLoad( image_name, rid );
- if ( image == NULL ) {
- //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
- } else if ( !image->working ) {
- logadd( LOG_DEBUG1, "Client requested non-working image '%s' (rid:%d), rejected\n", image_name, (int)rid );
- } else {
+ logadd( LOG_DEBUG1, "Incomplete handshake received\n" );
+ }
+ } else {
+ client->image = image = image_getOrLoad( image_name, rid );
+ if ( image == NULL ) {
+ //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
+ } else if ( !image->working ) {
+ logadd( LOG_DEBUG1, "Client requested non-working image '%s' (rid:%d), rejected\n", image_name, (int)rid );
+ } else {
+ // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable
+ bOk = true;
+ if ( image->cache_map != NULL ) {
+ spin_lock( &image->lock );
+ if ( image->uplink == NULL || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
+ bOk = ( rand() % 4 ) == 1;
+ }
+ spin_unlock( &image->lock );
+ if ( image->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this
+ usleep( 100000 ); // server gets a penalty and is less likely to be selected
+ }
+ }
+ if ( bOk ) {
image_file = image->readFd;
serializer_reset_write( &payload );
serializer_put_uint16( &payload, PROTOCOL_VERSION );
@@ -201,9 +216,8 @@ void *net_client_handler(void *dnbd3_client)
serializer_put_uint64( &payload, image->virtualFilesize );
reply.cmd = CMD_SELECT_IMAGE;
reply.size = serializer_get_written_length( &payload );
- if ( send_reply( client->sock, &reply, &payload ) ) {
- if ( !client->isServer ) image->atime = time( NULL );
- bOk = true;
+ if ( !send_reply( client->sock, &reply, &payload ) ) {
+ bOk = false;
}
}
}
@@ -211,6 +225,11 @@ void *net_client_handler(void *dnbd3_client)
}
} else if ( strncmp( (char*)&request, "GET ", 4 ) == 0 || strncmp( (char*)&request, "POST ", 5 ) == 0 ) {
rpc_sendStatsJson( client->sock );
+ } else {
+ // Unknown request
+ if ( host_to_string( &client->host, hostPrintable, sizeof hostPrintable ) ) {
+ logadd( LOG_DEBUG1, "Client %s sent invalid handshake", hostPrintable );
+ }
}
if ( bOk ) {
@@ -251,23 +270,13 @@ void *net_client_handler(void *dnbd3_client)
spin_lock( &image->lock );
// Check again as we only aquired the lock just now
if ( image->cache_map != NULL ) {
- const uint64_t firstByte = start >> 15;
- const uint64_t lastByte = (end - 1) >> 15;
- // First byte
- uint64_t pos = start;
- do {
- const int map_x = (pos >> 12) & 7; // mod 8
- const uint8_t bit_mask = 1 << map_x;
- if ( (image->cache_map[firstByte] & bit_mask) == 0 ) {
- isCached = false;
- break;
- }
- pos += DNBD3_BLOCK_SIZE;
- } while ( firstByte == (pos >> 15) && pos < end );
+ const uint64_t firstByteInMap = start >> 15;
+ const uint64_t lastByteInMap = (end - 1) >> 15;
+ uint64_t pos;
// Middle - quick checking
if ( isCached ) {
- pos = firstByte + 1;
- while ( pos < lastByte ) {
+ pos = firstByteInMap + 1;
+ while ( pos < lastByteInMap ) {
if ( image->cache_map[pos] != 0xff ) {
isCached = false;
break;
@@ -275,14 +284,27 @@ void *net_client_handler(void *dnbd3_client)
++pos;
}
}
- // Last byte
- if ( isCached && firstByte != lastByte ) {
- pos = lastByte << 15;
+ // First byte
+ if ( isCached ) {
+ pos = start;
+ do {
+ const int map_x = (pos >> 12) & 7; // mod 8
+ const uint8_t bit_mask = 1 << map_x;
+ if ( (image->cache_map[firstByteInMap] & bit_mask) == 0 ) {
+ isCached = false;
+ break;
+ }
+ pos += DNBD3_BLOCK_SIZE;
+ } while ( firstByteInMap == (pos >> 15) && pos < end );
+ }
+ // Last byte - only check if request spans multiple bytes in cache map
+ if ( isCached && firstByteInMap != lastByteInMap ) {
+ pos = lastByteInMap << 15;
while ( pos < end ) {
- assert( lastByte == (pos >> 15) );
+ assert( lastByteInMap == (pos >> 15) );
const int map_x = (pos >> 12) & 7; // mod 8
const uint8_t bit_mask = 1 << map_x;
- if ( (image->cache_map[lastByte] & bit_mask) == 0 ) {
+ if ( (image->cache_map[lastByteInMap] & bit_mask) == 0 ) {
isCached = false;
break;
}
@@ -293,7 +315,9 @@ void *net_client_handler(void *dnbd3_client)
spin_unlock( &image->lock );
if ( !isCached ) {
if ( !uplink_request( client, request.handle, request.offset, request.size ) ) {
- logadd( LOG_DEBUG1, "Could not relay uncached request to upstream proxy\n" );
+ logadd( LOG_DEBUG1, "Could not relay un-cached request to upstream proxy, disabling image %s:%d",
+ image->lower_name, (int)image->rid );
+ image->working = false;
goto exit_client_cleanup;
}
break; // DONE
@@ -329,10 +353,13 @@ void *net_client_handler(void *dnbd3_client)
if ( ret <= 0 ) {
const int err = errno;
if ( lock ) pthread_mutex_unlock( &client->sendMutex );
- if ( ret < 0 && err != EPIPE && err != ECONNRESET )
- logadd( LOG_DEBUG1, "sendfile failed (image to net. sent %d/%d, errno=%d)\n",
- (int)done, (int)realBytes, err );
- if ( err == EBADF || err == EINVAL || err == EIO ) image->working = false;
+ if ( ret == -1 ) {
+ if ( err != EPIPE && err != ECONNRESET && err != ESHUTDOWN ) {
+ logadd( LOG_DEBUG1, "sendfile failed (image to net. sent %d/%d, errno=%d)\n",
+ (int)done, (int)realBytes, err );
+ }
+ if ( err != EAGAIN && err != EWOULDBLOCK ) image->working = false;
+ }
goto exit_client_cleanup;
}
done += ret;
@@ -343,9 +370,20 @@ void *net_client_handler(void *dnbd3_client)
goto exit_client_cleanup;
}
}
- client->bytesSent += request.size; // Increase counter for statistics.
}
if ( lock ) pthread_mutex_unlock( &client->sendMutex );
+ // Global per-client counter
+ spin_lock( &client->lock );
+ client->bytesSent += request.size; // Increase counter for statistics.
+ spin_unlock( &client->lock );
+ // Local counter that gets added to the global total bytes sent counter periodically
+ tempBytesSent += request.size;
+ if (tempBytesSent > 1024 * 1024 ) {
+ spin_lock( &statisticsSentLock );
+ totalBytesSent += tempBytesSent;
+ spin_unlock( &statisticsSentLock );
+ tempBytesSent = 0;
+ }
break;
case CMD_GET_SERVERS:
@@ -353,19 +391,25 @@ void *net_client_handler(void *dnbd3_client)
num = altservers_getMatching( &client->host, server_list, NUMBER_SERVERS );
reply.cmd = CMD_GET_SERVERS;
reply.size = num * sizeof(dnbd3_server_entry_t);
+ pthread_mutex_lock( &client->sendMutex );
send_reply( client->sock, &reply, server_list );
- client->isServer = false; // Only clients request list of servers
+ pthread_mutex_unlock( &client->sendMutex );
goto set_name;
break;
case CMD_KEEPALIVE:
reply.cmd = CMD_KEEPALIVE;
reply.size = 0;
+ pthread_mutex_lock( &client->sendMutex );
send_reply( client->sock, &reply, NULL );
+ pthread_mutex_unlock( &client->sendMutex );
+ image->atime = time( NULL );
set_name: ;
- if ( !hasName && host_to_string( &client->host, buffer, sizeof buffer ) ) {
+ if ( !hasName ) {
hasName = true;
- setThreadName( buffer );
+ if ( host_to_string( &client->host, hostPrintable, sizeof hostPrintable ) ) {
+ setThreadName( hostPrintable );
+ }
}
break;
@@ -376,19 +420,21 @@ set_name: ;
case CMD_GET_CRC32:
reply.cmd = CMD_GET_CRC32;
+ pthread_mutex_lock( &client->sendMutex );
if ( image->crc32 == NULL ) {
reply.size = 0;
send_reply( client->sock, &reply, NULL );
} else {
const int size = reply.size = (IMGSIZE_TO_HASHBLOCKS(image->realFilesize) + 1) * sizeof(uint32_t);
send_reply( client->sock, &reply, NULL );
- send( client->sock, &image->masterCrc32, sizeof(uint32_t), 0 );
+ send( client->sock, &image->masterCrc32, sizeof(uint32_t), MSG_MORE );
send( client->sock, image->crc32, size - sizeof(uint32_t), 0 );
}
+ pthread_mutex_unlock( &client->sendMutex );
break;
default:
- logadd( LOG_ERROR, "Unknown command: %d", (int)request.cmd );
+ logadd( LOG_ERROR, "Unknown command from client: %d", (int)request.cmd );
break;
}
@@ -397,9 +443,9 @@ set_name: ;
exit_client_cleanup: ;
dnbd3_removeClient( client );
spin_lock( &statisticsSentLock );
- totalBytesSent += client->bytesSent;// Add the amount of bytes sent by the client to the statistics of the server.
+ totalBytesSent += tempBytesSent; // Add the amount of bytes sent by the client to the statistics of the server.
spin_unlock( &statisticsSentLock );
- client = dnbd3_freeClient( client );
+ client = dnbd3_freeClient( client ); // This will also call image_release on client->image
return NULL ;
}