From 1d2295131020688b5a688286ce8c53d6bb7abdb8 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Sun, 18 Aug 2019 21:59:26 +0200 Subject: [SERVER] Add struct representing active connection to uplink server --- src/server/altservers.c | 30 +++++++++---------- src/server/globals.h | 14 ++++----- src/server/image.c | 2 +- src/server/integrity.c | 2 +- src/server/uplink.c | 78 ++++++++++++++++++++++++------------------------- 5 files changed, 60 insertions(+), 66 deletions(-) diff --git a/src/server/altservers.c b/src/server/altservers.c index 1001981..fbe10a8 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -125,14 +125,14 @@ void altservers_findUplink(dnbd3_uplink_t *uplink) { if ( uplink->shutdown ) return; - if ( uplink->fd != -1 && numAltServers <= 1 ) + if ( uplink->current.fd != -1 && numAltServers <= 1 ) return; int i; // if betterFd != -1 it means the uplink is supposed to switch to another // server. As this function here is called by the uplink thread, it can // never be that the uplink is supposed to switch, but instead calls // this function. - assert( uplink->betterFd == -1 ); + assert( uplink->better.fd == -1 ); // it is however possible that an RTT measurement is currently in progress, // so check for that case and do nothing if one is in progress // XXX As this function is only ever called by the image's uplink thread, @@ -457,9 +457,9 @@ static void *altservers_main(void *data UNUSED) if ( uplink == NULL ) continue; // First, get 4 alt servers - numAlts = altservers_getListForUplink( servers, ALTS, uplink->fd == -1 ); + numAlts = altservers_getListForUplink( servers, ALTS, uplink->current.fd == -1 ); // If we're already connected and only got one server anyways, there isn't much to do - if ( numAlts <= 1 && uplink->fd != -1 ) { + if ( numAlts <= 1 && uplink->current.fd != -1 ) { uplink->rttTestResult = RTT_DONTCHANGE; continue; } @@ -475,15 +475,15 @@ static void *altservers_main(void *data UNUSED) } LOG( LOG_DEBUG2, "[%d] Running alt check", itLink ); assert( uplink->rttTestResult == RTT_INPROGRESS ); - if ( uplink->fd != -1 ) { + if ( uplink->current.fd != -1 ) { // Add current server if not already in list found = false; for (itAlt = 0; itAlt < numAlts; ++itAlt) { - if ( !isSameAddressPort( &uplink->currentServer, &servers[itAlt] ) ) continue; + if ( !isSameAddressPort( &uplink->current.host, &servers[itAlt] ) ) continue; found = true; break; } - if ( !found ) servers[numAlts++] = uplink->currentServer; + if ( !found ) servers[numAlts++] = uplink->current.host; } // Test them all int bestSock = -1; @@ -537,7 +537,7 @@ static void *altservers_main(void *data UNUSED) // Measurement done - everything fine so far mutex_lock( &altServersLock ); mutex_lock( &uplink->rttLock ); - const bool isCurrent = isSameAddressPort( &servers[itAlt], &uplink->currentServer ); + const bool isCurrent = isSameAddressPort( &servers[itAlt], &uplink->current.host ); // Penaltize rtt if this was a cycle; this will treat this server with lower priority // in the near future too, so we prevent alternating between two servers that are both // part of a cycle and have the lowest latency. @@ -547,9 +547,9 @@ static void *altservers_main(void *data UNUSED) unsigned int avg = altservers_updateRtt( &servers[itAlt], rtt ); mutex_unlock( &altServersLock ); // If a cycle was detected, or we lost connection to the current (last) server, penaltize it one time - if ( ( uplink->cycleDetected || uplink->fd == -1 ) && isCurrent ) avg = (avg * 2) + 50000; + if ( ( uplink->cycleDetected || uplink->current.fd == -1 ) && isCurrent ) avg = (avg * 2) + 50000; mutex_unlock( &uplink->rttLock ); - if ( uplink->fd != -1 && isCurrent ) { + if ( uplink->current.fd != -1 && isCurrent ) { // Was measuring current server currentRtt = avg; close( sock ); @@ -574,18 +574,18 @@ static void *altservers_main(void *data UNUSED) close( sock ); } // Done testing all servers. See if we should switch - if ( bestSock != -1 && (uplink->fd == -1 || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) { + if ( bestSock != -1 && (uplink->current.fd == -1 || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) { // yep - if ( currentRtt > 10000000 || uplink->fd == -1 ) { + if ( currentRtt > 10000000 || uplink->current.fd == -1 ) { LOG( LOG_DEBUG1, "Change - best: %luµs, current: -", bestRtt ); } else { LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt ); } sock_setTimeout( bestSock, _uplinkTimeout ); mutex_lock( &uplink->rttLock ); - uplink->betterFd = bestSock; - uplink->betterServer = servers[bestIndex]; - uplink->betterVersion = bestProtocolVersion; + uplink->better.fd = bestSock; + uplink->better.host = servers[bestIndex]; + uplink->better.version = bestProtocolVersion; uplink->rttTestResult = RTT_DOCHANGE; mutex_unlock( &uplink->rttLock ); signal_call( uplink->signal ); diff --git a/src/server/globals.h b/src/server/globals.h index 0371e33..659e5a2 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -31,9 +31,9 @@ typedef struct } dnbd3_queued_request_t; typedef struct { - int fd; - int version; - dnbd3_host_t host; + int fd; // Socket fd for this connection + int version; // Protocol version of remote server + dnbd3_host_t host; // IP/Port of remote server } dnbd3_server_connection_t; #define RTT_IDLE 0 // Not in progress @@ -43,20 +43,16 @@ typedef struct { #define RTT_NOT_REACHABLE 4 // No uplink was reachable struct _dnbd3_uplink { - int fd; // socket fd to remote server - int version; // remote server protocol version + dnbd3_server_connection_t current; // Currently active connection; fd == -1 means disconnected + dnbd3_server_connection_t better; // Better connection as found by altserver worker; fd == -1 means none dnbd3_signal_t* signal; // used to wake up the process pthread_t thread; // thread holding the connection pthread_mutex_t sendMutex; // For locking socket while sending pthread_mutex_t queueLock; // lock for synchronization on request queue etc. dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer - dnbd3_host_t currentServer; // Current server we're connected to pthread_mutex_t rttLock; // When accessing rttTestResult, betterFd or betterServer int rttTestResult; // RTT_* int cacheFd; // used to write to the image, in case it is relayed. ONLY USE FROM UPLINK THREAD! - int betterVersion; // protocol version of better server - int betterFd; // Active connection to better server, ready to use - dnbd3_host_t betterServer; // The better server uint8_t *recvBuffer; // Buffer for receiving payload uint32_t recvBufferLen; // Len of ^^ atomic_bool shutdown; // signal this thread to stop, must only be set from uplink_shutdown() or cleanup in uplink_mainloop() diff --git a/src/server/image.c b/src/server/image.c index 4a65ed3..d250715 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -1508,7 +1508,7 @@ json_t* image_getListAsJson() uplinkName[0] = '\0'; } else { bytesReceived = image->uplink->bytesReceived; - if ( image->uplink->fd == -1 || !host_to_string( &image->uplink->currentServer, uplinkName, sizeof(uplinkName) ) ) { + if ( image->uplink->current.fd == -1 || !host_to_string( &image->uplink->current.host, uplinkName, sizeof(uplinkName) ) ) { uplinkName[0] = '\0'; } } diff --git a/src/server/integrity.c b/src/server/integrity.c index c52d17b..3d1ac9b 100644 --- a/src/server/integrity.c +++ b/src/server/integrity.c @@ -240,7 +240,7 @@ static void* integrity_main(void * data UNUSED) if ( !foundCorrupted ) { mutex_lock( &image->lock ); if ( image->uplink != NULL ) { // TODO: image_determineWorkingState() helper? - image->working = image->uplink->fd != -1 && image->readFd != -1; + image->working = image->uplink->current.fd != -1 && image->readFd != -1; } mutex_unlock( &image->lock ); } diff --git a/src/server/uplink.c b/src/server/uplink.c index 7d66b21..e21e28c 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -97,7 +97,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version uplink->idleTime = 0; uplink->queueLen = 0; mutex_lock( &uplink->sendMutex ); - uplink->fd = -1; + uplink->current.fd = -1; mutex_unlock( &uplink->sendMutex ); uplink->cacheFd = -1; uplink->signal = NULL; @@ -105,12 +105,12 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version mutex_lock( &uplink->rttLock ); uplink->cycleDetected = false; if ( sock >= 0 ) { - uplink->betterFd = sock; - uplink->betterServer = *host; + uplink->better.fd = sock; + uplink->better.host = *host; uplink->rttTestResult = RTT_DOCHANGE; - uplink->betterVersion = version; + uplink->better.version = version; } else { - uplink->betterFd = -1; + uplink->better.fd = -1; uplink->rttTestResult = RTT_IDLE; } mutex_unlock( &uplink->rttLock ); @@ -211,7 +211,7 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin } // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain // This might be a false positive if there are multiple instances running on the same host (IP) - if ( hops != 0 && isSameAddress( &uplink->currentServer, &client->host ) ) { + if ( hops != 0 && isSameAddress( &uplink->current.host, &client->host ) ) { mutex_unlock( &client->image->lock ); logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); mutex_lock( &uplink->rttLock ); @@ -315,14 +315,14 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin if ( mutex_trylock( &uplink->sendMutex ) != 0 ) { logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" ); } else { - if ( uplink->fd == -1 ) { + if ( uplink->current.fd == -1 ) { mutex_unlock( &uplink->sendMutex ); logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" ); } else { const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); if ( hops < 200 ) ++hops; - const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) ); + const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) ); mutex_unlock( &uplink->sendMutex ); if ( !ret ) { logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" ); @@ -405,7 +405,7 @@ static void* uplink_mainloop(void *data) mutex_unlock( &uplink->rttLock ); if ( waitTime == 0 ) { // Nothing - } else if ( uplink->fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) { + } else if ( uplink->current.fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) { waitTime = 1000; } else { declare_now; @@ -413,7 +413,7 @@ static void* uplink_mainloop(void *data) if ( waitTime < 100 ) waitTime = 100; if ( waitTime > 5000 ) waitTime = 5000; } - events[EV_SOCKET].fd = uplink->fd; + events[EV_SOCKET].fd = uplink->current.fd; numSocks = poll( events, EV_COUNT, waitTime ); if ( _shutdown || uplink->shutdown ) goto cleanup; if ( numSocks == -1 ) { // Error? @@ -430,13 +430,11 @@ static void* uplink_mainloop(void *data) uplink->rttTestResult = RTT_IDLE; // The rttTest worker thread has finished our request. // And says it's better to switch to another server - const int fd = uplink->fd; + const int fd = uplink->current.fd; mutex_lock( &uplink->sendMutex ); - uplink->fd = uplink->betterFd; + uplink->current = uplink->better; mutex_unlock( &uplink->sendMutex ); - uplink->betterFd = -1; - uplink->currentServer = uplink->betterServer; - uplink->version = uplink->betterVersion; + uplink->better.fd = -1; uplink->cycleDetected = false; mutex_unlock( &uplink->rttLock ); discoverFailCount = 0; @@ -445,7 +443,7 @@ static void* uplink_mainloop(void *data) uplink->image->working = true; uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received buffer[0] = '@'; - if ( host_to_string( &uplink->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) { + if ( host_to_string( &uplink->current.host, buffer + 1, sizeof(buffer) - 1 ) ) { logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", uplink->image->name, buffer + 1 ); setThreadName( buffer ); } @@ -471,7 +469,7 @@ static void* uplink_mainloop(void *data) if ( signal_clear( uplink->signal ) == SIGNAL_ERROR ) { logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", uplink->image->name ); } - if ( uplink->fd != -1 ) { + if ( uplink->current.fd != -1 ) { // Uplink seems fine, relay requests to it... uplink_sendRequests( uplink, true ); } else { // No uplink; maybe it was shutdown since it was idle for too long @@ -499,9 +497,9 @@ static void* uplink_mainloop(void *data) uplink_saveCacheMap( uplink ); } // Keep-alive - if ( uplink->fd != -1 && uplink->replicationHandle == REP_NONE ) { + if ( uplink->current.fd != -1 && uplink->replicationHandle == REP_NONE ) { // Send keep-alive if nothing is happening - if ( uplink_sendKeepalive( uplink->fd ) ) { + if ( uplink_sendKeepalive( uplink->current.fd ) ) { // Re-trigger periodically, in case it requires a minimum user count uplink_sendReplicationRequest( uplink ); } else { @@ -511,10 +509,10 @@ static void* uplink_mainloop(void *data) } } // Don't keep uplink established if we're idle for too much - if ( uplink->fd != -1 && uplink_connectionShouldShutdown( uplink ) ) { + if ( uplink->current.fd != -1 && uplink_connectionShouldShutdown( uplink ) ) { mutex_lock( &uplink->sendMutex ); - close( uplink->fd ); - uplink->fd = events[EV_SOCKET].fd = -1; + close( uplink->current.fd ); + uplink->current.fd = events[EV_SOCKET].fd = -1; mutex_unlock( &uplink->sendMutex ); uplink->cycleDetected = false; if ( uplink->recvBufferLen != 0 ) { @@ -531,7 +529,7 @@ static void* uplink_mainloop(void *data) const int rttTestResult = uplink->rttTestResult; mutex_unlock( &uplink->rttLock ); if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) { - if ( timing_reached( &nextAltCheck, &now ) || ( uplink->fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) || uplink->cycleDetected ) { + if ( timing_reached( &nextAltCheck, &now ) || ( uplink->current.fd == -1 && !uplink_connectionShouldShutdown( uplink ) ) || uplink->cycleDetected ) { // It seems it's time for a check if ( image_isComplete( uplink->image ) ) { // Quit work if image is complete @@ -556,7 +554,7 @@ static void* uplink_mainloop(void *data) timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? altCheckInterval : SERVER_RTT_INTERVAL_FAILED) ); } #ifdef _DEBUG - if ( uplink->fd != -1 && !uplink->shutdown ) { + if ( uplink->current.fd != -1 && !uplink->shutdown ) { bool resend = false; ticks deadline; timing_set( &deadline, &now, -10 ); @@ -594,10 +592,10 @@ static void* uplink_mainloop(void *data) uplink->image->uplink = NULL; } mutex_lock( &uplink->queueLock ); - const int fd = uplink->fd; + const int fd = uplink->current.fd; const dnbd3_signal_t* signal = uplink->signal; mutex_lock( &uplink->sendMutex ); - uplink->fd = -1; + uplink->current.fd = -1; mutex_unlock( &uplink->sendMutex ); uplink->signal = NULL; // Do not access uplink->image after unlocking, since we set @@ -610,8 +608,8 @@ static void* uplink_mainloop(void *data) // Wait for the RTT check to finish/fail if it's in progress while ( uplink->rttTestResult == RTT_INPROGRESS ) usleep( 10000 ); - if ( uplink->betterFd != -1 ) { - close( uplink->betterFd ); + if ( uplink->better.fd != -1 ) { + close( uplink->better.fd ); } mutex_destroy( &uplink->queueLock ); mutex_destroy( &uplink->rttLock ); @@ -651,14 +649,14 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly) mutex_unlock( &uplink->queueLock ); if ( hops < 200 ) ++hops; mutex_lock( &uplink->sendMutex ); - const bool ret = dnbd3_get_block( uplink->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->version, hops ) ); + const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) ); mutex_unlock( &uplink->sendMutex ); if ( !ret ) { // Non-critical - if the connection dropped or the server was changed // the thread will re-send this request as soon as the connection // is reestablished. logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" ); - altservers_serverFailed( &uplink->currentServer ); + altservers_serverFailed( &uplink->current.host ); return; } mutex_lock( &uplink->queueLock ); @@ -678,7 +676,7 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly) */ static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink) { - if ( uplink == NULL || uplink->fd == -1 ) return; + if ( uplink == NULL || uplink->current.fd == -1 ) return; if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 ) return; // Don't do background replication if ( uplink->nextReplicationIndex == -1 || uplink->replicationHandle != REP_NONE ) return; @@ -724,7 +722,7 @@ static void uplink_sendReplicationRequest(dnbd3_uplink_t *uplink) uplink->replicationHandle = offset; const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE ); mutex_lock( &uplink->sendMutex ); - bool sendOk = dnbd3_get_block( uplink->fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->version, 1 ) ); + bool sendOk = dnbd3_get_block( uplink->current.fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->current.version, 1 ) ); mutex_unlock( &uplink->sendMutex ); if ( !sendOk ) { logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" ); @@ -798,7 +796,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) dnbd3_reply_t inReply, outReply; int ret, i; for (;;) { - ret = dnbd3_read_reply( uplink->fd, &inReply, false ); + ret = dnbd3_read_reply( uplink->current.fd, &inReply, false ); if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue; if ( ret == REPLY_AGAIN ) break; if ( unlikely( ret == REPLY_CLOSED ) ) { @@ -826,7 +824,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) exit( 1 ); } } - if ( unlikely( (uint32_t)sock_recv( uplink->fd, uplink->recvBuffer, inReply.size ) != inReply.size ) ) { + if ( unlikely( (uint32_t)sock_recv( uplink->current.fd, uplink->recvBuffer, inReply.size ) != inReply.size ) ) { logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", uplink->image->path ); goto error_cleanup; } @@ -973,12 +971,12 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew) { - if ( uplink->fd == -1 ) + if ( uplink->current.fd == -1 ) return; - altservers_serverFailed( &uplink->currentServer ); + altservers_serverFailed( &uplink->current.host ); mutex_lock( &uplink->sendMutex ); - close( uplink->fd ); - uplink->fd = -1; + close( uplink->current.fd ); + uplink->current.fd = -1; mutex_unlock( &uplink->sendMutex ); uplink->replicationHandle = REP_NONE; if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) { @@ -987,7 +985,7 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew) if ( !findNew ) return; mutex_lock( &uplink->rttLock ); - bool bail = uplink->rttTestResult == RTT_INPROGRESS || uplink->betterFd != -1; + bool bail = uplink->rttTestResult == RTT_INPROGRESS || uplink->better.fd != -1; mutex_unlock( &uplink->rttLock ); if ( bail ) return; @@ -1016,7 +1014,7 @@ static void uplink_addCrc32(dnbd3_uplink_t *uplink) uint32_t masterCrc; uint32_t *buffer = malloc( bytes ); mutex_lock( &uplink->sendMutex ); - bool sendOk = dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes ); + bool sendOk = dnbd3_get_crc32( uplink->current.fd, &masterCrc, buffer, &bytes ); mutex_unlock( &uplink->sendMutex ); if ( !sendOk || bytes == 0 ) { free( buffer ); -- cgit v1.2.3-55-g7522