diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/server/uplink.c | 26 |
1 files changed, 20 insertions, 6 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c index efe7fa0..df2f082 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -46,6 +46,8 @@ typedef struct { uint32_t length; } prefetch_request_t; +#define assert_uplink_thread() assert( pthread_equal( uplink->thread, pthread_self() ) ) + // ############ uplink connection handling void uplink_globalsInit() @@ -692,6 +694,7 @@ cleanup: ; */ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly) { + assert_uplink_thread(); // Scan for new requests, or optionally, (re)send all // Build a buffer, so if there aren't too many requests, we can send them after // unlocking the queue again. Otherwise we need flushes during iteration, which @@ -758,6 +761,7 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly) */ static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink) { + assert_uplink_thread(); if ( uplink->current.fd == -1 ) return false; // Should never be called in this state, consider send error if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 ) @@ -890,6 +894,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) { dnbd3_reply_t inReply, outReply; int ret; + assert_uplink_thread(); for (;;) { ret = dnbd3_read_reply( uplink->current.fd, &inReply, false ); if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue; @@ -1023,7 +1028,6 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) outReply.magic = dnbd3_packet_magic; dnbd3_queue_client_t *next; for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) { - size_t bytesSent = 0; assert( c->from >= start && c->to <= end ); dnbd3_client_t * const client = c->client; outReply.cmd = CMD_GET_BLOCK; @@ -1038,10 +1042,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) if ( client->sock != -1 ) { ssize_t sent = writev( client->sock, iov, 2 ); if ( sent > (ssize_t)sizeof outReply ) { - bytesSent = (size_t)sent - sizeof outReply; - } - if ( bytesSent != 0 ) { - client->bytesSent += bytesSent; + client->bytesSent += (size_t)sent - sizeof outReply; } } mutex_unlock( &client->sendMutex ); @@ -1080,6 +1081,7 @@ error_cleanup: ; */ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew) { + assert_uplink_thread(); if ( uplink->current.fd == -1 ) return; setThreadName( "panic-uplink" ); @@ -1109,6 +1111,7 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew) static bool uplink_sendKeepalive(dnbd3_uplink_t *uplink) { static const dnbd3_request_t request = { .magic = dnbd3_packet_magic, .cmd = net_order_16( CMD_KEEPALIVE ) }; + assert_uplink_thread(); mutex_lock( &uplink->sendMutex ); bool sendOk = send( uplink->current.fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request); mutex_unlock( &uplink->sendMutex ); @@ -1182,6 +1185,12 @@ static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force) return uplink->cacheFd != -1; } +/** + * Returns true if the uplink has been idle for some time (apart from + * background replication, if it is set to hashblock, or if it has + * a minimum number of active clients configured that is not currently + * reached) + */ static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink) { return ( uplink->idleTime > SERVER_UPLINK_IDLE_TIMEOUT @@ -1202,7 +1211,12 @@ bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len) /** * Get number of replication requests that should be sent right now to * meet the configured bgrWindowSize. Returns 0 if any client requests - * are pending + * are pending. + * This applies a sort of "slow start" in case the uplink was recently + * dealing with actual client requests, in that the uplink's idle time + * (in seconds) is an upper bound for the number returned, so we don't + * saturate the uplink with loads of requests right away, in case that + * client triggers more requests to the uplink server. */ static int numWantedReplicationRequests(dnbd3_uplink_t *uplink) { |