diff options
author | Simon Rettberg | 2019-07-23 17:20:20 +0200 |
---|---|---|
committer | Simon Rettberg | 2019-07-23 17:24:27 +0200 |
commit | 7937c7db7baea296fce4055a9e9f498e67da2d09 (patch) | |
tree | 72574175ea6aa5bc2890fd792c499355a7b2bfab /src | |
parent | [FUSE] Add --sticky mode to ignore alt-servers announced by servers (diff) | |
download | dnbd3-7937c7db7baea296fce4055a9e9f498e67da2d09.tar.gz dnbd3-7937c7db7baea296fce4055a9e9f498e67da2d09.tar.xz dnbd3-7937c7db7baea296fce4055a9e9f498e67da2d09.zip |
[SERVER] uplink: Relay request in client's thread if possible
Early benchmarking shows that this is faster, since we don't
require another thread to wake up just to send out the request.
Diffstat (limited to 'src')
-rw-r--r-- | src/server/globals.h | 1 | ||||
-rw-r--r-- | src/server/uplink.c | 59 |
2 files changed, 57 insertions, 3 deletions
diff --git a/src/server/globals.h b/src/server/globals.h index 2b30bc2..9b10ee4 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -53,6 +53,7 @@ struct _dnbd3_connection int version; // remote server protocol version dnbd3_signal_t* signal; // used to wake up the process pthread_t thread; // thread holding the connection + pthread_mutex_t sendMutex; // For locking socket while sending pthread_spinlock_t queueLock; // lock for synchronization on request queue etc. dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer dnbd3_host_t currentServer; // Current server we're connected to diff --git a/src/server/uplink.c b/src/server/uplink.c index 31b220d..ccbf209 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -69,11 +69,14 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) ); spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE ); spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE ); + pthread_mutex_init( &link->sendMutex, NULL ); link->image = image; link->bytesReceived = 0; link->idleTime = 0; link->queueLen = 0; + pthread_mutex_lock( &link->sendMutex ); link->fd = -1; + pthread_mutex_unlock( &link->sendMutex ); link->cacheFd = -1; link->signal = NULL; link->replicationHandle = REP_NONE; @@ -267,6 +270,39 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin #endif spin_unlock( &uplink->queueLock ); + if ( foundExisting != -1 ) + return true; // Attached to pending request, do nothing + + // See if we can fire away the request + if ( pthread_mutex_trylock( &uplink->sendMutex ) != 0 ) { + logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" ); + } else { + if ( uplink->fd == -1 ) { + pthread_mutex_unlock( &uplink->sendMutex ); + logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" ); + } else { + const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); + if ( hops < 200 ) ++hops; + const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) ); + pthread_mutex_unlock( &uplink->sendMutex ); + if ( !ret ) { + logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" ); + } else { + spin_lock( &uplink->queueLock ); + if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client && uplink->queue[freeSlot].status == ULR_NEW ) { + uplink->queue[freeSlot].status = ULR_PENDING; + logadd( LOG_DEBUG2, "Succesful direct uplink request" ); + } else { + logadd( LOG_DEBUG2, "Weird queue update fail for direct uplink request" ); + } + spin_unlock( &uplink->queueLock ); + return true; + } + // Fall through to waking up sender thread + } + } + if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); @@ -346,7 +382,9 @@ static void* uplink_mainloop(void *data) // The rttTest worker thread has finished our request. // And says it's better to switch to another server const int fd = link->fd; + pthread_mutex_lock( &link->sendMutex ); link->fd = link->betterFd; + pthread_mutex_unlock( &link->sendMutex ); link->betterFd = -1; link->currentServer = link->betterServer; link->version = link->betterVersion; @@ -425,8 +463,10 @@ static void* uplink_mainloop(void *data) } // Don't keep link established if we're idle for too much if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) { + pthread_mutex_lock( &link->sendMutex ); close( link->fd ); link->fd = events[EV_SOCKET].fd = -1; + pthread_mutex_unlock( &link->sendMutex ); link->cycleDetected = false; if ( link->recvBufferLen != 0 ) { link->recvBufferLen = 0; @@ -503,7 +543,9 @@ static void* uplink_mainloop(void *data) spin_lock( &link->queueLock ); const int fd = link->fd; const dnbd3_signal_t* signal = link->signal; + pthread_mutex_lock( &link->sendMutex ); link->fd = -1; + pthread_mutex_unlock( &link->sendMutex ); link->signal = NULL; if ( !link->shutdown ) { link->shutdown = true; @@ -524,6 +566,7 @@ static void* uplink_mainloop(void *data) } spin_destroy( &link->queueLock ); spin_destroy( &link->rttLock ); + pthread_mutex_destroy( &link->sendMutex ); free( link->recvBuffer ); link->recvBuffer = NULL; if ( link->cacheFd != -1 ) { @@ -558,7 +601,9 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly) */ spin_unlock( &link->queueLock ); if ( hops < 200 ) ++hops; - const int ret = dnbd3_get_block( link->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( link->version, hops ) ); + pthread_mutex_lock( &link->sendMutex ); + const bool ret = dnbd3_get_block( link->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( link->version, hops ) ); + pthread_mutex_unlock( &link->sendMutex ); if ( !ret ) { // Non-critical - if the connection dropped or the server was changed // the thread will re-send this request as soon as the connection @@ -629,7 +674,10 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link) const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE; link->replicationHandle = offset; const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE ); - if ( !dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) ) ) { + pthread_mutex_lock( &link->sendMutex ); + bool sendOk = dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) ); + pthread_mutex_unlock( &link->sendMutex ); + if ( !sendOk ) { logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" ); return; } @@ -874,8 +922,10 @@ static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew) if ( link->fd == -1 ) return; altservers_serverFailed( &link->currentServer ); + pthread_mutex_lock( &link->sendMutex ); close( link->fd ); link->fd = -1; + pthread_mutex_unlock( &link->sendMutex ); link->replicationHandle = REP_NONE; if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) { link->nextReplicationIndex = 0; @@ -911,7 +961,10 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink) size_t bytes = IMGSIZE_TO_HASHBLOCKS( image->virtualFilesize ) * sizeof(uint32_t); uint32_t masterCrc; uint32_t *buffer = malloc( bytes ); - if ( !dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes ) || bytes == 0 ) { + pthread_mutex_lock( &uplink->sendMutex ); + bool sendOk = dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes ); + pthread_mutex_unlock( &uplink->sendMutex ); + if ( !sendOk || bytes == 0 ) { free( buffer ); return; } |