summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Simon Rettberg, 2014-12-22 19:46:42 +0100
committer: Simon Rettberg, 2014-12-22 19:46:42 +0100
commit: 47c28e25055cfa3c4c37035cc7888d0b87392dc1 (patch)
tree: f3213a89b18ddca597106af892f54bc429fc7689
parent: [SERVER] Configurable client timeout, adaptive replication speed (to be teste... (diff)
download: dnbd3-47c28e25055cfa3c4c37035cc7888d0b87392dc1.tar.gz
dnbd3-47c28e25055cfa3c4c37035cc7888d0b87392dc1.tar.xz
dnbd3-47c28e25055cfa3c4c37035cc7888d0b87392dc1.zip
[SERVER] Improve replication and reconnecting behaviour
-rw-r--r-- src/server/altservers.c 9
-rw-r--r-- src/server/altservers.h 2
-rw-r--r-- src/server/globals.h 2
-rw-r--r-- src/server/image.c 2
-rw-r--r-- src/server/net.c 3
-rw-r--r-- src/server/uplink.c 56
6 files changed, 38 insertions, 36 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index 25d9bc9..7923f1a 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -226,7 +226,7 @@ int altservers_getMatching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int
* This function is suited for finding uplink servers as
* it includes private servers and ignores any "client only" servers
*/
-int altservers_get(dnbd3_host_t *output, int size)
+int altservers_get(dnbd3_host_t *output, int size, int emergency)
{
if ( size <= 0 ) return 0;
int count = 0, i;
@@ -245,10 +245,10 @@ int altservers_get(dnbd3_host_t *output, int size)
if ( altServers[i].host.type == 0 ) continue; // Slot is empty
if ( _proxyPrivateOnly && !altServers[i].isPrivate ) continue; // Config says to consider private alt-servers only? ignore!
if ( altServers[i].isClientOnly ) continue;
- if ( altServers[i].numFails > SERVER_MAX_UPLINK_FAILS // server failed X times in a row
+ if ( !emergency && altServers[i].numFails > SERVER_MAX_UPLINK_FAILS // server failed X times in a row
&& now - altServers[i].lastFail > SERVER_BAD_UPLINK_IGNORE ) continue; // and last fail was not too long ago? ignore!
// server seems ok, include in output and reset its fail counter
- altServers[i].numFails = 0;
+ if ( !emergency ) altServers[i].numFails = 0;
output[count++] = altServers[i].host;
if ( count >= size ) break;
}
@@ -395,7 +395,6 @@ static void *altservers_main(void *data)
// Empty pipe
do {
ret = read( readPipe, buffer, sizeof buffer );
- if ( ret > 0 ) printf("*********** altserver thread woke up\n");
} while ( ret == sizeof buffer ); // Throw data away, this is just used for waking this thread up
if ( ret == 0 ) {
memlogf( "[WARNING] Signal pipe of alservers_main closed! Things will break!" );
@@ -425,7 +424,7 @@ static void *altservers_main(void *data)
}
assert( uplink->rttTestResult == RTT_INPROGRESS );
// Now get 4 alt servers
- numAlts = altservers_get( servers, ALTS );
+ numAlts = altservers_get( servers, ALTS, uplink->fd == -1 );
if ( uplink->fd != -1 ) {
// Add current server if not already in list
found = FALSE;
diff --git a/src/server/altservers.h b/src/server/altservers.h
index 13b0685..459c546 100644
--- a/src/server/altservers.h
+++ b/src/server/altservers.h
@@ -17,7 +17,7 @@ void altservers_removeUplink(dnbd3_connection_t *uplink);
int altservers_getMatching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size);
-int altservers_get(dnbd3_host_t *output, int size);
+int altservers_get(dnbd3_host_t *output, int size, int emergency);
int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2);
diff --git a/src/server/globals.h b/src/server/globals.h
index c215916..037cd08 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -41,7 +41,7 @@ typedef struct
#define RTT_NOT_REACHABLE 4 // No uplink was reachable
struct _dnbd3_connection
{
- int fd; // socket fd to remote server
+ volatile int fd; // socket fd to remote server
int signal; // eventfd used to wake up the process
pthread_t thread; // thread holding the connection
pthread_spinlock_t queueLock; // lock for synchronization on request queue etc.
diff --git a/src/server/image.c b/src/server/image.c
index 78b907c..d6fad22 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -901,7 +901,7 @@ dnbd3_image_t* image_getOrClone(char *name, uint16_t revision)
dnbd3_host_t servers[4];
int uplinkSock = -1;
dnbd3_host_t *uplinkServer = NULL;
- const int count = altservers_get( servers, 4 );
+ const int count = altservers_get( servers, 4, FALSE );
uint16_t remoteVersion, remoteRid;
uint64_t remoteImageSize;
for (i = 0; i < count; ++i) {
diff --git a/src/server/net.c b/src/server/net.c
index 5ea2e36..7d7ecb4 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -306,7 +306,8 @@ void *net_client_handler(void *dnbd3_client)
const ssize_t ret = sendfile( client->sock, image_file, (off_t *)&request.offset, request.size );
if ( ret <= 0 ) {
pthread_mutex_unlock( &client->sendMutex );
- printf( "[ERROR] sendfile failed (image to net. ret=%d, sent %d/%d, errno=%d)\n", (int)ret, (int)done, (int)request.size, (int)errno );
+ printf( "[ERROR] sendfile failed (image to net. ret=%d, sent %d/%d, errno=%d)\n",
+ (int)ret, (int)done, (int)request.size, (int)errno );
goto exit_client_cleanup;
}
done += ret;
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 6c604c1..2821a27 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -256,6 +256,7 @@ static void* uplink_mainloop(void *data)
link->replicationHandle = 0;
// Re-send all pending requests
uplink_send_requests( link, FALSE );
+ uplink_sendReplicationRequest( link );
link->image->working = TRUE;
buffer[0] = '@';
if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) {
@@ -338,8 +339,8 @@ static void* uplink_mainloop(void *data)
const time_t now = time( NULL );
if ( now + SERVER_RTT_DELAY_FAILED < nextAltCheck ) {
// This probably means the system time was changed - handle this case properly by capping the timeout
- nextAltCheck = now + SERVER_RTT_DELAY_FAILED;
- } else if ( now >= nextAltCheck ) {
+ nextAltCheck = now + SERVER_RTT_DELAY_FAILED / 2;
+ } else if ( now >= nextAltCheck || link->fd == -1 ) {
// It seems it's time for a check
if ( image_isComplete( link->image ) ) {
// Quit work if image is complete
@@ -381,8 +382,8 @@ static void* uplink_mainloop(void *data)
snprintf( buffer, sizeof(buffer), "[DEBUG WARNING] Starving request detected:\n"
"%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", link->queue[i].client->image->lower_name,
link->queue[i].from, link->queue[i].to, link->queue[i].status );
- link->queue[i].status = ULR_NEW;
- resend = TRUE;
+ //link->queue[i].status = ULR_NEW;
+ //resend = TRUE;
spin_unlock( &link->queueLock );
printf("%s", buffer);
spin_lock( &link->queueLock );
@@ -508,7 +509,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
ret = recv( link->fd, &inReply, sizeof inReply, MSG_DONTWAIT | MSG_NOSIGNAL );
if ( ret < 0 ) {
const int err = errno;
- if ( err == EAGAIN || err == EWOULDBLOCK || err == EINTR ) return; // OK cases
+ if ( err == EAGAIN || err == EWOULDBLOCK || err == EINTR ) break; // OK cases
goto error_cleanup;
}
if ( ret == 0 ) {
@@ -547,7 +548,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
}
// Payload read completely
// Bail out if we're not interested
- if ( inReply.cmd != CMD_GET_BLOCK ) return;
+ if ( inReply.cmd != CMD_GET_BLOCK ) continue;
// Is a legit block reply
const uint64_t start = inReply.handle;
const uint64_t end = inReply.handle + inReply.size;
@@ -576,31 +577,32 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
outReply.magic = dnbd3_packet_magic;
for (i = link->queueLen - 1; i >= 0; --i) {
dnbd3_queued_request_t * const req = &link->queue[i];
- if ( req->status != ULR_PROCESSING ) continue;
- assert( req->from >= start && req->to <= end );
- dnbd3_client_t * const client = req->client;
- outReply.cmd = CMD_GET_BLOCK;
- outReply.handle = req->handle;
- outReply.size = req->to - req->from;
- iov[0].iov_base = &outReply;
- iov[0].iov_len = sizeof outReply;
- iov[1].iov_base = link->recvBuffer + (req->from - start);
- iov[1].iov_len = outReply.size;
- fixup_reply( outReply );
- req->status = ULR_FREE;
- pthread_mutex_lock( &client->sendMutex );
- spin_unlock( &link->queueLock );
- writev( client->sock, iov, 2 );
- pthread_mutex_unlock( &client->sendMutex );
- spin_lock( &link->queueLock );
- if ( i == link->queueLen - 1 ) link->queueLen--;
+ if ( req->status == ULR_PROCESSING ) {
+ assert( req->from >= start && req->to <= end );
+ dnbd3_client_t * const client = req->client;
+ outReply.cmd = CMD_GET_BLOCK;
+ outReply.handle = req->handle;
+ outReply.size = req->to - req->from;
+ iov[0].iov_base = &outReply;
+ iov[0].iov_len = sizeof outReply;
+ iov[1].iov_base = link->recvBuffer + (req->from - start);
+ iov[1].iov_len = outReply.size;
+ fixup_reply( outReply );
+ req->status = ULR_FREE;
+ pthread_mutex_lock( &client->sendMutex );
+ spin_unlock( &link->queueLock );
+ writev( client->sock, iov, 2 );
+ pthread_mutex_unlock( &client->sendMutex );
+ spin_lock( &link->queueLock );
+ }
+ if ( req->status == ULR_FREE && i == link->queueLen - 1 ) link->queueLen--;
}
spin_unlock( &link->queueLock );
if ( start == link->replicationHandle ) link->replicationHandle = 0;
}
- if ( link->queueLen == 0 ) {
- uplink_sendReplicationRequest( link );
- }
+ if ( link->queueLen == 0 ) uplink_sendReplicationRequest( link );
+ return;
+ // Error handling from failed receive or message parsing
error_cleanup: ;
altservers_serverFailed( &link->currentServer );
const int fd = link->fd;