summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/fuse/connection.c32
-rw-r--r--src/fuse/main.c10
2 files changed, 26 insertions, 16 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c
index 20ddd8f..c0bcd6d 100644
--- a/src/fuse/connection.c
+++ b/src/fuse/connection.c
@@ -6,6 +6,7 @@
#include "../shared/sockhelper.h"
#include "../shared/log.h"
+#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <stdio.h>
@@ -68,8 +69,14 @@ static void* connection_backgroundThread(void *something);
static void probeAltServers();
static void switchConnection(int sockFd, alt_server_t *srv);
static bool throwDataAway(int sockFd, uint32_t amount);
-static void enqueueRequest(dnbd3_async_t *request);
-static dnbd3_async_t* removeRequest(dnbd3_async_t *request);
+
+//static void enqueueRequest(dnbd3_async_t *request);
+//static dnbd3_async_t* removeRequest(dnbd3_async_t *request);
+static void __enqueueRequest(dnbd3_async_t *request, const char *file, int line);
+static dnbd3_async_t* __removeRequest(dnbd3_async_t *request, const char *file, int line);
+#define enqueueRequest(req) __enqueueRequest(req, __FILE__, __LINE__)
+#define removeRequest(req) __removeRequest(req, __FILE__, __LINE__)
+
static uint64_t nowMilli();
static uint64_t nowMicro();
@@ -239,10 +246,8 @@ static void* connection_receiveThreadMain(void *sockPtr)
// Found a match
const ssize_t ret = sock_recv( sockFd, request->buffer, request->length );
if ( ret != (ssize_t)request->length ) {
- request->success = false;
- request->finished = true;
- signal_call( request->signalFd );
logadd( LOG_DEBUG1, "receiving payload for a block reply failed" );
+ connection_read( request );
goto fail;
}
// Success, wake up caller
@@ -298,7 +303,6 @@ static void* connection_backgroundThread(void *something UNUSED)
if ( now >= nextKeepalive ) {
pthread_mutex_lock( &connection.sendMutex );
if ( connection.sockFd != -1 ) {
- logadd( LOG_DEBUG1, "Sending keepalive..." );
dnbd3_request_t request;
request.magic = dnbd3_packet_magic;
request.cmd = CMD_KEEPALIVE;
@@ -349,7 +353,6 @@ static void probeAltServers()
srv->rttIndex += 1;
}
// Probe
- logadd( LOG_DEBUG1, "Trying server %d\n", altIndex );
const uint64_t start = nowMicro();
int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 );
if ( sock == -1 ) {
@@ -400,6 +403,7 @@ static void probeAltServers()
srv->rtt += srv->rtts[i];
}
srv->rtt /= RTT_COUNT;
+ srv->rtt += rand() % 30000;
// Remember rtt if this server matches the current one
if ( isSameAddressPort( &srv->host, &connection.currentServer ) ) {
currentRtt = srv->rtt;
@@ -420,7 +424,7 @@ fail:;
}
// Switch if a better server was found
if ( bestIndex != -1
- && ( currentRtt > altservers[bestIndex].rtt + RTT_ABSOLUTE_THRESHOLD
+ && ( currentRtt > altservers[bestIndex].rtt || currentRtt > altservers[bestIndex].rtt + RTT_ABSOLUTE_THRESHOLD
|| RTT_THRESHOLD_FACTOR(currentRtt) > altservers[bestIndex].rtt + 1500 ) ) {
logadd( LOG_INFO, "Current: %dµs, best: %dµs. Will switch!", currentRtt, altservers[bestIndex].rtt );
switchConnection( bestSock, &altservers[bestIndex] );
@@ -470,7 +474,10 @@ static void switchConnection(int sockFd, alt_server_t *srv)
// resend queue
if ( queue != NULL ) {
pthread_mutex_lock( &connection.sendMutex );
- for ( it = queue; it != NULL; it = it->next ) {
+ dnbd3_async_t *next = NULL;
+ for ( it = queue; it != NULL; it = next ) {
+ logadd( LOG_DEBUG1, "Requeue after server change" );
+ next = it->next;
enqueueRequest( it );
if ( connection.sockFd != -1 && !dnbd3_get_block( connection.sockFd, it->offset, it->length, (uint64_t)it ) ) {
logadd( LOG_WARNING, "Resending pending request failed, re-entering panic mode" );
@@ -496,12 +503,13 @@ static bool throwDataAway(int sockFd, uint32_t amount)
return true;
}
-static void enqueueRequest(dnbd3_async_t *request)
+static void __enqueueRequest(dnbd3_async_t *request, const char *file, int line)
{
request->next = NULL;
request->finished = false;
request->success = false;
pthread_spin_lock( &requests.lock );
+ //logadd( LOG_DEBUG2, "Queue: %p @ %s : %d", request, file, line );
if ( requests.head == NULL ) {
requests.head = requests.tail = request;
} else {
@@ -511,9 +519,10 @@ static void enqueueRequest(dnbd3_async_t *request)
pthread_spin_unlock( &requests.lock );
}
-static dnbd3_async_t* removeRequest(dnbd3_async_t *request)
+static dnbd3_async_t* __removeRequest(dnbd3_async_t *request, const char *file, int line)
{
pthread_spin_lock( &requests.lock );
+ //logadd( LOG_DEBUG2, "Remov: %p @ %s : %d", request, file, line );
dnbd3_async_t *iterator, *prev = NULL;
for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) {
if ( iterator == request ) {
@@ -553,3 +562,4 @@ static uint64_t nowMicro()
}
return ( ts.tv_sec * 1000000ull ) + ( ts.tv_nsec / 1000ull );
}
+
diff --git a/src/fuse/main.c b/src/fuse/main.c
index 42c3664..f237832 100644
--- a/src/fuse/main.c
+++ b/src/fuse/main.c
@@ -112,8 +112,8 @@ static int image_read(const char *path, char *buf, size_t size, off_t offset, st
}
while ( !request.finished ) {
int ret = signal_wait( request.signalFd, 10000 );
- if ( ret != SIGNAL_OK ) {
- debugf( "signal_wait returned %d", ret );
+ if ( ret < 0 ) {
+ debugf( "fuse_read signal wait returned %d", ret );
}
}
signal_close( request.signalFd );
@@ -190,7 +190,7 @@ exit_usage:
if ( testOpt ) {
/* values for testing. */
- server_address = "132.230.4.1";
+ server_address = "132.230.4.1 132.230.8.113 132.230.4.60";
image_Name = "windows7-umwelt.vmdk";
useLog = true;
}
@@ -199,11 +199,11 @@ exit_usage:
goto exit_usage;
}
- int arg_count = 5;
+ int arg_count = 4;
if ( useDebug ) {
arg_count++;
}
- char *args[6] = {"foo", "-o", "ro,allow_other", "-s", mountPoint, "-d"};
+ char *args[6] = { "foo", "-o", "ro,allow_other,kernel_cache,max_readahead=262144", mountPoint, "-d" };
if ( !connection_init( server_address, image_Name, 0 ) ) {
printf( "Tschüss\n" );