From 7937c7db7baea296fce4055a9e9f498e67da2d09 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 23 Jul 2019 17:20:20 +0200 Subject: [SERVER] uplink: Relay request in client's thread if possible Early benchmarking shows that this is faster, since we don't require another thread to wake up just to send out the request. --- LOCKS | 1 + src/server/globals.h | 1 + src/server/uplink.c | 59 +++++++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 58 insertions(+), 3 deletions(-) diff --git a/LOCKS b/LOCKS index 935dadb..4b5b07c 100644 --- a/LOCKS +++ b/LOCKS @@ -32,6 +32,7 @@ client.statsLock statisticsSentLock statisticsReceivedLock uplink.rttLock +uplink.sendMutex If you need to lock multiple clients/images/... at once, lock the client with the lowest array index first. diff --git a/src/server/globals.h b/src/server/globals.h index 2b30bc2..9b10ee4 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -53,6 +53,7 @@ struct _dnbd3_connection int version; // remote server protocol version dnbd3_signal_t* signal; // used to wake up the process pthread_t thread; // thread holding the connection + pthread_mutex_t sendMutex; // For locking socket while sending pthread_spinlock_t queueLock; // lock for synchronization on request queue etc. dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer dnbd3_host_t currentServer; // Current server we're connected to diff --git a/src/server/uplink.c b/src/server/uplink.c index 31b220d..ccbf209 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -69,11 +69,14 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) ); spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE ); spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE ); + pthread_mutex_init( &link->sendMutex, NULL ); link->image = image; link->bytesReceived = 0; link->idleTime = 0; link->queueLen = 0; + pthread_mutex_lock( &link->sendMutex ); link->fd = -1; + pthread_mutex_unlock( &link->sendMutex ); link->cacheFd = -1; link->signal = NULL; link->replicationHandle = REP_NONE; @@ -267,6 +270,39 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin #endif spin_unlock( &uplink->queueLock ); + if ( foundExisting != -1 ) + return true; // Attached to pending request, do nothing + + // See if we can fire away the request + if ( pthread_mutex_trylock( &uplink->sendMutex ) != 0 ) { + logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" ); + } else { + if ( uplink->fd == -1 ) { + pthread_mutex_unlock( &uplink->sendMutex ); + logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" ); + } else { + const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); + if ( hops < 200 ) ++hops; + const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) ); + pthread_mutex_unlock( &uplink->sendMutex ); + if ( !ret ) { + logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" ); + } else { + spin_lock( &uplink->queueLock ); + if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client && uplink->queue[freeSlot].status == ULR_NEW ) { + uplink->queue[freeSlot].status = ULR_PENDING; + logadd( LOG_DEBUG2, "Succesful direct uplink request" ); + } else { + logadd( LOG_DEBUG2, "Weird queue update fail for direct uplink request" ); + } + spin_unlock( &uplink->queueLock ); + return true; + } + // Fall through to waking up sender thread + } + } + if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); @@ -346,7 +382,9 @@ static void* uplink_mainloop(void *data) // The rttTest worker thread has finished our request. // And says it's better to switch to another server const int fd = link->fd; + pthread_mutex_lock( &link->sendMutex ); link->fd = link->betterFd; + pthread_mutex_unlock( &link->sendMutex ); link->betterFd = -1; link->currentServer = link->betterServer; link->version = link->betterVersion; @@ -425,8 +463,10 @@ static void* uplink_mainloop(void *data) } // Don't keep link established if we're idle for too much if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) { + pthread_mutex_lock( &link->sendMutex ); close( link->fd ); link->fd = events[EV_SOCKET].fd = -1; + pthread_mutex_unlock( &link->sendMutex ); link->cycleDetected = false; if ( link->recvBufferLen != 0 ) { link->recvBufferLen = 0; @@ -503,7 +543,9 @@ static void* uplink_mainloop(void *data) spin_lock( &link->queueLock ); const int fd = link->fd; const dnbd3_signal_t* signal = link->signal; + pthread_mutex_lock( &link->sendMutex ); link->fd = -1; + pthread_mutex_unlock( &link->sendMutex ); link->signal = NULL; if ( !link->shutdown ) { link->shutdown = true; @@ -524,6 +566,7 @@ static void* uplink_mainloop(void *data) } spin_destroy( &link->queueLock ); spin_destroy( &link->rttLock ); + pthread_mutex_destroy( &link->sendMutex ); free( link->recvBuffer ); link->recvBuffer = NULL; if ( link->cacheFd != -1 ) { @@ -558,7 +601,9 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly) */ spin_unlock( &link->queueLock ); if ( hops < 200 ) ++hops; - const int ret = dnbd3_get_block( link->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( link->version, hops ) ); + pthread_mutex_lock( &link->sendMutex ); + const bool ret = dnbd3_get_block( link->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( link->version, hops ) ); + pthread_mutex_unlock( &link->sendMutex ); if ( !ret ) { // Non-critical - if the connection dropped or the server was changed // the thread will re-send this request as soon as the connection @@ -629,7 +674,10 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link) const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE; link->replicationHandle = offset; const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE ); - if ( !dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) ) ) { + pthread_mutex_lock( &link->sendMutex ); + bool sendOk = dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) ); + pthread_mutex_unlock( &link->sendMutex ); + if ( !sendOk ) { logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" ); return; } @@ -874,8 +922,10 @@ static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew) if ( link->fd == -1 ) return; altservers_serverFailed( &link->currentServer ); + pthread_mutex_lock( &link->sendMutex ); close( link->fd ); link->fd = -1; + pthread_mutex_unlock( &link->sendMutex ); link->replicationHandle = REP_NONE; if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) { link->nextReplicationIndex = 0; @@ -911,7 +961,10 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink) size_t bytes = IMGSIZE_TO_HASHBLOCKS( image->virtualFilesize ) * sizeof(uint32_t); uint32_t masterCrc; uint32_t *buffer = malloc( bytes ); - if ( !dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes ) || bytes == 0 ) { + pthread_mutex_lock( &uplink->sendMutex ); + bool sendOk = dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes ); + pthread_mutex_unlock( &uplink->sendMutex ); + if ( !sendOk || bytes == 0 ) { free( buffer ); return; } -- cgit v1.2.3-55-g7522