From 8d96868945f3d52b44de02e2c468766c46693aef Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Thu, 7 Sep 2017 15:50:50 +0200 Subject: [SERVER] Refactor: Move client list to net.* and isolate --- src/server/net.c | 316 ++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 256 insertions(+), 60 deletions(-) (limited to 'src/server/net.c') diff --git a/src/server/net.c b/src/server/net.c index 3092c7e..f7a866b 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -38,12 +38,22 @@ #include #include #endif +#include + +static dnbd3_client_t *_clients[SERVER_MAX_CLIENTS]; +static int _num_clients = 0; +static pthread_spinlock_t _clients_lock; static char nullbytes[500]; static uint64_t totalBytesSent = 0; static pthread_spinlock_t statisticsSentLock; +// Adding and removing clients -- list management +static void dnbd3_removeClient(dnbd3_client_t *client); +static dnbd3_client_t* dnbd3_freeClient(dnbd3_client_t *client); +static bool dnbd3_addClient(dnbd3_client_t *client); + /** * Update global sent stats. Hold client's statsLock when calling. */ @@ -146,13 +156,51 @@ uint64_t net_getTotalBytesSent() void net_init() { + spin_init( &_clients_lock, PTHREAD_PROCESS_PRIVATE ); spin_init( &statisticsSentLock, PTHREAD_PROCESS_PRIVATE ); } -void *net_client_handler(void *dnbd3_client) +void* net_handleNewConnection(void *clientPtr) { - dnbd3_client_t * const client = (dnbd3_client_t *)dnbd3_client; + dnbd3_client_t * const client = (dnbd3_client_t *)clientPtr; dnbd3_request_t request; + int ret; + + // Await data from client. Since this is a fresh connection, we expect data right away + sock_setTimeout( client->sock, _clientTimeout ); + ret = recv( client->sock, &request, sizeof(request), MSG_WAITALL ); + // Let's see if this looks like an HTTP request + if ( ret > 5 && request.magic != dnbd3_packet_magic + && ( strncmp( (char*)&request, "GET ", 4 ) == 0 || strncmp( (char*)&request, "POST ", 5 ) == 0 ) ) { + rpc_sendStatsJson( client->sock ); + goto fail_preadd; + } + + // It's expected to be a real dnbd3 client + // Check request for validity + if ( ret != sizeof(request) ) { + logadd( LOG_DEBUG1, "Error receiving request: Could not read message header (%d/%d, e=%d)", ret, (int)sizeof(request), errno ); + goto fail_preadd; + } + if ( request.magic != dnbd3_packet_magic ) { + logadd( LOG_DEBUG1, "Magic in client handshake incorrect" ); + goto fail_preadd; + } + fixup_request( *request ); + if ( request.cmd != CMD_SELECT_IMAGE ) { + logadd( LOG_WARNING, "Client sent != CMD_SELECT_IMAGE in handshake (got cmd=%d, size=%d), dropping client.", (int)request.cmd, (int)request.size ); + goto fail_preadd; + } + // Fully init client struct + spin_init( &client->lock, PTHREAD_PROCESS_PRIVATE ); + spin_init( &client->statsLock, PTHREAD_PROCESS_PRIVATE ); + pthread_mutex_init( &client->sendMutex, NULL ); + if ( !dnbd3_addClient( client ) ) { + dnbd3_freeClient( client ); + logadd( LOG_WARNING, "Could not add new client to list when connecting" ); + return NULL; + } + dnbd3_reply_t reply; dnbd3_image_t *image = NULL; @@ -173,75 +221,62 @@ void *net_client_handler(void *dnbd3_client) memset( &payload, 0, sizeof(payload) ); reply.magic = dnbd3_packet_magic; - sock_setTimeout( client->sock, _clientTimeout ); spin_lock( &client->lock ); host_to_string( &client->host, client->hostName, HOSTNAMELEN ); client->hostName[HOSTNAMELEN-1] = '\0'; spin_unlock( &client->lock ); - // Receive first packet. This must be CMD_SELECT_IMAGE by protocol specification - if ( recv_request_header( client->sock, &request ) ) { - if ( request.cmd != CMD_SELECT_IMAGE ) { - logadd( LOG_DEBUG1, "Client %s sent invalid handshake (%d). Dropping Client\n", client->hostName, (int)request.cmd ); - } else if ( recv_request_payload( client->sock, request.size, &payload ) ) { - char *image_name; - client_version = serializer_get_uint16( &payload ); - image_name = serializer_get_string( &payload ); - rid = serializer_get_uint16( &payload ); - client->isServer = serializer_get_uint8( &payload ); - if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) { - if ( client_version < MIN_SUPPORTED_CLIENT ) { - logadd( LOG_DEBUG1, "Client %s too old", client->hostName ); - } else { - logadd( LOG_DEBUG1, "Incomplete handshake received from %s", client->hostName ); - } + // Receive first packet's payload + if ( recv_request_payload( client->sock, request.size, &payload ) ) { + char *image_name; + client_version = serializer_get_uint16( &payload ); + image_name = serializer_get_string( &payload ); + rid = serializer_get_uint16( &payload ); + client->isServer = serializer_get_uint8( &payload ); + if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) { + if ( client_version < MIN_SUPPORTED_CLIENT ) { + logadd( LOG_DEBUG1, "Client %s too old", client->hostName ); } else { - image = image_getOrLoad( image_name, rid ); - spin_lock( &client->lock ); - client->image = image; - spin_unlock( &client->lock ); - if ( image == NULL ) { - //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); - } else if ( !image->working ) { - logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n", - client->hostName, image_name, (int)rid ); - } else { - // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable - bOk = true; - if ( image->cache_map != NULL ) { - spin_lock( &image->lock ); - if ( image->uplink == NULL || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { - bOk = ( rand() % 4 ) == 1; - } - spin_unlock( &image->lock ); - if ( image->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this - usleep( 100000 ); // server gets a penalty and is less likely to be selected - } + logadd( LOG_DEBUG1, "Incomplete handshake received from %s", client->hostName ); + } + } else { + image = image_getOrLoad( image_name, rid ); + spin_lock( &client->lock ); + client->image = image; + spin_unlock( &client->lock ); + if ( image == NULL ) { + //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); + } else if ( !image->working ) { + logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n", + client->hostName, image_name, (int)rid ); + } else { + // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable + bOk = true; + if ( image->cache_map != NULL ) { + spin_lock( &image->lock ); + if ( image->uplink == NULL || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { + bOk = ( rand() % 4 ) == 1; } - if ( bOk ) { - image_file = image->readFd; - serializer_reset_write( &payload ); - serializer_put_uint16( &payload, PROTOCOL_VERSION ); - serializer_put_string( &payload, image->name ); - serializer_put_uint16( &payload, (uint16_t)image->rid ); - serializer_put_uint64( &payload, image->virtualFilesize ); - reply.cmd = CMD_SELECT_IMAGE; - reply.size = serializer_get_written_length( &payload ); - if ( !send_reply( client->sock, &reply, &payload ) ) { - bOk = false; - } + spin_unlock( &image->lock ); + if ( image->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this + usleep( 100000 ); // server gets a penalty and is less likely to be selected + } + } + if ( bOk ) { + image_file = image->readFd; + serializer_reset_write( &payload ); + serializer_put_uint16( &payload, PROTOCOL_VERSION ); + serializer_put_string( &payload, image->name ); + serializer_put_uint16( &payload, (uint16_t)image->rid ); + serializer_put_uint64( &payload, image->virtualFilesize ); + reply.cmd = CMD_SELECT_IMAGE; + reply.size = serializer_get_written_length( &payload ); + if ( !send_reply( client->sock, &reply, &payload ) ) { + bOk = false; } } } } - } else { - fixup_request( request ); - if ( strncmp( (char*)&request, "GET ", 4 ) == 0 || strncmp( (char*)&request, "POST ", 5 ) == 0 ) { - rpc_sendStatsJson( client->sock ); - } else { - // Unknown request - logadd( LOG_DEBUG1, "Client %s sent invalid handshake", client->hostName ); - } } if ( bOk ) { @@ -486,5 +521,166 @@ exit_client_cleanup: ; net_updateGlobalSentStatsFromClient( client ); // Don't need client's lock here as it's not active anymore dnbd3_freeClient( client ); // This will also call image_release on client->image return NULL ; +fail_preadd: ; + close( client->sock ); + free( client ); + return NULL; +} + +json_t* net_clientsToJson() +{ + json_t *jsonClients = json_array(); + json_t *clientStats; + int i; + int imgId; + uint64_t bytesSent; + char host[HOSTNAMELEN]; + host[HOSTNAMELEN-1] = '\0'; + + spin_lock( &_clients_lock ); + for ( i = 0; i < _num_clients; ++i ) { + if ( _clients[i] == NULL ) { + continue; + } + dnbd3_client_t * const client = _clients[i]; + spin_lock( &client->lock ); + spin_unlock( &_clients_lock ); + // Unlock so we give other threads a chance to access the client list. + // We might not get an atomic snapshot of the currently connected clients, + // but that doesn't really make a difference anyways. + if ( client->image == NULL ) { + spin_unlock( &client->lock ); + imgId = -1; + } else { + strncpy( host, client->hostName, HOSTNAMELEN - 1 ); + imgId = client->image->id; + spin_lock( &client->statsLock ); + spin_unlock( &client->lock ); + bytesSent = client->bytesSent; + net_updateGlobalSentStatsFromClient( client ); // Do this since we read the totalBytesSent counter later + spin_unlock( &client->statsLock ); + } + if ( imgId != -1 ) { + clientStats = json_pack( "{sssisI}", + "address", host, + "imageId", imgId, + "bytesSent", (json_int_t)bytesSent ); + json_array_append_new( jsonClients, clientStats ); + } + spin_lock( &_clients_lock ); + } + spin_unlock( &_clients_lock ); + return jsonClients; +} + +void net_disconnectAll() +{ + int i; + spin_lock( &_clients_lock ); + for (i = 0; i < _num_clients; ++i) { + if ( _clients[i] == NULL ) continue; + dnbd3_client_t * const client = _clients[i]; + spin_lock( &client->lock ); + if ( client->sock >= 0 ) shutdown( client->sock, SHUT_RDWR ); + spin_unlock( &client->lock ); + } + spin_unlock( &_clients_lock ); +} + +void net_waitForAllDisconnected() +{ + int retries = 10, count, i; + do { + count = 0; + spin_lock( &_clients_lock ); + for (i = 0; i < _num_clients; ++i) { + if ( _clients[i] == NULL ) continue; + count++; + } + spin_unlock( &_clients_lock ); + if ( count != 0 ) { + logadd( LOG_INFO, "%d clients still active...\n", count ); + sleep( 1 ); + } + } while ( count != 0 && --retries > 0 ); + _num_clients = 0; +} + +/* +++ + * Client list. + * + * Adding and removing clients. + */ + +/** + * Remove a client from the clients array + * Locks on: _clients_lock + */ +static void dnbd3_removeClient(dnbd3_client_t *client) +{ + int i; + spin_lock( &_clients_lock ); + for ( i = _num_clients - 1; i >= 0; --i ) { + if ( _clients[i] == client ) { + _clients[i] = NULL; + } + if ( _clients[i] == NULL && i + 1 == _num_clients ) --_num_clients; + } + spin_unlock( &_clients_lock ); +} + +/** + * Free the client struct recursively. + * !! Make sure to call this function after removing the client from _dnbd3_clients !! + * Locks on: _clients[].lock, _images[].lock + * might call functions that lock on _images, _image[], uplink.queueLock, client.sendMutex + */ +static dnbd3_client_t* dnbd3_freeClient(dnbd3_client_t *client) +{ + spin_lock( &client->lock ); + pthread_mutex_lock( &client->sendMutex ); + if ( client->sock != -1 ) close( client->sock ); + client->sock = -1; + pthread_mutex_unlock( &client->sendMutex ); + spin_lock( &client->statsLock ); + spin_unlock( &client->statsLock ); + if ( client->image != NULL ) { + spin_lock( &client->image->lock ); + if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client ); + spin_unlock( &client->image->lock ); + client->image = image_release( client->image ); + } + spin_unlock( &client->lock ); + spin_destroy( &client->lock ); + spin_destroy( &client->statsLock ); + pthread_mutex_destroy( &client->sendMutex ); + free( client ); + return NULL ; +} + +//###// + +/** + * Add client to the clients array. + * Locks on: _clients_lock + */ +static bool dnbd3_addClient(dnbd3_client_t *client) +{ + int i; + spin_lock( &_clients_lock ); + for (i = 0; i < _num_clients; ++i) { + if ( _clients[i] != NULL ) continue; + _clients[i] = client; + spin_unlock( &_clients_lock ); + return true; + } + if ( _num_clients >= SERVER_MAX_CLIENTS ) { + spin_unlock( &_clients_lock ); + logadd( LOG_ERROR, "Maximum number of clients reached!" ); + return false; + } + _clients[_num_clients++] = client; + spin_unlock( &_clients_lock ); + return true; } -- cgit v1.2.3-55-g7522