summaryrefslogtreecommitdiffstats
path: root/src/server/net.c
diff options
context:
space:
mode:
authorSimon Rettberg2015-12-15 17:45:44 +0100
committerSimon Rettberg2015-12-15 17:45:44 +0100
commit72104f2e83fa724f9667c876dca17a2c5ee9b2a2 (patch)
tree38837580c70b390f0bc35c15d2bc4d0865a9f3c4 /src/server/net.c
parent[SERVER] Make listen port configurable (diff)
downloaddnbd3-72104f2e83fa724f9667c876dca17a2c5ee9b2a2.tar.gz
dnbd3-72104f2e83fa724f9667c876dca17a2c5ee9b2a2.tar.xz
dnbd3-72104f2e83fa724f9667c876dca17a2c5ee9b2a2.zip
[SERVER] Remove non-working images from list, plus refactoring
Now that we can automatically load unknown images from disk on request, it makes sense to remove non-working images from the image list. On future requests, we will look for them on disk again, which is nice in case of temporary storage hickups. Also, some more ore less related locking has been refined (loading images, replicating images)
Diffstat (limited to 'src/server/net.c')
-rw-r--r--src/server/net.c192
1 files changed, 119 insertions, 73 deletions
diff --git a/src/server/net.c b/src/server/net.c
index ad228f4..a467620 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -85,7 +85,7 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff
logadd( LOG_ERROR, "Called recv_request_payload() for more bytes than the passed buffer could hold!" );
return false;
}
- if ( recv( sock, payload->buffer, size, MSG_WAITALL ) != size ) {
+ if ( sock_recv( sock, payload->buffer, size ) != size ) {
logadd( LOG_DEBUG1, "Could not receive request payload of length %d\n", (int)size );
return false;
}
@@ -94,34 +94,35 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff
return true;
}
+/**
+ * Send reply with optional payload. payload can be null. The caller has to
+ * acquire the sendMutex first.
+ */
static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload)
{
- const unsigned int size = reply->size;
+ const size_t size = reply->size;
fixup_reply( *reply );
- if ( !payload || size == 0 ) {
- if ( send( sock, reply, sizeof(dnbd3_reply_t), 0 ) != sizeof(dnbd3_reply_t) ) {
- logadd( LOG_DEBUG1, "Send failed (header-only)\n" );
- return false;
- }
- } else {
- struct iovec iov[2];
- iov[0].iov_base = reply;
- iov[0].iov_len = sizeof(dnbd3_reply_t);
- iov[1].iov_base = payload;
- iov[1].iov_len = (size_t)size;
- if ( (size_t)writev( sock, iov, 2 ) != sizeof(dnbd3_reply_t) + size ) {
- logadd( LOG_DEBUG1, "Send failed (reply with payload of %u bytes)\n", size );
+ if ( sock_sendAll( sock, reply, sizeof(dnbd3_reply_t), 1 ) != sizeof(dnbd3_reply_t) ) {
+ logadd( LOG_DEBUG1, "Sending reply header to client failed" );
+ return false;
+ }
+ if ( size != 0 && payload != NULL ) {
+ if ( sock_sendAll( sock, payload, size, 1 ) != (ssize_t)size ) {
+ logadd( LOG_DEBUG1, "Sending payload of %u bytes to client failed", size );
return false;
}
}
return true;
}
+/**
+ * Send given amount of null bytes. The caller has to acquire the sendMutex first.
+ */
static inline bool sendPadding( const int fd, uint32_t bytes )
{
ssize_t ret;
while ( bytes >= sizeof(nullbytes) ) {
- ret = sock_sendAll( fd, nullbytes, sizeof(nullbytes), 1 );
+ ret = sock_sendAll( fd, nullbytes, sizeof(nullbytes), 2 );
if ( ret <= 0 )
return false;
bytes -= (uint32_t)ret;
@@ -131,8 +132,9 @@ static inline bool sendPadding( const int fd, uint32_t bytes )
uint64_t net_getTotalBytesSent()
{
+ // reads and writes to 64bit ints are not atomic on x86, so let's be safe and use locking
spin_lock( &statisticsSentLock );
- uint64_t tmp = totalBytesSent;
+ const uint64_t tmp = totalBytesSent;
spin_unlock( &statisticsSentLock );
return tmp;
}
@@ -156,10 +158,10 @@ void *net_client_handler(void *dnbd3_client)
bool hasName = false;
serialized_buffer_t payload;
- char *image_name;
uint16_t rid, client_version;
uint64_t start, end;
- char buffer[100];
+ uint32_t tempBytesSent = 0;
+ char hostPrintable[100];
dnbd3_server_entry_t server_list[NUMBER_SERVERS];
@@ -174,25 +176,38 @@ void *net_client_handler(void *dnbd3_client)
if ( recv_request_header( client->sock, &request ) ) {
if ( request.cmd != CMD_SELECT_IMAGE ) {
logadd( LOG_DEBUG1, "Client sent invalid handshake (%d). Dropping Client\n", (int)request.cmd );
- } else {
- if ( recv_request_payload( client->sock, request.size, &payload ) ) {
- client_version = serializer_get_uint16( &payload );
- image_name = serializer_get_string( &payload );
- rid = serializer_get_uint16( &payload );
- client->isServer = serializer_get_uint8( &payload );
- if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) {
- if ( client_version < MIN_SUPPORTED_CLIENT ) {
- logadd( LOG_DEBUG1, "Client too old\n" );
- } else {
- logadd( LOG_DEBUG1, "Incomplete handshake received\n" );
- }
+ } else if ( recv_request_payload( client->sock, request.size, &payload ) ) {
+ char *image_name;
+ client_version = serializer_get_uint16( &payload );
+ image_name = serializer_get_string( &payload );
+ rid = serializer_get_uint16( &payload );
+ client->isServer = serializer_get_uint8( &payload );
+ if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) {
+ if ( client_version < MIN_SUPPORTED_CLIENT ) {
+ logadd( LOG_DEBUG1, "Client too old\n" );
} else {
- client->image = image = image_getOrLoad( image_name, rid );
- if ( image == NULL ) {
- //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
- } else if ( !image->working ) {
- logadd( LOG_DEBUG1, "Client requested non-working image '%s' (rid:%d), rejected\n", image_name, (int)rid );
- } else {
+ logadd( LOG_DEBUG1, "Incomplete handshake received\n" );
+ }
+ } else {
+ client->image = image = image_getOrLoad( image_name, rid );
+ if ( image == NULL ) {
+ //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
+ } else if ( !image->working ) {
+ logadd( LOG_DEBUG1, "Client requested non-working image '%s' (rid:%d), rejected\n", image_name, (int)rid );
+ } else {
+ // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable
+ bOk = true;
+ if ( image->cache_map != NULL ) {
+ spin_lock( &image->lock );
+ if ( image->uplink == NULL || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
+ bOk = ( rand() % 4 ) == 1;
+ }
+ spin_unlock( &image->lock );
+ if ( image->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this
+ usleep( 100000 ); // server gets a penalty and is less likely to be selected
+ }
+ }
+ if ( bOk ) {
image_file = image->readFd;
serializer_reset_write( &payload );
serializer_put_uint16( &payload, PROTOCOL_VERSION );
@@ -201,9 +216,8 @@ void *net_client_handler(void *dnbd3_client)
serializer_put_uint64( &payload, image->virtualFilesize );
reply.cmd = CMD_SELECT_IMAGE;
reply.size = serializer_get_written_length( &payload );
- if ( send_reply( client->sock, &reply, &payload ) ) {
- if ( !client->isServer ) image->atime = time( NULL );
- bOk = true;
+ if ( !send_reply( client->sock, &reply, &payload ) ) {
+ bOk = false;
}
}
}
@@ -211,6 +225,11 @@ void *net_client_handler(void *dnbd3_client)
}
} else if ( strncmp( (char*)&request, "GET ", 4 ) == 0 || strncmp( (char*)&request, "POST ", 5 ) == 0 ) {
rpc_sendStatsJson( client->sock );
+ } else {
+ // Unknown request
+ if ( host_to_string( &client->host, hostPrintable, sizeof hostPrintable ) ) {
+ logadd( LOG_DEBUG1, "Client %s sent invalid handshake", hostPrintable );
+ }
}
if ( bOk ) {
@@ -251,23 +270,13 @@ void *net_client_handler(void *dnbd3_client)
spin_lock( &image->lock );
// Check again as we only aquired the lock just now
if ( image->cache_map != NULL ) {
- const uint64_t firstByte = start >> 15;
- const uint64_t lastByte = (end - 1) >> 15;
- // First byte
- uint64_t pos = start;
- do {
- const int map_x = (pos >> 12) & 7; // mod 8
- const uint8_t bit_mask = 1 << map_x;
- if ( (image->cache_map[firstByte] & bit_mask) == 0 ) {
- isCached = false;
- break;
- }
- pos += DNBD3_BLOCK_SIZE;
- } while ( firstByte == (pos >> 15) && pos < end );
+ const uint64_t firstByteInMap = start >> 15;
+ const uint64_t lastByteInMap = (end - 1) >> 15;
+ uint64_t pos;
// Middle - quick checking
if ( isCached ) {
- pos = firstByte + 1;
- while ( pos < lastByte ) {
+ pos = firstByteInMap + 1;
+ while ( pos < lastByteInMap ) {
if ( image->cache_map[pos] != 0xff ) {
isCached = false;
break;
@@ -275,14 +284,27 @@ void *net_client_handler(void *dnbd3_client)
++pos;
}
}
- // Last byte
- if ( isCached && firstByte != lastByte ) {
- pos = lastByte << 15;
+ // First byte
+ if ( isCached ) {
+ pos = start;
+ do {
+ const int map_x = (pos >> 12) & 7; // mod 8
+ const uint8_t bit_mask = 1 << map_x;
+ if ( (image->cache_map[firstByteInMap] & bit_mask) == 0 ) {
+ isCached = false;
+ break;
+ }
+ pos += DNBD3_BLOCK_SIZE;
+ } while ( firstByteInMap == (pos >> 15) && pos < end );
+ }
+ // Last byte - only check if request spans multiple bytes in cache map
+ if ( isCached && firstByteInMap != lastByteInMap ) {
+ pos = lastByteInMap << 15;
while ( pos < end ) {
- assert( lastByte == (pos >> 15) );
+ assert( lastByteInMap == (pos >> 15) );
const int map_x = (pos >> 12) & 7; // mod 8
const uint8_t bit_mask = 1 << map_x;
- if ( (image->cache_map[lastByte] & bit_mask) == 0 ) {
+ if ( (image->cache_map[lastByteInMap] & bit_mask) == 0 ) {
isCached = false;
break;
}
@@ -293,7 +315,9 @@ void *net_client_handler(void *dnbd3_client)
spin_unlock( &image->lock );
if ( !isCached ) {
if ( !uplink_request( client, request.handle, request.offset, request.size ) ) {
- logadd( LOG_DEBUG1, "Could not relay uncached request to upstream proxy\n" );
+ logadd( LOG_DEBUG1, "Could not relay un-cached request to upstream proxy, disabling image %s:%d",
+ image->lower_name, (int)image->rid );
+ image->working = false;
goto exit_client_cleanup;
}
break; // DONE
@@ -329,10 +353,13 @@ void *net_client_handler(void *dnbd3_client)
if ( ret <= 0 ) {
const int err = errno;
if ( lock ) pthread_mutex_unlock( &client->sendMutex );
- if ( ret < 0 && err != EPIPE && err != ECONNRESET )
- logadd( LOG_DEBUG1, "sendfile failed (image to net. sent %d/%d, errno=%d)\n",
- (int)done, (int)realBytes, err );
- if ( err == EBADF || err == EINVAL || err == EIO ) image->working = false;
+ if ( ret == -1 ) {
+ if ( err != EPIPE && err != ECONNRESET && err != ESHUTDOWN ) {
+ logadd( LOG_DEBUG1, "sendfile failed (image to net. sent %d/%d, errno=%d)\n",
+ (int)done, (int)realBytes, err );
+ }
+ if ( err != EAGAIN && err != EWOULDBLOCK ) image->working = false;
+ }
goto exit_client_cleanup;
}
done += ret;
@@ -343,9 +370,20 @@ void *net_client_handler(void *dnbd3_client)
goto exit_client_cleanup;
}
}
- client->bytesSent += request.size; // Increase counter for statistics.
}
if ( lock ) pthread_mutex_unlock( &client->sendMutex );
+ // Global per-client counter
+ spin_lock( &client->lock );
+ client->bytesSent += request.size; // Increase counter for statistics.
+ spin_unlock( &client->lock );
+ // Local counter that gets added to the global total bytes sent counter periodically
+ tempBytesSent += request.size;
+ if (tempBytesSent > 1024 * 1024 ) {
+ spin_lock( &statisticsSentLock );
+ totalBytesSent += tempBytesSent;
+ spin_unlock( &statisticsSentLock );
+ tempBytesSent = 0;
+ }
break;
case CMD_GET_SERVERS:
@@ -353,19 +391,25 @@ void *net_client_handler(void *dnbd3_client)
num = altservers_getMatching( &client->host, server_list, NUMBER_SERVERS );
reply.cmd = CMD_GET_SERVERS;
reply.size = num * sizeof(dnbd3_server_entry_t);
+ pthread_mutex_lock( &client->sendMutex );
send_reply( client->sock, &reply, server_list );
- client->isServer = false; // Only clients request list of servers
+ pthread_mutex_unlock( &client->sendMutex );
goto set_name;
break;
case CMD_KEEPALIVE:
reply.cmd = CMD_KEEPALIVE;
reply.size = 0;
+ pthread_mutex_lock( &client->sendMutex );
send_reply( client->sock, &reply, NULL );
+ pthread_mutex_unlock( &client->sendMutex );
+ image->atime = time( NULL );
set_name: ;
- if ( !hasName && host_to_string( &client->host, buffer, sizeof buffer ) ) {
+ if ( !hasName ) {
hasName = true;
- setThreadName( buffer );
+ if ( host_to_string( &client->host, hostPrintable, sizeof hostPrintable ) ) {
+ setThreadName( hostPrintable );
+ }
}
break;
@@ -376,19 +420,21 @@ set_name: ;
case CMD_GET_CRC32:
reply.cmd = CMD_GET_CRC32;
+ pthread_mutex_lock( &client->sendMutex );
if ( image->crc32 == NULL ) {
reply.size = 0;
send_reply( client->sock, &reply, NULL );
} else {
const int size = reply.size = (IMGSIZE_TO_HASHBLOCKS(image->realFilesize) + 1) * sizeof(uint32_t);
send_reply( client->sock, &reply, NULL );
- send( client->sock, &image->masterCrc32, sizeof(uint32_t), 0 );
+ send( client->sock, &image->masterCrc32, sizeof(uint32_t), MSG_MORE );
send( client->sock, image->crc32, size - sizeof(uint32_t), 0 );
}
+ pthread_mutex_unlock( &client->sendMutex );
break;
default:
- logadd( LOG_ERROR, "Unknown command: %d", (int)request.cmd );
+ logadd( LOG_ERROR, "Unknown command from client: %d", (int)request.cmd );
break;
}
@@ -397,9 +443,9 @@ set_name: ;
exit_client_cleanup: ;
dnbd3_removeClient( client );
spin_lock( &statisticsSentLock );
- totalBytesSent += client->bytesSent;// Add the amount of bytes sent by the client to the statistics of the server.
+ totalBytesSent += tempBytesSent; // Add the amount of bytes sent by the client to the statistics of the server.
spin_unlock( &statisticsSentLock );
- client = dnbd3_freeClient( client );
+ client = dnbd3_freeClient( client ); // This will also call image_release on client->image
return NULL ;
}