diff options
Diffstat (limited to 'src/server/net.c')
-rw-r--r-- | src/server/net.c | 192 |
1 files changed, 119 insertions, 73 deletions
diff --git a/src/server/net.c b/src/server/net.c index ad228f4..a467620 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -85,7 +85,7 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff logadd( LOG_ERROR, "Called recv_request_payload() for more bytes than the passed buffer could hold!" ); return false; } - if ( recv( sock, payload->buffer, size, MSG_WAITALL ) != size ) { + if ( sock_recv( sock, payload->buffer, size ) != size ) { logadd( LOG_DEBUG1, "Could not receive request payload of length %d\n", (int)size ); return false; } @@ -94,34 +94,35 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff return true; } +/** + * Send reply with optional payload. payload can be null. The caller has to + * acquire the sendMutex first. + */ static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload) { - const unsigned int size = reply->size; + const size_t size = reply->size; fixup_reply( *reply ); - if ( !payload || size == 0 ) { - if ( send( sock, reply, sizeof(dnbd3_reply_t), 0 ) != sizeof(dnbd3_reply_t) ) { - logadd( LOG_DEBUG1, "Send failed (header-only)\n" ); - return false; - } - } else { - struct iovec iov[2]; - iov[0].iov_base = reply; - iov[0].iov_len = sizeof(dnbd3_reply_t); - iov[1].iov_base = payload; - iov[1].iov_len = (size_t)size; - if ( (size_t)writev( sock, iov, 2 ) != sizeof(dnbd3_reply_t) + size ) { - logadd( LOG_DEBUG1, "Send failed (reply with payload of %u bytes)\n", size ); + if ( sock_sendAll( sock, reply, sizeof(dnbd3_reply_t), 1 ) != sizeof(dnbd3_reply_t) ) { + logadd( LOG_DEBUG1, "Sending reply header to client failed" ); + return false; + } + if ( size != 0 && payload != NULL ) { + if ( sock_sendAll( sock, payload, size, 1 ) != (ssize_t)size ) { + logadd( LOG_DEBUG1, "Sending payload of %u bytes to client failed", size ); return false; } } return true; } +/** + * Send given amount of null bytes. The caller has to acquire the sendMutex first. + */ static inline bool sendPadding( const int fd, uint32_t bytes ) { ssize_t ret; while ( bytes >= sizeof(nullbytes) ) { - ret = sock_sendAll( fd, nullbytes, sizeof(nullbytes), 1 ); + ret = sock_sendAll( fd, nullbytes, sizeof(nullbytes), 2 ); if ( ret <= 0 ) return false; bytes -= (uint32_t)ret; @@ -131,8 +132,9 @@ static inline bool sendPadding( const int fd, uint32_t bytes ) uint64_t net_getTotalBytesSent() { + // reads and writes to 64bit ints are not atomic on x86, so let's be safe and use locking spin_lock( &statisticsSentLock ); - uint64_t tmp = totalBytesSent; + const uint64_t tmp = totalBytesSent; spin_unlock( &statisticsSentLock ); return tmp; } @@ -156,10 +158,10 @@ void *net_client_handler(void *dnbd3_client) bool hasName = false; serialized_buffer_t payload; - char *image_name; uint16_t rid, client_version; uint64_t start, end; - char buffer[100]; + uint32_t tempBytesSent = 0; + char hostPrintable[100]; dnbd3_server_entry_t server_list[NUMBER_SERVERS]; @@ -174,25 +176,38 @@ void *net_client_handler(void *dnbd3_client) if ( recv_request_header( client->sock, &request ) ) { if ( request.cmd != CMD_SELECT_IMAGE ) { logadd( LOG_DEBUG1, "Client sent invalid handshake (%d). Dropping Client\n", (int)request.cmd ); - } else { - if ( recv_request_payload( client->sock, request.size, &payload ) ) { - client_version = serializer_get_uint16( &payload ); - image_name = serializer_get_string( &payload ); - rid = serializer_get_uint16( &payload ); - client->isServer = serializer_get_uint8( &payload ); - if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) { - if ( client_version < MIN_SUPPORTED_CLIENT ) { - logadd( LOG_DEBUG1, "Client too old\n" ); - } else { - logadd( LOG_DEBUG1, "Incomplete handshake received\n" ); - } + } else if ( recv_request_payload( client->sock, request.size, &payload ) ) { + char *image_name; + client_version = serializer_get_uint16( &payload ); + image_name = serializer_get_string( &payload ); + rid = serializer_get_uint16( &payload ); + client->isServer = serializer_get_uint8( &payload ); + if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) { + if ( client_version < MIN_SUPPORTED_CLIENT ) { + logadd( LOG_DEBUG1, "Client too old\n" ); } else { - client->image = image = image_getOrLoad( image_name, rid ); - if ( image == NULL ) { - //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); - } else if ( !image->working ) { - logadd( LOG_DEBUG1, "Client requested non-working image '%s' (rid:%d), rejected\n", image_name, (int)rid ); - } else { + logadd( LOG_DEBUG1, "Incomplete handshake received\n" ); + } + } else { + client->image = image = image_getOrLoad( image_name, rid ); + if ( image == NULL ) { + //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); + } else if ( !image->working ) { + logadd( LOG_DEBUG1, "Client requested non-working image '%s' (rid:%d), rejected\n", image_name, (int)rid ); + } else { + // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable + bOk = true; + if ( image->cache_map != NULL ) { + spin_lock( &image->lock ); + if ( image->uplink == NULL || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { + bOk = ( rand() % 4 ) == 1; + } + spin_unlock( &image->lock ); + if ( image->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this + usleep( 100000 ); // server gets a penalty and is less likely to be selected + } + } + if ( bOk ) { image_file = image->readFd; serializer_reset_write( &payload ); serializer_put_uint16( &payload, PROTOCOL_VERSION ); @@ -201,9 +216,8 @@ void *net_client_handler(void *dnbd3_client) serializer_put_uint64( &payload, image->virtualFilesize ); reply.cmd = CMD_SELECT_IMAGE; reply.size = serializer_get_written_length( &payload ); - if ( send_reply( client->sock, &reply, &payload ) ) { - if ( !client->isServer ) image->atime = time( NULL ); - bOk = true; + if ( !send_reply( client->sock, &reply, &payload ) ) { + bOk = false; } } } @@ -211,6 +225,11 @@ void *net_client_handler(void *dnbd3_client) } } else if ( strncmp( (char*)&request, "GET ", 4 ) == 0 || strncmp( (char*)&request, "POST ", 5 ) == 0 ) { rpc_sendStatsJson( client->sock ); + } else { + // Unknown request + if ( host_to_string( &client->host, hostPrintable, sizeof hostPrintable ) ) { + logadd( LOG_DEBUG1, "Client %s sent invalid handshake", hostPrintable ); + } } if ( bOk ) { @@ -251,23 +270,13 @@ void *net_client_handler(void *dnbd3_client) spin_lock( &image->lock ); // Check again as we only aquired the lock just now if ( image->cache_map != NULL ) { - const uint64_t firstByte = start >> 15; - const uint64_t lastByte = (end - 1) >> 15; - // First byte - uint64_t pos = start; - do { - const int map_x = (pos >> 12) & 7; // mod 8 - const uint8_t bit_mask = 1 << map_x; - if ( (image->cache_map[firstByte] & bit_mask) == 0 ) { - isCached = false; - break; - } - pos += DNBD3_BLOCK_SIZE; - } while ( firstByte == (pos >> 15) && pos < end ); + const uint64_t firstByteInMap = start >> 15; + const uint64_t lastByteInMap = (end - 1) >> 15; + uint64_t pos; // Middle - quick checking if ( isCached ) { - pos = firstByte + 1; - while ( pos < lastByte ) { + pos = firstByteInMap + 1; + while ( pos < lastByteInMap ) { if ( image->cache_map[pos] != 0xff ) { isCached = false; break; @@ -275,14 +284,27 @@ void *net_client_handler(void *dnbd3_client) ++pos; } } - // Last byte - if ( isCached && firstByte != lastByte ) { - pos = lastByte << 15; + // First byte + if ( isCached ) { + pos = start; + do { + const int map_x = (pos >> 12) & 7; // mod 8 + const uint8_t bit_mask = 1 << map_x; + if ( (image->cache_map[firstByteInMap] & bit_mask) == 0 ) { + isCached = false; + break; + } + pos += DNBD3_BLOCK_SIZE; + } while ( firstByteInMap == (pos >> 15) && pos < end ); + } + // Last byte - only check if request spans multiple bytes in cache map + if ( isCached && firstByteInMap != lastByteInMap ) { + pos = lastByteInMap << 15; while ( pos < end ) { - assert( lastByte == (pos >> 15) ); + assert( lastByteInMap == (pos >> 15) ); const int map_x = (pos >> 12) & 7; // mod 8 const uint8_t bit_mask = 1 << map_x; - if ( (image->cache_map[lastByte] & bit_mask) == 0 ) { + if ( (image->cache_map[lastByteInMap] & bit_mask) == 0 ) { isCached = false; break; } @@ -293,7 +315,9 @@ void *net_client_handler(void *dnbd3_client) spin_unlock( &image->lock ); if ( !isCached ) { if ( !uplink_request( client, request.handle, request.offset, request.size ) ) { - logadd( LOG_DEBUG1, "Could not relay uncached request to upstream proxy\n" ); + logadd( LOG_DEBUG1, "Could not relay un-cached request to upstream proxy, disabling image %s:%d", + image->lower_name, (int)image->rid ); + image->working = false; goto exit_client_cleanup; } break; // DONE @@ -329,10 +353,13 @@ void *net_client_handler(void *dnbd3_client) if ( ret <= 0 ) { const int err = errno; if ( lock ) pthread_mutex_unlock( &client->sendMutex ); - if ( ret < 0 && err != EPIPE && err != ECONNRESET ) - logadd( LOG_DEBUG1, "sendfile failed (image to net. sent %d/%d, errno=%d)\n", - (int)done, (int)realBytes, err ); - if ( err == EBADF || err == EINVAL || err == EIO ) image->working = false; + if ( ret == -1 ) { + if ( err != EPIPE && err != ECONNRESET && err != ESHUTDOWN ) { + logadd( LOG_DEBUG1, "sendfile failed (image to net. sent %d/%d, errno=%d)\n", + (int)done, (int)realBytes, err ); + } + if ( err != EAGAIN && err != EWOULDBLOCK ) image->working = false; + } goto exit_client_cleanup; } done += ret; @@ -343,9 +370,20 @@ void *net_client_handler(void *dnbd3_client) goto exit_client_cleanup; } } - client->bytesSent += request.size; // Increase counter for statistics. } if ( lock ) pthread_mutex_unlock( &client->sendMutex ); + // Global per-client counter + spin_lock( &client->lock ); + client->bytesSent += request.size; // Increase counter for statistics. + spin_unlock( &client->lock ); + // Local counter that gets added to the global total bytes sent counter periodically + tempBytesSent += request.size; + if (tempBytesSent > 1024 * 1024 ) { + spin_lock( &statisticsSentLock ); + totalBytesSent += tempBytesSent; + spin_unlock( &statisticsSentLock ); + tempBytesSent = 0; + } break; case CMD_GET_SERVERS: @@ -353,19 +391,25 @@ void *net_client_handler(void *dnbd3_client) num = altservers_getMatching( &client->host, server_list, NUMBER_SERVERS ); reply.cmd = CMD_GET_SERVERS; reply.size = num * sizeof(dnbd3_server_entry_t); + pthread_mutex_lock( &client->sendMutex ); send_reply( client->sock, &reply, server_list ); - client->isServer = false; // Only clients request list of servers + pthread_mutex_unlock( &client->sendMutex ); goto set_name; break; case CMD_KEEPALIVE: reply.cmd = CMD_KEEPALIVE; reply.size = 0; + pthread_mutex_lock( &client->sendMutex ); send_reply( client->sock, &reply, NULL ); + pthread_mutex_unlock( &client->sendMutex ); + image->atime = time( NULL ); set_name: ; - if ( !hasName && host_to_string( &client->host, buffer, sizeof buffer ) ) { + if ( !hasName ) { hasName = true; - setThreadName( buffer ); + if ( host_to_string( &client->host, hostPrintable, sizeof hostPrintable ) ) { + setThreadName( hostPrintable ); + } } break; @@ -376,19 +420,21 @@ set_name: ; case CMD_GET_CRC32: reply.cmd = CMD_GET_CRC32; + pthread_mutex_lock( &client->sendMutex ); if ( image->crc32 == NULL ) { reply.size = 0; send_reply( client->sock, &reply, NULL ); } else { const int size = reply.size = (IMGSIZE_TO_HASHBLOCKS(image->realFilesize) + 1) * sizeof(uint32_t); send_reply( client->sock, &reply, NULL ); - send( client->sock, &image->masterCrc32, sizeof(uint32_t), 0 ); + send( client->sock, &image->masterCrc32, sizeof(uint32_t), MSG_MORE ); send( client->sock, image->crc32, size - sizeof(uint32_t), 0 ); } + pthread_mutex_unlock( &client->sendMutex ); break; default: - logadd( LOG_ERROR, "Unknown command: %d", (int)request.cmd ); + logadd( LOG_ERROR, "Unknown command from client: %d", (int)request.cmd ); break; } @@ -397,9 +443,9 @@ set_name: ; exit_client_cleanup: ; dnbd3_removeClient( client ); spin_lock( &statisticsSentLock ); - totalBytesSent += client->bytesSent;// Add the amount of bytes sent by the client to the statistics of the server. + totalBytesSent += tempBytesSent; // Add the amount of bytes sent by the client to the statistics of the server. spin_unlock( &statisticsSentLock ); - client = dnbd3_freeClient( client ); + client = dnbd3_freeClient( client ); // This will also call image_release on client->image return NULL ; } |