summaryrefslogtreecommitdiffstats
path: root/src/fuse
diff options
context:
space:
mode:
authorSimon Rettberg2015-11-30 17:31:06 +0100
committerSimon Rettberg2015-11-30 17:31:06 +0100
commit1d0211719bc6ad76f924f8c08e1a28f0509cd4fd (patch)
tree58b8f520514357bb689c781bcaa717092649b03e /src/fuse
parent[FUSE] Compiles again (diff)
downloaddnbd3-1d0211719bc6ad76f924f8c08e1a28f0509cd4fd.tar.gz
dnbd3-1d0211719bc6ad76f924f8c08e1a28f0509cd4fd.tar.xz
dnbd3-1d0211719bc6ad76f924f8c08e1a28f0509cd4fd.zip
[FUSE] It works! Kinda...
Diffstat (limited to 'src/fuse')
-rw-r--r--src/fuse/connection.c203
-rw-r--r--src/fuse/connection.h2
-rw-r--r--src/fuse/main.c203
3 files changed, 207 insertions, 201 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c
index cdfc1df..53949ca 100644
--- a/src/fuse/connection.c
+++ b/src/fuse/connection.c
@@ -4,6 +4,7 @@
#include "../shared/protocol.h"
#include "../shared/signal.h"
#include "../shared/sockhelper.h"
+#include "../shared/log.h"
#include <pthread.h>
#include <string.h>
@@ -43,9 +44,9 @@ static struct {
static struct {
int sockFd;
pthread_mutex_t sendMutex;
- pthread_t receiveThread;
int panicSignalFd;
- bool panicMode;
+ dnbd3_host_t currentServer;
+ uint64_t startupTime;
} connection;
// Known alt servers
@@ -62,8 +63,10 @@ typedef struct _alt_server alt_server_t;
static void* connection_receiveThreadMain(void *sock);
+static void* connection_backgroundThread(void *something);
static void probeAltServers();
+static void switchConnection(int sockFd, alt_server_t *srv);
static bool throwDataAway(int sockFd, uint32_t amount);
static void enqueueRequest(dnbd3_async_t *request);
static dnbd3_async_t* removeRequest(dnbd3_async_t *request);
@@ -102,41 +105,58 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r
}
current = end + 1;
} while ( end != NULL && altIndex < MAX_ALTS );
- printf( "Got %d servers from init call\n", altIndex );
+ logadd( LOG_INFO, "Got %d servers from init call", altIndex );
// Connect
for ( int i = 0; i < altIndex; ++i ) {
if ( altservers[i].host.type == 0 )
continue;
// Try to connect
- sock = sock_connect( &altservers[i].host, 500, SOCKET_KEEPALIVE_TIMEOUT * 2000 ); // TODO timeout...
- printf( "Got socket %d\n", sock );
- if ( sock != -1 && dnbd3_select_image( sock, lowerImage, rid, 0 )
- && dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize )
- && ( rid == 0 || rid == remoteRid ) ) {
+ sock = sock_connect( &altservers[i].host, 500, SOCKET_KEEPALIVE_TIMEOUT * 2000 );
+ logadd( LOG_DEBUG1, "Got socket %d", sock );
+ if ( sock == -1 ) {
+ //
+ } else if ( !dnbd3_select_image( sock, lowerImage, rid, 0 ) ) {
+ logadd( LOG_ERROR, "Could not send select image" );
+ } else if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) {
+ logadd( LOG_ERROR, "Could not read select image reply (%d)", errno );
+ } else if ( rid != 0 && rid != remoteRid ) {
+ logadd( LOG_ERROR, "rid mismatch" );
+ } else {
image.name = strdup(remoteName);
image.rid = remoteRid;
image.size = remoteSize;
+ connection.currentServer = altservers[i].host;
+ connection.panicSignalFd = signal_new();
+ connection.startupTime = nowMilli();
+ connection.sockFd = sock;
+ requests.head = NULL;
+ requests.tail = NULL;
break;
}
// Failed
+ logadd( LOG_DEBUG1, "Server does not offer requested image... " );
if ( sock != -1 ) {
close( sock );
sock = -1;
}
}
if ( sock != -1 ) {
- printf( "Initializing stuff\n" );
+ pthread_t thread;
+ logadd( LOG_DEBUG1, "Initializing stuff" );
if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0
- || pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0
- || pthread_create( &connection.receiveThread, NULL, &connection_receiveThreadMain, (void*)(size_t)sock ) != 0 ) {
+ || pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0 ) {
close( sock );
sock = -1;
} else {
- connection.sockFd = sock;
- connection.panicMode = false;
- connection.panicSignalFd = signal_new();
- requests.head = NULL;
- requests.tail = NULL;
+ if ( pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)sock ) != 0 ) {
+ logadd( LOG_ERROR, "Could not create receive thread" );
+ close( sock );
+ sock = -1;
+ } else if ( pthread_create( &thread, NULL, &connection_backgroundThread, NULL ) != 0 ) {
+ logadd( LOG_ERROR, "Could not create background thread" );
+ shutdown( sock, SHUT_RDWR );
+ sock = -1;
+ }
}
initDone = true;
}
@@ -145,18 +165,23 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r
return sock != -1;
}
+uint64_t connection_getImageSize()
+{
+ return image.size;
+}
+
bool connection_read(dnbd3_async_t *request)
{
if (!initDone) return false;
- enqueueRequest( request );
pthread_mutex_lock( &connection.sendMutex );
+ enqueueRequest( request );
if ( connection.sockFd != -1 ) {
- while ( !dnbd3_get_block( connection.sockFd, request->offset, request->length, (uint64_t)request ) ) {
+ if ( !dnbd3_get_block( connection.sockFd, request->offset, request->length, (uint64_t)request ) ) {
shutdown( connection.sockFd, SHUT_RDWR );
connection.sockFd = -1;
- // TODO reconnect!
pthread_mutex_unlock( &connection.sendMutex );
- return false;
+ signal_call( connection.panicSignalFd );
+ return true;
}
}
pthread_mutex_unlock( &connection.sendMutex );
@@ -183,42 +208,67 @@ static void* connection_receiveThreadMain(void *sockPtr)
{
int sockFd = (int)(size_t)sockPtr;
dnbd3_reply_t reply;
+ pthread_detach(pthread_self());
+
while ( keepRunning ) {
- if ( !dnbd3_get_reply( connection.sockFd, &reply ) )
+ int ret;
+ do {
+ ret = dnbd3_read_reply( sockFd, &reply, true );
+ if ( ret == REPLY_OK ) break;
+ logadd( LOG_DEBUG1, "Receive return code %d", ret );
+ } while ( ret == REPLY_INTR || ret == REPLY_AGAIN );
+ if ( ret != REPLY_OK ) {
+ logadd( LOG_DEBUG1, "Error receiving reply on receiveThread" );
goto fail;
+ }
// TODO: Ignoring anything but block replies for now; handle the others
if ( reply.cmd != CMD_GET_BLOCK ) {
- if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) )
+ if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) {
+ logadd( LOG_DEBUG1, "Could not throw %d bytes away on CMD %d", (int)reply.size, (int)reply.cmd );
goto fail;
+ }
} else {
// get block reply. find matching request
+ logadd( LOG_DEBUG1, "Got a block reply :)" );
dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle );
if ( request == NULL ) {
- printf("WARNING BUG ALERT SOMETHING: Got block reply with no matching request\n");
- if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) )
+ logadd( LOG_WARNING, "WARNING BUG ALERT SOMETHING: Got block reply with no matching request" );
+ if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) {
+ logadd( LOG_DEBUG1, "....and choked on reply payload" );
goto fail;
+ }
} else {
// Found a match
+ logadd( LOG_DEBUG1, "Found pending entry :)" );
request->finished = true;
uint32_t done = 0;
while ( done < request->length ) {
- if ( recv( sockFd, request->buffer + done, request->length - done, 0 ) <= 0 ) {
+ logadd( LOG_DEBUG1, "Will read %d bytes...", (int)(request->length - done) );
+ ssize_t ret = recv( sockFd, request->buffer + done, request->length - done, 0 );
+ if ( ret <= 0 ) {
+ if ( ret == -1 && errno == EINTR ) continue;
+ logadd( LOG_DEBUG1, "Read failure :(" );
request->success = false;
signal_call( request->signalFd );
+ logadd( LOG_DEBUG1, "receiving payload for a block reply failed" );
goto fail;
}
+ done += (uint32_t)ret;
}
// Success, wake up caller
+ logadd( LOG_DEBUG1, "Read completed :)" );
request->success = true;
signal_call( request->signalFd );
}
}
}
+ logadd( LOG_DEBUG1, "Aus der Schleife rausgeflogen! ARRRRRRRRRR" );
fail:;
// Make sure noone is trying to use the socket for sending by locking,
pthread_mutex_lock( &connection.sendMutex );
// then just set the fd to -1, but only if it's the same fd as ours,
// as someone could have established a new connection already
+ logadd( LOG_DEBUG1, "RT: Local sock: %d, global: %d", sockFd, connection.sockFd );
if ( connection.sockFd == sockFd ) {
connection.sockFd = -1;
}
@@ -232,7 +282,6 @@ static void* connection_backgroundThread(void *something UNUSED)
{
uint64_t nextKeepalive = 0;
uint64_t nextRttCheck = 0;
- const uint64_t startupTime = nowMilli();
while ( keepRunning ) {
const uint64_t now = nowMilli();
@@ -240,14 +289,15 @@ static void* connection_backgroundThread(void *something UNUSED)
int waitTime = (int)( MIN( nextKeepalive, nextRttCheck ) - now );
int waitRes = signal_wait( connection.panicSignalFd, waitTime );
if ( waitRes == SIGNAL_ERROR ) {
- printf( "Error waiting on signal in background thread! Errno = %d\n", errno );
+ logadd( LOG_WARNING, "Error waiting on signal in background thread! Errno = %d", errno );
}
}
// Woken up, see what we have to do
+ const bool panic = connection.sockFd == -1;
// Check alt servers
- if ( connection.panicMode || now < nextRttCheck ) {
+ if ( panic || now < nextRttCheck ) {
probeAltServers();
- if ( connection.panicMode || startupTime + ( STARTUP_MODE_DURATION * 1000ull ) > now ) {
+ if ( panic || connection.startupTime + ( STARTUP_MODE_DURATION * 1000ull ) > now ) {
nextRttCheck = now + TIMER_INTERVAL_PROBE_STARTUP * 1000ull;
} else {
nextRttCheck = now + TIMER_INTERVAL_PROBE_NORMAL * 1000ull;
@@ -257,7 +307,7 @@ static void* connection_backgroundThread(void *something UNUSED)
if ( now < nextKeepalive ) {
pthread_mutex_lock( &connection.sendMutex );
if ( connection.sockFd != -1 ) {
- printf( "Sending keepalive...\n" );
+ logadd( LOG_DEBUG1, "Sending keepalive..." );
dnbd3_request_t request;
request.magic = dnbd3_packet_magic;
request.cmd = CMD_KEEPALIVE;
@@ -267,7 +317,6 @@ static void* connection_backgroundThread(void *something UNUSED)
if ( (size_t)ret != sizeof request ) {
shutdown( connection.sockFd, SHUT_RDWR );
connection.sockFd = -1;
- connection.panicMode = true;
}
}
pthread_mutex_unlock( &connection.sendMutex );
@@ -282,19 +331,20 @@ static void* connection_backgroundThread(void *something UNUSED)
static void probeAltServers()
{
serialized_buffer_t buffer;
- dnbd3_request_t request;
dnbd3_reply_t reply;
int bestIndex = -1;
int bestSock = -1;
+ int currentRtt = RTT_UNREACHABLE;
uint16_t remoteRid, remoteProto;
uint64_t remoteSize;
char *remoteName;
+ const bool panic = connection.sockFd == -1;
for ( int altIndex = 0; altIndex < MAX_ALTS; ++altIndex ) {
alt_server_t * const srv = &altservers[altIndex];
if ( srv->host.type == 0 )
continue;
- if ( !connection.panicMode && srv->consecutiveFails > MAX_CONSECUTIVE_FAILURES
+ if ( !panic && srv->consecutiveFails > MAX_CONSECUTIVE_FAILURES
&& srv->consecutiveFails % ( srv->consecutiveFails / 8 ) != 0 ) {
continue;
}
@@ -305,9 +355,9 @@ static void probeAltServers()
}
// Probe
const uint64_t start = nowMicro();
- int sock = sock_connect( &srv->host, connection.panicMode ? 1000 : 333, 1000 );
+ int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 );
if ( sock == -1 ) {
- printf( "Could not crrate socket for probing. errno = %d\n", errno );
+ logadd( LOG_WARNING, "Could not crrate socket for probing. errno = %d", errno );
continue;
}
if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) {
@@ -317,11 +367,13 @@ static void probeAltServers()
goto fail;
}
if ( remoteProto < MIN_SUPPORTED_SERVER || remoteProto > PROTOCOL_VERSION ) {
- printf( "Unsupported remote version\n" );
+ logadd( LOG_WARNING, "Unsupported remote version (local: %d, remote: %d)", (int)PROTOCOL_VERSION, (int)remoteProto );
+ srv->consecutiveFails += 10;
goto fail;
}
if ( remoteRid != image.rid || strcmp( remoteName, image.name ) != 0 ) {
- printf( "Remote rid or name mismatch\n" );
+ logadd( LOG_WARNING, "Remote rid or name mismatch (got '%s')", remoteName );
+ srv->consecutiveFails += 10;
goto fail;
}
if ( !dnbd3_get_block( sock, 0, RTT_BLOCK_SIZE, 0 ) ) {
@@ -332,6 +384,13 @@ static void probeAltServers()
goto fail;
}
// Yay, success
+ // Panic mode? Just switch to server
+ if ( panic ) {
+ switchConnection( sock, srv );
+ return;
+ }
+ // Non-panic mode:
+ // Update stats of server
const uint64_t end = nowMicro();
srv->consecutiveFails = 0;
srv->rtts[srv->rttIndex] = (int)(end - start);
@@ -340,6 +399,11 @@ static void probeAltServers()
srv->rtt += srv->rtts[i];
}
srv->rtt /= RTT_COUNT;
+ // Remember rtt if this server matches the current one
+ if ( isSameAddressPort( &srv->host, &connection.currentServer ) ) {
+ currentRtt = srv->rtt;
+ }
+ // Keep socket open if this is currently the best one
if ( bestIndex == -1 || altservers[bestIndex].rtt > srv->rtt ) {
bestIndex = altIndex;
close( bestSock );
@@ -347,12 +411,73 @@ static void probeAltServers()
} else {
close( sock );
}
- continue; // XXX: Remember current server, compare to it, update value on change,
+ continue;
fail:;
close( sock );
srv->rtts[srv->rttIndex] = RTT_UNREACHABLE;
srv->consecutiveFails += 1;
}
+ // Switch if a better server was found
+ if ( bestIndex != -1
+ && ( currentRtt > altservers[bestIndex].rtt + RTT_ABSOLUTE_THRESHOLD
+ || RTT_THRESHOLD_FACTOR(currentRtt) > altservers[bestIndex].rtt + 1500 ) ) {
+ switchConnection( bestSock, &altservers[bestIndex] );
+ } else {
+ // No switch
+ close( bestSock );
+ }
+}
+
+static void switchConnection(int sockFd, alt_server_t *srv)
+{
+ pthread_t thread;
+ struct sockaddr_storage addr;
+ socklen_t addrLen = sizeof(addr);
+ char message[200] = "Connection switched to ";
+ size_t len = strlen(message);
+ int ret;
+ dnbd3_async_t *queue, *it;
+
+ pthread_mutex_lock( &connection.sendMutex );
+ if ( connection.sockFd != -1 ) {
+ shutdown( connection.sockFd, SHUT_RDWR );
+ }
+ ret = getpeername( sockFd, (struct sockaddr*)&addr, &addrLen );
+ if ( ret == 0 ) {
+ connection.currentServer = srv->host;
+ connection.sockFd = sockFd;
+ pthread_spin_lock( &requests.lock );
+ queue = requests.head;
+ requests.head = requests.tail = NULL;
+ pthread_spin_unlock( &requests.lock );
+ } else {
+ connection.sockFd = -1;
+ }
+ pthread_mutex_unlock( &connection.sendMutex );
+ if ( ret != 0 ) {
+ close( sockFd );
+ logadd( LOG_WARNING, "Could not getpeername after connection switch, assuming connection already dead again. (Errno=%d)", errno );
+ signal_call( connection.panicSignalFd );
+ return;
+ }
+ connection.startupTime = nowMilli();
+ pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)sockFd );
+ sock_printable( (struct sockaddr*)&addr, sizeof(addr), message + len, sizeof(message) - len );
+ logadd( LOG_INFO, message );
+ // resend queue
+ if ( queue != NULL ) {
+ pthread_mutex_lock( &connection.sendMutex );
+ for ( it = queue; it != NULL; it = it->next ) {
+ enqueueRequest( it );
+ if ( connection.sockFd != -1 && !dnbd3_get_block( connection.sockFd, it->offset, it->length, (uint64_t)it ) ) {
+ logadd( LOG_WARNING, "Resending pending request failed, re-entering panic mode" );
+ shutdown( connection.sockFd, SHUT_RDWR );
+ connection.sockFd = -1;
+ signal_call( connection.panicSignalFd );
+ }
+ }
+ pthread_mutex_unlock( &connection.sendMutex );
+ }
}
static bool throwDataAway(int sockFd, uint32_t amount)
@@ -391,6 +516,8 @@ static dnbd3_async_t* removeRequest(dnbd3_async_t *request)
// Found it, break!
if ( prev != NULL ) {
prev->next = iterator->next;
+ } else {
+ requests.head = iterator->next;
}
if ( requests.tail == iterator ) {
requests.tail = prev;
diff --git a/src/fuse/connection.h b/src/fuse/connection.h
index 8ab2c35..076704d 100644
--- a/src/fuse/connection.h
+++ b/src/fuse/connection.h
@@ -18,6 +18,8 @@ typedef struct _dnbd3_async {
bool connection_init(const char *hosts, const char *image, const uint16_t rid);
+uint64_t connection_getImageSize();
+
bool connection_read(dnbd3_async_t *request);
void connection_close();
diff --git a/src/fuse/main.c b/src/fuse/main.c
index 7889023..42c3664 100644
--- a/src/fuse/main.c
+++ b/src/fuse/main.c
@@ -8,43 +8,31 @@
* */
#include "../shared/protocol.h"
+#include "../shared/signal.h"
+#include "connection.h"
#include "../serialize.h"
#include "helper.h"
+#include "../shared/log.h"
#define FUSE_USE_VERSION 30
#include <fuse.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
-#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>
-/* for socket */
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netdb.h>
/* for printing uint */
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
-#define debugf(...) do { if (useDebug) printf(__VA_ARGS__); } while (0)
+#define debugf(...) do { logadd( LOG_DEBUG1, __VA_ARGS__ ); } while (0)
-/* variables for socket */
-int sock = -1;
-int n;
-
-char *server_address = NULL;
-int portno = -1;
-char *image_Name = NULL;
-const char *imagePathName = "/img";
-uint16_t rid;
+static const char *imagePathName = "/img";
static uint64_t imageSize;
/* Debug/Benchmark variables */
-bool useDebug = false;
-bool useLog = false;
-log_info logInfo;
-uint8_t printCount = 0;
+static bool useDebug = false;
+static bool useLog = false;
+static log_info logInfo;
void error(const char *msg)
{
@@ -52,53 +40,6 @@ void error(const char *msg)
exit( 0 );
}
-static void dnbd3_connect()
-{
- while ( true ) {
- if ( sock != -1 ) {
- close( sock );
- }
- sock = -1; // connect_to_server( server_address, portno );
-
- if ( sock == -1 ) {
- debugf( "[ERROR] Connection Error!\n" );
- goto fail;
- }
-
- debugf( "Selecting image " );
-
- serialized_buffer_t sbuffer;
- uint16_t protocol_version;
- char *name;
- uint16_t rrid;
-
- if ( dnbd3_select_image( sock, image_Name, rid, 0 ) != 1 ) {
- debugf( "- Error\n" );
- goto fail;
- }
- debugf( "- Success\n" );
-
- if ( !dnbd3_select_image_reply( &sbuffer, sock, &protocol_version, &name, &rrid, &imageSize ) ) {
- debugf( "Error reading reply\n" );
- goto fail;
- }
- debugf( "Reply successful\n" );
-
- if ( rid != 0 && rid != rrid ) {
- debugf( "Got unexpected rid %d, wanted %d\n", (int)rrid, (int)rid );
- sleep( 10 );
- goto fail;
- }
- rid = rrid;
-
- debugf( "Protocol version: %i, Image: %s, RevisionID: %i, Size: %i MiB\n", (int)protocol_version, name, (int) rrid, (int)( imageSize/ ( 1024*1024 ) ) );
- return;
-
-fail: ;
- sleep( 2 );
- }
-}
-
static int image_getattr(const char *path, struct stat *stbuf)
{
int res = 0;
@@ -140,36 +81,19 @@ static int image_open(const char *path, struct fuse_file_info *fi)
static int image_read(const char *path, char *buf, size_t size, off_t offset, struct fuse_file_info *fi UNUSED)
{
- /* buffer for throwing away unwanted messages. */
- char tBuf[100];
-
if ( (uint64_t)offset >= imageSize ) {
return 0;
}
- if ( strcmp( path, imagePathName ) != 0 ) {
- return -ENOENT;
- }
+// if ( strcmp( path, imagePathName ) != 0 ) {
+// return -ENOENT;
+// }
if ( offset + size > imageSize ) {
size = imageSize - offset;
}
- if ( sock == -1 ) {
-retry: ;
- dnbd3_connect();
- }
-
- /* seek inside the image */
- if ( !dnbd3_get_block( sock, offset, size, offset ) ) {
- debugf( "[ERROR] Get block error!\n" );
- goto retry;
- }
-
/* count the requested blocks */
uint64_t startBlock = offset / ( 4096 );
- uint64_t endBlock = ( offset + size - 1 ) / ( 4096 );
-
- debugf( "StartBlockRequest: %"PRIu64"\n", startBlock );
- debugf( "EndBlockRequest: %"PRIu64"\n", endBlock );
+ const uint64_t endBlock = ( offset + size - 1 ) / ( 4096 );
if ( useDebug ) {
for ( ; startBlock <= endBlock; startBlock++ ) {
@@ -177,75 +101,27 @@ retry: ;
}
}
- dnbd3_reply_t reply;
+ dnbd3_async_t request;
+ request.buffer = buf;
+ request.length = (uint32_t)size;
+ request.offset = offset;
+ request.signalFd = signal_newBlocking();
- /*see if the received package is a requested block, throw away if not */
- while ( true ) {
- if ( !dnbd3_get_reply( sock, &reply ) ) {
- debugf( "[ERROR] Reply error\n" );
- goto retry;
- }
- debugf( "Reply success\n" );
-
- if ( reply.cmd == CMD_ERROR ) {
- debugf( "Got a CMD_ERROR!\n" );
- goto retry;
- }
- if ( reply.cmd != CMD_GET_BLOCK ) {
- debugf( "Received block isn't a wanted block, throwing it away...\n" );
- uint32_t tDone = 0;
- int todo;
- while ( tDone < reply.size ) {
- todo = reply.size - tDone > 100 ? 100: reply.size - tDone;
-
- n = read( sock, tBuf, todo );
- if ( n <= 0 ) {
- if ( n < 0 && ( errno == EAGAIN || errno == EINTR ) ) {
- continue;
- }
- debugf( "[ERROR] Errno %i and %i\n",errno, n );
- goto retry;
- }
- tDone += n;
- }
- continue;
- }
- break;
+ if ( !connection_read( &request ) ) {
+ return -EINVAL;
}
-
- debugf( "Payloadsize: %i\n", ( int ) reply.size );
- debugf( "Offset: %"PRIu64"\n", reply.handle );
-
- if ( size != reply.size || (uint64_t)offset != reply.handle ) {
- debugf( "Size: %i, reply.size: %i!\n", ( int ) size, ( int ) reply.size );
- debugf( "Handle: %" PRIu64 ", reply.handle: %" PRIu64 "!\n", offset, reply.handle );
- goto retry;
- }
- /* read the data block data from received package */
- uint32_t done = 0;
- while ( done < size ) {
- n = read( sock, buf + done, size - done );
- if ( n <= 0 ) {
- if ( n < 0 && ( errno == EAGAIN || errno == EINTR ) ) {
- continue;
- }
- debugf( "[ERROR] Error: %i and %i\n",errno, n );
- goto retry;
+ while ( !request.finished ) {
+ int ret = signal_wait( request.signalFd, 10000 );
+ if ( ret != SIGNAL_OK ) {
+ debugf( "signal_wait returned %d", ret );
}
- done += n;
- /* for benchmarking */
- logInfo.receivedBytes += n;
}
- debugf( "Received bytes: %i MiB\n", ( int )( logInfo.receivedBytes/ ( 1024*1024 ) ) );
-
- /* logfile stuff */
- if ( useLog ) {
- if ( printCount == 0 ) {
- printLog( &logInfo );
- }
- printCount++;
+ signal_close( request.signalFd );
+ if ( request.success ) {
+ return request.length;
+ } else {
+ return -EIO;
}
- return size;
}
/* close the connection */
@@ -254,10 +130,7 @@ void image_destroy(void *private_data UNUSED)
if ( useLog ) {
printLog( &logInfo );
}
- if ( sock != -1 ) {
- close( sock );
- sock = -1;
- }
+ connection_close();
return;
}
@@ -272,19 +145,23 @@ static struct fuse_operations image_oper = {
int main(int argc, char *argv[])
{
+ char *server_address = NULL;
+ char *image_Name = NULL;
char *mountPoint = NULL;
int opt;
bool testOpt = false;
if ( argc == 1 || strcmp( argv[1], "--help" ) == 0 || strcmp( argv[1], "--usage" ) == 0 ) {
exit_usage:
- printf( "Usage: %s [-l] [-d] [-t] -m <mountpoint> -s <serverAdress> -p <port> -i <imageName>\n", argv[0] );
+ printf( "Usage: %s [-l] [-d] [-t] -m <mountpoint> -s <serverAdress> -i <imageName>\n", argv[0] );
printf( " -l: creates a logfile log.txt at program path\n" );
printf( " -d: fuse debug mode\n" );
printf( " -t: use hardcoded server, port and image for testing\n" );
exit( EXIT_FAILURE );
}
+ log_setConsoleMask( 65535 );
+
while ( ( opt = getopt( argc,argv,"m:s:p:i:tdl" ) ) != -1 ) {
switch ( opt ) {
case 'm':
@@ -293,9 +170,6 @@ exit_usage:
case 's':
server_address = optarg;
break;
- case 'p':
- portno = atoi( optarg );
- break;
case 'i':
image_Name = optarg;
break;
@@ -317,12 +191,11 @@ exit_usage:
if ( testOpt ) {
/* values for testing. */
server_address = "132.230.4.1";
- portno = 5003;
image_Name = "windows7-umwelt.vmdk";
useLog = true;
}
- if ( server_address == NULL || portno == -1 || image_Name == NULL || mountPoint == NULL ) {
+ if ( server_address == NULL || image_Name == NULL || mountPoint == NULL ) {
goto exit_usage;
}
@@ -332,7 +205,11 @@ exit_usage:
}
char *args[6] = {"foo", "-o", "ro,allow_other", "-s", mountPoint, "-d"};
- dnbd3_connect();
+ if ( !connection_init( server_address, image_Name, 0 ) ) {
+ printf( "Tschüss\n" );
+ return 1;
+ }
+ imageSize = connection_getImageSize();
/* initialize benchmark variables */
logInfo.receivedBytes = 0;