From 4c94cb861dfbfe2a8c9c165165cbdbc062eaa39b Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Sat, 21 Nov 2015 12:24:21 +0100 Subject: [FUSE] Start refactoring so we can handle multithread fuse --- src/fuse/connection.c | 211 ++++++++++++++++++++++++++++++++++++++++++++++++ src/fuse/connection.h | 25 ++++++ src/fuse/helper.c | 14 ++-- src/fuse/helper.h | 9 +-- src/fuse/main.c | 2 +- src/protocol.h | 138 ------------------------------- src/serialize.c | 2 +- src/serialize.h | 2 +- src/server/altservers.c | 4 +- src/server/image.c | 4 +- src/server/locks.c | 2 +- src/server/signal.c | 52 ------------ src/server/signal.h | 49 ----------- src/server/threadpool.c | 2 +- src/server/uplink.c | 4 +- src/shared/protocol.h | 138 +++++++++++++++++++++++++++++++ src/shared/signal.c | 52 ++++++++++++ src/shared/signal.h | 49 +++++++++++ 18 files changed, 497 insertions(+), 262 deletions(-) create mode 100644 src/fuse/connection.c create mode 100644 src/fuse/connection.h delete mode 100644 src/protocol.h delete mode 100644 src/server/signal.c delete mode 100644 src/server/signal.h create mode 100644 src/shared/protocol.h create mode 100644 src/shared/signal.c create mode 100644 src/shared/signal.h (limited to 'src') diff --git a/src/fuse/connection.c b/src/fuse/connection.c new file mode 100644 index 0000000..3e1bf38 --- /dev/null +++ b/src/fuse/connection.c @@ -0,0 +1,211 @@ +#include "connection.h" +#include "helper.h" +#include "../config.h" +#include "../shared/protocol.h" +#include "../shared/signal.h" + +#include +#include +#include +#include + +static const size_t SHORTBUF = 100; + +static bool initDone = false; +pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER; + +static struct { + dnbd3_async_t *head; + dnbd3_async_t *tail; + pthread_spinlock_t lock; +} requests; + +static struct { + char *name; + uint16_t rid; + uint64_t size; + int sockFd; + pthread_mutex_t sendMutex; + pthread_t receiveThread; +} image; + +static bool throwDataAway(int sockFd, uint32_t amount); +static void enqueueRequest(dnbd3_async_t *request); +static dnbd3_async_t* removeRequest(dnbd3_async_t *request); + +static void* connection_receiveThreadMain(void *sock); + +bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid) +{ + int sock = -1; + char host[SHORTBUF]; + const char *current, *end; + serialized_buffer_t buffer; + uint16_t remoteVersion, remoteRid; + char *remoteName; + uint64_t remoteSize; + + pthread_mutex_lock( &mutexInit ); + if ( !initDone ) { + current = hosts; + do { + // Get next host from string + while ( *current == ' ' ) current++; + end = strchr( current, ' ' ); + size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1); + if ( len > SHORTBUF ) len = SHORTBUF; + snprintf( host, len, "%s", current ); + current = end + 1; + // Try to connect + sock = connect_to_server( host, PORT ); // TODO: Parse port from host + if ( sock != -1 && dnbd3_select_image( sock, lowerImage, rid, 0 ) + && dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) + && ( rid == 0 || rid == remoteRid ) ) { + image.name = strdup(remoteName); + image.rid = remoteRid; + image.size = remoteSize; + break; + } + // Failed + if ( sock != -1 ) { + close( sock ); + sock = -1; + } + // TODO: Add to alt list + } while ( end != NULL ); + if ( sock != -1 ) { + if ( pthread_mutex_init( &image.sendMutex, NULL ) != 0 + || pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0 + || pthread_create( &image.receiveThread, NULL, &connection_receiveThreadMain, (void*)(size_t)sock ) != 0 ) { + close( sock ); + sock = -1; + } else { + image.sockFd = sock; + requests.head = NULL; + requests.tail = NULL; + } + initDone = true; + } + } + pthread_mutex_unlock( &mutexInit ); + return sock != -1; +} + +bool connection_read(dnbd3_async_t *request) +{ + if (!initDone) return false; + enqueueRequest( request ); + pthread_mutex_lock( &image.sendMutex ); + if ( image.sockFd != -1 ) { + while ( !dnbd3_get_block( image.sockFd, request->offset, request->length, (uint64_t)request ) ) { + shutdown( image.sockFd, SHUT_RDWR ); + image.sockFd = -1; + // TODO reconnect! + pthread_mutex_unlock( &image.sendMutex ); + return false; + } + } + pthread_mutex_unlock( &image.sendMutex ); + return true; +} + +void connection_close() +{ + // +} + +static bool throwDataAway(int sockFd, uint32_t amount) +{ + uint32_t done = 0; + char tempBuffer[SHORTBUF]; + while ( done < amount ) { + if ( recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ), 0 ) <= 0 ) + return false; + } + return true; +} + +static void enqueueRequest(dnbd3_async_t *request) +{ + request->next = NULL; + request->finished = false; + request->success = false; + pthread_spin_lock( &requests.lock ); + if ( requests.head == NULL ) { + requests.head = requests.tail = request; + } else { + requests.tail->next = request; + requests.tail = request; + } + pthread_spin_unlock( &requests.lock ); +} + +static dnbd3_async_t* removeRequest(dnbd3_async_t *request) +{ + pthread_spin_lock( &requests.lock ); + dnbd3_async_t *iterator, *prev = NULL; + for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) { + if ( iterator == request ) { + // Found it, break! + if ( prev != NULL ) { + prev->next = iterator->next; + } + if ( requests.tail == iterator ) { + requests.tail = prev; + } + break; + } + prev = iterator; + } + pthread_spin_unlock( &requests.lock ); + return iterator; +} + +static void* connection_receiveThreadMain(void *sockPtr) +{ + int sockFd = (int)(size_t)sockPtr; + dnbd3_reply_t reply; + for ( ;; ) { + if ( !dnbd3_get_reply( image.sockFd, &reply ) ) + goto fail; + // TODO: Ignoring anything but get block replies for now; handle the others + if ( reply.cmd != CMD_GET_BLOCK ) { + if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) + goto fail; + } else { + // get block reply. find matching request + dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)reply.handle ); + if ( request == NULL ) { + printf("WARNING BUG ALERT SOMETHING: Got block reply with no matching request\n"); + if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) + goto fail; + } else { + // Found a match + request->finished = true; + uint32_t done = 0; + while ( done < request->length ) { + if ( recv( sockFd, request->buffer + done, request->length - done, 0 ) <= 0 ) { + request->success = false; + signal_call( request->signalFd ); + goto fail; + } + } + // Success, wake up caller + request->success = true; + signal_call( request->signalFd ); + } + } + } +fail:; + // Make sure noone is trying to use the socket for sending by locking, + pthread_mutex_lock( &image.sendMutex ); + // then just set the fd to -1, but only if it's the same fd as ours, + // as someone could have established a new connection already + if ( image.sockFd == sockFd ) { + image.sockFd = -1; + } + pthread_mutex_unlock( &image.sendMutex ); + // As we're the only reader, it's safe to close the socket now + close( sockFd ); + return NULL; +} diff --git a/src/fuse/connection.h b/src/fuse/connection.h new file mode 100644 index 0000000..8ab2c35 --- /dev/null +++ b/src/fuse/connection.h @@ -0,0 +1,25 @@ +#ifndef _CONNECTION_H_ +#define _CONNECTION_H_ + +#include +#include + +struct _dnbd3_async; + +typedef struct _dnbd3_async { + struct _dnbd3_async *next; // Next in this linked list (provate field, not set by caller) + char* buffer; // Caller-provided buffer to be filled + uint64_t offset; + uint32_t length; + int signalFd; // Used to signal the caller + bool finished; // Will be set to true if the request has been handled + bool success; // Will be set to true if the request succeeded +} dnbd3_async_t; + +bool connection_init(const char *hosts, const char *image, const uint16_t rid); + +bool connection_read(dnbd3_async_t *request); + +void connection_close(); + +#endif /* CONNECTION_H_ */ diff --git a/src/fuse/helper.c b/src/fuse/helper.c index 7b1101d..65644f8 100644 --- a/src/fuse/helper.c +++ b/src/fuse/helper.c @@ -1,9 +1,13 @@ -/* - * Helper functions for imageFuse - * by Stephan Schwaer, January 2014 - */ - #include "helper.h" + +#include +#include +#include +#include +#include +#include + + void printLog( log_info *info ) { FILE *logFile; diff --git a/src/fuse/helper.h b/src/fuse/helper.h index 1c972e4..bbba44c 100644 --- a/src/fuse/helper.h +++ b/src/fuse/helper.h @@ -1,14 +1,9 @@ #ifndef IMAGEHELPER_H #define IMAGEHELPER_H -#include "../protocol.h" #include -#include -#include -#include -#include -#include -#include +#include +#include typedef struct log_info { diff --git a/src/fuse/main.c b/src/fuse/main.c index 1da12d8..d6a4d98 100644 --- a/src/fuse/main.c +++ b/src/fuse/main.c @@ -7,7 +7,7 @@ * Changed by Stephan Schwaer * */ -#include "../protocol.h" +#include "../shared/protocol.h" #include "../serialize.h" #include "helper.h" diff --git a/src/protocol.h b/src/protocol.h deleted file mode 100644 index d119dc5..0000000 --- a/src/protocol.h +++ /dev/null @@ -1,138 +0,0 @@ -#ifndef _PROTOCOL_H_ -#define _PROTOCOL_H_ - -#include -#include -#include -#include "types.h" -#include "serialize.h" - -#define FLAGS8_SERVER (1) - -#define REPLY_OK (0) -#define REPLY_ERRNO (-1) -#define REPLY_AGAIN (-2) -#define REPLY_INTR (-3) -#define REPLY_CLOSED (-4) -#define REPLY_INCOMPLETE (-5) -#define REPLY_WRONGMAGIC (-6) - -static inline int dnbd3_read_reply(int sock, dnbd3_reply_t *reply, bool wait) -{ - int ret = recv( sock, reply, sizeof(*reply), (wait ? MSG_WAITALL : MSG_DONTWAIT) | MSG_NOSIGNAL ); - if ( ret == 0 ) return REPLY_CLOSED; - if ( ret < 0 ) { - if ( errno == EAGAIN || errno == EWOULDBLOCK ) return REPLY_AGAIN; - if ( errno == EINTR ) return REPLY_INTR; - return REPLY_ERRNO; - } - if ( !wait && ret != sizeof(*reply) ) ret += recv( sock, ((char*)reply) + ret, sizeof(*reply) - ret, MSG_WAITALL | MSG_NOSIGNAL ); - if ( ret != sizeof(*reply) ) return REPLY_INCOMPLETE; - fixup_reply( *reply ); - if ( reply->magic != dnbd3_packet_magic ) return REPLY_WRONGMAGIC; - return REPLY_OK; -} - -static inline bool dnbd3_get_reply(int sock, dnbd3_reply_t *reply) -{ - return dnbd3_read_reply( sock, reply, true ) == REPLY_OK; -} - -static inline bool dnbd3_select_image(int sock, char *lower_name, uint16_t rid, uint8_t flags8) -{ - serialized_buffer_t serialized; - dnbd3_request_t request; - struct iovec iov[2]; - serializer_reset_write( &serialized ); - serializer_put_uint16( &serialized, PROTOCOL_VERSION ); - serializer_put_string( &serialized, lower_name ); - serializer_put_uint16( &serialized, rid ); - serializer_put_uint8( &serialized, flags8 ); - const ssize_t len = serializer_get_written_length( &serialized ); - request.magic = dnbd3_packet_magic; - request.cmd = CMD_SELECT_IMAGE; - request.size = len; -#ifdef _DEBUG - request.handle = 0; - request.offset = 0; -#endif - fixup_request( request ); - iov[0].iov_base = &request; - iov[0].iov_len = sizeof(request); - iov[1].iov_base = &serialized; - iov[1].iov_len = len; - return writev( sock, iov, 2 ) == len + (ssize_t)sizeof(request); -} - -static inline bool dnbd3_get_block(int sock, uint64_t offset, uint32_t size, uint64_t handle) -{ - dnbd3_request_t request; - request.magic = dnbd3_packet_magic; - request.handle = handle; - request.cmd = CMD_GET_BLOCK; - request.offset = offset; - request.size = size; - fixup_request( request ); - return send( sock, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request); -} - -static inline bool dnbd3_get_crc32(int sock, uint32_t *master, void *buffer, size_t *bufferLen) -{ - dnbd3_request_t request; - dnbd3_reply_t reply; - request.magic = dnbd3_packet_magic; - request.handle = 0; - request.cmd = CMD_GET_CRC32; - request.offset = 0; - request.size = 0; - fixup_request( request ); - if ( send( sock, &request, sizeof(request), 0 ) != sizeof(request) ) return false; - if ( !dnbd3_get_reply( sock, &reply ) ) return false; - if ( reply.size == 0 ) { - *bufferLen = 0; - return true; - } - if ( reply.size < 4 ) return false; - reply.size -= 4; - if ( reply.cmd != CMD_GET_CRC32 || reply.size > *bufferLen ) return false; - *bufferLen = reply.size; - if ( recv( sock, master, sizeof(uint32_t), MSG_WAITALL | MSG_NOSIGNAL ) != sizeof(uint32_t) ) return false; - uint32_t done = 0; - while ( done < reply.size ) { - const ssize_t ret = recv( sock, (char*)buffer + done, reply.size - done, 0 ); - if ( ret <= 0 ) return false; - done += ret; - } - return true; -} - -/** - * Pass a full serialized_buffer_t and a socket fd. Parsed data will be returned in further arguments. - * Note that all strings will point into the passed buffer, so there's no need to free them. - * This function will also read the header for you, as this message can only occur during connection, - * where no unrequested messages could arrive inbetween. - */ -static inline bool dnbd3_select_image_reply(serialized_buffer_t *buffer, int sock, uint16_t *protocol_version, char **name, uint16_t *rid, - uint64_t *imageSize) -{ - dnbd3_reply_t reply; - if ( !dnbd3_get_reply( sock, &reply ) ) { - return false; - } - if ( reply.cmd != CMD_SELECT_IMAGE || reply.size < 3 || reply.size > MAX_PAYLOAD ) { - return false; - } -// receive reply payload - if ( recv( sock, buffer, reply.size, MSG_WAITALL | MSG_NOSIGNAL ) != reply.size ) { - return false; - } -// handle/check reply payload - serializer_reset_read( buffer, reply.size ); - *protocol_version = serializer_get_uint16( buffer ); - *name = serializer_get_string( buffer ); - *rid = serializer_get_uint16( buffer ); - *imageSize = serializer_get_uint64( buffer ); - return true; -} - -#endif diff --git a/src/serialize.c b/src/serialize.c index b2be8fc..409b682 100644 --- a/src/serialize.c +++ b/src/serialize.c @@ -71,7 +71,7 @@ void serializer_put_uint64(serialized_buffer_t *buffer, uint64_t value) buffer->buffer_pointer += 8; } -void serializer_put_string(serialized_buffer_t *buffer, char *value) +void serializer_put_string(serialized_buffer_t *buffer, const char *value) { const size_t len = strlen(value) + 1; if (buffer->buffer_pointer + len > buffer->buffer_end) return; diff --git a/src/serialize.h b/src/serialize.h index 14d3d13..63b6de9 100644 --- a/src/serialize.h +++ b/src/serialize.h @@ -35,6 +35,6 @@ void serializer_put_uint16(serialized_buffer_t *buffer, uint16_t value); void serializer_put_uint64(serialized_buffer_t *buffer, uint64_t value); -void serializer_put_string(serialized_buffer_t *buffer, char *value); +void serializer_put_string(serialized_buffer_t *buffer, const char *value); #endif diff --git a/src/server/altservers.c b/src/server/altservers.c index 99cbdec..8eb07ec 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -6,9 +6,9 @@ #include "helper.h" #include "globals.h" #include "image.h" -#include "signal.h" +#include "../shared/signal.h" #include "log.h" -#include "../protocol.h" +#include "../shared/protocol.h" #include #include #include diff --git a/src/server/image.c b/src/server/image.c index fccc2a9..6f2cb6a 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -5,11 +5,11 @@ #include "uplink.h" #include "locks.h" #include "integrity.h" -#include "../protocol.h" +#include "../shared/protocol.h" #include "sockhelper.h" #include "altservers.h" #include "server.h" -#include "signal.h" +#include "../shared/signal.h" #include #include diff --git a/src/server/locks.c b/src/server/locks.c index 6c411e5..8c4fcfb 100644 --- a/src/server/locks.c +++ b/src/server/locks.c @@ -17,7 +17,7 @@ #include "globals.h" #include "log.h" #include "helper.h" -#include "signal.h" +#include "../shared/signal.h" #define MAXLOCKS 2000 #define MAXTHREADS 500 diff --git a/src/server/signal.c b/src/server/signal.c deleted file mode 100644 index a0697f8..0000000 --- a/src/server/signal.c +++ /dev/null @@ -1,52 +0,0 @@ -#include "signal.h" -#include -#include -#include -#include -#include - -int signal_new() -{ - return eventfd( 0, EFD_NONBLOCK ); -} - -int signal_newBlocking() -{ - return eventfd( 0, 0 ); -} - -int signal_call(int signalFd) -{ - if ( signalFd < 0 ) return 0; - static uint64_t one = 1; - return write( signalFd, &one, sizeof one ) == sizeof one; -} - -int signal_wait(int signalFd, int timeoutMs) -{ - struct pollfd ps = { - .fd = signalFd, - .events = POLLIN - }; - int ret = poll( &ps, 1, timeoutMs ); - if ( ret == 0 ) return SIGNAL_TIMEOUT; - if ( ret == -1 ) return SIGNAL_ERROR; - if ( ps.revents & ( POLLERR | POLLNVAL ) ) return SIGNAL_ERROR; - return signal_clear( signalFd ); -} - -int signal_clear(int signalFd) -{ - uint64_t ret; - if ( read( signalFd, &ret, sizeof ret ) != sizeof ret ) { - if ( errno == EAGAIN ) return 0; - return SIGNAL_ERROR; - } - return (int)ret; -} - -void signal_close(int signalFd) -{ - close( signalFd ); -} - diff --git a/src/server/signal.h b/src/server/signal.h deleted file mode 100644 index 0e2f85f..0000000 --- a/src/server/signal.h +++ /dev/null @@ -1,49 +0,0 @@ -#ifndef _SIGNAL_H_ -#define _SIGNAL_H_ - -#define SIGNAL_OK (0) -#define SIGNAL_TIMEOUT (-2) -#define SIGNAL_ERROR (-1) - -/** - * Create a new signal fd (eventfd), nonblocking. - * @return >= 0 on success, which is the fd; < 0 on error - */ -int signal_new(); - -/** - * Create a new signal fd (eventfd), blocking. - * @return >= 0 on success, which is the fd; < 0 on error - */ -int signal_newBlocking(); - -/** - * Trigger the given signal, so a wait or clear call will succeed. - * @return SIGNAL_OK on success, SIGNAL_ERROR on error - */ -int signal_call(int signalFd); - -/** - * Wait for given signal, with an optional timeout. - * If timeout == 0, just poll once. - * If timeout < 0, wait forever. - * @return > 0 telling how many times the signal was called, - * SIGNAL_TIMEOUT if the timeout was reached, - * SIGNAL_ERROR if some error occured - */ -int signal_wait(int signalFd, int timeoutMs); - -/** - * Clears any pending signals on this signal fd. - * @return number of signals that were pending, - * SIGNAL_ERROR if some error occured - */ -int signal_clear(int signalFd); - -/** - * Close the given signal. - */ -void signal_close(int signalFd); - -#endif - diff --git a/src/server/threadpool.c b/src/server/threadpool.c index ad146ea..41f1f0b 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -1,7 +1,7 @@ #include "globals.h" #include "helper.h" #include "threadpool.h" -#include "signal.h" +#include "../shared/signal.h" #include "locks.h" #include diff --git a/src/server/uplink.c b/src/server/uplink.c index aa72896..a205164 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -6,8 +6,8 @@ #include "helper.h" #include "altservers.h" #include "helper.h" -#include "../protocol.h" -#include "signal.h" +#include "../shared/protocol.h" +#include "../shared/signal.h" #include #include diff --git a/src/shared/protocol.h b/src/shared/protocol.h new file mode 100644 index 0000000..41e9af2 --- /dev/null +++ b/src/shared/protocol.h @@ -0,0 +1,138 @@ +#ifndef _PROTOCOL_H_ +#define _PROTOCOL_H_ + +#include +#include +#include +#include "../types.h" +#include "../serialize.h" + +#define FLAGS8_SERVER (1) + +#define REPLY_OK (0) +#define REPLY_ERRNO (-1) +#define REPLY_AGAIN (-2) +#define REPLY_INTR (-3) +#define REPLY_CLOSED (-4) +#define REPLY_INCOMPLETE (-5) +#define REPLY_WRONGMAGIC (-6) + +static inline int dnbd3_read_reply(int sock, dnbd3_reply_t *reply, bool wait) +{ + int ret = recv( sock, reply, sizeof(*reply), (wait ? MSG_WAITALL : MSG_DONTWAIT) | MSG_NOSIGNAL ); + if ( ret == 0 ) return REPLY_CLOSED; + if ( ret < 0 ) { + if ( errno == EAGAIN || errno == EWOULDBLOCK ) return REPLY_AGAIN; + if ( errno == EINTR ) return REPLY_INTR; + return REPLY_ERRNO; + } + if ( !wait && ret != sizeof(*reply) ) ret += recv( sock, ((char*)reply) + ret, sizeof(*reply) - ret, MSG_WAITALL | MSG_NOSIGNAL ); + if ( ret != sizeof(*reply) ) return REPLY_INCOMPLETE; + fixup_reply( *reply ); + if ( reply->magic != dnbd3_packet_magic ) return REPLY_WRONGMAGIC; + return REPLY_OK; +} + +static inline bool dnbd3_get_reply(int sock, dnbd3_reply_t *reply) +{ + return dnbd3_read_reply( sock, reply, true ) == REPLY_OK; +} + +static inline bool dnbd3_select_image(int sock, const char *lower_name, uint16_t rid, uint8_t flags8) +{ + serialized_buffer_t serialized; + dnbd3_request_t request; + struct iovec iov[2]; + serializer_reset_write( &serialized ); + serializer_put_uint16( &serialized, PROTOCOL_VERSION ); + serializer_put_string( &serialized, lower_name ); + serializer_put_uint16( &serialized, rid ); + serializer_put_uint8( &serialized, flags8 ); + const ssize_t len = serializer_get_written_length( &serialized ); + request.magic = dnbd3_packet_magic; + request.cmd = CMD_SELECT_IMAGE; + request.size = len; +#ifdef _DEBUG + request.handle = 0; + request.offset = 0; +#endif + fixup_request( request ); + iov[0].iov_base = &request; + iov[0].iov_len = sizeof(request); + iov[1].iov_base = &serialized; + iov[1].iov_len = len; + return writev( sock, iov, 2 ) == len + (ssize_t)sizeof(request); +} + +static inline bool dnbd3_get_block(int sock, uint64_t offset, uint32_t size, uint64_t handle) +{ + dnbd3_request_t request; + request.magic = dnbd3_packet_magic; + request.handle = handle; + request.cmd = CMD_GET_BLOCK; + request.offset = offset; + request.size = size; + fixup_request( request ); + return send( sock, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request); +} + +static inline bool dnbd3_get_crc32(int sock, uint32_t *master, void *buffer, size_t *bufferLen) +{ + dnbd3_request_t request; + dnbd3_reply_t reply; + request.magic = dnbd3_packet_magic; + request.handle = 0; + request.cmd = CMD_GET_CRC32; + request.offset = 0; + request.size = 0; + fixup_request( request ); + if ( send( sock, &request, sizeof(request), 0 ) != sizeof(request) ) return false; + if ( !dnbd3_get_reply( sock, &reply ) ) return false; + if ( reply.size == 0 ) { + *bufferLen = 0; + return true; + } + if ( reply.size < 4 ) return false; + reply.size -= 4; + if ( reply.cmd != CMD_GET_CRC32 || reply.size > *bufferLen ) return false; + *bufferLen = reply.size; + if ( recv( sock, master, sizeof(uint32_t), MSG_WAITALL | MSG_NOSIGNAL ) != sizeof(uint32_t) ) return false; + uint32_t done = 0; + while ( done < reply.size ) { + const ssize_t ret = recv( sock, (char*)buffer + done, reply.size - done, 0 ); + if ( ret <= 0 ) return false; + done += ret; + } + return true; +} + +/** + * Pass a full serialized_buffer_t and a socket fd. Parsed data will be returned in further arguments. + * Note that all strings will point into the passed buffer, so there's no need to free them. + * This function will also read the header for you, as this message can only occur during connection, + * where no unrequested messages could arrive inbetween. + */ +static inline bool dnbd3_select_image_reply(serialized_buffer_t *buffer, int sock, uint16_t *protocol_version, char **name, uint16_t *rid, + uint64_t *imageSize) +{ + dnbd3_reply_t reply; + if ( !dnbd3_get_reply( sock, &reply ) ) { + return false; + } + if ( reply.cmd != CMD_SELECT_IMAGE || reply.size < 3 || reply.size > MAX_PAYLOAD ) { + return false; + } +// receive reply payload + if ( recv( sock, buffer, reply.size, MSG_WAITALL | MSG_NOSIGNAL ) != reply.size ) { + return false; + } +// handle/check reply payload + serializer_reset_read( buffer, reply.size ); + *protocol_version = serializer_get_uint16( buffer ); + *name = serializer_get_string( buffer ); + *rid = serializer_get_uint16( buffer ); + *imageSize = serializer_get_uint64( buffer ); + return true; +} + +#endif diff --git a/src/shared/signal.c b/src/shared/signal.c new file mode 100644 index 0000000..a0697f8 --- /dev/null +++ b/src/shared/signal.c @@ -0,0 +1,52 @@ +#include "signal.h" +#include +#include +#include +#include +#include + +int signal_new() +{ + return eventfd( 0, EFD_NONBLOCK ); +} + +int signal_newBlocking() +{ + return eventfd( 0, 0 ); +} + +int signal_call(int signalFd) +{ + if ( signalFd < 0 ) return 0; + static uint64_t one = 1; + return write( signalFd, &one, sizeof one ) == sizeof one; +} + +int signal_wait(int signalFd, int timeoutMs) +{ + struct pollfd ps = { + .fd = signalFd, + .events = POLLIN + }; + int ret = poll( &ps, 1, timeoutMs ); + if ( ret == 0 ) return SIGNAL_TIMEOUT; + if ( ret == -1 ) return SIGNAL_ERROR; + if ( ps.revents & ( POLLERR | POLLNVAL ) ) return SIGNAL_ERROR; + return signal_clear( signalFd ); +} + +int signal_clear(int signalFd) +{ + uint64_t ret; + if ( read( signalFd, &ret, sizeof ret ) != sizeof ret ) { + if ( errno == EAGAIN ) return 0; + return SIGNAL_ERROR; + } + return (int)ret; +} + +void signal_close(int signalFd) +{ + close( signalFd ); +} + diff --git a/src/shared/signal.h b/src/shared/signal.h new file mode 100644 index 0000000..0e2f85f --- /dev/null +++ b/src/shared/signal.h @@ -0,0 +1,49 @@ +#ifndef _SIGNAL_H_ +#define _SIGNAL_H_ + +#define SIGNAL_OK (0) +#define SIGNAL_TIMEOUT (-2) +#define SIGNAL_ERROR (-1) + +/** + * Create a new signal fd (eventfd), nonblocking. + * @return >= 0 on success, which is the fd; < 0 on error + */ +int signal_new(); + +/** + * Create a new signal fd (eventfd), blocking. + * @return >= 0 on success, which is the fd; < 0 on error + */ +int signal_newBlocking(); + +/** + * Trigger the given signal, so a wait or clear call will succeed. + * @return SIGNAL_OK on success, SIGNAL_ERROR on error + */ +int signal_call(int signalFd); + +/** + * Wait for given signal, with an optional timeout. + * If timeout == 0, just poll once. + * If timeout < 0, wait forever. + * @return > 0 telling how many times the signal was called, + * SIGNAL_TIMEOUT if the timeout was reached, + * SIGNAL_ERROR if some error occured + */ +int signal_wait(int signalFd, int timeoutMs); + +/** + * Clears any pending signals on this signal fd. + * @return number of signals that were pending, + * SIGNAL_ERROR if some error occured + */ +int signal_clear(int signalFd); + +/** + * Close the given signal. + */ +void signal_close(int signalFd); + +#endif + -- cgit v1.2.3-55-g7522