From d9c2a6cf943ca08f31f61a3fada940f77e3a03d3 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 11 Jan 2016 12:09:23 +0100 Subject: [SERVER] Fix a lot of (mostly harmless) data races --- src/server/uplink.c | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) (limited to 'src/server/uplink.c') diff --git a/src/server/uplink.c b/src/server/uplink.c index 726b08b..e0bdcae 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -69,12 +69,15 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) goto failure; } link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) ); + spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE ); + spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE ); link->image = image; link->bytesReceived = 0; link->queueLen = 0; link->fd = -1; link->signal = -1; link->replicationHandle = 0; + spin_lock( &link->rttLock ); if ( sock >= 0 ) { link->betterFd = sock; link->betterServer = *host; @@ -83,9 +86,9 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) link->betterFd = -1; link->rttTestResult = RTT_IDLE; } + spin_unlock( &link->rttLock ); link->recvBufferLen = 0; link->shutdown = false; - spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE ); if ( 0 != thread_create( &(link->thread), NULL, &uplink_mainloop, (void *)link ) ) { logadd( LOG_ERROR, "Could not start thread for new uplink." ); goto failure; @@ -133,7 +136,7 @@ void uplink_shutdown(dnbd3_image_t *image) /** * Remove given client from uplink request queue - * Locks on: uplink.queueLock, client.sendMutex + * Locks on: uplink.queueLock */ void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) { @@ -142,8 +145,8 @@ void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) if ( uplink->queue[i].client == client ) { uplink->queue[i].client = NULL; uplink->queue[i].status = ULR_FREE; - if ( uplink->queueLen == i + 1 ) uplink->queueLen--; } + if ( uplink->queue[i].client == NULL && uplink->queueLen == i + 1 ) uplink->queueLen--; } spin_unlock( &uplink->queueLock ); } @@ -273,16 +276,20 @@ static void* uplink_mainloop(void *data) } while ( !_shutdown && !link->shutdown ) { // Check if server switch is in order - if ( link->rttTestResult == RTT_DOCHANGE ) { + spin_lock( &link->rttLock ); + if ( link->rttTestResult != RTT_DOCHANGE ) { + spin_unlock( &link->rttLock ); + } else { link->rttTestResult = RTT_IDLE; - discoverFailCount = 0; // The rttTest worker thread has finished our request. // And says it's better to switch to another server const int fd = link->fd; link->fd = link->betterFd; - if ( fd != -1 ) close( fd ); link->betterFd = -1; link->currentServer = link->betterServer; + spin_unlock( &link->rttLock ); + discoverFailCount = 0; + if ( fd != -1 ) close( fd ); link->replicationHandle = 0; link->image->working = true; link->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received @@ -372,7 +379,10 @@ static void* uplink_mainloop(void *data) } } // See if we should trigger an RTT measurement - if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) { + spin_lock( &link->rttLock ); + const int rttTestResult = link->rttTestResult; + spin_unlock( &link->rttLock ); + if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) { if ( now + SERVER_RTT_DELAY_FAILED < nextAltCheck ) { // This probably means the system time was changed - handle this case properly by capping the timeout nextAltCheck = now + SERVER_RTT_DELAY_FAILED / 2; @@ -390,8 +400,10 @@ static void* uplink_mainloop(void *data) altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX); nextAltCheck = now + altCheckInterval; } - } else if ( link->rttTestResult == RTT_NOT_REACHABLE ) { + } else if ( rttTestResult == RTT_NOT_REACHABLE ) { + spin_lock( &link->rttLock ); link->rttTestResult = RTT_IDLE; + spin_unlock( &link->rttLock ); discoverFailCount++; nextAltCheck = now + (discoverFailCount < 5 ? altCheckInterval : SERVER_RTT_DELAY_FAILED); } @@ -441,6 +453,7 @@ static void* uplink_mainloop(void *data) usleep( 10000 ); if ( link->betterFd != -1 ) close( link->betterFd ); spin_destroy( &link->queueLock ); + spin_destroy( &link->rttLock ); free( link->recvBuffer ); link->recvBuffer = NULL; spin_lock( &statisticsReceivedLock ); @@ -524,7 +537,7 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link) /** * Receive data from uplink server and process/dispatch - * Locks on: link.lock, indirectly on images[].lock + * Locks on: link.lock, images[].lock */ static void uplink_handleReceive(dnbd3_connection_t *link) { @@ -566,7 +579,9 @@ static void uplink_handleReceive(dnbd3_connection_t *link) struct iovec iov[2]; const uint64_t start = inReply.handle; const uint64_t end = inReply.handle + inReply.size; + spin_lock( &link->image->lock ); link->bytesReceived += inReply.size; + spin_unlock( &link->image->lock ); // 1) Write to cache file if ( link->image->cacheFd != -1 ) { uint32_t done = 0; @@ -644,7 +659,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link) #endif if ( start == link->replicationHandle ) link->replicationHandle = 0; } - if ( link->queueLen == 0 ) uplink_sendReplicationRequest( link ); + spin_lock( &link->queueLock ); + const bool rep = ( link->queueLen == 0 ); + spin_unlock( &link->queueLock ); + if ( rep ) uplink_sendReplicationRequest( link ); return; // Error handling from failed receive or message parsing error_cleanup: ; -- cgit v1.2.3-55-g7522