summary | refs | log | tree | commit | diff | stats
path: root/src/fuse
diff options
context:
space:
mode:
author Simon Rettberg 2015-12-01 13:38:10 +0100
committer Simon Rettberg 2015-12-01 13:38:10 +0100
commit ecc6e5dd28821bf1c207e074f1b5903426a7e4c4 (patch)
tree 5f9d8f8ffbcad5ae6ac1b9a0b3075155995537d8 /src/fuse
parent [FUSE] It works! Kinda... (diff)
download dnbd3-ecc6e5dd28821bf1c207e074f1b5903426a7e4c4.tar.gz
dnbd3-ecc6e5dd28821bf1c207e074f1b5903426a7e4c4.tar.xz
dnbd3-ecc6e5dd28821bf1c207e074f1b5903426a7e4c4.zip
[FUSE] Stability improvements, runs for longer than a couple secs now :)
Diffstat (limited to 'src/fuse')
-rw-r--r-- src/fuse/connection.c 60
1 file changed, 32 insertions, 28 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c
index 53949ca..20ddd8f 100644
--- a/src/fuse/connection.c
+++ b/src/fuse/connection.c
@@ -122,7 +122,7 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r
} else if ( rid != 0 && rid != remoteRid ) {
logadd( LOG_ERROR, "rid mismatch" );
} else {
- image.name = strdup(remoteName);
+ image.name = strdup( remoteName );
image.rid = remoteRid;
image.size = remoteSize;
connection.currentServer = altservers[i].host;
@@ -215,10 +215,9 @@ static void* connection_receiveThreadMain(void *sockPtr)
do {
ret = dnbd3_read_reply( sockFd, &reply, true );
if ( ret == REPLY_OK ) break;
- logadd( LOG_DEBUG1, "Receive return code %d", ret );
} while ( ret == REPLY_INTR || ret == REPLY_AGAIN );
if ( ret != REPLY_OK ) {
- logadd( LOG_DEBUG1, "Error receiving reply on receiveThread" );
+ logadd( LOG_DEBUG1, "Error receiving reply on receiveThread (%d)", ret );
goto fail;
}
// TODO: Ignoring anything but block replies for now; handle the others
@@ -229,7 +228,6 @@ static void* connection_receiveThreadMain(void *sockPtr)
}
} else {
// get block reply. find matching request
- logadd( LOG_DEBUG1, "Got a block reply :)" );
dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle );
if ( request == NULL ) {
logadd( LOG_WARNING, "WARNING BUG ALERT SOMETHING: Got block reply with no matching request" );
@@ -239,25 +237,17 @@ static void* connection_receiveThreadMain(void *sockPtr)
}
} else {
// Found a match
- logadd( LOG_DEBUG1, "Found pending entry :)" );
- request->finished = true;
- uint32_t done = 0;
- while ( done < request->length ) {
- logadd( LOG_DEBUG1, "Will read %d bytes...", (int)(request->length - done) );
- ssize_t ret = recv( sockFd, request->buffer + done, request->length - done, 0 );
- if ( ret <= 0 ) {
- if ( ret == -1 && errno == EINTR ) continue;
- logadd( LOG_DEBUG1, "Read failure :(" );
- request->success = false;
- signal_call( request->signalFd );
- logadd( LOG_DEBUG1, "receiving payload for a block reply failed" );
- goto fail;
- }
- done += (uint32_t)ret;
+ const ssize_t ret = sock_recv( sockFd, request->buffer, request->length );
+ if ( ret != (ssize_t)request->length ) {
+ request->success = false;
+ request->finished = true;
+ signal_call( request->signalFd );
+ logadd( LOG_DEBUG1, "receiving payload for a block reply failed" );
+ goto fail;
}
// Success, wake up caller
- logadd( LOG_DEBUG1, "Read completed :)" );
request->success = true;
+ request->finished = true;
signal_call( request->signalFd );
}
}
@@ -271,6 +261,7 @@ fail:;
logadd( LOG_DEBUG1, "RT: Local sock: %d, global: %d", sockFd, connection.sockFd );
if ( connection.sockFd == sockFd ) {
connection.sockFd = -1;
+ signal_call( connection.panicSignalFd );
}
pthread_mutex_unlock( &connection.sendMutex );
// As we're the only reader, it's safe to close the socket now
@@ -295,7 +286,7 @@ static void* connection_backgroundThread(void *something UNUSED)
// Woken up, see what we have to do
const bool panic = connection.sockFd == -1;
// Check alt servers
- if ( panic || now < nextRttCheck ) {
+ if ( panic || now >= nextRttCheck ) {
probeAltServers();
if ( panic || connection.startupTime + ( STARTUP_MODE_DURATION * 1000ull ) > now ) {
nextRttCheck = now + TIMER_INTERVAL_PROBE_STARTUP * 1000ull;
@@ -304,7 +295,7 @@ static void* connection_backgroundThread(void *something UNUSED)
}
}
// Send keepalive packet
- if ( now < nextKeepalive ) {
+ if ( now >= nextKeepalive ) {
pthread_mutex_lock( &connection.sendMutex );
if ( connection.sockFd != -1 ) {
logadd( LOG_DEBUG1, "Sending keepalive..." );
@@ -317,6 +308,7 @@ static void* connection_backgroundThread(void *something UNUSED)
if ( (size_t)ret != sizeof request ) {
shutdown( connection.sockFd, SHUT_RDWR );
connection.sockFd = -1;
+ nextRttCheck = now;
}
}
pthread_mutex_unlock( &connection.sendMutex );
@@ -340,6 +332,9 @@ static void probeAltServers()
char *remoteName;
const bool panic = connection.sockFd == -1;
+ if ( panic ) {
+ logadd( LOG_DEBUG1, "C'est la panique, panique!" );
+ }
for ( int altIndex = 0; altIndex < MAX_ALTS; ++altIndex ) {
alt_server_t * const srv = &altservers[altIndex];
if ( srv->host.type == 0 )
@@ -354,6 +349,7 @@ static void probeAltServers()
srv->rttIndex += 1;
}
// Probe
+ logadd( LOG_DEBUG1, "Trying server %d\n", altIndex );
const uint64_t start = nowMicro();
int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 );
if ( sock == -1 ) {
@@ -361,9 +357,11 @@ static void probeAltServers()
continue;
}
if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) {
+ logadd( LOG_DEBUG1, "-> select fail" );
goto fail;
}
if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize )) {
+ logadd( LOG_DEBUG1, "<- select fail" );
goto fail;
}
if ( remoteProto < MIN_SUPPORTED_SERVER || remoteProto > PROTOCOL_VERSION ) {
@@ -377,10 +375,13 @@ static void probeAltServers()
goto fail;
}
if ( !dnbd3_get_block( sock, 0, RTT_BLOCK_SIZE, 0 ) ) {
+ logadd( LOG_DEBUG1, "-> block request fail" );
goto fail;
}
- if ( !dnbd3_get_reply( sock, &reply ) || reply.size != RTT_BLOCK_SIZE
- || !throwDataAway( sock, RTT_BLOCK_SIZE ) ) {
+ int a = 111, b = 111;
+ if ( !(a = dnbd3_get_reply( sock, &reply )) || reply.size != RTT_BLOCK_SIZE
+ || !(b = throwDataAway( sock, RTT_BLOCK_SIZE )) ) {
+ logadd( LOG_DEBUG1, "<- block paxload fail %d %d %d", a, (int)reply.size, b );
goto fail;
}
// Yay, success
@@ -421,9 +422,11 @@ fail:;
if ( bestIndex != -1
&& ( currentRtt > altservers[bestIndex].rtt + RTT_ABSOLUTE_THRESHOLD
|| RTT_THRESHOLD_FACTOR(currentRtt) > altservers[bestIndex].rtt + 1500 ) ) {
+ logadd( LOG_INFO, "Current: %dµs, best: %dµs. Will switch!", currentRtt, altservers[bestIndex].rtt );
switchConnection( bestSock, &altservers[bestIndex] );
- } else {
+ } else if ( bestIndex != -1 ) {
// No switch
+ logadd( LOG_DEBUG1, "Current: %dµs, best: %dµs. Will not switch.", currentRtt, altservers[bestIndex].rtt );
close( bestSock );
}
}
@@ -482,12 +485,13 @@ static void switchConnection(int sockFd, alt_server_t *srv)
static bool throwDataAway(int sockFd, uint32_t amount)
{
- uint32_t done = 0;
+ size_t done = 0;
char tempBuffer[SHORTBUF];
while ( done < amount ) {
- const ssize_t ret = recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ), MSG_NOSIGNAL );
- if ( ret == 0 || ( ret < 0 && ret != EINTR ) )
+ const ssize_t ret = sock_recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ) );
+ if ( ret <= 0 )
return false;
+ done += (size_t)ret;
}
return true;
}