summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2021-04-13 16:41:26 +0200
committerSimon Rettberg2021-04-14 13:17:59 +0200
commit0be00336f97a7ef3590ee803d4ad19e305aa1583 (patch)
treeb00b91b8c6d687ad4116405818e2cecb3e6cf44b
parent[CLIENT] Use SO_GETPEERCRED instead of braindead setuid crap (diff)
downloaddnbd3-0be00336f97a7ef3590ee803d4ad19e305aa1583.tar.gz
dnbd3-0be00336f97a7ef3590ee803d4ad19e305aa1583.tar.xz
dnbd3-0be00336f97a7ef3590ee803d4ad19e305aa1583.zip
[SERVER] Make prefetching synchronous
There is a race condition where we process the next request from the same client faster than the OS will schedule the async prefetch job, rendering it a NOOP in the best case (request ranges match) or fetching redundant data from the upstream server (prefetch range is larger than actual request by client). Make prefetching synchronous to prevent this race condition.
-rw-r--r--pkg/config/server.conf2
-rw-r--r--src/server/globals.c29
-rw-r--r--src/server/globals.h15
-rw-r--r--src/server/uplink.c350
4 files changed, 228 insertions, 168 deletions
diff --git a/pkg/config/server.conf b/pkg/config/server.conf
index 5f0b2a0..95a71db 100644
--- a/pkg/config/server.conf
+++ b/pkg/config/server.conf
@@ -48,6 +48,8 @@ maxClients=2000
maxImages=1000
maxPayload=9M
maxReplicationSize=150G
+; Maximum number of bytes to prefetch when relaying client request to upstream server
+maxPrefetch=256k
; Log related config
[logging]
diff --git a/src/server/globals.c b/src/server/globals.c
index f689705..8ef7aec 100644
--- a/src/server/globals.c
+++ b/src/server/globals.c
@@ -24,19 +24,20 @@ atomic_bool _lookupMissingForProxy = true;
atomic_bool _sparseFiles = false;
atomic_bool _ignoreAllocErrors = false;
atomic_bool _removeMissingImages = true;
-atomic_int _uplinkTimeout = SOCKET_TIMEOUT_UPLINK;
-atomic_int _clientTimeout = SOCKET_TIMEOUT_CLIENT;
+atomic_uint _uplinkTimeout = SOCKET_TIMEOUT_UPLINK;
+atomic_uint _clientTimeout = SOCKET_TIMEOUT_CLIENT;
atomic_bool _closeUnusedFd = false;
atomic_bool _vmdkLegacyMode = false;
// Not really needed anymore since we have '+' and '-' in alt-servers
atomic_bool _proxyPrivateOnly = false;
+atomic_bool _pretendClient = false;
atomic_int _autoFreeDiskSpaceDelay = 3600 * 10;
// [limits]
atomic_int _maxClients = SERVER_MAX_CLIENTS;
atomic_int _maxImages = SERVER_MAX_IMAGES;
-atomic_int _maxPayload = 9000000; // 9MB
+atomic_uint _maxPayload = 9000000; // 9MB
atomic_uint_fast64_t _maxReplicationSize = (uint64_t)100000000000LL;
-atomic_bool _pretendClient = false;
+atomic_uint _maxPrefetch = 262144; // 256KB
/**
* True when loading config the first time. Consecutive loads will
@@ -60,17 +61,17 @@ static const char* units = "KMGTPEZY";
static bool parse64(const char *in, atomic_int_fast64_t *out, const char *optname);
static bool parse64u(const char *in, atomic_uint_fast64_t *out, const char *optname);
-static bool parse32(const char *in, atomic_int *out, const char *optname) UNUSED;
-static bool parse32u(const char *in, atomic_int *out, const char *optname);
+static bool parse32(const char *in, atomic_int *out, const char *optname);
+static bool parse32u(const char *in, atomic_uint *out, const char *optname);
static int ini_handler(void *custom UNUSED, const char* section, const char* key, const char* value)
{
if ( initialLoad ) {
if ( _basePath == NULL ) SAVE_TO_VAR_STR( dnbd3, basePath );
SAVE_TO_VAR_BOOL( dnbd3, vmdkLegacyMode );
- SAVE_TO_VAR_UINT( dnbd3, listenPort );
- SAVE_TO_VAR_UINT( limits, maxClients );
- SAVE_TO_VAR_UINT( limits, maxImages );
+ SAVE_TO_VAR_INT( dnbd3, listenPort );
+ SAVE_TO_VAR_INT( limits, maxClients );
+ SAVE_TO_VAR_INT( limits, maxImages );
}
SAVE_TO_VAR_BOOL( dnbd3, isProxy );
SAVE_TO_VAR_BOOL( dnbd3, proxyPrivateOnly );
@@ -81,12 +82,13 @@ static int ini_handler(void *custom UNUSED, const char* section, const char* key
SAVE_TO_VAR_BOOL( dnbd3, ignoreAllocErrors );
SAVE_TO_VAR_BOOL( dnbd3, removeMissingImages );
SAVE_TO_VAR_BOOL( dnbd3, closeUnusedFd );
- SAVE_TO_VAR_UINT( dnbd3, serverPenalty );
- SAVE_TO_VAR_UINT( dnbd3, clientPenalty );
+ SAVE_TO_VAR_INT( dnbd3, serverPenalty );
+ SAVE_TO_VAR_INT( dnbd3, clientPenalty );
SAVE_TO_VAR_UINT( dnbd3, uplinkTimeout );
SAVE_TO_VAR_UINT( dnbd3, clientTimeout );
SAVE_TO_VAR_UINT( limits, maxPayload );
SAVE_TO_VAR_UINT64( limits, maxReplicationSize );
+ SAVE_TO_VAR_UINT( limits, maxPrefetch );
SAVE_TO_VAR_BOOL( dnbd3, pretendClient );
SAVE_TO_VAR_INT( dnbd3, autoFreeDiskSpaceDelay );
if ( strcmp( section, "dnbd3" ) == 0 && strcmp( key, "backgroundReplication" ) == 0 ) {
@@ -295,7 +297,7 @@ static bool parse32(const char *in, atomic_int *out, const char *optname)
return true;
}
-static bool parse32u(const char *in, atomic_int *out, const char *optname)
+static bool parse32u(const char *in, atomic_uint *out, const char *optname)
{
atomic_int_fast64_t v;
if ( !parse64( in, &v, optname ) ) return false;
@@ -303,7 +305,7 @@ static bool parse32u(const char *in, atomic_int *out, const char *optname)
logadd( LOG_WARNING, "'%s' must be between %d and %d, but is '%s'", optname, (int)0, (int)INT_MAX, in );
return false;
}
- *out = (int)v;
+ *out = (unsigned int)v;
return true;
}
@@ -351,6 +353,7 @@ size_t globals_dumpConfig(char *buffer, size_t size)
PINT(maxImages);
PINT(maxPayload);
PUINT64(maxReplicationSize);
+ PINT(maxPrefetch);
return size - rem;
}
diff --git a/src/server/globals.h b/src/server/globals.h
index 12805ed..b255668 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -229,12 +229,12 @@ extern atomic_bool _removeMissingImages;
/**
* Read timeout when waiting for or sending data on an uplink
*/
-extern atomic_int _uplinkTimeout;
+extern atomic_uint _uplinkTimeout;
/**
* Read timeout when waiting for or sending data from/to client
*/
-extern atomic_int _clientTimeout;
+extern atomic_uint _clientTimeout;
/**
* If true, images with no active client will have their fd closed after some
@@ -309,7 +309,7 @@ extern atomic_int _maxImages;
* Usually this isn't even a megabyte for "real" clients (blockdev
* or fuse).
*/
-extern atomic_int _maxPayload;
+extern atomic_uint _maxPayload;
/**
* If in proxy mode, don't replicate images that are
@@ -332,6 +332,15 @@ extern atomic_bool _pretendClient;
extern atomic_int _autoFreeDiskSpaceDelay;
/**
+ * When handling a client request, this sets the maximum amount
+ * of bytes we prefetch offset right at the end of the client request.
+ * The prefetch size will be MIN( length * 3, _maxPrefetch ), if
+ * length <= _maxPrefetch, so effectively, setting this to 0 disables
+ * any prefetching.
+ */
+extern atomic_uint _maxPrefetch;
+
+/**
* Load the server configuration.
*/
void globals_loadConfig();
diff --git a/src/server/uplink.c b/src/server/uplink.c
index e4d9d0e..dedaa17 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -24,6 +24,10 @@
static atomic_uint_fast64_t totalBytesReceived = 0;
+typedef struct {
+ uint64_t start, end, handle;
+} req_t;
+
static void cancelAllRequests(dnbd3_uplink_t *uplink);
static void freeUplinkStruct(ref *ref);
static void* uplink_mainloop(void *data);
@@ -39,13 +43,6 @@ static void connectionFailed(dnbd3_uplink_t *uplink, bool findNew);
static int numWantedReplicationRequests(dnbd3_uplink_t *uplink);
static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle);
static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops);
-static void *prefetchForClient(void *data);
-
-typedef struct {
- dnbd3_uplink_t *uplink;
- uint64_t start;
- uint32_t length;
-} prefetch_job_t;
#define assert_uplink_thread() assert( pthread_equal( uplink->thread, pthread_self() ) )
@@ -247,6 +244,10 @@ void uplink_removeEntry(dnbd3_uplink_t *uplink, void *data, uplink_callback call
mutex_unlock( &uplink->queueLock );
}
+/**
+ * Called from a client (proxy) connection to request a missing part of the image.
+ * The caller has made sure that the range is actually missing.
+ */
bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
{
assert( client != NULL && callback != NULL );
@@ -266,8 +267,7 @@ bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
// This might be a false positive if there are multiple instances running on the same host (IP)
bool ret;
- if ( client != NULL && hops > 1
- && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
+ if ( hops > 1 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) {
uplink->cycleDetected = true;
signal_call( uplink->signal );
logadd( LOG_WARNING, "Proxy cycle detected (same host)." );
@@ -279,7 +279,11 @@ bool uplink_requestClient(dnbd3_client_t *client, uplink_callback callback, uint
return ret;
}
-bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length)
+/**
+ * Called by integrated fuse module
+ */
+bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback,
+ uint64_t handle, uint64_t start, uint32_t length)
{
dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref );
if ( unlikely( uplink == NULL ) ) {
@@ -295,12 +299,42 @@ bool uplink_request(dnbd3_image_t *image, void *data, uplink_callback callback,
return ret;
}
+static void extendRequest(uint64_t start, uint64_t *end, const dnbd3_image_t *image, uint32_t wanted)
+{
+ uint32_t length = (uint32_t)( *end - start );
+ if ( length >= wanted )
+ return;
+ length = wanted;
+ if ( unlikely( _backgroundReplication == BGR_HASHBLOCK
+ && *end / HASH_BLOCK_SIZE != (start + length) / HASH_BLOCK_SIZE ) ) {
+ // Don't extend across hash-block border in this mode
+ *end = ( start + length ) & ~( HASH_BLOCK_SIZE - 1 );
+ } else {
+ *end = start + length;
+ }
+ if ( unlikely( *end > image->virtualFilesize ) ) {
+ *end = image->virtualFilesize;
+ }
+ *end = ( *end + DNBD3_BLOCK_SIZE - 1 ) & ~( DNBD3_BLOCK_SIZE - 1 );
+ //logadd( LOG_DEBUG2, "Extended %"PRIx64" from %"PRIx64" to %"PRIx64, start, end, req.end );
+}
+
+static bool requestBlock(dnbd3_uplink_t *uplink, req_t *req, uint8_t hops)
+{
+ if ( uplink->current.fd == -1 )
+ return false;
+ return dnbd3_get_block( uplink->current.fd, req->start,
+ (uint32_t)( req->end - req->start ), req->handle,
+ COND_HOPCOUNT( uplink->current.version, hops ) );
+}
+
/**
* Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL.
- * If client is NULL, this is assumed to be a background replication request.
+ * If callback is NULL, this is assumed to be a background replication request.
* Locks on: uplink.queueLock, uplink.sendMutex
*/
-static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
+static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_callback callback,
+ uint64_t handle, uint64_t start, uint32_t length, uint8_t hops)
{
assert( uplink != NULL );
assert( data == NULL || callback != NULL );
@@ -309,174 +343,186 @@ static bool uplink_requestInternal(dnbd3_uplink_t *uplink, void *data, uplink_ca
return false;
}
if ( length > (uint32_t)_maxPayload ) {
- logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload", length );
+ logadd( LOG_WARNING, "UPLINK: Cannot relay request; length of %" PRIu32 " exceeds maximum payload",
+ length );
return false;
}
- struct {
- uint64_t handle, start, end;
- } req;
+ req_t req, preReq;
hops++;
- do {
- const uint64_t end = start + length;
- dnbd3_queue_entry_t *request = NULL, *last = NULL;
- bool isNew;
- mutex_lock( &uplink->queueLock );
- if ( uplink->shutdown ) { // Check again after locking to prevent lost requests
+ dnbd3_queue_entry_t *request = NULL, *last = NULL, *pre = NULL;
+ bool isNew;
+ const uint64_t end = start + length;
+ req.start = start & ~(DNBD3_BLOCK_SIZE - 1);
+ req.end = end;
+ /* Don't do this for now -- this breaks matching of prefetch jobs, since they'd
+ * be misaligned, and the next client request wouldn't match anything.
+ * To improve this, we need to be able to attach a queue_client to multiple queue_entries
+ * and then serve it once all the queue_entries are done (atomic_int in queue_client).
+ * But currently we directly send the receive buffer's content to the queue_client after
+ * receiving the payload, as this will also work when the local cache is borked (we just
+ * tunnel though the traffic). One could argue that this mode of operation is nonsense,
+ * and we should just drop all affected clients. Then as a next step, don't serve the
+ * clients form the receive buffer, but just issue a normal sendfile() call after writing
+ * the received data to the local cache.
+ if ( callback != NULL ) {
+ // Not background replication request, extend request size
+ extendRequest( req.start, &req.end, uplink->image, _minRequestSize );
+ }
+ */
+ req.end = (req.end + DNBD3_BLOCK_SIZE - 1) & ~(DNBD3_BLOCK_SIZE - 1);
+ // Critical section - work with the queue
+ mutex_lock( &uplink->queueLock );
+ if ( uplink->shutdown ) { // Check again after locking to prevent lost requests
+ goto fail_lock;
+ }
+ for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
+ if ( it->from <= start && it->to >= end ) {
+ // Matching range, attach
+ request = it;
+ break;
+ }
+ if ( it->next == NULL ) {
+ // Not matching, last in list, remember
+ last = it;
+ break;
+ }
+ }
+ dnbd3_queue_client_t **c = NULL;
+ if ( request == NULL ) {
+ // No existing request to attach to
+ if ( uplink->queueLen >= UPLINK_MAX_QUEUE ) {
+ logadd( LOG_WARNING,
+ "Uplink queue is full, consider increasing UPLINK_MAX_QUEUE. Dropping client..." );
goto fail_lock;
}
- for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) {
- if ( it->from <= start && it->to >= end ) {
- // Matching range, attach
- request = it;
- break;
- }
- if ( it->next == NULL ) {
- // Not matching, last in list, remember
- last = it;
- break;
- }
+ uplink->queueLen++;
+ if ( uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
+ uplink->image->problem.queue = true;
}
- dnbd3_queue_client_t **c = NULL;
- if ( request == NULL ) {
- // No existing request to attach to
- if ( uplink->queueLen >= UPLINK_MAX_QUEUE ) {
- logadd( LOG_WARNING, "Uplink queue is full, consider increasing UPLINK_MAX_QUEUE. Dropping client..." );
- goto fail_lock;
- }
- uplink->queueLen++;
- if ( uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
- uplink->image->problem.queue = true;
- }
- request = malloc( sizeof(*request) );
- if ( last == NULL ) {
- uplink->queue = request;
- } else {
- last->next = request;
- }
- request->next = NULL;
- request->handle = ++uplink->queueId;
- request->from = start & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
- request->to = (end + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1);
+ request = malloc( sizeof(*request) );
+ if ( last == NULL ) {
+ uplink->queue = request;
+ } else {
+ last->next = request;
+ }
+ request->next = NULL;
+ request->handle = ++uplink->queueId;
+ request->from = req.start;
+ request->to = req.end;
#ifdef DEBUG
- timing_get( &request->entered );
+ timing_get( &request->entered );
#endif
- request->hopCount = hops;
- request->sent = true; // Optimistic; would be set to false on failure
- if ( callback == NULL ) {
- // BGR
- request->clients = NULL;
- } else {
- c = &request->clients;
- }
- isNew = true;
- } else if ( callback == NULL ) {
- // Replication request that maches existing request. Do nothing
- isNew = false;
+ request->hopCount = hops;
+ request->sent = true; // Optimistic; would be set to false on failure
+ if ( callback == NULL ) {
+ // BGR
+ request->clients = NULL;
} else {
- // Existing request. Check if potential cycle
- if ( hops > request->hopCount && request->from == start && request->to == end ) {
- logadd( LOG_DEBUG1, "Request cycle detected on uplink for %s:%d", PIMG(uplink->image) );
- goto fail_lock;
- }
- // Count number if clients, get tail of list
- int count = 0;
c = &request->clients;
- while ( *c != NULL ) {
- c = &(**c).next;
- if ( ++count >= UPLINK_MAX_CLIENTS_PER_REQUEST ) {
- logadd( LOG_DEBUG2, "Won't accept more than %d clients per request, dropping client", count );
- goto fail_lock;
- }
- }
- isNew = false;
}
- req.handle = request->handle;
- req.start = request->from;
- req.end = request->to;
- if ( callback != NULL ) {
- assert( c != NULL );
- *c = malloc( sizeof( *request->clients ) );
- (**c).next = NULL;
- (**c).handle = handle;
- (**c).from = start;
- (**c).to = end;
- (**c).data = data;
- (**c).callback = callback;
+ isNew = true;
+ } else if ( callback == NULL ) {
+ // Replication request that maches existing request. Do nothing
+ isNew = false;
+ } else {
+ // Existing request. Check if potential cycle
+ if ( hops > request->hopCount && request->from == start && request->to == end ) {
+ logadd( LOG_DEBUG1, "Request cycle detected on uplink for %s:%d", PIMG(uplink->image) );
+ goto fail_lock;
}
- mutex_unlock( &uplink->queueLock );
-
- if ( !isNew ) {
- goto success_ref; // Attached to pending request, do nothing
+ // Count number if clients, get tail of list
+ int count = 0;
+ c = &request->clients;
+ while ( *c != NULL ) {
+ c = &(**c).next;
+ if ( ++count >= UPLINK_MAX_CLIENTS_PER_REQUEST ) {
+ logadd( LOG_DEBUG2, "Won't accept more than %d clients per request, dropping client", count );
+ goto fail_lock;
+ }
}
- } while (0);
+ isNew = false;
+ }
+ // Prefetch immediately, without unlocking the list - the old approach of
+ // async prefetching in another thread was sometimes so slow that we'd process
+ // another request from the same client before the prefetch job would execute.
+ if ( callback != NULL && ( isNew || request->clients == NULL || request->clients->data == data )
+ && end == request->to && length <= _maxPrefetch ) {
+ // Only if this is a client request, and the !! end boundary matches exactly !!
+ // (See above for reason why)
+ // - We neither check the local cache, nor other pending requests. Worth it?
+ // Complexity vs. probability
+ preReq.start = end;
+ preReq.end = end;
+ extendRequest( preReq.start, &preReq.end, uplink->image, MIN( length * 3, _maxPrefetch ) );
+ if ( preReq.start < preReq.end ) {
+ //logadd( LOG_DEBUG2, "Prefetching @ %"PRIx64" - %"PRIx64, preReq.start, preReq.end );
+ uplink->queueLen++;
+ pre = malloc( sizeof(*pre) );
+ pre->next = request->next;
+ request->next = pre;
+ pre->handle = preReq.handle = ++uplink->queueId;
+ pre->from = preReq.start;
+ pre->to = preReq.end;
+ pre->hopCount = hops;
+ pre->sent = true; // Optimistic; would be set to false on failure
+ pre->clients = NULL;
+#ifdef DEBUG
+ timing_get( &pre->entered );
+#endif
+ }
+ }
+ // // // //
+ // Copy data - need this after unlocking
+ req.handle = request->handle;
+ if ( callback != NULL ) {
+ assert( c != NULL );
+ *c = malloc( sizeof( *request->clients ) );
+ (**c).next = NULL;
+ (**c).handle = handle;
+ (**c).from = start;
+ (**c).to = end;
+ (**c).data = data;
+ (**c).callback = callback;
+ }
+ mutex_unlock( &uplink->queueLock );
+ // End queue critical section
+ if ( pre == NULL && !isNew )
+ return true; // Nothing to do
- // Fire away the request
+ // Fire away the request(s)
mutex_lock( &uplink->sendMutex );
- if ( unlikely( uplink->current.fd == -1 ) ) {
+ bool ret1 = true;
+ bool ret2 = true;
+ if ( isNew ) {
+ ret1 = requestBlock( uplink, &req, hops );
+ }
+ if ( pre != NULL ) {
+ ret2 = requestBlock( uplink, &preReq, hops );
+ }
+ if ( !ret1 || !ret2 ) { // Set with send locked
uplink->image->problem.uplink = true;
+ }
+ mutex_unlock( &uplink->sendMutex );
+ // markRequestUnsend locks the queue, would violate locking order with send mutex
+ if ( !ret1 ) {
markRequestUnsent( uplink, req.handle );
- mutex_unlock( &uplink->sendMutex );
- logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" );
- } else {
- const bool ret = dnbd3_get_block( uplink->current.fd, req.start,
- (uint32_t)( req.end - req.start ), req.handle,
- COND_HOPCOUNT( uplink->current.version, hops ) );
- if ( unlikely( !ret ) ) {
- markRequestUnsent( uplink, req.handle );
- uplink->image->problem.uplink = true;
- mutex_unlock( &uplink->sendMutex );
- logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing (%"PRIu64")", req.handle );
- } else {
- // OK
- mutex_unlock( &uplink->sendMutex );
- goto success_ref;
- }
- // Fall through to waking up sender thread
+ logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing (%"PRIu64")", req.handle );
}
-
- if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
- logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
+ if ( !ret2 ) {
+ markRequestUnsent( uplink, preReq.handle );
}
-success_ref:
- if ( callback != NULL ) {
- // Was from client -- potential prefetch
- // Same size as this request, but consider end of image...
- uint32_t len = (uint32_t)MIN( uplink->image->virtualFilesize - req.end,
- req.end - req.start );
- // Also don't prefetch if we cross a hash block border and BGR mode == hashblock
- if ( len > 0 && ( _backgroundReplication != BGR_HASHBLOCK
- || req.start % HASH_BLOCK_SIZE == (req.end-1) % HASH_BLOCK_SIZE ) ) {
- prefetch_job_t *job = malloc( sizeof( *job ) );
- job->start = req.end;
- job->length = len;
- job->uplink = uplink;
- ref_inc( &uplink->reference ); // Hold one for the thread, thread will return it
- threadpool_run( &prefetchForClient, (void*)job, "PREFETCH" );
- }
+ if ( ( !ret1 || !ret2 ) && signal_call( uplink->signal ) == SIGNAL_ERROR ) {
+ logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
}
return true;
+
fail_lock:
mutex_unlock( &uplink->queueLock );
return false;
}
-static void *prefetchForClient(void *data)
-{
- prefetch_job_t *job = (prefetch_job_t*)data;
- dnbd3_cache_map_t *cache = ref_get_cachemap( job->uplink->image );
- if ( cache != NULL ) {
- if ( !image_isRangeCachedUnsafe( cache, job->start, job->start + job->length ) ) {
- uplink_requestInternal( job->uplink, NULL, NULL, ++job->uplink->queueId, job->start, job->length, 0 );
- }
- ref_put( &cache->reference );
- }
- ref_put( &job->uplink->reference ); // Acquired in uplink_request
- free( job );
- return NULL;
-}
-
/**
* Uplink thread.
* Locks are irrelevant as this is never called from another function