From 4249a20d130262bc9b5e562fc73b712ce072b0b7 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 13 Aug 2013 16:51:16 +0200 Subject: [SERVER] Fix race condition in uplink request aggregation, other small improvements and debugging features --- src/server/altservers.c | 1 + src/server/helper.c | 7 +++++++ src/server/helper.h | 1 + src/server/integrity.c | 4 +++- src/server/locks.c | 2 ++ src/server/net.c | 4 +++- src/server/server.c | 2 ++ src/server/uplink.c | 23 +++++++++++++---------- 8 files changed, 32 insertions(+), 12 deletions(-) diff --git a/src/server/altservers.c b/src/server/altservers.c index f255b58..86b2dce 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -285,6 +285,7 @@ static void *altserver_main(void *data) struct iovec iov[2]; struct timespec start, end; + setThreadName( "altserver-check" ); // Make valgrind happy memset( &reply, 0, sizeof(reply) ); memset( &request, 0, sizeof(request) ); diff --git a/src/server/helper.c b/src/server/helper.c index 24ee0fb..bc723e3 100644 --- a/src/server/helper.c +++ b/src/server/helper.c @@ -5,6 +5,7 @@ #include #include #include +#include // For thread names #include "../types.h" #include "../config.h" @@ -172,3 +173,9 @@ int file_alloc(int fd, uint64_t offset, uint64_t size) if ( write( fd, "", 1 ) != 1 ) return FALSE; return TRUE; } + +void setThreadName(char *name) +{ + if ( strlen( name ) > 16 ) name[16] = '\0'; + prctl( PR_SET_NAME, (unsigned long)name, 0, 0, 0 ); +} diff --git a/src/server/helper.h b/src/server/helper.h index 6c116ef..e939a66 100644 --- a/src/server/helper.h +++ b/src/server/helper.h @@ -20,6 +20,7 @@ int file_exists(char *file); int file_writable(char *file); int mkdir_p(const char* path); int file_alloc(int fd, uint64_t offset, uint64_t size); +void setThreadName(char *name); static inline int is_same_server(const dnbd3_host_t * const a, const dnbd3_host_t * const b) { diff --git a/src/server/integrity.c b/src/server/integrity.c index 875b62d..400e171 100644 --- a/src/server/integrity.c +++ b/src/server/integrity.c @@ -38,7 +38,9 @@ void integrity_init() assert( queueLen == -1 ); pthread_mutex_init( &integrityQueueLock, NULL ); pthread_cond_init( &queueSignal, NULL ); + bRunning = TRUE; if ( 0 != pthread_create( &thread, NULL, &integrity_main, (void *)NULL ) ) { + bRunning = FALSE; memlogf( "[WARNING] Could not start integrity check thread. Corrupted images will not be detected." ); return; } @@ -96,10 +98,10 @@ void integrity_check(dnbd3_image_t *image, int block) static void* integrity_main(void *data) { - bRunning = TRUE; int i; uint8_t *buffer = NULL; size_t bufferSize = 0; + setThreadName( "image-check" ); pthread_mutex_lock( &integrityQueueLock ); while ( !_shutdown ) { for (i = queueLen - 1; i >= 0; --i) { diff --git a/src/server/locks.c b/src/server/locks.c index ce29b9a..413d317 100644 --- a/src/server/locks.c +++ b/src/server/locks.c @@ -16,6 +16,7 @@ #include #include "globals.h" #include "memlog.h" +#include "helper.h" #define MAXLOCKS 500 #define MAXTHREADS 500 @@ -254,6 +255,7 @@ void debug_dump_lock_stats() static void *debug_thread_watchdog(void *something) { + setThreadName("debug-watchdog"); while ( !_shutdown ) { if ( init_done ) { time_t now = time( NULL ); diff --git a/src/server/net.c b/src/server/net.c index 4af577e..c068338 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -38,6 +38,7 @@ #include "uplink.h" #include "altservers.h" #include "memlog.h" +#include "helper.h" #include "../serialize.h" #include "../config.h" #include "../types.h" @@ -190,7 +191,8 @@ void *net_client_handler(void *dnbd3_client) usleep( _clientPenalty ); } if ( host_to_string( &client->host, buffer, sizeof buffer ) ) { - printf( "[DEBUG] Client %s gets %s\n", buffer, image_name ); + //printf( "[DEBUG] Client %s gets %s\n", buffer, image_name ); + setThreadName( buffer ); } // client handling mainloop while ( recv_request_header( client->sock, &request ) ) { diff --git a/src/server/server.c b/src/server/server.c index d2238cc..289b901 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -312,6 +312,8 @@ int main(int argc, char *argv[]) memlogf( "[INFO] Server is ready..." ); + setThreadName( "client-listener" ); + // +++++++++++++++++++++++++++++++++++++++++++++++++++ main loop while ( 1 ) { len = sizeof(client); diff --git a/src/server/uplink.c b/src/server/uplink.c index 6d05f94..13ae7ad 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -5,6 +5,7 @@ #include "image.h" #include "helper.h" #include "altservers.h" +#include "helper.h" #include #include #include @@ -110,7 +111,7 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint return FALSE; } dnbd3_connection_t * const uplink = client->image->uplink; - int foundExisting = FALSE; // Is there a pending request that is a superset of our range? + int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise int i; int freeSlot = -1; const uint64_t end = start + length; @@ -120,12 +121,12 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint for (i = 0; i < uplink->queueLen; ++i) { if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) freeSlot = i; if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue; - if ( uplink->queue[i].from <= start && uplink->queue[i].to >= end ) { - foundExisting = TRUE; + if ( foundExisting == -1 && uplink->queue[i].from <= start && uplink->queue[i].to >= end ) { + foundExisting = i; break; } } - if ( freeSlot == -1 ) { + if ( freeSlot == -1 || freeSlot < foundExisting ) { // Second check: If we "attach" to a thread the request has to be added after the existing one, otherwise a race condition might occur where the now request will starve if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) { spin_unlock( &uplink->queueLock ); memlogf( "[WARNING] Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." ); @@ -168,6 +169,7 @@ static void* uplink_mainloop(void *data) // assert( link != NULL ); assert( link->queueLen == 0 ); + setThreadName( "uplink" ); // fdEpoll = epoll_create( 2 ); if ( fdEpoll == -1 ) { @@ -323,8 +325,8 @@ static void* uplink_mainloop(void *data) for (i = 0; i < link->queueLen; ++i) { if ( link->queue[i].status != ULR_FREE && link->queue[i].entered < deadline ) { printf( "[DEBUG WARNING] Starving request detected:\n" - "%s\n(from %" PRIu64 " to %" PRIu64 "\n", link->queue[i].client->image->lower_name, link->queue[i].from, - link->queue[i].to ); + "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", link->queue[i].client->image->lower_name, + link->queue[i].from, link->queue[i].to, link->queue[i].status ); } } spin_unlock( &link->queueLock ); @@ -455,6 +457,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link) dnbd3_queued_request_t * const req = &link->queue[i]; if ( req->status != ULR_PROCESSING ) continue; assert( req->from >= start && req->to <= end ); + dnbd3_client_t * const client = req->client; outReply.cmd = CMD_GET_BLOCK; outReply.handle = req->handle; outReply.size = req->to - req->from; @@ -463,12 +466,12 @@ static void uplink_handle_receive(dnbd3_connection_t *link) iov[1].iov_base = link->recvBuffer + (req->from - start); iov[1].iov_len = outReply.size; fixup_reply( outReply ); - pthread_mutex_lock( &req->client->sendMutex ); + req->status = ULR_FREE; + pthread_mutex_lock( &client->sendMutex ); spin_unlock( &link->queueLock ); - writev( req->client->sock, iov, 2 ); - pthread_mutex_unlock( &req->client->sendMutex ); + writev( client->sock, iov, 2 ); + pthread_mutex_unlock( &client->sendMutex ); spin_lock( &link->queueLock ); - if ( req->status == ULR_PROCESSING ) req->status = ULR_FREE; // Might have changed in the meantime if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--; } spin_unlock( &link->queueLock ); -- cgit v1.2.3-55-g7522