diff options
author | Simon Rettberg | 2015-11-21 12:24:21 +0100 |
---|---|---|
committer | Simon Rettberg | 2015-11-21 12:24:21 +0100 |
commit | 4c94cb861dfbfe2a8c9c165165cbdbc062eaa39b (patch) | |
tree | 5ef29b28a920626b572a0d6dd378440ddb7f885b /src | |
parent | [SERVER] Improve image related locking (diff) | |
download | dnbd3-4c94cb861dfbfe2a8c9c165165cbdbc062eaa39b.tar.gz dnbd3-4c94cb861dfbfe2a8c9c165165cbdbc062eaa39b.tar.xz dnbd3-4c94cb861dfbfe2a8c9c165165cbdbc062eaa39b.zip |
[FUSE] Start refactoring so we can handle multithread fuse
Diffstat (limited to 'src')
-rw-r--r-- | src/fuse/connection.c | 211 | ||||
-rw-r--r-- | src/fuse/connection.h | 25 | ||||
-rw-r--r-- | src/fuse/helper.c | 14 | ||||
-rw-r--r-- | src/fuse/helper.h | 9 | ||||
-rw-r--r-- | src/fuse/main.c | 2 | ||||
-rw-r--r-- | src/serialize.c | 2 | ||||
-rw-r--r-- | src/serialize.h | 2 | ||||
-rw-r--r-- | src/server/altservers.c | 4 | ||||
-rw-r--r-- | src/server/image.c | 4 | ||||
-rw-r--r-- | src/server/locks.c | 2 | ||||
-rw-r--r-- | src/server/threadpool.c | 2 | ||||
-rw-r--r-- | src/server/uplink.c | 4 | ||||
-rw-r--r-- | src/shared/protocol.h (renamed from src/protocol.h) | 6 | ||||
-rw-r--r-- | src/shared/signal.c (renamed from src/server/signal.c) | 0 | ||||
-rw-r--r-- | src/shared/signal.h (renamed from src/server/signal.h) | 0 |
15 files changed, 261 insertions, 26 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c new file mode 100644 index 0000000..3e1bf38 --- /dev/null +++ b/src/fuse/connection.c @@ -0,0 +1,211 @@ +#include "connection.h" +#include "helper.h" +#include "../config.h" +#include "../shared/protocol.h" +#include "../shared/signal.h" + +#include <pthread.h> +#include <string.h> +#include <stdio.h> +#include <unistd.h> + +static const size_t SHORTBUF = 100; + +static bool initDone = false; +pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER; + +static struct { + dnbd3_async_t *head; + dnbd3_async_t *tail; + pthread_spinlock_t lock; +} requests; + +static struct { + char *name; + uint16_t rid; + uint64_t size; + int sockFd; + pthread_mutex_t sendMutex; + pthread_t receiveThread; +} image; + +static bool throwDataAway(int sockFd, uint32_t amount); +static void enqueueRequest(dnbd3_async_t *request); +static dnbd3_async_t* removeRequest(dnbd3_async_t *request); + +static void* connection_receiveThreadMain(void *sock); + +bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid) +{ + int sock = -1; + char host[SHORTBUF]; + const char *current, *end; + serialized_buffer_t buffer; + uint16_t remoteVersion, remoteRid; + char *remoteName; + uint64_t remoteSize; + + pthread_mutex_lock( &mutexInit ); + if ( !initDone ) { + current = hosts; + do { + // Get next host from string + while ( *current == ' ' ) current++; + end = strchr( current, ' ' ); + size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1); + if ( len > SHORTBUF ) len = SHORTBUF; + snprintf( host, len, "%s", current ); + current = end + 1; + // Try to connect + sock = connect_to_server( host, PORT ); // TODO: Parse port from host + if ( sock != -1 && dnbd3_select_image( sock, lowerImage, rid, 0 ) + && dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) + && ( rid == 0 || rid == remoteRid ) ) { + image.name = strdup(remoteName); + image.rid = remoteRid; + image.size = remoteSize; + break; + } + // Failed + if ( sock != -1 ) { + close( sock ); + sock = -1; + } + // TODO: Add to alt list + } while ( end != NULL ); + if ( sock != -1 ) { + if ( pthread_mutex_init( &image.sendMutex, NULL ) != 0 + || pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0 + || pthread_create( &image.receiveThread, NULL, &connection_receiveThreadMain, (void*)(size_t)sock ) != 0 ) { + close( sock ); + sock = -1; + } else { + image.sockFd = sock; + requests.head = NULL; + requests.tail = NULL; + } + initDone = true; + } + } + pthread_mutex_unlock( &mutexInit ); + return sock != -1; +} + +bool connection_read(dnbd3_async_t *request) +{ + if (!initDone) return false; + enqueueRequest( request ); + pthread_mutex_lock( &image.sendMutex ); + if ( image.sockFd != -1 ) { + while ( !dnbd3_get_block( image.sockFd, request->offset, request->length, (uint64_t)request ) ) { + shutdown( image.sockFd, SHUT_RDWR ); + image.sockFd = -1; + // TODO reconnect! + pthread_mutex_unlock( &image.sendMutex ); + return false; + } + } + pthread_mutex_unlock( &image.sendMutex ); + return true; +} + +void connection_close() +{ + // +} + +static bool throwDataAway(int sockFd, uint32_t amount) +{ + uint32_t done = 0; + char tempBuffer[SHORTBUF]; + while ( done < amount ) { + if ( recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ), 0 ) <= 0 ) + return false; + } + return true; +} + +static void enqueueRequest(dnbd3_async_t *request) +{ + request->next = NULL; + request->finished = false; + request->success = false; + pthread_spin_lock( &requests.lock ); + if ( requests.head == NULL ) { + requests.head = requests.tail = request; + } else { + requests.tail->next = request; + requests.tail = request; + } + pthread_spin_unlock( &requests.lock ); +} + +static dnbd3_async_t* removeRequest(dnbd3_async_t *request) +{ + pthread_spin_lock( &requests.lock ); + dnbd3_async_t *iterator, *prev = NULL; + for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) { + if ( iterator == request ) { + // Found it, break! + if ( prev != NULL ) { + prev->next = iterator->next; + } + if ( requests.tail == iterator ) { + requests.tail = prev; + } + break; + } + prev = iterator; + } + pthread_spin_unlock( &requests.lock ); + return iterator; +} + +static void* connection_receiveThreadMain(void *sockPtr) +{ + int sockFd = (int)(size_t)sockPtr; + dnbd3_reply_t reply; + for ( ;; ) { + if ( !dnbd3_get_reply( image.sockFd, &reply ) ) + goto fail; + // TODO: Ignoring anything but get block replies for now; handle the others + if ( reply.cmd != CMD_GET_BLOCK ) { + if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) + goto fail; + } else { + // get block reply. find matching request + dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle ); + if ( request == NULL ) { + printf("WARNING BUG ALERT SOMETHING: Got block reply with no matching request\n"); + if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) + goto fail; + } else { + // Found a match + request->finished = true; + uint32_t done = 0; + while ( done < request->length ) { + if ( recv( sockFd, request->buffer + done, request->length - done, 0 ) <= 0 ) { + request->success = false; + signal_call( request->signalFd ); + goto fail; + } + } + // Success, wake up caller + request->success = true; + signal_call( request->signalFd ); + } + } + } +fail:; + // Make sure noone is trying to use the socket for sending by locking, + pthread_mutex_lock( &image.sendMutex ); + // then just set the fd to -1, but only if it's the same fd as ours, + // as someone could have established a new connection already + if ( image.sockFd == sockFd ) { + image.sockFd = -1; + } + pthread_mutex_unlock( &image.sendMutex ); + // As we're the only reader, it's safe to close the socket now + close( sockFd ); + return NULL; +} diff --git a/src/fuse/connection.h b/src/fuse/connection.h new file mode 100644 index 0000000..8ab2c35 --- /dev/null +++ b/src/fuse/connection.h @@ -0,0 +1,25 @@ +#ifndef _CONNECTION_H_ +#define _CONNECTION_H_ + +#include <stdbool.h> +#include <stdint.h> + +struct _dnbd3_async; + +typedef struct _dnbd3_async { + struct _dnbd3_async *next; // Next in this linked list (provate field, not set by caller) + char* buffer; // Caller-provided buffer to be filled + uint64_t offset; + uint32_t length; + int signalFd; // Used to signal the caller + bool finished; // Will be set to true if the request has been handled + bool success; // Will be set to true if the request succeeded +} dnbd3_async_t; + +bool connection_init(const char *hosts, const char *image, const uint16_t rid); + +bool connection_read(dnbd3_async_t *request); + +void connection_close(); + +#endif /* CONNECTION_H_ */ diff --git a/src/fuse/helper.c b/src/fuse/helper.c index 7b1101d..65644f8 100644 --- a/src/fuse/helper.c +++ b/src/fuse/helper.c @@ -1,9 +1,13 @@ -/* - * Helper functions for imageFuse - * by Stephan Schwaer, January 2014 - */ - #include "helper.h" + +#include <stdio.h> +#include <string.h> +#include <errno.h> +#include <stdlib.h> +#include <unistd.h> +#include <inttypes.h> + + void printLog( log_info *info ) { FILE *logFile; diff --git a/src/fuse/helper.h b/src/fuse/helper.h index 1c972e4..bbba44c 100644 --- a/src/fuse/helper.h +++ b/src/fuse/helper.h @@ -1,14 +1,9 @@ #ifndef IMAGEHELPER_H #define IMAGEHELPER_H -#include "../protocol.h" #include <netdb.h> -#include <stdio.h> -#include <string.h> -#include <errno.h> -#include <stdlib.h> -#include <unistd.h> -#include <inttypes.h> +#include <stdbool.h> +#include <stdint.h> typedef struct log_info { diff --git a/src/fuse/main.c b/src/fuse/main.c index 1da12d8..d6a4d98 100644 --- a/src/fuse/main.c +++ b/src/fuse/main.c @@ -7,7 +7,7 @@ * Changed by Stephan Schwaer * */ -#include "../protocol.h" +#include "../shared/protocol.h" #include "../serialize.h" #include "helper.h" diff --git a/src/serialize.c b/src/serialize.c index b2be8fc..409b682 100644 --- a/src/serialize.c +++ b/src/serialize.c @@ -71,7 +71,7 @@ void serializer_put_uint64(serialized_buffer_t *buffer, uint64_t value) buffer->buffer_pointer += 8; } -void serializer_put_string(serialized_buffer_t *buffer, char *value) +void serializer_put_string(serialized_buffer_t *buffer, const char *value) { const size_t len = strlen(value) + 1; if (buffer->buffer_pointer + len > buffer->buffer_end) return; diff --git a/src/serialize.h b/src/serialize.h index 14d3d13..63b6de9 100644 --- a/src/serialize.h +++ b/src/serialize.h @@ -35,6 +35,6 @@ void serializer_put_uint16(serialized_buffer_t *buffer, uint16_t value); void serializer_put_uint64(serialized_buffer_t *buffer, uint64_t value); -void serializer_put_string(serialized_buffer_t *buffer, char *value); +void serializer_put_string(serialized_buffer_t *buffer, const char *value); #endif diff --git a/src/server/altservers.c b/src/server/altservers.c index 99cbdec..8eb07ec 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -6,9 +6,9 @@ #include "helper.h" #include "globals.h" #include "image.h" -#include "signal.h" +#include "../shared/signal.h" #include "log.h" -#include "../protocol.h" +#include "../shared/protocol.h" #include <stdlib.h> #include <unistd.h> #include <sys/errno.h> diff --git a/src/server/image.c b/src/server/image.c index fccc2a9..6f2cb6a 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -5,11 +5,11 @@ #include "uplink.h" #include "locks.h" #include "integrity.h" -#include "../protocol.h" +#include "../shared/protocol.h" #include "sockhelper.h" #include "altservers.h" #include "server.h" -#include "signal.h" +#include "../shared/signal.h" #include <assert.h> #include <stdio.h> diff --git a/src/server/locks.c b/src/server/locks.c index 6c411e5..8c4fcfb 100644 --- a/src/server/locks.c +++ b/src/server/locks.c @@ -17,7 +17,7 @@ #include "globals.h" #include "log.h" #include "helper.h" -#include "signal.h" +#include "../shared/signal.h" #define MAXLOCKS 2000 #define MAXTHREADS 500 diff --git a/src/server/threadpool.c b/src/server/threadpool.c index ad146ea..41f1f0b 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -1,7 +1,7 @@ #include "globals.h" #include "helper.h" #include "threadpool.h" -#include "signal.h" +#include "../shared/signal.h" #include "locks.h" #include <pthread.h> diff --git a/src/server/uplink.c b/src/server/uplink.c index aa72896..a205164 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -6,8 +6,8 @@ #include "helper.h" #include "altservers.h" #include "helper.h" -#include "../protocol.h" -#include "signal.h" +#include "../shared/protocol.h" +#include "../shared/signal.h" #include <pthread.h> #include <sys/socket.h> diff --git a/src/protocol.h b/src/shared/protocol.h index d119dc5..41e9af2 100644 --- a/src/protocol.h +++ b/src/shared/protocol.h @@ -4,8 +4,8 @@ #include <sys/types.h> #include <sys/socket.h> #include <errno.h> -#include "types.h" -#include "serialize.h" +#include "../types.h" +#include "../serialize.h" #define FLAGS8_SERVER (1) @@ -38,7 +38,7 @@ static inline bool dnbd3_get_reply(int sock, dnbd3_reply_t *reply) return dnbd3_read_reply( sock, reply, true ) == REPLY_OK; } -static inline bool dnbd3_select_image(int sock, char *lower_name, uint16_t rid, uint8_t flags8) +static inline bool dnbd3_select_image(int sock, const char *lower_name, uint16_t rid, uint8_t flags8) { serialized_buffer_t serialized; dnbd3_request_t request; diff --git a/src/server/signal.c b/src/shared/signal.c index a0697f8..a0697f8 100644 --- a/src/server/signal.c +++ b/src/shared/signal.c diff --git a/src/server/signal.h b/src/shared/signal.h index 0e2f85f..0e2f85f 100644 --- a/src/server/signal.h +++ b/src/shared/signal.h |