summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorSimon Rettberg2020-03-14 15:41:50 +0100
committerSimon Rettberg2020-03-14 15:41:50 +0100
commit023145f531c54bdfa9e329a5caf38a3061dc42c5 (patch)
tree0187d6552f341b02e4b463cfa6cb96666a2ee30e /src/server/uplink.c
parent[SERVER] Use image:rid in log messages (diff)
downloaddnbd3-023145f531c54bdfa9e329a5caf38a3061dc42c5.tar.gz
dnbd3-023145f531c54bdfa9e329a5caf38a3061dc42c5.tar.xz
dnbd3-023145f531c54bdfa9e329a5caf38a3061dc42c5.zip
[SERVER] Add comments, assert for uplink thread
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c26
1 files changed, 20 insertions, 6 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index efe7fa0..df2f082 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -46,6 +46,8 @@ typedef struct {
uint32_t length;
} prefetch_request_t;
+#define assert_uplink_thread() assert( pthread_equal( uplink->thread, pthread_self() ) )
+
// ############ uplink connection handling
void uplink_globalsInit()
@@ -692,6 +694,7 @@ cleanup: ;
*/
static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
{
+ assert_uplink_thread();
// Scan for new requests, or optionally, (re)send all
// Build a buffer, so if there aren't too many requests, we can send them after
// unlocking the queue again. Otherwise we need flushes during iteration, which
@@ -758,6 +761,7 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
*/
static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
{
+ assert_uplink_thread();
if ( uplink->current.fd == -1 )
return false; // Should never be called in this state, consider send error
if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 )
@@ -890,6 +894,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
{
dnbd3_reply_t inReply, outReply;
int ret;
+ assert_uplink_thread();
for (;;) {
ret = dnbd3_read_reply( uplink->current.fd, &inReply, false );
if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue;
@@ -1023,7 +1028,6 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
outReply.magic = dnbd3_packet_magic;
dnbd3_queue_client_t *next;
for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) {
- size_t bytesSent = 0;
assert( c->from >= start && c->to <= end );
dnbd3_client_t * const client = c->client;
outReply.cmd = CMD_GET_BLOCK;
@@ -1038,10 +1042,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
if ( client->sock != -1 ) {
ssize_t sent = writev( client->sock, iov, 2 );
if ( sent > (ssize_t)sizeof outReply ) {
- bytesSent = (size_t)sent - sizeof outReply;
- }
- if ( bytesSent != 0 ) {
- client->bytesSent += bytesSent;
+ client->bytesSent += (size_t)sent - sizeof outReply;
}
}
mutex_unlock( &client->sendMutex );
@@ -1080,6 +1081,7 @@ error_cleanup: ;
*/
static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
{
+ assert_uplink_thread();
if ( uplink->current.fd == -1 )
return;
setThreadName( "panic-uplink" );
@@ -1109,6 +1111,7 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
static bool uplink_sendKeepalive(dnbd3_uplink_t *uplink)
{
static const dnbd3_request_t request = { .magic = dnbd3_packet_magic, .cmd = net_order_16( CMD_KEEPALIVE ) };
+ assert_uplink_thread();
mutex_lock( &uplink->sendMutex );
bool sendOk = send( uplink->current.fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request);
mutex_unlock( &uplink->sendMutex );
@@ -1182,6 +1185,12 @@ static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force)
return uplink->cacheFd != -1;
}
+/**
+ * Returns true if the uplink has been idle for some time (apart from
+ * background replication, if it is set to hashblock, or if it has
+ * a minimum number of active clients configured that is not currently
+ * reached)
+ */
static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink)
{
return ( uplink->idleTime > SERVER_UPLINK_IDLE_TIMEOUT
@@ -1202,7 +1211,12 @@ bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len)
/**
* Get number of replication requests that should be sent right now to
* meet the configured bgrWindowSize. Returns 0 if any client requests
- * are pending
+ * are pending.
+ * This applies a sort of "slow start" in case the uplink was recently
+ * dealing with actual client requests, in that the uplink's idle time
+ * (in seconds) is an upper bound for the number returned, so we don't
+ * saturate the uplink with loads of requests right away, in case that
+ * client triggers more requests to the uplink server.
*/
static int numWantedReplicationRequests(dnbd3_uplink_t *uplink)
{