summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2014-06-16 19:24:17 +0200
committerSimon Rettberg2014-06-16 19:24:17 +0200
commit49f9218d330f5842fe24bce79267bd2c5b239df3 (patch)
tree74e7b49e9c058145069ac4e2aa6de5d9f2cac3ce
parent[CLIENT] Debug argument handling in daemon mode (diff)
downloaddnbd3-49f9218d330f5842fe24bce79267bd2c5b239df3.tar.gz
dnbd3-49f9218d330f5842fe24bce79267bd2c5b239df3.tar.xz
dnbd3-49f9218d330f5842fe24bce79267bd2c5b239df3.zip
Improve uplink handling, add code to debug thread creation/destruction, change stupid convention of freeDiskSpace returning 0 on error, which is ambiguous to the disk simply being full...
-rw-r--r--src/server/altservers.c47
-rw-r--r--src/server/altservers.h2
-rw-r--r--src/server/fileutil.c6
-rw-r--r--src/server/fileutil.h2
-rw-r--r--src/server/globals.h10
-rw-r--r--src/server/image.c21
-rw-r--r--src/server/image.h2
-rw-r--r--src/server/integrity.c6
-rw-r--r--src/server/locks.c6
-rw-r--r--src/server/locks.h50
-rw-r--r--src/server/server.c11
-rw-r--r--src/server/uplink.c35
12 files changed, 135 insertions, 63 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index a26ac2f..0619bc7 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -41,7 +41,7 @@ void altservers_init()
{
spin_init( &altServersLock, PTHREAD_PROCESS_PRIVATE );
memset( altServers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) );
- if ( 0 != pthread_create( &altThread, NULL, &altservers_main, (void *)NULL ) ) {
+ if ( 0 != thread_create( &altThread, NULL, &altservers_main, (void *)NULL ) ) {
memlogf( "[ERROR] Could not start altservers connector thread" );
exit( EXIT_FAILURE );
}
@@ -52,7 +52,7 @@ void altservers_shutdown()
{
if ( !initDone ) return;
write( signalPipe, "", 1 ); // Wake altservers thread up
- pthread_join( altThread, NULL );
+ thread_join( altThread, NULL );
}
int altservers_load()
@@ -162,7 +162,10 @@ void altservers_removeUplink(dnbd3_connection_t *uplink)
{
pthread_mutex_lock( &pendingLockConsume );
for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) {
- if ( pending[i] == uplink ) pending[i] = NULL;
+ if ( pending[i] == uplink ) {
+ uplink->rttTestResult = RTT_NOT_REACHABLE;
+ pending[i] = NULL;
+ }
}
pthread_mutex_unlock( &pendingLockConsume );
}
@@ -404,13 +407,21 @@ static void *altservers_main(void *data)
}
// Work your way through the queue
for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) {
- if ( pending[itLink] == NULL ) continue; // Check once before locking, as a mutex is expensive
+ dnbd3_connection_t * const uplink = pending[itLink];
+ if ( uplink == NULL ) continue; // Check once before locking, as a mutex is expensive
pthread_mutex_lock( &pendingLockConsume );
- if ( pending[itLink] == NULL ) { // Check again after locking
+ if ( uplink == NULL ) { // Check again after locking
+ pthread_mutex_unlock( &pendingLockConsume );
continue;
+ }
+ dnbd3_image_t * const image = image_lock( uplink->image );
+ if ( image == NULL ) { // Check again after locking
+ uplink->rttTestResult = RTT_NOT_REACHABLE;
+ pending[itLink] = NULL;
pthread_mutex_unlock( &pendingLockConsume );
+ printf( "[DEBUG] Image has gone away that was queued for RTT measurement\n" );
+ continue;
}
- dnbd3_connection_t * const uplink = pending[itLink];
assert( uplink->rttTestResult == RTT_INPROGRESS );
// Now get 4 alt servers
numAlts = altservers_get( servers, ALTS );
@@ -436,7 +447,7 @@ static void *altservers_main(void *data)
int sock = sock_connect( &servers[itAlt], 750, _uplinkTimeout );
if ( sock < 0 ) continue;
// Select image ++++++++++++++++++++++++++++++
- if ( !dnbd3_select_image( sock, uplink->image->lower_name, uplink->image->rid, FLAGS8_SERVER ) ) {
+ if ( !dnbd3_select_image( sock, image->lower_name, image->rid, FLAGS8_SERVER ) ) {
goto server_failed;
}
// See if selecting the image succeeded ++++++++++++++++++++++++++++++
@@ -447,36 +458,36 @@ static void *altservers_main(void *data)
goto server_image_not_available;
}
if ( protocolVersion < MIN_SUPPORTED_SERVER ) goto server_failed;
- if ( name == NULL || strcmp( name, uplink->image->lower_name ) != 0 ) {
- ERROR_GOTO_VA( server_failed, "[ERROR] Server offers image '%s', requested '%s'", name, uplink->image->lower_name );
+ if ( name == NULL || strcmp( name, image->lower_name ) != 0 ) {
+ ERROR_GOTO_VA( server_failed, "[ERROR] Server offers image '%s', requested '%s'", name, image->lower_name );
}
- if ( rid != uplink->image->rid ) {
+ if ( rid != image->rid ) {
ERROR_GOTO_VA( server_failed, "[ERROR] Server provides rid %d, requested was %d (%s)",
- (int)rid, (int)uplink->image->rid, uplink->image->lower_name );
+ (int)rid, (int)image->rid, image->lower_name );
}
- if ( imageSize != uplink->image->filesize ) {
+ if ( imageSize != image->filesize ) {
ERROR_GOTO_VA( server_failed, "[ERROR] Remote size: %" PRIu64 ", expected: %" PRIu64 " (%s)",
- imageSize, uplink->image->filesize, uplink->image->lower_name );
+ imageSize, image->filesize, image->lower_name );
}
// Request first block (NOT random!) ++++++++++++++++++++++++++++++
fixup_request( request );
if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE ) ) {
- ERROR_GOTO_VA( server_failed, "[ERROR] Could not request random block for %s", uplink->image->lower_name );
+ ERROR_GOTO_VA( server_failed, "[ERROR] Could not request random block for %s", image->lower_name );
}
// See if requesting the block succeeded ++++++++++++++++++++++
if ( !dnbd3_get_reply( sock, &reply ) ) {
char buf[100] = { 0 };
host_to_string( &servers[itAlt], buf, 100 );
ERROR_GOTO_VA( server_failed, "[ERROR] Received corrupted reply header (%s) after CMD_GET_BLOCK (%s)",
- buf, uplink->image->lower_name );
+ buf, image->lower_name );
}
// check reply header
if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) {
ERROR_GOTO_VA( server_failed, "[ERROR] Reply to random block request is %d bytes for %s",
- reply.size, uplink->image->lower_name );
+ reply.size, image->lower_name );
}
if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) {
- ERROR_GOTO_VA( server_failed, "[ERROR] Could not read random block payload for %s", uplink->image->lower_name );
+ ERROR_GOTO_VA( server_failed, "[ERROR] Could not read random block payload for %s", image->lower_name );
}
clock_gettime( CLOCK_MONOTONIC_RAW, &end );
// Measurement done - everything fine so far
@@ -505,6 +516,7 @@ static void *altservers_main(void *data)
server_image_not_available: ;
close( sock );
}
+ image_release( image );
// Done testing all servers. See if we should switch
if ( bestSock != -1 && (uplink->fd == -1 || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) {
// yep
@@ -539,3 +551,4 @@ static void *altservers_main(void *data)
signalPipe = -1;
return NULL ;
}
+
diff --git a/src/server/altservers.h b/src/server/altservers.h
index cbe99e2..13b0685 100644
--- a/src/server/altservers.h
+++ b/src/server/altservers.h
@@ -13,6 +13,8 @@ int altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate,
void altservers_findUplink(dnbd3_connection_t *uplink);
+void altservers_removeUplink(dnbd3_connection_t *uplink);
+
int altservers_getMatching(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size);
int altservers_get(dnbd3_host_t *output, int size);
diff --git a/src/server/fileutil.c b/src/server/fileutil.c
index 0cb1894..5adc90a 100644
--- a/src/server/fileutil.c
+++ b/src/server/fileutil.c
@@ -58,13 +58,13 @@ int file_alloc(int fd, uint64_t offset, uint64_t size)
return TRUE;
}
-uint64_t file_freeDiskSpace(const char * const path)
+int64_t file_freeDiskSpace(const char * const path)
{
struct statvfs fiData;
if ( (statvfs( path, &fiData )) < 0 ) {
- return 0;
+ return -1;
}
- return ((uint64_t)fiData.f_bavail * (uint64_t)fiData.f_bsize);
+ return ((int64_t)fiData.f_bavail * (int64_t)fiData.f_bsize);
}
time_t file_lastModification(const char * const file)
diff --git a/src/server/fileutil.h b/src/server/fileutil.h
index 394338b..db60699 100644
--- a/src/server/fileutil.h
+++ b/src/server/fileutil.h
@@ -8,7 +8,7 @@ int file_isReadable(char *file);
int file_isWritable(char *file);
int mkdir_p(const char* path);
int file_alloc(int fd, uint64_t offset, uint64_t size);
-uint64_t file_freeDiskSpace(const char * const path);
+int64_t file_freeDiskSpace(const char * const path);
time_t file_lastModification(const char * const file);
#endif /* FILEUTIL_H_ */
diff --git a/src/server/globals.h b/src/server/globals.h
index e44b26d..1b17660 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -42,19 +42,19 @@ typedef struct
struct _dnbd3_connection
{
int fd; // socket fd to remote server
- int signal; // write end of pipe used to wake up the process
+ int signal; // eventfd used to wake up the process
pthread_t thread; // thread holding the connection
pthread_spinlock_t queueLock; // lock for synchronization on request queue etc.
dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE];
volatile int queueLen; // length of queue
- dnbd3_image_t *image; // image that this uplink is used for do not call get/release for this pointer
+ dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer
dnbd3_host_t currentServer; // Current server we're connected to
volatile int rttTestResult; // RTT_*
dnbd3_host_t betterServer; // The better server
int betterFd; // Active connection to better server, ready to use
uint8_t *recvBuffer; // Buffer for receiving payload
int recvBufferLen; // Len of ^^
- volatile int shutdown; // bool to signal thread to stop, must only be set from uplink_shutdown()
+ volatile int shutdown; // bool to signal thread to stop, must only be set from uplink_shutdown() or cleanup in uplink_mainloop()
int replicatedLastBlock; // bool telling if the last block has been replicated yet
time_t lastReplication; // timestamp of when last replication requests were sent
};
@@ -98,10 +98,10 @@ struct _dnbd3_image
{
char *path; // absolute path of the image
char *lower_name; // relative path, all lowercase, minus revision ID
- uint8_t *cache_map; // cache map telling which parts are locally cached, NULL if complete
+ uint8_t * volatile cache_map; // cache map telling which parts are locally cached, NULL if complete
uint32_t *crc32; // list of crc32 checksums for each 16MiB block in image
uint32_t masterCrc32; // CRC-32 of the crc-32 list
- dnbd3_connection_t *uplink; // pointer to a server connection
+ dnbd3_connection_t * volatile uplink; // pointer to a server connection
uint64_t filesize; // size of image
int cacheFd; // used to write to the image, in case it is relayed. ONLY USE FROM UPLINK THREAD!
int rid; // revision of image
diff --git a/src/server/image.c b/src/server/image.c
index ad04b2a..78b907c 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -21,6 +21,7 @@
#include <zlib.h>
#include <inttypes.h>
#include <pthread.h>
+#include <errno.h>
// ##########################################
@@ -310,15 +311,15 @@ dnbd3_image_t* image_lock(dnbd3_image_t *image)
* anymore, the image will be freed
* Locks on: _images_lock, _images[].lock
*/
-void image_release(dnbd3_image_t *image)
+dnbd3_image_t* image_release(dnbd3_image_t *image)
{
- assert( image != NULL );
+ if ( image == NULL ) return NULL;
spin_lock( &image->lock );
assert( image->users > 0 );
image->users--;
if ( image->users > 0 ) { // Still in use, do nothing
spin_unlock( &image->lock );
- return;
+ return NULL;
}
spin_unlock( &image->lock );
spin_lock( &_images_lock );
@@ -330,17 +331,18 @@ void image_release(dnbd3_image_t *image)
if ( _images[i] == image ) { // Found, do nothing
spin_unlock( &image->lock );
spin_unlock( &_images_lock );
- return;
+ return NULL;
}
}
spin_unlock( &image->lock );
spin_unlock( &_images_lock );
// Not found, free
image_free( image );
- return;
+ return NULL;
}
spin_unlock( &image->lock );
spin_unlock( &_images_lock );
+ return NULL;
}
/**
@@ -576,7 +578,7 @@ static int image_load(char *base, char *path, int withUplink)
// Check CRC32
if ( crc32list != NULL ) {
- if ( !image_checkRandomBlocks( 3, fdImage, fileSize, crc32list, cache_map ) ) {
+ if ( !image_checkRandomBlocks( 4, fdImage, fileSize, crc32list, cache_map ) ) {
memlogf( "[ERROR] quick crc32 check of %s failed. Data corruption?", path );
goto load_error;
}
@@ -1229,9 +1231,10 @@ static int64_t image_pad(const char *path, const int64_t currentSize)
static int image_ensureDiskSpace(uint64_t size)
{
for (;;) {
- const uint64_t available = file_freeDiskSpace( _basePath );
- if ( available == 0 ) {
- memlogf( "[WARNING] Could not get free disk space, will assume there is enough space left... ;-)\n" );
+ const int64_t available = file_freeDiskSpace( _basePath );
+ if ( available == -1 ) {
+ const int e = errno;
+ memlogf( "[WARNING] Could not get free disk space (errno %d), will assume there is enough space left... ;-)\n", e );
return TRUE;
}
if ( available > size ) return TRUE;
diff --git a/src/server/image.h b/src/server/image.h
index 5ed2f97..a624bfb 100644
--- a/src/server/image.h
+++ b/src/server/image.h
@@ -24,7 +24,7 @@ dnbd3_image_t* image_getOrClone(char *name, uint16_t revision);
dnbd3_image_t* image_lock(dnbd3_image_t *image);
-void image_release(dnbd3_image_t *image);
+dnbd3_image_t* image_release(dnbd3_image_t *image);
int image_checkBlocksCrc32(int fd, uint32_t *crc32list, const int *blocks, const uint64_t fileSize);
diff --git a/src/server/integrity.c b/src/server/integrity.c
index 4e637f6..16bc9eb 100644
--- a/src/server/integrity.c
+++ b/src/server/integrity.c
@@ -42,7 +42,7 @@ void integrity_init()
pthread_mutex_init( &integrityQueueLock, NULL );
pthread_cond_init( &queueSignal, NULL );
bRunning = TRUE;
- if ( 0 != pthread_create( &thread, NULL, &integrity_main, (void *)NULL ) ) {
+ if ( 0 != thread_create( &thread, NULL, &integrity_main, (void *)NULL ) ) {
bRunning = FALSE;
memlogf( "[WARNING] Could not start integrity check thread. Corrupted images will not be detected." );
return;
@@ -57,7 +57,7 @@ void integrity_shutdown()
pthread_mutex_lock( &integrityQueueLock );
pthread_cond_signal( &queueSignal );
pthread_mutex_unlock( &integrityQueueLock );
- pthread_join( thread, NULL );
+ thread_join( thread, NULL );
while ( bRunning )
usleep( 10000 );
pthread_mutex_destroy( &integrityQueueLock );
@@ -135,7 +135,7 @@ static void* integrity_main(void *data)
int fd = open( image->path, O_RDONLY );
if ( fd >= 0 ) {
if ( image_checkBlocksCrc32( fd, (uint32_t*)buffer, blocks, fileSize ) ) {
- printf( "[DEBUG] CRC check of block %d for %s succeeded :-)\n", blocks[0], image->lower_name );
+ //printf( "[DEBUG] CRC check of block %d for %s succeeded :-)\n", blocks[0], image->lower_name );
} else {
memlogf( "[WARNING] Hash check for block %d of %s failed!", blocks[0], image->lower_name );
image_updateCachemap( image, blocks[0] * HASH_BLOCK_SIZE, (blocks[0] + 1) * HASH_BLOCK_SIZE, FALSE );
diff --git a/src/server/locks.c b/src/server/locks.c
index 72e6069..d93be40 100644
--- a/src/server/locks.c
+++ b/src/server/locks.c
@@ -41,6 +41,8 @@ typedef struct
} debug_thread_t;
+int debugThreadCount = 0;
+
static debug_lock_t locks[MAXLOCKS];
static debug_thread_t threads[MAXTHREADS];
static int init_done = 0;
@@ -284,7 +286,7 @@ static void *debug_thread_watchdog(void *something)
void debug_locks_start_watchdog()
{
#ifdef _DEBUG
- if ( 0 != pthread_create( &watchdog, NULL, &debug_thread_watchdog, (void *)NULL ) ) {
+ if ( 0 != thread_create( &watchdog, NULL, &debug_thread_watchdog, (void *)NULL ) ) {
memlogf( "[ERROR] Could not start debug-lock watchdog." );
return;
}
@@ -298,6 +300,6 @@ void debug_locks_stop_watchdog()
printf( "Killing debug watchdog...\n" );
pthread_spin_lock( &initdestory );
pthread_spin_unlock( &initdestory );
- pthread_join( watchdog, NULL );
+ thread_join( watchdog, NULL );
#endif
}
diff --git a/src/server/locks.h b/src/server/locks.h
index 43a3943..ab355c9 100644
--- a/src/server/locks.h
+++ b/src/server/locks.h
@@ -4,6 +4,9 @@
#ifdef _DEBUG
#include <pthread.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
#define spin_init( lock, type ) debug_spin_init( #lock, __FILE__, __LINE__, lock, type)
#define spin_lock( lock ) debug_spin_lock( #lock, __FILE__, __LINE__, lock)
@@ -19,6 +22,7 @@ int debug_spin_destroy(const char *name, const char *file, int line, pthread_spi
void debug_dump_lock_stats();
+
#else
#define spin_init( lock, type ) pthread_spin_init(lock, type)
@@ -29,6 +33,52 @@ void debug_dump_lock_stats();
#endif
+#ifdef DEBUG_THREADS
+
+extern int debugThreadCount;
+#define thread_create(thread,attr,routine,arg) (printf("[THREAD CREATE] %d @ %s:%d\n", debugThreadCount, __FILE__, (int)__LINE__), debug_thread_create(thread, attr, routine, arg))
+static inline pthread_t debug_thread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void*), void *arg)
+{
+ int i;
+ if (attr == NULL || pthread_attr_getdetachstate(attr, &i) != 0 || i == PTHREAD_CREATE_JOINABLE) {
+ ++debugThreadCount;
+ }
+ return pthread_create( thread, attr, start_routine, arg );
+}
+
+#define thread_detach(thread) (printf("[THREAD DETACH] %d @ %s:%d\n", debugThreadCount, __FILE__, __LINE__), debug_thread_detach(thread))
+static inline int debug_thread_detach(pthread_t thread)
+{
+ const int ret = pthread_detach(thread);
+ if (ret == 0) {
+ --debugThreadCount;
+ } else {
+ printf("[THREAD DETACH] Tried to detach invalid thread (error %d)\n", (int)errno);
+ exit(1);
+ }
+ return ret;
+}
+#define thread_join(thread,value) (printf("[THREAD JOIN] %d @ %s:%d\n", debugThreadCount, __FILE__, __LINE__), debug_thread_join(thread,value))
+static inline int debug_thread_join(pthread_t thread, void **value_ptr)
+{
+ const int ret = pthread_join(thread, value_ptr);
+ if (ret == 0) {
+ --debugThreadCount;
+ } else {
+ printf("[THREAD JOIN] Tried to join invalid thread (error %d)\n", (int)errno);
+ exit(1);
+ }
+ return ret;
+}
+
+#else
+
+#define thread_create(thread,attr,routine,param) pthread_create( thread, attr, routine, param )
+#define thread_detach(thread) pthread_detach( thread )
+#define thread_join(thread,value) pthread_join( thread, value )
+
+#endif
+
void debug_locks_start_watchdog();
void debug_locks_stop_watchdog();
diff --git a/src/server/server.c b/src/server/server.c
index f19b4fe..844365b 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -326,7 +326,10 @@ int main(int argc, char *argv[])
// setup rpc
//pthread_t thread_rpc;
- //pthread_create(&(thread_rpc), NULL, &dnbd3_rpc_mainloop, NULL);
+ //thread_create(&(thread_rpc), NULL, &dnbd3_rpc_mainloop, NULL);
+ pthread_attr_t threadAttrs;
+ pthread_attr_init( &threadAttrs );
+ pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED );
memlogf( "[INFO] Server is ready..." );
@@ -372,16 +375,14 @@ int main(int argc, char *argv[])
continue;
}
- pthread_attr_t threadAttrs;
- pthread_attr_init( &threadAttrs );
- pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED );
- if ( 0 != pthread_create( &(dnbd3_client->thread), &threadAttrs, net_client_handler, (void *)(uintptr_t)dnbd3_client ) ) {
+ if ( 0 != thread_create( &(dnbd3_client->thread), &threadAttrs, net_client_handler, (void *)(uintptr_t)dnbd3_client ) ) {
memlogf( "[ERROR] Could not start thread for new client." );
dnbd3_remove_client( dnbd3_client );
dnbd3_client = dnbd3_free_client( dnbd3_client );
continue;
}
}
+ pthread_attr_destroy( &threadAttrs );
dnbd3_cleanup();
}
diff --git a/src/server/uplink.c b/src/server/uplink.c
index b6f7887..0a60ff1 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -66,7 +66,7 @@ int uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host)
link->recvBufferLen = 0;
link->shutdown = FALSE;
spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE );
- if ( 0 != pthread_create( &(link->thread), NULL, &uplink_mainloop, (void *)(uintptr_t)link ) ) {
+ if ( 0 != thread_create( &(link->thread), NULL, &uplink_mainloop, (void *)(uintptr_t)link ) ) {
memlogf( "[ERROR] Could not start thread for new client." );
goto failure;
}
@@ -81,6 +81,9 @@ int uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host)
/**
* Locks on image.lock, uplink.lock
+ * Sets image->uplink to NULL while locked, so
+ * calling it multiple times, even concurrently, will
+ * not break anything.
*/
void uplink_shutdown(dnbd3_image_t *image)
{
@@ -99,12 +102,12 @@ void uplink_shutdown(dnbd3_image_t *image)
}
image->uplink = NULL;
uplink->shutdown = TRUE;
+ static uint64_t counter = 1;
+ if ( uplink->signal != -1 ) write( uplink->signal, &counter, sizeof(counter) );
+ pthread_t thread = uplink->thread;
spin_unlock( &uplink->queueLock );
spin_unlock( &image->lock );
- if ( uplink->signal != -1 ) write( uplink->signal, "", 1 );
- if ( uplink->image != NULL ) {
- pthread_join( uplink->thread, NULL );
- }
+ thread_join( thread, NULL );
}
/**
@@ -191,7 +194,7 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint
if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed
static uint64_t counter = 1;
- write( signalFd, &counter, sizeof(uint64_t) );
+ write( signalFd, &counter, sizeof(counter) );
}
return TRUE;
}
@@ -343,13 +346,6 @@ static void* uplink_mainloop(void *data)
memlogf( "[INFO] Replication of %s complete.", link->image->lower_name );
if ( spin_trylock( &link->image->lock ) == 0 ) {
image_markComplete( link->image );
- spin_lock( &link->queueLock );
- if ( !link->shutdown ) {
- link->image->uplink = NULL;
- link->shutdown = TRUE;
- pthread_detach( link->thread );
- }
- spin_unlock( &link->queueLock );
spin_unlock( &link->image->lock );
goto cleanup;
}
@@ -383,7 +379,7 @@ static void* uplink_mainloop(void *data)
if ( link->queue[i].status != ULR_FREE && link->queue[i].entered < deadline ) {
snprintf( buffer, sizeof(buffer), "[DEBUG WARNING] Starving request detected:\n"
"%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", link->queue[i].client->image->lower_name,
- link->queue[i].from, link->queue[i].to, link->queue[i].status );
+ link->queue[i].from, link->queue[i].to, link->queue[i].status );
link->queue[i].status = ULR_NEW;
spin_unlock( &link->queueLock );
printf("%s", buffer);
@@ -395,14 +391,19 @@ static void* uplink_mainloop(void *data)
#endif
}
cleanup: ;
+ altservers_removeUplink( link );
spin_lock( &link->image->lock );
spin_lock( &link->queueLock );
- if (link->image != NULL) link->image->uplink = NULL;
- spin_unlock( &link->image->lock );
+ link->image->uplink = NULL;
const int fd = link->fd;
const int signal = link->signal;
link->fd = -1;
link->signal = -1;
+ if ( !link->shutdown ) {
+ link->shutdown = TRUE;
+ thread_detach( link->thread );
+ }
+ spin_unlock( &link->image->lock );
spin_unlock( &link->queueLock );
if ( fd != -1 ) close( fd );
if ( signal != -1 ) close( signal );
@@ -517,7 +518,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
if ( ret != sizeof inReply ) {
const int err = errno;
memlogf( "[INFO] Lost connection to uplink server for %s (header %d/%d, e=%d)", link->image->path, ret, (int)sizeof(inReply),
- err );
+ err );
goto error_cleanup;
}
fixup_reply( inReply );