From cce7cf2c1428d174dd49177358dc52b234e97e5c Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 22 Dec 2014 15:51:30 +0100 Subject: [SERVER] Configurable client timeout, adaptive replication speed (to be tested against varying bw/latency), retry sendfile call if ret <= len --- server.config.example/server.conf | 2 ++ src/config.h | 1 - src/server/altservers.c | 1 + src/server/globals.c | 2 ++ src/server/globals.h | 10 ++++++++-- src/server/net.c | 14 +++++++++----- src/server/server.c | 4 ++-- src/server/uplink.c | 37 ++++++++++++++++++++++--------------- 8 files changed, 46 insertions(+), 25 deletions(-) diff --git a/server.config.example/server.conf b/server.config.example/server.conf index ee8163b..8c8bb9e 100644 --- a/server.config.example/server.conf +++ b/server.config.example/server.conf @@ -3,4 +3,6 @@ basePath=/mnt/storage/dnbd3 serverPenalty=100000 clientPenalty=0 isProxy=true +uplinkTimeout=1250 +clientTimeout=15000 diff --git a/src/config.h b/src/config.h index 45593c2..6f8c33e 100644 --- a/src/config.h +++ b/src/config.h @@ -63,7 +63,6 @@ #define COMMENT_LENGTH 120 // in seconds if not stated otherwise (MS = milliseconds) -#define SOCKET_TIMEOUT_SERVER_MS 15000 #define SOCKET_TIMEOUT_SERVER_RETRIES 3 // When waiting for next header, max reties * above timeout is the actual total timeout (ping timeout) #define SOCKET_TIMEOUT_CLIENT_DATA 2 #define SOCKET_TIMEOUT_CLIENT_DISCOVERY 1 diff --git a/src/server/altservers.c b/src/server/altservers.c index 0619bc7..25d9bc9 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -395,6 +395,7 @@ static void *altservers_main(void *data) // Empty pipe do { ret = read( readPipe, buffer, sizeof buffer ); + if ( ret > 0 ) printf("*********** altserver thread woke up\n"); } while ( ret == sizeof buffer ); // Throw data away, this is just used for waking this thread up if ( ret == 0 ) { memlogf( "[WARNING] Signal pipe of alservers_main closed! Things will break!" 
); diff --git a/src/server/globals.c b/src/server/globals.c index 3fcb61d..a9aca77 100644 --- a/src/server/globals.c +++ b/src/server/globals.c @@ -15,6 +15,7 @@ int _clientPenalty = 0; int _isProxy = FALSE; int _proxyPrivateOnly = FALSE; int _uplinkTimeout = 1250; +int _clientTimeout = 15000; #define SAVE_TO_VAR_STR(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) { if (_ ## kk != NULL) free(_ ## kk); _ ## kk = strdup(value); } } while (0) #define SAVE_TO_VAR_BOOL(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) _ ## kk = atoi(value) != 0 || strcmp(value, "true") == 0 || strcmp(value, "True") == 0 || strcmp(value, "TRUE") == 0; } while (0) @@ -29,6 +30,7 @@ static int ini_handler(void *custom, const char* section, const char* key, const SAVE_TO_VAR_INT( dnbd3, serverPenalty ); SAVE_TO_VAR_INT( dnbd3, clientPenalty ); SAVE_TO_VAR_INT( dnbd3, uplinkTimeout ); + SAVE_TO_VAR_INT( dnbd3, clientTimeout ); return TRUE; } diff --git a/src/server/globals.h b/src/server/globals.h index 1b17660..c215916 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -56,7 +56,8 @@ struct _dnbd3_connection int recvBufferLen; // Len of ^^ volatile int shutdown; // bool to signal thread to stop, must only be set from uplink_shutdown() or cleanup in uplink_mainloop() int replicatedLastBlock; // bool telling if the last block has been replicated yet - time_t lastReplication; // timestamp of when last replication requests were sent + //time_t lastReplication; // timestamp of when last replication requests were sent + uint64_t replicationHandle; // Handle of pending replication request }; typedef struct @@ -167,10 +168,15 @@ extern int _isProxy; extern int _proxyPrivateOnly; /** - * Read timeout when waiting for data on an uplink + * Read timeout when waiting for or sending data on an uplink */ extern int _uplinkTimeout; +/** + * Read timeout when waiting for or sending data from/to client + */ +extern int _clientTimeout; + void 
globals_loadConfig(); #endif /* GLOBALS_H_ */ diff --git a/src/server/net.c b/src/server/net.c index b40413f..5ea2e36 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -301,11 +301,15 @@ void *net_client_handler(void *dnbd3_client) if ( request.size != 0 ) { // Send payload if request length > 0 - const ssize_t ret = sendfile( client->sock, image_file, (off_t *)&request.offset, request.size ); - if ( ret != request.size ) { - pthread_mutex_unlock( &client->sendMutex ); - printf( "[ERROR] sendfile failed (image to net %d/%d)\n", (int)ret, (int)request.size ); - goto exit_client_cleanup; + size_t done = 0; + while ( done < request.size ) { + const ssize_t ret = sendfile( client->sock, image_file, (off_t *)&request.offset, request.size ); + if ( ret <= 0 ) { + pthread_mutex_unlock( &client->sendMutex ); + printf( "[ERROR] sendfile failed (image to net. ret=%d, sent %d/%d, errno=%d)\n", (int)ret, (int)done, (int)request.size, (int)errno ); + goto exit_client_cleanup; + } + done += ret; } } pthread_mutex_unlock( &client->sendMutex ); diff --git a/src/server/server.c b/src/server/server.c index 844365b..e502543 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -359,9 +359,9 @@ int main(int argc, char *argv[]) usleep( 10000 ); // 10ms continue; } - //memlogf("INFO: Client %s connected\n", inet_ntoa(client.sin_addr)); + //memlogf("INFO: Client connected\n"); - sock_set_timeout( fd, SOCKET_TIMEOUT_SERVER_MS ); + sock_set_timeout( fd, _clientTimeout ); dnbd3_client_t *dnbd3_client = dnbd3_init_client( &client, fd ); if ( dnbd3_client == NULL ) { diff --git a/src/server/uplink.c b/src/server/uplink.c index 0a60ff1..6c604c1 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -55,6 +55,7 @@ int uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) link->queueLen = 0; link->fd = -1; link->signal = -1; + link->replicationHandle = 0; if ( sock >= 0 ) { link->betterFd = sock; link->betterServer = *host; @@ -250,10 +251,11 @@ static void* 
uplink_mainloop(void *data) if ( link->image->crc32 == NULL ) { uplink_addCrc32( link ); } - // Re-send all pending requests - uplink_send_requests( link, FALSE ); link->betterFd = -1; link->currentServer = link->betterServer; + link->replicationHandle = 0; + // Re-send all pending requests + uplink_send_requests( link, FALSE ); link->image->working = TRUE; buffer[0] = '@'; if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) { @@ -331,8 +333,6 @@ static void* uplink_mainloop(void *data) } } // Done handling epoll sockets - // Replicate missing blocks from the image so the proxy will eventually have a full copy - uplink_sendReplicationRequest( link ); // See if we should trigger an RTT measurement if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) { const time_t now = time( NULL ); @@ -373,6 +373,7 @@ static void* uplink_mainloop(void *data) } #ifdef _DEBUG if ( link->fd != -1 && !link->shutdown ) { + int resend = FALSE; time_t deadline = time( NULL ) - 10; spin_lock( &link->queueLock ); for (i = 0; i < link->queueLen; ++i) { @@ -381,12 +382,15 @@ static void* uplink_mainloop(void *data) "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", link->queue[i].client->image->lower_name, link->queue[i].from, link->queue[i].to, link->queue[i].status ); link->queue[i].status = ULR_NEW; + resend = TRUE; spin_unlock( &link->queueLock ); printf("%s", buffer); spin_lock( &link->queueLock ); } } spin_unlock( &link->queueLock ); + if ( resend ) + uplink_send_requests( link, TRUE ); } #endif } @@ -458,27 +462,24 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link) if ( link == NULL || link->fd == -1 ) return; dnbd3_image_t * const image = link->image; if ( image == NULL || image->cache_map == NULL || image->filesize < DNBD3_BLOCK_SIZE ) return; - const time_t now = time( NULL ); - if ( now <= link->lastReplication + 1 ) return; - link->lastReplication = now; spin_lock( &image->lock ); - if ( image == NULL || 
image->cache_map == NULL ) { + if ( image == NULL || image->cache_map == NULL || link->replicationHandle != 0 ) { spin_unlock( &image->lock ); return; } dnbd3_request_t request; request.magic = dnbd3_packet_magic; - int sent = 0; const size_t len = IMGSIZE_TO_MAPBYTES( image->filesize ) - 1; for (int i = 0; i <= len; ++i) { - if ( image->cache_map == NULL || link->fd == -1 || sent > 20 ) break; + if ( image->cache_map == NULL || link->fd == -1 ) break; if ( image->cache_map[i] == 0xff || (i == len && link->replicatedLastBlock) ) continue; if ( i == len ) link->replicatedLastBlock = TRUE; + link->replicationHandle = 1; // Prevent race condition spin_unlock( &image->lock ); // Unlocked - do not break or continue here... - ++sent; request.cmd = CMD_GET_BLOCK; - request.offset = request.handle = (uint64_t)i * DNBD3_BLOCK_SIZE * (uint64_t)8; + // Needs to be 8 (bit->byte, bitmap) + link->replicationHandle = request.offset = request.handle = (uint64_t)i * DNBD3_BLOCK_SIZE * (uint64_t)8; request.size = DNBD3_BLOCK_SIZE * (uint64_t)8; if ( request.offset + request.size > image->filesize ) { request.size = image->filesize - request.offset; @@ -489,10 +490,10 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link) printf( "[DEBUG] Error sending background replication request to uplink server!\n" ); return; } - // Lock again... - spin_lock( &image->lock ); + return; // Request was sent, bail out, nothing is locked } spin_unlock( &image->lock ); + // Replication might be complete, uplink_mainloop should take care.... 
} /** @@ -592,15 +593,21 @@ static void uplink_handle_receive(dnbd3_connection_t *link) writev( client->sock, iov, 2 ); pthread_mutex_unlock( &client->sendMutex ); spin_lock( &link->queueLock ); - if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--; + if ( i == link->queueLen - 1 ) link->queueLen--; } spin_unlock( &link->queueLock ); + if ( start == link->replicationHandle ) link->replicationHandle = 0; + } + if ( link->queueLen == 0 ) { + uplink_sendReplicationRequest( link ); } error_cleanup: ; altservers_serverFailed( &link->currentServer ); const int fd = link->fd; link->fd = -1; + link->replicationHandle = 0; if ( fd != -1 ) close( fd ); + altservers_findUplink( link ); // Can we just call it here? } /** -- cgit v1.2.3-55-g7522