summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c59
1 files changed, 56 insertions, 3 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 31b220d..ccbf209 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -69,11 +69,14 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) );
spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE );
spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE );
+ pthread_mutex_init( &link->sendMutex, NULL );
link->image = image;
link->bytesReceived = 0;
link->idleTime = 0;
link->queueLen = 0;
+ pthread_mutex_lock( &link->sendMutex );
link->fd = -1;
+ pthread_mutex_unlock( &link->sendMutex );
link->cacheFd = -1;
link->signal = NULL;
link->replicationHandle = REP_NONE;
@@ -267,6 +270,39 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
#endif
spin_unlock( &uplink->queueLock );
+ if ( foundExisting != -1 )
+ return true; // Attached to pending request, do nothing
+
+ // See if we can fire away the request
+ if ( pthread_mutex_trylock( &uplink->sendMutex ) != 0 ) {
+ logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" );
+ } else {
+ if ( uplink->fd == -1 ) {
+ pthread_mutex_unlock( &uplink->sendMutex );
+ logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
+ } else {
+ const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart);
+ if ( hops < 200 ) ++hops;
+ const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) );
+ pthread_mutex_unlock( &uplink->sendMutex );
+ if ( !ret ) {
+ logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" );
+ } else {
+ spin_lock( &uplink->queueLock );
+ if ( uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client && uplink->queue[freeSlot].status == ULR_NEW ) {
+ uplink->queue[freeSlot].status = ULR_PENDING;
+ logadd( LOG_DEBUG2, "Succesful direct uplink request" );
+ } else {
+ logadd( LOG_DEBUG2, "Weird queue update fail for direct uplink request" );
+ }
+ spin_unlock( &uplink->queueLock );
+ return true;
+ }
+ // Fall through to waking up sender thread
+ }
+ }
+
if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed
if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
@@ -346,7 +382,9 @@ static void* uplink_mainloop(void *data)
// The rttTest worker thread has finished our request.
// And says it's better to switch to another server
const int fd = link->fd;
+ pthread_mutex_lock( &link->sendMutex );
link->fd = link->betterFd;
+ pthread_mutex_unlock( &link->sendMutex );
link->betterFd = -1;
link->currentServer = link->betterServer;
link->version = link->betterVersion;
@@ -425,8 +463,10 @@ static void* uplink_mainloop(void *data)
}
// Don't keep link established if we're idle for too much
if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) {
+ pthread_mutex_lock( &link->sendMutex );
close( link->fd );
link->fd = events[EV_SOCKET].fd = -1;
+ pthread_mutex_unlock( &link->sendMutex );
link->cycleDetected = false;
if ( link->recvBufferLen != 0 ) {
link->recvBufferLen = 0;
@@ -503,7 +543,9 @@ static void* uplink_mainloop(void *data)
spin_lock( &link->queueLock );
const int fd = link->fd;
const dnbd3_signal_t* signal = link->signal;
+ pthread_mutex_lock( &link->sendMutex );
link->fd = -1;
+ pthread_mutex_unlock( &link->sendMutex );
link->signal = NULL;
if ( !link->shutdown ) {
link->shutdown = true;
@@ -524,6 +566,7 @@ static void* uplink_mainloop(void *data)
}
spin_destroy( &link->queueLock );
spin_destroy( &link->rttLock );
+ pthread_mutex_destroy( &link->sendMutex );
free( link->recvBuffer );
link->recvBuffer = NULL;
if ( link->cacheFd != -1 ) {
@@ -558,7 +601,9 @@ static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly)
*/
spin_unlock( &link->queueLock );
if ( hops < 200 ) ++hops;
- const int ret = dnbd3_get_block( link->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( link->version, hops ) );
+ pthread_mutex_lock( &link->sendMutex );
+ const bool ret = dnbd3_get_block( link->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( link->version, hops ) );
+ pthread_mutex_unlock( &link->sendMutex );
if ( !ret ) {
// Non-critical - if the connection dropped or the server was changed
// the thread will re-send this request as soon as the connection
@@ -629,7 +674,10 @@ static void uplink_sendReplicationRequest(dnbd3_connection_t *link)
const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
link->replicationHandle = offset;
const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
- if ( !dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) ) ) {
+ pthread_mutex_lock( &link->sendMutex );
+ bool sendOk = dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) );
+ pthread_mutex_unlock( &link->sendMutex );
+ if ( !sendOk ) {
logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" );
return;
}
@@ -874,8 +922,10 @@ static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew)
if ( link->fd == -1 )
return;
altservers_serverFailed( &link->currentServer );
+ pthread_mutex_lock( &link->sendMutex );
close( link->fd );
link->fd = -1;
+ pthread_mutex_unlock( &link->sendMutex );
link->replicationHandle = REP_NONE;
if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) {
link->nextReplicationIndex = 0;
@@ -911,7 +961,10 @@ static void uplink_addCrc32(dnbd3_connection_t *uplink)
size_t bytes = IMGSIZE_TO_HASHBLOCKS( image->virtualFilesize ) * sizeof(uint32_t);
uint32_t masterCrc;
uint32_t *buffer = malloc( bytes );
- if ( !dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes ) || bytes == 0 ) {
+ pthread_mutex_lock( &uplink->sendMutex );
+ bool sendOk = dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes );
+ pthread_mutex_unlock( &uplink->sendMutex );
+ if ( !sendOk || bytes == 0 ) {
free( buffer );
return;
}