From 1d2295131020688b5a688286ce8c53d6bb7abdb8 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Sun, 18 Aug 2019 21:59:26 +0200 Subject: [SERVER] Add struct representing active connection to uplink server --- src/server/uplink.c | 78 ++++++++++++++++++++++++++--------------------------- 1 file changed, 38 insertions(+), 40 deletions(-) (limited to 'src/server/uplink.c') diff --git a/src/server/uplink.c b/src/server/uplink.c index 7d66b21..e21e28c 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -97,7 +97,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version uplink->idleTime = 0; uplink->queueLen = 0; mutex_lock( &uplink->sendMutex ); - uplink->fd = -1; + uplink->current.fd = -1; mutex_unlock( &uplink->sendMutex ); uplink->cacheFd = -1; uplink->signal = NULL; @@ -105,12 +105,12 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version mutex_lock( &uplink->rttLock ); uplink->cycleDetected = false; if ( sock >= 0 ) { - uplink->betterFd = sock; - uplink->betterServer = *host; + uplink->better.fd = sock; + uplink->better.host = *host; uplink->rttTestResult = RTT_DOCHANGE; - uplink->betterVersion = version; + uplink->better.version = version; } else { - uplink->betterFd = -1; + uplink->better.fd = -1; uplink->rttTestResult = RTT_IDLE; } mutex_unlock( &uplink->rttLock ); @@ -211,7 +211,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin } // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain // This might be a false positive if there are multiple instances running on the same host (IP) - if ( hops != 0 && isSameAddress( &uplink->currentServer, &client->host ) ) { + if ( hops != 0 && isSameAddress( &uplink->current.host, &client->host ) ) { mutex_unlock( &client->image->lock ); logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); mutex_lock( &uplink->rttLock ); @@ -315,14 +315,14 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin if ( mutex_trylock( &uplink->sendMutex ) != 0 ) { logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" ); } else { - if ( uplink->fd == -1 ) { + if ( uplink->current.fd == -1 ) { mutex_unlock( &uplink->sendMutex ); logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" ); } else { const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); if ( hops < 200 ) ++hops; - const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) ); + const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) ); mutex_unlock( &uplink->sendMutex ); if ( !ret ) { logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" ); @@ -405,7 +405,7 @@ static void* uplink_mainloop(void *data) mutex_unlock( &uplink->rttLock ); if ( waitTime == 0 ) { // Nothing - } else if ( uplink->fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) { + } else if ( uplink->current.fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) { waitTime = 1000; } else { declare_now; @@ -413,7 +413,7 @@ static void* uplink_mainloop(void *data) if ( waitTime < 100 ) waitTime = 100; if ( waitTime > 5000 ) waitTime = 5000; } - events[EV_SOCKET].fd = uplink->fd; + events[EV_SOCKET].fd = uplink->current.fd; numSocks = poll( events, EV_COUNT, waitTime ); if ( _shutdown || uplink->shutdown ) goto cleanup; if ( numSocks == -1 ) { // Error? @@ -430,13 +430,11 @@ static void* uplink_mainloop(void *data) uplink->rttTestResult = RTT_IDLE; // The rttTest worker thread has finished our request. // And says it's better to switch to another server - const int fd = uplink->fd; + const int fd = uplink->current.fd; mutex_lock( &uplink->sendMutex ); - uplink->fd = uplink->betterFd; + uplink->current = uplink->better; mutex_unlock( &uplink->sendMutex ); - uplink->betterFd = -1; - uplink->currentServer = uplink->betterServer; - uplink->version = uplink->betterVersion; + uplink->better.fd = -1; uplink->cycleDetected = false; mutex_unlock( &uplink->rttLock ); discoverFailCount = 0; @@ -445,7 +443,7 @@ static void* uplink_mainloop(void *data) uplink->image->working = true; uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received buffer[0] = '@'; - if ( host_to_string( &uplink->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) { + if ( host_to_string( &uplink->current.host, buffer + 1, sizeof(buffer) - 1 ) ) { logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", uplink->image->name, buffer + 1 ); setThreadName( buffer ); } @@ -471,7 +469,7 @@ static void* uplink_mainloop(void *data) if ( signal_clear( uplink->signal ) == SIGNAL_ERROR ) { logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", uplink->image->name ); } - if ( uplink->fd != -1 ) { + if ( uplink->current.fd != -1 ) { // Uplink seems fine, relay requests to it... uplink_sendRequests( uplink, true ); } else { // No uplink; maybe it was shutdown since it was idle for too long @@ -499,9 +497,9 @@ static void* uplink_mainloop(void *data) uplink_saveCacheMap( uplink ); } // Keep-alive - if ( uplink->fd != -1 && uplink->replicationHandle == REP_NONE ) { + if ( uplink->current.fd != -1 && uplink->replicationHandle == REP_NONE ) { // Send keep-alive if nothing is happening - if ( uplink_sendKeepalive( uplink->fd ) ) { + if ( uplink_sendKeepalive( uplink->current.fd ) ) { // Re-trigger periodically, in case it requires a minimum user count uplink_sendReplicationRequest( uplink ); } else { @@ -511,10 +509,10 @@ static void* uplink_mainloop(void *data) } } // Don't keep uplink established if we're idle for too much - if ( uplink->fd != -1 && uplink_connectionShouldShutdown( uplink ) ) { + if ( uplink->current.fd != -1 && uplink_connectionShouldShutdown( uplink ) ) { mutex_lock( &uplink->sendMutex ); - close( uplink->fd ); - uplink->fd = events[EV_SOCKET].fd = -1; + close( uplink->current.fd ); + uplink->current.fd = events[EV_SOCKET].fd = -1; mutex_unlock( &uplink->sendMutex ); uplink->cycleDetected = false; if ( uplink->recvBufferLen != 0 ) { @@ -531,7 +529,7 @@ static void* uplink_mainloop(void *data) const int rttTestResult = uplink->rttTestResult; mutex_unlock( &uplink->rttLock ); if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) { - if ( timing_reached( &nextAltCheck, &now ) || ( uplink->fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) || uplink->cycleDetected ) { + if ( timing_reached( &nextAltCheck, &now ) || ( uplink->current.fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) || uplink->cycleDetected ) { // It seems it's time for a check if ( image_isComplete( uplink->image ) ) { // Quit work if image is complete @@ -556,7 +554,7 @@ static void* uplink_mainloop(void *data) timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) ); } #ifdef _DEBUG - if ( uplink->fd != -1 && !uplink->shutdown ) { + if ( uplink->current.fd != -1 && !uplink->shutdown ) { bool resend = false; ticks deadline; timing_set( &deadline, &now, -10 ); @@ -594,10 +592,10 @@ static void* uplink_mainloop(void *data) uplink->image->uplink = NULL; } mutex_lock( &uplink->queueLock ); - const int fd = uplink->fd; + const int fd = uplink->current.fd; const dnbd3_signal_t* signal = uplink->signal; mutex_lock( &uplink->sendMutex ); - uplink->fd = -1; + uplink->current.fd = -1; mutex_unlock( &uplink->sendMutex ); uplink->signal = NULL; // Do not access uplink->image after unlocking, since we set @@ -610,8 +608,8 @@ static void* uplink_mainloop(void *data) // Wait for the RTT check to finish/fail if it's in progress while ( uplink->rttTestResult == RTT_INPROGRESS ) usleep( 10000 ); - if ( uplink->betterFd != -1 ) { - close( uplink->betterFd ); + if ( uplink->better.fd != -1 ) { + close( uplink->better.fd ); } mutex_destroy( &uplink->queueLock ); mutex_destroy( &uplink->rttLock ); @@ -651,14 +649,14 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly) mutex_unlock( &uplink->queueLock ); if ( hops < 200 ) ++hops; mutex_lock( &uplink->sendMutex ); - const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) ); + const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) ); mutex_unlock( &uplink->sendMutex ); if ( !ret ) { // Non-critical - if the connection dropped or the server was changed // the thread will re-send this request as soon as the connection // is reestablished. logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" ); - altservers_serverFailed( &uplink->currentServer ); + altservers_serverFailed( &uplink->current.host ); return; } mutex_lock( &uplink->queueLock ); @@ -678,7 +676,7 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly) */ static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink) { - if ( uplink == NULL || uplink->fd == -1 ) return; + if ( uplink == NULL || uplink->current.fd == -1 ) return; if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 ) return; // Don't do background replication if ( uplink->nextReplicationIndex == -1 || uplink->replicationHandle != REP_NONE ) return; @@ -724,7 +722,7 @@ static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink) uplink->replicationHandle = offset; const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE ); mutex_lock( &uplink->sendMutex ); - bool sendOk = dnbd3_get_block( uplink->fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->version, 1 ) ); + bool sendOk = dnbd3_get_block( uplink->current.fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->current.version, 1 ) ); mutex_unlock( &uplink->sendMutex ); if ( !sendOk ) { logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" ); @@ -798,7 +796,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) dnbd3_reply_t inReply, outReply; int ret, i; for (;;) { - ret = dnbd3_read_reply( uplink->fd, &inReply, false ); + ret = dnbd3_read_reply( uplink->current.fd, &inReply, false ); if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue; if ( ret == REPLY_AGAIN ) break; if ( unlikely( ret == REPLY_CLOSED ) ) { @@ -826,7 +824,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) exit( 1 ); } } - if ( unlikely( (uint32_t)sock_recv( uplink->fd, uplink->recvBuffer, inReply.size ) != inReply.size ) ) { + if ( unlikely( (uint32_t)sock_recv( uplink->current.fd, uplink->recvBuffer, inReply.size ) != inReply.size ) ) { logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", uplink->image->path ); goto error_cleanup; } @@ -973,12 +971,12 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew) { - if ( uplink->fd == -1 ) + if ( uplink->current.fd == -1 ) return; - altservers_serverFailed( &uplink->currentServer ); + altservers_serverFailed( &uplink->current.host ); mutex_lock( &uplink->sendMutex ); - close( uplink->fd ); - uplink->fd = -1; + close( uplink->current.fd ); + uplink->current.fd = -1; mutex_unlock( &uplink->sendMutex ); uplink->replicationHandle = REP_NONE; if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) { @@ -987,7 +985,7 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew) if ( !findNew ) return; mutex_lock( &uplink->rttLock ); - bool bail = uplink->rttTestResult == RTT_INPROGRESS || uplink->betterFd != -1; + bool bail = uplink->rttTestResult == RTT_INPROGRESS || uplink->better.fd != -1; mutex_unlock( &uplink->rttLock ); if ( bail ) return; @@ -1016,7 +1014,7 @@ static void uplink_addCrc32(dnbd3_uplink_t *uplink) uint32_t masterCrc; uint32_t *buffer = malloc( bytes ); mutex_lock( &uplink->sendMutex ); - bool sendOk = dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes ); + bool sendOk = dnbd3_get_crc32( uplink->current.fd, &masterCrc, buffer, &bytes ); mutex_unlock( &uplink->sendMutex ); if ( !sendOk || bytes == 0 ) { free( buffer ); -- cgit v1.2.3-55-g7522