summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorSimon Rettberg2016-01-11 12:09:23 +0100
committerSimon Rettberg2016-01-11 12:09:23 +0100
commitd9c2a6cf943ca08f31f61a3fada940f77e3a03d3 (patch)
tree31f627a3d52ff838b046f41516a0fbef0b58b9ee /src/server/uplink.c
parent[KERNEL/CLIENT] Several minor tweaks and changes (diff)
downloaddnbd3-d9c2a6cf943ca08f31f61a3fada940f77e3a03d3.tar.gz
dnbd3-d9c2a6cf943ca08f31f61a3fada940f77e3a03d3.tar.xz
dnbd3-d9c2a6cf943ca08f31f61a3fada940f77e3a03d3.zip
[SERVER] Fix a lot of (mostly harmless) data races
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c38
1 files changed, 28 insertions, 10 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 726b08b..e0bdcae 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -69,12 +69,15 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host)
goto failure;
}
link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) );
+ spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE );
+ spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE );
link->image = image;
link->bytesReceived = 0;
link->queueLen = 0;
link->fd = -1;
link->signal = -1;
link->replicationHandle = 0;
+ spin_lock( &link->rttLock );
if ( sock >= 0 ) {
link->betterFd = sock;
link->betterServer = *host;
@@ -83,9 +86,9 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host)
link->betterFd = -1;
link->rttTestResult = RTT_IDLE;
}
+ spin_unlock( &link->rttLock );
link->recvBufferLen = 0;
link->shutdown = false;
- spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE );
if ( 0 != thread_create( &(link->thread), NULL, &uplink_mainloop, (void *)link ) ) {
logadd( LOG_ERROR, "Could not start thread for new uplink." );
goto failure;
@@ -133,7 +136,7 @@ void uplink_shutdown(dnbd3_image_t *image)
/**
* Remove given client from uplink request queue
- * Locks on: uplink.queueLock, client.sendMutex
+ * Locks on: uplink.queueLock
*/
void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client)
{
@@ -142,8 +145,8 @@ void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client)
if ( uplink->queue[i].client == client ) {
uplink->queue[i].client = NULL;
uplink->queue[i].status = ULR_FREE;
- if ( uplink->queueLen == i + 1 ) uplink->queueLen--;
}
+ if ( uplink->queue[i].client == NULL && uplink->queueLen == i + 1 ) uplink->queueLen--;
}
spin_unlock( &uplink->queueLock );
}
@@ -273,16 +276,20 @@ static void* uplink_mainloop(void *data)
}
while ( !_shutdown && !link->shutdown ) {
// Check if server switch is in order
- if ( link->rttTestResult == RTT_DOCHANGE ) {
+ spin_lock( &link->rttLock );
+ if ( link->rttTestResult != RTT_DOCHANGE ) {
+ spin_unlock( &link->rttLock );
+ } else {
link->rttTestResult = RTT_IDLE;
- discoverFailCount = 0;
// The rttTest worker thread has finished our request.
// And says it's better to switch to another server
const int fd = link->fd;
link->fd = link->betterFd;
- if ( fd != -1 ) close( fd );
link->betterFd = -1;
link->currentServer = link->betterServer;
+ spin_unlock( &link->rttLock );
+ discoverFailCount = 0;
+ if ( fd != -1 ) close( fd );
link->replicationHandle = 0;
link->image->working = true;
link->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received
@@ -372,7 +379,10 @@ static void* uplink_mainloop(void *data)
}
}
// See if we should trigger an RTT measurement
- if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) {
+ spin_lock( &link->rttLock );
+ const int rttTestResult = link->rttTestResult;
+ spin_unlock( &link->rttLock );
+ if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) {
if ( now + SERVER_RTT_DELAY_FAILED < nextAltCheck ) {
// This probably means the system time was changed - handle this case properly by capping the timeout
nextAltCheck = now + SERVER_RTT_DELAY_FAILED / 2;
@@ -390,8 +400,10 @@ static void* uplink_mainloop(void *data)
altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX);
nextAltCheck = now + altCheckInterval;
}
- } else if ( link->rttTestResult == RTT_NOT_REACHABLE ) {
+ } else if ( rttTestResult == RTT_NOT_REACHABLE ) {
+ spin_lock( &link->rttLock );
link->rttTestResult = RTT_IDLE;
+ spin_unlock( &link->rttLock );
discoverFailCount++;
nextAltCheck = now + (discoverFailCount < 5 ? altCheckInterval : SERVER_RTT_DELAY_FAILED);
}
@@ -441,6 +453,7 @@ static void* uplink_mainloop(void *data)
usleep( 10000 );
if ( link->betterFd != -1 ) close( link->betterFd );
spin_destroy( &link->queueLock );
+ spin_destroy( &link->rttLock );
free( link->recvBuffer );
link->recvBuffer = NULL;
spin_lock( &statisticsReceivedLock );
@@ -524,7 +537,7 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
/**
* Receive data from uplink server and process/dispatch
- * Locks on: link.lock, indirectly on images[].lock
+ * Locks on: link.lock, images[].lock
*/
static void uplink_handleReceive(dnbd3_connection_t *link)
{
@@ -566,7 +579,9 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
struct iovec iov[2];
const uint64_t start = inReply.handle;
const uint64_t end = inReply.handle + inReply.size;
+ spin_lock( &link->image->lock );
link->bytesReceived += inReply.size;
+ spin_unlock( &link->image->lock );
// 1) Write to cache file
if ( link->image->cacheFd != -1 ) {
uint32_t done = 0;
@@ -644,7 +659,10 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
#endif
if ( start == link->replicationHandle ) link->replicationHandle = 0;
}
- if ( link->queueLen == 0 ) uplink_sendReplicationRequest( link );
+ spin_lock( &link->queueLock );
+ const bool rep = ( link->queueLen == 0 );
+ spin_unlock( &link->queueLock );
+ if ( rep ) uplink_sendReplicationRequest( link );
return;
// Error handling from failed receive or message parsing
error_cleanup: ;