summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/server/image.c83
-rw-r--r--src/server/image.h4
-rw-r--r--src/server/net.c8
-rw-r--r--src/server/server.c103
-rw-r--r--src/server/server.h6
-rw-r--r--src/server/signal.c1
-rw-r--r--src/server/sockhelper.c270
-rw-r--r--src/server/sockhelper.h57
-rw-r--r--src/server/threadpool.c35
-rw-r--r--src/server/threadpool.h6
-rw-r--r--src/server/uplink.c15
11 files changed, 308 insertions, 280 deletions
diff --git a/src/server/image.c b/src/server/image.c
index ae7b942..e6d0ab4 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -9,6 +9,7 @@
#include "sockhelper.h"
#include "altservers.h"
#include "server.h"
+#include "signal.h"
#include <assert.h>
#include <stdio.h>
@@ -43,6 +44,7 @@ static int remoteCloneCacheIndex = -1;
// ##########################################
+static dnbd3_image_t* image_free(dnbd3_image_t *image);
static bool image_isHashBlockComplete(uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize);
static bool image_load_all_internal(char *base, char *path);
static bool image_load(char *base, char *path, int withUplink);
@@ -116,7 +118,7 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co
while ( pos < end ) {
const int map_y = pos >> 15;
const int map_x = (pos >> 12) & 7; // mod 8
- const uint8_t bit_mask = 0b00000001 << map_x;
+ const uint8_t bit_mask = 1 << map_x;
if ( set ) {
if ( (image->cache_map[map_y] & bit_mask) == 0 ) dirty = true;
image->cache_map[map_y] |= bit_mask;
@@ -322,26 +324,21 @@ dnbd3_image_t* image_release(dnbd3_image_t *image)
return NULL;
}
spin_unlock( &image->lock );
+ // Getting here means we decreased the usage counter to zero
+ // If the image is not in the images list anymore, we're
+ // responsible for freeing it
spin_lock( &_images_lock );
- spin_lock( &image->lock );
- // Check active users again as we unlocked
- if ( image->users == 0 ) {
- // Not in use anymore, see if it's in the images array
- for (int i = 0; i < _num_images; ++i) {
- if ( _images[i] == image ) { // Found, do nothing
- spin_unlock( &image->lock );
- spin_unlock( &_images_lock );
- return NULL;
- }
+ for (int i = 0; i < _num_images; ++i) {
+ if ( _images[i] == image ) { // Found, do nothing
+ spin_unlock( &_images_lock );
+ return NULL;
}
- spin_unlock( &image->lock );
- spin_unlock( &_images_lock );
- // Not found, free
- image_free( image );
- return NULL;
}
- spin_unlock( &image->lock );
spin_unlock( &_images_lock );
+ // So it wasn't in the images list anymore either, get rid of it,
+ // but check usage count once again, since it might have been increased
+ // after we unlocked above
+ if ( image->users == 0 ) image_free( image );
return NULL;
}
@@ -376,9 +373,7 @@ void image_killUplinks()
spin_lock( &_images[i]->lock );
if ( _images[i]->uplink != NULL ) {
_images[i]->uplink->shutdown = true;
- if ( _images[i]->uplink->signal != -1 ) {
- write( _images[i]->uplink->signal, "", 1 );
- }
+ signal_call( _images[i]->uplink->signal );
}
spin_unlock( &_images[i]->lock );
}
@@ -386,10 +381,40 @@ void image_killUplinks()
}
/**
+ * Load all images in given path recursively.
+ * Pass NULL to use path from config.
+ */
+bool image_loadAll(char *path)
+{
+ if ( path == NULL ) {
+ return image_load_all_internal( _basePath, _basePath );
+ }
+ return image_load_all_internal( path, path );
+}
+
+/**
+ * Free all images we have, but only if they're not in use anymore.
+ * Locks on _images_lock
+ * @return true if all images have been freed
+ */
+bool image_tryFreeAll()
+{
+ spin_lock( &_images_lock );
+ for (int i = _num_images - 1; i >= 0; --i) {
+ if ( _images[i] != NULL && _images[i]->users == 0 ) {
+ _images[i] = image_free( _images[i] );
+ }
+ if ( i + 1 == _num_images && _images[i] == NULL ) _num_images--;
+ }
+ spin_unlock( &_images_lock );
+ return _num_images == 0;
+}
+
+/**
* Free image. DOES NOT check if it's in use.
* Indirectly locks on image.lock, uplink.queueLock
*/
-dnbd3_image_t* image_free(dnbd3_image_t *image)
+static dnbd3_image_t* image_free(dnbd3_image_t *image)
{
assert( image != NULL );
//
@@ -399,7 +424,9 @@ dnbd3_image_t* image_free(dnbd3_image_t *image)
free( image->crc32 );
free( image->path );
free( image->lower_name );
- if ( image->cacheFd != -1 ) close( image->cacheFd );
+ if ( image->cacheFd != -1 ) {
+ close( image->cacheFd );
+ }
spin_destroy( &image->lock );
//
memset( image, 0, sizeof(dnbd3_image_t) );
@@ -407,18 +434,6 @@ dnbd3_image_t* image_free(dnbd3_image_t *image)
return NULL ;
}
-/**
- * Load all images in given path recursively.
- * Pass NULL to use path from config.
- */
-bool image_loadAll(char *path)
-{
- if ( path == NULL ) {
- return image_load_all_internal( _basePath, _basePath );
- }
- return image_load_all_internal( path, path );
-}
-
static bool image_isHashBlockComplete(uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize)
{
if ( cacheMap == NULL ) return true;
diff --git a/src/server/image.h b/src/server/image.h
index 6d283ff..664225c 100644
--- a/src/server/image.h
+++ b/src/server/image.h
@@ -30,10 +30,10 @@ bool image_checkBlocksCrc32(int fd, uint32_t *crc32list, const int *blocks, cons
void image_killUplinks();
-dnbd3_image_t* image_free(dnbd3_image_t *image);
-
bool image_loadAll(char *path);
+bool image_tryFreeAll();
+
bool image_create(char *image, int revision, uint64_t size);
bool image_generateCrcFile(char *image);
diff --git a/src/server/net.c b/src/server/net.c
index 9525456..5a9a640 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -239,7 +239,7 @@ void *net_client_handler(void *dnbd3_client)
const uint64_t lastByte = (end - 1) >> 15;
do {
const int map_x = (pos >> 12) & 7; // mod 8
- const uint8_t bit_mask = 0b00000001 << map_x;
+ const uint8_t bit_mask = 1 << map_x;
if ( (image->cache_map[firstByte] & bit_mask) == 0 ) {
isCached = false;
break;
@@ -263,7 +263,7 @@ void *net_client_handler(void *dnbd3_client)
while ( pos < end ) {
assert( lastByte == (pos >> 15) );
const int map_x = (pos >> 12) & 7; // mod 8
- const uint8_t bit_mask = 0b00000001 << map_x;
+ const uint8_t bit_mask = 1 << map_x;
if ( (image->cache_map[lastByte] & bit_mask) == 0 ) {
isCached = false;
break;
@@ -354,8 +354,8 @@ void *net_client_handler(void *dnbd3_client)
}
exit_client_cleanup: ;
if ( image_file != -1 ) close( image_file );
- dnbd3_remove_client( client );
- client = dnbd3_free_client( client );
+ dnbd3_removeClient( client );
+ client = dnbd3_freeClient( client );
return NULL ;
}
diff --git a/src/server/server.c b/src/server/server.c
index d3c03f4..d03a9fd 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -46,8 +46,7 @@
#include "helper.h"
#include "threadpool.h"
-#define MAX_SERVER_SOCKETS 50 // Assume there will be no more than 50 sockets the server will listen on
-static int sockets[MAX_SERVER_SOCKETS], socket_count = 0;
+poll_list_t *listeners = NULL;
#ifdef _DEBUG
int _fake_delay = 0;
#endif
@@ -59,17 +58,17 @@ pthread_spinlock_t _clients_lock;
/**
* Time the server was started
*/
-static time_t _startupTime = 0;
+static time_t startupTime = 0;
static bool sigReload = false, sigPrintStats = false;
-static bool dnbd3_add_client(dnbd3_client_t *client);
-static void dnbd3_handle_signal(int signum);
+static bool dnbd3_addClient(dnbd3_client_t *client);
+static void dnbd3_handleSignal(int signum);
static void dnbd3_printClients();
/**
* Print help text for usage instructions
*/
-void dnbd3_print_help(char *argv_0)
+void dnbd3_printHelp(char *argv_0)
{
printf( "Version: %s\n\n", VERSION_STRING );
printf( "Usage: %s [OPTIONS]...\n", argv_0 );
@@ -96,7 +95,7 @@ void dnbd3_print_help(char *argv_0)
/**
* Print version information
*/
-void dnbd3_print_version()
+void dnbd3_printVersion()
{
printf( "Version: %s\n", VERSION_STRING );
exit( 0 );
@@ -107,18 +106,14 @@ void dnbd3_print_version()
*/
void dnbd3_cleanup()
{
- int i, count;
+ int i, count, retries;
_shutdown = true;
debug_locks_stop_watchdog();
memlogf( "INFO: Cleanup...\n" );
- for (int i = 0; i < socket_count; ++i) {
- if ( sockets[i] == -1 ) continue;
- close( sockets[i] );
- sockets[i] = -1;
- }
- socket_count = 0;
+ if ( listeners != NULL ) sock_destroyPollList( listeners );
+ listeners = NULL;
// Kill connection to all clients
spin_lock( &_clients_lock );
@@ -131,6 +126,9 @@ void dnbd3_cleanup()
}
spin_unlock( &_clients_lock );
+ // Disable threadpool
+ threadpool_close();
+
// Terminate the altserver checking thread
altservers_shutdown();
@@ -141,7 +139,7 @@ void dnbd3_cleanup()
integrity_shutdown();
// Wait for clients to disconnect
- int retries = 10;
+ retries = 10;
do {
count = 0;
spin_lock( &_clients_lock );
@@ -158,13 +156,11 @@ void dnbd3_cleanup()
_num_clients = 0;
// Clean up images
- spin_lock( &_images_lock );
- for (i = 0; i < _num_images; ++i) {
- if ( _images[i] == NULL ) continue;
- _images[i] = image_free( _images[i] );
+ retries = 5;
+ while ( !image_tryFreeAll() && --retries > 0 ) {
+ printf( "Waiting for images to free...\n" );
+ sleep( 1 );
}
- _num_images = 0;
- spin_unlock( &_images_lock );
exit( EXIT_SUCCESS );
}
@@ -232,10 +228,10 @@ int main(int argc, char *argv[])
return EXIT_SUCCESS;
case 'h':
case '?':
- dnbd3_print_help( argv[0] );
+ dnbd3_printHelp( argv[0] );
break;
case 'v':
- dnbd3_print_version();
+ dnbd3_printVersion();
break;
case 'b':
bindAddress = strdup( optarg );
@@ -294,11 +290,11 @@ int main(int argc, char *argv[])
#endif
// setup signal handler
- signal( SIGTERM, dnbd3_handle_signal );
- signal( SIGINT, dnbd3_handle_signal );
- signal( SIGUSR1, dnbd3_handle_signal );
- signal( SIGHUP, dnbd3_handle_signal );
- signal( SIGUSR2, dnbd3_handle_signal );
+ signal( SIGTERM, dnbd3_handleSignal );
+ signal( SIGINT, dnbd3_handleSignal );
+ signal( SIGUSR1, dnbd3_handleSignal );
+ signal( SIGHUP, dnbd3_handleSignal );
+ signal( SIGUSR2, dnbd3_handleSignal );
printf( "Loading images....\n" );
// Load all images in base path
@@ -307,19 +303,21 @@ int main(int argc, char *argv[])
return EXIT_FAILURE;
}
- _startupTime = time( NULL );
+ startupTime = time( NULL );
// Give other threads some time to start up before accepting connections
- sleep( 2 );
+ sleep( 1 );
// setup network
- sockets[socket_count] = sock_listen_any( PF_INET, PORT, bindAddress );
- if ( sockets[socket_count] != -1 ) ++socket_count;
-#ifdef WITH_IPV6
- sockets[socket_count] = sock_listen_any(PF_INET6, PORT, NULL);
- if (sockets[socket_count] != -1) ++socket_count;
-#endif
- if ( socket_count == 0 ) exit( EXIT_FAILURE );
+ listeners = sock_newPollList();
+ if ( listeners == NULL ) {
+ printf( "Didnt get a poll list!\n" );
+ exit( EXIT_FAILURE );
+ }
+ if ( !sock_listen( listeners, bindAddress, PORT ) ) {
+ printf( "Could not listen on any local interface.\n" );
+ exit( EXIT_FAILURE );
+ }
struct sockaddr_storage client;
socklen_t len;
int fd;
@@ -328,7 +326,7 @@ int main(int argc, char *argv[])
//pthread_t thread_rpc;
//thread_create(&(thread_rpc), NULL, &dnbd3_rpc_mainloop, NULL);
// Initialize thread pool
- if ( !threadpool_init( 10 ) ) {
+ if ( !threadpool_init( 8 ) ) {
printf( "Could not init thread pool!\n" );
exit( EXIT_FAILURE );
}
@@ -353,7 +351,7 @@ int main(int argc, char *argv[])
}
//
len = sizeof(client);
- fd = accept_any( sockets, socket_count, &client, &len );
+ fd = sock_accept( listeners, &client, &len );
if ( fd < 0 ) {
const int err = errno;
if ( err == EINTR || err == EAGAIN ) continue;
@@ -363,24 +361,25 @@ int main(int argc, char *argv[])
}
//memlogf("INFO: Client connected\n");
- sock_set_timeout( fd, _clientTimeout );
+ sock_setTimeout( fd, _clientTimeout );
- dnbd3_client_t *dnbd3_client = dnbd3_init_client( &client, fd );
+ dnbd3_client_t *dnbd3_client = dnbd3_initClient( &client, fd );
if ( dnbd3_client == NULL ) {
close( fd );
continue;
}
- // This has to be done before creating the thread, otherwise a race condition might occur when the new thread dies faster than this thread adds the client to the list after creating the thread
- if ( !dnbd3_add_client( dnbd3_client ) ) {
- dnbd3_client = dnbd3_free_client( dnbd3_client );
+ // This has to be done before creating the thread, otherwise a race condition might occur when the new thread
+ // dies faster than this thread adds the client to the list after creating the thread
+ if ( !dnbd3_addClient( dnbd3_client ) ) {
+ dnbd3_client = dnbd3_freeClient( dnbd3_client );
continue;
}
if ( !threadpool_run( net_client_handler, (void *)dnbd3_client ) ) {
memlogf( "[ERROR] Could not start thread for new client." );
- dnbd3_remove_client( dnbd3_client );
- dnbd3_client = dnbd3_free_client( dnbd3_client );
+ dnbd3_removeClient( dnbd3_client );
+ dnbd3_client = dnbd3_freeClient( dnbd3_client );
continue;
}
}
@@ -391,7 +390,7 @@ int main(int argc, char *argv[])
* Initialize and populate the client struct - called when an incoming
* connection is accepted
*/
-dnbd3_client_t* dnbd3_init_client(struct sockaddr_storage *client, int fd)
+dnbd3_client_t* dnbd3_initClient(struct sockaddr_storage *client, int fd)
{
dnbd3_client_t *dnbd3_client = calloc( 1, sizeof(dnbd3_client_t) );
if ( dnbd3_client == NULL ) { // This will never happen thanks to memory overcommit
@@ -424,7 +423,7 @@ dnbd3_client_t* dnbd3_init_client(struct sockaddr_storage *client, int fd)
* Remove a client from the clients array
* Locks on: _clients_lock
*/
-void dnbd3_remove_client(dnbd3_client_t *client)
+void dnbd3_removeClient(dnbd3_client_t *client)
{
int i;
spin_lock( &_clients_lock );
@@ -443,7 +442,7 @@ void dnbd3_remove_client(dnbd3_client_t *client)
* Locks on: _clients[].lock, _images[].lock
* might call functions that lock on _images, _image[], uplink.queueLock, client.sendMutex
*/
-dnbd3_client_t* dnbd3_free_client(dnbd3_client_t *client)
+dnbd3_client_t* dnbd3_freeClient(dnbd3_client_t *client)
{
spin_lock( &client->lock );
pthread_mutex_lock( &client->sendMutex );
@@ -470,7 +469,7 @@ dnbd3_client_t* dnbd3_free_client(dnbd3_client_t *client)
* Add client to the clients array.
* Locks on: _clients_lock
*/
-static bool dnbd3_add_client(dnbd3_client_t *client)
+static bool dnbd3_addClient(dnbd3_client_t *client)
{
int i;
spin_lock( &_clients_lock );
@@ -490,7 +489,7 @@ static bool dnbd3_add_client(dnbd3_client_t *client)
return true;
}
-static void dnbd3_handle_signal(int signum)
+static void dnbd3_handleSignal(int signum)
{
if ( signum == SIGINT || signum == SIGTERM ) {
_shutdown = true;
@@ -503,7 +502,7 @@ static void dnbd3_handle_signal(int signum)
int dnbd3_serverUptime()
{
- return (int)(time( NULL ) - _startupTime);
+ return (int)(time( NULL ) - startupTime);
}
static void dnbd3_printClients()
diff --git a/src/server/server.h b/src/server/server.h
index f45ae98..2b10b75 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -39,9 +39,9 @@ extern int _fake_delay;
#endif
void dnbd3_cleanup();
-void dnbd3_remove_client(dnbd3_client_t *client);
-dnbd3_client_t* dnbd3_free_client(dnbd3_client_t *client);
-dnbd3_client_t* dnbd3_init_client(struct sockaddr_storage *client, int fd);
+void dnbd3_removeClient(dnbd3_client_t *client);
+dnbd3_client_t* dnbd3_freeClient(dnbd3_client_t *client);
+dnbd3_client_t* dnbd3_initClient(struct sockaddr_storage *client, int fd);
int dnbd3_serverUptime();
#if !defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64
diff --git a/src/server/signal.c b/src/server/signal.c
index f0988b9..3b7d50b 100644
--- a/src/server/signal.c
+++ b/src/server/signal.c
@@ -12,6 +12,7 @@ int signal_new()
int signal_call(int signalFd)
{
+ if ( signalFd < 0 ) return 0;
static uint64_t one = 1;
return write( signalFd, &one, sizeof one ) == sizeof one;
}
diff --git a/src/server/sockhelper.c b/src/server/sockhelper.c
index 2fc0620..4cd0b56 100644
--- a/src/server/sockhelper.c
+++ b/src/server/sockhelper.c
@@ -1,78 +1,71 @@
#include "sockhelper.h"
#include "memlog.h"
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h> // inet_ntop
+#include <netdb.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
+#include <poll.h>
+#include <stdlib.h>
-static inline int connect_shared(const int client_sock, void* addr, const int addrlen, int connect_ms, int rw_ms)
-{
- struct timeval tv;
- // Connect to server
- tv.tv_sec = connect_ms / 1000;
- tv.tv_usec = connect_ms * 1000;
- setsockopt( client_sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv) );
- setsockopt( client_sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv) );
- if ( connect( client_sock, (struct sockaddr *)addr, addrlen ) == -1 ) {
- close( client_sock );
- return -1;
- }
- // Apply read/write timeout
- tv.tv_sec = rw_ms / 1000;
- tv.tv_usec = rw_ms * 1000;
- setsockopt( client_sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv) );
- setsockopt( client_sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv) );
- return client_sock;
-}
+#define MAXLISTEN 20
-int sock_connect4(struct sockaddr_in *addr, const int connect_ms, const int rw_ms)
-{
- int client_sock = socket( PF_INET, SOCK_STREAM, IPPROTO_TCP );
- if ( client_sock == -1 ) return -1;
- return connect_shared( client_sock, addr, sizeof(struct sockaddr_in), connect_ms, rw_ms );
-}
-
-int sock_connect6(struct sockaddr_in6 *addr, const int connect_ms, const int rw_ms)
-{
-#ifdef WITH_IPV6
- int client_sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP);
- if (client_sock == -1) return -1;
- return connect_shared(client_sock, addr, sizeof(struct sockaddr_in6), connect_ms, rw_ms);
-#else
- printf( "[DEBUG] Not compiled with IPv6 support.\n" );
- return -1;
-#endif
-}
+struct _poll_list {
+ int count;
+ struct pollfd entry[MAXLISTEN];
+};
int sock_connect(const dnbd3_host_t * const addr, const int connect_ms, const int rw_ms)
{
+ // TODO: Move out of here, this unit should contain general socket functions
+ // TODO: Rework the dnbd3_host_t to not use AF_* as these could theoretically change
+ // TODO: Abstract away from sockaddr_in* like the rest of the functions here do,
+ // so WITH_IPV6 can finally be removed as everything is transparent.
+ struct sockaddr_storage ss;
+ int proto, addrlen;
+ memset( &ss, 0, sizeof ss );
if ( addr->type == AF_INET ) {
// Set host (IPv4)
- struct sockaddr_in addr4;
- memset( &addr4, 0, sizeof(addr4) );
- addr4.sin_family = AF_INET;
- memcpy( &addr4.sin_addr, addr->addr, 4 );
- addr4.sin_port = addr->port;
- return sock_connect4( &addr4, connect_ms, rw_ms );
+ struct sockaddr_in *addr4 = (struct sockaddr_in*)&ss;
+ addr4->sin_family = AF_INET;
+ memcpy( &addr4->sin_addr, addr->addr, 4 );
+ addr4->sin_port = addr->port;
+ proto = PF_INET;
+ addrlen = sizeof *addr4;
}
#ifdef WITH_IPV6
- else if (addr->type == AF_INET6)
- {
+ else if ( addr->type == AF_INET6 ) {
// Set host (IPv6)
- struct sockaddr_in6 addr6;
- memset(&addr6, 0, sizeof(addr6));
- addr6.sin6_family = AF_INET6;
- memcpy(&addr6.sin6_addr, addr->addr, 16);
- addr6.sin6_port = addr->port;
- return sock_connect6(&addr6, connect_ms, rw_ms);
+ struct sockaddr_in6 *addr6 = (struct sockaddr_in6*)&ss;
+ addr6->sin6_family = AF_INET6;
+ memcpy( &addr6->sin6_addr, addr->addr, 16 );
+ addr6->sin6_port = addr->port;
+ proto = PF_INET6;
+ addrlen = sizeof *addr6;
}
#endif
- printf( "[DEBUG] Unsupported address type: %d\n", (int)addr->type );
- return -1;
+ else {
+ printf( "[DEBUG] Unsupported address type: %d\n", (int)addr->type );
+ return -1;
+ }
+ int client_sock = socket( proto, SOCK_STREAM, IPPROTO_TCP );
+ if ( client_sock == -1 ) return -1;
+ // Apply connect timeout
+ sock_setTimeout( client_sock, connect_ms );
+ if ( connect( client_sock, (struct sockaddr *)&ss, addrlen ) == -1 ) {
+ close( client_sock );
+ return -1;
+ }
+ // Apply read/write timeout
+ sock_setTimeout( client_sock, rw_ms );
+ return client_sock;
}
-void sock_set_timeout(const int sockfd, const int milliseconds)
+void sock_setTimeout(const int sockfd, const int milliseconds)
{
struct timeval tv;
tv.tv_sec = milliseconds / 1000;
@@ -81,94 +74,97 @@ void sock_set_timeout(const int sockfd, const int milliseconds)
setsockopt( sockfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv) );
}
-int sock_listen_any(int protocol_family, uint16_t port, char* bind_addr)
+poll_list_t* sock_newPollList()
{
- struct sockaddr_storage addr;
- struct in_addr local;
- if (bind_addr != NULL) {
- if (!inet_aton(bind_addr, &local)) return -1;
- }
- memset( &addr, 0, sizeof(addr) );
- if ( protocol_family == PF_INET ) {
- struct sockaddr_in *v4 = (struct sockaddr_in *)&addr;
- if (bind_addr == NULL) {
- v4->sin_addr.s_addr = INADDR_ANY;
- } else {
- v4->sin_addr = local;
- }
- v4->sin_port = htons( port );
- v4->sin_family = AF_INET;
- }
-#ifdef WITH_IPV6
- else if (protocol_family == PF_INET6)
- {
- struct sockaddr_in6 *v6 = (struct sockaddr_in6 *)&addr;
- v6->sin6_addr = in6addr_any;
- v6->sin6_port = htons(port);
- v6->sin6_family = AF_INET6;
- }
-#endif
- else {
- printf( "[DEBUG] sock_listen: Unsupported protocol: %d\n", protocol_family );
- return -1;
- }
- return sock_listen( &addr, sizeof(addr) );
+ poll_list_t *list = (poll_list_t*)malloc( sizeof( poll_list_t ) );
+ list->count = 0;
+ return list;
}
-int sock_listen(struct sockaddr_storage *addr, int addrlen)
+void sock_destroyPollList(poll_list_t *list)
{
- int pf; // On Linux AF_* == PF_*, but this is not guaranteed on all platforms, so let's be safe here:
- if ( addr->ss_family == AF_INET ) pf = PF_INET;
-#ifdef WITH_IPV6
- else if (addr->ss_family == AF_INET6)
- pf = PF_INET6;
-#endif
- else {
- printf( "[DEBUG] sock_listen: unsupported address type: %d\n", (int)addr->ss_family );
- return -1;
- }
- int sock;
-
- // Create socket
- sock = socket( pf, SOCK_STREAM, IPPROTO_TCP );
- if ( sock < 0 ) {
- memlogf( "[ERROR] sock_listen: Socket setup failure" ); // TODO: print port number to help troubleshooting
- return -1;
+ for ( int i = 0; i < list->count; ++i ) {
+ if ( list->entry[i].fd >= 0 ) close( list->entry[i].fd );
}
- const int on = 1;
- setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on) );
- if ( pf == PF_INET6 ) setsockopt( sock, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on) );
+ free( list );
+}
- // Bind to socket
- if ( bind( sock, (struct sockaddr *)addr, addrlen ) < 0 ) {
- int e = errno;
- close( sock );
- memlogf( "[ERROR] Bind failure (%d)", e ); // TODO: print port number to help troubleshooting
- return -1;
- }
+bool sock_printable(struct sockaddr *addr, socklen_t addrLen, char *output, int len)
+{
+ char host[100], port[10];
+ int ret = getnameinfo( addr, addrLen, host, 100, port, 10, NI_NUMERICHOST | NI_NUMERICSERV );
+ if ( ret == 0 ) snprintf( output, len, "[%s]:%s", host, port );
+ return ret == 0;
+}
- // Listen on socket
- if ( listen( sock, 20 ) == -1 ) {
- close( sock );
- memlogf( "[ERROR] Listen failure" ); // TODO ...
- return -1;
+bool sock_listen(poll_list_t* list, char* bind_addr, uint16_t port)
+{
+ if ( list->count >= MAXLISTEN ) return false;
+ struct addrinfo hints, *res, *ptr;
+ char portStr[6];
+ const int on = 1;
+ int openCount = 0;
+ // Set hints for local addresses.
+ memset( &hints, 0, sizeof(hints) );
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ snprintf( portStr, sizeof portStr, "%d", (int)port );
+ if ( getaddrinfo( bind_addr, portStr, &hints, &res ) != 0 || res == NULL ) return false;
+ // Attempt to bind to all of the addresses as long as there's room in the poll list
+ for( ptr = res; ptr != NULL; ptr = ptr->ai_next ) {
+ char bla[100];
+ if ( !sock_printable( (struct sockaddr*)ptr->ai_addr, ptr->ai_addrlen, bla, 100 ) ) snprintf( bla, 100, "[invalid]" );
+ printf( "Trying to bind to %s ", bla );
+ int sock = socket( ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol );
+ if ( sock < 0 ) {
+ printf( "...cannot socket(), errno=%d\n", errno );
+ continue;
+ }
+ setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on) );
+ if ( ptr->ai_family == PF_INET6 ) setsockopt( sock, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on) );
+ if ( bind( sock, ptr->ai_addr, ptr->ai_addrlen ) == -1 ) {
+ printf( "...cannot bind(), errno=%d\n", errno );
+ close( sock );
+ continue;
+ }
+ if ( listen( sock, 20 ) == -1 ) {
+ printf( "...cannot listen(), errno=%d\n", errno );
+ close( sock );
+ continue;
+ }
+ printf( "...OK!\n" );
+ list->entry[list->count].fd = sock;
+ list->entry[list->count].events = POLLIN | POLLRDHUP;
+ list->count++;
+ openCount++;
+ if ( list->count >= MAXLISTEN ) break;
}
+ freeaddrinfo( res );
+ return openCount > 0;
+}
- return sock;
+int sock_listenAny(poll_list_t* list, uint16_t port)
+{
+ return sock_listen( list, NULL, port );
}
-int accept_any(const int * const sockets, const int socket_count, struct sockaddr_storage *addr, socklen_t *length_ptr)
+int sock_accept(poll_list_t *list, struct sockaddr_storage *addr, socklen_t *length_ptr)
{
- fd_set set;
- FD_ZERO( &set );
- int max = 0;
- for (int i = 0; i < socket_count; ++i) {
- FD_SET( sockets[i], &set );
- if ( sockets[i] > max ) max = sockets[i];
+ int ret = poll( list->entry, list->count, -1 );
+ if ( ret < 0 ) {
+ printf( "poll errno=%d\n", errno );
+ return -1;
}
- if ( select( max + 1, &set, NULL, NULL, NULL ) <= 0 ) return -1;
- for (int i = 0; i < socket_count; ++i) {
- if ( FD_ISSET(sockets[i], &set)) return accept( sockets[i], (struct sockaddr *)addr, length_ptr );
+ for ( int i = list->count - 1; i >= 0; --i ) {
+ if ( list->entry[i].revents == 0 ) continue;
+ if ( list->entry[i].revents == POLLIN ) return accept( list->entry[i].fd, (struct sockaddr *)addr, length_ptr );
+ if ( list->entry[i].revents & ( POLLNVAL | POLLHUP | POLLERR | POLLRDHUP ) ) {
+ printf( "poll fd revents=%d for index=%d and fd=%d\n", (int)list->entry[i].revents, i, list->entry[i].fd );
+ if ( ( list->entry[i].revents & POLLNVAL ) == 0 ) close( list->entry[i].fd );
+ if ( i != list->count ) list->entry[i] = list->entry[list->count];
+ list->count--;
+ }
}
return -1;
}
@@ -187,17 +183,11 @@ void sock_set_block(int sock)
fcntl( sock, F_SETFL, flags & ~(int)O_NONBLOCK );
}
-bool sock_add_array(const int sock, int *array, int *array_fill, const int array_length)
+bool sock_append(poll_list_t *list, const int sock, bool wantRead, bool wantWrite)
{
- if ( sock == -1 ) return true;
- for (int i = 0; i < *array_fill; ++i) {
- if ( array[i] == -1 ) {
- array[i] = sock;
- return true;
- }
- }
- if ( *array_fill >= array_length ) return false;
- array[*array_fill] = sock;
- (*array_fill) += 1;
+ if ( sock == -1 || list->count >= MAXLISTEN ) return false;
+ list->entry[list->count++].fd = sock;
+ list->entry[list->count++].events = ( wantRead ? POLLIN : 0 ) | ( wantWrite ? POLLOUT : 0 ) | POLLRDHUP;
+ list->count++;
return true;
}
diff --git a/src/server/sockhelper.h b/src/server/sockhelper.h
index 394e5e4..b6b0b11 100644
--- a/src/server/sockhelper.h
+++ b/src/server/sockhelper.h
@@ -1,16 +1,17 @@
#ifndef SOCKHELPER_H_
#define SOCKHELPER_H_
+/*
+ * Helper functions for dealing with sockets. These functions should
+ * abstract from the IP version by using getaddrinfo() and thelike.
+ */
+
#include <stdint.h>
#include "../types.h"
#include <sys/socket.h>
-#include <arpa/inet.h>
-#include <netinet/in.h>
#include <string.h>
-int sock_connect4(struct sockaddr_in *addr, const int connect_ms, const int rw_ms);
-
-int sock_connect6(struct sockaddr_in6 *addr, const int connect_ms, const int rw_ms);
+typedef struct _poll_list poll_list_t;
/**
* Connect to given dnbd3_host_t.
@@ -21,7 +22,17 @@ int sock_connect6(struct sockaddr_in6 *addr, const int connect_ms, const int rw_
*/
int sock_connect(const dnbd3_host_t * const addr, const int connect_ms, const int rw_ms);
-void sock_set_timeout(const int sockfd, const int milliseconds);
+void sock_setTimeout(const int sockfd, const int milliseconds);
+
+/**
+ * Create new poll list.
+ */
+poll_list_t* sock_newPollList();
+
+/**
+ * Delete a poll list, closing all sockets first if necessary.
+ */
+void sock_destroyPollList(poll_list_t *list);
/**
* Listen on all interfaces/available IP addresses, using the given protocol.
@@ -30,14 +41,14 @@ void sock_set_timeout(const int sockfd, const int milliseconds);
* @param port port to listen on
* @return the socket descriptor if successful, -1 otherwise.
*/
-int sock_listen_any(int protocol_family, uint16_t port, char* bind_addr);
+int sock_listenAny(poll_list_t* list, uint16_t port);
/**
* Listen on a specific address and port.
* @param addr pointer to a properly filled sockaddr_in or sockaddr_in6
* @param addrlen length of the passed struct
*/
-int sock_listen(struct sockaddr_storage *addr, int addrlen);
+bool sock_listen(poll_list_t* list, char* bind_addr, uint16_t port);
/**
* This is a multi-socket version of accept. Pass in an array of listening sockets.
@@ -47,38 +58,22 @@ int sock_listen(struct sockaddr_storage *addr, int addrlen);
* @param socket_count number of sockets in that array
* @return fd of new client socket, -1 on error
*/
-int accept_any(const int * const sockets, const int socket_count, struct sockaddr_storage *addr, socklen_t *length_ptr);
+int sock_accept(poll_list_t *list, struct sockaddr_storage *addr, socklen_t *length_ptr);
void sock_set_nonblock(int sock);
void sock_set_block(int sock);
/**
- * Take IPv4 as string and a port and fill sockaddr_in struct.
- * This should be refactored to work for IPv4 and IPv6 and use sockaddr_storage.
- */
-inline void sock_set_addr4(char *ip, uint16_t port, struct sockaddr_in *addr)
-{
- memset( addr, 0, sizeof(*addr) );
- addr->sin_family = AF_INET; // IPv4
- addr->sin_addr.s_addr = inet_addr( ip );
- addr->sin_port = htons( port ); // set port number
-}
-
-/**
* Add given socket to array. Take an existing empty slot ( == -1) if available,
* append to end otherwise. Updates socket count variable passed by reference.
- * The passed socket fd is only added if it is != -1 for convenience, so you can
- * directly pass the return value of sock_listen or sock_create, without checking the
- * return value first.
+ *
+ * @param poll_list_t list the poll list to add the socket to
* @param sock socket fd to add
- * @param array the array of socket fds to add the socket to
- * @param array_fill pointer to int telling how many sockets there are in the array. Empty slots
- * in between are counted too. In other words: represents the index of the last valid socket fd in the
- * array plus one, or 0 if there are none.
- * @param array_length the capacity of the array
- * @return true on success or if the passed fd was -1, false iff the array is already full
+ * @param wantRead whether to set the EPOLLIN flag
+ * @param wantWrite whether to set the EPOLLOUT flag
+ * @return true on success, false iff the array is already full or socket is < 0
*/
-bool sock_add_array(const int sock, int *array, int *array_fill, const int array_length);
+bool sock_append(poll_list_t *list, const int sock, bool wantRead, bool wantWrite);
#endif /* SOCKHELPER_H_ */
diff --git a/src/server/threadpool.c b/src/server/threadpool.c
index 04a8e86..34e996c 100644
--- a/src/server/threadpool.c
+++ b/src/server/threadpool.c
@@ -25,13 +25,29 @@ static pthread_spinlock_t poolLock;
bool threadpool_init(int maxIdle)
{
if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false;
- maxIdleThreads = maxIdle;
spin_init( &poolLock, PTHREAD_PROCESS_PRIVATE );
+ maxIdleThreads = maxIdle;
pthread_attr_init( &threadAttrs );
pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED );
return true;
}
+void threadpool_close()
+{
+ _shutdown = true;
+ if ( maxIdleThreads < 0 ) return;
+ spin_lock( &poolLock );
+ maxIdleThreads = -1;
+ entry_t *ptr = pool;
+ while ( ptr != NULL ) {
+ entry_t *current = ptr;
+ ptr = ptr->next;
+ signal_call( current->signalFd );
+ }
+ spin_unlock( &poolLock );
+ spin_destroy( &poolLock );
+}
+
bool threadpool_run(void *(*startRoutine)(void *), void *arg)
{
spin_lock( &poolLock );
@@ -40,9 +56,14 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg)
spin_unlock( &poolLock );
if ( entry == NULL ) {
entry = (entry_t*)malloc( sizeof(entry_t) );
+ if ( entry == NULL ) {
+ printf( "[WARNING] Could not alloc entry_t for new thread\n" );
+ return false;
+ }
entry->signalFd = signal_new();
if ( entry->signalFd < 0 ) {
printf( "[WARNING] Could not create signalFd for new thread pool thread\n" );
+ free( entry );
return false;
}
if ( 0 != thread_create( &(entry->thread), &threadAttrs, threadpool_worker, (void*)entry ) ) {
@@ -67,9 +88,10 @@ static void *threadpool_worker(void *entryPtr)
{
blockNoncriticalSignals();
entry_t *entry = (entry_t*)entryPtr;
- while ( !_shutdown ) {
+ for ( ;; ) {
// Wait for signal from outside that we have work to do
int ret = signal_wait( entry->signalFd, -1 );
+ if ( _shutdown ) break;
if ( ret > 0 ) {
if ( entry->startRoutine == NULL ) {
printf( "[DEBUG] Worker woke up but has no work to do!\n" );
@@ -80,6 +102,7 @@ static void *threadpool_worker(void *entryPtr)
// Reset vars for safety
entry->startRoutine = NULL;
entry->arg = NULL;
+ if ( _shutdown ) break;
// Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise
int threadCount = 0;
spin_lock( &poolLock );
@@ -90,10 +113,7 @@ static void *threadpool_worker(void *entryPtr)
}
if ( threadCount >= maxIdleThreads ) {
spin_unlock( &poolLock );
- signal_close( entry->signalFd );
- free( entry );
- printf(" [DEBUG] Thread killed!\n" );
- return NULL;
+ break;
}
entry->next = pool;
pool = entry;
@@ -103,6 +123,9 @@ static void *threadpool_worker(void *entryPtr)
printf( "[DEBUG] Unexpected return value %d for signal_wait in threadpool worker!\n", ret );
}
}
+ signal_close( entry->signalFd );
+ free( entry );
+ printf(" [DEBUG] Thread killed!\n" );
return NULL;
}
diff --git a/src/server/threadpool.h b/src/server/threadpool.h
index b3b0fe6..15dd151 100644
--- a/src/server/threadpool.h
+++ b/src/server/threadpool.h
@@ -12,6 +12,12 @@
bool threadpool_init(int maxIdleThreadCount);
/**
+ * Shut down threadpool.
+ * Only call if it has been initialized before.
+ */
+void threadpool_close();
+
+/**
* Run a thread using the thread pool.
* @param startRoutine function to run in new thread
* @param arg argument to pass to thead
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 5819ed1..82c1f5a 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -43,8 +43,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host)
assert( image != NULL );
spin_lock( &image->lock );
if ( image->uplink != NULL ) {
- spin_unlock( &image->lock );
- return true;
+ goto failure;
}
if ( image->cache_map == NULL ) {
memlogf( "[WARNING] Uplink was requested for image %s, but it is already complete", image->lower_name );
@@ -73,7 +72,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host)
}
spin_unlock( &image->lock );
return true;
- failure: ;
+failure: ;
if ( link != NULL ) free( link );
link = image->uplink = NULL;
spin_unlock( &image->lock );
@@ -103,7 +102,7 @@ void uplink_shutdown(dnbd3_image_t *image)
}
image->uplink = NULL;
uplink->shutdown = true;
- if ( uplink->signal != -1 ) signal_call( uplink->signal );
+ signal_call( uplink->signal );
pthread_t thread = uplink->thread;
spin_unlock( &uplink->queueLock );
spin_unlock( &image->lock );
@@ -251,15 +250,15 @@ static void* uplink_mainloop(void *data)
link->betterFd = -1;
link->currentServer = link->betterServer;
link->replicationHandle = 0;
- // Re-send all pending requests
- uplink_sendRequests( link, false );
- uplink_sendReplicationRequest( link );
link->image->working = true;
buffer[0] = '@';
if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) {
printf( "[DEBUG] Now connected to %s\n", buffer + 1 );
setThreadName( buffer );
}
+ // Re-send all pending requests
+ uplink_sendRequests( link, false );
+ uplink_sendReplicationRequest( link );
memset( &ev, 0, sizeof(ev) );
ev.events = EPOLLIN | EPOLLRDHUP | EPOLLPRI;
ev.data.fd = link->fd;
@@ -469,7 +468,7 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
spin_unlock( &image->lock );
// Unlocked - do not break or continue here...
// Needs to be 8 (bit->byte, bitmap)
- const uint64_t offset = link->replicationHandle = (uint64_t)i * DNBD3_BLOCK_SIZE * (uint64_t)requestBlockSize;
+ const uint64_t offset = link->replicationHandle = (uint64_t)i * (uint64_t)requestBlockSize;
const uint32_t size = MIN( image->filesize - offset, requestBlockSize );
if ( !dnbd3_get_block( link->fd, offset, size, link->replicationHandle ) ) {
printf( "[DEBUG] Error sending background replication request to uplink server!\n" );