summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--LOCKS2
-rw-r--r--src/server/altservers.c31
-rw-r--r--src/server/globals.c6
-rw-r--r--src/server/globals.h11
-rw-r--r--src/server/image.c4
-rw-r--r--src/server/locks.c59
-rw-r--r--src/server/locks.h3
-rw-r--r--src/server/net.c11
-rw-r--r--src/server/server.c6
-rw-r--r--src/server/uplink.c47
10 files changed, 158 insertions, 22 deletions
diff --git a/LOCKS b/LOCKS
index c542904..1f7f22b 100644
--- a/LOCKS
+++ b/LOCKS
@@ -14,7 +14,7 @@ _clients_lock
_clients[].lock
_images_lock
_images[].lock
-uplink.lock
+uplink.queueLock
_alts_lock
If you need to lock multiple clients at once,
diff --git a/src/server/altservers.c b/src/server/altservers.c
index 49da7d4..641e36b 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -23,6 +23,7 @@ static int signalPipe = -1;
static dnbd3_alt_server_t _alt_servers[SERVER_MAX_ALTS];
static int _num_alts = 0;
static pthread_spinlock_t _alts_lock;
+static int initDone = FALSE;
static pthread_t altThread;
@@ -32,10 +33,19 @@ static unsigned int altservers_update_rtt(const dnbd3_host_t * const host, const
void altserver_init()
{
spin_init( &_alts_lock, PTHREAD_PROCESS_PRIVATE );
+ memset( _alt_servers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) );
if ( 0 != pthread_create( &altThread, NULL, &altserver_main, (void *)NULL ) ) {
memlogf( "[ERROR] Could not start altservers connector thread" );
exit( EXIT_FAILURE );
}
+ initDone = TRUE;
+}
+
+void altservers_shutdown()
+{
+ if ( !initDone ) return;
+ spin_destroy( &_alts_lock );
+ pthread_join( altThread, NULL );
}
int altservers_load()
@@ -78,7 +88,14 @@ int altservers_add(dnbd3_host_t *host, const char *comment)
freeSlot = i;
}
}
- if ( freeSlot == -1 ) freeSlot = _num_alts++;
+ if ( freeSlot == -1 ) {
+ if ( _num_alts >= SERVER_MAX_ALTS ) {
+ memlogf( "[WARNING] Cannot add another alt server, maximum of %d already reached.", (int)SERVER_MAX_ALTS );
+ spin_unlock( &_alts_lock );
+ return FALSE;
+ }
+ freeSlot = _num_alts++;
+ }
_alt_servers[freeSlot].host = *host;
if ( comment != NULL ) snprintf( _alt_servers[freeSlot].comment, COMMENT_LENGTH, "%s", comment );
spin_unlock( &_alts_lock );
@@ -106,6 +123,18 @@ void altserver_find_uplink(dnbd3_connection_t *uplink)
}
/**
+ * The given uplink is about to disappear, so remove it from any queues
+ */
+void altservers_remove_uplink(dnbd3_connection_t *uplink)
+{
+ spin_lock( &pendingLock );
+ for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
+ if ( pending[i] == uplink ) pending[i] = NULL;
+ }
+ spin_unlock( &pendingLock );
+}
+
+/**
* Get <size> known (working) alt servers, ordered by network closeness
* (by finding the smallest possible subnet)
*/
diff --git a/src/server/globals.c b/src/server/globals.c
index 182b2cc..1e23fb3 100644
--- a/src/server/globals.c
+++ b/src/server/globals.c
@@ -10,6 +10,8 @@ char *_configDir = NULL;
char *_basePath = NULL;
int _vmdkLegacyMode = FALSE;
int _shutdown = 0;
+int _serverPenalty = 0;
+int _clientPenalty = 0;
#define SAVE_TO_VAR_STR(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) { if (_ ## kk != NULL) free(_ ## kk); _ ## kk = strdup(value); } } while (0)
#define SAVE_TO_VAR_BOOL(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) _ ## kk = atoi(value) != 0 || strcmp(value, "true") == 0 || strcmp(value, "True") == 0 || strcmp(value, "TRUE") == 0; } while (0)
@@ -19,6 +21,8 @@ static int ini_handler(void *custom, const char* section, const char* key, const
{
SAVE_TO_VAR_STR( dnbd3, basePath );
SAVE_TO_VAR_BOOL( dnbd3, vmdkLegacyMode );
+ SAVE_TO_VAR_INT( dnbd3, serverPenalty );
+ SAVE_TO_VAR_INT( dnbd3, clientPenalty );
return TRUE;
}
@@ -40,4 +44,6 @@ void globals_loadConfig()
char *end = _basePath + strlen( _basePath ) - 1;
while ( end >= _basePath && *end == '/' )
*end-- = '\0';
+ if ( _serverPenalty < 0 ) _serverPenalty = 0;
+ if ( _clientPenalty < 0 ) _clientPenalty = 0;
}
diff --git a/src/server/globals.h b/src/server/globals.h
index b063fc3..b77c547 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -133,6 +133,17 @@ extern char *_basePath;
*/
extern int _vmdkLegacyMode;
+/**
+ * How much artificial delay should we add when a server connects to us?
+ */
+extern int _serverPenalty;
+
+/**
+ * How much artificial delay should we add when a client connects to us?
+ */
+extern int _clientPenalty;
+
+
extern int _shutdown;
void globals_loadConfig();
diff --git a/src/server/image.c b/src/server/image.c
index d4ed5cf..679dd40 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -39,7 +39,7 @@ int image_is_complete(dnbd3_image_t *image)
if ( image->working && image->cache_map == NULL ) {
return TRUE;
}
- if ( image->filesize == 0 || !image->working ) {
+ if ( image->filesize == 0 ) {
return FALSE;
}
int complete = TRUE, j;
@@ -175,7 +175,7 @@ dnbd3_image_t* image_get(char *name, uint16_t revision)
// Found, see if it works
struct stat st;
- if ( stat( candidate->path, &st ) < 0 ) {
+ if ( candidate->working && stat( candidate->path, &st ) < 0 ) {
printf( "[DEBUG] File '%s' has gone away...\n", candidate->path );
candidate->working = FALSE; // No file? OUT!
}
diff --git a/src/server/locks.c b/src/server/locks.c
index 4293c2f..ce29b9a 100644
--- a/src/server/locks.c
+++ b/src/server/locks.c
@@ -106,7 +106,7 @@ int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlo
break;
}
pthread_spin_unlock( &initdestory );
- int retval = pthread_spin_lock( lock );
+ const int retval = pthread_spin_lock( lock );
pthread_spin_lock( &initdestory );
t->tid = 0;
t->time = 0;
@@ -125,6 +125,55 @@ int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlo
return retval;
}
+int debug_spin_trylock(const char *name, const char *file, int line, pthread_spinlock_t *lock)
+{
+ debug_lock_t *l = NULL;
+ pthread_spin_lock( &initdestory );
+ for (int i = 0; i < MAXLOCKS; ++i) {
+ if ( locks[i].lock == lock ) {
+ l = &locks[i];
+ break;
+ }
+ }
+ pthread_spin_unlock( &initdestory );
+ if ( l == NULL ) {
+ printf( "[ERROR] Tried to lock uninitialized lock %p (%s) at %s:%d\n", lock, name, file, line );
+ debug_dump_lock_stats();
+ exit( 4 );
+ }
+ debug_thread_t *t = NULL;
+ pthread_spin_lock( &initdestory );
+ for (int i = 0; i < MAXTHREADS; ++i) {
+ if ( threads[i].tid != 0 ) continue;
+ threads[i].tid = pthread_self();
+ threads[i].time = time( NULL );
+ snprintf( threads[i].name, LOCKLEN, "%s", name );
+ snprintf( threads[i].where, LOCKLEN, "%s:%d", file, line );
+ t = &threads[i];
+ break;
+ }
+ pthread_spin_unlock( &initdestory );
+ const int retval = pthread_spin_trylock( lock );
+ pthread_spin_lock( &initdestory );
+ t->tid = 0;
+ t->time = 0;
+ pthread_spin_unlock( &initdestory );
+ if ( retval == 0 ) {
+ if ( l->locked ) {
+ printf( "[ERROR] Lock sanity check: lock %p (%s) already locked at %s:%d\n", lock, name, file, line );
+ exit( 4 );
+ }
+ l->locked = 1;
+ l->locktime = time( NULL );
+ l->thread = pthread_self();
+ snprintf( l->where, LOCKLEN, "L %s:%d", file, line );
+ pthread_spin_lock( &initdestory );
+ l->lockId = ++lockId;
+ pthread_spin_unlock( &initdestory );
+ }
+ return retval;
+}
+
int debug_spin_unlock(const char *name, const char *file, int line, pthread_spinlock_t *lock)
{
debug_lock_t *l = NULL;
@@ -140,7 +189,6 @@ int debug_spin_unlock(const char *name, const char *file, int line, pthread_spin
printf( "[ERROR] Tried to unlock uninitialized lock %p (%s) at %s:%d\n", lock, name, file, line );
exit( 4 );
}
- int retval = pthread_spin_unlock( lock );
if ( !l->locked ) {
printf( "[ERROR] Unlock sanity check: lock %p (%s) not locked at %s:%d\n", lock, name, file, line );
exit( 4 );
@@ -149,6 +197,7 @@ int debug_spin_unlock(const char *name, const char *file, int line, pthread_spin
l->locktime = 0;
l->thread = 0;
snprintf( l->where, LOCKLEN, "U %s:%d", file, line );
+ int retval = pthread_spin_unlock( lock );
return retval;
}
@@ -192,7 +241,7 @@ void debug_dump_lock_stats()
"* Locked: %d\n", locks[i].name, locks[i].where, (int)locks[i].locked );
}
}
- printf( "\n **** THREADS ****\n\n" );
+ printf( "\n **** WAITING THREADS ****\n\n" );
for (int i = 0; i < MAXTHREADS; ++i) {
if ( threads[i].tid == 0 ) continue;
printf( "* *** Thread %d ***\n"
@@ -206,7 +255,7 @@ void debug_dump_lock_stats()
static void *debug_thread_watchdog(void *something)
{
while ( !_shutdown ) {
- if (init_done) {
+ if ( init_done ) {
time_t now = time( NULL );
pthread_spin_lock( &initdestory );
for (int i = 0; i < MAXTHREADS; ++i) {
@@ -242,7 +291,7 @@ void debug_locks_stop_watchdog()
{
#ifdef _DEBUG
_shutdown = TRUE;
- printf("Killing debug watchdog...\n");
+ printf( "Killing debug watchdog...\n" );
pthread_spin_lock( &initdestory );
pthread_spin_unlock( &initdestory );
pthread_join( watchdog, NULL );
diff --git a/src/server/locks.h b/src/server/locks.h
index 5d4367b..43a3943 100644
--- a/src/server/locks.h
+++ b/src/server/locks.h
@@ -7,11 +7,13 @@
#define spin_init( lock, type ) debug_spin_init( #lock, __FILE__, __LINE__, lock, type)
#define spin_lock( lock ) debug_spin_lock( #lock, __FILE__, __LINE__, lock)
+#define spin_trylock( lock ) debug_spin_trylock( #lock, __FILE__, __LINE__, lock)
#define spin_unlock( lock ) debug_spin_unlock( #lock, __FILE__, __LINE__, lock)
#define spin_destroy( lock ) debug_spin_destroy( #lock, __FILE__, __LINE__, lock)
int debug_spin_init(const char *name, const char *file, int line, pthread_spinlock_t *lock, int shared);
int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlock_t *lock);
+int debug_spin_trylock(const char *name, const char *file, int line, pthread_spinlock_t *lock);
int debug_spin_unlock(const char *name, const char *file, int line, pthread_spinlock_t *lock);
int debug_spin_destroy(const char *name, const char *file, int line, pthread_spinlock_t *lock);
@@ -21,6 +23,7 @@ void debug_dump_lock_stats();
#define spin_init( lock, type ) pthread_spin_init(lock, type)
#define spin_lock( lock ) pthread_spin_lock(lock)
+#define spin_trylock( lock ) pthread_spin_trylock(lock)
#define spin_unlock( lock ) pthread_spin_unlock(lock)
#define spin_destroy( lock ) pthread_spin_destroy(lock)
diff --git a/src/server/net.c b/src/server/net.c
index a7e110b..b9fee3d 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -101,7 +101,7 @@ static inline char send_reply(int sock, dnbd3_reply_t *reply, void *payload)
iov[0].iov_base = reply;
iov[0].iov_len = sizeof(dnbd3_reply_t);
iov[1].iov_base = payload;
- iov[1].iov_len = size;
+ iov[1].iov_len = (size_t)size;
if ( writev( sock, iov, 2 ) != sizeof(dnbd3_reply_t) + size ) {
printf( "[DEBUG] Send failed (reply with payload of %u bytes)\n", size );
return FALSE;
@@ -178,8 +178,14 @@ void *net_client_handler(void *dnbd3_client)
}
}
- // client handling mainloop
if ( bOk ) {
+ // add artificial delay if applicable
+ if ( client->is_server && _serverPenalty != 0 ) {
+ usleep( _serverPenalty );
+ } else if ( !client->is_server && _clientPenalty != 0 ) {
+ usleep( _clientPenalty );
+ }
+ // client handling mainloop
while ( recv_request_header( client->sock, &request ) ) {
switch ( request.cmd ) {
@@ -238,6 +244,7 @@ void *net_client_handler(void *dnbd3_client)
isCached = FALSE;
break;
}
+ ++pos;
}
}
// Last byte
diff --git a/src/server/server.c b/src/server/server.c
index 12b437b..4bfa2d5 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -381,12 +381,6 @@ void dnbd3_remove_client(dnbd3_client_t *client)
dnbd3_client_t* dnbd3_free_client(dnbd3_client_t *client)
{
spin_lock( &client->lock );
- /*
- for (it = client->sendqueue; it; it = it->next) {
- free( it->data );
- }
- g_slist_free( client->sendqueue );
- */
if ( client->sock >= 0 ) close( client->sock );
client->sock = -1;
if ( client->image != NULL ) image_release( client->image );
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 3541728..5eda88f 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -13,6 +13,7 @@
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
+#include <inttypes.h>
static void* uplink_mainloop(void *data);
static void uplink_send_requests(dnbd3_connection_t *link, int newOnly);
@@ -63,11 +64,14 @@ void uplink_shutdown(dnbd3_image_t *image)
assert( image != NULL );
if ( image->uplink == NULL || image->uplink->shutdown ) return;
dnbd3_connection_t * const uplink = image->uplink;
+ spin_lock( &uplink->queueLock );
image->uplink = NULL;
uplink->shutdown = TRUE;
if ( uplink->signal != -1 ) write( uplink->signal, "", 1 );
- pthread_join( uplink->thread, NULL );
- spin_lock( &uplink->queueLock );
+ if ( uplink->image != NULL ) {
+ pthread_join( uplink->thread, NULL );
+ }
+ free( uplink->recvBuffer );
spin_unlock( &uplink->queueLock );
spin_destroy( &uplink->queueLock );
free( uplink );
@@ -78,7 +82,12 @@ void uplink_shutdown(dnbd3_image_t *image)
*/
int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length)
{
- if ( client == NULL || client->image == NULL || client->image->uplink == NULL ) return FALSE;
+ if ( client == NULL || client->image == NULL ) return FALSE;
+ spin_lock( &client->image->lock );
+ if ( client->image->uplink == NULL ) {
+ spin_unlock( &client->image->lock );
+ return FALSE;
+ }
dnbd3_connection_t * const uplink = client->image->uplink;
int foundExisting = FALSE; // Is there a pending request that is a superset of our range?
int i;
@@ -86,6 +95,7 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint
const uint64_t end = start + length;
spin_lock( &uplink->queueLock );
+ spin_unlock( &client->image->lock );
for (i = 0; i < uplink->queueLen; ++i) {
if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) freeSlot = i;
if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue;
@@ -127,6 +137,7 @@ static void* uplink_mainloop(void *data)
int fdEpoll = -1, fdPipe = -1;
int numSocks, i, waitTime;
int altCheckInterval = SERVER_RTT_DELAY_INIT;
+ int bFree = FALSE;
time_t nextAltCheck = 0;
char buffer[100];
//
@@ -226,6 +237,7 @@ static void* uplink_mainloop(void *data)
uplink_send_requests( link, FALSE );
link->betterFd = -1;
link->currentServer = link->betterServer;
+ link->image->working = TRUE;
memset( &ev, 0, sizeof(ev) );
ev.events = EPOLLIN;
ev.data.fd = link->fd;
@@ -241,11 +253,35 @@ static void* uplink_mainloop(void *data)
if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) {
const time_t now = time( NULL );
if ( nextAltCheck - now > SERVER_RTT_DELAY_MAX ) {
+ // This probably means the system time was changed - handle this case properly by capping the timeout
nextAltCheck = now + SERVER_RTT_DELAY_MAX;
} else if ( now >= nextAltCheck ) {
+ // It seems it's time for a check
+ if ( image_is_complete( link->image ) ) {
+ // Quit work if image is complete
+ if ( spin_trylock( &link->image->lock ) == 0 ) {
+ if ( link->image->cache_map != NULL ) {
+ free( link->image->cache_map );
+ link->image->cache_map = NULL;
+ }
+ link->image->uplink = NULL;
+ link->shutdown = TRUE;
+ free( link->recvBuffer );
+ link->recvBuffer = NULL;
+ bFree = TRUE;
+ spin_lock( &link->queueLock );
+ spin_unlock( &link->queueLock );
+ spin_destroy( &link->queueLock );
+ spin_unlock( &link->image->lock );
+ pthread_detach( link->thread );
+ goto cleanup;
+ }
+ } else {
+ // Not complete- do measurement
+ altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous)
+ }
altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX);
nextAltCheck = now + altCheckInterval;
- altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous)
}
}
}
@@ -262,6 +298,7 @@ static void* uplink_mainloop(void *data)
while ( link->rttTestResult == RTT_INPROGRESS )
usleep( 10000 );
if ( link->betterFd != -1 ) close( link->betterFd );
+ if ( bFree ) free( link );
return NULL ;
}
@@ -275,7 +312,7 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly)
for (j = 0; j < link->queueLen; ++j) {
if ( link->queue[j].status != ULR_NEW && (newOnly || link->queue[j].status != ULR_PENDING) ) continue;
link->queue[j].status = ULR_PENDING;
- request.handle = link->queue[j].handle;
+ request.handle = link->queue[j].from; // HACK: Store offset in handle too, as it won't be included in the reply
request.cmd = CMD_GET_BLOCK;
request.offset = link->queue[j].from;
request.size = link->queue[j].to - link->queue[j].from;