summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c146
1 files changed, 67 insertions, 79 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index bf6f32e..cb4f956 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -38,6 +38,7 @@ static bool connectionShouldShutdown(dnbd3_uplink_t *uplink);
static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew);
static int numWantedReplicationRequests(dnbd3_uplink_t *uplink);
static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle);
+static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops);
static void *prefetchForClient(void *data);
typedef struct {
@@ -180,8 +181,7 @@ static void cancelAllRequests(dnbd3_uplink_t *uplink)
while ( it != NULL ) {
dnbd3_queue_client_t *cit = it->clients;
while ( cit != NULL ) {
- net_sendReply( cit->client, CMD_ERROR, cit->handle );
- cit->client->relayedCount--;
+ (*cit->callback)( cit->data, cit->handle, 0, 0, NULL );
dnbd3_queue_client_t *next = cit->next;
free( cit );
cit = next;
@@ -229,15 +229,13 @@ static void freeUplinkStruct(ref *ref)
* Remove given client from uplink request queue
* Locks on: uplink.queueLock
*/
-void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client)
+void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback callback)
{
- if ( client->relayedCount == 0 )
- return;
mutex_lock( &uplink->queueLock );
for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
for ( dnbd3_queue_client_t **cit = &it->clients; *cit != NULL; ) {
- if ( (**cit).client == client ) {
- --client->relayedCount;
+ if ( (**cit).data == data && (**cit).callback == callback ) {
+ (*(**cit).callback)( (**cit).data, (**cit).handle, 0, 0, NULL );
dnbd3_queue_client_t *entry = *cit;
*cit = (**cit).next;
free( entry );
@@ -247,63 +245,78 @@ void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client)
}
}
mutex_unlock( &uplink->queueLock );
- if ( unlikely( client->relayedCount != 0 ) ) {
- logadd( LOG_DEBUG1, "Client has relayedCount == %"PRIu8" on disconnect..", client->relayedCount );
- int i;
- for ( i = 0; i < 1000 && client->relayedCount != 0; ++i ) {
- usleep( 10000 );
- }
- if ( client->relayedCount != 0 ) {
- logadd( LOG_WARNING, "Client relayedCount still %"PRIu8" after sleeping!", client->relayedCount );
- }
- }
}
-/**
- * Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL.
- * If client is NULL, this is assumed to be a background replication request.
- * Locks on: uplink.queueLock, uplink.sendMutex
- */
-bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
+bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
{
- bool getUplink = ( uplink == NULL );
- assert( client != NULL || uplink != NULL );
- if ( hops++ > 200 ) { // This is just silly
+ assert( client != NULL && callback != NULL );
+ if ( hops > 200 ) { // This is just silly
logadd( LOG_WARNING, "Refusing to relay a request that has > 200 hops" );
return false;
}
- if ( length > (uint32_t)_maxPayload ) {
- logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length );
- return false;
- }
- if ( getUplink ) {
+ dnbd3_uplink_t *uplink = ref_get_uplink( &client->image->uplinkref );
+ if ( unlikely( uplink == NULL ) ) {
+ uplink_init( client->image, -1, NULL, -1 );
uplink = ref_get_uplink( &client->image->uplinkref );
- if ( unlikely( uplink == NULL ) ) {
- uplink_init( client->image, -1, NULL, -1 );
- uplink = ref_get_uplink( &client->image->uplinkref );
- if ( uplink == NULL ) {
- logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
- return false;
- }
+ if ( uplink == NULL ) {
+ logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
+ return false;
}
}
- if ( uplink->shutdown ) {
- logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" );
- goto fail_ref;
- }
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
// This might be a false positive if there are multiple instances running on the same host (IP)
+ bool ret;
if ( client != NULL && hops > 1
&& isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
uplink->cycleDetected = true;
signal_call( uplink->signal );
logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
- goto fail_ref;
+ ret = false;
+ } else {
+ ret = uplink_requestInternal( uplink, (void*)client, callback, handle, start, length, hops );
+ }
+ ref_put( &uplink->reference );
+ return ret;
+}
+
+bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length)
+{
+ dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
+ if ( unlikely( uplink == NULL ) ) {
+ uplink_init( image, -1, NULL, -1 );
+ uplink = ref_get_uplink( &image->uplinkref );
+ if ( uplink == NULL ) {
+ logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
+ return false;
+ }
+ }
+ bool ret = uplink_requestInternal( uplink, data, callback, handle, start, length, 0 );
+ ref_put( &uplink->reference );
+ return ret;
+}
+
+/**
+ * Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL.
+ * If client is NULL, this is assumed to be a background replication request.
+ * Locks on: uplink.queueLock, uplink.sendMutex
+ */
+static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
+{
+ assert( uplink != NULL );
+ assert( data == NULL || callback != NULL );
+ if ( uplink->shutdown ) {
+ logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" );
+ return false;
+ }
+ if ( length > (uint32_t)_maxPayload ) {
+ logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload", length );
+ return false;
}
struct {
uint64_t handle, start, end;
} req;
+ hops++;
do {
const uint64_t end = start + length;
dnbd3_queue_entry_t *request = NULL, *last = NULL;
@@ -350,14 +363,14 @@ bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t han
#endif
request->hopCount = hops;
request->sent = true; // Optimistic; would be set to false on failure
- if ( client == NULL ) {
+ if ( callback == NULL ) {
// BGR
request->clients = NULL;
} else {
c = &request->clients;
}
isNew = true;
- } else if ( client == NULL ) {
+ } else if ( callback == NULL ) {
// Replication request that maches existing request. Do nothing
isNew = false;
} else {
@@ -381,14 +394,14 @@ bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t han
req.handle = request->handle;
req.start = request->from;
req.end = request->to;
- if ( client != NULL ) {
+ if ( callback != NULL ) {
*c = malloc( sizeof( *request->clients ) );
(**c).next = NULL;
(**c).handle = handle;
(**c).from = start;
(**c).to = end;
- (**c).client = client;
- client->relayedCount++;
+ (**c).data = data;
+ (**c).callback = callback;
}
mutex_unlock( &uplink->queueLock );
@@ -426,7 +439,7 @@ bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t han
}
success_ref:
- if ( client != NULL ) {
+ if ( callback != NULL ) {
// Was from client -- potential prefetch
// Same size as this request, but consider end of image...
uint32_t len = (uint32_t)MIN( uplink->image->virtualFilesize - req.end,
@@ -442,16 +455,9 @@ success_ref:
threadpool_run( &prefetchForClient, (void*)job, "PREFETCH" );
}
}
- if ( getUplink ) {
- ref_put( &uplink->reference );
- }
return true;
fail_lock:
mutex_unlock( &uplink->queueLock );
-fail_ref:
- if ( getUplink ) {
- ref_put( &uplink->reference );
- }
return false;
}
@@ -461,7 +467,7 @@ static void *prefetchForClient(void *data)
dnbd3_cache_map_t *cache = ref_get_cachemap( job->uplink->image );
if ( cache != NULL ) {
if ( !image_isRangeCachedUnsafe( cache, job->start, job->start + job->length ) ) {
- uplink_request( job->uplink, NULL, ++job->uplink->queueId, job->start, job->length, 0 );
+ uplink_requestInternal( job->uplink, NULL, NULL, ++job->uplink->queueId, job->start, job->length, 0 );
}
ref_put( &cache->reference );
}
@@ -824,7 +830,7 @@ static bool sendReplicationRequest(dnbd3_uplink_t *uplink)
const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE;
const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE );
const uint64_t handle = ++uplink->queueId;
- if ( !uplink_request( uplink, NULL, handle, offset, size, 0 ) ) {
+ if ( !uplink_requestInternal( uplink, NULL, NULL, handle, offset, size, 0 ) ) {
logadd( LOG_DEBUG1, "Error sending background replication request to uplink server (%s:%d)",
PIMG(uplink->image) );
ref_put( &cache->reference );
@@ -902,7 +908,7 @@ static int findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int startMa
*/
static void handleReceive(dnbd3_uplink_t *uplink)
{
- dnbd3_reply_t inReply, outReply;
+ dnbd3_reply_t inReply;
int ret;
assert_uplink_thread();
for (;;) {
@@ -967,7 +973,6 @@ static void handleReceive(dnbd3_uplink_t *uplink)
logadd( LOG_WARNING, "Received payload length does not match! (is: %"PRIu32", expect: %u, %s:%d)",
inReply.size, (unsigned int)( end - start ), PIMG(uplink->image) );
}
- struct iovec iov[2];
// 1) Write to cache file
if ( unlikely( uplink->cacheFd == -1 ) ) {
reopenCacheFd( uplink, false );
@@ -1035,28 +1040,11 @@ static void handleReceive(dnbd3_uplink_t *uplink)
PIMG(uplink->image) );
continue;
}
- outReply.magic = dnbd3_packet_magic;
dnbd3_queue_client_t *next;
for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) {
assert( c->from >= start && c->to <= end );
- dnbd3_client_t * const client = c->client;
- outReply.cmd = CMD_GET_BLOCK;
- outReply.handle = c->handle;
- outReply.size = (uint32_t)( c->to - c->from );
- iov[0].iov_base = &outReply;
- iov[0].iov_len = sizeof outReply;
- iov[1].iov_base = uplink->recvBuffer + (c->from - start);
- iov[1].iov_len = outReply.size;
- fixup_reply( outReply );
- mutex_lock( &client->sendMutex );
- if ( client->sock != -1 ) {
- ssize_t sent = writev( client->sock, iov, 2 );
- if ( sent > (ssize_t)sizeof outReply ) {
- client->bytesSent += (size_t)sent - sizeof outReply;
- }
- }
- mutex_unlock( &client->sendMutex );
- client->relayedCount--;
+ (*c->callback)( c->data, c->handle, c->from, (uint32_t)( c->to - c->from ),
+ (const char*)( uplink->recvBuffer + (c->from - start) ) );
next = c->next;
free( c );
}