From e4dec3562e6cab27e1a3f40165e4c0d9d0bf05c9 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 28 Jul 2020 17:49:17 +0200 Subject: [SERVER] Add FUSE mode Still needs some cleanup and optimizations, variable naming sucks, comments, etc. --- src/server/fuse.c | 633 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/server/fuse.h | 10 + src/server/globals.h | 7 +- src/server/image.c | 2 +- src/server/image.h | 2 +- src/server/locks.h | 2 + src/server/net.c | 41 +++- src/server/net.h | 2 - src/server/server.c | 32 ++- src/server/uplink.c | 146 ++++++------ src/server/uplink.h | 6 +- 11 files changed, 781 insertions(+), 102 deletions(-) create mode 100644 src/server/fuse.c create mode 100644 src/server/fuse.h (limited to 'src') diff --git a/src/server/fuse.c b/src/server/fuse.c new file mode 100644 index 0000000..0bd7a3e --- /dev/null +++ b/src/server/fuse.c @@ -0,0 +1,633 @@ +#include "fuse.h" +#include "../types.h" +#include "../shared/log.h" + +#ifndef BUILD_SERVER_FUSE + +// +bool dfuse_init(const char *opts UNUSED, const char *dir UNUSED) +{ + logadd( LOG_ERROR, "FUSE: Not compiled in" ); + return false; +} + +void dfuse_shutdown() +{ +} + +#else + +#define PATHLEN (2000) +static char nullbytes[DNBD3_BLOCK_SIZE]; + +// FUSE ENABLED +#define FUSE_USE_VERSION 30 +// +#include "../config.h" +#include "locks.h" +#include "threadpool.h" +#include "image.h" +#include "uplink.h" +#include "reference.h" + +#include +#include +#include +#include +#include + +#define INO_ROOT (1) +#define INO_CTRL (2) +#define INO_DIR (3) +static const char *NAME_CTRL = "control"; +static const char *NAME_DIR = "images"; + +typedef struct { + fuse_req_t req; + size_t fuseWriteSize; + uint16_t rid; + char name[PATHLEN]; +} lookup_t; + +static fuse_ino_t inoCounter = 10; +typedef struct _dfuse_dir { + struct _dfuse_dir *next; + struct _dfuse_dir *child; + const char *name; + uint64_t size; + fuse_ino_t ino; + int refcount; + lookup_t *img; +} dfuse_entry_t; + +typedef struct { + fuse_req_t req; + struct fuse_file_info fi; + dfuse_entry_t *entry; + dnbd3_image_t *image; +} cmdopen_t; + +static dfuse_entry_t sroot = { + .name = "images", + .ino = INO_DIR, + .refcount = 2, +}, *root = &sroot; +static pthread_mutex_t dirLock; + +#define INIT_NONE (0) +#define INIT_DONE (1) +#define INIT_SHUTDOWN (2) +#define INIT_INPROGRESS (3) + +static struct fuse_session *fuseSession = NULL; +static struct fuse_chan *fuseChannel = NULL; +static char *fuseMountPoint = NULL; +static pthread_t fuseThreadId; +static bool haveThread = false; +static _Atomic(int) initState = INIT_NONE; +static pthread_mutex_t initLock; +static struct timespec startupTime; + +static dfuse_entry_t* dirLookup(dfuse_entry_t *dir, const char *name); +static dfuse_entry_t* inoRecursive(dfuse_entry_t *dir, fuse_ino_t ino); +static void *imageLookup(void *data); +static void *imageOpen(void *data); + +static void uplinkCallback(void *data, uint64_t handle, uint64_t start UNUSED, uint32_t length, const char *buffer); +static void cleanupFuse(); +static void* fuseMainLoop(void *data); + +static void ll_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) +{ + fi->fh = 0; + if ( ino == INO_CTRL ) { + if ( ( fi->flags & 3 ) != O_WRONLY ) { + fuse_reply_err( req, EINVAL ); + } else { + fi->nonseekable = 1; + fuse_reply_open( req, fi ); + } + } else if ( ino == INO_ROOT ) { + fuse_reply_err( req, EISDIR ); + } else { + if ( ( fi->flags & 3 ) != O_RDONLY ) { + fuse_reply_err( req, EINVAL ); + return; + } + mutex_lock( &dirLock ); + dfuse_entry_t *entry = inoRecursive( root, ino ); + if ( entry == NULL ) { + mutex_unlock( &dirLock ); + fuse_reply_err( req, ENOENT ); + } else if ( entry->img == NULL ) { + mutex_unlock( &dirLock ); + fuse_reply_err( req, EISDIR ); + } else { + entry->refcount++; + mutex_unlock( &dirLock ); + cmdopen_t *handle = malloc( sizeof(cmdopen_t) ); + handle->req = req; + handle->fi = *fi; + handle->entry = entry; + handle->image = NULL; + if ( !threadpool_run( &imageOpen, (void*)handle, "fuse-open" ) ) { + free( handle ); + mutex_lock( &dirLock ); + entry->refcount--; + mutex_unlock( &dirLock ); + fuse_reply_err( req, ENOMEM ); + } + } + } +} + +static void *imageOpen(void *data) +{ + cmdopen_t *handle = (cmdopen_t*)data; + assert( handle->image == NULL ); + assert( handle->entry->img->rid != 0 ); + handle->image = image_get( handle->entry->img->name, handle->entry->img->rid, true ); + if ( handle->image == NULL ) { + fuse_reply_err( handle->req, ENOENT ); + mutex_lock( &dirLock ); + handle->entry->refcount--; + mutex_unlock( &dirLock ); + free( handle ); + } else { + handle->fi.fh = (uintptr_t)handle; + handle->fi.keep_cache = 1; + fuse_reply_open( handle->req, &handle->fi ); + handle->req = NULL; + } + return NULL; +} + +static dfuse_entry_t* addImage(dfuse_entry_t **dir, const char *name, lookup_t *img) +{ + const char *slash = strchr( name, '/' ); + if ( slash == NULL ) { + // Name portion at the end + char *path = NULL; + if ( asprintf( &path, "%s:%d", name, (int)img->rid ) == -1 ) + abort(); + dfuse_entry_t *entry = dirLookup( *dir, path ); + if ( entry == NULL ) { + entry = calloc( 1, sizeof( *entry ) ); + entry->next = *dir; + *dir = entry; + entry->name = path; + entry->ino = inoCounter++; + entry->img = img; + } else { + free( path ); + if ( entry->img == NULL ) { + return NULL; + } + } + return entry; + } else { + // Dirname + char *path = NULL; + if ( asprintf( &path, "%.*s", (int)( slash - name ), name ) == -1 ) + abort(); + dfuse_entry_t *entry = dirLookup( *dir, path ); + if ( entry == NULL ) { + entry = calloc( 1, sizeof( *entry ) ); + entry->next = *dir; + *dir = entry; + entry->name = path; + entry->ino = inoCounter++; + } else { + free( path ); + } + return addImage( &entry->child, slash + 1, img ); + } +} + +static void *imageLookup(void *data) +{ + lookup_t *lu = (lookup_t*)data; + dnbd3_image_t *image = image_getOrLoad( lu->name, lu->rid ); + if ( image == NULL ) { + fuse_reply_err( lu->req, ENOENT ); + free( lu ); + } else { + mutex_lock( &dirLock ); + dfuse_entry_t *entry = addImage( &root->child, lu->name, lu ); + if ( entry != NULL ) { + entry->size = image->virtualFilesize; + } + lu->rid = image->rid; // In case it was 0 + mutex_unlock( &dirLock ); + image_release( image ); + if ( entry == NULL ) { + fuse_reply_err( lu->req, EINVAL ); + } else { + fuse_reply_write( lu->req, lu->fuseWriteSize ); + } + } + return NULL; +} + +static void ll_write(fuse_req_t req, fuse_ino_t ino, const char *buf, size_t size, off_t off, struct fuse_file_info *fi UNUSED) +{ + if ( ino != INO_CTRL ) { + fuse_reply_err( req, EROFS ); + return; + } + if ( off != 0 ) { + fuse_reply_err( req, ESPIPE ); + return; + } + if ( size >= PATHLEN ) { + fuse_reply_err( req, ENOSPC ); + return; + } + size_t colon = 0; + int rid = 0; + for ( size_t i = 0; i < size; ++i ) { + if ( buf[i] == '\0' || buf[i] == '\n' ) { + if ( colon == 0 ) { + colon = i; + } + break; + } + if ( colon != 0 ) { + if ( !isdigit( buf[i] ) ) { + logadd( LOG_WARNING, "FUSE: Malformed rid" ); + fuse_reply_err( req, EINVAL ); + return; + } + rid = rid * 10 + ( buf[i] - '0' ); // Can overflow but who cares + } else if ( buf[i] == ':' ) { + colon = i; // Image name starting with ':' would be broken... + } + } + if ( rid < 0 || rid > 65535 ) { + logadd( LOG_WARNING, "FUSE: Invalid rid '%d'", rid ); + fuse_reply_err( req, EINVAL ); + return; + } + if ( colon == 0 ) { + colon = size; + } + lookup_t *lu = malloc( sizeof(lookup_t) ); + lu->rid = (uint16_t)rid; + lu->req = req; + lu->fuseWriteSize = size; + if ( snprintf( lu->name, PATHLEN, "%.*s", (int)colon, buf ) == -1 ) { + free( lu ); + fuse_reply_err( req, ENOSPC ); + return; + } + logadd( LOG_DEBUG1, "FUSE: Request for '%s:%d'", lu->name, (int)lu->rid ); + if ( !threadpool_run( &imageLookup, (void*)lu, "fuse-lookup" ) ) { + free( lu ); + fuse_reply_err( req, EAGAIN ); + } +} + +static void ll_read( fuse_req_t req, fuse_ino_t ino UNUSED, size_t size, off_t off, struct fuse_file_info *fi ) +{ + if ( fi->fh == 0 ) { + fuse_reply_err( req, 0 ); + return; + } + cmdopen_t *handle = (cmdopen_t*)fi->fh; + dnbd3_image_t *image = handle->image; + if ( off < 0 || (uint64_t)off >= image->virtualFilesize ) { + fuse_reply_err( req, 0 ); + return; + } + if ( off + size > image->virtualFilesize ) { + size = image->virtualFilesize - off; + } + + // Check if cached locally + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache != NULL ) { + // This is a proxyed image, check if we need to relay the request... + const uint64_t start = (uint64_t)off & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + const uint64_t end = (off + size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + if ( !image_isRangeCachedUnsafe( cache, start, end ) ) { + ref_put( &cache->reference ); + if ( size > (uint32_t)_maxPayload ) { + size = (uint32_t)_maxPayload; + } + if ( !uplink_request( image, req, &uplinkCallback, 0, off, (uint32_t)size ) ) { + logadd( LOG_DEBUG1, "FUSE: Could not relay uncached request to upstream proxy for image %s:%d", + image->name, image->rid ); + fuse_reply_err( req, EIO ); + } + return; // ASYNC + } + ref_put( &cache->reference ); + } + + // Is cached + size_t readSize = size; + if ( off + readSize > image->realFilesize ) { + if ( (uint64_t)off >= image->realFilesize ) { + readSize = 0; + } else { + readSize = image->realFilesize - off; + } + } + struct fuse_bufvec *vec = calloc( 1, sizeof(*vec) + sizeof(struct fuse_buf) ); + if ( readSize != 0 ) { + // Real data from file + vec->buf[vec->count++] = (struct fuse_buf){ + .size = readSize, + .flags = FUSE_BUF_IS_FD | FUSE_BUF_FD_RETRY | FUSE_BUF_FD_SEEK, + .fd = image->readFd, + .pos = off, + }; + } + if ( readSize != size ) { + vec->buf[vec->count++] = (struct fuse_buf){ + .size = size - readSize, + .mem = nullbytes, + .fd = -1, + }; + } + fuse_reply_data( req, vec, 0 ); + free( vec ); +} + +static bool statInternal(fuse_ino_t ino, struct stat *stbuf) +{ + switch ( ino ) { + case INO_ROOT: + case INO_DIR: + stbuf->st_mode = S_IFDIR | 0555; + stbuf->st_nlink = 2; + stbuf->st_mtim = startupTime; + break; + case INO_CTRL: + stbuf->st_mode = S_IFREG | 0222; + stbuf->st_nlink = 1; + stbuf->st_size = 0; + clock_gettime( CLOCK_REALTIME, &stbuf->st_mtim ); + break; + default: + return false; + } + stbuf->st_ctim = stbuf->st_atim = startupTime; + stbuf->st_uid = 0; + stbuf->st_ino = ino; + return true; +} + +/** + * HOLD LOCK + */ +static dfuse_entry_t* dirLookup(dfuse_entry_t *dir, const char *name) +{ + if ( dir == NULL ) + return NULL; + for ( dfuse_entry_t *it = dir; it != NULL; it = it->next ) { + if ( strcmp( it->name, name ) == 0 ) + return it; + } + return NULL; +} + +static dfuse_entry_t* inoRecursive(dfuse_entry_t *dir, fuse_ino_t ino) +{ + for ( dfuse_entry_t *it = dir; it != NULL; it = it->next ) { + logadd( LOG_DEBUG1, "ino %d is %s", (int)it->ino, it->name ); + if ( it->ino == ino ) + return it; + if ( it->img == NULL ) { + dir = inoRecursive( it->child, ino ); + if ( dir != NULL ) + return dir; + } + } + return NULL; +} + +/** + * HOLD LOCK + */ +static void entryToStat(dfuse_entry_t *entry, struct stat *stbuf) +{ + if ( entry->img == NULL ) { + stbuf->st_mode = S_IFDIR | 0555; + stbuf->st_nlink = 2; + } else { + stbuf->st_mode = S_IFREG | 0444; + stbuf->st_nlink = 1; + stbuf->st_size = entry->size; + } + stbuf->st_ino = entry->ino; + stbuf->st_uid = 0; + stbuf->st_ctim = stbuf->st_atim = stbuf->st_mtim = startupTime; +} + +static void ll_lookup(fuse_req_t req, fuse_ino_t parent, const char *name) +{ + logadd( LOG_DEBUG2, "Lookup at ino %d for '%s'", (int)parent, name ); + if ( parent == INO_ROOT ) { + struct fuse_entry_param e = { 0 }; + if ( strcmp( name, NAME_DIR ) == 0 ) { + e.ino = INO_DIR; + } else if ( strcmp( name, NAME_CTRL ) == 0 ) { + e.ino = INO_CTRL; + e.attr_timeout = e.entry_timeout = 3600; + } + if ( e.ino != 0 && statInternal( e.ino, &e.attr ) ) { + fuse_reply_entry( req, &e ); + return; + } + } else { + mutex_lock( &dirLock ); + dfuse_entry_t *dir = inoRecursive( root, parent ); + if ( dir != NULL ) { + if ( dir->img != NULL ) { + mutex_unlock( &dirLock ); + fuse_reply_err( req, ENOTDIR ); + return; + } + dfuse_entry_t *entry = dirLookup( dir->child, name ); + if ( entry != NULL ) { + struct fuse_entry_param e = { .ino = entry->ino }; + entryToStat( entry, &e.attr ); + mutex_unlock( &dirLock ); + fuse_reply_entry( req, &e ); + return; + } + } + mutex_unlock( &dirLock ); + } + fuse_reply_err( req, ENOENT ); +} + +static void ll_getattr(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi UNUSED) +{ + struct stat stbuf = { .st_ino = 0 }; + if ( !statInternal( ino, &stbuf ) ) { + mutex_lock( &dirLock ); + dfuse_entry_t *entry = inoRecursive( root, ino ); + if ( entry != NULL ) { + entryToStat( entry, &stbuf ); + } + mutex_unlock( &dirLock ); + } + if ( stbuf.st_ino == 0 ) { + fuse_reply_err( req, ENOENT ); + } else { + fuse_reply_attr( req, &stbuf, 0 ); + } +} + +void ll_setattr(fuse_req_t req, fuse_ino_t ino, struct stat *attr UNUSED, int to_set UNUSED, struct fuse_file_info *fi) +{ + ll_getattr( req, ino, fi ); +} + +void ll_release(fuse_req_t req, fuse_ino_t ino UNUSED, struct fuse_file_info *fi) +{ + if ( fi->fh != 0 ) { + cmdopen_t *handle = (cmdopen_t*)fi->fh; + image_release( handle->image ); + free( handle ); + } + fuse_reply_err( req, 0 ); +} + +static void uplinkCallback(void *data, uint64_t handle UNUSED, uint64_t start UNUSED, uint32_t length, const char *buffer) +{ + fuse_req_t req = (fuse_req_t)data; + if ( buffer == NULL ) { + fuse_reply_err( req, EIO ); + } else { + fuse_reply_buf( req, buffer, length ); + } +} + +/* map the implemented fuse operations */ +static struct fuse_lowlevel_ops fuseOps = { + .lookup = ll_lookup, + .getattr = ll_getattr, + .setattr = ll_setattr, + //.readdir = ll_readdir, + .open = ll_open, + .release = ll_release, + .read = ll_read, + .write = ll_write, + //.init = ll_init, + //.destroy = ll_destroy, +}; + +bool dfuse_init(const char *opts, const char *dir) +{ + int ex = INIT_NONE; + if ( !atomic_compare_exchange_strong( &initState, &ex, INIT_INPROGRESS ) ) { + logadd( LOG_ERROR, "Calling dfuse_init twice" ); + exit( 1 ); + } + mutex_init( &initLock, LOCK_FUSE_INIT ); + mutex_lock( &initLock ); + mutex_init( &dirLock, LOCK_FUSE_DIR ); + clock_gettime( CLOCK_REALTIME, &startupTime ); + struct fuse_args args = FUSE_ARGS_INIT( 0, NULL ); + if ( opts != NULL ) { + fuse_opt_add_arg( &args, opts ); + } + fuse_opt_add_arg( &args, "-odefault_permissions" ); + fuse_opt_add_arg( &args, dir ); + // + if ( fuse_parse_cmdline( &args, &fuseMountPoint, NULL, NULL ) == -1 ) { + logadd( LOG_ERROR, "FUSE: Error parsing command line" ); + goto fail; + } + fuseChannel = fuse_mount( fuseMountPoint, &args ); + if ( fuseChannel == NULL ) { + logadd( LOG_ERROR, "FUSE: Cannot mount to %s", dir ); + goto fail; + } + fuseSession = fuse_lowlevel_new( &args, &fuseOps, sizeof( fuseOps ), NULL ); + if ( fuseSession == NULL ) { + logadd( LOG_ERROR, "FUSE: Error initializing fuse session" ); + goto fail; + } + fuse_session_add_chan( fuseSession, fuseChannel ); + if ( 0 != thread_create( &fuseThreadId, NULL, &fuseMainLoop, (void *)NULL ) ) { + logadd( LOG_ERROR, "FUSE: Could not start thread" ); + goto fail; + } + haveThread = true; + // Init OK + mutex_unlock( &initLock ); + return true; +fail: + cleanupFuse(); + fuse_opt_free_args( &args ); + initState = INIT_SHUTDOWN; + mutex_unlock( &initLock ); + return false; +} + +void dfuse_shutdown() +{ + if ( initState == INIT_NONE ) + return; + for ( ;; ) { + int ex = INIT_DONE; + if ( atomic_compare_exchange_strong( &initState, &ex, INIT_SHUTDOWN ) ) + break; // OK, do the shutdown + if ( ex == INIT_INPROGRESS ) + continue; // dfuse_init in progress, wait for mutex + // Wrong state + logadd( LOG_WARNING, "Called dfuse_shutdown without dfuse_init first" ); + return; + } + logadd( LOG_INFO, "Shutting down fuse mainloop..." ); + mutex_lock( &initLock ); + if ( fuseSession != NULL ) { + fuse_session_exit( fuseSession ); + } + if ( !haveThread ) { + cleanupFuse(); + } + mutex_unlock( &initLock ); + if ( haveThread ) { + logadd( LOG_DEBUG1, "FUSE: Sending USR1 to mainloop thread" ); + pthread_kill( fuseThreadId, SIGUSR1 ); + pthread_join( fuseThreadId, NULL ); + } +} + +static void* fuseMainLoop(void *data UNUSED) +{ + int ex = INIT_INPROGRESS; + if ( !atomic_compare_exchange_strong( &initState, &ex, INIT_DONE ) ) { + logadd( LOG_WARNING, "FUSE: Unexpected state in fuseMainLoop: %d", ex ); + return NULL; + } + logadd( LOG_INFO, "FUSE: Starting mainloop" ); + fuse_session_loop_mt( fuseSession ); + logadd( LOG_INFO, "FUSE: Left mainloop" ); + mutex_lock( &initLock ); + cleanupFuse(); + mutex_unlock( &initLock ); + return NULL; +} + +static void cleanupFuse() +{ + if ( fuseChannel != NULL ) { + fuse_session_remove_chan( fuseChannel ); + } + if ( fuseSession != NULL ) { + fuse_session_destroy( fuseSession ); + fuseSession = NULL; + } + if ( fuseMountPoint != NULL && fuseChannel != NULL ) { + fuse_unmount( fuseMountPoint, fuseChannel ); + } + fuseChannel = NULL; +} + +#endif diff --git a/src/server/fuse.h b/src/server/fuse.h new file mode 100644 index 0000000..f01ad58 --- /dev/null +++ b/src/server/fuse.h @@ -0,0 +1,10 @@ +#ifndef _FUSE_H_ +#define _FUSE_H_ + +#include + +bool dfuse_init(const char *opts, const char *dir); + +void dfuse_shutdown(); + +#endif diff --git a/src/server/globals.h b/src/server/globals.h index 95d8ec2..8807019 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -18,12 +18,15 @@ typedef struct _dnbd3_uplink dnbd3_uplink_t; typedef struct _dnbd3_image dnbd3_image_t; typedef struct _dnbd3_client dnbd3_client_t; +typedef void (*uplink_callback)(void *data, uint64_t handle, uint64_t start, uint32_t length, const char *buffer); + typedef struct _dnbd3_queue_client { struct _dnbd3_queue_client *next; - uint64_t handle; // Handle used by client + void* data; // Passed back to callback + uint64_t handle; // Passed back to callback uint64_t from, to; // Client range - dnbd3_client_t * client; // Client to send reply to + uplink_callback callback; // Callback function } dnbd3_queue_client_t; typedef struct _dnbd3_queue_entry diff --git a/src/server/image.c b/src/server/image.c index efece62..0ffa349 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -339,7 +339,7 @@ dnbd3_image_t* image_byId(int imgId) * point... * Locks on: imageListLock, _images[].lock */ -dnbd3_image_t* image_get(char *name, uint16_t revision, bool ensureFdOpen) +dnbd3_image_t* image_get(const char *name, uint16_t revision, bool ensureFdOpen) { int i; dnbd3_image_t *candidate = NULL; diff --git a/src/server/image.h b/src/server/image.h index b23711b..7b6583c 100644 --- a/src/server/image.h +++ b/src/server/image.h @@ -19,7 +19,7 @@ bool image_ensureOpen(dnbd3_image_t *image); dnbd3_image_t* image_byId(int imgId); -dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking); +dnbd3_image_t* image_get(const char *name, uint16_t revision, bool checkIfWorking); bool image_reopenCacheFd(dnbd3_image_t *image, const bool force); diff --git a/src/server/locks.h b/src/server/locks.h index 6111d71..ff27a33 100644 --- a/src/server/locks.h +++ b/src/server/locks.h @@ -23,6 +23,8 @@ #define LOCK_UPLINK_RTT 200 #define LOCK_UPLINK_SEND 210 #define LOCK_RPC_ACL 220 +#define LOCK_FUSE_INIT 300 +#define LOCK_FUSE_DIR 310 // diff --git a/src/server/net.c b/src/server/net.c index 6b930df..c33a12e 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -58,6 +58,7 @@ static atomic_uint_fast64_t totalBytesSent = 0; static bool addToList(dnbd3_client_t *client); static void removeFromList(dnbd3_client_t *client); static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client); +static void uplinkCallback(void *data, uint64_t handle, uint64_t start, uint32_t length, const char *buffer); static inline bool recv_request_header(int sock, dnbd3_request_t *request) { @@ -113,7 +114,7 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff * Send reply with optional payload. payload can be null. The caller has to * acquire the sendMutex first. */ -static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload) +static inline bool send_reply(int sock, dnbd3_reply_t *reply, const void *payload) { const uint32_t size = reply->size; fixup_reply( *reply ); @@ -357,7 +358,9 @@ void* net_handleNewConnection(void *clientPtr) goto exit_client_cleanup; } } - if ( !uplink_request( NULL, client, request.handle, offset, request.size, request.hops ) ) { + client->relayedCount++; + if ( !uplink_requestClient( client, &uplinkCallback, request.handle, offset, request.size, request.hops ) ) { + client->relayedCount--; logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy for image %s:%d", client->hostName, image->name, image->rid ); goto exit_client_cleanup; @@ -682,9 +685,21 @@ static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client) if ( client->image != NULL ) { dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref ); if ( uplink != NULL ) { - uplink_removeClient( uplink, client ); + if ( client->relayedCount != 0 ) { + uplink_removeEntry( uplink, client, &uplinkCallback ); + } ref_put( &uplink->reference ); } + if ( client->relayedCount != 0 ) { + logadd( LOG_DEBUG1, "Client has relayedCount == %"PRIu8" on disconnect..", client->relayedCount ); + int i; + for ( i = 0; i < 1000 && client->relayedCount != 0; ++i ) { + usleep( 10000 ); + } + if ( client->relayedCount != 0 ) { + logadd( LOG_WARNING, "Client relayedCount still %"PRIu8" after sleeping!", client->relayedCount ); + } + } } mutex_lock( &client->sendMutex ); if ( client->sock != -1 ) { @@ -726,15 +741,21 @@ static bool addToList(dnbd3_client_t *client) return true; } -void net_sendReply(dnbd3_client_t *client, uint16_t cmd, uint64_t handle) +static void uplinkCallback(void *data, uint64_t handle, uint64_t start UNUSED, uint32_t length, const char *buffer) { - dnbd3_reply_t reply; - reply.magic = dnbd3_packet_magic; - reply.cmd = cmd; - reply.handle = handle; - reply.size = 0; + dnbd3_client_t *client = (dnbd3_client_t*)data; + dnbd3_reply_t reply = { + .magic = dnbd3_packet_magic, + .cmd = buffer == NULL ? CMD_ERROR : CMD_GET_BLOCK, + .handle = handle, + .size = length, + }; mutex_lock( &client->sendMutex ); - send_reply( client->sock, &reply, NULL ); + send_reply( client->sock, &reply, buffer ); + if ( buffer == NULL ) { + shutdown( client->sock, SHUT_RDWR ); + } + client->relayedCount--; mutex_unlock( &client->sendMutex ); } diff --git a/src/server/net.h b/src/server/net.h index 7719aef..6813b49 100644 --- a/src/server/net.h +++ b/src/server/net.h @@ -37,6 +37,4 @@ void net_disconnectAll(); void net_waitForAllDisconnected(); -void net_sendReply(dnbd3_client_t *client, uint16_t cmd, uint64_t handle); - #endif /* NET_H_ */ diff --git a/src/server/server.c b/src/server/server.c index fa7bcda..ad8b13e 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -29,6 +29,7 @@ #include "integrity.h" #include "threadpool.h" #include "rpc.h" +#include "fuse.h" #include "../version.h" #include "../shared/sockhelper.h" @@ -108,6 +109,9 @@ void dnbd3_printHelp(char *argv_0) printf( "Usage: %s [OPTIONS]...\n", argv_0 ); printf( "Start the DNBD3 server\n" ); printf( "-c or --config Configuration directory (default /etc/dnbd3-server/)\n" ); +#ifdef BUILD_SERVER_FUSE + printf( "-m or --mount FUSE mount point\n "); +#endif printf( "-n or --nodaemon Start server in foreground\n" ); printf( "-b or --bind Local Address to bind to\n" ); printf( "-h or --help Show this help text and quit\n" ); @@ -140,6 +144,8 @@ _Noreturn static void dnbd3_cleanup() _shutdown = true; logadd( LOG_INFO, "Cleanup..." ); + dfuse_shutdown(); + if ( hasTimerThread ) { pthread_kill( timerThread, SIGINT ); thread_join( timerThread, NULL ); @@ -190,11 +196,13 @@ int main(int argc, char *argv[]) char *paramCreate = NULL; char *bindAddress = NULL; char *errorMsg = NULL; + char *mountDir = NULL; int64_t paramSize = -1; int paramRevision = -1; - static const char *optString = "b:c:d:hnv?"; + static const char *optString = "b:c:m:d:hnv?"; static const struct option longOpts[] = { { "config", required_argument, NULL, 'c' }, + { "mount", required_argument, NULL, 'm' }, { "nodaemon", no_argument, NULL, 'n' }, { "reload", no_argument, NULL, 'r' }, { "help", no_argument, NULL, 'h' }, @@ -218,6 +226,13 @@ int main(int argc, char *argv[]) case 'c': _configDir = strdup( optarg ); break; + case 'm': +#ifndef BUILD_SERVER_FUSE + fprintf( "FUSE support not enabled at build time.\n" ); + return 8; +#endif + mountDir = strdup( optarg ); + break; case 'n': demonize = 0; break; @@ -342,6 +357,11 @@ int main(int argc, char *argv[]) net_init(); uplink_globalsInit(); rpc_init(); + if ( mountDir != NULL && !dfuse_init( NULL, mountDir ) ) { + logadd( LOG_ERROR, "Cannot mount fuse directory to %s", mountDir ); + dnbd3_cleanup(); + return EXIT_FAILURE; + } logadd( LOG_INFO, "DNBD3 server starting...." ); logadd( LOG_INFO, "Machine type: " ENDIAN_MODE ); logadd( LOG_INFO, "Build Type: " TOSTRING( BUILD_TYPE ) ); @@ -385,6 +405,7 @@ int main(int argc, char *argv[]) // Initialize thread pool if ( !threadpool_init( 8 ) ) { logadd( LOG_ERROR, "Could not init thread pool!\n" ); + dnbd3_cleanup(); exit( EXIT_FAILURE ); } @@ -526,10 +547,11 @@ static void dnbd3_handleSignal2(int signum, siginfo_t *info, void *data UNUSED) if ( info->si_pid != 0 && !pthread_equal( pthread_self(), mainThread ) ) { pthread_kill( mainThread, info->si_signo ); // And relay signal if we're not the main thread } - } - if ( pthread_equal( pthread_self(), mainThread ) ) { - // Signal received by main thread -- handle - dnbd3_handleSignal( signum ); + // Source is not this process -- only then do we honor signals + if ( pthread_equal( pthread_self(), mainThread ) ) { + // Signal received by main thread -- handle + dnbd3_handleSignal( signum ); + } } } diff --git a/src/server/uplink.c b/src/server/uplink.c index bf6f32e..cb4f956 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -38,6 +38,7 @@ static bool connectionShouldShutdown(dnbd3_uplink_t *uplink); static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew); static int numWantedReplicationRequests(dnbd3_uplink_t *uplink); static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle); +static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops); static void *prefetchForClient(void *data); typedef struct { @@ -180,8 +181,7 @@ static void cancelAllRequests(dnbd3_uplink_t *uplink) while ( it != NULL ) { dnbd3_queue_client_t *cit = it->clients; while ( cit != NULL ) { - net_sendReply( cit->client, CMD_ERROR, cit->handle ); - cit->client->relayedCount--; + (*cit->callback)( cit->data, cit->handle, 0, 0, NULL ); dnbd3_queue_client_t *next = cit->next; free( cit ); cit = next; @@ -229,15 +229,13 @@ static void freeUplinkStruct(ref *ref) * Remove given client from uplink request queue * Locks on: uplink.queueLock */ -void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client) +void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback callback) { - if ( client->relayedCount == 0 ) - return; mutex_lock( &uplink->queueLock ); for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { for ( dnbd3_queue_client_t **cit = &it->clients; *cit != NULL; ) { - if ( (**cit).client == client ) { - --client->relayedCount; + if ( (**cit).data == data && (**cit).callback == callback ) { + (*(**cit).callback)( (**cit).data, (**cit).handle, 0, 0, NULL ); dnbd3_queue_client_t *entry = *cit; *cit = (**cit).next; free( entry ); @@ -247,63 +245,78 @@ void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client) } } mutex_unlock( &uplink->queueLock ); - if ( unlikely( client->relayedCount != 0 ) ) { - logadd( LOG_DEBUG1, "Client has relayedCount == %"PRIu8" on disconnect..", client->relayedCount ); - int i; - for ( i = 0; i < 1000 && client->relayedCount != 0; ++i ) { - usleep( 10000 ); - } - if ( client->relayedCount != 0 ) { - logadd( LOG_WARNING, "Client relayedCount still %"PRIu8" after sleeping!", client->relayedCount ); - } - } } -/** - * Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL. - * If client is NULL, this is assumed to be a background replication request. - * Locks on: uplink.queueLock, uplink.sendMutex - */ -bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) +bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) { - bool getUplink = ( uplink == NULL ); - assert( client != NULL || uplink != NULL ); - if ( hops++ > 200 ) { // This is just silly + assert( client != NULL && callback != NULL ); + if ( hops > 200 ) { // This is just silly logadd( LOG_WARNING, "Refusing to relay a request that has > 200 hops" ); return false; } - if ( length > (uint32_t)_maxPayload ) { - logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length ); - return false; - } - if ( getUplink ) { + dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref ); + if ( unlikely( uplink == NULL ) ) { + uplink_init( client->image, -1, NULL, -1 ); uplink = ref_get_uplink( &client->image->uplinkref ); - if ( unlikely( uplink == NULL ) ) { - uplink_init( client->image, -1, NULL, -1 ); - uplink = ref_get_uplink( &client->image->uplinkref ); - if ( uplink == NULL ) { - logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); - return false; - } + if ( uplink == NULL ) { + logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); + return false; } } - if ( uplink->shutdown ) { - logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" ); - goto fail_ref; - } // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain // This might be a false positive if there are multiple instances running on the same host (IP) + bool ret; if ( client != NULL && hops > 1 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) { uplink->cycleDetected = true; signal_call( uplink->signal ); logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); - goto fail_ref; + ret = false; + } else { + ret = uplink_requestInternal( uplink, (void*)client, callback, handle, start, length, hops ); + } + ref_put( &uplink->reference ); + return ret; +} + +bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length) +{ + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( unlikely( uplink == NULL ) ) { + uplink_init( image, -1, NULL, -1 ); + uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink == NULL ) { + logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); + return false; + } + } + bool ret = uplink_requestInternal( uplink, data, callback, handle, start, length, 0 ); + ref_put( &uplink->reference ); + return ret; +} + +/** + * Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL. + * If client is NULL, this is assumed to be a background replication request. + * Locks on: uplink.queueLock, uplink.sendMutex + */ +static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) +{ + assert( uplink != NULL ); + assert( data == NULL || callback != NULL ); + if ( uplink->shutdown ) { + logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" ); + return false; + } + if ( length > (uint32_t)_maxPayload ) { + logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload", length ); + return false; } struct { uint64_t handle, start, end; } req; + hops++; do { const uint64_t end = start + length; dnbd3_queue_entry_t *request = NULL, *last = NULL; @@ -350,14 +363,14 @@ bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t han #endif request->hopCount = hops; request->sent = true; // Optimistic; would be set to false on failure - if ( client == NULL ) { + if ( callback == NULL ) { // BGR request->clients = NULL; } else { c = &request->clients; } isNew = true; - } else if ( client == NULL ) { + } else if ( callback == NULL ) { // Replication request that maches existing request. Do nothing isNew = false; } else { @@ -381,14 +394,14 @@ bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t han req.handle = request->handle; req.start = request->from; req.end = request->to; - if ( client != NULL ) { + if ( callback != NULL ) { *c = malloc( sizeof( *request->clients ) ); (**c).next = NULL; (**c).handle = handle; (**c).from = start; (**c).to = end; - (**c).client = client; - client->relayedCount++; + (**c).data = data; + (**c).callback = callback; } mutex_unlock( &uplink->queueLock ); @@ -426,7 +439,7 @@ bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t han } success_ref: - if ( client != NULL ) { + if ( callback != NULL ) { // Was from client -- potential prefetch // Same size as this request, but consider end of image... uint32_t len = (uint32_t)MIN( uplink->image->virtualFilesize - req.end, @@ -442,16 +455,9 @@ success_ref: threadpool_run( &prefetchForClient, (void*)job, "PREFETCH" ); } } - if ( getUplink ) { - ref_put( &uplink->reference ); - } return true; fail_lock: mutex_unlock( &uplink->queueLock ); -fail_ref: - if ( getUplink ) { - ref_put( &uplink->reference ); - } return false; } @@ -461,7 +467,7 @@ static void *prefetchForClient(void *data) dnbd3_cache_map_t *cache = ref_get_cachemap( job->uplink->image ); if ( cache != NULL ) { if ( !image_isRangeCachedUnsafe( cache, job->start, job->start + job->length ) ) { - uplink_request( job->uplink, NULL, ++job->uplink->queueId, job->start, job->length, 0 ); + uplink_requestInternal( job->uplink, NULL, NULL, ++job->uplink->queueId, job->start, job->length, 0 ); } ref_put( &cache->reference ); } @@ -824,7 +830,7 @@ static bool sendReplicationRequest(dnbd3_uplink_t *uplink) const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE; const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE ); const uint64_t handle = ++uplink->queueId; - if ( !uplink_request( uplink, NULL, handle, offset, size, 0 ) ) { + if ( !uplink_requestInternal( uplink, NULL, NULL, handle, offset, size, 0 ) ) { logadd( LOG_DEBUG1, "Error sending background replication request to uplink server (%s:%d)", PIMG(uplink->image) ); ref_put( &cache->reference ); @@ -902,7 +908,7 @@ static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMa */ static void handleReceive(dnbd3_uplink_t *uplink) { - dnbd3_reply_t inReply, outReply; + dnbd3_reply_t inReply; int ret; assert_uplink_thread(); for (;;) { @@ -967,7 +973,6 @@ static void handleReceive(dnbd3_uplink_t *uplink) logadd( LOG_WARNING, "Received payload length does not match! (is: %"PRIu32", expect: %u, %s:%d)", inReply.size, (unsigned int)( end - start ), PIMG(uplink->image) ); } - struct iovec iov[2]; // 1) Write to cache file if ( unlikely( uplink->cacheFd == -1 ) ) { reopenCacheFd( uplink, false ); @@ -1035,28 +1040,11 @@ static void handleReceive(dnbd3_uplink_t *uplink) PIMG(uplink->image) ); continue; } - outReply.magic = dnbd3_packet_magic; dnbd3_queue_client_t *next; for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) { assert( c->from >= start && c->to <= end ); - dnbd3_client_t * const client = c->client; - outReply.cmd = CMD_GET_BLOCK; - outReply.handle = c->handle; - outReply.size = (uint32_t)( c->to - c->from ); - iov[0].iov_base = &outReply; - iov[0].iov_len = sizeof outReply; - iov[1].iov_base = uplink->recvBuffer + (c->from - start); - iov[1].iov_len = outReply.size; - fixup_reply( outReply ); - mutex_lock( &client->sendMutex ); - if ( client->sock != -1 ) { - ssize_t sent = writev( client->sock, iov, 2 ); - if ( sent > (ssize_t)sizeof outReply ) { - client->bytesSent += (size_t)sent - sizeof outReply; - } - } - mutex_unlock( &client->sendMutex ); - client->relayedCount--; + (*c->callback)( c->data, c->handle, c->from, (uint32_t)( c->to - c->from ), + (const char*)( uplink->recvBuffer + (c->from - start) ) ); next = c->next; free( c ); } diff --git a/src/server/uplink.h b/src/server/uplink.h index 8f69b05..6cd69ea 100644 --- a/src/server/uplink.h +++ b/src/server/uplink.h @@ -10,9 +10,11 @@ uint64_t uplink_getTotalBytesReceived(); bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version); -void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client); +void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback callback); -bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops); +bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops); + +bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length); bool uplink_shutdown(dnbd3_image_t *image); -- cgit v1.2.3-55-g7522