summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2015-12-01 13:38:10 +0100
committerSimon Rettberg2015-12-01 13:38:10 +0100
commitecc6e5dd28821bf1c207e074f1b5903426a7e4c4 (patch)
tree5f9d8f8ffbcad5ae6ac1b9a0b3075155995537d8
parent[FUSE] It works! Kinda... (diff)
downloaddnbd3-ecc6e5dd28821bf1c207e074f1b5903426a7e4c4.tar.gz
dnbd3-ecc6e5dd28821bf1c207e074f1b5903426a7e4c4.tar.xz
dnbd3-ecc6e5dd28821bf1c207e074f1b5903426a7e4c4.zip
[FUSE] Stability improvements, runs for longer than a couple secs now :)
-rw-r--r--src/fuse/connection.c60
-rw-r--r--src/shared/protocol.h40
-rw-r--r--src/shared/sockhelper.c23
-rw-r--r--src/shared/sockhelper.h7
4 files changed, 75 insertions, 55 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c
index 53949ca..20ddd8f 100644
--- a/src/fuse/connection.c
+++ b/src/fuse/connection.c
@@ -122,7 +122,7 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r
} else if ( rid != 0 && rid != remoteRid ) {
logadd( LOG_ERROR, "rid mismatch" );
} else {
- image.name = strdup(remoteName);
+ image.name = strdup( remoteName );
image.rid = remoteRid;
image.size = remoteSize;
connection.currentServer = altservers[i].host;
@@ -215,10 +215,9 @@ static void* connection_receiveThreadMain(void *sockPtr)
do {
ret = dnbd3_read_reply( sockFd, &reply, true );
if ( ret == REPLY_OK ) break;
- logadd( LOG_DEBUG1, "Receive return code %d", ret );
} while ( ret == REPLY_INTR || ret == REPLY_AGAIN );
if ( ret != REPLY_OK ) {
- logadd( LOG_DEBUG1, "Error receiving reply on receiveThread" );
+ logadd( LOG_DEBUG1, "Error receiving reply on receiveThread (%d)", ret );
goto fail;
}
// TODO: Ignoring anything but block replies for now; handle the others
@@ -229,7 +228,6 @@ static void* connection_receiveThreadMain(void *sockPtr)
}
} else {
// get block reply. find matching request
- logadd( LOG_DEBUG1, "Got a block reply :)" );
dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle );
if ( request == NULL ) {
logadd( LOG_WARNING, "WARNING BUG ALERT SOMETHING: Got block reply with no matching request" );
@@ -239,25 +237,17 @@ static void* connection_receiveThreadMain(void *sockPtr)
}
} else {
// Found a match
- logadd( LOG_DEBUG1, "Found pending entry :)" );
- request->finished = true;
- uint32_t done = 0;
- while ( done < request->length ) {
- logadd( LOG_DEBUG1, "Will read %d bytes...", (int)(request->length - done) );
- ssize_t ret = recv( sockFd, request->buffer + done, request->length - done, 0 );
- if ( ret <= 0 ) {
- if ( ret == -1 && errno == EINTR ) continue;
- logadd( LOG_DEBUG1, "Read failure :(" );
- request->success = false;
- signal_call( request->signalFd );
- logadd( LOG_DEBUG1, "receiving payload for a block reply failed" );
- goto fail;
- }
- done += (uint32_t)ret;
+ const ssize_t ret = sock_recv( sockFd, request->buffer, request->length );
+ if ( ret != (ssize_t)request->length ) {
+ request->success = false;
+ request->finished = true;
+ signal_call( request->signalFd );
+ logadd( LOG_DEBUG1, "receiving payload for a block reply failed" );
+ goto fail;
}
// Success, wake up caller
- logadd( LOG_DEBUG1, "Read completed :)" );
request->success = true;
+ request->finished = true;
signal_call( request->signalFd );
}
}
@@ -271,6 +261,7 @@ fail:;
logadd( LOG_DEBUG1, "RT: Local sock: %d, global: %d", sockFd, connection.sockFd );
if ( connection.sockFd == sockFd ) {
connection.sockFd = -1;
+ signal_call( connection.panicSignalFd );
}
pthread_mutex_unlock( &connection.sendMutex );
// As we're the only reader, it's safe to close the socket now
@@ -295,7 +286,7 @@ static void* connection_backgroundThread(void *something UNUSED)
// Woken up, see what we have to do
const bool panic = connection.sockFd == -1;
// Check alt servers
- if ( panic || now < nextRttCheck ) {
+ if ( panic || now >= nextRttCheck ) {
probeAltServers();
if ( panic || connection.startupTime + ( STARTUP_MODE_DURATION * 1000ull ) > now ) {
nextRttCheck = now + TIMER_INTERVAL_PROBE_STARTUP * 1000ull;
@@ -304,7 +295,7 @@ static void* connection_backgroundThread(void *something UNUSED)
}
}
// Send keepalive packet
- if ( now < nextKeepalive ) {
+ if ( now >= nextKeepalive ) {
pthread_mutex_lock( &connection.sendMutex );
if ( connection.sockFd != -1 ) {
logadd( LOG_DEBUG1, "Sending keepalive..." );
@@ -317,6 +308,7 @@ static void* connection_backgroundThread(void *something UNUSED)
if ( (size_t)ret != sizeof request ) {
shutdown( connection.sockFd, SHUT_RDWR );
connection.sockFd = -1;
+ nextRttCheck = now;
}
}
pthread_mutex_unlock( &connection.sendMutex );
@@ -340,6 +332,9 @@ static void probeAltServers()
char *remoteName;
const bool panic = connection.sockFd == -1;
+ if ( panic ) {
+ logadd( LOG_DEBUG1, "C'est la panique, panique!" );
+ }
for ( int altIndex = 0; altIndex < MAX_ALTS; ++altIndex ) {
alt_server_t * const srv = &altservers[altIndex];
if ( srv->host.type == 0 )
@@ -354,6 +349,7 @@ static void probeAltServers()
srv->rttIndex += 1;
}
// Probe
+ logadd( LOG_DEBUG1, "Trying server %d\n", altIndex );
const uint64_t start = nowMicro();
int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 );
if ( sock == -1 ) {
@@ -361,9 +357,11 @@ static void probeAltServers()
continue;
}
if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) {
+ logadd( LOG_DEBUG1, "-> select fail" );
goto fail;
}
if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize )) {
+ logadd( LOG_DEBUG1, "<- select fail" );
goto fail;
}
if ( remoteProto < MIN_SUPPORTED_SERVER || remoteProto > PROTOCOL_VERSION ) {
@@ -377,10 +375,13 @@ static void probeAltServers()
goto fail;
}
if ( !dnbd3_get_block( sock, 0, RTT_BLOCK_SIZE, 0 ) ) {
+ logadd( LOG_DEBUG1, "-> block request fail" );
goto fail;
}
- if ( !dnbd3_get_reply( sock, &reply ) || reply.size != RTT_BLOCK_SIZE
- || !throwDataAway( sock, RTT_BLOCK_SIZE ) ) {
+ int a = 111, b = 111;
+ if ( !(a = dnbd3_get_reply( sock, &reply )) || reply.size != RTT_BLOCK_SIZE
+ || !(b = throwDataAway( sock, RTT_BLOCK_SIZE )) ) {
+ logadd( LOG_DEBUG1, "<- block paxload fail %d %d %d", a, (int)reply.size, b );
goto fail;
}
// Yay, success
@@ -421,9 +422,11 @@ fail:;
if ( bestIndex != -1
&& ( currentRtt > altservers[bestIndex].rtt + RTT_ABSOLUTE_THRESHOLD
|| RTT_THRESHOLD_FACTOR(currentRtt) > altservers[bestIndex].rtt + 1500 ) ) {
+ logadd( LOG_INFO, "Current: %dµs, best: %dµs. Will switch!", currentRtt, altservers[bestIndex].rtt );
switchConnection( bestSock, &altservers[bestIndex] );
- } else {
+ } else if ( bestIndex != -1 ) {
// No switch
+ logadd( LOG_DEBUG1, "Current: %dµs, best: %dµs. Will not switch.", currentRtt, altservers[bestIndex].rtt );
close( bestSock );
}
}
@@ -482,12 +485,13 @@ static void switchConnection(int sockFd, alt_server_t *srv)
static bool throwDataAway(int sockFd, uint32_t amount)
{
- uint32_t done = 0;
+ size_t done = 0;
char tempBuffer[SHORTBUF];
while ( done < amount ) {
- const ssize_t ret = recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ), MSG_NOSIGNAL );
- if ( ret == 0 || ( ret < 0 && ret != EINTR ) )
+ const ssize_t ret = sock_recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ) );
+ if ( ret <= 0 )
return false;
+ done += (size_t)ret;
}
return true;
}
diff --git a/src/shared/protocol.h b/src/shared/protocol.h
index a5c7bbd..3539c21 100644
--- a/src/shared/protocol.h
+++ b/src/shared/protocol.h
@@ -4,6 +4,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
+#include "sockhelper.h"
#include "../types.h"
#include "../serialize.h"
@@ -35,9 +36,11 @@ static inline int dnbd3_read_reply(int sock, dnbd3_reply_t *reply, bool wait)
static inline bool dnbd3_get_reply(int sock, dnbd3_reply_t *reply)
{
- int ret = dnbd3_read_reply( sock, reply, true );
- if ( ret != REPLY_INTR ) return ret == REPLY_OK;
- return dnbd3_read_reply( sock, reply, true ) == REPLY_OK;
+ int ret;
+ do {
+ ret = dnbd3_read_reply( sock, reply, true );
+ } while ( ret == REPLY_INTR );
+ return ret == REPLY_OK;
}
static inline bool dnbd3_select_image(int sock, const char *lower_name, uint16_t rid, uint8_t flags8)
@@ -63,10 +66,10 @@ static inline bool dnbd3_select_image(int sock, const char *lower_name, uint16_t
iov[0].iov_len = sizeof(request);
iov[1].iov_base = &serialized;
iov[1].iov_len = len;
- ssize_t ret = writev( sock, iov, 2 );
- if ( ret == -1 && errno == EINTR ) {
+ ssize_t ret;
+ do {
ret = writev( sock, iov, 2 );
- }
+ } while ( ret == -1 && errno == EINTR );
return ret == len + (ssize_t)sizeof(request);
}
@@ -79,7 +82,7 @@ static inline bool dnbd3_get_block(int sock, uint64_t offset, uint32_t size, uin
request.offset = offset;
request.size = size;
fixup_request( request );
- return send( sock, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request);
+ return sock_sendAll( sock, &request, sizeof(request), 2 ) == (ssize_t)sizeof(request);
}
static inline bool dnbd3_get_crc32(int sock, uint32_t *master, void *buffer, size_t *bufferLen)
@@ -92,7 +95,7 @@ static inline bool dnbd3_get_crc32(int sock, uint32_t *master, void *buffer, siz
request.offset = 0;
request.size = 0;
fixup_request( request );
- if ( send( sock, &request, sizeof(request), 0 ) != sizeof(request) ) return false;
+ if ( sock_sendAll( sock, &request, sizeof(request), 2 ) != (ssize_t)sizeof(request) ) return false;
if ( !dnbd3_get_reply( sock, &reply ) ) return false;
if ( reply.size == 0 ) {
*bufferLen = 0;
@@ -102,14 +105,8 @@ static inline bool dnbd3_get_crc32(int sock, uint32_t *master, void *buffer, siz
reply.size -= 4;
if ( reply.cmd != CMD_GET_CRC32 || reply.size > *bufferLen ) return false;
*bufferLen = reply.size;
- if ( recv( sock, master, sizeof(uint32_t), MSG_WAITALL | MSG_NOSIGNAL ) != sizeof(uint32_t) ) return false;
- uint32_t done = 0;
- while ( done < reply.size ) {
- const ssize_t ret = recv( sock, (char*)buffer + done, reply.size - done, 0 );
- if ( ret <= 0 ) return false;
- done += ret;
- }
- return true;
+ if ( sock_recv( sock, master, sizeof(uint32_t) ) != (ssize_t)sizeof(uint32_t) ) return false;
+ return sock_recv( sock, buffer, reply.size ) == (ssize_t)reply.size;
}
/**
@@ -128,15 +125,12 @@ static inline bool dnbd3_select_image_reply(serialized_buffer_t *buffer, int soc
if ( reply.cmd != CMD_SELECT_IMAGE || reply.size < 3 || reply.size > MAX_PAYLOAD ) {
return false;
}
-// receive reply payload
- ssize_t ret = recv( sock, buffer, reply.size, MSG_WAITALL | MSG_NOSIGNAL );
- if ( ret == -1 && errno == EINTR ) {
- ret = recv( sock, buffer, reply.size, MSG_WAITALL | MSG_NOSIGNAL );
- }
- if ( ret != reply.size ) {
+ // receive reply payload
+ ssize_t ret = sock_recv( sock, buffer, reply.size );
+ if ( ret != (ssize_t)reply.size ) {
return false;
}
-// handle/check reply payload
+ // handle/check reply payload
serializer_reset_read( buffer, reply.size );
*protocol_version = serializer_get_uint16( buffer );
*name = serializer_get_string( buffer );
diff --git a/src/shared/sockhelper.c b/src/shared/sockhelper.c
index 0e732c1..f2f8b97 100644
--- a/src/shared/sockhelper.c
+++ b/src/shared/sockhelper.c
@@ -264,17 +264,16 @@ bool sock_append(poll_list_t *list, const int sock, bool wantRead, bool wantWrit
return true;
}
-ssize_t sock_sendAll(int sock, void *buffer, size_t len, int maxtries)
+ssize_t sock_sendAll(const int sock, void *buffer, const size_t len, int maxtries)
{
size_t done = 0;
ssize_t ret = 0;
while ( done < len ) {
if ( maxtries >= 0 && --maxtries == -1 ) break;
ret = write( sock, (char*)buffer + done, len - done );
- if ( ret < 0 ) {
+ if ( ret == -1 ) {
if ( errno == EINTR ) continue;
if ( errno == EAGAIN || errno == EWOULDBLOCK ) {
- usleep( 1000 );
continue;
}
break;
@@ -286,3 +285,21 @@ ssize_t sock_sendAll(int sock, void *buffer, size_t len, int maxtries)
return done;
}
+ssize_t sock_recv(const int sock, void *buffer, const size_t len)
+{
+ size_t done = 0;
+ ssize_t ret = 0;
+ int intrs = 0;
+ while ( done < len ) {
+ ret = recv( sock, (char*)buffer + done, len - done, MSG_NOSIGNAL );
+ if ( ret == -1 ) {
+ if ( errno == EINTR && ++intrs < 10 ) continue;
+ break;
+ }
+ if ( ret == 0 ) break;
+ done += ret;
+ }
+ if ( done == 0 ) return ret;
+ return done;
+}
+
diff --git a/src/shared/sockhelper.h b/src/shared/sockhelper.h
index dc22e2b..6ffc31a 100644
--- a/src/shared/sockhelper.h
+++ b/src/shared/sockhelper.h
@@ -90,6 +90,11 @@ bool sock_append(poll_list_t *list, const int sock, bool wantRead, bool wantWrit
* Give up after calling write() maxtries times.
* Set maxtries < 0 to try infinitely.
*/
-ssize_t sock_sendAll(int sock, void *buffer, size_t len, int maxtries);
+ssize_t sock_sendAll(const int sock, void *buffer, const size_t len, int maxtries);
+
+/**
+ * Send given buffer, repeatedly calling recv on partial send or EINTR.
+ */
+ssize_t sock_recv(const int sock, void *buffer, const size_t len);
#endif /* SOCKHELPER_H_ */