summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2019-02-09 19:19:05 +0100
committerSimon Rettberg2019-02-09 19:19:05 +0100
commit325b8b0e5b79ee939546a6dcf2b8ad58f52f36c6 (patch)
tree948c7135d90391cb73bd0e479d35b8334f143895
parent[SHARED] More timing helpers (diff)
downloaddnbd3-325b8b0e5b79ee939546a6dcf2b8ad58f52f36c6.tar.gz
dnbd3-325b8b0e5b79ee939546a6dcf2b8ad58f52f36c6.tar.xz
dnbd3-325b8b0e5b79ee939546a6dcf2b8ad58f52f36c6.zip
[FUSE] Use shared/timing.* instead of nowMilli/Micro
-rw-r--r--src/fuse/connection.c97
-rw-r--r--src/fuse/connection.h3
2 files changed, 43 insertions, 57 deletions
diff --git a/src/fuse/connection.c b/src/fuse/connection.c
index fb2d2c8..294983b 100644
--- a/src/fuse/connection.c
+++ b/src/fuse/connection.c
@@ -51,7 +51,7 @@ static struct {
pthread_mutex_t sendMutex;
dnbd3_signal_t* panicSignal;
dnbd3_host_t currentServer;
- uint64_t startupTime;
+ ticks startupTime;
} connection;
// Known alt servers
@@ -95,9 +95,6 @@ static bool throwDataAway(int sockFd, uint32_t amount);
static void enqueueRequest(dnbd3_async_t *request);
static dnbd3_async_t* removeRequest(dnbd3_async_t *request);
-static uint64_t nowMilli();
-static uint64_t nowMicro();
-
bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid)
{
int sock = -1;
@@ -111,6 +108,7 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r
socklen_t salen;
poll_list_t *cons = sock_newPollList();
+ timing_setBase();
pthread_mutex_lock( &mutexInit );
if ( !connectionInitDone && keepRunning ) {
dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS];
@@ -182,7 +180,7 @@ bool connection_init(const char *hosts, const char *lowerImage, const uint16_t r
connection.currentServer.type = 0;
}
connection.panicSignal = signal_new();
- connection.startupTime = nowMilli();
+ timing_get( &connection.startupTime );
connection.sockFd = sock;
requests.head = NULL;
requests.tail = NULL;
@@ -282,9 +280,10 @@ size_t connection_printStats(char *buffer, const size_t len)
{
int ret;
size_t remaining = len;
+ declare_now;
if ( remaining > 0 ) {
- ret = snprintf( buffer, remaining, "Image: %s\nRevision: %d\n\nCurrent connection time: %ds\n\n",
- image.name, (int)image.rid, (int)( (nowMilli() - connection.startupTime) / 1000 ) );
+ ret = snprintf( buffer, remaining, "Image: %s\nRevision: %d\n\nCurrent connection time: %" PRIu32 "s\n\n",
+ image.name, (int)image.rid, timing_diff( &connection.startupTime, &now ) );
if ( ret < 0 ) {
ret = 0;
}
@@ -375,14 +374,15 @@ static void* connection_receiveThreadMain(void *sockPtr)
goto fail;
}
// Check RTT
- uint64_t diff = nowMicro() - request->time;
+ declare_now;
+ uint64_t diff = timing_diffUs( &request->time, &now );
if ( diff < 30ull * 1000 * 1000 ) { // Sanity check - ignore if > 30s
lock_read( &altLock );
for ( int i = 0; i < MAX_ALTS; ++i ) {
if ( altservers[i].host.type == 0 )
continue;
if ( isSameAddressPort( &connection.currentServer, &altservers[i].host ) ) {
- altservers[i].liveRtt = ( altservers[i].liveRtt * 2 + (int)diff ) / 3;
+ altservers[i].liveRtt = ( altservers[i].liveRtt * 3 + (int)diff ) / 4;
break;
}
}
@@ -432,33 +432,38 @@ fail:;
static void* connection_backgroundThread(void *something UNUSED)
{
- uint64_t nextKeepalive = 0;
- uint64_t nextRttCheck = 0;
+ ticks nextKeepalive;
+ ticks nextRttCheck;
+ timing_get( &nextKeepalive );
+ nextRttCheck = nextKeepalive;
while ( keepRunning ) {
- const uint64_t now = nowMilli();
- if ( now < nextKeepalive && now < nextRttCheck ) {
- int waitTime = (int)( MIN( nextKeepalive, nextRttCheck ) - now );
- int waitRes = signal_wait( connection.panicSignal, waitTime );
+ ticks now;
+ timing_get( &now );
+ uint32_t wt1 = timing_diffMs( &now, &nextKeepalive );
+ uint32_t wt2 = timing_diffMs( &now, &nextRttCheck );
+ if ( wt1 > 0 && wt2 > 0 ) {
+ int waitRes = signal_wait( connection.panicSignal, (int)MIN( wt1, wt2 ) + 1 );
if ( waitRes == SIGNAL_ERROR ) {
logadd( LOG_WARNING, "Error waiting on signal in background thread! Errno = %d", errno );
}
+ timing_get( &now );
}
// Woken up, see what we have to do
const bool panic = connection.sockFd == -1;
// Check alt servers
- if ( panic || now >= nextRttCheck ) {
+ if ( panic || timing_reachedPrecise( &nextRttCheck, &now ) ) {
addAltServers();
sortAltServers();
probeAltServers();
- if ( panic || connection.startupTime + ( STARTUP_MODE_DURATION * 1000ull ) > now ) {
- nextRttCheck = now + TIMER_INTERVAL_PROBE_STARTUP * 1000ull;
+ if ( panic || timing_diff( &connection.startupTime, &now ) <= STARTUP_MODE_DURATION ) {
+ timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_STARTUP );
} else {
- nextRttCheck = now + TIMER_INTERVAL_PROBE_NORMAL * 1000ull;
+ timing_addSeconds( &nextRttCheck, &now, TIMER_INTERVAL_PROBE_NORMAL );
}
}
// Send keepalive packet
- if ( now >= nextKeepalive ) {
+ if ( timing_reachedPrecise( &nextKeepalive, &now ) ) {
pthread_mutex_lock( &connection.sendMutex );
if ( connection.sockFd != -1 ) {
dnbd3_request_t request;
@@ -474,7 +479,7 @@ static void* connection_backgroundThread(void *something UNUSED)
}
}
pthread_mutex_unlock( &connection.sendMutex );
- nextKeepalive = now + TIMER_INTERVAL_KEEPALIVE_PACKET * 1000ull;
+ timing_addSeconds( &nextKeepalive, &now, TIMER_INTERVAL_KEEPALIVE_PACKET );
}
}
return NULL;
@@ -578,7 +583,6 @@ static void probeAltServers()
uint64_t testOffset = 0;
uint32_t testLength = RTT_BLOCK_SIZE;
dnbd3_async_t *request = NULL;
- uint64_t now;
alt_server_t *current = NULL, *best = NULL;
if ( !panic ) {
@@ -592,17 +596,15 @@ static void probeAltServers()
}
unlock_rw( &altLock );
}
- now = nowMicro();
+ declare_now;
pthread_spin_lock( &requests.lock );
if ( requests.head != NULL ) {
if ( !panic && current != NULL ) {
const int maxDelay = MAX( current->rtt * 5, 1000000 ); // Give at least one second
dnbd3_async_t *iterator;
for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) {
- if ( iterator->time == 0 )
- continue; // Huh?
// A request with measurement tag is pending
- if ( now > iterator->time && now - iterator->time > maxDelay ) {
+ if ( timing_diffUs( &iterator->time, &now ) > maxDelay ) {
panic = true;
break;
}
@@ -634,7 +636,8 @@ static void probeAltServers()
srv->rttIndex += 1;
}
// Probe
- const uint64_t start = nowMicro();
+ ticks start;
+ timing_get( &start );
errno = 0;
int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 );
if ( sock == -1 ) {
@@ -699,19 +702,21 @@ static void probeAltServers()
}
// Non-panic mode:
// Update stats of server
- const uint64_t end = nowMicro();
+ ticks end;
+ timing_get( &end );
srv->consecutiveFails = 0;
- srv->rtts[srv->rttIndex] = (int)(end - start);
- srv->rtt = 0;
+ srv->rtts[srv->rttIndex] = (int)timing_diffUs( &start, &end );
+ int newRtt = 0;
for ( int i = 0; i < RTT_COUNT; ++i ) {
- srv->rtt += srv->rtts[i];
+ newRtt += srv->rtts[i];
}
if ( srv->liveRtt != 0 ) {
// Make live rtt measurement influence result
- srv->rtt = ( srv->rtt + srv->liveRtt ) / ( RTT_COUNT + 1 );
+ newRtt = ( newRtt + srv->liveRtt ) / ( RTT_COUNT + 1 );
} else {
- srv->rtt /= RTT_COUNT;
+ newRtt /= RTT_COUNT;
}
+ srv->rtt = newRtt;
// Keep socket open if this is currently the best one
if ( best == NULL || best->rtt > srv->rtt ) {
@@ -739,7 +744,7 @@ fail:;
alt_server_t * const srv = &altservers[altIndex];
// Decay liveRtt slowly...
if ( srv->liveRtt > current->liveRtt && srv->liveRtt > srv->rtt ) {
- srv->liveRtt -= ( srv->liveRtt / 100 + 1 );
+ srv->liveRtt -= ( ( srv->liveRtt / 100 ) + 1 );
}
if ( srv == best ) {
if ( srv->bestCount < 50 ) {
@@ -820,7 +825,7 @@ static void switchConnection(int sockFd, alt_server_t *srv)
signal_call( connection.panicSignal );
return;
}
- connection.startupTime = nowMilli();
+ timing_get( &connection.startupTime );
pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)sockFd );
sock_printable( (struct sockaddr*)&addr, sizeof(addr), message + len, sizeof(message) - len );
logadd( LOG_INFO, "%s", message );
@@ -881,7 +886,7 @@ static void enqueueRequest(dnbd3_async_t *request)
request->success = false;
//logadd( LOG_DEBUG2, "Queue: %p @ %s : %d", request, file, line );
// Measure latency and add to switch formula
- request->time = nowMicro();
+ timing_get( &request->time );
pthread_spin_lock( &requests.lock );
if ( requests.head == NULL ) {
requests.head = requests.tail = request;
@@ -916,23 +921,3 @@ static dnbd3_async_t* removeRequest(dnbd3_async_t *request)
return iterator;
}
-static uint64_t nowMilli()
-{
- struct timespec ts;
- if ( clock_gettime( CLOCK_MONOTONIC_RAW, &ts ) != 0 ) {
- printf( "clock_gettime() failed. Errno: %d\n", errno );
- return 0;
- }
- return ( ts.tv_sec * 1000ull ) + ( ts.tv_nsec / 1000000ull );
-}
-
-static uint64_t nowMicro()
-{
- struct timespec ts;
- if ( clock_gettime( CLOCK_MONOTONIC_RAW, &ts ) != 0 ) {
- printf( "clock_gettime() failed. Errno: %d\n", errno );
- return 0;
- }
- return ( ts.tv_sec * 1000000ull ) + ( ts.tv_nsec / 1000ull );
-}
-
diff --git a/src/fuse/connection.h b/src/fuse/connection.h
index 929777a..c919d95 100644
--- a/src/fuse/connection.h
+++ b/src/fuse/connection.h
@@ -2,6 +2,7 @@
#define _CONNECTION_H_
#include "../shared/fdsignal.h"
+#include "../shared/timing.h"
#include <stddef.h>
#include <stdbool.h>
#include <stdint.h>
@@ -12,7 +13,7 @@ typedef struct _dnbd3_async {
struct _dnbd3_async *next; // Next in this linked list (provate field, not set by caller)
dnbd3_signal_t* signal; // Used to signal the caller
char* buffer; // Caller-provided buffer to be filled
- uint64_t time; // When request was put on wire, 0 if not measuring
+ ticks time; // When request was put on wire, 0 if not measuring
uint64_t offset;
uint32_t length;
bool finished; // Will be set to true if the request has been handled