From 325b8b0e5b79ee939546a6dcf2b8ad58f52f36c6 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Sat, 9 Feb 2019 19:19:05 +0100 Subject: [FUSE] Use shared/timing.* instead of nowMilli/Micro --- src/fuse/connection.c | 97 ++++++++++++++++++++++----------------------------- src/fuse/connection.h | 3 +- 2 files changed, 43 insertions(+), 57 deletions(-) diff --git a/src/fuse/connection.c b/src/fuse/connection.c index fb2d2c8..294983b 100644 --- a/src/fuse/connection.c +++ b/src/fuse/connection.c @@ -51,7 +51,7 @@ static struct { pthread_mutex_t sendMutex; dnbd3_signal_t* panicSignal; dnbd3_host_t currentServer; - uint64_t startupTime; + ticks startupTime; } connection; // Known alt servers @@ -95,9 +95,6 @@ static bool throwDataAway(int sockFd, uint32_t amount); static void enqueueRequest(dnbd3_async_t *request); static dnbd3_async_t* removeRequest(dnbd3_async_t *request); -static uint64_t nowMilli(); -static uint64_t nowMicro(); - bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid) { int sock = -1; @@ -111,6 +108,7 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r socklen_t salen; poll_list_t *cons = sock_newPollList(); + timing_setBase(); pthread_mutex_lock( &mutexInit ); if ( !connectionInitDone && keepRunning ) { dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS]; @@ -182,7 +180,7 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r connection.currentServer.type = 0; } connection.panicSignal = signal_new(); - connection.startupTime = nowMilli(); + timing_get( &connection.startupTime ); connection.sockFd = sock; requests.head = NULL; requests.tail = NULL; @@ -282,9 +280,10 @@ size_t connection_printStats(char *buffer, const size_t len) { int ret; size_t remaining = len; + declare_now; if ( remaining > 0 ) { - ret = snprintf( buffer, remaining, "Image: %s\nRevision: %d\n\nCurrent connection time: %ds\n\n", - image.name, (int)image.rid, (int)( (nowMilli() - connection.startupTime) / 1000 ) ); + ret = snprintf( buffer, remaining, "Image: %s\nRevision: %d\n\nCurrent connection time: %" PRIu32 "s\n\n", + image.name, (int)image.rid, timing_diff( &connection.startupTime, &now ) ); if ( ret < 0 ) { ret = 0; } @@ -375,14 +374,15 @@ static void* connection_receiveThreadMain(void *sockPtr) goto fail; } // Check RTT - uint64_t diff = nowMicro() - request->time; + declare_now; + uint64_t diff = timing_diffUs( &request->time, &now ); if ( diff < 30ull * 1000 * 1000 ) { // Sanity check - ignore if > 30s lock_read( &altLock ); for ( int i = 0; i < MAX_ALTS; ++i ) { if ( altservers[i].host.type == 0 ) continue; if ( isSameAddressPort( &connection.currentServer, &altservers[i].host ) ) { - altservers[i].liveRtt = ( altservers[i].liveRtt * 2 + (int)diff ) / 3; + altservers[i].liveRtt = ( altservers[i].liveRtt * 3 + (int)diff ) / 4; break; } } @@ -432,33 +432,38 @@ fail:; static void* connection_backgroundThread(void *something UNUSED) { - uint64_t nextKeepalive = 0; - uint64_t nextRttCheck = 0; + ticks nextKeepalive; + ticks nextRttCheck; + timing_get( &nextKeepalive ); + nextRttCheck = nextKeepalive; while ( keepRunning ) { - const uint64_t now = nowMilli(); - if ( now < nextKeepalive && now < nextRttCheck ) { - int waitTime = (int)( MIN( nextKeepalive, nextRttCheck ) - now ); - int waitRes = signal_wait( connection.panicSignal, waitTime ); + ticks now; + timing_get( &now ); + uint32_t wt1 = timing_diffMs( &now, &nextKeepalive ); + uint32_t wt2 = timing_diffMs( &now, &nextRttCheck ); + if ( wt1 > 0 && wt2 > 0 ) { + int waitRes = signal_wait( connection.panicSignal, (int)MIN( wt1, wt2 ) + 1 ); if ( waitRes == SIGNAL_ERROR ) { logadd( LOG_WARNING, "Error waiting on signal in background thread! Errno = %d", errno ); } + timing_get( &now ); } // Woken up, see what we have to do const bool panic = connection.sockFd == -1; // Check alt servers - if ( panic || now >= nextRttCheck ) { + if ( panic || timing_reachedPrecise( &nextRttCheck, &now ) ) { addAltServers(); sortAltServers(); probeAltServers(); - if ( panic || connection.startupTime + ( STARTUP_MODE_DURATION * 1000ull ) > now ) { - nextRttCheck = now + TIMER_INTERVAL_PROBE_STARTUP * 1000ull; + if ( panic || timing_diff( &connection.startupTime, &now ) <= STARTUP_MODE_DURATION ) { + timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_STARTUP ); } else { - nextRttCheck = now + TIMER_INTERVAL_PROBE_NORMAL * 1000ull; + timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_NORMAL ); } } // Send keepalive packet - if ( now >= nextKeepalive ) { + if ( timing_reachedPrecise( &nextKeepalive, &now ) ) { pthread_mutex_lock( &connection.sendMutex ); if ( connection.sockFd != -1 ) { dnbd3_request_t request; @@ -474,7 +479,7 @@ static void* connection_backgroundThread(void *something UNUSED) } } pthread_mutex_unlock( &connection.sendMutex ); - nextKeepalive = now + TIMER_INTERVAL_KEEPALIVE_PACKET * 1000ull; + timing_addSeconds( &nextKeepalive, &now, TIMER_INTERVAL_KEEPALIVE_PACKET ); } } return NULL; @@ -578,7 +583,6 @@ static void probeAltServers() uint64_t testOffset = 0; uint32_t testLength = RTT_BLOCK_SIZE; dnbd3_async_t *request = NULL; - uint64_t now; alt_server_t *current = NULL, *best = NULL; if ( !panic ) { @@ -592,17 +596,15 @@ static void probeAltServers() } unlock_rw( &altLock ); } - now = nowMicro(); + declare_now; pthread_spin_lock( &requests.lock ); if ( requests.head != NULL ) { if ( !panic && current != NULL ) { const int maxDelay = MAX( current->rtt * 5, 1000000 ); // Give at least one second dnbd3_async_t *iterator; for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) { - if ( iterator->time == 0 ) - continue; // Huh? // A request with measurement tag is pending - if ( now > iterator->time && now - iterator->time > maxDelay ) { + if ( timing_diffUs( &iterator->time, &now ) > maxDelay ) { panic = true; break; } @@ -634,7 +636,8 @@ static void probeAltServers() srv->rttIndex += 1; } // Probe - const uint64_t start = nowMicro(); + ticks start; + timing_get( &start ); errno = 0; int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 ); if ( sock == -1 ) { @@ -699,19 +702,21 @@ static void probeAltServers() } // Non-panic mode: // Update stats of server - const uint64_t end = nowMicro(); + ticks end; + timing_get( &end ); srv->consecutiveFails = 0; - srv->rtts[srv->rttIndex] = (int)(end - start); - srv->rtt = 0; + srv->rtts[srv->rttIndex] = (int)timing_diffUs( &start, &end ); + int newRtt = 0; for ( int i = 0; i < RTT_COUNT; ++i ) { - srv->rtt += srv->rtts[i]; + newRtt += srv->rtts[i]; } if ( srv->liveRtt != 0 ) { // Make live rtt measurement influence result - srv->rtt = ( srv->rtt + srv->liveRtt ) / ( RTT_COUNT + 1 ); + newRtt = ( newRtt + srv->liveRtt ) / ( RTT_COUNT + 1 ); } else { - srv->rtt /= RTT_COUNT; + newRtt /= RTT_COUNT; } + srv->rtt = newRtt; // Keep socket open if this is currently the best one if ( best == NULL || best->rtt > srv->rtt ) { @@ -739,7 +744,7 @@ fail:; alt_server_t * const srv = &altservers[altIndex]; // Decay liveRtt slowly... if ( srv->liveRtt > current->liveRtt && srv->liveRtt > srv->rtt ) { - srv->liveRtt -= ( srv->liveRtt / 100 + 1 ); + srv->liveRtt -= ( ( srv->liveRtt / 100 ) + 1 ); } if ( srv == best ) { if ( srv->bestCount < 50 ) { @@ -820,7 +825,7 @@ static void switchConnection(int sockFd, alt_server_t *srv) signal_call( connection.panicSignal ); return; } - connection.startupTime = nowMilli(); + timing_get( &connection.startupTime ); pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)sockFd ); sock_printable( (struct sockaddr*)&addr, sizeof(addr), message + len, sizeof(message) - len ); logadd( LOG_INFO, "%s", message ); @@ -881,7 +886,7 @@ static void enqueueRequest(dnbd3_async_t *request) request->success = false; //logadd( LOG_DEBUG2, "Queue: %p @ %s : %d", request, file, line ); // Measure latency and add to switch formula - request->time = nowMicro(); + timing_get( &request->time ); pthread_spin_lock( &requests.lock ); if ( requests.head == NULL ) { requests.head = requests.tail = request; @@ -916,23 +921,3 @@ static dnbd3_async_t* removeRequest(dnbd3_async_t *request) return iterator; } -static uint64_t nowMilli() -{ - struct timespec ts; - if ( clock_gettime( CLOCK_MONOTONIC_RAW, &ts ) != 0 ) { - printf( "clock_gettime() failed. Errno: %d\n", errno ); - return 0; - } - return ( ts.tv_sec * 1000ull ) + ( ts.tv_nsec / 1000000ull ); -} - -static uint64_t nowMicro() -{ - struct timespec ts; - if ( clock_gettime( CLOCK_MONOTONIC_RAW, &ts ) != 0 ) { - printf( "clock_gettime() failed. Errno: %d\n", errno ); - return 0; - } - return ( ts.tv_sec * 1000000ull ) + ( ts.tv_nsec / 1000ull ); -} - diff --git a/src/fuse/connection.h b/src/fuse/connection.h index 929777a..c919d95 100644 --- a/src/fuse/connection.h +++ b/src/fuse/connection.h @@ -2,6 +2,7 @@ #define _CONNECTION_H_ #include "../shared/fdsignal.h" +#include "../shared/timing.h" #include #include #include @@ -12,7 +13,7 @@ typedef struct _dnbd3_async { struct _dnbd3_async *next; // Next in this linked list (provate field, not set by caller) dnbd3_signal_t* signal; // Used to signal the caller char* buffer; // Caller-provided buffer to be filled - uint64_t time; // When request was put on wire, 0 if not measuring + ticks time; // When request was put on wire, 0 if not measuring uint64_t offset; uint32_t length; bool finished; // Will be set to true if the request has been handled -- cgit v1.2.3-55-g7522