From 4c94cb861dfbfe2a8c9c165165cbdbc062eaa39b Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Sat, 21 Nov 2015 12:24:21 +0100 Subject: [FUSE] Start refactoring so we can handle multithread fuse --- src/fuse/connection.c | 211 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/fuse/connection.h | 25 ++++++ src/fuse/helper.c | 14 ++-- src/fuse/helper.h | 9 +-- src/fuse/main.c | 2 +- 5 files changed, 248 insertions(+), 13 deletions(-) create mode 100644 src/fuse/connection.c create mode 100644 src/fuse/connection.h (limited to 'src/fuse') diff --git a/src/fuse/connection.c b/src/fuse/connection.c new file mode 100644 index 0000000..3e1bf38 --- /dev/null +++ b/src/fuse/connection.c @@ -0,0 +1,211 @@ +#include "connection.h" +#include "helper.h" +#include "../config.h" +#include "../shared/protocol.h" +#include "../shared/signal.h" + +#include +#include +#include +#include + +static const size_t SHORTBUF = 100; + +static bool initDone = false; +pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER; + +static struct { + dnbd3_async_t *head; + dnbd3_async_t *tail; + pthread_spinlock_t lock; +} requests; + +static struct { + char *name; + uint16_t rid; + uint64_t size; + int sockFd; + pthread_mutex_t sendMutex; + pthread_t receiveThread; +} image; + +static bool throwDataAway(int sockFd, uint32_t amount); +static void enqueueRequest(dnbd3_async_t *request); +static dnbd3_async_t* removeRequest(dnbd3_async_t *request); + +static void* connection_receiveThreadMain(void *sock); + +bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid) +{ + int sock = -1; + char host[SHORTBUF]; + const char *current, *end; + serialized_buffer_t buffer; + uint16_t remoteVersion, remoteRid; + char *remoteName; + uint64_t remoteSize; + + pthread_mutex_lock( &mutexInit ); + if ( !initDone ) { + current = hosts; + do { + // Get next host from string + while ( *current == ' ' ) current++; + end = strchr( current, ' ' ); + size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1); + if ( len > SHORTBUF ) len = SHORTBUF; + snprintf( host, len, "%s", current ); + current = end + 1; + // Try to connect + sock = connect_to_server( host, PORT ); // TODO: Parse port from host + if ( sock != -1 && dnbd3_select_image( sock, lowerImage, rid, 0 ) + && dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) + && ( rid == 0 || rid == remoteRid ) ) { + image.name = strdup(remoteName); + image.rid = remoteRid; + image.size = remoteSize; + break; + } + // Failed + if ( sock != -1 ) { + close( sock ); + sock = -1; + } + // TODO: Add to alt list + } while ( end != NULL ); + if ( sock != -1 ) { + if ( pthread_mutex_init( &image.sendMutex, NULL ) != 0 + || pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0 + || pthread_create( &image.receiveThread, NULL, &connection_receiveThreadMain, (void*)(size_t)sock ) != 0 ) { + close( sock ); + sock = -1; + } else { + image.sockFd = sock; + requests.head = NULL; + requests.tail = NULL; + } + initDone = true; + } + } + pthread_mutex_unlock( &mutexInit ); + return sock != -1; +} + +bool connection_read(dnbd3_async_t *request) +{ + if (!initDone) return false; + enqueueRequest( request ); + pthread_mutex_lock( &image.sendMutex ); + if ( image.sockFd != -1 ) { + while ( !dnbd3_get_block( image.sockFd, request->offset, request->length, (uint64_t)request ) ) { + shutdown( image.sockFd, SHUT_RDWR ); + image.sockFd = -1; + // TODO reconnect! + pthread_mutex_unlock( &image.sendMutex ); + return false; + } + } + pthread_mutex_unlock( &image.sendMutex ); + return true; +} + +void connection_close() +{ + // +} + +static bool throwDataAway(int sockFd, uint32_t amount) +{ + uint32_t done = 0; + char tempBuffer[SHORTBUF]; + while ( done < amount ) { + if ( recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ), 0 ) <= 0 ) + return false; + } + return true; +} + +static void enqueueRequest(dnbd3_async_t *request) +{ + request->next = NULL; + request->finished = false; + request->success = false; + pthread_spin_lock( &requests.lock ); + if ( requests.head == NULL ) { + requests.head = requests.tail = request; + } else { + requests.tail->next = request; + requests.tail = request; + } + pthread_spin_unlock( &requests.lock ); +} + +static dnbd3_async_t* removeRequest(dnbd3_async_t *request) +{ + pthread_spin_lock( &requests.lock ); + dnbd3_async_t *iterator, *prev = NULL; + for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) { + if ( iterator == request ) { + // Found it, break! + if ( prev != NULL ) { + prev->next = iterator->next; + } + if ( requests.tail == iterator ) { + requests.tail = prev; + } + break; + } + prev = iterator; + } + pthread_spin_unlock( &requests.lock ); + return iterator; +} + +static void* connection_receiveThreadMain(void *sockPtr) +{ + int sockFd = (int)(size_t)sockPtr; + dnbd3_reply_t reply; + for ( ;; ) { + if ( !dnbd3_get_reply( image.sockFd, &reply ) ) + goto fail; + // TODO: Ignoring anything but get block replies for now; handle the others + if ( reply.cmd != CMD_GET_BLOCK ) { + if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) + goto fail; + } else { + // get block reply. find matching request + dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle ); + if ( request == NULL ) { + printf("WARNING BUG ALERT SOMETHING: Got block reply with no matching request\n"); + if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) + goto fail; + } else { + // Found a match + request->finished = true; + uint32_t done = 0; + while ( done < request->length ) { + if ( recv( sockFd, request->buffer + done, request->length - done, 0 ) <= 0 ) { + request->success = false; + signal_call( request->signalFd ); + goto fail; + } + } + // Success, wake up caller + request->success = true; + signal_call( request->signalFd ); + } + } + } +fail:; + // Make sure noone is trying to use the socket for sending by locking, + pthread_mutex_lock( &image.sendMutex ); + // then just set the fd to -1, but only if it's the same fd as ours, + // as someone could have established a new connection already + if ( image.sockFd == sockFd ) { + image.sockFd = -1; + } + pthread_mutex_unlock( &image.sendMutex ); + // As we're the only reader, it's safe to close the socket now + close( sockFd ); + return NULL; +} diff --git a/src/fuse/connection.h b/src/fuse/connection.h new file mode 100644 index 0000000..8ab2c35 --- /dev/null +++ b/src/fuse/connection.h @@ -0,0 +1,25 @@ +#ifndef _CONNECTION_H_ +#define _CONNECTION_H_ + +#include +#include + +struct _dnbd3_async; + +typedef struct _dnbd3_async { + struct _dnbd3_async *next; // Next in this linked list (provate field, not set by caller) + char* buffer; // Caller-provided buffer to be filled + uint64_t offset; + uint32_t length; + int signalFd; // Used to signal the caller + bool finished; // Will be set to true if the request has been handled + bool success; // Will be set to true if the request succeeded +} dnbd3_async_t; + +bool connection_init(const char *hosts, const char *image, const uint16_t rid); + +bool connection_read(dnbd3_async_t *request); + +void connection_close(); + +#endif /* CONNECTION_H_ */ diff --git a/src/fuse/helper.c b/src/fuse/helper.c index 7b1101d..65644f8 100644 --- a/src/fuse/helper.c +++ b/src/fuse/helper.c @@ -1,9 +1,13 @@ -/* - * Helper functions for imageFuse - * by Stephan Schwaer, January 2014 - */ - #include "helper.h" + +#include +#include +#include +#include +#include +#include + + void printLog( log_info *info ) { FILE *logFile; diff --git a/src/fuse/helper.h b/src/fuse/helper.h index 1c972e4..bbba44c 100644 --- a/src/fuse/helper.h +++ b/src/fuse/helper.h @@ -1,14 +1,9 @@ #ifndef IMAGEHELPER_H #define IMAGEHELPER_H -#include "../protocol.h" #include -#include -#include -#include -#include -#include -#include +#include +#include typedef struct log_info { diff --git a/src/fuse/main.c b/src/fuse/main.c index 1da12d8..d6a4d98 100644 --- a/src/fuse/main.c +++ b/src/fuse/main.c @@ -7,7 +7,7 @@ * Changed by Stephan Schwaer * */ -#include "../protocol.h" +#include "../shared/protocol.h" #include "../serialize.h" #include "helper.h" -- cgit v1.2.3-55-g7522