From 63e040ea63755052ceca98aac3e6f9843cadbeee Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 6 Jan 2015 22:36:11 +0100 Subject: [SERVER] Big code cleanup, refactoring, minor bugfixing --- src/server/image.c | 83 +++++++++------ src/server/image.h | 4 +- src/server/net.c | 8 +- src/server/server.c | 103 +++++++++--------- src/server/server.h | 6 +- src/server/signal.c | 1 + src/server/sockhelper.c | 270 +++++++++++++++++++++++------------------------- src/server/sockhelper.h | 57 +++++----- src/server/threadpool.c | 35 +++++-- src/server/threadpool.h | 6 ++ src/server/uplink.c | 15 ++- 11 files changed, 308 insertions(+), 280 deletions(-) diff --git a/src/server/image.c b/src/server/image.c index ae7b942..e6d0ab4 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -9,6 +9,7 @@ #include "sockhelper.h" #include "altservers.h" #include "server.h" +#include "signal.h" #include #include @@ -43,6 +44,7 @@ static int remoteCloneCacheIndex = -1; // ########################################## +static dnbd3_image_t* image_free(dnbd3_image_t *image); static bool image_isHashBlockComplete(uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize); static bool image_load_all_internal(char *base, char *path); static bool image_load(char *base, char *path, int withUplink); @@ -116,7 +118,7 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co while ( pos < end ) { const int map_y = pos >> 15; const int map_x = (pos >> 12) & 7; // mod 8 - const uint8_t bit_mask = 0b00000001 << map_x; + const uint8_t bit_mask = 1 << map_x; if ( set ) { if ( (image->cache_map[map_y] & bit_mask) == 0 ) dirty = true; image->cache_map[map_y] |= bit_mask; @@ -322,26 +324,21 @@ dnbd3_image_t* image_release(dnbd3_image_t *image) return NULL; } spin_unlock( &image->lock ); + // Getting here means we decreased the usage counter to zero + // If the image is not in the images list anymore, we're + // responsible for freeing it spin_lock( &_images_lock ); - spin_lock( &image->lock ); - // Check active users again as we unlocked - if ( image->users == 0 ) { - // Not in use anymore, see if it's in the images array - for (int i = 0; i < _num_images; ++i) { - if ( _images[i] == image ) { // Found, do nothing - spin_unlock( &image->lock ); - spin_unlock( &_images_lock ); - return NULL; - } + for (int i = 0; i < _num_images; ++i) { + if ( _images[i] == image ) { // Found, do nothing + spin_unlock( &_images_lock ); + return NULL; } - spin_unlock( &image->lock ); - spin_unlock( &_images_lock ); - // Not found, free - image_free( image ); - return NULL; } - spin_unlock( &image->lock ); spin_unlock( &_images_lock ); + // So it wasn't in the images list anymore either, get rid of it, + // but check usage count once again, since it might have been increased + // after we unlocked above + if ( image->users == 0 ) image_free( image ); return NULL; } @@ -376,20 +373,48 @@ void image_killUplinks() spin_lock( &_images[i]->lock ); if ( _images[i]->uplink != NULL ) { _images[i]->uplink->shutdown = true; - if ( _images[i]->uplink->signal != -1 ) { - write( _images[i]->uplink->signal, "", 1 ); - } + signal_call( _images[i]->uplink->signal ); } spin_unlock( &_images[i]->lock ); } spin_unlock( &_images_lock ); } +/** + * Load all images in given path recursively. + * Pass NULL to use path from config. + */ +bool image_loadAll(char *path) +{ + if ( path == NULL ) { + return image_load_all_internal( _basePath, _basePath ); + } + return image_load_all_internal( path, path ); +} + +/** + * Free all images we have, but only if they're not in use anymore. + * Locks on _images_lock + * @return true if all images have been freed + */ +bool image_tryFreeAll() +{ + spin_lock( &_images_lock ); + for (int i = _num_images - 1; i >= 0; --i) { + if ( _images[i] != NULL && _images[i]->users == 0 ) { + _images[i] = image_free( _images[i] ); + } + if ( i + 1 == _num_images && _images[i] == NULL ) _num_images--; + } + spin_unlock( &_images_lock ); + return _num_images == 0; +} + /** * Free image. DOES NOT check if it's in use. * Indirectly locks on image.lock, uplink.queueLock */ -dnbd3_image_t* image_free(dnbd3_image_t *image) +static dnbd3_image_t* image_free(dnbd3_image_t *image) { assert( image != NULL ); // @@ -399,7 +424,9 @@ dnbd3_image_t* image_free(dnbd3_image_t *image) free( image->crc32 ); free( image->path ); free( image->lower_name ); - if ( image->cacheFd != -1 ) close( image->cacheFd ); + if ( image->cacheFd != -1 ) { + close( image->cacheFd ); + } spin_destroy( &image->lock ); // memset( image, 0, sizeof(dnbd3_image_t) ); @@ -407,18 +434,6 @@ dnbd3_image_t* image_free(dnbd3_image_t *image) return NULL ; } -/** - * Load all images in given path recursively. - * Pass NULL to use path from config. - */ -bool image_loadAll(char *path) -{ - if ( path == NULL ) { - return image_load_all_internal( _basePath, _basePath ); - } - return image_load_all_internal( path, path ); -} - static bool image_isHashBlockComplete(uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize) { if ( cacheMap == NULL ) return true; diff --git a/src/server/image.h b/src/server/image.h index 6d283ff..664225c 100644 --- a/src/server/image.h +++ b/src/server/image.h @@ -30,10 +30,10 @@ bool image_checkBlocksCrc32(int fd, uint32_t *crc32list, const int *blocks, cons void image_killUplinks(); -dnbd3_image_t* image_free(dnbd3_image_t *image); - bool image_loadAll(char *path); +bool image_tryFreeAll(); + bool image_create(char *image, int revision, uint64_t size); bool image_generateCrcFile(char *image); diff --git a/src/server/net.c b/src/server/net.c index 9525456..5a9a640 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -239,7 +239,7 @@ void *net_client_handler(void *dnbd3_client) const uint64_t lastByte = (end - 1) >> 15; do { const int map_x = (pos >> 12) & 7; // mod 8 - const uint8_t bit_mask = 0b00000001 << map_x; + const uint8_t bit_mask = 1 << map_x; if ( (image->cache_map[firstByte] & bit_mask) == 0 ) { isCached = false; break; @@ -263,7 +263,7 @@ void *net_client_handler(void *dnbd3_client) while ( pos < end ) { assert( lastByte == (pos >> 15) ); const int map_x = (pos >> 12) & 7; // mod 8 - const uint8_t bit_mask = 0b00000001 << map_x; + const uint8_t bit_mask = 1 << map_x; if ( (image->cache_map[lastByte] & bit_mask) == 0 ) { isCached = false; break; @@ -354,8 +354,8 @@ void *net_client_handler(void *dnbd3_client) } exit_client_cleanup: ; if ( image_file != -1 ) close( image_file ); - dnbd3_remove_client( client ); - client = dnbd3_free_client( client ); + dnbd3_removeClient( client ); + client = dnbd3_freeClient( client ); return NULL ; } diff --git a/src/server/server.c b/src/server/server.c index d3c03f4..d03a9fd 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -46,8 +46,7 @@ #include "helper.h" #include "threadpool.h" -#define MAX_SERVER_SOCKETS 50 // Assume there will be no more than 50 sockets the server will listen on -static int sockets[MAX_SERVER_SOCKETS], socket_count = 0; +poll_list_t *listeners = NULL; #ifdef _DEBUG int _fake_delay = 0; #endif @@ -59,17 +58,17 @@ pthread_spinlock_t _clients_lock; /** * Time the server was started */ -static time_t _startupTime = 0; +static time_t startupTime = 0; static bool sigReload = false, sigPrintStats = false; -static bool dnbd3_add_client(dnbd3_client_t *client); -static void dnbd3_handle_signal(int signum); +static bool dnbd3_addClient(dnbd3_client_t *client); +static void dnbd3_handleSignal(int signum); static void dnbd3_printClients(); /** * Print help text for usage instructions */ -void dnbd3_print_help(char *argv_0) +void dnbd3_printHelp(char *argv_0) { printf( "Version: %s\n\n", VERSION_STRING ); printf( "Usage: %s [OPTIONS]...\n", argv_0 ); @@ -96,7 +95,7 @@ void dnbd3_print_help(char *argv_0) /** * Print version information */ -void dnbd3_print_version() +void dnbd3_printVersion() { printf( "Version: %s\n", VERSION_STRING ); exit( 0 ); @@ -107,18 +106,14 @@ void dnbd3_print_version() */ void dnbd3_cleanup() { - int i, count; + int i, count, retries; _shutdown = true; debug_locks_stop_watchdog(); memlogf( "INFO: Cleanup...\n" ); - for (int i = 0; i < socket_count; ++i) { - if ( sockets[i] == -1 ) continue; - close( sockets[i] ); - sockets[i] = -1; - } - socket_count = 0; + if ( listeners != NULL ) sock_destroyPollList( listeners ); + listeners = NULL; // Kill connection to all clients spin_lock( &_clients_lock ); @@ -131,6 +126,9 @@ void dnbd3_cleanup() } spin_unlock( &_clients_lock ); + // Disable threadpool + threadpool_close(); + // Terminate the altserver checking thread altservers_shutdown(); @@ -141,7 +139,7 @@ void dnbd3_cleanup() integrity_shutdown(); // Wait for clients to disconnect - int retries = 10; + retries = 10; do { count = 0; spin_lock( &_clients_lock ); @@ -158,13 +156,11 @@ void dnbd3_cleanup() _num_clients = 0; // Clean up images - spin_lock( &_images_lock ); - for (i = 0; i < _num_images; ++i) { - if ( _images[i] == NULL ) continue; - _images[i] = image_free( _images[i] ); + retries = 5; + while ( !image_tryFreeAll() && --retries > 0 ) { + printf( "Waiting for images to free...\n" ); + sleep( 1 ); } - _num_images = 0; - spin_unlock( &_images_lock ); exit( EXIT_SUCCESS ); } @@ -232,10 +228,10 @@ int main(int argc, char *argv[]) return EXIT_SUCCESS; case 'h': case '?': - dnbd3_print_help( argv[0] ); + dnbd3_printHelp( argv[0] ); break; case 'v': - dnbd3_print_version(); + dnbd3_printVersion(); break; case 'b': bindAddress = strdup( optarg ); @@ -294,11 +290,11 @@ int main(int argc, char *argv[]) #endif // setup signal handler - signal( SIGTERM, dnbd3_handle_signal ); - signal( SIGINT, dnbd3_handle_signal ); - signal( SIGUSR1, dnbd3_handle_signal ); - signal( SIGHUP, dnbd3_handle_signal ); - signal( SIGUSR2, dnbd3_handle_signal ); + signal( SIGTERM, dnbd3_handleSignal ); + signal( SIGINT, dnbd3_handleSignal ); + signal( SIGUSR1, dnbd3_handleSignal ); + signal( SIGHUP, dnbd3_handleSignal ); + signal( SIGUSR2, dnbd3_handleSignal ); printf( "Loading images....\n" ); // Load all images in base path @@ -307,19 +303,21 @@ int main(int argc, char *argv[]) return EXIT_FAILURE; } - _startupTime = time( NULL ); + startupTime = time( NULL ); // Give other threads some time to start up before accepting connections - sleep( 2 ); + sleep( 1 ); // setup network - sockets[socket_count] = sock_listen_any( PF_INET, PORT, bindAddress ); - if ( sockets[socket_count] != -1 ) ++socket_count; -#ifdef WITH_IPV6 - sockets[socket_count] = sock_listen_any(PF_INET6, PORT, NULL); - if (sockets[socket_count] != -1) ++socket_count; -#endif - if ( socket_count == 0 ) exit( EXIT_FAILURE ); + listeners = sock_newPollList(); + if ( listeners == NULL ) { + printf( "Didnt get a poll list!\n" ); + exit( EXIT_FAILURE ); + } + if ( !sock_listen( listeners, bindAddress, PORT ) ) { + printf( "Could not listen on any local interface.\n" ); + exit( EXIT_FAILURE ); + } struct sockaddr_storage client; socklen_t len; int fd; @@ -328,7 +326,7 @@ int main(int argc, char *argv[]) //pthread_t thread_rpc; //thread_create(&(thread_rpc), NULL, &dnbd3_rpc_mainloop, NULL); // Initialize thread pool - if ( !threadpool_init( 10 ) ) { + if ( !threadpool_init( 8 ) ) { printf( "Could not init thread pool!\n" ); exit( EXIT_FAILURE ); } @@ -353,7 +351,7 @@ int main(int argc, char *argv[]) } // len = sizeof(client); - fd = accept_any( sockets, socket_count, &client, &len ); + fd = sock_accept( listeners, &client, &len ); if ( fd < 0 ) { const int err = errno; if ( err == EINTR || err == EAGAIN ) continue; @@ -363,24 +361,25 @@ int main(int argc, char *argv[]) } //memlogf("INFO: Client connected\n"); - sock_set_timeout( fd, _clientTimeout ); + sock_setTimeout( fd, _clientTimeout ); - dnbd3_client_t *dnbd3_client = dnbd3_init_client( &client, fd ); + dnbd3_client_t *dnbd3_client = dnbd3_initClient( &client, fd ); if ( dnbd3_client == NULL ) { close( fd ); continue; } - // This has to be done before creating the thread, otherwise a race condition might occur when the new thread dies faster than this thread adds the client to the list after creating the thread - if ( !dnbd3_add_client( dnbd3_client ) ) { - dnbd3_client = dnbd3_free_client( dnbd3_client ); + // This has to be done before creating the thread, otherwise a race condition might occur when the new thread + // dies faster than this thread adds the client to the list after creating the thread + if ( !dnbd3_addClient( dnbd3_client ) ) { + dnbd3_client = dnbd3_freeClient( dnbd3_client ); continue; } if ( !threadpool_run( net_client_handler, (void *)dnbd3_client ) ) { memlogf( "[ERROR] Could not start thread for new client." ); - dnbd3_remove_client( dnbd3_client ); - dnbd3_client = dnbd3_free_client( dnbd3_client ); + dnbd3_removeClient( dnbd3_client ); + dnbd3_client = dnbd3_freeClient( dnbd3_client ); continue; } } @@ -391,7 +390,7 @@ int main(int argc, char *argv[]) * Initialize and populate the client struct - called when an incoming * connection is accepted */ -dnbd3_client_t* dnbd3_init_client(struct sockaddr_storage *client, int fd) +dnbd3_client_t* dnbd3_initClient(struct sockaddr_storage *client, int fd) { dnbd3_client_t *dnbd3_client = calloc( 1, sizeof(dnbd3_client_t) ); if ( dnbd3_client == NULL ) { // This will never happen thanks to memory overcommit @@ -424,7 +423,7 @@ dnbd3_client_t* dnbd3_init_client(struct sockaddr_storage *client, int fd) * Remove a client from the clients array * Locks on: _clients_lock */ -void dnbd3_remove_client(dnbd3_client_t *client) +void dnbd3_removeClient(dnbd3_client_t *client) { int i; spin_lock( &_clients_lock ); @@ -443,7 +442,7 @@ void dnbd3_remove_client(dnbd3_client_t *client) * Locks on: _clients[].lock, _images[].lock * might call functions that lock on _images, _image[], uplink.queueLock, client.sendMutex */ -dnbd3_client_t* dnbd3_free_client(dnbd3_client_t *client) +dnbd3_client_t* dnbd3_freeClient(dnbd3_client_t *client) { spin_lock( &client->lock ); pthread_mutex_lock( &client->sendMutex ); @@ -470,7 +469,7 @@ dnbd3_client_t* dnbd3_free_client(dnbd3_client_t *client) * Add client to the clients array. * Locks on: _clients_lock */ -static bool dnbd3_add_client(dnbd3_client_t *client) +static bool dnbd3_addClient(dnbd3_client_t *client) { int i; spin_lock( &_clients_lock ); @@ -490,7 +489,7 @@ static bool dnbd3_add_client(dnbd3_client_t *client) return true; } -static void dnbd3_handle_signal(int signum) +static void dnbd3_handleSignal(int signum) { if ( signum == SIGINT || signum == SIGTERM ) { _shutdown = true; @@ -503,7 +502,7 @@ static void dnbd3_handle_signal(int signum) int dnbd3_serverUptime() { - return (int)(time( NULL ) - _startupTime); + return (int)(time( NULL ) - startupTime); } static void dnbd3_printClients() diff --git a/src/server/server.h b/src/server/server.h index f45ae98..2b10b75 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -39,9 +39,9 @@ extern int _fake_delay; #endif void dnbd3_cleanup(); -void dnbd3_remove_client(dnbd3_client_t *client); -dnbd3_client_t* dnbd3_free_client(dnbd3_client_t *client); -dnbd3_client_t* dnbd3_init_client(struct sockaddr_storage *client, int fd); +void dnbd3_removeClient(dnbd3_client_t *client); +dnbd3_client_t* dnbd3_freeClient(dnbd3_client_t *client); +dnbd3_client_t* dnbd3_initClient(struct sockaddr_storage *client, int fd); int dnbd3_serverUptime(); #if !defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64 diff --git a/src/server/signal.c b/src/server/signal.c index f0988b9..3b7d50b 100644 --- a/src/server/signal.c +++ b/src/server/signal.c @@ -12,6 +12,7 @@ int signal_new() int signal_call(int signalFd) { + if ( signalFd < 0 ) return 0; static uint64_t one = 1; return write( signalFd, &one, sizeof one ) == sizeof one; } diff --git a/src/server/sockhelper.c b/src/server/sockhelper.c index 2fc0620..4cd0b56 100644 --- a/src/server/sockhelper.c +++ b/src/server/sockhelper.c @@ -1,78 +1,71 @@ #include "sockhelper.h" #include "memlog.h" +#include +#include +#include // inet_ntop +#include #include #include #include #include #include +#include +#include -static inline int connect_shared(const int client_sock, void* addr, const int addrlen, int connect_ms, int rw_ms) -{ - struct timeval tv; - // Connect to server - tv.tv_sec = connect_ms / 1000; - tv.tv_usec = connect_ms * 1000; - setsockopt( client_sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv) ); - setsockopt( client_sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv) ); - if ( connect( client_sock, (struct sockaddr *)addr, addrlen ) == -1 ) { - close( client_sock ); - return -1; - } - // Apply read/write timeout - tv.tv_sec = rw_ms / 1000; - tv.tv_usec = rw_ms * 1000; - setsockopt( client_sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv) ); - setsockopt( client_sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv) ); - return client_sock; -} +#define MAXLISTEN 20 -int sock_connect4(struct sockaddr_in *addr, const int connect_ms, const int rw_ms) -{ - int client_sock = socket( PF_INET, SOCK_STREAM, IPPROTO_TCP ); - if ( client_sock == -1 ) return -1; - return connect_shared( client_sock, addr, sizeof(struct sockaddr_in), connect_ms, rw_ms ); -} - -int sock_connect6(struct sockaddr_in6 *addr, const int connect_ms, const int rw_ms) -{ -#ifdef WITH_IPV6 - int client_sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP); - if (client_sock == -1) return -1; - return connect_shared(client_sock, addr, sizeof(struct sockaddr_in6), connect_ms, rw_ms); -#else - printf( "[DEBUG] Not compiled with IPv6 support.\n" ); - return -1; -#endif -} +struct _poll_list { + int count; + struct pollfd entry[MAXLISTEN]; +}; int sock_connect(const dnbd3_host_t * const addr, const int connect_ms, const int rw_ms) { + // TODO: Move out of here, this unit should contain general socket functions + // TODO: Rework the dnbd3_host_t to not use AF_* as these could theoretically change + // TODO: Abstract away from sockaddr_in* like the rest of the functions here do, + // so WITH_IPV6 can finally be removed as everything is transparent. + struct sockaddr_storage ss; + int proto, addrlen; + memset( &ss, 0, sizeof ss ); if ( addr->type == AF_INET ) { // Set host (IPv4) - struct sockaddr_in addr4; - memset( &addr4, 0, sizeof(addr4) ); - addr4.sin_family = AF_INET; - memcpy( &addr4.sin_addr, addr->addr, 4 ); - addr4.sin_port = addr->port; - return sock_connect4( &addr4, connect_ms, rw_ms ); + struct sockaddr_in *addr4 = (struct sockaddr_in*)&ss; + addr4->sin_family = AF_INET; + memcpy( &addr4->sin_addr, addr->addr, 4 ); + addr4->sin_port = addr->port; + proto = PF_INET; + addrlen = sizeof *addr4; } #ifdef WITH_IPV6 - else if (addr->type == AF_INET6) - { + else if ( addr->type == AF_INET6 ) { // Set host (IPv6) - struct sockaddr_in6 addr6; - memset(&addr6, 0, sizeof(addr6)); - addr6.sin6_family = AF_INET6; - memcpy(&addr6.sin6_addr, addr->addr, 16); - addr6.sin6_port = addr->port; - return sock_connect6(&addr6, connect_ms, rw_ms); + struct sockaddr_in6 *addr6 = (struct sockaddr_in6*)&ss; + addr6->sin6_family = AF_INET6; + memcpy( &addr6->sin6_addr, addr->addr, 16 ); + addr6->sin6_port = addr->port; + proto = PF_INET6; + addrlen = sizeof *addr6; } #endif - printf( "[DEBUG] Unsupported address type: %d\n", (int)addr->type ); - return -1; + else { + printf( "[DEBUG] Unsupported address type: %d\n", (int)addr->type ); + return -1; + } + int client_sock = socket( proto, SOCK_STREAM, IPPROTO_TCP ); + if ( client_sock == -1 ) return -1; + // Apply connect timeout + sock_setTimeout( client_sock, connect_ms ); + if ( connect( client_sock, (struct sockaddr *)&ss, addrlen ) == -1 ) { + close( client_sock ); + return -1; + } + // Apply read/write timeout + sock_setTimeout( client_sock, rw_ms ); + return client_sock; } -void sock_set_timeout(const int sockfd, const int milliseconds) +void sock_setTimeout(const int sockfd, const int milliseconds) { struct timeval tv; tv.tv_sec = milliseconds / 1000; @@ -81,94 +74,97 @@ void sock_set_timeout(const int sockfd, const int milliseconds) setsockopt( sockfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv) ); } -int sock_listen_any(int protocol_family, uint16_t port, char* bind_addr) +poll_list_t* sock_newPollList() { - struct sockaddr_storage addr; - struct in_addr local; - if (bind_addr != NULL) { - if (!inet_aton(bind_addr, &local)) return -1; - } - memset( &addr, 0, sizeof(addr) ); - if ( protocol_family == PF_INET ) { - struct sockaddr_in *v4 = (struct sockaddr_in *)&addr; - if (bind_addr == NULL) { - v4->sin_addr.s_addr = INADDR_ANY; - } else { - v4->sin_addr = local; - } - v4->sin_port = htons( port ); - v4->sin_family = AF_INET; - } -#ifdef WITH_IPV6 - else if (protocol_family == PF_INET6) - { - struct sockaddr_in6 *v6 = (struct sockaddr_in6 *)&addr; - v6->sin6_addr = in6addr_any; - v6->sin6_port = htons(port); - v6->sin6_family = AF_INET6; - } -#endif - else { - printf( "[DEBUG] sock_listen: Unsupported protocol: %d\n", protocol_family ); - return -1; - } - return sock_listen( &addr, sizeof(addr) ); + poll_list_t *list = (poll_list_t*)malloc( sizeof( poll_list_t ) ); + list->count = 0; + return list; } -int sock_listen(struct sockaddr_storage *addr, int addrlen) +void sock_destroyPollList(poll_list_t *list) { - int pf; // On Linux AF_* == PF_*, but this is not guaranteed on all platforms, so let's be safe here: - if ( addr->ss_family == AF_INET ) pf = PF_INET; -#ifdef WITH_IPV6 - else if (addr->ss_family == AF_INET6) - pf = PF_INET6; -#endif - else { - printf( "[DEBUG] sock_listen: unsupported address type: %d\n", (int)addr->ss_family ); - return -1; - } - int sock; - - // Create socket - sock = socket( pf, SOCK_STREAM, IPPROTO_TCP ); - if ( sock < 0 ) { - memlogf( "[ERROR] sock_listen: Socket setup failure" ); // TODO: print port number to help troubleshooting - return -1; + for ( int i = 0; i < list->count; ++i ) { + if ( list->entry[i].fd >= 0 ) close( list->entry[i].fd ); } - const int on = 1; - setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on) ); - if ( pf == PF_INET6 ) setsockopt( sock, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on) ); + free( list ); +} - // Bind to socket - if ( bind( sock, (struct sockaddr *)addr, addrlen ) < 0 ) { - int e = errno; - close( sock ); - memlogf( "[ERROR] Bind failure (%d)", e ); // TODO: print port number to help troubleshooting - return -1; - } +bool sock_printable(struct sockaddr *addr, socklen_t addrLen, char *output, int len) +{ + char host[100], port[10]; + int ret = getnameinfo( addr, addrLen, host, 100, port, 10, NI_NUMERICHOST | NI_NUMERICSERV ); + if ( ret == 0 ) snprintf( output, len, "[%s]:%s", host, port ); + return ret == 0; +} - // Listen on socket - if ( listen( sock, 20 ) == -1 ) { - close( sock ); - memlogf( "[ERROR] Listen failure" ); // TODO ... - return -1; +bool sock_listen(poll_list_t* list, char* bind_addr, uint16_t port) +{ + if ( list->count >= MAXLISTEN ) return false; + struct addrinfo hints, *res, *ptr; + char portStr[6]; + const int on = 1; + int openCount = 0; + // Set hints for local addresses. + memset( &hints, 0, sizeof(hints) ); + hints.ai_flags = AI_PASSIVE; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + snprintf( portStr, sizeof portStr, "%d", (int)port ); + if ( getaddrinfo( bind_addr, portStr, &hints, &res ) != 0 || res == NULL ) return false; + // Attempt to bind to all of the addresses as long as there's room in the poll list + for( ptr = res; ptr != NULL; ptr = ptr->ai_next ) { + char bla[100]; + if ( !sock_printable( (struct sockaddr*)ptr->ai_addr, ptr->ai_addrlen, bla, 100 ) ) snprintf( bla, 100, "[invalid]" ); + printf( "Trying to bind to %s ", bla ); + int sock = socket( ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol ); + if ( sock < 0 ) { + printf( "...cannot socket(), errno=%d\n", errno ); + continue; + } + setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on) ); + if ( ptr->ai_family == PF_INET6 ) setsockopt( sock, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on) ); + if ( bind( sock, ptr->ai_addr, ptr->ai_addrlen ) == -1 ) { + printf( "...cannot bind(), errno=%d\n", errno ); + close( sock ); + continue; + } + if ( listen( sock, 20 ) == -1 ) { + printf( "...cannot listen(), errno=%d\n", errno ); + close( sock ); + continue; + } + printf( "...OK!\n" ); + list->entry[list->count].fd = sock; + list->entry[list->count].events = POLLIN | POLLRDHUP; + list->count++; + openCount++; + if ( list->count >= MAXLISTEN ) break; } + freeaddrinfo( res ); + return openCount > 0; +} - return sock; +int sock_listenAny(poll_list_t* list, uint16_t port) +{ + return sock_listen( list, NULL, port ); } -int accept_any(const int * const sockets, const int socket_count, struct sockaddr_storage *addr, socklen_t *length_ptr) +int sock_accept(poll_list_t *list, struct sockaddr_storage *addr, socklen_t *length_ptr) { - fd_set set; - FD_ZERO( &set ); - int max = 0; - for (int i = 0; i < socket_count; ++i) { - FD_SET( sockets[i], &set ); - if ( sockets[i] > max ) max = sockets[i]; + int ret = poll( list->entry, list->count, -1 ); + if ( ret < 0 ) { + printf( "poll errno=%d\n", errno ); + return -1; } - if ( select( max + 1, &set, NULL, NULL, NULL ) <= 0 ) return -1; - for (int i = 0; i < socket_count; ++i) { - if ( FD_ISSET(sockets[i], &set)) return accept( sockets[i], (struct sockaddr *)addr, length_ptr ); + for ( int i = list->count - 1; i >= 0; --i ) { + if ( list->entry[i].revents == 0 ) continue; + if ( list->entry[i].revents == POLLIN ) return accept( list->entry[i].fd, (struct sockaddr *)addr, length_ptr ); + if ( list->entry[i].revents & ( POLLNVAL | POLLHUP | POLLERR | POLLRDHUP ) ) { + printf( "poll fd revents=%d for index=%d and fd=%d\n", (int)list->entry[i].revents, i, list->entry[i].fd ); + if ( ( list->entry[i].revents & POLLNVAL ) == 0 ) close( list->entry[i].fd ); + if ( i != list->count ) list->entry[i] = list->entry[list->count]; + list->count--; + } } return -1; } @@ -187,17 +183,11 @@ void sock_set_block(int sock) fcntl( sock, F_SETFL, flags & ~(int)O_NONBLOCK ); } -bool sock_add_array(const int sock, int *array, int *array_fill, const int array_length) +bool sock_append(poll_list_t *list, const int sock, bool wantRead, bool wantWrite) { - if ( sock == -1 ) return true; - for (int i = 0; i < *array_fill; ++i) { - if ( array[i] == -1 ) { - array[i] = sock; - return true; - } - } - if ( *array_fill >= array_length ) return false; - array[*array_fill] = sock; - (*array_fill) += 1; + if ( sock == -1 || list->count >= MAXLISTEN ) return false; + list->entry[list->count++].fd = sock; + list->entry[list->count++].events = ( wantRead ? POLLIN : 0 ) | ( wantWrite ? POLLOUT : 0 ) | POLLRDHUP; + list->count++; return true; } diff --git a/src/server/sockhelper.h b/src/server/sockhelper.h index 394e5e4..b6b0b11 100644 --- a/src/server/sockhelper.h +++ b/src/server/sockhelper.h @@ -1,16 +1,17 @@ #ifndef SOCKHELPER_H_ #define SOCKHELPER_H_ +/* + * Helper functions for dealing with sockets. These functions should + * abstract from the IP version by using getaddrinfo() and thelike. + */ + #include #include "../types.h" #include -#include -#include #include -int sock_connect4(struct sockaddr_in *addr, const int connect_ms, const int rw_ms); - -int sock_connect6(struct sockaddr_in6 *addr, const int connect_ms, const int rw_ms); +typedef struct _poll_list poll_list_t; /** * Connect to given dnbd3_host_t. @@ -21,7 +22,17 @@ int sock_connect6(struct sockaddr_in6 *addr, const int connect_ms, const int rw_ */ int sock_connect(const dnbd3_host_t * const addr, const int connect_ms, const int rw_ms); -void sock_set_timeout(const int sockfd, const int milliseconds); +void sock_setTimeout(const int sockfd, const int milliseconds); + +/** + * Create new poll list. + */ +poll_list_t* sock_newPollList(); + +/** + * Delete a poll list, closing all sockets first if necessary. + */ +void sock_destroyPollList(poll_list_t *list); /** * Listen on all interfaces/available IP addresses, using the given protocol. @@ -30,14 +41,14 @@ void sock_set_timeout(const int sockfd, const int milliseconds); * @param port port to listen on * @return the socket descriptor if successful, -1 otherwise. */ -int sock_listen_any(int protocol_family, uint16_t port, char* bind_addr); +int sock_listenAny(poll_list_t* list, uint16_t port); /** * Listen on a specific address and port. * @param addr pointer to a properly filled sockaddr_in or sockaddr_in6 * @param addrlen length of the passed struct */ -int sock_listen(struct sockaddr_storage *addr, int addrlen); +bool sock_listen(poll_list_t* list, char* bind_addr, uint16_t port); /** * This is a multi-socket version of accept. Pass in an array of listening sockets. @@ -47,38 +58,22 @@ int sock_listen(struct sockaddr_storage *addr, int addrlen); * @param socket_count number of sockets in that array * @return fd of new client socket, -1 on error */ -int accept_any(const int * const sockets, const int socket_count, struct sockaddr_storage *addr, socklen_t *length_ptr); +int sock_accept(poll_list_t *list, struct sockaddr_storage *addr, socklen_t *length_ptr); void sock_set_nonblock(int sock); void sock_set_block(int sock); -/** - * Take IPv4 as string and a port and fill sockaddr_in struct. - * This should be refactored to work for IPv4 and IPv6 and use sockaddr_storage. - */ -inline void sock_set_addr4(char *ip, uint16_t port, struct sockaddr_in *addr) -{ - memset( addr, 0, sizeof(*addr) ); - addr->sin_family = AF_INET; // IPv4 - addr->sin_addr.s_addr = inet_addr( ip ); - addr->sin_port = htons( port ); // set port number -} - /** * Add given socket to array. Take an existing empty slot ( == -1) if available, * append to end otherwise. Updates socket count variable passed by reference. - * The passed socket fd is only added if it is != -1 for convenience, so you can - * directly pass the return value of sock_listen or sock_create, without checking the - * return value first. + * + * @param poll_list_t list the poll list to add the socket to * @param sock socket fd to add - * @param array the array of socket fds to add the socket to - * @param array_fill pointer to int telling how many sockets there are in the array. Empty slots - * in between are counted too. In other words: represents the index of the last valid socket fd in the - * array plus one, or 0 if there are none. - * @param array_length the capacity of the array - * @return true on success or if the passed fd was -1, false iff the array is already full + * @param wantRead whether to set the EPOLLIN flag + * @param wantWrite whether to set the EPOLLOUT flag + * @return true on success, false iff the array is already full or socket is < 0 */ -bool sock_add_array(const int sock, int *array, int *array_fill, const int array_length); +bool sock_append(poll_list_t *list, const int sock, bool wantRead, bool wantWrite); #endif /* SOCKHELPER_H_ */ diff --git a/src/server/threadpool.c b/src/server/threadpool.c index 04a8e86..34e996c 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -25,13 +25,29 @@ static pthread_spinlock_t poolLock; bool threadpool_init(int maxIdle) { if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false; - maxIdleThreads = maxIdle; spin_init( &poolLock, PTHREAD_PROCESS_PRIVATE ); + maxIdleThreads = maxIdle; pthread_attr_init( &threadAttrs ); pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED ); return true; } +void threadpool_close() +{ + _shutdown = true; + if ( maxIdleThreads < 0 ) return; + spin_lock( &poolLock ); + maxIdleThreads = -1; + entry_t *ptr = pool; + while ( ptr != NULL ) { + entry_t *current = ptr; + ptr = ptr->next; + signal_call( current->signalFd ); + } + spin_unlock( &poolLock ); + spin_destroy( &poolLock ); +} + bool threadpool_run(void *(*startRoutine)(void *), void *arg) { spin_lock( &poolLock ); @@ -40,9 +56,14 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg) spin_unlock( &poolLock ); if ( entry == NULL ) { entry = (entry_t*)malloc( sizeof(entry_t) ); + if ( entry == NULL ) { + printf( "[WARNING] Could not alloc entry_t for new thread\n" ); + return false; + } entry->signalFd = signal_new(); if ( entry->signalFd < 0 ) { printf( "[WARNING] Could not create signalFd for new thread pool thread\n" ); + free( entry ); return false; } if ( 0 != thread_create( &(entry->thread), &threadAttrs, threadpool_worker, (void*)entry ) ) { @@ -67,9 +88,10 @@ static void *threadpool_worker(void *entryPtr) { blockNoncriticalSignals(); entry_t *entry = (entry_t*)entryPtr; - while ( !_shutdown ) { + for ( ;; ) { // Wait for signal from outside that we have work to do int ret = signal_wait( entry->signalFd, -1 ); + if ( _shutdown ) break; if ( ret > 0 ) { if ( entry->startRoutine == NULL ) { printf( "[DEBUG] Worker woke up but has no work to do!\n" ); @@ -80,6 +102,7 @@ static void *threadpool_worker(void *entryPtr) // Reset vars for safety entry->startRoutine = NULL; entry->arg = NULL; + if ( _shutdown ) break; // Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise int threadCount = 0; spin_lock( &poolLock ); @@ -90,10 +113,7 @@ static void *threadpool_worker(void *entryPtr) } if ( threadCount >= maxIdleThreads ) { spin_unlock( &poolLock ); - signal_close( entry->signalFd ); - free( entry ); - printf(" [DEBUG] Thread killed!\n" ); - return NULL; + break; } entry->next = pool; pool = entry; @@ -103,6 +123,9 @@ static void *threadpool_worker(void *entryPtr) printf( "[DEBUG] Unexpected return value %d for signal_wait in threadpool worker!\n", ret ); } } + signal_close( entry->signalFd ); + free( entry ); + printf(" [DEBUG] Thread killed!\n" ); return NULL; } diff --git a/src/server/threadpool.h b/src/server/threadpool.h index b3b0fe6..15dd151 100644 --- a/src/server/threadpool.h +++ b/src/server/threadpool.h @@ -11,6 +11,12 @@ */ bool threadpool_init(int maxIdleThreadCount); +/** + * Shut down threadpool. + * Only call if it has been initialized before. + */ +void threadpool_close(); + /** * Run a thread using the thread pool. * @param startRoutine function to run in new thread diff --git a/src/server/uplink.c b/src/server/uplink.c index 5819ed1..82c1f5a 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -43,8 +43,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) assert( image != NULL ); spin_lock( &image->lock ); if ( image->uplink != NULL ) { - spin_unlock( &image->lock ); - return true; + goto failure; } if ( image->cache_map == NULL ) { memlogf( "[WARNING] Uplink was requested for image %s, but it is already complete", image->lower_name ); @@ -73,7 +72,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) } spin_unlock( &image->lock ); return true; - failure: ; +failure: ; if ( link != NULL ) free( link ); link = image->uplink = NULL; spin_unlock( &image->lock ); @@ -103,7 +102,7 @@ void uplink_shutdown(dnbd3_image_t *image) } image->uplink = NULL; uplink->shutdown = true; - if ( uplink->signal != -1 ) signal_call( uplink->signal ); + signal_call( uplink->signal ); pthread_t thread = uplink->thread; spin_unlock( &uplink->queueLock ); spin_unlock( &image->lock ); @@ -251,15 +250,15 @@ static void* uplink_mainloop(void *data) link->betterFd = -1; link->currentServer = link->betterServer; link->replicationHandle = 0; - // Re-send all pending requests - uplink_sendRequests( link, false ); - uplink_sendReplicationRequest( link ); link->image->working = true; buffer[0] = '@'; if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) { printf( "[DEBUG] Now connected to %s\n", buffer + 1 ); setThreadName( buffer ); } + // Re-send all pending requests + uplink_sendRequests( link, false ); + uplink_sendReplicationRequest( link ); memset( &ev, 0, sizeof(ev) ); ev.events = EPOLLIN | EPOLLRDHUP | EPOLLPRI; ev.data.fd = link->fd; @@ -469,7 +468,7 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link) spin_unlock( &image->lock ); // Unlocked - do not break or continue here... // Needs to be 8 (bit->byte, bitmap) - const uint64_t offset = link->replicationHandle = (uint64_t)i * DNBD3_BLOCK_SIZE * (uint64_t)requestBlockSize; + const uint64_t offset = link->replicationHandle = (uint64_t)i * (uint64_t)requestBlockSize; const uint32_t size = MIN( image->filesize - offset, requestBlockSize ); if ( !dnbd3_get_block( link->fd, offset, size, link->replicationHandle ) ) { printf( "[DEBUG] Error sending background replication request to uplink server!\n" ); -- cgit v1.2.3-55-g7522