From 43e57ce5e11e9052f5a7db66f2e8613f1784f919 Mon Sep 17 00:00:00 2001 From: Frederic Robra Date: Tue, 25 Jun 2019 17:03:28 +0200 Subject: first version of dnbd3-ng --- src/server/net.c | 731 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 731 insertions(+) create mode 100644 src/server/net.c (limited to 'src/server/net.c') diff --git a/src/server/net.c b/src/server/net.c new file mode 100644 index 0000000..00e88e0 --- /dev/null +++ b/src/server/net.c @@ -0,0 +1,731 @@ +/* + * This file is part of the Distributed Network Block Device 3 + * + * Copyright(c) 2011-2012 Johann Latocha + * + * This file may be licensed under the terms of of the + * GNU General Public License Version 2 (the ``GPL''). + * + * Software distributed under the License is distributed + * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either + * express or implied. See the GPL for the specific language + * governing rights and limitations. + * + * You should have received a copy of the GPL along with this + * program. If not, go to http://www.gnu.org/licenses/gpl.html + * or write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + */ + +#include "helper.h" +#include "image.h" +#include "uplink.h" +#include "locks.h" +#include "rpc.h" +#include "altservers.h" + +#include "../shared/sockhelper.h" +#include "../shared/timing.h" +#include "../shared/protocol.h" +#include "../serialize.h" + +#include + +#ifdef __linux__ +#include +#endif +#ifdef __FreeBSD__ +#include +#include +#include +#endif +#include +#include +#include + +static dnbd3_client_t *_clients[SERVER_MAX_CLIENTS]; +static int _num_clients = 0; +static pthread_spinlock_t _clients_lock; + +static char nullbytes[500]; + +static atomic_uint_fast64_t totalBytesSent = 0; + +// Adding and removing clients -- list management +static bool addToList(dnbd3_client_t *client); +static void removeFromList(dnbd3_client_t *client); +static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client); + +static inline bool recv_request_header(int sock, dnbd3_request_t *request) +{ + ssize_t ret, fails = 0; +#ifdef AFL_MODE + sock = 0; +#endif + // Read request header from socket + while ( ( ret = recv( sock, request, sizeof(*request), MSG_WAITALL ) ) != sizeof(*request) ) { + if ( errno == EINTR && ++fails < 10 ) continue; + if ( ret >= 0 || ++fails > SOCKET_TIMEOUT_CLIENT_RETRIES ) return false; + if ( errno == EAGAIN ) continue; + logadd( LOG_DEBUG2, "Error receiving request: Could not read message header (%d/%d, e=%d)\n", (int)ret, (int)sizeof(*request), errno ); + return false; + } + // Make sure all bytes are in the right order (endianness) + fixup_request( *request ); + if ( request->magic != dnbd3_packet_magic ) { + logadd( LOG_DEBUG2, "Magic in client request incorrect (cmd: %d, len: %d)\n", (int)request->cmd, (int)request->size ); + return false; + } + // Payload sanity check + if ( request->cmd != CMD_GET_BLOCK && request->size > MAX_PAYLOAD ) { + logadd( LOG_WARNING, "Client tries to send a packet of type %d with %d bytes payload. Dropping client.", (int)request->cmd, (int)request->size ); + return false; + } + return true; +} + +static inline bool recv_request_payload(int sock, uint32_t size, serialized_buffer_t *payload) +{ +#ifdef AFL_MODE + sock = 0; +#endif + if ( size == 0 ) { + logadd( LOG_ERROR, "Called recv_request_payload() to receive 0 bytes" ); + return false; + } + if ( size > MAX_PAYLOAD ) { + logadd( LOG_ERROR, "Called recv_request_payload() for more bytes than the passed buffer could hold!" ); + return false; + } + if ( sock_recv( sock, payload->buffer, size ) != (ssize_t)size ) { + logadd( LOG_DEBUG1, "Could not receive request payload of length %d\n", (int)size ); + return false; + } + // Prepare payload buffer for reading + serializer_reset_read( payload, size ); + return true; +} + +/** + * Send reply with optional payload. payload can be null. The caller has to + * acquire the sendMutex first. + */ +static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload) +{ + const uint32_t size = reply->size; + fixup_reply( *reply ); + if ( sock_sendAll( sock, reply, sizeof(dnbd3_reply_t), 1 ) != sizeof(dnbd3_reply_t) ) { + logadd( LOG_DEBUG1, "Sending reply header to client failed" ); + return false; + } + if ( size != 0 && payload != NULL ) { + if ( sock_sendAll( sock, payload, size, 1 ) != (ssize_t)size ) { + logadd( LOG_DEBUG1, "Sending payload of %"PRIu32" bytes to client failed", size ); + return false; + } + } + return true; +} + +/** + * Send given amount of null bytes. The caller has to acquire the sendMutex first. + */ +static inline bool sendPadding( const int fd, uint32_t bytes ) +{ + ssize_t ret; + while ( bytes >= sizeof(nullbytes) ) { + ret = sock_sendAll( fd, nullbytes, sizeof(nullbytes), 2 ); + if ( ret <= 0 ) + return false; + bytes -= (uint32_t)ret; + } + return sock_sendAll( fd, nullbytes, bytes, 2 ) == (ssize_t)bytes; +} + +void net_init() +{ + spin_init( &_clients_lock, PTHREAD_PROCESS_PRIVATE ); +} + +void* net_handleNewConnection(void *clientPtr) +{ + dnbd3_client_t * const client = (dnbd3_client_t *)clientPtr; + dnbd3_request_t request; + + // Await data from client. Since this is a fresh connection, we expect data right away + sock_setTimeout( client->sock, _clientTimeout ); + do { +#ifdef AFL_MODE + const int ret = (int)recv( 0, &request, sizeof(request), MSG_WAITALL ); +#else + const int ret = (int)recv( client->sock, &request, sizeof(request), MSG_WAITALL ); +#endif + // It's expected to be a real dnbd3 client + // Check request for validity. This implicitly dictates that all HTTP requests are more than 24 bytes... + if ( ret != (int)sizeof(request) ) { + logadd( LOG_DEBUG2, "Error receiving request: Could not read message header (%d/%d, e=%d)", (int)ret, (int)sizeof(request), errno ); + goto fail_preadd; + } + + if ( request.magic != dnbd3_packet_magic ) { + // Let's see if this looks like an HTTP request + if ( ((char*)&request)[0] == 'G' || ((char*)&request)[0] == 'P' ) { + // Close enough... + rpc_sendStatsJson( client->sock, &client->host, &request, ret ); + } else { + logadd( LOG_DEBUG1, "Magic in client handshake incorrect" ); + } + goto fail_preadd; + } + // Magic OK, untangle byte order if required + fixup_request( request ); + if ( request.cmd != CMD_SELECT_IMAGE ) { + logadd( LOG_WARNING, "Client sent != CMD_SELECT_IMAGE in handshake (got cmd=%d, size=%d), dropping client.", (int)request.cmd, (int)request.size ); + goto fail_preadd; + } + } while (0); + // Fully init client struct + spin_init( &client->lock, PTHREAD_PROCESS_PRIVATE ); + pthread_mutex_init( &client->sendMutex, NULL ); + + spin_lock( &client->lock ); + host_to_string( &client->host, client->hostName, HOSTNAMELEN ); + client->hostName[HOSTNAMELEN-1] = '\0'; + spin_unlock( &client->lock ); + client->bytesSent = 0; + + if ( !addToList( client ) ) { + freeClientStruct( client ); + logadd( LOG_WARNING, "Could not add new client to list when connecting" ); + return NULL; + } + + dnbd3_reply_t reply; + + dnbd3_image_t *image = NULL; + int image_file = -1; + + int num; + bool bOk = false; + bool hasName = false; + + serialized_buffer_t payload; + uint16_t rid, client_version; + uint64_t start, end; + + dnbd3_server_entry_t server_list[NUMBER_SERVERS]; + + // Set to zero to make valgrind happy + memset( &reply, 0, sizeof(reply) ); + memset( &payload, 0, sizeof(payload) ); + reply.magic = dnbd3_packet_magic; + + // Receive first packet's payload + if ( recv_request_payload( client->sock, request.size, &payload ) ) { + char *image_name; + client_version = serializer_get_uint16( &payload ); + image_name = serializer_get_string( &payload ); + rid = serializer_get_uint16( &payload ); + const uint8_t flags = serializer_get_uint8( &payload ); + client->isServer = ( flags & FLAGS8_SERVER ); + if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) { + if ( client_version < MIN_SUPPORTED_CLIENT ) { + logadd( LOG_DEBUG1, "Client %s too old", client->hostName ); + } else { + logadd( LOG_DEBUG1, "Incomplete handshake received from %s", client->hostName ); + } + } else { + if ( !client->isServer || !_isProxy ) { + // Is a normal client, or we're not proxy + image = image_getOrLoad( image_name, rid ); + } else if ( _backgroundReplication != BGR_FULL && ( flags & FLAGS8_BG_REP ) ) { + // We're a proxy, client is another proxy, we don't do BGR, but connecting proxy does... + // Reject, as this would basically force this proxy to do BGR too. + image = image_get( image_name, rid, true ); + if ( image != NULL && image->cache_map != NULL ) { + // Only exception is if the image is complete locally + image = image_release( image ); + } + } else if ( _lookupMissingForProxy ) { + // No BGR mismatch and we're told to lookup missing images on a known uplink server + // if the requesting client is a proxy + image = image_getOrLoad( image_name, rid ); + } else { + // No BGR mismatch, but don't lookup if image is unknown locally + image = image_get( image_name, rid, true ); + } + spin_lock( &client->lock ); + client->image = image; + spin_unlock( &client->lock ); + if ( image == NULL ) { + //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); + } else if ( !image->working ) { + logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n", + client->hostName, image_name, (int)rid ); + } else { + bool penalty; + // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable + bOk = true; + if ( image->cache_map != NULL ) { + spin_lock( &image->lock ); + if ( image->uplink == NULL || image->uplink->cacheFd == -1 || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { + bOk = ( rand() % 4 ) == 1; + } + penalty = bOk && image->uplink != NULL && image->uplink->cacheFd == -1; + spin_unlock( &image->lock ); + if ( penalty ) { // Wait 100ms if local caching is not working so this + usleep( 100000 ); // server gets a penalty and is less likely to be selected + } + } + if ( bOk ) { + spin_lock( &image->lock ); + image_file = image->readFd; + if ( !client->isServer ) { + // Only update immediately if this is a client. Servers are handled on disconnect. + timing_get( &image->atime ); + } + spin_unlock( &image->lock ); + serializer_reset_write( &payload ); + serializer_put_uint16( &payload, client_version < 3 ? client_version : PROTOCOL_VERSION ); // XXX: Since messed up fuse client was messed up before :( + serializer_put_string( &payload, image->name ); + serializer_put_uint16( &payload, (uint16_t)image->rid ); + serializer_put_uint64( &payload, image->virtualFilesize ); + reply.cmd = CMD_SELECT_IMAGE; + reply.size = serializer_get_written_length( &payload ); + if ( !send_reply( client->sock, &reply, &payload ) ) { + bOk = false; + } + } + } + } + } + + if ( bOk ) { + // add artificial delay if applicable + if ( client->isServer && _serverPenalty != 0 ) { + usleep( _serverPenalty ); + } else if ( !client->isServer && _clientPenalty != 0 ) { + usleep( _clientPenalty ); + } + // client handling mainloop + while ( recv_request_header( client->sock, &request ) ) { + if ( _shutdown ) break; + switch ( request.cmd ) { + + case CMD_GET_BLOCK:; + const uint64_t offset = request.offset_small; // Copy to full uint64 to prevent repeated masking + if ( offset >= image->virtualFilesize ) { + // Sanity check + logadd( LOG_WARNING, "Client %s requested non-existent block", client->hostName ); + reply.size = 0; + reply.cmd = CMD_ERROR; + send_reply( client->sock, &reply, NULL ); + break; + } + if ( offset + request.size > image->virtualFilesize ) { + // Sanity check + logadd( LOG_WARNING, "Client %s requested data block that extends beyond image size", client->hostName ); + reply.size = 0; + reply.cmd = CMD_ERROR; + send_reply( client->sock, &reply, NULL ); + break; + } + + if ( request.size != 0 && image->cache_map != NULL ) { + // This is a proxyed image, check if we need to relay the request... + start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + bool isCached = true; + spin_lock( &image->lock ); + // Check again as we only aquired the lock just now + if ( image->cache_map != NULL ) { + const uint64_t firstByteInMap = start >> 15; + const uint64_t lastByteInMap = (end - 1) >> 15; + uint64_t pos; + // Middle - quick checking + if ( isCached ) { + pos = firstByteInMap + 1; + while ( pos < lastByteInMap ) { + if ( image->cache_map[pos] != 0xff ) { + isCached = false; + break; + } + ++pos; + } + } + // First byte + if ( isCached ) { + pos = start; + do { + const int map_x = (pos >> 12) & 7; // mod 8 + const uint8_t bit_mask = (uint8_t)( 1 << map_x ); + if ( (image->cache_map[firstByteInMap] & bit_mask) == 0 ) { + isCached = false; + break; + } + pos += DNBD3_BLOCK_SIZE; + } while ( firstByteInMap == (pos >> 15) && pos < end ); + } + // Last byte - only check if request spans multiple bytes in cache map + if ( isCached && firstByteInMap != lastByteInMap ) { + pos = lastByteInMap << 15; + while ( pos < end ) { + assert( lastByteInMap == (pos >> 15) ); + const int map_x = (pos >> 12) & 7; // mod 8 + const uint8_t bit_mask = (uint8_t)( 1 << map_x ); + if ( (image->cache_map[lastByteInMap] & bit_mask) == 0 ) { + isCached = false; + break; + } + pos += DNBD3_BLOCK_SIZE; + } + } + } + spin_unlock( &image->lock ); + if ( !isCached ) { + if ( !uplink_request( client, request.handle, offset, request.size, request.hops ) ) { + logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy, disabling image %s:%d", + client->hostName, image->name, image->rid ); + image->working = false; + goto exit_client_cleanup; + } + break; // DONE, exit request.cmd switch + } + } + + reply.cmd = CMD_GET_BLOCK; + reply.size = request.size; + reply.handle = request.handle; + + fixup_reply( reply ); + const bool lock = image->uplink != NULL; + if ( lock ) pthread_mutex_lock( &client->sendMutex ); + // Send reply header + if ( send( client->sock, &reply, sizeof(dnbd3_reply_t), (request.size == 0 ? 0 : MSG_MORE) ) != sizeof(dnbd3_reply_t) ) { + if ( lock ) pthread_mutex_unlock( &client->sendMutex ); + logadd( LOG_DEBUG1, "Sending CMD_GET_BLOCK reply header to %s failed", client->hostName ); + goto exit_client_cleanup; + } + + if ( request.size != 0 ) { + // Send payload if request length > 0 + size_t done = 0; + off_t foffset = (off_t)offset; + size_t realBytes; + if ( offset + request.size <= image->realFilesize ) { + realBytes = request.size; + } else { + realBytes = (size_t)(image->realFilesize - offset); + } + while ( done < realBytes ) { + // TODO: Should we consider EOPNOTSUPP on BSD for sendfile and fallback to read/write? + // Linux would set EINVAL or ENOSYS instead, which it unfortunately also does for a couple of other failures :/ + // read/write would kill performance anyways so a fallback would probably be of little use either way. +#ifdef AFL_MODE + char buf[1000]; + size_t cnt = realBytes - done; + if ( cnt > 1000 ) { + cnt = 1000; + } + const ssize_t sent = pread( image_file, buf, cnt, foffset ); + if ( sent > 0 ) { + //write( client->sock, buf, sent ); // This is not verified in any way, so why even do it... + } else { + const int err = errno; +#elif defined(__linux__) + const ssize_t sent = sendfile( client->sock, image_file, &foffset, realBytes - done ); + if ( sent <= 0 ) { + const int err = errno; +#elif defined(__FreeBSD__) + off_t sent; + const int ret = sendfile( image_file, client->sock, foffset, realBytes - done, NULL, &sent, 0 ); + if ( ret == -1 || sent == 0 ) { + const int err = errno; + if ( ret == -1 ) { + if ( err == EAGAIN || err == EINTR ) { // EBUSY? manpage doesn't explicitly mention *sent here.. But then again we dont set the according flag anyways + done += sent; + continue; + } + sent = -1; + } +#endif + if ( lock ) pthread_mutex_unlock( &client->sendMutex ); + if ( sent == -1 ) { + if ( err != EPIPE && err != ECONNRESET && err != ESHUTDOWN + && err != EAGAIN && err != EWOULDBLOCK ) { + logadd( LOG_DEBUG1, "sendfile to %s failed (image to net. sent %d/%d, errno=%d)", + client->hostName, (int)done, (int)realBytes, err ); + } + if ( err == EBADF || err == EFAULT || err == EINVAL || err == EIO ) { + logadd( LOG_INFO, "Disabling %s:%d", image->name, image->rid ); + image->working = false; + } + } + goto exit_client_cleanup; + } + done += sent; + } + if ( request.size > (uint32_t)realBytes ) { + if ( !sendPadding( client->sock, request.size - (uint32_t)realBytes ) ) { + if ( lock ) pthread_mutex_unlock( &client->sendMutex ); + goto exit_client_cleanup; + } + } + } + if ( lock ) pthread_mutex_unlock( &client->sendMutex ); + // Global per-client counter + client->bytesSent += request.size; // Increase counter for statistics. + break; + + case CMD_GET_SERVERS: + // Build list of known working alt servers + num = altservers_getListForClient( &client->host, server_list, NUMBER_SERVERS ); + reply.cmd = CMD_GET_SERVERS; + reply.size = (uint32_t)( num * sizeof(dnbd3_server_entry_t) ); + pthread_mutex_lock( &client->sendMutex ); + send_reply( client->sock, &reply, server_list ); + pthread_mutex_unlock( &client->sendMutex ); + goto set_name; + break; + + case CMD_KEEPALIVE: + reply.cmd = CMD_KEEPALIVE; + reply.size = 0; + pthread_mutex_lock( &client->sendMutex ); + send_reply( client->sock, &reply, NULL ); + pthread_mutex_unlock( &client->sendMutex ); +set_name: ; + if ( !hasName ) { + hasName = true; + setThreadName( client->hostName ); + } + break; + + case CMD_SET_CLIENT_MODE: + client->isServer = false; + break; + + case CMD_GET_CRC32: + reply.cmd = CMD_GET_CRC32; + pthread_mutex_lock( &client->sendMutex ); + if ( image->crc32 == NULL ) { + reply.size = 0; + send_reply( client->sock, &reply, NULL ); + } else { + const uint32_t size = reply.size = (uint32_t)( (IMGSIZE_TO_HASHBLOCKS(image->realFilesize) + 1) * sizeof(uint32_t) ); + send_reply( client->sock, &reply, NULL ); + send( client->sock, &image->masterCrc32, sizeof(uint32_t), MSG_MORE ); + send( client->sock, image->crc32, size - sizeof(uint32_t), 0 ); + } + pthread_mutex_unlock( &client->sendMutex ); + break; + + default: + logadd( LOG_ERROR, "Unknown command from client %s: %d", client->hostName, (int)request.cmd ); + break; + + } + } + } +exit_client_cleanup: ; + // First remove from list, then add to counter to prevent race condition + removeFromList( client ); + totalBytesSent += client->bytesSent; + // Access time, but only if client didn't just probe + if ( image != NULL ) { + spin_lock( &image->lock ); + if ( client->bytesSent > DNBD3_BLOCK_SIZE * 10 ) { + timing_get( &image->atime ); + } + spin_unlock( &image->lock ); + } + freeClientStruct( client ); // This will also call image_release on client->image + return NULL ; +fail_preadd: ; + close( client->sock ); + free( client ); + return NULL; +} + +/** + * Get list of all clients. + */ +struct json_t* net_getListAsJson() +{ + json_t *jsonClients = json_array(); + json_t *clientStats; + int imgId, isServer; + uint64_t bytesSent; + char host[HOSTNAMELEN]; + host[HOSTNAMELEN-1] = '\0'; + + spin_lock( &_clients_lock ); + for ( int i = 0; i < _num_clients; ++i ) { + dnbd3_client_t * const client = _clients[i]; + if ( client == NULL || client->image == NULL ) + continue; + spin_lock( &client->lock ); + // Unlock so we give other threads a chance to access the client list. + // We might not get an atomic snapshot of the currently connected clients, + // but that doesn't really make a difference anyways. + spin_unlock( &_clients_lock ); + strncpy( host, client->hostName, HOSTNAMELEN - 1 ); + imgId = client->image->id; + isServer = (int)client->isServer; + bytesSent = client->bytesSent; + spin_unlock( &client->lock ); + clientStats = json_pack( "{sssisisI}", + "address", host, + "imageId", imgId, + "isServer", isServer, + "bytesSent", (json_int_t)bytesSent ); + json_array_append_new( jsonClients, clientStats ); + spin_lock( &_clients_lock ); + } + spin_unlock( &_clients_lock ); + return jsonClients; +} + +/** + * Get number of clients connected, total bytes sent, or both. + * we don't unlock the list while iterating or we might get an + * incorrect result if a client is disconnecting while iterating. + */ +void net_getStats(int *clientCount, int *serverCount, uint64_t *bytesSent) +{ + int cc = 0, sc = 0; + uint64_t bs = 0; + + spin_lock( &_clients_lock ); + for ( int i = 0; i < _num_clients; ++i ) { + const dnbd3_client_t * const client = _clients[i]; + if ( client == NULL || client->image == NULL ) + continue; + if ( client->isServer ) { + sc += 1; + } else { + cc += 1; + } + bs += client->bytesSent; + } + spin_unlock( &_clients_lock ); + if ( clientCount != NULL ) { + *clientCount = cc; + } + if ( serverCount != NULL ) { + *serverCount = sc; + } + if ( bytesSent != NULL ) { + *bytesSent = totalBytesSent + bs; + } +} + +void net_disconnectAll() +{ + int i; + spin_lock( &_clients_lock ); + for (i = 0; i < _num_clients; ++i) { + if ( _clients[i] == NULL ) continue; + dnbd3_client_t * const client = _clients[i]; + spin_lock( &client->lock ); + if ( client->sock >= 0 ) shutdown( client->sock, SHUT_RDWR ); + spin_unlock( &client->lock ); + } + spin_unlock( &_clients_lock ); +} + +void net_waitForAllDisconnected() +{ + int retries = 10, count, i; + do { + count = 0; + spin_lock( &_clients_lock ); + for (i = 0; i < _num_clients; ++i) { + if ( _clients[i] == NULL ) continue; + count++; + } + spin_unlock( &_clients_lock ); + if ( count != 0 ) { + logadd( LOG_INFO, "%d clients still active...\n", count ); + sleep( 1 ); + } + } while ( count != 0 && --retries > 0 ); + _num_clients = 0; +} + +/* +++ + * Client list. + * + * Adding and removing clients. + */ + +/** + * Remove a client from the clients array + * Locks on: _clients_lock + */ +static void removeFromList(dnbd3_client_t *client) +{ + int i; + spin_lock( &_clients_lock ); + for ( i = _num_clients - 1; i >= 0; --i ) { + if ( _clients[i] == client ) { + _clients[i] = NULL; + } + if ( _clients[i] == NULL && i + 1 == _num_clients ) --_num_clients; + } + spin_unlock( &_clients_lock ); +} + +/** + * Free the client struct recursively. + * !! Make sure to call this function after removing the client from _dnbd3_clients !! + * Locks on: _clients[].lock, _images[].lock + * might call functions that lock on _images, _image[], uplink.queueLock, client.sendMutex + */ +static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client) +{ + spin_lock( &client->lock ); + pthread_mutex_lock( &client->sendMutex ); + if ( client->sock != -1 ) close( client->sock ); + client->sock = -1; + pthread_mutex_unlock( &client->sendMutex ); + if ( client->image != NULL ) { + spin_lock( &client->image->lock ); + if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client ); + spin_unlock( &client->image->lock ); + client->image = image_release( client->image ); + } + spin_unlock( &client->lock ); + spin_destroy( &client->lock ); + pthread_mutex_destroy( &client->sendMutex ); + free( client ); + return NULL ; +} + +//###// + +/** + * Add client to the clients array. + * Locks on: _clients_lock + */ +static bool addToList(dnbd3_client_t *client) +{ + int i; + spin_lock( &_clients_lock ); + for (i = 0; i < _num_clients; ++i) { + if ( _clients[i] != NULL ) continue; + _clients[i] = client; + spin_unlock( &_clients_lock ); + return true; + } + if ( _num_clients >= _maxClients ) { + spin_unlock( &_clients_lock ); + logadd( LOG_ERROR, "Maximum number of clients reached!" ); + return false; + } + _clients[_num_clients++] = client; + spin_unlock( &_clients_lock ); + return true; +} + -- cgit v1.2.3-55-g7522