From f49b63e11de50e72f85f8c6688da36d89bf17b87 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Wed, 16 Dec 2015 17:28:52 +0100 Subject: [SERVER] More fine grained locking for RPC; better error logging --- LOCKS | 1 + src/server/globals.h | 12 ++++++---- src/server/image.c | 47 ++++++++++++++++++++++++++----------- src/server/net.c | 66 ++++++++++++++++++++++++++++------------------------ src/server/net.h | 4 ++++ src/server/rpc.c | 61 +++++++++++++++++++++++++++++++++++------------- src/server/server.c | 7 +++++- src/server/uplink.c | 14 ++++++++--- 8 files changed, 144 insertions(+), 68 deletions(-) diff --git a/LOCKS b/LOCKS index ea1cb45..f2e64dd 100644 --- a/LOCKS +++ b/LOCKS @@ -21,6 +21,7 @@ pendingLockProduce pendingLockConsume altServersLock client.sendMutex +client.statsLock statisticsSentLock statisticsReceivedLock diff --git a/src/server/globals.h b/src/server/globals.h index 0bf34de..a06e0e0 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -118,13 +118,17 @@ struct _dnbd3_image struct _dnbd3_client { +#define HOSTNAMELEN (48) + uint64_t bytesSent; // Byte counter for this client. Use statsLock when accessing + dnbd3_image_t *image; + uint32_t tmpBytesSent; // Temporary byte counter that gets added to the global counter periodically. Use statsLock when accessing int sock; + bool isServer; // true if a server in proxy mode, false if real client dnbd3_host_t host; - dnbd3_image_t *image; - uint64_t bytesSent; - pthread_spinlock_t lock; + char hostName[HOSTNAMELEN]; pthread_mutex_t sendMutex; - bool isServer; // true if a server in proxy mode, false if real client + pthread_spinlock_t lock; + pthread_spinlock_t statsLock; }; // ####################################################### diff --git a/src/server/image.c b/src/server/image.c index 2a48047..5638188 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -1173,7 +1173,7 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste logadd( LOG_DEBUG2, "Not found, bailing out" ); return image_get( name, detectedRid, true ); } - if ( requestedRid == 0 ) { + if ( !_vmdkLegacyMode && requestedRid == 0 ) { // rid 0 requested - check if detected rid is readable, decrease rid if not until we reach 0 while ( detectedRid != 0 ) { dnbd3_image_t *image = image_get( name, detectedRid, true ); @@ -1347,24 +1347,45 @@ cleanup_fail:; json_t* image_getListAsJson() { json_t *imagesJson = json_array(); - json_t *image; - + json_t *jsonImage; int i; char buffer[100] = { 0 }; + uint64_t bytesReceived; + int users, completeness; + spin_lock( &imageListLock ); - for (i = 0; i < _num_images; ++i) { + for ( i = 0; i < _num_images; ++i ) { if ( _images[i] == NULL ) continue; - spin_lock( &_images[i]->lock ); - image = json_pack( "{sisssisIsi}", "id", _images[i]->id, "name", _images[i]->lower_name, "rid", (int) _images[i]->rid, "users", (json_int_t) _images[i]->users, - "complete", image_getCompletenessEstimate( _images[i] ) ); - if ( _images[i]->uplink != NULL ) { - host_to_string( &_images[i]->uplink->currentServer, buffer, sizeof(buffer) ); - json_object_set_new( image, "uplinkServer", json_string( buffer ) ); - json_object_set_new( image, "receivedBytes", json_integer( (json_int_t) _images[i]->uplink->bytesReceived ) ); + dnbd3_image_t *image = _images[i]; + spin_lock( &image->lock ); + spin_unlock( &imageListLock ); + users = image->users; + completeness = image_getCompletenessEstimate( image ); + if ( image->uplink == NULL ) { + bytesReceived = 0; + } else { + bytesReceived = image->uplink->bytesReceived; + if ( !host_to_string( &image->uplink->currentServer, buffer, sizeof(buffer) ) ) { + buffer[0] = '\0'; + } } - json_array_append_new( imagesJson, image ); + image->users++; // Prevent freeing after we unlock + spin_unlock( &image->lock ); - spin_unlock( &_images[i]->lock ); + jsonImage = json_pack( "{sisssisisi}", + "id", image->id, // id, lower_name, rid never change, so access them without locking + "name", image->lower_name, + "rid", (int) image->rid, + "users", users, + "complete", completeness ); + if ( bytesReceived != 0 ) { + json_object_set_new( jsonImage, "uplinkServer", json_string( buffer ) ); + json_object_set_new( jsonImage, "receivedBytes", json_integer( (json_int_t) bytesReceived ) ); + } + json_array_append_new( imagesJson, jsonImage ); + + image = image_release( image ); // Since we did image->users++; + spin_lock( &imageListLock ); } spin_unlock( &imageListLock ); return imagesJson; diff --git a/src/server/net.c b/src/server/net.c index a467620..9dbba50 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -50,6 +50,17 @@ static char nullbytes[500]; static uint64_t totalBytesSent = 0; static pthread_spinlock_t statisticsSentLock; +/** + * Update global sent stats. Hold client's statsLock when calling. + */ +void net_updateGlobalSentStatsFromClient(dnbd3_client_t * const client) +{ + spin_lock( &statisticsSentLock ); + totalBytesSent += client->tmpBytesSent; + spin_unlock( &statisticsSentLock ); + client->tmpBytesSent = 0; +} + static inline bool recv_request_header(int sock, dnbd3_request_t *request) { int ret, fails = 0; @@ -160,8 +171,6 @@ void *net_client_handler(void *dnbd3_client) serialized_buffer_t payload; uint16_t rid, client_version; uint64_t start, end; - uint32_t tempBytesSent = 0; - char hostPrintable[100]; dnbd3_server_entry_t server_list[NUMBER_SERVERS]; @@ -171,11 +180,14 @@ void *net_client_handler(void *dnbd3_client) reply.magic = dnbd3_packet_magic; sock_setTimeout( client->sock, _clientTimeout ); + if ( !host_to_string( &client->host, client->hostName, HOSTNAMELEN ) ) { + client->hostName[HOSTNAMELEN-1] = '\0'; + } // Receive first packet. This must be CMD_SELECT_IMAGE by protocol specification if ( recv_request_header( client->sock, &request ) ) { if ( request.cmd != CMD_SELECT_IMAGE ) { - logadd( LOG_DEBUG1, "Client sent invalid handshake (%d). Dropping Client\n", (int)request.cmd ); + logadd( LOG_DEBUG1, "Client %s sent invalid handshake (%d). Dropping Client\n", client->hostName, (int)request.cmd ); } else if ( recv_request_payload( client->sock, request.size, &payload ) ) { char *image_name; client_version = serializer_get_uint16( &payload ); @@ -184,16 +196,17 @@ void *net_client_handler(void *dnbd3_client) client->isServer = serializer_get_uint8( &payload ); if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) { if ( client_version < MIN_SUPPORTED_CLIENT ) { - logadd( LOG_DEBUG1, "Client too old\n" ); + logadd( LOG_DEBUG1, "Client %s too old", client->hostName ); } else { - logadd( LOG_DEBUG1, "Incomplete handshake received\n" ); + logadd( LOG_DEBUG1, "Incomplete handshake received from %s", client->hostName ); } } else { client->image = image = image_getOrLoad( image_name, rid ); if ( image == NULL ) { //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); } else if ( !image->working ) { - logadd( LOG_DEBUG1, "Client requested non-working image '%s' (rid:%d), rejected\n", image_name, (int)rid ); + logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n", + client->hostName, image_name, (int)rid ); } else { // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable bOk = true; @@ -227,9 +240,7 @@ void *net_client_handler(void *dnbd3_client) rpc_sendStatsJson( client->sock ); } else { // Unknown request - if ( host_to_string( &client->host, hostPrintable, sizeof hostPrintable ) ) { - logadd( LOG_DEBUG1, "Client %s sent invalid handshake", hostPrintable ); - } + logadd( LOG_DEBUG1, "Client %s sent invalid handshake", client->hostName ); } if ( bOk ) { @@ -247,7 +258,7 @@ void *net_client_handler(void *dnbd3_client) case CMD_GET_BLOCK: if ( request.offset >= image->virtualFilesize ) { // Sanity check - logadd( LOG_WARNING, "Client requested non-existent block" ); + logadd( LOG_WARNING, "Client %s requested non-existent block", client->hostName ); reply.size = 0; reply.cmd = CMD_ERROR; send_reply( client->sock, &reply, NULL ); @@ -255,7 +266,7 @@ void *net_client_handler(void *dnbd3_client) } if ( request.offset + request.size > image->virtualFilesize ) { // Sanity check - logadd( LOG_WARNING, "Client requested data block that extends beyond image size" ); + logadd( LOG_WARNING, "Client %s requested data block that extends beyond image size", client->hostName ); reply.size = 0; reply.cmd = CMD_ERROR; send_reply( client->sock, &reply, NULL ); @@ -315,8 +326,8 @@ void *net_client_handler(void *dnbd3_client) spin_unlock( &image->lock ); if ( !isCached ) { if ( !uplink_request( client, request.handle, request.offset, request.size ) ) { - logadd( LOG_DEBUG1, "Could not relay un-cached request to upstream proxy, disabling image %s:%d", - image->lower_name, (int)image->rid ); + logadd( LOG_DEBUG1, "Could not relay un-cached request from %s to upstream proxy, disabling image %s:%d", + client->hostName, image->lower_name, (int)image->rid ); image->working = false; goto exit_client_cleanup; } @@ -334,7 +345,7 @@ void *net_client_handler(void *dnbd3_client) // Send reply header if ( send( client->sock, &reply, sizeof(dnbd3_reply_t), (request.size == 0 ? 0 : MSG_MORE) ) != sizeof(dnbd3_reply_t) ) { if ( lock ) pthread_mutex_unlock( &client->sendMutex ); - logadd( LOG_DEBUG1, "Sending CMD_GET_BLOCK header failed\n" ); + logadd( LOG_DEBUG1, "Sending CMD_GET_BLOCK reply header to %s failed", client->hostName ); goto exit_client_cleanup; } @@ -355,8 +366,8 @@ void *net_client_handler(void *dnbd3_client) if ( lock ) pthread_mutex_unlock( &client->sendMutex ); if ( ret == -1 ) { if ( err != EPIPE && err != ECONNRESET && err != ESHUTDOWN ) { - logadd( LOG_DEBUG1, "sendfile failed (image to net. sent %d/%d, errno=%d)\n", - (int)done, (int)realBytes, err ); + logadd( LOG_DEBUG1, "sendfile to %s failed (image to net. sent %d/%d, errno=%d)", + client->hostName, (int)done, (int)realBytes, err ); } if ( err != EAGAIN && err != EWOULDBLOCK ) image->working = false; } @@ -372,18 +383,15 @@ void *net_client_handler(void *dnbd3_client) } } if ( lock ) pthread_mutex_unlock( &client->sendMutex ); + spin_lock( &client->statsLock ); // Global per-client counter - spin_lock( &client->lock ); client->bytesSent += request.size; // Increase counter for statistics. - spin_unlock( &client->lock ); // Local counter that gets added to the global total bytes sent counter periodically - tempBytesSent += request.size; - if (tempBytesSent > 1024 * 1024 ) { - spin_lock( &statisticsSentLock ); - totalBytesSent += tempBytesSent; - spin_unlock( &statisticsSentLock ); - tempBytesSent = 0; + client->tmpBytesSent += request.size; + if ( client->tmpBytesSent > 100000000 ) { + net_updateGlobalSentStatsFromClient( client ); } + spin_unlock( &client->statsLock ); break; case CMD_GET_SERVERS: @@ -407,9 +415,7 @@ void *net_client_handler(void *dnbd3_client) set_name: ; if ( !hasName ) { hasName = true; - if ( host_to_string( &client->host, hostPrintable, sizeof hostPrintable ) ) { - setThreadName( hostPrintable ); - } + setThreadName( client->hostName ); } break; @@ -434,7 +440,7 @@ set_name: ; break; default: - logadd( LOG_ERROR, "Unknown command from client: %d", (int)request.cmd ); + logadd( LOG_ERROR, "Unknown command from client %s: %d", client->hostName, (int)request.cmd ); break; } @@ -442,9 +448,7 @@ set_name: ; } exit_client_cleanup: ; dnbd3_removeClient( client ); - spin_lock( &statisticsSentLock ); - totalBytesSent += tempBytesSent; // Add the amount of bytes sent by the client to the statistics of the server. - spin_unlock( &statisticsSentLock ); + net_updateGlobalSentStatsFromClient( client ); // Don't need client's lock here as it's not active anymore client = dnbd3_freeClient( client ); // This will also call image_release on client->image return NULL ; } diff --git a/src/server/net.h b/src/server/net.h index 4624141..b8c850d 100644 --- a/src/server/net.h +++ b/src/server/net.h @@ -21,8 +21,12 @@ #ifndef NET_H_ #define NET_H_ +#include "globals.h" + void net_init(); +void net_updateGlobalSentStatsFromClient(dnbd3_client_t * const client); + uint64_t net_getTotalBytesSent(); void *net_client_handler(void *client_socket); diff --git a/src/server/rpc.c b/src/server/rpc.c index 48fc009..3ea8a9a 100644 --- a/src/server/rpc.c +++ b/src/server/rpc.c @@ -17,25 +17,30 @@ static void clientsToJson(json_t *jsonClients); void rpc_sendStatsJson(int sock) { + json_t *jsonClients = json_array(); + clientsToJson( jsonClients ); const uint64_t bytesReceived = uplink_getTotalBytesReceived(); const uint64_t bytesSent = net_getTotalBytesSent(); const int uptime = dnbd3_serverUptime(); - json_t *jsonClients = json_array(); - - clientsToJson( jsonClients ); - json_t *statisticsJson = json_pack( "{sIsI}", "bytesReceived", (json_int_t) bytesReceived, "bytesSent", (json_int_t) bytesSent ); + json_t *statisticsJson = json_pack( "{sIsI}", + "bytesReceived", (json_int_t) bytesReceived, + "bytesSent", (json_int_t) bytesSent ); json_object_set_new( statisticsJson, "clients", jsonClients ); json_object_set_new( statisticsJson, "images", image_getListAsJson() ); json_object_set_new( statisticsJson, "uptime", json_integer( uptime ) ); char *jsonString = json_dumps( statisticsJson, 0 ); + json_decref( statisticsJson ); char buffer[500]; - snprintf(buffer, sizeof buffer , "HTTP/1.1 200 OK\r\nConnection: Close\r\nContent-Length: %d\r\nContent-Type: application/json\r\n\r\n", + snprintf(buffer, sizeof buffer , "HTTP/1.1 200 OK\r\n" + "Connection: Close\r\n" + "Content-Length: %d\r\n" + "Content-Type: application/json\r\n" + "\r\n", (int) strlen( jsonString ) ); write( sock, buffer, strlen( buffer ) ); sock_sendAll( sock, jsonString, strlen( jsonString ), 10 ); - json_decref( statisticsJson ); // Wait for flush shutdown( sock, SHUT_WR ); while ( read( sock, buffer, sizeof buffer ) > 0 ); @@ -46,19 +51,43 @@ static void clientsToJson(json_t *jsonClients) { json_t *clientStats; int i; - char clientName[100]; + int imgId; + uint64_t bytesSent; + char host[HOSTNAMELEN]; + host[HOSTNAMELEN-1] = '\0'; + spin_lock( &_clients_lock ); - for (i = 0; i < _num_clients; ++i) { - if ( _clients[i] == NULL ) continue; - spin_lock( &_clients[i]->lock ); - if ( _clients[i]->image != NULL ) { - if ( !host_to_string( &_clients[i]->host, clientName, sizeof(clientName) ) ) { - strcpy( clientName, "???" ); - } - clientStats = json_pack( "{sssisI}", "address", clientName, "imageId", _clients[i]->image->id, "bytesSent", (json_int_t)_clients[i]->bytesSent ); + for ( i = 0; i < _num_clients; ++i ) { + if ( _clients[i] == NULL ) { + continue; + } + // Do not lock on client.lock here: + // 1) .image can only be set once, will never change (just like .image.id) + // 2) .hostName never changes as well + // 3) .bytesSent and .tmpBytesSent are guarded by .statsLock + // 4) the client cannot be freed, as it's still in the list and we hold the list's lock + if ( _clients[i]->image == NULL ) { + imgId = -1; + } else { + strncpy( host, _clients[i]->hostName, HOSTNAMELEN - 1 ); + imgId = _clients[i]->image->id; + spin_lock( &_clients[i]->statsLock ); + bytesSent = _clients[i]->bytesSent; + net_updateGlobalSentStatsFromClient( _clients[i] ); // Do this since we read the totalBytesSent counter later + spin_unlock( &_clients[i]->statsLock ); + } + spin_unlock( &_clients_lock ); + // Unlock so we give other threads a chance to access the client list. + // We might not get an atomic snapshot of the currently connected clients, + // but that doesn't really make a difference anyways. + if ( imgId != -1 ) { + clientStats = json_pack( "{sssisI}", + "address", host, + "imageId", imgId, + "bytesSent", (json_int_t)bytesSent ); json_array_append_new( jsonClients, clientStats ); } - spin_unlock( &_clients[i]->lock ); + spin_lock( &_clients_lock ); } spin_unlock( &_clients_lock ); } diff --git a/src/server/server.c b/src/server/server.c index e8df934..ffa817f 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -398,7 +398,9 @@ dnbd3_client_t* dnbd3_initClient(struct sockaddr_storage *client, int fd) } dnbd3_client->sock = fd; dnbd3_client->bytesSent = 0; + dnbd3_client->tmpBytesSent = 0; spin_init( &dnbd3_client->lock, PTHREAD_PROCESS_PRIVATE ); + spin_init( &dnbd3_client->statsLock, PTHREAD_PROCESS_PRIVATE ); pthread_mutex_init( &dnbd3_client->sendMutex, NULL ); return dnbd3_client; } @@ -430,9 +432,11 @@ dnbd3_client_t* dnbd3_freeClient(dnbd3_client_t *client) { spin_lock( &client->lock ); pthread_mutex_lock( &client->sendMutex ); - if ( client->sock >= 0 ) close( client->sock ); + if ( client->sock != -1 ) close( client->sock ); client->sock = -1; pthread_mutex_unlock( &client->sendMutex ); + spin_lock( &client->statsLock ); + spin_unlock( &client->statsLock ); if ( client->image != NULL ) { spin_lock( &client->image->lock ); if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client ); @@ -442,6 +446,7 @@ dnbd3_client_t* dnbd3_freeClient(dnbd3_client_t *client) client->image = NULL; spin_unlock( &client->lock ); spin_destroy( &client->lock ); + spin_destroy( &client->statsLock ); pthread_mutex_destroy( &client->sendMutex ); free( client ); return NULL ; diff --git a/src/server/uplink.c b/src/server/uplink.c index 29c42af..fa95fba 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -597,10 +597,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link) // from 0, you also need to change the "attach to existing request"-logic in uplink_request() outReply.magic = dnbd3_packet_magic; bool served = false; - for (i = link->queueLen - 1; i >= 0; --i) { + for ( i = link->queueLen - 1; i >= 0; --i ) { dnbd3_queued_request_t * const req = &link->queue[i]; if ( req->status == ULR_PROCESSING ) { - //logadd( LOG_DEBUG2 %p, "Reply slot %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 "\n", (void*)link, i, req->handle, req->from, req->to ); + size_t bytesSent = 0; assert( req->from >= start && req->to <= end ); dnbd3_client_t * const client = req->client; outReply.cmd = CMD_GET_BLOCK; @@ -618,9 +618,17 @@ static void uplink_handleReceive(dnbd3_connection_t *link) spin_unlock( &link->queueLock ); if ( client->sock != -1 ) { ssize_t sent = writev( client->sock, iov, 2 ); - if ( sent > (ssize_t) sizeof outReply ) client->bytesSent += (uint64_t) sent - sizeof outReply; + if ( sent > (ssize_t)sizeof outReply ) { + bytesSent = (size_t)sent - sizeof outReply; + } } + spin_lock( &client->statsLock ); pthread_mutex_unlock( &client->sendMutex ); + if ( bytesSent != 0 ) { + client->bytesSent += bytesSent; + client->tmpBytesSent += bytesSent; + } + spin_unlock( &client->statsLock ); spin_lock( &link->queueLock ); } if ( req->status == ULR_FREE && i == link->queueLen - 1 ) link->queueLen--; -- cgit v1.2.3-55-g7522