From 7b51c287a60d2f202fb131eeed9d1bf19b65a7a3 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 24 Nov 2015 12:30:46 +0100 Subject: [FUSE] Mid-refactoring, does not compile --- src/config.h | 2 +- src/fuse/connection.c | 245 ++++++++++++++++++++++++++++++++++++----- src/fuse/helper.c | 51 --------- src/fuse/helper.h | 8 ++ src/fuse/log.h | 43 ++++++++ src/fuse/main.c | 2 +- src/server/altservers.c | 2 +- src/server/image.c | 2 +- src/server/net.c | 2 +- src/server/rpc.c | 2 +- src/server/server.c | 2 +- src/server/sockhelper.c | 214 ------------------------------------ src/server/sockhelper.h | 86 --------------- src/server/uplink.c | 2 +- src/shared/signal.h | 1 - src/shared/sockhelper.c | 287 ++++++++++++++++++++++++++++++++++++++++++++++++ src/shared/sockhelper.h | 93 ++++++++++++++++ 17 files changed, 659 insertions(+), 385 deletions(-) create mode 100644 src/fuse/log.h delete mode 100644 src/server/sockhelper.c delete mode 100644 src/server/sockhelper.h create mode 100644 src/shared/sockhelper.c create mode 100644 src/shared/sockhelper.h diff --git a/src/config.h b/src/config.h index 271e4bc..3ec0f8b 100644 --- a/src/config.h +++ b/src/config.h @@ -25,7 +25,7 @@ // ########### SERVER ########### // +++++ Performance related -#define SERVER_MAX_CLIENTS 2000 +#define SERVER_MAX_CLIENTS 400 #define SERVER_MAX_IMAGES 5000 #define SERVER_MAX_ALTS 250 #define SERVER_MAX_UPLINK_QUEUE 1500 diff --git a/src/fuse/connection.c b/src/fuse/connection.c index 039c532..cdfc1df 100644 --- a/src/fuse/connection.c +++ b/src/fuse/connection.c @@ -3,21 +3,28 @@ #include "../config.h" #include "../shared/protocol.h" #include "../shared/signal.h" +#include "../shared/sockhelper.h" #include #include #include #include +#include +#include /* Constants */ static const size_t SHORTBUF = 100; #define MAX_ALTS (8) +#define MAX_HOSTS_PER_ADDRESS (2) +static const int MAX_CONSECUTIVE_FAILURES = 16; +#define RTT_COUNT (4) /* Module variables */ // Init guard static bool initDone = false; static pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER; +static bool keepRunning = true; // List of pending requests static struct { @@ -31,14 +38,23 @@ static struct { char *name; uint16_t rid; uint64_t size; +} image; + +static struct { int sockFd; pthread_mutex_t sendMutex; pthread_t receiveThread; -} image; + int panicSignalFd; + bool panicMode; +} connection; // Known alt servers static struct _alt_server { - + dnbd3_host_t host; + int consecutiveFails; + int rtt; + int rtts[RTT_COUNT]; + int rttIndex; } altservers[MAX_ALTS]; typedef struct _alt_server alt_server_t; @@ -47,22 +63,28 @@ typedef struct _alt_server alt_server_t; static void* connection_receiveThreadMain(void *sock); +static void probeAltServers(); static bool throwDataAway(int sockFd, uint32_t amount); static void enqueueRequest(dnbd3_async_t *request); static dnbd3_async_t* removeRequest(dnbd3_async_t *request); +static uint64_t nowMilli(); +static uint64_t nowMicro(); bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid) { int sock = -1; char host[SHORTBUF]; - const char *current, *end; serialized_buffer_t buffer; uint16_t remoteVersion, remoteRid; char *remoteName; uint64_t remoteSize; pthread_mutex_lock( &mutexInit ); - if ( !initDone ) { + if ( !initDone && keepRunning ) { + dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS]; + const char *current, *end; + int altIndex = 0; + memset( altservers, 0, sizeof altservers ); current = hosts; do { // Get next host from string @@ -71,9 +93,23 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1); if ( len > SHORTBUF ) len = SHORTBUF; snprintf( host, len, "%s", current ); + int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS ); + for ( int i = 0; i < newHosts; ++i ) { + if ( altIndex >= MAX_ALTS ) + break; + altservers[altIndex].host = tempHosts[i]; + altIndex += 1; + } current = end + 1; + } while ( end != NULL && altIndex < MAX_ALTS ); + printf( "Got %d servers from init call\n", altIndex ); + // Connect + for ( int i = 0; i < altIndex; ++i ) { + if ( altservers[i].host.type == 0 ) + continue; // Try to connect - sock = connect_to_server( host, PORT ); // TODO: Parse port from host + sock = sock_connect( &altservers[i].host, 500, SOCKET_KEEPALIVE_TIMEOUT * 2000 ); // TODO timeout... + printf( "Got socket %d\n", sock ); if ( sock != -1 && dnbd3_select_image( sock, lowerImage, rid, 0 ) && dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) && ( rid == 0 || rid == remoteRid ) ) { @@ -87,16 +123,18 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r close( sock ); sock = -1; } - // TODO: Add to alt list - } while ( end != NULL ); + } if ( sock != -1 ) { - if ( pthread_mutex_init( &image.sendMutex, NULL ) != 0 + printf( "Initializing stuff\n" ); + if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0 || pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0 - || pthread_create( &image.receiveThread, NULL, &connection_receiveThreadMain, (void*)(size_t)sock ) != 0 ) { + || pthread_create( &connection.receiveThread, NULL, &connection_receiveThreadMain, (void*)(size_t)sock ) != 0 ) { close( sock ); sock = -1; } else { - image.sockFd = sock; + connection.sockFd = sock; + connection.panicMode = false; + connection.panicSignalFd = signal_new(); requests.head = NULL; requests.tail = NULL; } @@ -111,31 +149,42 @@ bool connection_read(dnbd3_async_t *request) { if (!initDone) return false; enqueueRequest( request ); - pthread_mutex_lock( &image.sendMutex ); - if ( image.sockFd != -1 ) { - while ( !dnbd3_get_block( image.sockFd, request->offset, request->length, (uint64_t)request ) ) { - shutdown( image.sockFd, SHUT_RDWR ); - image.sockFd = -1; + pthread_mutex_lock( &connection.sendMutex ); + if ( connection.sockFd != -1 ) { + while ( !dnbd3_get_block( connection.sockFd, request->offset, request->length, (uint64_t)request ) ) { + shutdown( connection.sockFd, SHUT_RDWR ); + connection.sockFd = -1; // TODO reconnect! - pthread_mutex_unlock( &image.sendMutex ); + pthread_mutex_unlock( &connection.sendMutex ); return false; } } - pthread_mutex_unlock( &image.sendMutex ); + pthread_mutex_unlock( &connection.sendMutex ); return true; } void connection_close() { - // + pthread_mutex_lock( &mutexInit ); + keepRunning = false; + if ( !initDone ) { + pthread_mutex_unlock( &mutexInit ); + return; + } + pthread_mutex_unlock( &mutexInit ); + pthread_mutex_lock( &connection.sendMutex ); + if ( connection.sockFd != -1 ) { + shutdown( connection.sockFd, SHUT_RDWR ); + } + pthread_mutex_unlock( &connection.sendMutex ); } static void* connection_receiveThreadMain(void *sockPtr) { int sockFd = (int)(size_t)sockPtr; dnbd3_reply_t reply; - for ( ;; ) { - if ( !dnbd3_get_reply( image.sockFd, &reply ) ) + while ( keepRunning ) { + if ( !dnbd3_get_reply( connection.sockFd, &reply ) ) goto fail; // TODO: Ignoring anything but block replies for now; handle the others if ( reply.cmd != CMD_GET_BLOCK ) { @@ -167,26 +216,152 @@ static void* connection_receiveThreadMain(void *sockPtr) } fail:; // Make sure noone is trying to use the socket for sending by locking, - pthread_mutex_lock( &image.sendMutex ); + pthread_mutex_lock( &connection.sendMutex ); // then just set the fd to -1, but only if it's the same fd as ours, // as someone could have established a new connection already - if ( image.sockFd == sockFd ) { - image.sockFd = -1; + if ( connection.sockFd == sockFd ) { + connection.sockFd = -1; } - pthread_mutex_unlock( &image.sendMutex ); + pthread_mutex_unlock( &connection.sendMutex ); // As we're the only reader, it's safe to close the socket now close( sockFd ); return NULL; } +static void* connection_backgroundThread(void *something UNUSED) +{ + uint64_t nextKeepalive = 0; + uint64_t nextRttCheck = 0; + const uint64_t startupTime = nowMilli(); + + while ( keepRunning ) { + const uint64_t now = nowMilli(); + if ( now < nextKeepalive && now < nextRttCheck ) { + int waitTime = (int)( MIN( nextKeepalive, nextRttCheck ) - now ); + int waitRes = signal_wait( connection.panicSignalFd, waitTime ); + if ( waitRes == SIGNAL_ERROR ) { + printf( "Error waiting on signal in background thread! Errno = %d\n", errno ); + } + } + // Woken up, see what we have to do + // Check alt servers + if ( connection.panicMode || now < nextRttCheck ) { + probeAltServers(); + if ( connection.panicMode || startupTime + ( STARTUP_MODE_DURATION * 1000ull ) > now ) { + nextRttCheck = now + TIMER_INTERVAL_PROBE_STARTUP * 1000ull; + } else { + nextRttCheck = now + TIMER_INTERVAL_PROBE_NORMAL * 1000ull; + } + } + // Send keepalive packet + if ( now < nextKeepalive ) { + pthread_mutex_lock( &connection.sendMutex ); + if ( connection.sockFd != -1 ) { + printf( "Sending keepalive...\n" ); + dnbd3_request_t request; + request.magic = dnbd3_packet_magic; + request.cmd = CMD_KEEPALIVE; + request.size = 0; + fixup_request( request ); + ssize_t ret = sock_sendAll( connection.sockFd, &request, sizeof request, 2 ); + if ( (size_t)ret != sizeof request ) { + shutdown( connection.sockFd, SHUT_RDWR ); + connection.sockFd = -1; + connection.panicMode = true; + } + } + pthread_mutex_unlock( &connection.sendMutex ); + nextKeepalive = now + TIMER_INTERVAL_KEEPALIVE_PACKET * 1000ull; + } + } + return NULL; +} + // Private quick helpers +static void probeAltServers() +{ + serialized_buffer_t buffer; + dnbd3_request_t request; + dnbd3_reply_t reply; + int bestIndex = -1; + int bestSock = -1; + uint16_t remoteRid, remoteProto; + uint64_t remoteSize; + char *remoteName; + + for ( int altIndex = 0; altIndex < MAX_ALTS; ++altIndex ) { + alt_server_t * const srv = &altservers[altIndex]; + if ( srv->host.type == 0 ) + continue; + if ( !connection.panicMode && srv->consecutiveFails > MAX_CONSECUTIVE_FAILURES + && srv->consecutiveFails % ( srv->consecutiveFails / 8 ) != 0 ) { + continue; + } + if (srv->rttIndex >= RTT_COUNT) { + srv->rttIndex = 0; + } else { + srv->rttIndex += 1; + } + // Probe + const uint64_t start = nowMicro(); + int sock = sock_connect( &srv->host, connection.panicMode ? 1000 : 333, 1000 ); + if ( sock == -1 ) { + printf( "Could not crrate socket for probing. errno = %d\n", errno ); + continue; + } + if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) { + goto fail; + } + if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize )) { + goto fail; + } + if ( remoteProto < MIN_SUPPORTED_SERVER || remoteProto > PROTOCOL_VERSION ) { + printf( "Unsupported remote version\n" ); + goto fail; + } + if ( remoteRid != image.rid || strcmp( remoteName, image.name ) != 0 ) { + printf( "Remote rid or name mismatch\n" ); + goto fail; + } + if ( !dnbd3_get_block( sock, 0, RTT_BLOCK_SIZE, 0 ) ) { + goto fail; + } + if ( !dnbd3_get_reply( sock, &reply ) || reply.size != RTT_BLOCK_SIZE + || !throwDataAway( sock, RTT_BLOCK_SIZE ) ) { + goto fail; + } + // Yay, success + const uint64_t end = nowMicro(); + srv->consecutiveFails = 0; + srv->rtts[srv->rttIndex] = (int)(end - start); + srv->rtt = 0; + for ( int i = 0; i < RTT_COUNT; ++i ) { + srv->rtt += srv->rtts[i]; + } + srv->rtt /= RTT_COUNT; + if ( bestIndex == -1 || altservers[bestIndex].rtt > srv->rtt ) { + bestIndex = altIndex; + close( bestSock ); + bestSock = sock; + } else { + close( sock ); + } + continue; // XXX: Remember current server, compare to it, update value on change, +fail:; + close( sock ); + srv->rtts[srv->rttIndex] = RTT_UNREACHABLE; + srv->consecutiveFails += 1; + } +} + static bool throwDataAway(int sockFd, uint32_t amount) { uint32_t done = 0; char tempBuffer[SHORTBUF]; while ( done < amount ) { - if ( recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ), 0 ) <= 0 ) + const ssize_t ret = recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ), MSG_NOSIGNAL ); + if ( ret == 0 || ( ret < 0 && ret != EINTR ) ) return false; } return true; @@ -227,3 +402,23 @@ static dnbd3_async_t* removeRequest(dnbd3_async_t *request) pthread_spin_unlock( &requests.lock ); return iterator; } + +static uint64_t nowMilli() +{ + struct timespec ts; + if ( clock_gettime( CLOCK_MONOTONIC_RAW, &ts ) != 0 ) { + printf( "clock_gettime() failed. Errno: %d\n", errno ); + return 0; + } + return ( ts.tv_sec * 1000ull ) + ( ts.tv_nsec / 1000000ull ); +} + +static uint64_t nowMicro() +{ + struct timespec ts; + if ( clock_gettime( CLOCK_MONOTONIC_RAW, &ts ) != 0 ) { + printf( "clock_gettime() failed. Errno: %d\n", errno ); + return 0; + } + return ( ts.tv_sec * 1000000ull ) + ( ts.tv_nsec / 1000ull ); +} diff --git a/src/fuse/helper.c b/src/fuse/helper.c index 65644f8..6e46352 100644 --- a/src/fuse/helper.c +++ b/src/fuse/helper.c @@ -47,54 +47,3 @@ bool sock_printable( struct sockaddr *addr, socklen_t addrLen, char *output, int } return ret == 0; } - -// TODO: Pretty much same as in server/* -int connect_to_server( char *server_address, int port ) -{ - const int on = 1; - int sock = -1; - struct addrinfo hints, *res, *ptr; - char portStr[6]; - - // Set hints for local addresses. - memset( &hints, 0, sizeof( hints ) ); - hints.ai_flags = AI_PASSIVE; - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - snprintf( portStr, sizeof portStr, "%d", port ); - if ( getaddrinfo( server_address, portStr, &hints, &res ) != 0 || res == NULL ) { - return false; - } - // Attempt to bind to all of the addresses as long as there's room in the poll list - for ( ptr = res; ptr != NULL; ptr = ptr->ai_next ) { - char bla[100]; - if ( !sock_printable( ( struct sockaddr * ) ptr->ai_addr, ptr->ai_addrlen, bla, 100 ) ) { - snprintf( bla, 100, "[invalid]" ); - } - printf( "Trying to connect to %s ", bla ); - sock = socket( ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol ); - if ( sock < 0 ) { - printf( "...cannot create socket, errno=%d\n", errno ); - sock = -1; - continue; - } - setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof( on ) ); - if ( ptr->ai_family == PF_INET6 ) { - setsockopt( sock, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof( on ) ); - } - if ( connect( sock, ptr->ai_addr, ptr->ai_addrlen ) < 0 ) { - // if ( bind( sock, ptr->ai_addr, ptr->ai_addrlen ) == -1 ) { - printf( "...socket Error, errno=%d\n", errno ); - close( sock ); - sock = -1; - continue; - } else { - printf( "... connecting successful!\n" ); - break; - } - } - - freeaddrinfo( res ); - return sock; -} - diff --git a/src/fuse/helper.h b/src/fuse/helper.h index bbba44c..35cdc8a 100644 --- a/src/fuse/helper.h +++ b/src/fuse/helper.h @@ -4,6 +4,9 @@ #include #include #include +#include + +#include "../types.h" typedef struct log_info { @@ -21,4 +24,9 @@ bool sock_printable(struct sockaddr *addr, socklen_t addrLen, char *output, int int connect_to_server(char *server_adress, int port); +static inline bool isSameAddressPort(const dnbd3_host_t * const a, const dnbd3_host_t * const b) +{ + return (a->type == b->type) && (a->port == b->port) && (0 == memcmp( a->addr, b->addr, (a->type == AF_INET ? 4 : 16) )); +} + #endif diff --git a/src/fuse/log.h b/src/fuse/log.h new file mode 100644 index 0000000..e429861 --- /dev/null +++ b/src/fuse/log.h @@ -0,0 +1,43 @@ +#ifndef LOG_H_ +#define LOG_H_ + +#include +#include +#include + +typedef unsigned int logmask_t; +#define LOG_ERROR ((logmask_t)1) // Fatal error, server will terminate +#define LOG_WARNING ((logmask_t)2) // Major issue, something is broken but keep running +#define LOG_MINOR ((logmask_t)4) // Minor issue, more of a hickup than serious problem +#define LOG_INFO ((logmask_t)8) // Informational message +#define LOG_DEBUG1 ((logmask_t)16) // Debug information, use this for non-spammy stuff +#define LOG_DEBUG2 ((logmask_t)32) // Use this for debug messages that will show up a lot + +//void log_setFileMask(logmask_t mask); + +//void log_setConsoleMask(logmask_t mask); + +/** + * Open or reopen the log file. If path is NULL and the + * function was called with a path before, the same path + * will be used again. + */ +//bool log_openLogFile(const char *path); + +/** + * Add a line to the log + */ +void logadd(const logmask_t mask, const char *text, ...) +{ + va_list args; + va_start( args, text ); + vprintf( text, args ); + va_end( args ); +} + +/** + * Return last size bytes of log. + */ +//bool log_fetch(char *buffer, int size); + +#endif /* LOG_H_ */ diff --git a/src/fuse/main.c b/src/fuse/main.c index d6a4d98..7889023 100644 --- a/src/fuse/main.c +++ b/src/fuse/main.c @@ -58,7 +58,7 @@ static void dnbd3_connect() if ( sock != -1 ) { close( sock ); } - sock = connect_to_server( server_address, portno ); + sock = -1; // connect_to_server( server_address, portno ); if ( sock == -1 ) { debugf( "[ERROR] Connection Error!\n" ); diff --git a/src/server/altservers.c b/src/server/altservers.c index 7a2aaf7..bbe33ba 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -1,7 +1,7 @@ #include "altservers.h" #include "uplink.h" #include "locks.h" -#include "sockhelper.h" +#include "../shared/sockhelper.h" #include "log.h" #include "helper.h" #include "globals.h" diff --git a/src/server/image.c b/src/server/image.c index 6f2cb6a..8d2a77f 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -6,7 +6,7 @@ #include "locks.h" #include "integrity.h" #include "../shared/protocol.h" -#include "sockhelper.h" +#include "../shared/sockhelper.h" #include "altservers.h" #include "server.h" #include "../shared/signal.h" diff --git a/src/server/net.c b/src/server/net.c index a7bdf86..4aab050 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -31,7 +31,7 @@ #include #include -#include "sockhelper.h" +#include "../shared/sockhelper.h" #include "helper.h" #include "server.h" #include "image.h" diff --git a/src/server/rpc.c b/src/server/rpc.c index b709b94..6152d99 100644 --- a/src/server/rpc.c +++ b/src/server/rpc.c @@ -4,7 +4,7 @@ #include "uplink.h" #include "log.h" #include "locks.h" -#include "sockhelper.h" +#include "../shared/sockhelper.h" #include "helper.h" #include "image.h" diff --git a/src/server/server.c b/src/server/server.c index 384ffa6..f47f208 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -33,7 +33,7 @@ #include "../version.h" #include "locks.h" -#include "sockhelper.h" +#include "../shared/sockhelper.h" #include "server.h" #include "image.h" #include "uplink.h" diff --git a/src/server/sockhelper.c b/src/server/sockhelper.c deleted file mode 100644 index fb09ec2..0000000 --- a/src/server/sockhelper.c +++ /dev/null @@ -1,214 +0,0 @@ -#include "sockhelper.h" -#include "log.h" -#include -#include -#include // inet_ntop -#include -#include -#include -#include -#include -#include -#include -#include - -#define MAXLISTEN 20 - -struct _poll_list { - int count; - struct pollfd entry[MAXLISTEN]; -}; - -int sock_connect(const dnbd3_host_t * const addr, const int connect_ms, const int rw_ms) -{ - // TODO: Move out of here, this unit should contain general socket functions - // TODO: Rework the dnbd3_host_t to not use AF_* as these could theoretically change - // TODO: Abstract away from sockaddr_in* like the rest of the functions here do, - // so WITH_IPV6 can finally be removed as everything is transparent. - struct sockaddr_storage ss; - int proto, addrlen; - memset( &ss, 0, sizeof ss ); - if ( addr->type == AF_INET ) { - // Set host (IPv4) - struct sockaddr_in *addr4 = (struct sockaddr_in*)&ss; - addr4->sin_family = AF_INET; - memcpy( &addr4->sin_addr, addr->addr, 4 ); - addr4->sin_port = addr->port; - proto = PF_INET; - addrlen = sizeof *addr4; - } -#ifdef WITH_IPV6 - else if ( addr->type == AF_INET6 ) { - // Set host (IPv6) - struct sockaddr_in6 *addr6 = (struct sockaddr_in6*)&ss; - addr6->sin6_family = AF_INET6; - memcpy( &addr6->sin6_addr, addr->addr, 16 ); - addr6->sin6_port = addr->port; - proto = PF_INET6; - addrlen = sizeof *addr6; - } -#endif - else { - logadd( LOG_DEBUG1, "Unsupported address type: %d\n", (int)addr->type ); - return -1; - } - int client_sock = socket( proto, SOCK_STREAM, IPPROTO_TCP ); - if ( client_sock == -1 ) return -1; - // Apply connect timeout - sock_setTimeout( client_sock, connect_ms ); - if ( connect( client_sock, (struct sockaddr *)&ss, addrlen ) == -1 ) { - close( client_sock ); - return -1; - } - // Apply read/write timeout - sock_setTimeout( client_sock, rw_ms ); - return client_sock; -} - -void sock_setTimeout(const int sockfd, const int milliseconds) -{ - struct timeval tv; - tv.tv_sec = milliseconds / 1000; - tv.tv_usec = (milliseconds * 1000) % 1000000; - setsockopt( sockfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv) ); - setsockopt( sockfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv) ); -} - -poll_list_t* sock_newPollList() -{ - poll_list_t *list = (poll_list_t*)malloc( sizeof( poll_list_t ) ); - list->count = 0; - return list; -} - -void sock_destroyPollList(poll_list_t *list) -{ - for ( int i = 0; i < list->count; ++i ) { - if ( list->entry[i].fd >= 0 ) close( list->entry[i].fd ); - } - free( list ); -} - -bool sock_printable(struct sockaddr *addr, socklen_t addrLen, char *output, int len) -{ - char host[100], port[10]; - int ret = getnameinfo( addr, addrLen, host, 100, port, 10, NI_NUMERICHOST | NI_NUMERICSERV ); - if ( ret == 0 ) snprintf( output, len, "[%s]:%s", host, port ); - return ret == 0; -} - -bool sock_listen(poll_list_t* list, char* bind_addr, uint16_t port) -{ - if ( list->count >= MAXLISTEN ) return false; - struct addrinfo hints, *res, *ptr; - char portStr[6]; - const int on = 1; - int openCount = 0; - // Set hints for local addresses. - memset( &hints, 0, sizeof(hints) ); - hints.ai_flags = AI_PASSIVE; - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - snprintf( portStr, sizeof portStr, "%d", (int)port ); - if ( getaddrinfo( bind_addr, portStr, &hints, &res ) != 0 || res == NULL ) return false; - // Attempt to bind to all of the addresses as long as there's room in the poll list - for( ptr = res; ptr != NULL; ptr = ptr->ai_next ) { - char bla[100]; - if ( !sock_printable( (struct sockaddr*)ptr->ai_addr, ptr->ai_addrlen, bla, 100 ) ) snprintf( bla, 100, "[invalid]" ); - logadd( LOG_DEBUG1, "Binding to %s...", bla ); - int sock = socket( ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol ); - if ( sock < 0 ) { - logadd( LOG_WARNING, "(Bind to %s): cannot socket(), errno=%d", bla, errno ); - continue; - } - setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on) ); - if ( ptr->ai_family == PF_INET6 ) setsockopt( sock, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on) ); - if ( bind( sock, ptr->ai_addr, ptr->ai_addrlen ) == -1 ) { - logadd( LOG_WARNING, "(Bind to %s): cannot bind(), errno=%d", bla, errno ); - close( sock ); - continue; - } - if ( listen( sock, 20 ) == -1 ) { - logadd( LOG_WARNING, "(Bind to %s): cannot listen(), errno=%d", errno ); - close( sock ); - continue; - } - list->entry[list->count].fd = sock; - list->entry[list->count].events = POLLIN | POLLRDHUP; - list->count++; - openCount++; - if ( list->count >= MAXLISTEN ) break; - } - freeaddrinfo( res ); - return openCount > 0; -} - -int sock_listenAny(poll_list_t* list, uint16_t port) -{ - return sock_listen( list, NULL, port ); -} - -int sock_accept(poll_list_t *list, struct sockaddr_storage *addr, socklen_t *length_ptr) -{ - int ret = poll( list->entry, list->count, -1 ); - if ( ret < 0 ) { - return -1; - } - for ( int i = list->count - 1; i >= 0; --i ) { - if ( list->entry[i].revents == 0 ) continue; - if ( list->entry[i].revents == POLLIN ) return accept( list->entry[i].fd, (struct sockaddr *)addr, length_ptr ); - if ( list->entry[i].revents & ( POLLNVAL | POLLHUP | POLLERR | POLLRDHUP ) ) { - logadd( LOG_DEBUG1, "poll fd revents=%d for index=%d and fd=%d", (int)list->entry[i].revents, i, list->entry[i].fd ); - if ( ( list->entry[i].revents & POLLNVAL ) == 0 ) close( list->entry[i].fd ); - if ( i != list->count ) list->entry[i] = list->entry[list->count]; - list->count--; - } - } - return -1; -} - -void sock_set_nonblock(int sock) -{ - int flags = fcntl( sock, F_GETFL, 0 ); - if ( flags == -1 ) flags = 0; - fcntl( sock, F_SETFL, flags | O_NONBLOCK ); -} - -void sock_set_block(int sock) -{ - int flags = fcntl( sock, F_GETFL, 0 ); - if ( flags == -1 ) flags = 0; - fcntl( sock, F_SETFL, flags & ~(int)O_NONBLOCK ); -} - -bool sock_append(poll_list_t *list, const int sock, bool wantRead, bool wantWrite) -{ - if ( sock == -1 || list->count >= MAXLISTEN ) return false; - list->entry[list->count++].fd = sock; - list->entry[list->count++].events = ( wantRead ? POLLIN : 0 ) | ( wantWrite ? POLLOUT : 0 ) | POLLRDHUP; - list->count++; - return true; -} - -ssize_t sock_sendAll(int sock, void *buffer, size_t len, int maxtries) -{ - size_t done = 0; - ssize_t ret = 0; - while ( done < len ) { - if ( maxtries >= 0 && --maxtries == -1 ) break; - ret = write( sock, (char*)buffer + done, len - done ); - if ( ret < 0 ) { - if ( errno == EINTR ) continue; - if ( errno == EAGAIN || errno == EWOULDBLOCK ) { - usleep( 1000 ); - continue; - } - break; - } - if ( ret == 0 ) break; - done += ret; - } - if ( done == 0 ) return ret; - return done; -} - diff --git a/src/server/sockhelper.h b/src/server/sockhelper.h deleted file mode 100644 index 3f4d485..0000000 --- a/src/server/sockhelper.h +++ /dev/null @@ -1,86 +0,0 @@ -#ifndef SOCKHELPER_H_ -#define SOCKHELPER_H_ - -/* - * Helper functions for dealing with sockets. These functions should - * abstract from the IP version by using getaddrinfo() and thelike. - */ - -#include -#include "../types.h" -#include -#include - -typedef struct _poll_list poll_list_t; - -/** - * Connect to given dnbd3_host_t. - * @param addr - address of host to connect to - * @param connect_ms - timeout in milliseconds after which the connection attempt fails - * @param rw_ms - read/write timeout in milliseconds to apply on successful connect - * @return socket file descriptor, or -1 on error - */ -int sock_connect(const dnbd3_host_t * const addr, const int connect_ms, const int rw_ms); - -void sock_setTimeout(const int sockfd, const int milliseconds); - -/** - * Create new poll list. - */ -poll_list_t* sock_newPollList(); - -/** - * Delete a poll list, closing all sockets first if necessary. - */ -void sock_destroyPollList(poll_list_t *list); - -/** - * Listen on all interfaces/available IP addresses, using the given protocol. - * IPv4 and IPv6 are supported. - * @param protocol_family PF_INET or PF_INET6 - * @param port port to listen on - * @return the socket descriptor if successful, -1 otherwise. - */ -int sock_listenAny(poll_list_t* list, uint16_t port); - -/** - * Listen on a specific address and port. - * @param addr pointer to a properly filled sockaddr_in or sockaddr_in6 - * @param addrlen length of the passed struct - */ -bool sock_listen(poll_list_t* list, char* bind_addr, uint16_t port); - -/** - * This is a multi-socket version of accept. Pass in an array of listening sockets. - * If any of the sockets has an incoming connection, accept it and return the new socket's fd. - * On error, return -1, just like accept(). - * @param sockets array of listening socket fds - * @param socket_count number of sockets in that array - * @return fd of new client socket, -1 on error - */ -int sock_accept(poll_list_t *list, struct sockaddr_storage *addr, socklen_t *length_ptr); - -void sock_set_nonblock(int sock); - -void sock_set_block(int sock); - -/** - * Add given socket to array. Take an existing empty slot ( == -1) if available, - * append to end otherwise. Updates socket count variable passed by reference. - * - * @param poll_list_t list the poll list to add the socket to - * @param sock socket fd to add - * @param wantRead whether to set the EPOLLIN flag - * @param wantWrite whether to set the EPOLLOUT flag - * @return true on success, false iff the array is already full or socket is < 0 - */ -bool sock_append(poll_list_t *list, const int sock, bool wantRead, bool wantWrite); - -/** - * Send the whole buffer, calling write() multiple times if neccessary. - * Give up after calling write() maxtries times. - * Set maxtries < 0 to try infinitely. - */ -ssize_t sock_sendAll(int sock, void *buffer, size_t len, int maxtries); - -#endif /* SOCKHELPER_H_ */ diff --git a/src/server/uplink.c b/src/server/uplink.c index a205164..7910dc1 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -1,7 +1,7 @@ #include "uplink.h" #include "locks.h" #include "log.h" -#include "sockhelper.h" +#include "../shared/sockhelper.h" #include "image.h" #include "helper.h" #include "altservers.h" diff --git a/src/shared/signal.h b/src/shared/signal.h index 0e2f85f..6fd2765 100644 --- a/src/shared/signal.h +++ b/src/shared/signal.h @@ -46,4 +46,3 @@ int signal_clear(int signalFd); void signal_close(int signalFd); #endif - diff --git a/src/shared/sockhelper.c b/src/shared/sockhelper.c new file mode 100644 index 0000000..0b7a1db --- /dev/null +++ b/src/shared/sockhelper.c @@ -0,0 +1,287 @@ +#include "sockhelper.h" +//#include "log.h" +#include +#include +#include // inet_ntop +#include +#include +#include +#include +#include +#include +#include +#include + +#define MAXLISTEN 20 + +struct _poll_list { + int count; + struct pollfd entry[MAXLISTEN]; +}; + +int sock_connect(const dnbd3_host_t * const addr, const int connect_ms, const int rw_ms) +{ + // TODO: Move out of here, this unit should contain general socket functions + // TODO: Rework the dnbd3_host_t to not use AF_* as these could theoretically change + // TODO: Abstract away from sockaddr_in* like the rest of the functions here do, + // so WITH_IPV6 can finally be removed as everything is transparent. + struct sockaddr_storage ss; + int proto, addrlen; + memset( &ss, 0, sizeof ss ); + if ( addr->type == AF_INET ) { + // Set host (IPv4) + struct sockaddr_in *addr4 = (struct sockaddr_in*)&ss; + addr4->sin_family = AF_INET; + memcpy( &addr4->sin_addr, addr->addr, 4 ); + addr4->sin_port = addr->port; + proto = PF_INET; + addrlen = sizeof *addr4; + } +#ifdef WITH_IPV6 + else if ( addr->type == AF_INET6 ) { + // Set host (IPv6) + struct sockaddr_in6 *addr6 = (struct sockaddr_in6*)&ss; + addr6->sin6_family = AF_INET6; + memcpy( &addr6->sin6_addr, addr->addr, 16 ); + addr6->sin6_port = addr->port; + proto = PF_INET6; + addrlen = sizeof *addr6; + } +#endif + else { + logadd( LOG_DEBUG1, "Unsupported address type: %d\n", (int)addr->type ); + return -1; + } + int client_sock = socket( proto, SOCK_STREAM, IPPROTO_TCP ); + if ( client_sock == -1 ) return -1; + // Apply connect timeout + sock_setTimeout( client_sock, connect_ms ); + if ( connect( client_sock, (struct sockaddr *)&ss, addrlen ) == -1 ) { + close( client_sock ); + return -1; + } + // Apply read/write timeout + sock_setTimeout( client_sock, rw_ms ); + return client_sock; +} + +// TODO: Pretty much same as in server/* +int sock_resolveToDnbd3Host(const char * const address, dnbd3_host_t * const dest, const int count) +{ + if ( count <= 0 ) + return 0; + const int on = 1; + int sock = -1; + struct addrinfo hints, *res, *ptr; + char bufferAddr[100], bufferPort[6]; + char *addr = bufferAddr; + const char *portStr = NULL; + int addCount = 0; + + // See if we have a port + snprintf( bufferAddr, sizeof bufferAddr, "%s", address ); + const char *c1, *c2; + c1 = strchr( addr, ':' ); + if ( c1 != NULL ) { + c2 = strchr( c1 + 1, ':' ); + if ( c2 == NULL ) { + *c1 = '\0'; + portStr = c1 + 1; + } else if ( *addr == '[' ) { + // IPv6 - support [1:2::3]:123 + do { + c1 = strchr( c2 + 1, ':' ); + if ( c1 != NULL ) c2 = c1; + } while ( c1 != NULL ); + if ( c2[-1] == ']' ) { + c2[-1] = '\0'; + *c2 = '\0'; + addr += 1; + portStr = c2 + 1; + } + } + } + if ( portStr == NULL ) { + portStr = bufferPort; + snprintf( bufferPort, sizeof bufferPort, "%d", (int)PORT ); + } + + // Set hints for local addresses. + memset( &hints, 0, sizeof( hints ) ); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + if ( getaddrinfo( addr, portStr, &hints, &res ) != 0 || res == NULL ) { + return 0; + } + for ( ptr = res; ptr != NULL && count > 0; ptr = ptr->ai_next ) { + // TODO: AF->DNBD3 + if ( ptr->ai_addr->sa_family == AF_INET ) { + // Set host (IPv4) + struct sockaddr_in *addr4 = (struct sockaddr_in*)ptr->ai_addr; + dest[addCount].type = AF_INET; + dest[addCount].port = addr4->sin_port; + memcpy( dest[addCount].addr, &addr4->sin_addr, 4 ); + addCount += 1; +#ifdef WITH_IPV6 + } else if ( ptr->ai_addr->sa_family == AF_INET6 ) { + // Set host (IPv6) + struct sockaddr_in6 *addr6 = (struct sockaddr_in6*)ptr->ai_addr; + dest[addCount].type = AF_INET6; + dest[addCount].port = addr6->sin6_port; + memcpy( dest[addCount].addr, &addr6->sin6_addr, 16 ); + addCount += 1; +#endif + } + } + + freeaddrinfo( res ); + return addCount; +} + +void sock_setTimeout(const int sockfd, const int milliseconds) +{ + struct timeval tv; + tv.tv_sec = milliseconds / 1000; + tv.tv_usec = (milliseconds * 1000) % 1000000; + setsockopt( sockfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv) ); + setsockopt( sockfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv) ); +} + +poll_list_t* sock_newPollList() +{ + poll_list_t *list = (poll_list_t*)malloc( sizeof( poll_list_t ) ); + list->count = 0; + return list; +} + +void sock_destroyPollList(poll_list_t *list) +{ + for ( int i = 0; i < list->count; ++i ) { + if ( list->entry[i].fd >= 0 ) close( list->entry[i].fd ); + } + free( list ); +} + +bool sock_printable(struct sockaddr *addr, socklen_t addrLen, char *output, int len) +{ + char host[100], port[10]; + int ret = getnameinfo( addr, addrLen, host, 100, port, 10, NI_NUMERICHOST | NI_NUMERICSERV ); + if ( ret == 0 ) snprintf( output, len, "[%s]:%s", host, port ); + return ret == 0; +} + +bool sock_listen(poll_list_t* list, char* bind_addr, uint16_t port) +{ + if ( list->count >= MAXLISTEN ) return false; + struct addrinfo hints, *res, *ptr; + char portStr[6]; + const int on = 1; + int openCount = 0; + // Set hints for local addresses. + memset( &hints, 0, sizeof(hints) ); + hints.ai_flags = AI_PASSIVE; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + snprintf( portStr, sizeof portStr, "%d", (int)port ); + if ( getaddrinfo( bind_addr, portStr, &hints, &res ) != 0 || res == NULL ) return false; + // Attempt to bind to all of the addresses as long as there's room in the poll list + for( ptr = res; ptr != NULL; ptr = ptr->ai_next ) { + char bla[100]; + if ( !sock_printable( (struct sockaddr*)ptr->ai_addr, ptr->ai_addrlen, bla, 100 ) ) snprintf( bla, 100, "[invalid]" ); + logadd( LOG_DEBUG1, "Binding to %s...", bla ); + int sock = socket( ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol ); + if ( sock < 0 ) { + logadd( LOG_WARNING, "(Bind to %s): cannot socket(), errno=%d", bla, errno ); + continue; + } + setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on) ); + if ( ptr->ai_family == PF_INET6 ) setsockopt( sock, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on) ); + if ( bind( sock, ptr->ai_addr, ptr->ai_addrlen ) == -1 ) { + logadd( LOG_WARNING, "(Bind to %s): cannot bind(), errno=%d", bla, errno ); + close( sock ); + continue; + } + if ( listen( sock, 20 ) == -1 ) { + logadd( LOG_WARNING, "(Bind to %s): cannot listen(), errno=%d", errno ); + close( sock ); + continue; + } + list->entry[list->count].fd = sock; + list->entry[list->count].events = POLLIN | POLLRDHUP; + list->count++; + openCount++; + if ( list->count >= MAXLISTEN ) break; + } + freeaddrinfo( res ); + return openCount > 0; +} + +int sock_listenAny(poll_list_t* list, uint16_t port) +{ + return sock_listen( list, NULL, port ); +} + +int sock_accept(poll_list_t *list, struct sockaddr_storage *addr, socklen_t *length_ptr) +{ + int ret = poll( list->entry, list->count, -1 ); + if ( ret < 0 ) { + return -1; + } + for ( int i = list->count - 1; i >= 0; --i ) { + if ( list->entry[i].revents == 0 ) continue; + if ( list->entry[i].revents == POLLIN ) return accept( list->entry[i].fd, (struct sockaddr *)addr, length_ptr ); + if ( list->entry[i].revents & ( POLLNVAL | POLLHUP | POLLERR | POLLRDHUP ) ) { + logadd( LOG_DEBUG1, "poll fd revents=%d for index=%d and fd=%d", (int)list->entry[i].revents, i, list->entry[i].fd ); + if ( ( list->entry[i].revents & POLLNVAL ) == 0 ) close( list->entry[i].fd ); + if ( i != list->count ) list->entry[i] = list->entry[list->count]; + list->count--; + } + } + return -1; +} + +void sock_set_nonblock(int sock) +{ + int flags = fcntl( sock, F_GETFL, 0 ); + if ( flags == -1 ) flags = 0; + fcntl( sock, F_SETFL, flags | O_NONBLOCK ); +} + +void sock_set_block(int sock) +{ + int flags = fcntl( sock, F_GETFL, 0 ); + if ( flags == -1 ) flags = 0; + fcntl( sock, F_SETFL, flags & ~(int)O_NONBLOCK ); +} + +bool sock_append(poll_list_t *list, const int sock, bool wantRead, bool wantWrite) +{ + if ( sock == -1 || list->count >= MAXLISTEN ) return false; + list->entry[list->count++].fd = sock; + list->entry[list->count++].events = ( wantRead ? POLLIN : 0 ) | ( wantWrite ? POLLOUT : 0 ) | POLLRDHUP; + list->count++; + return true; +} + +ssize_t sock_sendAll(int sock, void *buffer, size_t len, int maxtries) +{ + size_t done = 0; + ssize_t ret = 0; + while ( done < len ) { + if ( maxtries >= 0 && --maxtries == -1 ) break; + ret = write( sock, (char*)buffer + done, len - done ); + if ( ret < 0 ) { + if ( errno == EINTR ) continue; + if ( errno == EAGAIN || errno == EWOULDBLOCK ) { + usleep( 1000 ); + continue; + } + break; + } + if ( ret == 0 ) break; + done += ret; + } + if ( done == 0 ) return ret; + return done; +} + diff --git a/src/shared/sockhelper.h b/src/shared/sockhelper.h new file mode 100644 index 0000000..3a4ab6c --- /dev/null +++ b/src/shared/sockhelper.h @@ -0,0 +1,93 @@ +#ifndef SOCKHELPER_H_ +#define SOCKHELPER_H_ + +/* + * Helper functions for dealing with sockets. These functions should + * abstract from the IP version by using getaddrinfo() and thelike. + */ + +#include +#include "../types.h" +#include +#include + +typedef struct _poll_list poll_list_t; + +/** + * Connect to given dnbd3_host_t. + * @param addr - address of host to connect to + * @param connect_ms - timeout in milliseconds after which the connection attempt fails + * @param rw_ms - read/write timeout in milliseconds to apply on successful connect + * @return socket file descriptor, or -1 on error + */ +int sock_connect(const dnbd3_host_t * const addr, const int connect_ms, const int rw_ms); + +/** + * Resolve/parse given address and put the result(s) into passed dnbd3_host_t array, + * but only up to count entries. + * @return Number of items added to array + */ +int sock_resolveToDnbd3Host(const char * const address, dnbd3_host_t * const dest, const int count); + +void sock_setTimeout(const int sockfd, const int milliseconds); + +/** + * Create new poll list. + */ +poll_list_t* sock_newPollList(); + +/** + * Delete a poll list, closing all sockets first if necessary. + */ +void sock_destroyPollList(poll_list_t *list); + +/** + * Listen on all interfaces/available IP addresses, using the given protocol. + * IPv4 and IPv6 are supported. + * @param protocol_family PF_INET or PF_INET6 + * @param port port to listen on + * @return the socket descriptor if successful, -1 otherwise. + */ +int sock_listenAny(poll_list_t* list, uint16_t port); + +/** + * Listen on a specific address and port. + * @param addr pointer to a properly filled sockaddr_in or sockaddr_in6 + * @param addrlen length of the passed struct + */ +bool sock_listen(poll_list_t* list, char* bind_addr, uint16_t port); + +/** + * This is a multi-socket version of accept. Pass in an array of listening sockets. + * If any of the sockets has an incoming connection, accept it and return the new socket's fd. + * On error, return -1, just like accept(). + * @param sockets array of listening socket fds + * @param socket_count number of sockets in that array + * @return fd of new client socket, -1 on error + */ +int sock_accept(poll_list_t *list, struct sockaddr_storage *addr, socklen_t *length_ptr); + +void sock_set_nonblock(int sock); + +void sock_set_block(int sock); + +/** + * Add given socket to array. Take an existing empty slot ( == -1) if available, + * append to end otherwise. Updates socket count variable passed by reference. + * + * @param poll_list_t list the poll list to add the socket to + * @param sock socket fd to add + * @param wantRead whether to set the EPOLLIN flag + * @param wantWrite whether to set the EPOLLOUT flag + * @return true on success, false iff the array is already full or socket is < 0 + */ +bool sock_append(poll_list_t *list, const int sock, bool wantRead, bool wantWrite); + +/** + * Send the whole buffer, calling write() multiple times if neccessary. + * Give up after calling write() maxtries times. + * Set maxtries < 0 to try infinitely. + */ +ssize_t sock_sendAll(int sock, void *buffer, size_t len, int maxtries); + +#endif /* SOCKHELPER_H_ */ -- cgit v1.2.3-55-g7522