summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorSimon Rettberg2019-07-23 17:20:20 +0200
committerSimon Rettberg2019-07-23 17:24:27 +0200
commit7937c7db7baea296fce4055a9e9f498e67da2d09 (patch)
tree72574175ea6aa5bc2890fd792c499355a7b2bfab /src/server/uplink.c
parent[FUSE] Add --sticky mode to ignore alt-servers announced by servers (diff)
downloaddnbd3-7937c7db7baea296fce4055a9e9f498e67da2d09.tar.gz
dnbd3-7937c7db7baea296fce4055a9e9f498e67da2d09.tar.xz
dnbd3-7937c7db7baea296fce4055a9e9f498e67da2d09.zip
[SERVER] uplink: Relay request in client's thread if possible
Early benchmarking shows that this is faster, since we don't require another thread to wake up just to send out the request.
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c59
1 files changed, 56 insertions, 3 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 31b220d..ccbf209 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -69,11 +69,14 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) );
spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE );
spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE );
+ pthread_mutex_init( &link->sendMutex, NULL );
link->image = image;
link->bytesReceived = 0;
link->idleTime = 0;
link->queueLen = 0;
+ pthread_mutex_lock( &link->sendMutex );
link->fd = -1;
+ pthread_mutex_unlock( &link->sendMutex );
link->cacheFd = -1;
link->signal = NULL;
link->replicationHandle = REP_NONE;
@@ -267,6 +270,39 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
#endif
spin_unlock( &uplink->queueLock );
+ if ( foundExisting != -1 )
+ return true; // Attached to pending request, do nothing
+
+ // See if we can fire away the request
+ if ( pthread_mutex_trylock( &uplink->sendMutex ) != 0 ) {
+ logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" );
+ } else {
+ if ( uplink->fd == -1 ) {
+ pthread_mutex_unlock( &uplink->sendMutex );
+ logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
+ } else {
+ const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
+ if ( hops < 200 ) ++hops;
+ const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) );
+ pthread_mutex_unlock( &uplink->sendMutex );
+ if ( !ret ) {
+ logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" );
+ } else {
+ spin_lock( &uplink->queueLock );
+ if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client && uplink->queue[freeSlot].status == ULR_NEW ) {
+ uplink->queue[freeSlot].status = ULR_PENDING;
+ logadd( LOG_DEBUG2, "Succesful direct uplink request" );
+ } else {
+ logadd( LOG_DEBUG2, "Weird queue update fail for direct uplink request" );
+ }
+ spin_unlock( &uplink->queueLock );
+ return true;
+ }
+ // Fall through to waking up sender thread
+ }
+ }
+
if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed
if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
@@ -346,7 +382,9 @@ static void* uplink_mainloop(void *data)
// The rttTest worker thread has finished our request.
// And says it's better to switch to another server
const int fd = link->fd;
+ pthread_mutex_lock( &link->sendMutex );
link->fd = link->betterFd;
+ pthread_mutex_unlock( &link->sendMutex );
link->betterFd = -1;
link->currentServer = link->betterServer;
link->version = link->betterVersion;
@@ -425,8 +463,10 @@ static void* uplink_mainloop(void *data)
}
// Don't keep link established if we're idle for too much
if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) {
+ pthread_mutex_lock( &link->sendMutex );
close( link->fd );
link->fd = events[EV_SOCKET].fd = -1;
+ pthread_mutex_unlock( &link->sendMutex );
link->cycleDetected = false;
if ( link->recvBufferLen != 0 ) {
link->recvBufferLen = 0;
@@ -503,7 +543,9 @@ static void* uplink_mainloop(void *data)
spin_lock( &link->queueLock );
const int fd = link->fd;
const dnbd3_signal_t* signal = link->signal;
+ pthread_mutex_lock( &link->sendMutex );
link->fd = -1;
+ pthread_mutex_unlock( &link->sendMutex );
link->signal = NULL;
if ( !link->shutdown ) {
link->shutdown = true;
@@ -524,6 +566,7 @@ static void* uplink_mainloop(void *data)
}
spin_destroy( &link->queueLock );
spin_destroy( &link->rttLock );
+ pthread_mutex_destroy( &link->sendMutex );
free( link->recvBuffer );
link->recvBuffer = NULL;
if ( link->cacheFd != -1 ) {
@@ -558,7 +601,9 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly)
*/
spin_unlock( &link->queueLock );
if ( hops < 200 ) ++hops;
- const int ret = dnbd3_get_block( link->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( link->version, hops ) );
+ pthread_mutex_lock( &link->sendMutex );
+ const bool ret = dnbd3_get_block( link->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( link->version, hops ) );
+ pthread_mutex_unlock( &link->sendMutex );
if ( !ret ) {
// Non-critical - if the connection dropped or the server was changed
// the thread will re-send this request as soon as the connection
@@ -629,7 +674,10 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
link->replicationHandle = offset;
const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
- if ( !dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) ) ) {
+ pthread_mutex_lock( &link->sendMutex );
+ bool sendOk = dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) );
+ pthread_mutex_unlock( &link->sendMutex );
+ if ( !sendOk ) {
logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" );
return;
}
@@ -874,8 +922,10 @@ static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew)
if ( link->fd == -1 )
return;
altservers_serverFailed( &link->currentServer );
+ pthread_mutex_lock( &link->sendMutex );
close( link->fd );
link->fd = -1;
+ pthread_mutex_unlock( &link->sendMutex );
link->replicationHandle = REP_NONE;
if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) {
link->nextReplicationIndex = 0;
@@ -911,7 +961,10 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink)
size_t bytes = IMGSIZE_TO_HASHBLOCKS( image->virtualFilesize ) * sizeof(uint32_t);
uint32_t masterCrc;
uint32_t *buffer = malloc( bytes );
- if ( !dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes ) || bytes == 0 ) {
+ pthread_mutex_lock( &uplink->sendMutex );
+ bool sendOk = dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes );
+ pthread_mutex_unlock( &uplink->sendMutex );
+ if ( !sendOk || bytes == 0 ) {
free( buffer );
return;
}