summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorSimon Rettberg2020-07-28 17:49:17 +0200
committerSimon Rettberg2020-07-28 17:49:17 +0200
commite4dec3562e6cab27e1a3f40165e4c0d9d0bf05c9 (patch)
tree4b9b9a2ddf0cec0188f64639ed18f18f437b7c74 /src
parentMerge branch 'no-working-flag' into fuse_ll (diff)
downloaddnbd3-e4dec3562e6cab27e1a3f40165e4c0d9d0bf05c9.tar.gz
dnbd3-e4dec3562e6cab27e1a3f40165e4c0d9d0bf05c9.tar.xz
dnbd3-e4dec3562e6cab27e1a3f40165e4c0d9d0bf05c9.zip
[SERVER] Add FUSE mode
Still needs some cleanup and optimizations, variable naming sucks, comments, etc.
Diffstat (limited to 'src')
-rw-r--r--src/server/fuse.c633
-rw-r--r--src/server/fuse.h10
-rw-r--r--src/server/globals.h7
-rw-r--r--src/server/image.c2
-rw-r--r--src/server/image.h2
-rw-r--r--src/server/locks.h2
-rw-r--r--src/server/net.c41
-rw-r--r--src/server/net.h2
-rw-r--r--src/server/server.c32
-rw-r--r--src/server/uplink.c146
-rw-r--r--src/server/uplink.h6
11 files changed, 781 insertions, 102 deletions
diff --git a/src/server/fuse.c b/src/server/fuse.c
new file mode 100644
index 0000000..0bd7a3e
--- /dev/null
+++ b/src/server/fuse.c
@@ -0,0 +1,633 @@
+#include "fuse.h"
+#include "../types.h"
+#include "../shared/log.h"
+
+#ifndef BUILD_SERVER_FUSE
+
+//
+bool dfuse_init(const char *opts UNUSED, const char *dir UNUSED)
+{
+ logadd( LOG_ERROR, "FUSE: Not compiled in" );
+ return false;
+}
+
+void dfuse_shutdown()
+{
+}
+
+#else
+
+#define PATHLEN (2000)
+static char nullbytes[DNBD3_BLOCK_SIZE];
+
+// FUSE ENABLED
+#define FUSE_USE_VERSION 30
+//
+#include "../config.h"
+#include "locks.h"
+#include "threadpool.h"
+#include "image.h"
+#include "uplink.h"
+#include "reference.h"
+
+#include <fuse_lowlevel.h>
+#include <ctype.h>
+#include <assert.h>
+#include <string.h>
+#include <signal.h>
+
+#define INO_ROOT (1)
+#define INO_CTRL (2)
+#define INO_DIR (3)
+static const char *NAME_CTRL = "control";
+static const char *NAME_DIR = "images";
+
+typedef struct {
+ fuse_req_t req;
+ size_t fuseWriteSize;
+ uint16_t rid;
+ char name[PATHLEN];
+} lookup_t;
+
+static fuse_ino_t inoCounter = 10;
+typedef struct _dfuse_dir {
+ struct _dfuse_dir *next;
+ struct _dfuse_dir *child;
+ const char *name;
+ uint64_t size;
+ fuse_ino_t ino;
+ int refcount;
+ lookup_t *img;
+} dfuse_entry_t;
+
+typedef struct {
+ fuse_req_t req;
+ struct fuse_file_info fi;
+ dfuse_entry_t *entry;
+ dnbd3_image_t *image;
+} cmdopen_t;
+
+static dfuse_entry_t sroot = {
+ .name = "images",
+ .ino = INO_DIR,
+ .refcount = 2,
+}, *root = &sroot;
+static pthread_mutex_t dirLock;
+
+#define INIT_NONE (0)
+#define INIT_DONE (1)
+#define INIT_SHUTDOWN (2)
+#define INIT_INPROGRESS (3)
+
+static struct fuse_session *fuseSession = NULL;
+static struct fuse_chan *fuseChannel = NULL;
+static char *fuseMountPoint = NULL;
+static pthread_t fuseThreadId;
+static bool haveThread = false;
+static _Atomic(int) initState = INIT_NONE;
+static pthread_mutex_t initLock;
+static struct timespec startupTime;
+
+static dfuse_entry_t* dirLookup(dfuse_entry_t *dir, const char *name);
+static dfuse_entry_t* inoRecursive(dfuse_entry_t *dir, fuse_ino_t ino);
+static void *imageLookup(void *data);
+static void *imageOpen(void *data);
+
+static void uplinkCallback(void *data, uint64_t handle, uint64_t start UNUSED, uint32_t length, const char *buffer);
+static void cleanupFuse();
+static void* fuseMainLoop(void *data);
+
+static void ll_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
+{
+ fi->fh = 0;
+ if ( ino == INO_CTRL ) {
+ if ( ( fi->flags & 3 ) != O_WRONLY ) {
+ fuse_reply_err( req, EINVAL );
+ } else {
+ fi->nonseekable = 1;
+ fuse_reply_open( req, fi );
+ }
+ } else if ( ino == INO_ROOT ) {
+ fuse_reply_err( req, EISDIR );
+ } else {
+ if ( ( fi->flags & 3 ) != O_RDONLY ) {
+ fuse_reply_err( req, EINVAL );
+ return;
+ }
+ mutex_lock( &dirLock );
+ dfuse_entry_t *entry = inoRecursive( root, ino );
+ if ( entry == NULL ) {
+ mutex_unlock( &dirLock );
+ fuse_reply_err( req, ENOENT );
+ } else if ( entry->img == NULL ) {
+ mutex_unlock( &dirLock );
+ fuse_reply_err( req, EISDIR );
+ } else {
+ entry->refcount++;
+ mutex_unlock( &dirLock );
+ cmdopen_t *handle = malloc( sizeof(cmdopen_t) );
+ handle->req = req;
+ handle->fi = *fi;
+ handle->entry = entry;
+ handle->image = NULL;
+ if ( !threadpool_run( &imageOpen, (void*)handle, "fuse-open" ) ) {
+ free( handle );
+ mutex_lock( &dirLock );
+ entry->refcount--;
+ mutex_unlock( &dirLock );
+ fuse_reply_err( req, ENOMEM );
+ }
+ }
+ }
+}
+
+static void *imageOpen(void *data)
+{
+ cmdopen_t *handle = (cmdopen_t*)data;
+ assert( handle->image == NULL );
+ assert( handle->entry->img->rid != 0 );
+ handle->image = image_get( handle->entry->img->name, handle->entry->img->rid, true );
+ if ( handle->image == NULL ) {
+ fuse_reply_err( handle->req, ENOENT );
+ mutex_lock( &dirLock );
+ handle->entry->refcount--;
+ mutex_unlock( &dirLock );
+ free( handle );
+ } else {
+ handle->fi.fh = (uintptr_t)handle;
+ handle->fi.keep_cache = 1;
+ fuse_reply_open( handle->req, &handle->fi );
+ handle->req = NULL;
+ }
+ return NULL;
+}
+
+static dfuse_entry_t* addImage(dfuse_entry_t **dir, const char *name, lookup_t *img)
+{
+ const char *slash = strchr( name, '/' );
+ if ( slash == NULL ) {
+ // Name portion at the end
+ char *path = NULL;
+ if ( asprintf( &path, "%s:%d", name, (int)img->rid ) == -1 )
+ abort();
+ dfuse_entry_t *entry = dirLookup( *dir, path );
+ if ( entry == NULL ) {
+ entry = calloc( 1, sizeof( *entry ) );
+ entry->next = *dir;
+ *dir = entry;
+ entry->name = path;
+ entry->ino = inoCounter++;
+ entry->img = img;
+ } else {
+ free( path );
+ if ( entry->img == NULL ) {
+ return NULL;
+ }
+ }
+ return entry;
+ } else {
+ // Dirname
+ char *path = NULL;
+ if ( asprintf( &path, "%.*s", (int)( slash - name ), name ) == -1 )
+ abort();
+ dfuse_entry_t *entry = dirLookup( *dir, path );
+ if ( entry == NULL ) {
+ entry = calloc( 1, sizeof( *entry ) );
+ entry->next = *dir;
+ *dir = entry;
+ entry->name = path;
+ entry->ino = inoCounter++;
+ } else {
+ free( path );
+ }
+ return addImage( &entry->child, slash + 1, img );
+ }
+}
+
+static void *imageLookup(void *data)
+{
+ lookup_t *lu = (lookup_t*)data;
+ dnbd3_image_t *image = image_getOrLoad( lu->name, lu->rid );
+ if ( image == NULL ) {
+ fuse_reply_err( lu->req, ENOENT );
+ free( lu );
+ } else {
+ mutex_lock( &dirLock );
+ dfuse_entry_t *entry = addImage( &root->child, lu->name, lu );
+ if ( entry != NULL ) {
+ entry->size = image->virtualFilesize;
+ }
+ lu->rid = image->rid; // In case it was 0
+ mutex_unlock( &dirLock );
+ image_release( image );
+ if ( entry == NULL ) {
+ fuse_reply_err( lu->req, EINVAL );
+ } else {
+ fuse_reply_write( lu->req, lu->fuseWriteSize );
+ }
+ }
+ return NULL;
+}
+
+static void ll_write(fuse_req_t req, fuse_ino_t ino, const char *buf, size_t size, off_t off, struct fuse_file_info *fi UNUSED)
+{
+ if ( ino != INO_CTRL ) {
+ fuse_reply_err( req, EROFS );
+ return;
+ }
+ if ( off != 0 ) {
+ fuse_reply_err( req, ESPIPE );
+ return;
+ }
+ if ( size >= PATHLEN ) {
+ fuse_reply_err( req, ENOSPC );
+ return;
+ }
+ size_t colon = 0;
+ int rid = 0;
+ for ( size_t i = 0; i < size; ++i ) {
+ if ( buf[i] == '\0' || buf[i] == '\n' ) {
+ if ( colon == 0 ) {
+ colon = i;
+ }
+ break;
+ }
+ if ( colon != 0 ) {
+ if ( !isdigit( buf[i] ) ) {
+ logadd( LOG_WARNING, "FUSE: Malformed rid" );
+ fuse_reply_err( req, EINVAL );
+ return;
+ }
+ rid = rid * 10 + ( buf[i] - '0' ); // Can overflow but who cares
+ } else if ( buf[i] == ':' ) {
+ colon = i; // Image name starting with ':' would be broken...
+ }
+ }
+ if ( rid < 0 || rid > 65535 ) {
+ logadd( LOG_WARNING, "FUSE: Invalid rid '%d'", rid );
+ fuse_reply_err( req, EINVAL );
+ return;
+ }
+ if ( colon == 0 ) {
+ colon = size;
+ }
+ lookup_t *lu = malloc( sizeof(lookup_t) );
+ lu->rid = (uint16_t)rid;
+ lu->req = req;
+ lu->fuseWriteSize = size;
+ if ( snprintf( lu->name, PATHLEN, "%.*s", (int)colon, buf ) == -1 ) {
+ free( lu );
+ fuse_reply_err( req, ENOSPC );
+ return;
+ }
+ logadd( LOG_DEBUG1, "FUSE: Request for '%s:%d'", lu->name, (int)lu->rid );
+ if ( !threadpool_run( &imageLookup, (void*)lu, "fuse-lookup" ) ) {
+ free( lu );
+ fuse_reply_err( req, EAGAIN );
+ }
+}
+
+static void ll_read( fuse_req_t req, fuse_ino_t ino UNUSED, size_t size, off_t off, struct fuse_file_info *fi )
+{
+ if ( fi->fh == 0 ) {
+ fuse_reply_err( req, 0 );
+ return;
+ }
+ cmdopen_t *handle = (cmdopen_t*)fi->fh;
+ dnbd3_image_t *image = handle->image;
+ if ( off < 0 || (uint64_t)off >= image->virtualFilesize ) {
+ fuse_reply_err( req, 0 );
+ return;
+ }
+ if ( off + size > image->virtualFilesize ) {
+ size = image->virtualFilesize - off;
+ }
+
+ // Check if cached locally
+ dnbd3_cache_map_t *cache = ref_get_cachemap( image );
+ if ( cache != NULL ) {
+ // This is a proxyed image, check if we need to relay the request...
+ const uint64_t start = (uint64_t)off & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ const uint64_t end = (off + size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ if ( !image_isRangeCachedUnsafe( cache, start, end ) ) {
+ ref_put( &cache->reference );
+ if ( size > (uint32_t)_maxPayload ) {
+ size = (uint32_t)_maxPayload;
+ }
+ if ( !uplink_request( image, req, &uplinkCallback, 0, off, (uint32_t)size ) ) {
+ logadd( LOG_DEBUG1, "FUSE: Could not relay uncached request to upstream proxy for image %s:%d",
+ image->name, image->rid );
+ fuse_reply_err( req, EIO );
+ }
+ return; // ASYNC
+ }
+ ref_put( &cache->reference );
+ }
+
+ // Is cached
+ size_t readSize = size;
+ if ( off + readSize > image->realFilesize ) {
+ if ( (uint64_t)off >= image->realFilesize ) {
+ readSize = 0;
+ } else {
+ readSize = image->realFilesize - off;
+ }
+ }
+ struct fuse_bufvec *vec = calloc( 1, sizeof(*vec) + sizeof(struct fuse_buf) );
+ if ( readSize != 0 ) {
+ // Real data from file
+ vec->buf[vec->count++] = (struct fuse_buf){
+ .size = readSize,
+ .flags = FUSE_BUF_IS_FD | FUSE_BUF_FD_RETRY | FUSE_BUF_FD_SEEK,
+ .fd = image->readFd,
+ .pos = off,
+ };
+ }
+ if ( readSize != size ) {
+ vec->buf[vec->count++] = (struct fuse_buf){
+ .size = size - readSize,
+ .mem = nullbytes,
+ .fd = -1,
+ };
+ }
+ fuse_reply_data( req, vec, 0 );
+ free( vec );
+}
+
+static bool statInternal(fuse_ino_t ino, struct stat *stbuf)
+{
+ switch ( ino ) {
+ case INO_ROOT:
+ case INO_DIR:
+ stbuf->st_mode = S_IFDIR | 0555;
+ stbuf->st_nlink = 2;
+ stbuf->st_mtim = startupTime;
+ break;
+ case INO_CTRL:
+ stbuf->st_mode = S_IFREG | 0222;
+ stbuf->st_nlink = 1;
+ stbuf->st_size = 0;
+ clock_gettime( CLOCK_REALTIME, &stbuf->st_mtim );
+ break;
+ default:
+ return false;
+ }
+ stbuf->st_ctim = stbuf->st_atim = startupTime;
+ stbuf->st_uid = 0;
+ stbuf->st_ino = ino;
+ return true;
+}
+
+/**
+ * HOLD LOCK
+ */
+static dfuse_entry_t* dirLookup(dfuse_entry_t *dir, const char *name)
+{
+ if ( dir == NULL )
+ return NULL;
+ for ( dfuse_entry_t *it = dir; it != NULL; it = it->next ) {
+ if ( strcmp( it->name, name ) == 0 )
+ return it;
+ }
+ return NULL;
+}
+
+static dfuse_entry_t* inoRecursive(dfuse_entry_t *dir, fuse_ino_t ino)
+{
+ for ( dfuse_entry_t *it = dir; it != NULL; it = it->next ) {
+ logadd( LOG_DEBUG1, "ino %d is %s", (int)it->ino, it->name );
+ if ( it->ino == ino )
+ return it;
+ if ( it->img == NULL ) {
+ dir = inoRecursive( it->child, ino );
+ if ( dir != NULL )
+ return dir;
+ }
+ }
+ return NULL;
+}
+
+/**
+ * HOLD LOCK
+ */
+static void entryToStat(dfuse_entry_t *entry, struct stat *stbuf)
+{
+ if ( entry->img == NULL ) {
+ stbuf->st_mode = S_IFDIR | 0555;
+ stbuf->st_nlink = 2;
+ } else {
+ stbuf->st_mode = S_IFREG | 0444;
+ stbuf->st_nlink = 1;
+ stbuf->st_size = entry->size;
+ }
+ stbuf->st_ino = entry->ino;
+ stbuf->st_uid = 0;
+ stbuf->st_ctim = stbuf->st_atim = stbuf->st_mtim = startupTime;
+}
+
+static void ll_lookup(fuse_req_t req, fuse_ino_t parent, const char *name)
+{
+ logadd( LOG_DEBUG2, "Lookup at ino %d for '%s'", (int)parent, name );
+ if ( parent == INO_ROOT ) {
+ struct fuse_entry_param e = { 0 };
+ if ( strcmp( name, NAME_DIR ) == 0 ) {
+ e.ino = INO_DIR;
+ } else if ( strcmp( name, NAME_CTRL ) == 0 ) {
+ e.ino = INO_CTRL;
+ e.attr_timeout = e.entry_timeout = 3600;
+ }
+ if ( e.ino != 0 && statInternal( e.ino, &e.attr ) ) {
+ fuse_reply_entry( req, &e );
+ return;
+ }
+ } else {
+ mutex_lock( &dirLock );
+ dfuse_entry_t *dir = inoRecursive( root, parent );
+ if ( dir != NULL ) {
+ if ( dir->img != NULL ) {
+ mutex_unlock( &dirLock );
+ fuse_reply_err( req, ENOTDIR );
+ return;
+ }
+ dfuse_entry_t *entry = dirLookup( dir->child, name );
+ if ( entry != NULL ) {
+ struct fuse_entry_param e = { .ino = entry->ino };
+ entryToStat( entry, &e.attr );
+ mutex_unlock( &dirLock );
+ fuse_reply_entry( req, &e );
+ return;
+ }
+ }
+ mutex_unlock( &dirLock );
+ }
+ fuse_reply_err( req, ENOENT );
+}
+
+static void ll_getattr(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi UNUSED)
+{
+ struct stat stbuf = { .st_ino = 0 };
+ if ( !statInternal( ino, &stbuf ) ) {
+ mutex_lock( &dirLock );
+ dfuse_entry_t *entry = inoRecursive( root, ino );
+ if ( entry != NULL ) {
+ entryToStat( entry, &stbuf );
+ }
+ mutex_unlock( &dirLock );
+ }
+ if ( stbuf.st_ino == 0 ) {
+ fuse_reply_err( req, ENOENT );
+ } else {
+ fuse_reply_attr( req, &stbuf, 0 );
+ }
+}
+
+void ll_setattr(fuse_req_t req, fuse_ino_t ino, struct stat *attr UNUSED, int to_set UNUSED, struct fuse_file_info *fi)
+{
+ ll_getattr( req, ino, fi );
+}
+
+void ll_release(fuse_req_t req, fuse_ino_t ino UNUSED, struct fuse_file_info *fi)
+{
+ if ( fi->fh != 0 ) {
+ cmdopen_t *handle = (cmdopen_t*)fi->fh;
+ image_release( handle->image );
+ free( handle );
+ }
+ fuse_reply_err( req, 0 );
+}
+
+static void uplinkCallback(void *data, uint64_t handle UNUSED, uint64_t start UNUSED, uint32_t length, const char *buffer)
+{
+ fuse_req_t req = (fuse_req_t)data;
+ if ( buffer == NULL ) {
+ fuse_reply_err( req, EIO );
+ } else {
+ fuse_reply_buf( req, buffer, length );
+ }
+}
+
+/* map the implemented fuse operations */
+static struct fuse_lowlevel_ops fuseOps = {
+ .lookup = ll_lookup,
+ .getattr = ll_getattr,
+ .setattr = ll_setattr,
+ //.readdir = ll_readdir,
+ .open = ll_open,
+ .release = ll_release,
+ .read = ll_read,
+ .write = ll_write,
+ //.init = ll_init,
+ //.destroy = ll_destroy,
+};
+
+bool dfuse_init(const char *opts, const char *dir)
+{
+ int ex = INIT_NONE;
+ if ( !atomic_compare_exchange_strong( &initState, &ex, INIT_INPROGRESS ) ) {
+ logadd( LOG_ERROR, "Calling dfuse_init twice" );
+ exit( 1 );
+ }
+ mutex_init( &initLock, LOCK_FUSE_INIT );
+ mutex_lock( &initLock );
+ mutex_init( &dirLock, LOCK_FUSE_DIR );
+ clock_gettime( CLOCK_REALTIME, &startupTime );
+ struct fuse_args args = FUSE_ARGS_INIT( 0, NULL );
+ if ( opts != NULL ) {
+ fuse_opt_add_arg( &args, opts );
+ }
+ fuse_opt_add_arg( &args, "-odefault_permissions" );
+ fuse_opt_add_arg( &args, dir );
+ //
+ if ( fuse_parse_cmdline( &args, &fuseMountPoint, NULL, NULL ) == -1 ) {
+ logadd( LOG_ERROR, "FUSE: Error parsing command line" );
+ goto fail;
+ }
+ fuseChannel = fuse_mount( fuseMountPoint, &args );
+ if ( fuseChannel == NULL ) {
+ logadd( LOG_ERROR, "FUSE: Cannot mount to %s", dir );
+ goto fail;
+ }
+ fuseSession = fuse_lowlevel_new( &args, &fuseOps, sizeof( fuseOps ), NULL );
+ if ( fuseSession == NULL ) {
+ logadd( LOG_ERROR, "FUSE: Error initializing fuse session" );
+ goto fail;
+ }
+ fuse_session_add_chan( fuseSession, fuseChannel );
+ if ( 0 != thread_create( &fuseThreadId, NULL, &fuseMainLoop, (void *)NULL ) ) {
+ logadd( LOG_ERROR, "FUSE: Could not start thread" );
+ goto fail;
+ }
+ haveThread = true;
+ // Init OK
+ mutex_unlock( &initLock );
+ return true;
+fail:
+ cleanupFuse();
+ fuse_opt_free_args( &args );
+ initState = INIT_SHUTDOWN;
+ mutex_unlock( &initLock );
+ return false;
+}
+
+void dfuse_shutdown()
+{
+ if ( initState == INIT_NONE )
+ return;
+ for ( ;; ) {
+ int ex = INIT_DONE;
+ if ( atomic_compare_exchange_strong( &initState, &ex, INIT_SHUTDOWN ) )
+ break; // OK, do the shutdown
+ if ( ex == INIT_INPROGRESS )
+ continue; // dfuse_init in progress, wait for mutex
+ // Wrong state
+ logadd( LOG_WARNING, "Called dfuse_shutdown without dfuse_init first" );
+ return;
+ }
+ logadd( LOG_INFO, "Shutting down fuse mainloop..." );
+ mutex_lock( &initLock );
+ if ( fuseSession != NULL ) {
+ fuse_session_exit( fuseSession );
+ }
+ if ( !haveThread ) {
+ cleanupFuse();
+ }
+ mutex_unlock( &initLock );
+ if ( haveThread ) {
+ logadd( LOG_DEBUG1, "FUSE: Sending USR1 to mainloop thread" );
+ pthread_kill( fuseThreadId, SIGUSR1 );
+ pthread_join( fuseThreadId, NULL );
+ }
+}
+
+static void* fuseMainLoop(void *data UNUSED)
+{
+ int ex = INIT_INPROGRESS;
+ if ( !atomic_compare_exchange_strong( &initState, &ex, INIT_DONE ) ) {
+ logadd( LOG_WARNING, "FUSE: Unexpected state in fuseMainLoop: %d", ex );
+ return NULL;
+ }
+ logadd( LOG_INFO, "FUSE: Starting mainloop" );
+ fuse_session_loop_mt( fuseSession );
+ logadd( LOG_INFO, "FUSE: Left mainloop" );
+ mutex_lock( &initLock );
+ cleanupFuse();
+ mutex_unlock( &initLock );
+ return NULL;
+}
+
+static void cleanupFuse()
+{
+ if ( fuseChannel != NULL ) {
+ fuse_session_remove_chan( fuseChannel );
+ }
+ if ( fuseSession != NULL ) {
+ fuse_session_destroy( fuseSession );
+ fuseSession = NULL;
+ }
+ if ( fuseMountPoint != NULL && fuseChannel != NULL ) {
+ fuse_unmount( fuseMountPoint, fuseChannel );
+ }
+ fuseChannel = NULL;
+}
+
+#endif
diff --git a/src/server/fuse.h b/src/server/fuse.h
new file mode 100644
index 0000000..f01ad58
--- /dev/null
+++ b/src/server/fuse.h
@@ -0,0 +1,10 @@
+#ifndef _FUSE_H_
+#define _FUSE_H_
+
+#include <stdbool.h>
+
+bool dfuse_init(const char *opts, const char *dir);
+
+void dfuse_shutdown();
+
+#endif
diff --git a/src/server/globals.h b/src/server/globals.h
index 95d8ec2..8807019 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -18,12 +18,15 @@ typedef struct _dnbd3_uplink dnbd3_uplink_t;
typedef struct _dnbd3_image dnbd3_image_t;
typedef struct _dnbd3_client dnbd3_client_t;
+typedef void (*uplink_callback)(void *data, uint64_t handle, uint64_t start, uint32_t length, const char *buffer);
+
typedef struct _dnbd3_queue_client
{
struct _dnbd3_queue_client *next;
- uint64_t handle; // Handle used by client
+ void* data; // Passed back to callback
+ uint64_t handle; // Passed back to callback
uint64_t from, to; // Client range
- dnbd3_client_t * client; // Client to send reply to
+ uplink_callback callback; // Callback function
} dnbd3_queue_client_t;
typedef struct _dnbd3_queue_entry
diff --git a/src/server/image.c b/src/server/image.c
index efece62..0ffa349 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -339,7 +339,7 @@ dnbd3_image_t* image_byId(int imgId)
* point...
* Locks on: imageListLock, _images[].lock
*/
-dnbd3_image_t* image_get(char *name, uint16_t revision, bool ensureFdOpen)
+dnbd3_image_t* image_get(const char *name, uint16_t revision, bool ensureFdOpen)
{
int i;
dnbd3_image_t *candidate = NULL;
diff --git a/src/server/image.h b/src/server/image.h
index b23711b..7b6583c 100644
--- a/src/server/image.h
+++ b/src/server/image.h
@@ -19,7 +19,7 @@ bool image_ensureOpen(dnbd3_image_t *image);
dnbd3_image_t* image_byId(int imgId);
-dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking);
+dnbd3_image_t* image_get(const char *name, uint16_t revision, bool checkIfWorking);
bool image_reopenCacheFd(dnbd3_image_t *image, const bool force);
diff --git a/src/server/locks.h b/src/server/locks.h
index 6111d71..ff27a33 100644
--- a/src/server/locks.h
+++ b/src/server/locks.h
@@ -23,6 +23,8 @@
#define LOCK_UPLINK_RTT 200
#define LOCK_UPLINK_SEND 210
#define LOCK_RPC_ACL 220
+#define LOCK_FUSE_INIT 300
+#define LOCK_FUSE_DIR 310
//
diff --git a/src/server/net.c b/src/server/net.c
index 6b930df..c33a12e 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -58,6 +58,7 @@ static atomic_uint_fast64_t totalBytesSent = 0;
static bool addToList(dnbd3_client_t *client);
static void removeFromList(dnbd3_client_t *client);
static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client);
+static void uplinkCallback(void *data, uint64_t handle, uint64_t start, uint32_t length, const char *buffer);
static inline bool recv_request_header(int sock, dnbd3_request_t *request)
{
@@ -113,7 +114,7 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff
* Send reply with optional payload. payload can be null. The caller has to
* acquire the sendMutex first.
*/
-static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload)
+static inline bool send_reply(int sock, dnbd3_reply_t *reply, const void *payload)
{
const uint32_t size = reply->size;
fixup_reply( *reply );
@@ -357,7 +358,9 @@ void* net_handleNewConnection(void *clientPtr)
goto exit_client_cleanup;
}
}
- if ( !uplink_request( NULL, client, request.handle, offset, request.size, request.hops ) ) {
+ client->relayedCount++;
+ if ( !uplink_requestClient( client, &uplinkCallback, request.handle, offset, request.size, request.hops ) ) {
+ client->relayedCount--;
logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy for image %s:%d",
client->hostName, image->name, image->rid );
goto exit_client_cleanup;
@@ -682,9 +685,21 @@ static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client)
if ( client->image != NULL ) {
dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref );
if ( uplink != NULL ) {
- uplink_removeClient( uplink, client );
+ if ( client->relayedCount != 0 ) {
+ uplink_removeEntry( uplink, client, &uplinkCallback );
+ }
ref_put( &uplink->reference );
}
+ if ( client->relayedCount != 0 ) {
+ logadd( LOG_DEBUG1, "Client has relayedCount == %"PRIu8" on disconnect..", client->relayedCount );
+ int i;
+ for ( i = 0; i < 1000 && client->relayedCount != 0; ++i ) {
+ usleep( 10000 );
+ }
+ if ( client->relayedCount != 0 ) {
+ logadd( LOG_WARNING, "Client relayedCount still %"PRIu8" after sleeping!", client->relayedCount );
+ }
+ }
}
mutex_lock( &client->sendMutex );
if ( client->sock != -1 ) {
@@ -726,15 +741,21 @@ static bool addToList(dnbd3_client_t *client)
return true;
}
-void net_sendReply(dnbd3_client_t *client, uint16_t cmd, uint64_t handle)
+static void uplinkCallback(void *data, uint64_t handle, uint64_t start UNUSED, uint32_t length, const char *buffer)
{
- dnbd3_reply_t reply;
- reply.magic = dnbd3_packet_magic;
- reply.cmd = cmd;
- reply.handle = handle;
- reply.size = 0;
+ dnbd3_client_t *client = (dnbd3_client_t*)data;
+ dnbd3_reply_t reply = {
+ .magic = dnbd3_packet_magic,
+ .cmd = buffer == NULL ? CMD_ERROR : CMD_GET_BLOCK,
+ .handle = handle,
+ .size = length,
+ };
mutex_lock( &client->sendMutex );
- send_reply( client->sock, &reply, NULL );
+ send_reply( client->sock, &reply, buffer );
+ if ( buffer == NULL ) {
+ shutdown( client->sock, SHUT_RDWR );
+ }
+ client->relayedCount--;
mutex_unlock( &client->sendMutex );
}
diff --git a/src/server/net.h b/src/server/net.h
index 7719aef..6813b49 100644
--- a/src/server/net.h
+++ b/src/server/net.h
@@ -37,6 +37,4 @@ void net_disconnectAll();
void net_waitForAllDisconnected();
-void net_sendReply(dnbd3_client_t *client, uint16_t cmd, uint64_t handle);
-
#endif /* NET_H_ */
diff --git a/src/server/server.c b/src/server/server.c
index fa7bcda..ad8b13e 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -29,6 +29,7 @@
#include "integrity.h"
#include "threadpool.h"
#include "rpc.h"
+#include "fuse.h"
#include "../version.h"
#include "../shared/sockhelper.h"
@@ -108,6 +109,9 @@ void dnbd3_printHelp(char *argv_0)
printf( "Usage: %s [OPTIONS]...\n", argv_0 );
printf( "Start the DNBD3 server\n" );
printf( "-c or --config Configuration directory (default /etc/dnbd3-server/)\n" );
+#ifdef BUILD_SERVER_FUSE
+ printf( "-m or --mount FUSE mount point\n ");
+#endif
printf( "-n or --nodaemon Start server in foreground\n" );
printf( "-b or --bind Local Address to bind to\n" );
printf( "-h or --help Show this help text and quit\n" );
@@ -140,6 +144,8 @@ _Noreturn static void dnbd3_cleanup()
_shutdown = true;
logadd( LOG_INFO, "Cleanup..." );
+ dfuse_shutdown();
+
if ( hasTimerThread ) {
pthread_kill( timerThread, SIGINT );
thread_join( timerThread, NULL );
@@ -190,11 +196,13 @@ int main(int argc, char *argv[])
char *paramCreate = NULL;
char *bindAddress = NULL;
char *errorMsg = NULL;
+ char *mountDir = NULL;
int64_t paramSize = -1;
int paramRevision = -1;
- static const char *optString = "b:c:d:hnv?";
+ static const char *optString = "b:c:m:d:hnv?";
static const struct option longOpts[] = {
{ "config", required_argument, NULL, 'c' },
+ { "mount", required_argument, NULL, 'm' },
{ "nodaemon", no_argument, NULL, 'n' },
{ "reload", no_argument, NULL, 'r' },
{ "help", no_argument, NULL, 'h' },
@@ -218,6 +226,13 @@ int main(int argc, char *argv[])
case 'c':
_configDir = strdup( optarg );
break;
+ case 'm':
+#ifndef BUILD_SERVER_FUSE
+ fprintf( "FUSE support not enabled at build time.\n" );
+ return 8;
+#endif
+ mountDir = strdup( optarg );
+ break;
case 'n':
demonize = 0;
break;
@@ -342,6 +357,11 @@ int main(int argc, char *argv[])
net_init();
uplink_globalsInit();
rpc_init();
+ if ( mountDir != NULL && !dfuse_init( NULL, mountDir ) ) {
+ logadd( LOG_ERROR, "Cannot mount fuse directory to %s", mountDir );
+ dnbd3_cleanup();
+ return EXIT_FAILURE;
+ }
logadd( LOG_INFO, "DNBD3 server starting...." );
logadd( LOG_INFO, "Machine type: " ENDIAN_MODE );
logadd( LOG_INFO, "Build Type: " TOSTRING( BUILD_TYPE ) );
@@ -385,6 +405,7 @@ int main(int argc, char *argv[])
// Initialize thread pool
if ( !threadpool_init( 8 ) ) {
logadd( LOG_ERROR, "Could not init thread pool!\n" );
+ dnbd3_cleanup();
exit( EXIT_FAILURE );
}
@@ -526,10 +547,11 @@ static void dnbd3_handleSignal2(int signum, siginfo_t *info, void *data UNUSED)
if ( info->si_pid != 0 && !pthread_equal( pthread_self(), mainThread ) ) {
pthread_kill( mainThread, info->si_signo ); // And relay signal if we're not the main thread
}
- }
- if ( pthread_equal( pthread_self(), mainThread ) ) {
- // Signal received by main thread -- handle
- dnbd3_handleSignal( signum );
+ // Source is not this process -- only then do we honor signals
+ if ( pthread_equal( pthread_self(), mainThread ) ) {
+ // Signal received by main thread -- handle
+ dnbd3_handleSignal( signum );
+ }
}
}
diff --git a/src/server/uplink.c b/src/server/uplink.c
index bf6f32e..cb4f956 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -38,6 +38,7 @@ static bool connectionShouldShutdown(dnbd3_uplink_t *uplink);
static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew);
static int numWantedReplicationRequests(dnbd3_uplink_t *uplink);
static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle);
+static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops);
static void *prefetchForClient(void *data);
typedef struct {
@@ -180,8 +181,7 @@ static void cancelAllRequests(dnbd3_uplink_t *uplink)
while ( it != NULL ) {
dnbd3_queue_client_t *cit = it->clients;
while ( cit != NULL ) {
- net_sendReply( cit->client, CMD_ERROR, cit->handle );
- cit->client->relayedCount--;
+ (*cit->callback)( cit->data, cit->handle, 0, 0, NULL );
dnbd3_queue_client_t *next = cit->next;
free( cit );
cit = next;
@@ -229,15 +229,13 @@ static void freeUplinkStruct(ref *ref)
* Remove given client from uplink request queue
* Locks on: uplink.queueLock
*/
-void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client)
+void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback callback)
{
- if ( client->relayedCount == 0 )
- return;
mutex_lock( &uplink->queueLock );
for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
for ( dnbd3_queue_client_t **cit = &it->clients; *cit != NULL; ) {
- if ( (**cit).client == client ) {
- --client->relayedCount;
+ if ( (**cit).data == data && (**cit).callback == callback ) {
+ (*(**cit).callback)( (**cit).data, (**cit).handle, 0, 0, NULL );
dnbd3_queue_client_t *entry = *cit;
*cit = (**cit).next;
free( entry );
@@ -247,63 +245,78 @@ void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client)
}
}
mutex_unlock( &uplink->queueLock );
- if ( unlikely( client->relayedCount != 0 ) ) {
- logadd( LOG_DEBUG1, "Client has relayedCount == %"PRIu8" on disconnect..", client->relayedCount );
- int i;
- for ( i = 0; i < 1000 && client->relayedCount != 0; ++i ) {
- usleep( 10000 );
- }
- if ( client->relayedCount != 0 ) {
- logadd( LOG_WARNING, "Client relayedCount still %"PRIu8" after sleeping!", client->relayedCount );
- }
- }
}
-/**
- * Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL.
- * If client is NULL, this is assumed to be a background replication request.
- * Locks on: uplink.queueLock, uplink.sendMutex
- */
-bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
+bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
{
- bool getUplink = ( uplink == NULL );
- assert( client != NULL || uplink != NULL );
- if ( hops++ > 200 ) { // This is just silly
+ assert( client != NULL && callback != NULL );
+ if ( hops > 200 ) { // This is just silly
logadd( LOG_WARNING, "Refusing to relay a request that has > 200 hops" );
return false;
}
- if ( length > (uint32_t)_maxPayload ) {
- logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length );
- return false;
- }
- if ( getUplink ) {
+ dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref );
+ if ( unlikely( uplink == NULL ) ) {
+ uplink_init( client->image, -1, NULL, -1 );
uplink = ref_get_uplink( &client->image->uplinkref );
- if ( unlikely( uplink == NULL ) ) {
- uplink_init( client->image, -1, NULL, -1 );
- uplink = ref_get_uplink( &client->image->uplinkref );
- if ( uplink == NULL ) {
- logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
- return false;
- }
+ if ( uplink == NULL ) {
+ logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
+ return false;
}
}
- if ( uplink->shutdown ) {
- logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" );
- goto fail_ref;
- }
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
// This might be a false positive if there are multiple instances running on the same host (IP)
+ bool ret;
if ( client != NULL && hops > 1
&& isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
uplink->cycleDetected = true;
signal_call( uplink->signal );
logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
- goto fail_ref;
+ ret = false;
+ } else {
+ ret = uplink_requestInternal( uplink, (void*)client, callback, handle, start, length, hops );
+ }
+ ref_put( &uplink->reference );
+ return ret;
+}
+
+bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length)
+{
+ dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
+ if ( unlikely( uplink == NULL ) ) {
+ uplink_init( image, -1, NULL, -1 );
+ uplink = ref_get_uplink( &image->uplinkref );
+ if ( uplink == NULL ) {
+ logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
+ return false;
+ }
+ }
+ bool ret = uplink_requestInternal( uplink, data, callback, handle, start, length, 0 );
+ ref_put( &uplink->reference );
+ return ret;
+}
+
+/**
+ * Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL.
+ * If client is NULL, this is assumed to be a background replication request.
+ * Locks on: uplink.queueLock, uplink.sendMutex
+ */
+static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
+{
+ assert( uplink != NULL );
+ assert( data == NULL || callback != NULL );
+ if ( uplink->shutdown ) {
+ logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" );
+ return false;
+ }
+ if ( length > (uint32_t)_maxPayload ) {
+ logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload", length );
+ return false;
}
struct {
uint64_t handle, start, end;
} req;
+ hops++;
do {
const uint64_t end = start + length;
dnbd3_queue_entry_t *request = NULL, *last = NULL;
@@ -350,14 +363,14 @@ bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t han
#endif
request->hopCount = hops;
request->sent = true; // Optimistic; would be set to false on failure
- if ( client == NULL ) {
+ if ( callback == NULL ) {
// BGR
request->clients = NULL;
} else {
c = &request->clients;
}
isNew = true;
- } else if ( client == NULL ) {
+ } else if ( callback == NULL ) {
// Replication request that maches existing request. Do nothing
isNew = false;
} else {
@@ -381,14 +394,14 @@ bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t han
req.handle = request->handle;
req.start = request->from;
req.end = request->to;
- if ( client != NULL ) {
+ if ( callback != NULL ) {
*c = malloc( sizeof( *request->clients ) );
(**c).next = NULL;
(**c).handle = handle;
(**c).from = start;
(**c).to = end;
- (**c).client = client;
- client->relayedCount++;
+ (**c).data = data;
+ (**c).callback = callback;
}
mutex_unlock( &uplink->queueLock );
@@ -426,7 +439,7 @@ bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t han
}
success_ref:
- if ( client != NULL ) {
+ if ( callback != NULL ) {
// Was from client -- potential prefetch
// Same size as this request, but consider end of image...
uint32_t len = (uint32_t)MIN( uplink->image->virtualFilesize - req.end,
@@ -442,16 +455,9 @@ success_ref:
threadpool_run( &prefetchForClient, (void*)job, "PREFETCH" );
}
}
- if ( getUplink ) {
- ref_put( &uplink->reference );
- }
return true;
fail_lock:
mutex_unlock( &uplink->queueLock );
-fail_ref:
- if ( getUplink ) {
- ref_put( &uplink->reference );
- }
return false;
}
@@ -461,7 +467,7 @@ static void *prefetchForClient(void *data)
dnbd3_cache_map_t *cache = ref_get_cachemap( job->uplink->image );
if ( cache != NULL ) {
if ( !image_isRangeCachedUnsafe( cache, job->start, job->start + job->length ) ) {
- uplink_request( job->uplink, NULL, ++job->uplink->queueId, job->start, job->length, 0 );
+ uplink_requestInternal( job->uplink, NULL, NULL, ++job->uplink->queueId, job->start, job->length, 0 );
}
ref_put( &cache->reference );
}
@@ -824,7 +830,7 @@ static bool sendReplicationRequest(dnbd3_uplink_t *uplink)
const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
const uint64_t handle = ++uplink->queueId;
- if ( !uplink_request( uplink, NULL, handle, offset, size, 0 ) ) {
+ if ( !uplink_requestInternal( uplink, NULL, NULL, handle, offset, size, 0 ) ) {
logadd( LOG_DEBUG1, "Error sending background replication request to uplink server (%s:%d)",
PIMG(uplink->image) );
ref_put( &cache->reference );
@@ -902,7 +908,7 @@ static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMa
*/
static void handleReceive(dnbd3_uplink_t *uplink)
{
- dnbd3_reply_t inReply, outReply;
+ dnbd3_reply_t inReply;
int ret;
assert_uplink_thread();
for (;;) {
@@ -967,7 +973,6 @@ static void handleReceive(dnbd3_uplink_t *uplink)
logadd( LOG_WARNING, "Received payload length does not match! (is: %"PRIu32", expect: %u, %s:%d)",
inReply.size, (unsigned int)( end - start ), PIMG(uplink->image) );
}
- struct iovec iov[2];
// 1) Write to cache file
if ( unlikely( uplink->cacheFd == -1 ) ) {
reopenCacheFd( uplink, false );
@@ -1035,28 +1040,11 @@ static void handleReceive(dnbd3_uplink_t *uplink)
PIMG(uplink->image) );
continue;
}
- outReply.magic = dnbd3_packet_magic;
dnbd3_queue_client_t *next;
for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) {
assert( c->from >= start && c->to <= end );
- dnbd3_client_t * const client = c->client;
- outReply.cmd = CMD_GET_BLOCK;
- outReply.handle = c->handle;
- outReply.size = (uint32_t)( c->to - c->from );
- iov[0].iov_base = &outReply;
- iov[0].iov_len = sizeof outReply;
- iov[1].iov_base = uplink->recvBuffer + (c->from - start);
- iov[1].iov_len = outReply.size;
- fixup_reply( outReply );
- mutex_lock( &client->sendMutex );
- if ( client->sock != -1 ) {
- ssize_t sent = writev( client->sock, iov, 2 );
- if ( sent > (ssize_t)sizeof outReply ) {
- client->bytesSent += (size_t)sent - sizeof outReply;
- }
- }
- mutex_unlock( &client->sendMutex );
- client->relayedCount--;
+ (*c->callback)( c->data, c->handle, c->from, (uint32_t)( c->to - c->from ),
+ (const char*)( uplink->recvBuffer + (c->from - start) ) );
next = c->next;
free( c );
}
diff --git a/src/server/uplink.h b/src/server/uplink.h
index 8f69b05..6cd69ea 100644
--- a/src/server/uplink.h
+++ b/src/server/uplink.h
@@ -10,9 +10,11 @@ uint64_t uplink_getTotalBytesReceived();
bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version);
-void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client);
+void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback callback);
-bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops);
+bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops);
+
+bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length);
bool uplink_shutdown(dnbd3_image_t *image);