summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--server.config.example/server.conf2
-rw-r--r--src/config.h1
-rw-r--r--src/server/altservers.c1
-rw-r--r--src/server/globals.c2
-rw-r--r--src/server/globals.h10
-rw-r--r--src/server/net.c14
-rw-r--r--src/server/server.c4
-rw-r--r--src/server/uplink.c37
8 files changed, 46 insertions, 25 deletions
diff --git a/server.config.example/server.conf b/server.config.example/server.conf
index ee8163b..8c8bb9e 100644
--- a/server.config.example/server.conf
+++ b/server.config.example/server.conf
@@ -3,4 +3,6 @@ basePath=/mnt/storage/dnbd3
serverPenalty=100000
clientPenalty=0
isProxy=true
+uplinkTimeout=1250
+clientTimeout=15000
diff --git a/src/config.h b/src/config.h
index 45593c2..6f8c33e 100644
--- a/src/config.h
+++ b/src/config.h
@@ -63,7 +63,6 @@
#define COMMENT_LENGTH 120
// in seconds if not stated otherwise (MS = milliseconds)
-#define SOCKET_TIMEOUT_SERVER_MS 15000
#define SOCKET_TIMEOUT_SERVER_RETRIES 3 // When waiting for next header, max reties * above timeout is the actual total timeout (ping timeout)
#define SOCKET_TIMEOUT_CLIENT_DATA 2
#define SOCKET_TIMEOUT_CLIENT_DISCOVERY 1
diff --git a/src/server/altservers.c b/src/server/altservers.c
index 0619bc7..25d9bc9 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -395,6 +395,7 @@ static void *altservers_main(void *data)
// Empty pipe
do {
ret = read( readPipe, buffer, sizeof buffer );
+ if ( ret > 0 ) printf("*********** altserver thread woke up\n");
} while ( ret == sizeof buffer ); // Throw data away, this is just used for waking this thread up
if ( ret == 0 ) {
memlogf( "[WARNING] Signal pipe of alservers_main closed! Things will break!" );
diff --git a/src/server/globals.c b/src/server/globals.c
index 3fcb61d..a9aca77 100644
--- a/src/server/globals.c
+++ b/src/server/globals.c
@@ -15,6 +15,7 @@ int _clientPenalty = 0;
int _isProxy = FALSE;
int _proxyPrivateOnly = FALSE;
int _uplinkTimeout = 1250;
+int _clientTimeout = 15000;
#define SAVE_TO_VAR_STR(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) { if (_ ## kk != NULL) free(_ ## kk); _ ## kk = strdup(value); } } while (0)
#define SAVE_TO_VAR_BOOL(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) _ ## kk = atoi(value) != 0 || strcmp(value, "true") == 0 || strcmp(value, "True") == 0 || strcmp(value, "TRUE") == 0; } while (0)
@@ -29,6 +30,7 @@ static int ini_handler(void *custom, const char* section, const char* key, const
SAVE_TO_VAR_INT( dnbd3, serverPenalty );
SAVE_TO_VAR_INT( dnbd3, clientPenalty );
SAVE_TO_VAR_INT( dnbd3, uplinkTimeout );
+ SAVE_TO_VAR_INT( dnbd3, clientTimeout );
return TRUE;
}
diff --git a/src/server/globals.h b/src/server/globals.h
index 1b17660..c215916 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -56,7 +56,8 @@ struct _dnbd3_connection
int recvBufferLen; // Len of ^^
volatile int shutdown; // bool to signal thread to stop, must only be set from uplink_shutdown() or cleanup in uplink_mainloop()
int replicatedLastBlock; // bool telling if the last block has been replicated yet
- time_t lastReplication; // timestamp of when last replication requests were sent
+ //time_t lastReplication; // timestamp of when last replication requests were sent
+ uint64_t replicationHandle; // Handle of pending replication request
};
typedef struct
@@ -167,10 +168,15 @@ extern int _isProxy;
extern int _proxyPrivateOnly;
/**
- * Read timeout when waiting for data on an uplink
+ * Read timeout when waiting for or sending data on an uplink
*/
extern int _uplinkTimeout;
+/**
+ * Read timeout when waiting for or sending data fron/to client
+ */
+extern int _clientTimeout;
+
void globals_loadConfig();
#endif /* GLOBALS_H_ */
diff --git a/src/server/net.c b/src/server/net.c
index b40413f..5ea2e36 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -301,11 +301,15 @@ void *net_client_handler(void *dnbd3_client)
if ( request.size != 0 ) {
// Send payload if request length > 0
- const ssize_t ret = sendfile( client->sock, image_file, (off_t *)&request.offset, request.size );
- if ( ret != request.size ) {
- pthread_mutex_unlock( &client->sendMutex );
- printf( "[ERROR] sendfile failed (image to net %d/%d)\n", (int)ret, (int)request.size );
- goto exit_client_cleanup;
+ size_t done = 0;
+ while ( done < request.size ) {
+ const ssize_t ret = sendfile( client->sock, image_file, (off_t *)&request.offset, request.size );
+ if ( ret <= 0 ) {
+ pthread_mutex_unlock( &client->sendMutex );
+ printf( "[ERROR] sendfile failed (image to net. ret=%d, sent %d/%d, errno=%d)\n", (int)ret, (int)done, (int)request.size, (int)errno );
+ goto exit_client_cleanup;
+ }
+ done += ret;
}
}
pthread_mutex_unlock( &client->sendMutex );
diff --git a/src/server/server.c b/src/server/server.c
index 844365b..e502543 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -359,9 +359,9 @@ int main(int argc, char *argv[])
usleep( 10000 ); // 10ms
continue;
}
- //memlogf("INFO: Client %s connected\n", inet_ntoa(client.sin_addr));
+ //memlogf("INFO: Client connected\n");
- sock_set_timeout( fd, SOCKET_TIMEOUT_SERVER_MS );
+ sock_set_timeout( fd, _clientTimeout );
dnbd3_client_t *dnbd3_client = dnbd3_init_client( &client, fd );
if ( dnbd3_client == NULL ) {
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 0a60ff1..6c604c1 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -55,6 +55,7 @@ int uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host)
link->queueLen = 0;
link->fd = -1;
link->signal = -1;
+ link->replicationHandle = 0;
if ( sock >= 0 ) {
link->betterFd = sock;
link->betterServer = *host;
@@ -250,10 +251,11 @@ static void* uplink_mainloop(void *data)
if ( link->image->crc32 == NULL ) {
uplink_addCrc32( link );
}
- // Re-send all pending requests
- uplink_send_requests( link, FALSE );
link->betterFd = -1;
link->currentServer = link->betterServer;
+ link->replicationHandle = 0;
+ // Re-send all pending requests
+ uplink_send_requests( link, FALSE );
link->image->working = TRUE;
buffer[0] = '@';
if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) {
@@ -331,8 +333,6 @@ static void* uplink_mainloop(void *data)
}
}
// Done handling epoll sockets
- // Replicate missing blocks from the image so the proxy will eventually have a full copy
- uplink_sendReplicationRequest( link );
// See if we should trigger an RTT measurement
if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) {
const time_t now = time( NULL );
@@ -373,6 +373,7 @@ static void* uplink_mainloop(void *data)
}
#ifdef _DEBUG
if ( link->fd != -1 && !link->shutdown ) {
+ int resend = FALSE;
time_t deadline = time( NULL ) - 10;
spin_lock( &link->queueLock );
for (i = 0; i < link->queueLen; ++i) {
@@ -381,12 +382,15 @@ static void* uplink_mainloop(void *data)
"%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", link->queue[i].client->image->lower_name,
link->queue[i].from, link->queue[i].to, link->queue[i].status );
link->queue[i].status = ULR_NEW;
+ resend = TRUE;
spin_unlock( &link->queueLock );
printf("%s", buffer);
spin_lock( &link->queueLock );
}
}
spin_unlock( &link->queueLock );
+ if ( resend )
+ uplink_send_requests( link, TRUE );
}
#endif
}
@@ -458,27 +462,24 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
if ( link == NULL || link->fd == -1 ) return;
dnbd3_image_t * const image = link->image;
if ( image == NULL || image->cache_map == NULL || image->filesize < DNBD3_BLOCK_SIZE ) return;
- const time_t now = time( NULL );
- if ( now <= link->lastReplication + 1 ) return;
- link->lastReplication = now;
spin_lock( &image->lock );
- if ( image == NULL || image->cache_map == NULL ) {
+ if ( image == NULL || image->cache_map == NULL || link->replicationHandle != 0 ) {
spin_unlock( &image->lock );
return;
}
dnbd3_request_t request;
request.magic = dnbd3_packet_magic;
- int sent = 0;
const size_t len = IMGSIZE_TO_MAPBYTES( image->filesize ) - 1;
for (int i = 0; i <= len; ++i) {
- if ( image->cache_map == NULL || link->fd == -1 || sent > 20 ) break;
+ if ( image->cache_map == NULL || link->fd == -1 ) break;
if ( image->cache_map[i] == 0xff || (i == len && link->replicatedLastBlock) ) continue;
if ( i == len ) link->replicatedLastBlock = TRUE;
+ link->replicationHandle = 1; // Prevent race condition
spin_unlock( &image->lock );
// Unlocked - do not break or continue here...
- ++sent;
request.cmd = CMD_GET_BLOCK;
- request.offset = request.handle = (uint64_t)i * DNBD3_BLOCK_SIZE * (uint64_t)8;
+ // Needs to be 8 (bit->byte, bitmap)
+ link->replicationHandle = request.offset = request.handle = (uint64_t)i * DNBD3_BLOCK_SIZE * (uint64_t)8;
request.size = DNBD3_BLOCK_SIZE * (uint64_t)8;
if ( request.offset + request.size > image->filesize ) {
request.size = image->filesize - request.offset;
@@ -489,10 +490,10 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
printf( "[DEBUG] Error sending background replication request to uplink server!\n" );
return;
}
- // Lock again...
- spin_lock( &image->lock );
+ return; // Request was sent, bail out, nothing is locked
}
spin_unlock( &image->lock );
+ // Replication might be complete, uplink_mainloop should take care....
}
/**
@@ -592,15 +593,21 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
writev( client->sock, iov, 2 );
pthread_mutex_unlock( &client->sendMutex );
spin_lock( &link->queueLock );
- if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--;
+ if ( i == link->queueLen - 1 ) link->queueLen--;
}
spin_unlock( &link->queueLock );
+ if ( start == link->replicationHandle ) link->replicationHandle = 0;
+ }
+ if ( link->queueLen == 0 ) {
+ uplink_sendReplicationRequest( link );
}
error_cleanup: ;
altservers_serverFailed( &link->currentServer );
const int fd = link->fd;
link->fd = -1;
+ link->replicationHandle = 0;
if ( fd != -1 ) close( fd );
+ altservers_findUplink( link ); // Can we just call it here?
}
/**