summaryrefslogtreecommitdiffstats
path: root/src/server/net.c
diff options
context:
space:
mode:
authorSimon Rettberg2017-09-07 15:50:50 +0200
committerSimon Rettberg2017-09-07 15:50:50 +0200
commit8d96868945f3d52b44de02e2c468766c46693aef (patch)
tree09b012052d5bd67b855b36bd47cdb3956d55e0a8 /src/server/net.c
parent[SERVER] Implement closeUnusedFd config option (diff)
downloaddnbd3-8d96868945f3d52b44de02e2c468766c46693aef.tar.gz
dnbd3-8d96868945f3d52b44de02e2c468766c46693aef.tar.xz
dnbd3-8d96868945f3d52b44de02e2c468766c46693aef.zip
[SERVER] Refactor: Move client list to net.* and isolate
Diffstat (limited to 'src/server/net.c')
-rw-r--r--src/server/net.c316
1 files changed, 256 insertions, 60 deletions
diff --git a/src/server/net.c b/src/server/net.c
index 3092c7e..f7a866b 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -38,12 +38,22 @@
#include <sys/socket.h>
#include <sys/uio.h>
#endif
+#include <jansson.h>
+
+static dnbd3_client_t *_clients[SERVER_MAX_CLIENTS];
+static int _num_clients = 0;
+static pthread_spinlock_t _clients_lock;
static char nullbytes[500];
static uint64_t totalBytesSent = 0;
static pthread_spinlock_t statisticsSentLock;
+// Adding and removing clients -- list management
+static void dnbd3_removeClient(dnbd3_client_t *client);
+static dnbd3_client_t* dnbd3_freeClient(dnbd3_client_t *client);
+static bool dnbd3_addClient(dnbd3_client_t *client);
+
/**
* Update global sent stats. Hold client's statsLock when calling.
*/
@@ -146,13 +156,51 @@ uint64_t net_getTotalBytesSent()
void net_init()
{
+ spin_init( &_clients_lock, PTHREAD_PROCESS_PRIVATE );
spin_init( &statisticsSentLock, PTHREAD_PROCESS_PRIVATE );
}
-void *net_client_handler(void *dnbd3_client)
+void* net_handleNewConnection(void *clientPtr)
{
- dnbd3_client_t * const client = (dnbd3_client_t *)dnbd3_client;
+ dnbd3_client_t * const client = (dnbd3_client_t *)clientPtr;
dnbd3_request_t request;
+ int ret;
+
+ // Await data from client. Since this is a fresh connection, we expect data right away
+ sock_setTimeout( client->sock, _clientTimeout );
+ ret = recv( client->sock, &request, sizeof(request), MSG_WAITALL );
+ // Let's see if this looks like an HTTP request
+ if ( ret > 5 && request.magic != dnbd3_packet_magic
+ && ( strncmp( (char*)&request, "GET ", 4 ) == 0 || strncmp( (char*)&request, "POST ", 5 ) == 0 ) ) {
+ rpc_sendStatsJson( client->sock );
+ goto fail_preadd;
+ }
+
+ // It's expected to be a real dnbd3 client
+ // Check request for validity
+ if ( ret != sizeof(request) ) {
+ logadd( LOG_DEBUG1, "Error receiving request: Could not read message header (%d/%d, e=%d)", ret, (int)sizeof(request), errno );
+ goto fail_preadd;
+ }
+ if ( request.magic != dnbd3_packet_magic ) {
+ logadd( LOG_DEBUG1, "Magic in client handshake incorrect" );
+ goto fail_preadd;
+ }
+ fixup_request( *request );
+ if ( request.cmd != CMD_SELECT_IMAGE ) {
+ logadd( LOG_WARNING, "Client sent != CMD_SELECT_IMAGE in handshake (got cmd=%d, size=%d), dropping client.", (int)request.cmd, (int)request.size );
+ goto fail_preadd;
+ }
+ // Fully init client struct
+ spin_init( &client->lock, PTHREAD_PROCESS_PRIVATE );
+ spin_init( &client->statsLock, PTHREAD_PROCESS_PRIVATE );
+ pthread_mutex_init( &client->sendMutex, NULL );
+ if ( !dnbd3_addClient( client ) ) {
+ dnbd3_freeClient( client );
+ logadd( LOG_WARNING, "Could not add new client to list when connecting" );
+ return NULL;
+ }
+
dnbd3_reply_t reply;
dnbd3_image_t *image = NULL;
@@ -173,75 +221,62 @@ void *net_client_handler(void *dnbd3_client)
memset( &payload, 0, sizeof(payload) );
reply.magic = dnbd3_packet_magic;
- sock_setTimeout( client->sock, _clientTimeout );
spin_lock( &client->lock );
host_to_string( &client->host, client->hostName, HOSTNAMELEN );
client->hostName[HOSTNAMELEN-1] = '\0';
spin_unlock( &client->lock );
- // Receive first packet. This must be CMD_SELECT_IMAGE by protocol specification
- if ( recv_request_header( client->sock, &request ) ) {
- if ( request.cmd != CMD_SELECT_IMAGE ) {
- logadd( LOG_DEBUG1, "Client %s sent invalid handshake (%d). Dropping Client\n", client->hostName, (int)request.cmd );
- } else if ( recv_request_payload( client->sock, request.size, &payload ) ) {
- char *image_name;
- client_version = serializer_get_uint16( &payload );
- image_name = serializer_get_string( &payload );
- rid = serializer_get_uint16( &payload );
- client->isServer = serializer_get_uint8( &payload );
- if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) {
- if ( client_version < MIN_SUPPORTED_CLIENT ) {
- logadd( LOG_DEBUG1, "Client %s too old", client->hostName );
- } else {
- logadd( LOG_DEBUG1, "Incomplete handshake received from %s", client->hostName );
- }
+ // Receive first packet's payload
+ if ( recv_request_payload( client->sock, request.size, &payload ) ) {
+ char *image_name;
+ client_version = serializer_get_uint16( &payload );
+ image_name = serializer_get_string( &payload );
+ rid = serializer_get_uint16( &payload );
+ client->isServer = serializer_get_uint8( &payload );
+ if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) {
+ if ( client_version < MIN_SUPPORTED_CLIENT ) {
+ logadd( LOG_DEBUG1, "Client %s too old", client->hostName );
} else {
- image = image_getOrLoad( image_name, rid );
- spin_lock( &client->lock );
- client->image = image;
- spin_unlock( &client->lock );
- if ( image == NULL ) {
- //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
- } else if ( !image->working ) {
- logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n",
- client->hostName, image_name, (int)rid );
- } else {
- // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable
- bOk = true;
- if ( image->cache_map != NULL ) {
- spin_lock( &image->lock );
- if ( image->uplink == NULL || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
- bOk = ( rand() % 4 ) == 1;
- }
- spin_unlock( &image->lock );
- if ( image->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this
- usleep( 100000 ); // server gets a penalty and is less likely to be selected
- }
+ logadd( LOG_DEBUG1, "Incomplete handshake received from %s", client->hostName );
+ }
+ } else {
+ image = image_getOrLoad( image_name, rid );
+ spin_lock( &client->lock );
+ client->image = image;
+ spin_unlock( &client->lock );
+ if ( image == NULL ) {
+ //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
+ } else if ( !image->working ) {
+ logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n",
+ client->hostName, image_name, (int)rid );
+ } else {
+ // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable
+ bOk = true;
+ if ( image->cache_map != NULL ) {
+ spin_lock( &image->lock );
+ if ( image->uplink == NULL || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
+ bOk = ( rand() % 4 ) == 1;
}
- if ( bOk ) {
- image_file = image->readFd;
- serializer_reset_write( &payload );
- serializer_put_uint16( &payload, PROTOCOL_VERSION );
- serializer_put_string( &payload, image->name );
- serializer_put_uint16( &payload, (uint16_t)image->rid );
- serializer_put_uint64( &payload, image->virtualFilesize );
- reply.cmd = CMD_SELECT_IMAGE;
- reply.size = serializer_get_written_length( &payload );
- if ( !send_reply( client->sock, &reply, &payload ) ) {
- bOk = false;
- }
+ spin_unlock( &image->lock );
+ if ( image->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this
+ usleep( 100000 ); // server gets a penalty and is less likely to be selected
+ }
+ }
+ if ( bOk ) {
+ image_file = image->readFd;
+ serializer_reset_write( &payload );
+ serializer_put_uint16( &payload, PROTOCOL_VERSION );
+ serializer_put_string( &payload, image->name );
+ serializer_put_uint16( &payload, (uint16_t)image->rid );
+ serializer_put_uint64( &payload, image->virtualFilesize );
+ reply.cmd = CMD_SELECT_IMAGE;
+ reply.size = serializer_get_written_length( &payload );
+ if ( !send_reply( client->sock, &reply, &payload ) ) {
+ bOk = false;
}
}
}
}
- } else {
- fixup_request( request );
- if ( strncmp( (char*)&request, "GET ", 4 ) == 0 || strncmp( (char*)&request, "POST ", 5 ) == 0 ) {
- rpc_sendStatsJson( client->sock );
- } else {
- // Unknown request
- logadd( LOG_DEBUG1, "Client %s sent invalid handshake", client->hostName );
- }
}
if ( bOk ) {
@@ -486,5 +521,166 @@ exit_client_cleanup: ;
net_updateGlobalSentStatsFromClient( client ); // Don't need client's lock here as it's not active anymore
dnbd3_freeClient( client ); // This will also call image_release on client->image
return NULL ;
+fail_preadd: ;
+ close( client->sock );
+ free( client );
+ return NULL;
+}
+
+json_t* net_clientsToJson()
+{
+ json_t *jsonClients = json_array();
+ json_t *clientStats;
+ int i;
+ int imgId;
+ uint64_t bytesSent;
+ char host[HOSTNAMELEN];
+ host[HOSTNAMELEN-1] = '\0';
+
+ spin_lock( &_clients_lock );
+ for ( i = 0; i < _num_clients; ++i ) {
+ if ( _clients[i] == NULL ) {
+ continue;
+ }
+ dnbd3_client_t * const client = _clients[i];
+ spin_lock( &client->lock );
+ spin_unlock( &_clients_lock );
+ // Unlock so we give other threads a chance to access the client list.
+ // We might not get an atomic snapshot of the currently connected clients,
+ // but that doesn't really make a difference anyways.
+ if ( client->image == NULL ) {
+ spin_unlock( &client->lock );
+ imgId = -1;
+ } else {
+ strncpy( host, client->hostName, HOSTNAMELEN - 1 );
+ imgId = client->image->id;
+ spin_lock( &client->statsLock );
+ spin_unlock( &client->lock );
+ bytesSent = client->bytesSent;
+ net_updateGlobalSentStatsFromClient( client ); // Do this since we read the totalBytesSent counter later
+ spin_unlock( &client->statsLock );
+ }
+ if ( imgId != -1 ) {
+ clientStats = json_pack( "{sssisI}",
+ "address", host,
+ "imageId", imgId,
+ "bytesSent", (json_int_t)bytesSent );
+ json_array_append_new( jsonClients, clientStats );
+ }
+ spin_lock( &_clients_lock );
+ }
+ spin_unlock( &_clients_lock );
+ return jsonClients;
+}
+
+void net_disconnectAll()
+{
+ int i;
+ spin_lock( &_clients_lock );
+ for (i = 0; i < _num_clients; ++i) {
+ if ( _clients[i] == NULL ) continue;
+ dnbd3_client_t * const client = _clients[i];
+ spin_lock( &client->lock );
+ if ( client->sock >= 0 ) shutdown( client->sock, SHUT_RDWR );
+ spin_unlock( &client->lock );
+ }
+ spin_unlock( &_clients_lock );
+}
+
+void net_waitForAllDisconnected()
+{
+ int retries = 10, count, i;
+ do {
+ count = 0;
+ spin_lock( &_clients_lock );
+ for (i = 0; i < _num_clients; ++i) {
+ if ( _clients[i] == NULL ) continue;
+ count++;
+ }
+ spin_unlock( &_clients_lock );
+ if ( count != 0 ) {
+ logadd( LOG_INFO, "%d clients still active...\n", count );
+ sleep( 1 );
+ }
+ } while ( count != 0 && --retries > 0 );
+ _num_clients = 0;
+}
+
+/* +++
+ * Client list.
+ *
+ * Adding and removing clients.
+ */
+
+/**
+ * Remove a client from the clients array
+ * Locks on: _clients_lock
+ */
+static void dnbd3_removeClient(dnbd3_client_t *client)
+{
+ int i;
+ spin_lock( &_clients_lock );
+ for ( i = _num_clients - 1; i >= 0; --i ) {
+ if ( _clients[i] == client ) {
+ _clients[i] = NULL;
+ }
+ if ( _clients[i] == NULL && i + 1 == _num_clients ) --_num_clients;
+ }
+ spin_unlock( &_clients_lock );
+}
+
+/**
+ * Free the client struct recursively.
+ * !! Make sure to call this function after removing the client from _dnbd3_clients !!
+ * Locks on: _clients[].lock, _images[].lock
+ * might call functions that lock on _images, _image[], uplink.queueLock, client.sendMutex
+ */
+static dnbd3_client_t* dnbd3_freeClient(dnbd3_client_t *client)
+{
+ spin_lock( &client->lock );
+ pthread_mutex_lock( &client->sendMutex );
+ if ( client->sock != -1 ) close( client->sock );
+ client->sock = -1;
+ pthread_mutex_unlock( &client->sendMutex );
+ spin_lock( &client->statsLock );
+ spin_unlock( &client->statsLock );
+ if ( client->image != NULL ) {
+ spin_lock( &client->image->lock );
+ if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client );
+ spin_unlock( &client->image->lock );
+ client->image = image_release( client->image );
+ }
+ spin_unlock( &client->lock );
+ spin_destroy( &client->lock );
+ spin_destroy( &client->statsLock );
+ pthread_mutex_destroy( &client->sendMutex );
+ free( client );
+ return NULL ;
+}
+
+//###//
+
+/**
+ * Add client to the clients array.
+ * Locks on: _clients_lock
+ */
+static bool dnbd3_addClient(dnbd3_client_t *client)
+{
+ int i;
+ spin_lock( &_clients_lock );
+ for (i = 0; i < _num_clients; ++i) {
+ if ( _clients[i] != NULL ) continue;
+ _clients[i] = client;
+ spin_unlock( &_clients_lock );
+ return true;
+ }
+ if ( _num_clients >= SERVER_MAX_CLIENTS ) {
+ spin_unlock( &_clients_lock );
+ logadd( LOG_ERROR, "Maximum number of clients reached!" );
+ return false;
+ }
+ _clients[_num_clients++] = client;
+ spin_unlock( &_clients_lock );
+ return true;
}