diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/server/altservers.c | 31 | ||||
-rw-r--r-- | src/server/globals.c | 6 | ||||
-rw-r--r-- | src/server/globals.h | 11 | ||||
-rw-r--r-- | src/server/image.c | 4 | ||||
-rw-r--r-- | src/server/locks.c | 59 | ||||
-rw-r--r-- | src/server/locks.h | 3 | ||||
-rw-r--r-- | src/server/net.c | 11 | ||||
-rw-r--r-- | src/server/server.c | 6 | ||||
-rw-r--r-- | src/server/uplink.c | 47 |
9 files changed, 157 insertions, 21 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c index 49da7d4..641e36b 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -23,6 +23,7 @@ static int signalPipe = -1; static dnbd3_alt_server_t _alt_servers[SERVER_MAX_ALTS]; static int _num_alts = 0; static pthread_spinlock_t _alts_lock; +static int initDone = FALSE; static pthread_t altThread; @@ -32,10 +33,19 @@ static unsigned int altservers_update_rtt(const dnbd3_host_t * const host, const void altserver_init() { spin_init( &_alts_lock, PTHREAD_PROCESS_PRIVATE ); + memset( _alt_servers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) ); if ( 0 != pthread_create( &altThread, NULL, &altserver_main, (void *)NULL ) ) { memlogf( "[ERROR] Could not start altservers connector thread" ); exit( EXIT_FAILURE ); } + initDone = TRUE; +} + +void altservers_shutdown() +{ + if ( !initDone ) return; + spin_destroy( &_alts_lock ); + pthread_join( altThread, NULL ); } int altservers_load() @@ -78,7 +88,14 @@ int altservers_add(dnbd3_host_t *host, const char *comment) freeSlot = i; } } - if ( freeSlot == -1 ) freeSlot = _num_alts++; + if ( freeSlot == -1 ) { + if ( _num_alts >= SERVER_MAX_ALTS ) { + memlogf( "[WARNING] Cannot add another alt server, maximum of %d already reached.", (int)SERVER_MAX_ALTS ); + spin_unlock( &_alts_lock ); + return FALSE; + } + freeSlot = _num_alts++; + } _alt_servers[freeSlot].host = *host; if ( comment != NULL ) snprintf( _alt_servers[freeSlot].comment, COMMENT_LENGTH, "%s", comment ); spin_unlock( &_alts_lock ); @@ -106,6 +123,18 @@ void altserver_find_uplink(dnbd3_connection_t *uplink) } /** + * The given uplink is about to disappear, so remove it from any queues + */ +void altservers_remove_uplink(dnbd3_connection_t *uplink) +{ + spin_lock( &pendingLock ); + for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { + if ( pending[i] == uplink ) pending[i] = NULL; + } + spin_unlock( &pendingLock ); +} + +/** * Get <size> known (working) alt servers, ordered by network closeness * (by finding the smallest possible subnet) */ diff --git a/src/server/globals.c b/src/server/globals.c index 182b2cc..1e23fb3 100644 --- a/src/server/globals.c +++ b/src/server/globals.c @@ -10,6 +10,8 @@ char *_configDir = NULL; char *_basePath = NULL; int _vmdkLegacyMode = FALSE; int _shutdown = 0; +int _serverPenalty = 0; +int _clientPenalty = 0; #define SAVE_TO_VAR_STR(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) { if (_ ## kk != NULL) free(_ ## kk); _ ## kk = strdup(value); } } while (0) #define SAVE_TO_VAR_BOOL(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) _ ## kk = atoi(value) != 0 || strcmp(value, "true") == 0 || strcmp(value, "True") == 0 || strcmp(value, "TRUE") == 0; } while (0) @@ -19,6 +21,8 @@ static int ini_handler(void *custom, const char* section, const char* key, const { SAVE_TO_VAR_STR( dnbd3, basePath ); SAVE_TO_VAR_BOOL( dnbd3, vmdkLegacyMode ); + SAVE_TO_VAR_INT( dnbd3, serverPenalty ); + SAVE_TO_VAR_INT( dnbd3, clientPenalty ); return TRUE; } @@ -40,4 +44,6 @@ void globals_loadConfig() char *end = _basePath + strlen( _basePath ) - 1; while ( end >= _basePath && *end == '/' ) *end-- = '\0'; + if ( _serverPenalty < 0 ) _serverPenalty = 0; + if ( _clientPenalty < 0 ) _clientPenalty = 0; } diff --git a/src/server/globals.h b/src/server/globals.h index b063fc3..b77c547 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -133,6 +133,17 @@ extern char *_basePath; */ extern int _vmdkLegacyMode; +/** + * How much artificial delay should we add when a server connects to us? + */ +extern int _serverPenalty; + +/** + * How much artificial delay should we add when a client connects to us? + */ +extern int _clientPenalty; + + extern int _shutdown; void globals_loadConfig(); diff --git a/src/server/image.c b/src/server/image.c index d4ed5cf..679dd40 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -39,7 +39,7 @@ int image_is_complete(dnbd3_image_t *image) if ( image->working && image->cache_map == NULL ) { return TRUE; } - if ( image->filesize == 0 || !image->working ) { + if ( image->filesize == 0 ) { return FALSE; } int complete = TRUE, j; @@ -175,7 +175,7 @@ dnbd3_image_t* image_get(char *name, uint16_t revision) // Found, see if it works struct stat st; - if ( stat( candidate->path, &st ) < 0 ) { + if ( candidate->working && stat( candidate->path, &st ) < 0 ) { printf( "[DEBUG] File '%s' has gone away...\n", candidate->path ); candidate->working = FALSE; // No file? OUT! } diff --git a/src/server/locks.c b/src/server/locks.c index 4293c2f..ce29b9a 100644 --- a/src/server/locks.c +++ b/src/server/locks.c @@ -106,7 +106,7 @@ int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlo break; } pthread_spin_unlock( &initdestory ); - int retval = pthread_spin_lock( lock ); + const int retval = pthread_spin_lock( lock ); pthread_spin_lock( &initdestory ); t->tid = 0; t->time = 0; @@ -125,6 +125,55 @@ int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlo return retval; } +int debug_spin_trylock(const char *name, const char *file, int line, pthread_spinlock_t *lock) +{ + debug_lock_t *l = NULL; + pthread_spin_lock( &initdestory ); + for (int i = 0; i < MAXLOCKS; ++i) { + if ( locks[i].lock == lock ) { + l = &locks[i]; + break; + } + } + pthread_spin_unlock( &initdestory ); + if ( l == NULL ) { + printf( "[ERROR] Tried to lock uninitialized lock %p (%s) at %s:%d\n", lock, name, file, line ); + debug_dump_lock_stats(); + exit( 4 ); + } + debug_thread_t *t = NULL; + pthread_spin_lock( &initdestory ); + for (int i = 0; i < MAXTHREADS; ++i) { + if ( threads[i].tid != 0 ) continue; + threads[i].tid = pthread_self(); + threads[i].time = time( NULL ); + snprintf( threads[i].name, LOCKLEN, "%s", name ); + snprintf( threads[i].where, LOCKLEN, "%s:%d", file, line ); + t = &threads[i]; + break; + } + pthread_spin_unlock( &initdestory ); + const int retval = pthread_spin_trylock( lock ); + pthread_spin_lock( &initdestory ); + t->tid = 0; + t->time = 0; + pthread_spin_unlock( &initdestory ); + if ( retval == 0 ) { + if ( l->locked ) { + printf( "[ERROR] Lock sanity check: lock %p (%s) already locked at %s:%d\n", lock, name, file, line ); + exit( 4 ); + } + l->locked = 1; + l->locktime = time( NULL ); + l->thread = pthread_self(); + snprintf( l->where, LOCKLEN, "L %s:%d", file, line ); + pthread_spin_lock( &initdestory ); + l->lockId = ++lockId; + pthread_spin_unlock( &initdestory ); + } + return retval; +} + int debug_spin_unlock(const char *name, const char *file, int line, pthread_spinlock_t *lock) { debug_lock_t *l = NULL; @@ -140,7 +189,6 @@ int debug_spin_unlock(const char *name, const char *file, int line, pthread_spin printf( "[ERROR] Tried to unlock uninitialized lock %p (%s) at %s:%d\n", lock, name, file, line ); exit( 4 ); } - int retval = pthread_spin_unlock( lock ); if ( !l->locked ) { printf( "[ERROR] Unlock sanity check: lock %p (%s) not locked at %s:%d\n", lock, name, file, line ); exit( 4 ); @@ -149,6 +197,7 @@ int debug_spin_unlock(const char *name, const char *file, int line, pthread_spin l->locktime = 0; l->thread = 0; snprintf( l->where, LOCKLEN, "U %s:%d", file, line ); + int retval = pthread_spin_unlock( lock ); return retval; } @@ -192,7 +241,7 @@ void debug_dump_lock_stats() "* Locked: %d\n", locks[i].name, locks[i].where, (int)locks[i].locked ); } } - printf( "\n **** THREADS ****\n\n" ); + printf( "\n **** WAITING THREADS ****\n\n" ); for (int i = 0; i < MAXTHREADS; ++i) { if ( threads[i].tid == 0 ) continue; printf( "* *** Thread %d ***\n" @@ -206,7 +255,7 @@ void debug_dump_lock_stats() static void *debug_thread_watchdog(void *something) { while ( !_shutdown ) { - if (init_done) { + if ( init_done ) { time_t now = time( NULL ); pthread_spin_lock( &initdestory ); for (int i = 0; i < MAXTHREADS; ++i) { @@ -242,7 +291,7 @@ void debug_locks_stop_watchdog() { #ifdef _DEBUG _shutdown = TRUE; - printf("Killing debug watchdog...\n"); + printf( "Killing debug watchdog...\n" ); pthread_spin_lock( &initdestory ); pthread_spin_unlock( &initdestory ); pthread_join( watchdog, NULL ); diff --git a/src/server/locks.h b/src/server/locks.h index 5d4367b..43a3943 100644 --- a/src/server/locks.h +++ b/src/server/locks.h @@ -7,11 +7,13 @@ #define spin_init( lock, type ) debug_spin_init( #lock, __FILE__, __LINE__, lock, type) #define spin_lock( lock ) debug_spin_lock( #lock, __FILE__, __LINE__, lock) +#define spin_trylock( lock ) debug_spin_trylock( #lock, __FILE__, __LINE__, lock) #define spin_unlock( lock ) debug_spin_unlock( #lock, __FILE__, __LINE__, lock) #define spin_destroy( lock ) debug_spin_destroy( #lock, __FILE__, __LINE__, lock) int debug_spin_init(const char *name, const char *file, int line, pthread_spinlock_t *lock, int shared); int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlock_t *lock); +int debug_spin_trylock(const char *name, const char *file, int line, pthread_spinlock_t *lock); int debug_spin_unlock(const char *name, const char *file, int line, pthread_spinlock_t *lock); int debug_spin_destroy(const char *name, const char *file, int line, pthread_spinlock_t *lock); @@ -21,6 +23,7 @@ void debug_dump_lock_stats(); #define spin_init( lock, type ) pthread_spin_init(lock, type) #define spin_lock( lock ) pthread_spin_lock(lock) +#define spin_trylock( lock ) pthread_spin_trylock(lock) #define spin_unlock( lock ) pthread_spin_unlock(lock) #define spin_destroy( lock ) pthread_spin_destroy(lock) diff --git a/src/server/net.c b/src/server/net.c index a7e110b..b9fee3d 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -101,7 +101,7 @@ static inline char send_reply(int sock, dnbd3_reply_t *reply, void *payload) iov[0].iov_base = reply; iov[0].iov_len = sizeof(dnbd3_reply_t); iov[1].iov_base = payload; - iov[1].iov_len = size; + iov[1].iov_len = (size_t)size; if ( writev( sock, iov, 2 ) != sizeof(dnbd3_reply_t) + size ) { printf( "[DEBUG] Send failed (reply with payload of %u bytes)\n", size ); return FALSE; @@ -178,8 +178,14 @@ void *net_client_handler(void *dnbd3_client) } } - // client handling mainloop if ( bOk ) { + // add artificial delay if applicable + if ( client->is_server && _serverPenalty != 0 ) { + usleep( _serverPenalty ); + } else if ( !client->is_server && _clientPenalty != 0 ) { + usleep( _clientPenalty ); + } + // client handling mainloop while ( recv_request_header( client->sock, &request ) ) { switch ( request.cmd ) { @@ -238,6 +244,7 @@ void *net_client_handler(void *dnbd3_client) isCached = FALSE; break; } + ++pos; } } // Last byte diff --git a/src/server/server.c b/src/server/server.c index 12b437b..4bfa2d5 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -381,12 +381,6 @@ void dnbd3_remove_client(dnbd3_client_t *client) dnbd3_client_t* dnbd3_free_client(dnbd3_client_t *client) { spin_lock( &client->lock ); - /* - for (it = client->sendqueue; it; it = it->next) { - free( it->data ); - } - g_slist_free( client->sendqueue ); - */ if ( client->sock >= 0 ) close( client->sock ); client->sock = -1; if ( client->image != NULL ) image_release( client->image ); diff --git a/src/server/uplink.c b/src/server/uplink.c index 3541728..5eda88f 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -13,6 +13,7 @@ #include <unistd.h> #include <stdlib.h> #include <stdio.h> +#include <inttypes.h> static void* uplink_mainloop(void *data); static void uplink_send_requests(dnbd3_connection_t *link, int newOnly); @@ -63,11 +64,14 @@ void uplink_shutdown(dnbd3_image_t *image) assert( image != NULL ); if ( image->uplink == NULL || image->uplink->shutdown ) return; dnbd3_connection_t * const uplink = image->uplink; + spin_lock( &uplink->queueLock ); image->uplink = NULL; uplink->shutdown = TRUE; if ( uplink->signal != -1 ) write( uplink->signal, "", 1 ); - pthread_join( uplink->thread, NULL ); - spin_lock( &uplink->queueLock ); + if ( uplink->image != NULL ) { + pthread_join( uplink->thread, NULL ); + } + free( uplink->recvBuffer ); spin_unlock( &uplink->queueLock ); spin_destroy( &uplink->queueLock ); free( uplink ); @@ -78,7 +82,12 @@ void uplink_shutdown(dnbd3_image_t *image) */ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length) { - if ( client == NULL || client->image == NULL || client->image->uplink == NULL ) return FALSE; + if ( client == NULL || client->image == NULL ) return FALSE; + spin_lock( &client->image->lock ); + if ( client->image->uplink == NULL ) { + spin_unlock( &client->image->lock ); + return FALSE; + } dnbd3_connection_t * const uplink = client->image->uplink; int foundExisting = FALSE; // Is there a pending request that is a superset of our range? int i; @@ -86,6 +95,7 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint const uint64_t end = start + length; spin_lock( &uplink->queueLock ); + spin_unlock( &client->image->lock ); for (i = 0; i < uplink->queueLen; ++i) { if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) freeSlot = i; if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue; @@ -127,6 +137,7 @@ static void* uplink_mainloop(void *data) int fdEpoll = -1, fdPipe = -1; int numSocks, i, waitTime; int altCheckInterval = SERVER_RTT_DELAY_INIT; + int bFree = FALSE; time_t nextAltCheck = 0; char buffer[100]; // @@ -226,6 +237,7 @@ static void* uplink_mainloop(void *data) uplink_send_requests( link, FALSE ); link->betterFd = -1; link->currentServer = link->betterServer; + link->image->working = TRUE; memset( &ev, 0, sizeof(ev) ); ev.events = EPOLLIN; ev.data.fd = link->fd; @@ -241,11 +253,35 @@ static void* uplink_mainloop(void *data) if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) { const time_t now = time( NULL ); if ( nextAltCheck - now > SERVER_RTT_DELAY_MAX ) { + // This probably means the system time was changed - handle this case properly by capping the timeout nextAltCheck = now + SERVER_RTT_DELAY_MAX; } else if ( now >= nextAltCheck ) { + // It seems it's time for a check + if ( image_is_complete( link->image ) ) { + // Quit work if image is complete + if ( spin_trylock( &link->image->lock ) == 0 ) { + if ( link->image->cache_map != NULL ) { + free( link->image->cache_map ); + link->image->cache_map = NULL; + } + link->image->uplink = NULL; + link->shutdown = TRUE; + free( link->recvBuffer ); + link->recvBuffer = NULL; + bFree = TRUE; + spin_lock( &link->queueLock ); + spin_unlock( &link->queueLock ); + spin_destroy( &link->queueLock ); + spin_unlock( &link->image->lock ); + pthread_detach( link->thread ); + goto cleanup; + } + } else { + // Not complete- do measurement + altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous) + } altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX); nextAltCheck = now + altCheckInterval; - altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous) } } } @@ -262,6 +298,7 @@ static void* uplink_mainloop(void *data) while ( link->rttTestResult == RTT_INPROGRESS ) usleep( 10000 ); if ( link->betterFd != -1 ) close( link->betterFd ); + if ( bFree ) free( link ); return NULL ; } @@ -275,7 +312,7 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly) for (j = 0; j < link->queueLen; ++j) { if ( link->queue[j].status != ULR_NEW && (newOnly || link->queue[j].status != ULR_PENDING) ) continue; link->queue[j].status = ULR_PENDING; - request.handle = link->queue[j].handle; + request.handle = link->queue[j].from; // HACK: Store offset in handle too, as it won't be included in the reply request.cmd = CMD_GET_BLOCK; request.offset = link->queue[j].from; request.size = link->queue[j].to - link->queue[j].from; |