summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorSimon Rettberg2020-03-14 15:51:52 +0100
committerSimon Rettberg2020-03-14 15:51:52 +0100
commit3680e4819cbd7edbe632372e69533d254f1ae2c2 (patch)
tree0729b187bba5b9f0bace073bfb8f4a8af8dd99a5 /src/server/uplink.c
parent[SERVER] Add comments, assert for uplink thread (diff)
downloaddnbd3-3680e4819cbd7edbe632372e69533d254f1ae2c2.tar.gz
dnbd3-3680e4819cbd7edbe632372e69533d254f1ae2c2.tar.xz
dnbd3-3680e4819cbd7edbe632372e69533d254f1ae2c2.zip
[SERVER] Remove uplink_ prefix from static (private) functions
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c80
1 files changed, 40 insertions, 40 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index df2f082..d6b319b 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -25,17 +25,17 @@
static atomic_uint_fast64_t totalBytesReceived = 0;
static void cancelAllRequests(dnbd3_uplink_t *uplink);
-static void uplink_free(ref *ref);
+static void freeUplinkStruct(ref *ref);
static void* uplink_mainloop(void *data);
-static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly);
-static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex);
-static void uplink_handleReceive(dnbd3_uplink_t *uplink);
-static bool uplink_sendKeepalive(dnbd3_uplink_t *uplink);
-static void uplink_addCrc32(dnbd3_uplink_t *uplink);
-static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink);
-static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force);
-static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink);
-static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew);
+static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly);
+static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int lastBlockIndex);
+static void handleReceive(dnbd3_uplink_t *uplink);
+static bool sendKeepalive(dnbd3_uplink_t *uplink);
+static void requestCrc32List(dnbd3_uplink_t *uplink);
+static bool sendReplicationRequest(dnbd3_uplink_t *uplink);
+static bool reopenCacheFd(dnbd3_uplink_t *uplink, const bool force);
+static bool connectionShouldShutdown(dnbd3_uplink_t *uplink);
+static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew);
static int numWantedReplicationRequests(dnbd3_uplink_t *uplink);
static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle);
static void *prefetchForClient(void *data);
@@ -86,7 +86,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
}
uplink = calloc( 1, sizeof(dnbd3_uplink_t) );
// Start with one reference for the uplink thread. We'll return it when the thread finishes
- ref_init( &uplink->reference, uplink_free, 1 );
+ ref_init( &uplink->reference, freeUplinkStruct, 1 );
mutex_init( &uplink->queueLock, LOCK_UPLINK_QUEUE );
mutex_init( &uplink->rttLock, LOCK_UPLINK_RTT );
mutex_init( &uplink->sendMutex, LOCK_UPLINK_SEND );
@@ -132,7 +132,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version
return true;
failure: ;
if ( uplink != NULL ) {
- image->users++; // Expected by uplink_free()
+ image->users++; // Expected by freeUplinkStruct()
ref_put( &uplink->reference ); // The ref for the uplink thread that never was
}
mutex_unlock( &image->lock );
@@ -195,7 +195,7 @@ static void cancelAllRequests(dnbd3_uplink_t *uplink)
uplink->image->problem.queue = false;
}
-static void uplink_free(ref *ref)
+static void freeUplinkStruct(ref *ref)
{
dnbd3_uplink_t *uplink = container_of(ref, dnbd3_uplink_t, reference);
logadd( LOG_DEBUG1, "Freeing uplink for '%s:%d'", PIMG(uplink->image) );
@@ -489,7 +489,7 @@ static void* uplink_mainloop(void *data)
thread_detach( uplink->thread );
blockNoncriticalSignals();
// Make sure file is open for writing
- if ( !uplink_reopenCacheFd( uplink, false ) ) {
+ if ( !reopenCacheFd( uplink, false ) ) {
// It might have failed - still offer proxy mode, we just can't cache
logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", uplink->image->path, errno );
}
@@ -545,16 +545,16 @@ static void* uplink_mainloop(void *data)
}
// If we don't have a crc32 list yet, see if the new server has one
if ( uplink->image->crc32 == NULL ) {
- uplink_addCrc32( uplink );
+ requestCrc32List( uplink );
}
// Re-send all pending requests
- uplink_sendRequests( uplink, false );
- uplink_sendReplicationRequest( uplink );
+ sendQueuedRequests( uplink, false );
+ sendReplicationRequest( uplink );
events[EV_SOCKET].events = POLLIN | POLLRDHUP;
if ( uplink->image->problem.uplink ) {
// Some of the requests above must have failed again already :-(
logadd( LOG_DEBUG1, "Newly established uplink connection failed during getCRC or sendRequests" );
- uplink_connectionFailed( uplink, true );
+ connectionFailed( uplink, true );
}
timing_gets( &nextAltCheck, altCheckInterval );
// The rtt worker already did the handshake for our image, so there's nothing
@@ -573,18 +573,18 @@ static void* uplink_mainloop(void *data)
}
if ( uplink->current.fd != -1 ) {
// Uplink seems fine, relay requests to it...
- uplink_sendRequests( uplink, true );
+ sendQueuedRequests( uplink, true );
} else if ( uplink->queueLen != 0 ) { // No uplink; maybe it was shutdown since it was idle for too long
uplink->idleTime = 0;
}
}
// Uplink socket
if ( (events[EV_SOCKET].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) {
- uplink_connectionFailed( uplink, true );
+ connectionFailed( uplink, true );
logadd( LOG_DEBUG1, "Uplink gone away, panic! (revents=%d)\n", (int)events[EV_SOCKET].revents );
setThreadName( "panic-uplink" );
} else if ( (events[EV_SOCKET].revents & POLLIN) ) {
- uplink_handleReceive( uplink );
+ handleReceive( uplink );
if ( _shutdown || uplink->shutdown ) goto cleanup;
}
declare_now;
@@ -595,13 +595,13 @@ static void* uplink_mainloop(void *data)
// Keep-alive
if ( uplink->current.fd != -1 && uplink->queueLen < _bgrWindowSize ) {
// Send keep-alive if nothing is happening, and try to trigger background rep.
- if ( !uplink_sendKeepalive( uplink ) || !uplink_sendReplicationRequest( uplink ) ) {
- uplink_connectionFailed( uplink, true );
+ if ( !sendKeepalive( uplink ) || !sendReplicationRequest( uplink ) ) {
+ connectionFailed( uplink, true );
logadd( LOG_DEBUG1, "Error sending keep-alive/BGR, panic!\n" );
}
}
// Don't keep uplink established if we're idle for too much
- if ( uplink_connectionShouldShutdown( uplink ) ) {
+ if ( connectionShouldShutdown( uplink ) ) {
logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", PIMG(uplink->image) );
goto cleanup;
}
@@ -656,7 +656,7 @@ static void* uplink_mainloop(void *data)
}
mutex_unlock( &uplink->queueLock );
if ( resend ) {
- uplink_sendRequests( uplink, true );
+ sendQueuedRequests( uplink, true );
}
}
#endif
@@ -692,7 +692,7 @@ cleanup: ;
/**
* Only called from uplink thread.
*/
-static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
+static void sendQueuedRequests(dnbd3_uplink_t *uplink, bool newOnly)
{
assert_uplink_thread();
// Scan for new requests, or optionally, (re)send all
@@ -759,7 +759,7 @@ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly)
*
* @return false if sending request failed, true otherwise (i.e. not necessary/disabled)
*/
-static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
+static bool sendReplicationRequest(dnbd3_uplink_t *uplink)
{
assert_uplink_thread();
if ( uplink->current.fd == -1 )
@@ -804,7 +804,7 @@ static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
}
if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) {
// Nothing left in current block, find next one
- replicationIndex = uplink_findNextIncompleteHashBlock( uplink, endByte );
+ replicationIndex = findNextIncompleteHashBlock( uplink, endByte );
}
if ( replicationIndex == -1 ) {
// Replication might be complete, uplink_mainloop should take care....
@@ -827,7 +827,7 @@ static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
if ( _backgroundReplication == BGR_HASHBLOCK
&& uplink->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) {
// Just crossed a hash block boundary, look for new candidate starting at this very index
- uplink->nextReplicationIndex = uplink_findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex );
+ uplink->nextReplicationIndex = findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex );
if ( uplink->nextReplicationIndex == -1 )
break;
}
@@ -841,7 +841,7 @@ static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink)
* of a hash block which is neither completely empty nor completely
* replicated yet. Returns -1 if no match.
*/
-static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMapIndex)
+static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMapIndex)
{
int retval = -1;
dnbd3_cache_map_t *cache = ref_get_cachemap( uplink->image );
@@ -890,7 +890,7 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int
* Locks on: uplink.lock, images[].lock
* Only called from uplink thread, so current.fd is assumed to be valid.
*/
-static void uplink_handleReceive(dnbd3_uplink_t *uplink)
+static void handleReceive(dnbd3_uplink_t *uplink)
{
dnbd3_reply_t inReply, outReply;
int ret;
@@ -960,7 +960,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
struct iovec iov[2];
// 1) Write to cache file
if ( unlikely( uplink->cacheFd == -1 ) ) {
- uplink_reopenCacheFd( uplink, false );
+ reopenCacheFd( uplink, false );
}
if ( likely( uplink->cacheFd != -1 ) ) {
int err = 0;
@@ -980,7 +980,7 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
}
if ( err == EBADF || err == EINVAL || err == EIO ) {
uplink->image->problem.write = true;
- if ( !tryAgain || !uplink_reopenCacheFd( uplink, true ) )
+ if ( !tryAgain || !reopenCacheFd( uplink, true ) )
break;
tryAgain = false;
continue; // Write handle to image successfully re-opened, try again
@@ -1066,20 +1066,20 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink)
free( entry );
} // main receive loop
// Trigger background replication if applicable
- if ( !uplink_sendReplicationRequest( uplink ) ) {
+ if ( !sendReplicationRequest( uplink ) ) {
goto error_cleanup;
}
// Normal end
return;
// Error handling from failed receive or message parsing
error_cleanup: ;
- uplink_connectionFailed( uplink, true );
+ connectionFailed( uplink, true );
}
/**
* Only call from uplink thread
*/
-static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
+static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
{
assert_uplink_thread();
if ( uplink->current.fd == -1 )
@@ -1108,7 +1108,7 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew)
* Send keep alive request to server.
* Called from uplink thread, current.fd must be valid.
*/
-static bool uplink_sendKeepalive(dnbd3_uplink_t *uplink)
+static bool sendKeepalive(dnbd3_uplink_t *uplink)
{
static const dnbd3_request_t request = { .magic = dnbd3_packet_magic, .cmd = net_order_16( CMD_KEEPALIVE ) };
assert_uplink_thread();
@@ -1124,7 +1124,7 @@ static bool uplink_sendKeepalive(dnbd3_uplink_t *uplink)
* FIXME This is broken as it could happen that another message arrives after sending
* the request. Refactor, split and move receive into general receive handler.
*/
-static void uplink_addCrc32(dnbd3_uplink_t *uplink)
+static void requestCrc32List(dnbd3_uplink_t *uplink)
{
dnbd3_image_t *image = uplink->image;
if ( image == NULL || image->virtualFilesize == 0 ) return;
@@ -1174,7 +1174,7 @@ static void uplink_addCrc32(dnbd3_uplink_t *uplink)
* it will be closed first. Otherwise, nothing will happen and true will be returned
* immediately.
*/
-static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force)
+static bool reopenCacheFd(dnbd3_uplink_t *uplink, const bool force)
{
if ( uplink->cacheFd != -1 ) {
if ( !force ) return true;
@@ -1191,7 +1191,7 @@ static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force)
* a minimum number of active clients configured that is not currently
* reached)
*/
-static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink)
+static bool connectionShouldShutdown(dnbd3_uplink_t *uplink)
{
return ( uplink->idleTime > SERVER_UPLINK_IDLE_TIMEOUT
&& ( _backgroundReplication != BGR_FULL || _bgrMinClients > uplink->image->users ) );