summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2015-12-16 17:28:52 +0100
committerSimon Rettberg2015-12-16 17:28:52 +0100
commitf49b63e11de50e72f85f8c6688da36d89bf17b87 (patch)
tree0665a23bbab04dcf62094cfaf087745a4998caff
parent[FUSE] Fix forking mode (not passing -f) by not spawning threads before enter... (diff)
downloaddnbd3-f49b63e11de50e72f85f8c6688da36d89bf17b87.tar.gz
dnbd3-f49b63e11de50e72f85f8c6688da36d89bf17b87.tar.xz
dnbd3-f49b63e11de50e72f85f8c6688da36d89bf17b87.zip
[SERVER] More fine grained locking for RPC; better error logging
-rw-r--r--LOCKS1
-rw-r--r--src/server/globals.h12
-rw-r--r--src/server/image.c47
-rw-r--r--src/server/net.c66
-rw-r--r--src/server/net.h4
-rw-r--r--src/server/rpc.c61
-rw-r--r--src/server/server.c7
-rw-r--r--src/server/uplink.c14
8 files changed, 144 insertions, 68 deletions
diff --git a/LOCKS b/LOCKS
index ea1cb45..f2e64dd 100644
--- a/LOCKS
+++ b/LOCKS
@@ -21,6 +21,7 @@ pendingLockProduce
pendingLockConsume
altServersLock
client.sendMutex
+client.statsLock
statisticsSentLock
statisticsReceivedLock
diff --git a/src/server/globals.h b/src/server/globals.h
index 0bf34de..a06e0e0 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -118,13 +118,17 @@ struct _dnbd3_image
struct _dnbd3_client
{
+#define HOSTNAMELEN (48)
+ uint64_t bytesSent; // Byte counter for this client. Use statsLock when accessing
+ dnbd3_image_t *image;
+ uint32_t tmpBytesSent; // Temporary byte counter that gets added to the global counter periodically. Use statsLock when accessing
int sock;
+ bool isServer; // true if a server in proxy mode, false if real client
dnbd3_host_t host;
- dnbd3_image_t *image;
- uint64_t bytesSent;
- pthread_spinlock_t lock;
+ char hostName[HOSTNAMELEN];
pthread_mutex_t sendMutex;
- bool isServer; // true if a server in proxy mode, false if real client
+ pthread_spinlock_t lock;
+ pthread_spinlock_t statsLock;
};
// #######################################################
diff --git a/src/server/image.c b/src/server/image.c
index 2a48047..5638188 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -1173,7 +1173,7 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste
logadd( LOG_DEBUG2, "Not found, bailing out" );
return image_get( name, detectedRid, true );
}
- if ( requestedRid == 0 ) {
+ if ( !_vmdkLegacyMode && requestedRid == 0 ) {
// rid 0 requested - check if detected rid is readable, decrease rid if not until we reach 0
while ( detectedRid != 0 ) {
dnbd3_image_t *image = image_get( name, detectedRid, true );
@@ -1347,24 +1347,45 @@ cleanup_fail:;
json_t* image_getListAsJson()
{
json_t *imagesJson = json_array();
- json_t *image;
-
+ json_t *jsonImage;
int i;
char buffer[100] = { 0 };
+ uint64_t bytesReceived;
+ int users, completeness;
+
spin_lock( &imageListLock );
- for (i = 0; i < _num_images; ++i) {
+ for ( i = 0; i < _num_images; ++i ) {
if ( _images[i] == NULL ) continue;
- spin_lock( &_images[i]->lock );
- image = json_pack( "{sisssisIsi}", "id", _images[i]->id, "name", _images[i]->lower_name, "rid", (int) _images[i]->rid, "users", (json_int_t) _images[i]->users,
- "complete", image_getCompletenessEstimate( _images[i] ) );
- if ( _images[i]->uplink != NULL ) {
- host_to_string( &_images[i]->uplink->currentServer, buffer, sizeof(buffer) );
- json_object_set_new( image, "uplinkServer", json_string( buffer ) );
- json_object_set_new( image, "receivedBytes", json_integer( (json_int_t) _images[i]->uplink->bytesReceived ) );
+ dnbd3_image_t *image = _images[i];
+ spin_lock( &image->lock );
+ spin_unlock( &imageListLock );
+ users = image->users;
+ completeness = image_getCompletenessEstimate( image );
+ if ( image->uplink == NULL ) {
+ bytesReceived = 0;
+ } else {
+ bytesReceived = image->uplink->bytesReceived;
+ if ( !host_to_string( &image->uplink->currentServer, buffer, sizeof(buffer) ) ) {
+ buffer[0] = '\0';
+ }
}
- json_array_append_new( imagesJson, image );
+ image->users++; // Prevent freeing after we unlock
+ spin_unlock( &image->lock );
- spin_unlock( &_images[i]->lock );
+ jsonImage = json_pack( "{sisssisisi}",
+ "id", image->id, // id, lower_name, rid never change, so access them without locking
+ "name", image->lower_name,
+ "rid", (int) image->rid,
+ "users", users,
+ "complete", completeness );
+ if ( bytesReceived != 0 ) {
+ json_object_set_new( jsonImage, "uplinkServer", json_string( buffer ) );
+ json_object_set_new( jsonImage, "receivedBytes", json_integer( (json_int_t) bytesReceived ) );
+ }
+ json_array_append_new( imagesJson, jsonImage );
+
+ image = image_release( image ); // Since we did image->users++;
+ spin_lock( &imageListLock );
}
spin_unlock( &imageListLock );
return imagesJson;
diff --git a/src/server/net.c b/src/server/net.c
index a467620..9dbba50 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -50,6 +50,17 @@ static char nullbytes[500];
static uint64_t totalBytesSent = 0;
static pthread_spinlock_t statisticsSentLock;
+/**
+ * Update global sent stats. Hold client's statsLock when calling.
+ */
+void net_updateGlobalSentStatsFromClient(dnbd3_client_t * const client)
+{
+ spin_lock( &statisticsSentLock );
+ totalBytesSent += client->tmpBytesSent;
+ spin_unlock( &statisticsSentLock );
+ client->tmpBytesSent = 0;
+}
+
static inline bool recv_request_header(int sock, dnbd3_request_t *request)
{
int ret, fails = 0;
@@ -160,8 +171,6 @@ void *net_client_handler(void *dnbd3_client)
serialized_buffer_t payload;
uint16_t rid, client_version;
uint64_t start, end;
- uint32_t tempBytesSent = 0;
- char hostPrintable[100];
dnbd3_server_entry_t server_list[NUMBER_SERVERS];
@@ -171,11 +180,14 @@ void *net_client_handler(void *dnbd3_client)
reply.magic = dnbd3_packet_magic;
sock_setTimeout( client->sock, _clientTimeout );
+ if ( !host_to_string( &client->host, client->hostName, HOSTNAMELEN ) ) {
+ client->hostName[HOSTNAMELEN-1] = '\0';
+ }
// Receive first packet. This must be CMD_SELECT_IMAGE by protocol specification
if ( recv_request_header( client->sock, &request ) ) {
if ( request.cmd != CMD_SELECT_IMAGE ) {
- logadd( LOG_DEBUG1, "Client sent invalid handshake (%d). Dropping Client\n", (int)request.cmd );
+ logadd( LOG_DEBUG1, "Client %s sent invalid handshake (%d). Dropping Client\n", client->hostName, (int)request.cmd );
} else if ( recv_request_payload( client->sock, request.size, &payload ) ) {
char *image_name;
client_version = serializer_get_uint16( &payload );
@@ -184,16 +196,17 @@ void *net_client_handler(void *dnbd3_client)
client->isServer = serializer_get_uint8( &payload );
if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) {
if ( client_version < MIN_SUPPORTED_CLIENT ) {
- logadd( LOG_DEBUG1, "Client too old\n" );
+ logadd( LOG_DEBUG1, "Client %s too old", client->hostName );
} else {
- logadd( LOG_DEBUG1, "Incomplete handshake received\n" );
+ logadd( LOG_DEBUG1, "Incomplete handshake received from %s", client->hostName );
}
} else {
client->image = image = image_getOrLoad( image_name, rid );
if ( image == NULL ) {
//logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
} else if ( !image->working ) {
- logadd( LOG_DEBUG1, "Client requested non-working image '%s' (rid:%d), rejected\n", image_name, (int)rid );
+ logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n",
+ client->hostName, image_name, (int)rid );
} else {
// Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable
bOk = true;
@@ -227,9 +240,7 @@ void *net_client_handler(void *dnbd3_client)
rpc_sendStatsJson( client->sock );
} else {
// Unknown request
- if ( host_to_string( &client->host, hostPrintable, sizeof hostPrintable ) ) {
- logadd( LOG_DEBUG1, "Client %s sent invalid handshake", hostPrintable );
- }
+ logadd( LOG_DEBUG1, "Client %s sent invalid handshake", client->hostName );
}
if ( bOk ) {
@@ -247,7 +258,7 @@ void *net_client_handler(void *dnbd3_client)
case CMD_GET_BLOCK:
if ( request.offset >= image->virtualFilesize ) {
// Sanity check
- logadd( LOG_WARNING, "Client requested non-existent block" );
+ logadd( LOG_WARNING, "Client %s requested non-existent block", client->hostName );
reply.size = 0;
reply.cmd = CMD_ERROR;
send_reply( client->sock, &reply, NULL );
@@ -255,7 +266,7 @@ void *net_client_handler(void *dnbd3_client)
}
if ( request.offset + request.size > image->virtualFilesize ) {
// Sanity check
- logadd( LOG_WARNING, "Client requested data block that extends beyond image size" );
+ logadd( LOG_WARNING, "Client %s requested data block that extends beyond image size", client->hostName );
reply.size = 0;
reply.cmd = CMD_ERROR;
send_reply( client->sock, &reply, NULL );
@@ -315,8 +326,8 @@ void *net_client_handler(void *dnbd3_client)
spin_unlock( &image->lock );
if ( !isCached ) {
if ( !uplink_request( client, request.handle, request.offset, request.size ) ) {
- logadd( LOG_DEBUG1, "Could not relay un-cached request to upstream proxy, disabling image %s:%d",
- image->lower_name, (int)image->rid );
+ logadd( LOG_DEBUG1, "Could not relay un-cached request from %s to upstream proxy, disabling image %s:%d",
+ client->hostName, image->lower_name, (int)image->rid );
image->working = false;
goto exit_client_cleanup;
}
@@ -334,7 +345,7 @@ void *net_client_handler(void *dnbd3_client)
// Send reply header
if ( send( client->sock, &reply, sizeof(dnbd3_reply_t), (request.size == 0 ? 0 : MSG_MORE) ) != sizeof(dnbd3_reply_t) ) {
if ( lock ) pthread_mutex_unlock( &client->sendMutex );
- logadd( LOG_DEBUG1, "Sending CMD_GET_BLOCK header failed\n" );
+ logadd( LOG_DEBUG1, "Sending CMD_GET_BLOCK reply header to %s failed", client->hostName );
goto exit_client_cleanup;
}
@@ -355,8 +366,8 @@ void *net_client_handler(void *dnbd3_client)
if ( lock ) pthread_mutex_unlock( &client->sendMutex );
if ( ret == -1 ) {
if ( err != EPIPE && err != ECONNRESET && err != ESHUTDOWN ) {
- logadd( LOG_DEBUG1, "sendfile failed (image to net. sent %d/%d, errno=%d)\n",
- (int)done, (int)realBytes, err );
+ logadd( LOG_DEBUG1, "sendfile to %s failed (image to net. sent %d/%d, errno=%d)",
+ client->hostName, (int)done, (int)realBytes, err );
}
if ( err != EAGAIN && err != EWOULDBLOCK ) image->working = false;
}
@@ -372,18 +383,15 @@ void *net_client_handler(void *dnbd3_client)
}
}
if ( lock ) pthread_mutex_unlock( &client->sendMutex );
+ spin_lock( &client->statsLock );
// Global per-client counter
- spin_lock( &client->lock );
client->bytesSent += request.size; // Increase counter for statistics.
- spin_unlock( &client->lock );
// Local counter that gets added to the global total bytes sent counter periodically
- tempBytesSent += request.size;
- if (tempBytesSent > 1024 * 1024 ) {
- spin_lock( &statisticsSentLock );
- totalBytesSent += tempBytesSent;
- spin_unlock( &statisticsSentLock );
- tempBytesSent = 0;
+ client->tmpBytesSent += request.size;
+ if ( client->tmpBytesSent > 100000000 ) {
+ net_updateGlobalSentStatsFromClient( client );
}
+ spin_unlock( &client->statsLock );
break;
case CMD_GET_SERVERS:
@@ -407,9 +415,7 @@ void *net_client_handler(void *dnbd3_client)
set_name: ;
if ( !hasName ) {
hasName = true;
- if ( host_to_string( &client->host, hostPrintable, sizeof hostPrintable ) ) {
- setThreadName( hostPrintable );
- }
+ setThreadName( client->hostName );
}
break;
@@ -434,7 +440,7 @@ set_name: ;
break;
default:
- logadd( LOG_ERROR, "Unknown command from client: %d", (int)request.cmd );
+ logadd( LOG_ERROR, "Unknown command from client %s: %d", client->hostName, (int)request.cmd );
break;
}
@@ -442,9 +448,7 @@ set_name: ;
}
exit_client_cleanup: ;
dnbd3_removeClient( client );
- spin_lock( &statisticsSentLock );
- totalBytesSent += tempBytesSent; // Add the amount of bytes sent by the client to the statistics of the server.
- spin_unlock( &statisticsSentLock );
+ net_updateGlobalSentStatsFromClient( client ); // Don't need client's lock here as it's not active anymore
client = dnbd3_freeClient( client ); // This will also call image_release on client->image
return NULL ;
}
diff --git a/src/server/net.h b/src/server/net.h
index 4624141..b8c850d 100644
--- a/src/server/net.h
+++ b/src/server/net.h
@@ -21,8 +21,12 @@
#ifndef NET_H_
#define NET_H_
+#include "globals.h"
+
void net_init();
+void net_updateGlobalSentStatsFromClient(dnbd3_client_t * const client);
+
uint64_t net_getTotalBytesSent();
void *net_client_handler(void *client_socket);
diff --git a/src/server/rpc.c b/src/server/rpc.c
index 48fc009..3ea8a9a 100644
--- a/src/server/rpc.c
+++ b/src/server/rpc.c
@@ -17,25 +17,30 @@ static void clientsToJson(json_t *jsonClients);
void rpc_sendStatsJson(int sock)
{
+ json_t *jsonClients = json_array();
+ clientsToJson( jsonClients );
const uint64_t bytesReceived = uplink_getTotalBytesReceived();
const uint64_t bytesSent = net_getTotalBytesSent();
const int uptime = dnbd3_serverUptime();
- json_t *jsonClients = json_array();
-
- clientsToJson( jsonClients );
- json_t *statisticsJson = json_pack( "{sIsI}", "bytesReceived", (json_int_t) bytesReceived, "bytesSent", (json_int_t) bytesSent );
+ json_t *statisticsJson = json_pack( "{sIsI}",
+ "bytesReceived", (json_int_t) bytesReceived,
+ "bytesSent", (json_int_t) bytesSent );
json_object_set_new( statisticsJson, "clients", jsonClients );
json_object_set_new( statisticsJson, "images", image_getListAsJson() );
json_object_set_new( statisticsJson, "uptime", json_integer( uptime ) );
char *jsonString = json_dumps( statisticsJson, 0 );
+ json_decref( statisticsJson );
char buffer[500];
- snprintf(buffer, sizeof buffer , "HTTP/1.1 200 OK\r\nConnection: Close\r\nContent-Length: %d\r\nContent-Type: application/json\r\n\r\n",
+ snprintf(buffer, sizeof buffer , "HTTP/1.1 200 OK\r\n"
+ "Connection: Close\r\n"
+ "Content-Length: %d\r\n"
+ "Content-Type: application/json\r\n"
+ "\r\n",
(int) strlen( jsonString ) );
write( sock, buffer, strlen( buffer ) );
sock_sendAll( sock, jsonString, strlen( jsonString ), 10 );
- json_decref( statisticsJson );
// Wait for flush
shutdown( sock, SHUT_WR );
while ( read( sock, buffer, sizeof buffer ) > 0 );
@@ -46,19 +51,43 @@ static void clientsToJson(json_t *jsonClients)
{
json_t *clientStats;
int i;
- char clientName[100];
+ int imgId;
+ uint64_t bytesSent;
+ char host[HOSTNAMELEN];
+ host[HOSTNAMELEN-1] = '\0';
+
spin_lock( &_clients_lock );
- for (i = 0; i < _num_clients; ++i) {
- if ( _clients[i] == NULL ) continue;
- spin_lock( &_clients[i]->lock );
- if ( _clients[i]->image != NULL ) {
- if ( !host_to_string( &_clients[i]->host, clientName, sizeof(clientName) ) ) {
- strcpy( clientName, "???" );
- }
- clientStats = json_pack( "{sssisI}", "address", clientName, "imageId", _clients[i]->image->id, "bytesSent", (json_int_t)_clients[i]->bytesSent );
+ for ( i = 0; i < _num_clients; ++i ) {
+ if ( _clients[i] == NULL ) {
+ continue;
+ }
+ // Do not lock on client.lock here:
+ // 1) .image can only be set once, will never change (just like .image.id)
+ // 2) .hostName never changes as well
+ // 3) .bytesSent and .tmpBytesSent are guarded by .statsLock
+ // 4) the client cannot be freed, as it's still in the list and we hold the list's lock
+ if ( _clients[i]->image == NULL ) {
+ imgId = -1;
+ } else {
+ strncpy( host, _clients[i]->hostName, HOSTNAMELEN - 1 );
+ imgId = _clients[i]->image->id;
+ spin_lock( &_clients[i]->statsLock );
+ bytesSent = _clients[i]->bytesSent;
+ net_updateGlobalSentStatsFromClient( _clients[i] ); // Do this since we read the totalBytesSent counter later
+ spin_unlock( &_clients[i]->statsLock );
+ }
+ spin_unlock( &_clients_lock );
+ // Unlock so we give other threads a chance to access the client list.
+ // We might not get an atomic snapshot of the currently connected clients,
+ // but that doesn't really make a difference anyways.
+ if ( imgId != -1 ) {
+ clientStats = json_pack( "{sssisI}",
+ "address", host,
+ "imageId", imgId,
+ "bytesSent", (json_int_t)bytesSent );
json_array_append_new( jsonClients, clientStats );
}
- spin_unlock( &_clients[i]->lock );
+ spin_lock( &_clients_lock );
}
spin_unlock( &_clients_lock );
}
diff --git a/src/server/server.c b/src/server/server.c
index e8df934..ffa817f 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -398,7 +398,9 @@ dnbd3_client_t* dnbd3_initClient(struct sockaddr_storage *client, int fd)
}
dnbd3_client->sock = fd;
dnbd3_client->bytesSent = 0;
+ dnbd3_client->tmpBytesSent = 0;
spin_init( &dnbd3_client->lock, PTHREAD_PROCESS_PRIVATE );
+ spin_init( &dnbd3_client->statsLock, PTHREAD_PROCESS_PRIVATE );
pthread_mutex_init( &dnbd3_client->sendMutex, NULL );
return dnbd3_client;
}
@@ -430,9 +432,11 @@ dnbd3_client_t* dnbd3_freeClient(dnbd3_client_t *client)
{
spin_lock( &client->lock );
pthread_mutex_lock( &client->sendMutex );
- if ( client->sock >= 0 ) close( client->sock );
+ if ( client->sock != -1 ) close( client->sock );
client->sock = -1;
pthread_mutex_unlock( &client->sendMutex );
+ spin_lock( &client->statsLock );
+ spin_unlock( &client->statsLock );
if ( client->image != NULL ) {
spin_lock( &client->image->lock );
if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client );
@@ -442,6 +446,7 @@ dnbd3_client_t* dnbd3_freeClient(dnbd3_client_t *client)
client->image = NULL;
spin_unlock( &client->lock );
spin_destroy( &client->lock );
+ spin_destroy( &client->statsLock );
pthread_mutex_destroy( &client->sendMutex );
free( client );
return NULL ;
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 29c42af..fa95fba 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -597,10 +597,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
// from 0, you also need to change the "attach to existing request"-logic in uplink_request()
outReply.magic = dnbd3_packet_magic;
bool served = false;
- for (i = link->queueLen - 1; i >= 0; --i) {
+ for ( i = link->queueLen - 1; i >= 0; --i ) {
dnbd3_queued_request_t * const req = &link->queue[i];
if ( req->status == ULR_PROCESSING ) {
- //logadd( LOG_DEBUG2 %p, "Reply slot %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 "\n", (void*)link, i, req->handle, req->from, req->to );
+ size_t bytesSent = 0;
assert( req->from >= start && req->to <= end );
dnbd3_client_t * const client = req->client;
outReply.cmd = CMD_GET_BLOCK;
@@ -618,9 +618,17 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
spin_unlock( &link->queueLock );
if ( client->sock != -1 ) {
ssize_t sent = writev( client->sock, iov, 2 );
- if ( sent > (ssize_t) sizeof outReply ) client->bytesSent += (uint64_t) sent - sizeof outReply;
+ if ( sent > (ssize_t)sizeof outReply ) {
+ bytesSent = (size_t)sent - sizeof outReply;
+ }
}
+ spin_lock( &client->statsLock );
pthread_mutex_unlock( &client->sendMutex );
+ if ( bytesSent != 0 ) {
+ client->bytesSent += bytesSent;
+ client->tmpBytesSent += bytesSent;
+ }
+ spin_unlock( &client->statsLock );
spin_lock( &link->queueLock );
}
if ( req->status == ULR_FREE && i == link->queueLen - 1 ) link->queueLen--;