summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2013-08-02 18:13:24 +0200
committerSimon Rettberg2013-08-02 18:13:24 +0200
commit762f7a4d7b3155254416b460c28a23c418ae59ed (patch)
treefb904393252a9f5688d2327c8c04965edf6c505b
parentfix0rs (diff)
downloaddnbd3-762f7a4d7b3155254416b460c28a23c418ae59ed.tar.gz
dnbd3-762f7a4d7b3155254416b460c28a23c418ae59ed.tar.xz
dnbd3-762f7a4d7b3155254416b460c28a23c418ae59ed.zip
[SERVER] several improvements
1) Close uplink if local copy is complete 2) Fix memleak when closing uplink (recv buffer was not cleared) 3) Add configurable artificial delays for client and server connections
-rw-r--r--LOCKS2
-rw-r--r--src/server/altservers.c31
-rw-r--r--src/server/globals.c6
-rw-r--r--src/server/globals.h11
-rw-r--r--src/server/image.c4
-rw-r--r--src/server/locks.c59
-rw-r--r--src/server/locks.h3
-rw-r--r--src/server/net.c11
-rw-r--r--src/server/server.c6
-rw-r--r--src/server/uplink.c47
10 files changed, 158 insertions, 22 deletions
diff --git a/LOCKS b/LOCKS
index c542904..1f7f22b 100644
--- a/LOCKS
+++ b/LOCKS
@@ -14,7 +14,7 @@ _clients_lock
_clients[].lock
_images_lock
_images[].lock
-uplink.lock
+uplink.queueLock
_alts_lock
If you need to lock multiple clients at once,
diff --git a/src/server/altservers.c b/src/server/altservers.c
index 49da7d4..641e36b 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -23,6 +23,7 @@ static int signalPipe = -1;
static dnbd3_alt_server_t _alt_servers[SERVER_MAX_ALTS];
static int _num_alts = 0;
static pthread_spinlock_t _alts_lock;
+static int initDone = FALSE;
static pthread_t altThread;
@@ -32,10 +33,19 @@ static unsigned int altservers_update_rtt(const dnbd3_host_t * const host, const
void altserver_init()
{
spin_init( &_alts_lock, PTHREAD_PROCESS_PRIVATE );
+ memset( _alt_servers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) );
if ( 0 != pthread_create( &altThread, NULL, &altserver_main, (void *)NULL ) ) {
memlogf( "[ERROR] Could not start altservers connector thread" );
exit( EXIT_FAILURE );
}
+ initDone = TRUE;
+}
+
+void altservers_shutdown()
+{
+ if ( !initDone ) return;
+ spin_destroy( &_alts_lock );
+ pthread_join( altThread, NULL );
}
int altservers_load()
@@ -78,7 +88,14 @@ int altservers_add(dnbd3_host_t *host, const char *comment)
freeSlot = i;
}
}
- if ( freeSlot == -1 ) freeSlot = _num_alts++;
+ if ( freeSlot == -1 ) {
+ if ( _num_alts >= SERVER_MAX_ALTS ) {
+ memlogf( "[WARNING] Cannot add another alt server, maximum of %d already reached.", (int)SERVER_MAX_ALTS );
+ spin_unlock( &_alts_lock );
+ return FALSE;
+ }
+ freeSlot = _num_alts++;
+ }
_alt_servers[freeSlot].host = *host;
if ( comment != NULL ) snprintf( _alt_servers[freeSlot].comment, COMMENT_LENGTH, "%s", comment );
spin_unlock( &_alts_lock );
@@ -106,6 +123,18 @@ void altserver_find_uplink(dnbd3_connection_t *uplink)
}
/**
+ * The given uplink is about to disappear, so remove it from any queues
+ */
+void altservers_remove_uplink(dnbd3_connection_t *uplink)
+{
+ spin_lock( &pendingLock );
+ for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
+ if ( pending[i] == uplink ) pending[i] = NULL;
+ }
+ spin_unlock( &pendingLock );
+}
+
+/**
* Get <size> known (working) alt servers, ordered by network closeness
* (by finding the smallest possible subnet)
*/
diff --git a/src/server/globals.c b/src/server/globals.c
index 182b2cc..1e23fb3 100644
--- a/src/server/globals.c
+++ b/src/server/globals.c
@@ -10,6 +10,8 @@ char *_configDir = NULL;
char *_basePath = NULL;
int _vmdkLegacyMode = FALSE;
int _shutdown = 0;
+int _serverPenalty = 0;
+int _clientPenalty = 0;
#define SAVE_TO_VAR_STR(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) { if (_ ## kk != NULL) free(_ ## kk); _ ## kk = strdup(value); } } while (0)
#define SAVE_TO_VAR_BOOL(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) _ ## kk = atoi(value) != 0 || strcmp(value, "true") == 0 || strcmp(value, "True") == 0 || strcmp(value, "TRUE") == 0; } while (0)
@@ -19,6 +21,8 @@ static int ini_handler(void *custom, const char* section, const char* key, const
{
SAVE_TO_VAR_STR( dnbd3, basePath );
SAVE_TO_VAR_BOOL( dnbd3, vmdkLegacyMode );
+ SAVE_TO_VAR_INT( dnbd3, serverPenalty );
+ SAVE_TO_VAR_INT( dnbd3, clientPenalty );
return TRUE;
}
@@ -40,4 +44,6 @@ void globals_loadConfig()
char *end = _basePath + strlen( _basePath ) - 1;
while ( end >= _basePath && *end == '/' )
*end-- = '\0';
+ if ( _serverPenalty < 0 ) _serverPenalty = 0;
+ if ( _clientPenalty < 0 ) _clientPenalty = 0;
}
diff --git a/src/server/globals.h b/src/server/globals.h
index b063fc3..b77c547 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -133,6 +133,17 @@ extern char *_basePath;
*/
extern int _vmdkLegacyMode;
+/**
+ * How much artificial delay should we add when a server connects to us?
+ */
+extern int _serverPenalty;
+
+/**
+ * How much artificial delay should we add when a client connects to us?
+ */
+extern int _clientPenalty;
+
+
extern int _shutdown;
void globals_loadConfig();
diff --git a/src/server/image.c b/src/server/image.c
index d4ed5cf..679dd40 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -39,7 +39,7 @@ int image_is_complete(dnbd3_image_t *image)
if ( image->working && image->cache_map == NULL ) {
return TRUE;
}
- if ( image->filesize == 0 || !image->working ) {
+ if ( image->filesize == 0 ) {
return FALSE;
}
int complete = TRUE, j;
@@ -175,7 +175,7 @@ dnbd3_image_t* image_get(char *name, uint16_t revision)
// Found, see if it works
struct stat st;
- if ( stat( candidate->path, &st ) < 0 ) {
+ if ( candidate->working && stat( candidate->path, &st ) < 0 ) {
printf( "[DEBUG] File '%s' has gone away...\n", candidate->path );
candidate->working = FALSE; // No file? OUT!
}
diff --git a/src/server/locks.c b/src/server/locks.c
index 4293c2f..ce29b9a 100644
--- a/src/server/locks.c
+++ b/src/server/locks.c
@@ -106,7 +106,7 @@ int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlo
break;
}
pthread_spin_unlock( &initdestory );
- int retval = pthread_spin_lock( lock );
+ const int retval = pthread_spin_lock( lock );
pthread_spin_lock( &initdestory );
t->tid = 0;
t->time = 0;
@@ -125,6 +125,55 @@ int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlo
return retval;
}
+int debug_spin_trylock(const char *name, const char *file, int line, pthread_spinlock_t *lock)
+{
+ debug_lock_t *l = NULL;
+ pthread_spin_lock( &initdestory );
+ for (int i = 0; i < MAXLOCKS; ++i) {
+ if ( locks[i].lock == lock ) {
+ l = &locks[i];
+ break;
+ }
+ }
+ pthread_spin_unlock( &initdestory );
+ if ( l == NULL ) {
+ printf( "[ERROR] Tried to lock uninitialized lock %p (%s) at %s:%d\n", lock, name, file, line );
+ debug_dump_lock_stats();
+ exit( 4 );
+ }
+ debug_thread_t *t = NULL;
+ pthread_spin_lock( &initdestory );
+ for (int i = 0; i < MAXTHREADS; ++i) {
+ if ( threads[i].tid != 0 ) continue;
+ threads[i].tid = pthread_self();
+ threads[i].time = time( NULL );
+ snprintf( threads[i].name, LOCKLEN, "%s", name );
+ snprintf( threads[i].where, LOCKLEN, "%s:%d", file, line );
+ t = &threads[i];
+ break;
+ }
+ pthread_spin_unlock( &initdestory );
+ const int retval = pthread_spin_trylock( lock );
+ pthread_spin_lock( &initdestory );
+ t->tid = 0;
+ t->time = 0;
+ pthread_spin_unlock( &initdestory );
+ if ( retval == 0 ) {
+ if ( l->locked ) {
+ printf( "[ERROR] Lock sanity check: lock %p (%s) already locked at %s:%d\n", lock, name, file, line );
+ exit( 4 );
+ }
+ l->locked = 1;
+ l->locktime = time( NULL );
+ l->thread = pthread_self();
+ snprintf( l->where, LOCKLEN, "L %s:%d", file, line );
+ pthread_spin_lock( &initdestory );
+ l->lockId = ++lockId;
+ pthread_spin_unlock( &initdestory );
+ }
+ return retval;
+}
+
int debug_spin_unlock(const char *name, const char *file, int line, pthread_spinlock_t *lock)
{
debug_lock_t *l = NULL;
@@ -140,7 +189,6 @@ int debug_spin_unlock(const char *name, const char *file, int line, pthread_spin
printf( "[ERROR] Tried to unlock uninitialized lock %p (%s) at %s:%d\n", lock, name, file, line );
exit( 4 );
}
- int retval = pthread_spin_unlock( lock );
if ( !l->locked ) {
printf( "[ERROR] Unlock sanity check: lock %p (%s) not locked at %s:%d\n", lock, name, file, line );
exit( 4 );
@@ -149,6 +197,7 @@ int debug_spin_unlock(const char *name, const char *file, int line, pthread_spin
l->locktime = 0;
l->thread = 0;
snprintf( l->where, LOCKLEN, "U %s:%d", file, line );
+ int retval = pthread_spin_unlock( lock );
return retval;
}
@@ -192,7 +241,7 @@ void debug_dump_lock_stats()
"* Locked: %d\n", locks[i].name, locks[i].where, (int)locks[i].locked );
}
}
- printf( "\n **** THREADS ****\n\n" );
+ printf( "\n **** WAITING THREADS ****\n\n" );
for (int i = 0; i < MAXTHREADS; ++i) {
if ( threads[i].tid == 0 ) continue;
printf( "* *** Thread %d ***\n"
@@ -206,7 +255,7 @@ void debug_dump_lock_stats()
static void *debug_thread_watchdog(void *something)
{
while ( !_shutdown ) {
- if (init_done) {
+ if ( init_done ) {
time_t now = time( NULL );
pthread_spin_lock( &initdestory );
for (int i = 0; i < MAXTHREADS; ++i) {
@@ -242,7 +291,7 @@ void debug_locks_stop_watchdog()
{
#ifdef _DEBUG
_shutdown = TRUE;
- printf("Killing debug watchdog...\n");
+ printf( "Killing debug watchdog...\n" );
pthread_spin_lock( &initdestory );
pthread_spin_unlock( &initdestory );
pthread_join( watchdog, NULL );
diff --git a/src/server/locks.h b/src/server/locks.h
index 5d4367b..43a3943 100644
--- a/src/server/locks.h
+++ b/src/server/locks.h
@@ -7,11 +7,13 @@
#define spin_init( lock, type ) debug_spin_init( #lock, __FILE__, __LINE__, lock, type)
#define spin_lock( lock ) debug_spin_lock( #lock, __FILE__, __LINE__, lock)
+#define spin_trylock( lock ) debug_spin_trylock( #lock, __FILE__, __LINE__, lock)
#define spin_unlock( lock ) debug_spin_unlock( #lock, __FILE__, __LINE__, lock)
#define spin_destroy( lock ) debug_spin_destroy( #lock, __FILE__, __LINE__, lock)
int debug_spin_init(const char *name, const char *file, int line, pthread_spinlock_t *lock, int shared);
int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlock_t *lock);
+int debug_spin_trylock(const char *name, const char *file, int line, pthread_spinlock_t *lock);
int debug_spin_unlock(const char *name, const char *file, int line, pthread_spinlock_t *lock);
int debug_spin_destroy(const char *name, const char *file, int line, pthread_spinlock_t *lock);
@@ -21,6 +23,7 @@ void debug_dump_lock_stats();
#define spin_init( lock, type ) pthread_spin_init(lock, type)
#define spin_lock( lock ) pthread_spin_lock(lock)
+#define spin_trylock( lock ) pthread_spin_trylock(lock)
#define spin_unlock( lock ) pthread_spin_unlock(lock)
#define spin_destroy( lock ) pthread_spin_destroy(lock)
diff --git a/src/server/net.c b/src/server/net.c
index a7e110b..b9fee3d 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -101,7 +101,7 @@ static inline char send_reply(int sock, dnbd3_reply_t *reply, void *payload)
iov[0].iov_base = reply;
iov[0].iov_len = sizeof(dnbd3_reply_t);
iov[1].iov_base = payload;
- iov[1].iov_len = size;
+ iov[1].iov_len = (size_t)size;
if ( writev( sock, iov, 2 ) != sizeof(dnbd3_reply_t) + size ) {
printf( "[DEBUG] Send failed (reply with payload of %u bytes)\n", size );
return FALSE;
@@ -178,8 +178,14 @@ void *net_client_handler(void *dnbd3_client)
}
}
- // client handling mainloop
if ( bOk ) {
+ // add artificial delay if applicable
+ if ( client->is_server && _serverPenalty != 0 ) {
+ usleep( _serverPenalty );
+ } else if ( !client->is_server && _clientPenalty != 0 ) {
+ usleep( _clientPenalty );
+ }
+ // client handling mainloop
while ( recv_request_header( client->sock, &request ) ) {
switch ( request.cmd ) {
@@ -238,6 +244,7 @@ void *net_client_handler(void *dnbd3_client)
isCached = FALSE;
break;
}
+ ++pos;
}
}
// Last byte
diff --git a/src/server/server.c b/src/server/server.c
index 12b437b..4bfa2d5 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -381,12 +381,6 @@ void dnbd3_remove_client(dnbd3_client_t *client)
dnbd3_client_t* dnbd3_free_client(dnbd3_client_t *client)
{
spin_lock( &client->lock );
- /*
- for (it = client->sendqueue; it; it = it->next) {
- free( it->data );
- }
- g_slist_free( client->sendqueue );
- */
if ( client->sock >= 0 ) close( client->sock );
client->sock = -1;
if ( client->image != NULL ) image_release( client->image );
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 3541728..5eda88f 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -13,6 +13,7 @@
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
+#include <inttypes.h>
static void* uplink_mainloop(void *data);
static void uplink_send_requests(dnbd3_connection_t *link, int newOnly);
@@ -63,11 +64,14 @@ void uplink_shutdown(dnbd3_image_t *image)
assert( image != NULL );
if ( image->uplink == NULL || image->uplink->shutdown ) return;
dnbd3_connection_t * const uplink = image->uplink;
+ spin_lock( &uplink->queueLock );
image->uplink = NULL;
uplink->shutdown = TRUE;
if ( uplink->signal != -1 ) write( uplink->signal, "", 1 );
- pthread_join( uplink->thread, NULL );
- spin_lock( &uplink->queueLock );
+ if ( uplink->image != NULL ) {
+ pthread_join( uplink->thread, NULL );
+ }
+ free( uplink->recvBuffer );
spin_unlock( &uplink->queueLock );
spin_destroy( &uplink->queueLock );
free( uplink );
@@ -78,7 +82,12 @@ void uplink_shutdown(dnbd3_image_t *image)
*/
int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length)
{
- if ( client == NULL || client->image == NULL || client->image->uplink == NULL ) return FALSE;
+ if ( client == NULL || client->image == NULL ) return FALSE;
+ spin_lock( &client->image->lock );
+ if ( client->image->uplink == NULL ) {
+ spin_unlock( &client->image->lock );
+ return FALSE;
+ }
dnbd3_connection_t * const uplink = client->image->uplink;
int foundExisting = FALSE; // Is there a pending request that is a superset of our range?
int i;
@@ -86,6 +95,7 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint
const uint64_t end = start + length;
spin_lock( &uplink->queueLock );
+ spin_unlock( &client->image->lock );
for (i = 0; i < uplink->queueLen; ++i) {
if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) freeSlot = i;
if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue;
@@ -127,6 +137,7 @@ static void* uplink_mainloop(void *data)
int fdEpoll = -1, fdPipe = -1;
int numSocks, i, waitTime;
int altCheckInterval = SERVER_RTT_DELAY_INIT;
+ int bFree = FALSE;
time_t nextAltCheck = 0;
char buffer[100];
//
@@ -226,6 +237,7 @@ static void* uplink_mainloop(void *data)
uplink_send_requests( link, FALSE );
link->betterFd = -1;
link->currentServer = link->betterServer;
+ link->image->working = TRUE;
memset( &ev, 0, sizeof(ev) );
ev.events = EPOLLIN;
ev.data.fd = link->fd;
@@ -241,11 +253,35 @@ static void* uplink_mainloop(void *data)
if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) {
const time_t now = time( NULL );
if ( nextAltCheck - now > SERVER_RTT_DELAY_MAX ) {
+ // This probably means the system time was changed - handle this case properly by capping the timeout
nextAltCheck = now + SERVER_RTT_DELAY_MAX;
} else if ( now >= nextAltCheck ) {
+ // It seems it's time for a check
+ if ( image_is_complete( link->image ) ) {
+ // Quit work if image is complete
+ if ( spin_trylock( &link->image->lock ) == 0 ) {
+ if ( link->image->cache_map != NULL ) {
+ free( link->image->cache_map );
+ link->image->cache_map = NULL;
+ }
+ link->image->uplink = NULL;
+ link->shutdown = TRUE;
+ free( link->recvBuffer );
+ link->recvBuffer = NULL;
+ bFree = TRUE;
+ spin_lock( &link->queueLock );
+ spin_unlock( &link->queueLock );
+ spin_destroy( &link->queueLock );
+ spin_unlock( &link->image->lock );
+ pthread_detach( link->thread );
+ goto cleanup;
+ }
+ } else {
+ // Not complete- do measurement
+ altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous)
+ }
altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX);
nextAltCheck = now + altCheckInterval;
- altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous)
}
}
}
@@ -262,6 +298,7 @@ static void* uplink_mainloop(void *data)
while ( link->rttTestResult == RTT_INPROGRESS )
usleep( 10000 );
if ( link->betterFd != -1 ) close( link->betterFd );
+ if ( bFree ) free( link );
return NULL ;
}
@@ -275,7 +312,7 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly)
for (j = 0; j < link->queueLen; ++j) {
if ( link->queue[j].status != ULR_NEW && (newOnly || link->queue[j].status != ULR_PENDING) ) continue;
link->queue[j].status = ULR_PENDING;
- request.handle = link->queue[j].handle;
+ request.handle = link->queue[j].from; // HACK: Store offset in handle too, as it won't be included in the reply
request.cmd = CMD_GET_BLOCK;
request.offset = link->queue[j].from;
request.size = link->queue[j].to - link->queue[j].from;