From 88695877f085af475a6ca8a01c2fbb08eb5b15da Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Thu, 29 Aug 2019 14:49:18 +0200 Subject: [SERVER] Use weakref for cache maps Gets rid of a bunch of locking, especially the hot path in net.c where clients are requesting data. Many clients unsing the same incomplete image previously created a bottleneck here. --- src/server/image.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/server/image.h') diff --git a/src/server/image.h b/src/server/image.h index 4668eff..cd87f03 100644 --- a/src/server/image.h +++ b/src/server/image.h @@ -9,7 +9,7 @@ void image_serverStartup(); bool image_isComplete(dnbd3_image_t *image); -bool image_isHashBlockComplete(const uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize); +bool image_isHashBlockComplete(atomic_uint_least8_t * const cacheMap, const uint64_t block, const uint64_t fileSize); void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, const bool set); -- cgit v1.2.3-55-g7522 From 701e5a967fd6bc97644f39e6fea3714f49a90291 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Fri, 6 Sep 2019 17:32:58 +0200 Subject: [SERVER] rpc: Add cachemap feature --- src/server/globals.h | 2 +- src/server/image.c | 16 ++++++++++++++++ src/server/image.h | 2 ++ src/server/rpc.c | 44 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 63 insertions(+), 1 deletion(-) (limited to 'src/server/image.h') diff --git a/src/server/globals.h b/src/server/globals.h index 58b2c9d..df8c595 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -110,7 +110,7 @@ typedef struct typedef struct { ref reference; - atomic_uint_least8_t map[]; + _Atomic uint8_t map[]; } dnbd3_cache_map_t; /** diff --git a/src/server/image.c b/src/server/image.c index 9fcb866..5fa06d8 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -274,6 +274,22 @@ bool image_ensureOpen(dnbd3_image_t *image) return image->readFd != -1; } +dnbd3_image_t* image_byId(int imgId) +{ + int i; + mutex_lock( &imageListLock ); + for (i = 0; i < _num_images; ++i) { + dnbd3_image_t * const image = _images[i]; + if ( image != NULL && image->id == imgId ) { + image->users++; + mutex_unlock( &imageListLock ); + return image; + } + } + mutex_unlock( &imageListLock ); + return NULL; +} + /** * Get an image by name+rid. This function increases a reference counter, * so you HAVE TO CALL image_release for every image_get() call at some diff --git a/src/server/image.h b/src/server/image.h index cd87f03..449e31f 100644 --- a/src/server/image.h +++ b/src/server/image.h @@ -17,6 +17,8 @@ void image_markComplete(dnbd3_image_t *image); bool image_ensureOpen(dnbd3_image_t *image); +dnbd3_image_t* image_byId(int imgId); + dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking); bool image_reopenCacheFd(dnbd3_image_t *image, const bool force); diff --git a/src/server/rpc.c b/src/server/rpc.c index 662263e..548c80f 100644 --- a/src/server/rpc.c +++ b/src/server/rpc.c @@ -9,6 +9,7 @@ #include "fileutil.h" #include "picohttpparser/picohttpparser.h" #include "urldecode.h" +#include "reference.h" #include #include @@ -43,7 +44,9 @@ _Static_assert( sizeof("test") == 5 && sizeof("test2") == 6, "Stringsize messup DEFSTR(STR_CONNECTION, "connection") DEFSTR(STR_CLOSE, "close") DEFSTR(STR_QUERY, "/query") +DEFSTR(STR_CACHEMAP, "/cachemap") DEFSTR(STR_Q, "q") +DEFSTR(STR_ID, "id") static inline bool equals(struct string *s1,struct string *s2) { @@ -81,6 +84,7 @@ static struct { } status; static bool handleStatus(int sock, int permissions, struct field *fields, size_t fields_num, int keepAlive); +static bool handleCacheMap(int sock, int permissions, struct field *fields, size_t fields_num, int keepAlive); static bool sendReply(int sock, const char *status, const char *ctype, const char *payload, ssize_t plen, int keepAlive); static void parsePath(struct string *path, struct string *file, struct field *getv, size_t *getc); static bool hasHeaderValue(struct phr_header *headers, size_t numHeaders, struct string *name, struct string *value); @@ -212,6 +216,8 @@ void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int // Don't care if GET or POST if ( equals( &file, &STR_QUERY ) ) { ok = handleStatus( sock, permissions, getv, getc, keepAlive ); + } else if ( equals( &file, &STR_CACHEMAP ) ) { + ok = handleCacheMap( sock, permissions, getv, getc, keepAlive ); } else { ok = sendReply( sock, "404 Not found", "text/plain", "Nothing", -1, keepAlive ); } @@ -342,6 +348,44 @@ static bool handleStatus(int sock, int permissions, struct field *fields, size_t return ok; } +static bool handleCacheMap(int sock, int permissions, struct field *fields, size_t fields_num, int keepAlive) +{ + if ( !(permissions & ACL_IMAGE_LIST) ) { + return sendReply( sock, "403 Forbidden", "text/plain", "No permission to access image list", -1, keepAlive ); + } + int imgId = -1; + static const char one = 0xff; + for (size_t i = 0; i < fields_num; ++i) { + if ( equals( &fields[i].name, &STR_ID ) ) { + char *broken; + imgId = strtol( fields[i].value.s, &broken, 10 ); + if ( broken != fields[i].value.s ) + break; + imgId = -1; + } + } + if ( imgId == -1 ) + return sendReply( sock, "400 Bad Request", "text/plain", "Missing parameter 'id'", -1, keepAlive ); + dnbd3_image_t *image = image_byId( imgId ); + if ( image == NULL ) + return sendReply( sock, "404 Not found", "text/plain", "Image not found", -1, keepAlive ); + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + image_release( image ); + int len; + const char *map; + if ( cache == NULL ) { + map = &one; + len = 1; + } else { + _Static_assert( sizeof(const char) == sizeof(_Atomic uint8_t), "Atomic assumption exploded" ); + map = (const char*)cache->map; + len = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); + } + bool ok = sendReply( sock, "200 OK", "application/octet-stream", map, len, keepAlive ); + ref_put( &cache->reference ); + return ok; +} + static bool sendReply(int sock, const char *status, const char *ctype, const char *payload, ssize_t plen, int keepAlive) { if ( plen == -1 ) plen = strlen( payload ); -- cgit v1.2.3-55-g7522 From 3d0c89fccf14599d156696d74224a4fbe0787777 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Tue, 29 Oct 2019 17:55:20 +0100 Subject: [SERVER] Fix checking images without cache map --- src/server/image.c | 18 +++++++++++------- src/server/image.h | 2 +- src/server/integrity.c | 4 ++-- 3 files changed, 14 insertions(+), 10 deletions(-) (limited to 'src/server/image.h') diff --git a/src/server/image.c b/src/server/image.c index 6259e38..9581a92 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -160,7 +160,7 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co end = (end + HASH_BLOCK_SIZE - 1) & ~(uint64_t)(HASH_BLOCK_SIZE - 1); for ( pos = start; pos < end; pos += HASH_BLOCK_SIZE ) { const int block = (int)( pos / HASH_BLOCK_SIZE ); - if ( image_isHashBlockComplete( cache->map, block, image->realFilesize ) ) { + if ( image_isHashBlockComplete( cache, block, image->realFilesize ) ) { integrity_check( image, block, false ); } } @@ -651,9 +651,11 @@ static dnbd3_image_t* image_free(dnbd3_image_t *image) return NULL ; } -bool image_isHashBlockComplete(atomic_uint_least8_t * const cacheMap, const uint64_t block, const uint64_t realFilesize) +bool image_isHashBlockComplete(dnbd3_cache_map_t * const cache, const uint64_t block, const uint64_t realFilesize) { - if ( cacheMap == NULL ) return true; + if ( cache == NULL ) + return true; + const atomic_uint_least8_t *cacheMap = cache->map; const uint64_t end = (block + 1) * HASH_BLOCK_SIZE; if ( end <= realFilesize ) { // Trivial case: block in question is not the last block (well, or image size is multiple of HASH_BLOCK_SIZE) @@ -1039,10 +1041,10 @@ static void image_checkRandomBlocks(dnbd3_image_t *image, const int count) int blocks[count]; int index = 0, j; int block; - if ( image_isHashBlockComplete( cache->map, 0, image->virtualFilesize ) ) { + if ( image_isHashBlockComplete( cache, 0, image->virtualFilesize ) ) { blocks[index++] = 0; } - if ( hashBlocks > 1 && image_isHashBlockComplete( cache->map, hashBlocks - 1, image->virtualFilesize ) ) { + if ( hashBlocks > 1 && image_isHashBlockComplete( cache, hashBlocks - 1, image->virtualFilesize ) ) { blocks[index++] = hashBlocks - 1; } int tries = count * 5; // Try only so many times to find a non-duplicate complete block @@ -1052,12 +1054,14 @@ static void image_checkRandomBlocks(dnbd3_image_t *image, const int count) if ( blocks[j] == block ) goto while_end; } // Block complete? If yes, add to list - if ( image_isHashBlockComplete( cache->map, block, image->virtualFilesize ) ) { + if ( image_isHashBlockComplete( cache, block, image->virtualFilesize ) ) { blocks[index++] = block; } while_end: ; } - ref_put( &cache->reference ); + if ( cache != NULL ) { + ref_put( &cache->reference ); + } for ( int i = 0; i < index; ++i ) { integrity_check( image, blocks[i], true ); } diff --git a/src/server/image.h b/src/server/image.h index 449e31f..89791fc 100644 --- a/src/server/image.h +++ b/src/server/image.h @@ -9,7 +9,7 @@ void image_serverStartup(); bool image_isComplete(dnbd3_image_t *image); -bool image_isHashBlockComplete(atomic_uint_least8_t * const cacheMap, const uint64_t block, const uint64_t fileSize); +bool image_isHashBlockComplete(dnbd3_cache_map_t * const cache, const uint64_t block, const uint64_t fileSize); void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, const bool set); diff --git a/src/server/integrity.c b/src/server/integrity.c index 1fbd9dc..4006dfc 100644 --- a/src/server/integrity.c +++ b/src/server/integrity.c @@ -174,7 +174,7 @@ static void* integrity_main(void * data UNUSED) dnbd3_cache_map_t *cache = ref_get_cachemap( image ); if ( cache != NULL ) { // When checking full image, skip incomplete blocks, otherwise assume block is complete - complete = image_isHashBlockComplete( cache->map, blocks[0], fileSize ); + complete = image_isHashBlockComplete( cache, blocks[0], fileSize ); ref_put( &cache->reference ); } } @@ -205,7 +205,7 @@ static void* integrity_main(void * data UNUSED) bool iscomplete = true; dnbd3_cache_map_t *cache = ref_get_cachemap( image ); if ( cache != NULL ) { - iscomplete = image_isHashBlockComplete( cache->map, blocks[0], fileSize ); + iscomplete = image_isHashBlockComplete( cache, blocks[0], fileSize ); ref_put( &cache->reference ); } logadd( LOG_WARNING, "Hash check for block %d of %s failed (complete: was: %d, is: %d)", blocks[0], image->name, (int)complete, (int)iscomplete ); -- cgit v1.2.3-55-g7522 From 930b65f26cb39687a113641f56711a2d58f886ca Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Wed, 4 Mar 2020 17:49:50 +0100 Subject: [SERVER] Add timer task for saving cache maps Cache maps will now be saved periodically, but only if either they have a "dirty" bit set, which happens if any bits in the map get cleared again (due to corruption), or if new data has been replicated from an uplink server. This either means at least one byte received and 5 minutes have passed, or at least 500MB have been downloaded. The timer currently runs every 20 seconds. --- src/server/altservers.c | 20 +++++++ src/server/altservers.h | 2 + src/server/globals.h | 3 +- src/server/image.c | 136 +++++++++++++++++++++++++++++++++++++++++++++++- src/server/image.h | 2 + src/server/uplink.c | 76 ++------------------------- src/serverconfig.h | 5 +- 7 files changed, 168 insertions(+), 76 deletions(-) (limited to 'src/server/image.h') diff --git a/src/server/altservers.c b/src/server/altservers.c index a6ad235..380737c 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -273,6 +273,26 @@ int altservers_getHostListForReplication(const char *image, dnbd3_host_t *server return num; } +/** + * Returns true if there is at least one alt-server the + * given image name would be allowed to be cloned from. + */ +bool altservers_imageHasAltServers(const char *image) +{ + bool ret = false; + mutex_lock( &altServersLock ); + for ( int i = 0; i < numAltServers; ++i ) { + if ( altServers[i].isClientOnly || ( !altServers[i].isPrivate && _proxyPrivateOnly ) ) + continue; + if ( !isImageAllowed( &altServers[i], image ) ) + continue; + ret = true; + break; + } + mutex_unlock( &altServersLock ); + return ret; +} + /** * Get alt servers. If there are more alt servers than * requested, random servers will be picked. diff --git a/src/server/altservers.h b/src/server/altservers.h index 8e29aaa..78f6fcc 100644 --- a/src/server/altservers.h +++ b/src/server/altservers.h @@ -19,6 +19,8 @@ int altservers_getListForClient(dnbd3_client_t *client, dnbd3_server_entry_t *ou int altservers_getHostListForReplication(const char *image, dnbd3_host_t *servers, int size); +bool altservers_imageHasAltServers(const char *image); + bool altservers_toString(int server, char *buffer, size_t len); int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2); diff --git a/src/server/globals.h b/src/server/globals.h index 5de4180..10d3ee3 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -93,6 +93,7 @@ struct _dnbd3_uplink // If BGR == BGR_HASHBLOCK, -1 means "currently no incomplete block" uint64_t replicationHandle; // Handle of pending replication request atomic_uint_fast64_t bytesReceived; // Number of bytes received by the uplink since startup. + atomic_uint_fast64_t bytesReceivedLastSave; // Number of bytes received when we last saved the cache map int queueLen; // length of queue uint32_t idleTime; // How many seconds the uplink was idle (apart from keep-alives) dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE]; @@ -128,7 +129,6 @@ struct _dnbd3_image uint64_t virtualFilesize; // virtual size of image (real size rounded up to multiple of 4k) uint64_t realFilesize; // actual file size on disk ticks atime; // last access time - ticks lastWorkCheck; // last time a non-working image has been checked ticks nextCompletenessEstimate; // next time the completeness estimate should be updated uint32_t *crc32; // list of crc32 checksums for each 16MiB block in image uint32_t masterCrc32; // CRC-32 of the crc-32 list @@ -144,6 +144,7 @@ struct _dnbd3_image atomic_bool queue; // Too many requests waiting on uplink } problem; uint16_t rid; // revision of image + atomic_bool mapDirty; // Cache map has been modified outside uplink (only integrity checker for now) pthread_mutex_t lock; }; diff --git a/src/server/image.c b/src/server/image.c index 3583f86..5a9e15b 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -55,6 +55,8 @@ static dnbd3_cache_map_t* image_loadCacheMap(const char * const imagePath, const static uint32_t* image_loadCrcList(const char * const imagePath, const int64_t fileSize, uint32_t *masterCrc); static bool image_checkRandomBlocks(dnbd3_image_t *image, const int count, int fromFd); static void* closeUnusedFds(void*); +static void* saveAllCacheMaps(void*); +static bool saveCacheMap(dnbd3_image_t *image); static void allocCacheMap(dnbd3_image_t *image, bool complete); static void cmfree(ref *ref) @@ -73,6 +75,7 @@ void image_serverStartup() mutex_init( &remoteCloneLock, LOCK_REMOTE_CLONE ); mutex_init( &reloadLock, LOCK_RELOAD ); server_addJob( &closeUnusedFds, NULL, 10, 900 ); + server_addJob( &saveAllCacheMaps, NULL, 9, 20 ); } /** @@ -160,6 +163,8 @@ void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, co integrity_check( image, block, false ); } } + } else if ( !set ) { + image->mapDirty = true; } ref_put( &cache->reference ); } @@ -624,6 +629,7 @@ static dnbd3_image_t* image_free(dnbd3_image_t *image) // this will get called again when the uplink is done. if ( !uplink_shutdown( image ) ) return NULL; + saveCacheMap( image ); mutex_lock( &image->lock ); ref_setref( &image->ref_cacheMap, NULL ); free( image->crc32 ); @@ -1830,6 +1836,135 @@ static void* closeUnusedFds(void* nix UNUSED) return NULL; } +#define IMGCOUNT 5 +static void* saveAllCacheMaps(void* nix UNUSED) +{ + static ticks nextSave; + dnbd3_image_t *list[IMGCOUNT]; + int count = 0; + declare_now; + bool full = timing_reached( &nextSave, &now ); + mutex_lock( &imageListLock ); + for ( int i = 0; i < _num_images; ++i ) { + dnbd3_image_t * const image = _images[i]; + if ( image->mapDirty ) { + // Flag is set if integrity checker found a problem - save out + image->users++; + list[count++] = image; + image->mapDirty = false; + } else { + // Otherwise, consider longer timeout and byte count limits of uplink + dnbd3_uplink_t *uplink = ref_get_uplink( &image->uplinkref ); + if ( uplink != NULL ) { + assert( uplink->bytesReceivedLastSave <= uplink->bytesReceived ); + uint64_t diff = uplink->bytesReceived - uplink->bytesReceivedLastSave; + if ( diff > CACHE_MAP_MAX_UNSAVED_BYTES + || ( full && diff != 0 ) ) { + image->users++; + list[count++] = image; + uplink->bytesReceivedLastSave = uplink->bytesReceived; + } + ref_put( &uplink->reference ); + } + } + if ( count == IMGCOUNT ) + break; + } + mutex_unlock( &imageListLock ); + if ( full && count < IMGCOUNT ) { + // Only update nextSave once we handled all images in the list + timing_addSeconds( &nextSave, &now, CACHE_MAP_MAX_SAVE_DELAY ); + } + for ( int i = 0; i < count; ++i ) { + saveCacheMap( list[i] ); + image_release( list[i] ); + } + return NULL; +} +#undef IMGCOUNT + +/** + * Saves the cache map of the given image. + * Return true on success. + * @param image the image + */ +static bool saveCacheMap(dnbd3_image_t *image) +{ + if ( !_isProxy ) + return true; // Nothing to do + dnbd3_cache_map_t *cache = ref_get_cachemap( image ); + if ( cache == NULL ) + return true; // Nothing to do + // Check if we're a "hybrid proxy", i.e. there are only some namespaces (directories) + // for which we have any upstream servers configured. If there's none, don't touch + // the cache map on disk. + if ( !altservers_imageHasAltServers( image->name ) ) { + ref_put( &cache->reference ); + return true; // Nothing to do + } + + logadd( LOG_DEBUG2, "Saving cache map of %s:%d", image->name, (int)image->rid ); + const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize); + char mapfile[strlen( image->path ) + 4 + 1]; + strcpy( mapfile, image->path ); + strcat( mapfile, ".map" ); + + int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 ); + if ( fd == -1 ) { + const int err = errno; + ref_put( &cache->reference ); + logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile ); + return false; + } + + // On Linux we could use readFd, but in general it's not guaranteed to work + int imgFd = open( image->path, O_WRONLY ); + if ( imgFd == -1 ) { + logadd( LOG_WARNING, "Cannot open %s for fsync(): errno=%d", image->path, errno ); + } else { + if ( fsync( imgFd ) == -1 ) { + logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d. Resetting cache map.", image->path, errno ); + dnbd3_cache_map_t *old = image_loadCacheMap(image->path, image->virtualFilesize); + const int mapSize = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); + if ( old == NULL ) { + // Could not load old map. FS might be toast. + logadd( LOG_ERROR, "Cannot load old cache map. Setting all zero." ); + memset( cache->map, 0, mapSize ); + } else { + // AND the maps together to be safe + for ( int i = 0; i < mapSize; ++i ) { + cache->map[i] &= old->map[i]; + } + old->reference.free( &old->reference ); + } + } + close( imgFd ); + } + + // Write current map to file + size_t done = 0; + while ( done < size ) { + const ssize_t ret = write( fd, cache->map + done, size - done ); + if ( ret == -1 ) { + if ( errno == EINTR ) continue; + logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile ); + break; + } + if ( ret <= 0 ) { + logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile ); + break; + } + done += (size_t)ret; + } + ref_put( &cache->reference ); + if ( fsync( fd ) == -1 ) { + logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno ); + } + close( fd ); + // TODO fsync on parent directory + return true; +} + static void allocCacheMap(dnbd3_image_t *image, bool complete) { const uint8_t val = complete ? 0xff : 0; @@ -1846,4 +1981,3 @@ static void allocCacheMap(dnbd3_image_t *image, bool complete) } mutex_unlock( &image->lock ); } - diff --git a/src/server/image.h b/src/server/image.h index 89791fc..4614c74 100644 --- a/src/server/image.h +++ b/src/server/image.h @@ -49,6 +49,8 @@ void image_closeUnusedFd(); bool image_ensureDiskSpaceLocked(uint64_t size, bool force); +bool image_saveCacheMap(dnbd3_image_t *image); + // one byte in the map covers 8 4kib blocks, so 32kib per byte // "+ (1 << 15) - 1" is required to account for the last bit of // the image that is smaller than 32kib diff --git a/src/server/uplink.c b/src/server/uplink.c index 97cb2a9..e5ab9c0 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -57,7 +57,6 @@ static bool uplink_sendKeepalive(dnbd3_uplink_t *uplink); static void uplink_addCrc32(dnbd3_uplink_t *uplink); static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink); static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force); -static bool uplink_saveCacheMap(dnbd3_uplink_t *uplink); static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink); static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew); @@ -103,6 +102,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version mutex_init( &uplink->sendMutex, LOCK_UPLINK_SEND ); uplink->image = image; uplink->bytesReceived = 0; + uplink->bytesReceivedLastSave = 0; uplink->idleTime = 0; uplink->queueLen = 0; uplink->cacheFd = -1; @@ -445,7 +445,6 @@ static void* uplink_mainloop(void *data) int altCheckInterval = SERVER_RTT_INTERVAL_INIT; int rttTestResult; uint32_t discoverFailCount = 0; - uint32_t unsavedSeconds = 0; ticks nextAltCheck, lastKeepalive; char buffer[200]; memset( events, 0, sizeof(events) ); @@ -561,12 +560,6 @@ static void* uplink_mainloop(void *data) if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) { lastKeepalive = now; uplink->idleTime += timepassed; - unsavedSeconds += timepassed; - if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && uplink->idleTime >= 20 && uplink->idleTime <= 70 ) ) { - // fsync/save every 4 minutes, or every 60 seconds if uplink is idle - unsavedSeconds = 0; - uplink_saveCacheMap( uplink ); - } // Keep-alive if ( uplink->current.fd != -1 && uplink->replicationHandle == REP_NONE ) { // Send keep-alive if nothing is happening, and try to trigger background rep. @@ -639,9 +632,9 @@ static void* uplink_mainloop(void *data) } #endif } - cleanup: ; - uplink_saveCacheMap( uplink ); +cleanup: ; dnbd3_image_t *image = uplink->image; + image->mapDirty = true; // Force writeout of cache map mutex_lock( &image->lock ); bool exp = false; if ( atomic_compare_exchange_strong( &uplink->shutdown, &exp, true ) ) { @@ -1135,69 +1128,6 @@ static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force) return uplink->cacheFd != -1; } -/** - * Saves the cache map of the given image. - * Return true on success. - * Locks on: imageListLock, image.lock - */ -static bool uplink_saveCacheMap(dnbd3_uplink_t *uplink) -{ - dnbd3_image_t *image = uplink->image; - assert( image != NULL ); - - if ( uplink->cacheFd != -1 ) { - if ( fsync( uplink->cacheFd ) == -1 ) { - // A failing fsync means we have no guarantee that any data - // since the last fsync (or open if none) has been saved. Apart - // from keeping the cache map from the last successful fsync - // around and restoring it there isn't much we can do to recover - // a consistent state. Bail out. - logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d", image->path, errno ); - logadd( LOG_ERROR, "Bailing out immediately" ); - exit( 1 ); - } - } - - dnbd3_cache_map_t *cache = ref_get_cachemap( image ); - if ( cache == NULL ) - return true; - logadd( LOG_DEBUG2, "Saving cache map of %s:%d", image->name, (int)image->rid ); - const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize); - assert( image->path != NULL ); - char mapfile[strlen( image->path ) + 4 + 1]; - strcpy( mapfile, image->path ); - strcat( mapfile, ".map" ); - - int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 ); - if ( fd == -1 ) { - const int err = errno; - ref_put( &cache->reference ); - logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile ); - return false; - } - - size_t done = 0; - while ( done < size ) { - const ssize_t ret = write( fd, cache->map + done, size - done ); - if ( ret == -1 ) { - if ( errno == EINTR ) continue; - logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile ); - break; - } - if ( ret <= 0 ) { - logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile ); - break; - } - done += (size_t)ret; - } - ref_put( &cache->reference ); - if ( fsync( fd ) == -1 ) { - logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno ); - } - close( fd ); - return true; -} - static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink) { return ( uplink->idleTime > SERVER_UPLINK_IDLE_TIMEOUT diff --git a/src/serverconfig.h b/src/serverconfig.h index 239f0a2..5c7301d 100644 --- a/src/serverconfig.h +++ b/src/serverconfig.h @@ -17,7 +17,10 @@ #define SERVER_UPLINK_QUEUELEN_THRES 900 // Threshold where we start dropping incoming clients #define SERVER_MAX_PENDING_ALT_CHECKS 500 // Length of queue for pending alt checks requested by uplinks -#define SERVER_CACHE_MAP_SAVE_INTERVAL 90 +// Wait a maximum of 5 minutes before saving cache map (if data was received at all) +#define CACHE_MAP_MAX_SAVE_DELAY 300 +// If more than 500MB have been received from uplink without saving cache map, do so +#define CACHE_MAP_MAX_UNSAVED_BYTES ((uint64_t)500 * 1000 * 1000) // Time in ms to wait for a read/write call to complete on an uplink connection #define SOCKET_TIMEOUT_UPLINK 5000 -- cgit v1.2.3-55-g7522 From 290d3478f245bb7d2112bb781286a9fbae42b983 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Fri, 13 Mar 2020 16:03:29 +0100 Subject: [SERVER] Rewrite uplink queue handling - Now uses linked lists instead of huge array - Does prefetch data on client requests - Can have multiple replication requests in-flight --- src/server/globals.c | 6 + src/server/globals.h | 35 ++- src/server/image.c | 3 +- src/server/image.h | 44 +++ src/server/net.c | 44 +-- src/server/reference.h | 5 + src/server/uplink.c | 771 +++++++++++++++++++++++++++---------------------- src/server/uplink.h | 2 +- src/serverconfig.h | 3 +- 9 files changed, 518 insertions(+), 395 deletions(-) (limited to 'src/server/image.h') diff --git a/src/server/globals.c b/src/server/globals.c index ac079b1..98e0ddb 100644 --- a/src/server/globals.c +++ b/src/server/globals.c @@ -19,6 +19,7 @@ atomic_int _clientPenalty = 0; atomic_bool _isProxy = false; atomic_int _backgroundReplication = BGR_FULL; atomic_int _bgrMinClients = 0; +atomic_int _bgrWindowSize = 1; atomic_bool _lookupMissingForProxy = true; atomic_bool _sparseFiles = false; atomic_bool _ignoreAllocErrors = false; @@ -74,6 +75,7 @@ static int ini_handler(void *custom UNUSED, const char* section, const char* key SAVE_TO_VAR_BOOL( dnbd3, isProxy ); SAVE_TO_VAR_BOOL( dnbd3, proxyPrivateOnly ); SAVE_TO_VAR_INT( dnbd3, bgrMinClients ); + SAVE_TO_VAR_INT( dnbd3, bgrWindowSize ); SAVE_TO_VAR_BOOL( dnbd3, lookupMissingForProxy ); SAVE_TO_VAR_BOOL( dnbd3, sparseFiles ); SAVE_TO_VAR_BOOL( dnbd3, ignoreAllocErrors ); @@ -134,6 +136,9 @@ void globals_loadConfig() logadd( LOG_WARNING, "Ignoring 'sparseFiles=true' since backgroundReplication is set to true and bgrMinClients is too low" ); _sparseFiles = false; } + if ( _bgrWindowSize < 1 ) { + _bgrWindowSize = 1; + } // Dump config as interpreted char buffer[2000]; globals_dumpConfig( buffer, sizeof(buffer) ); @@ -325,6 +330,7 @@ size_t globals_dumpConfig(char *buffer, size_t size) PBOOL(backgroundReplication); } PINT(bgrMinClients); + PINT(bgrWindowSize); PBOOL(lookupMissingForProxy); PBOOL(sparseFiles); PBOOL(ignoreAllocErrors); diff --git a/src/server/globals.h b/src/server/globals.h index 1bb6857..5cee92a 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -18,18 +18,27 @@ typedef struct _dnbd3_uplink dnbd3_uplink_t; typedef struct _dnbd3_image dnbd3_image_t; typedef struct _dnbd3_client dnbd3_client_t; -typedef struct +typedef struct _dnbd3_queue_client { - uint64_t handle; // Client defined handle to pass back in reply - uint64_t from; // First byte offset of requested block (ie. 4096) - uint64_t to; // Last byte + 1 of requested block (ie. 8192, if request len is 4096, resulting in bytes 4096-8191) + struct _dnbd3_queue_client *next; + uint64_t handle; // Handle used by client + uint64_t from, to; // Client range dnbd3_client_t * client; // Client to send reply to - int status; // status of this entry: ULR_* +} dnbd3_queue_client_t; + +typedef struct _dnbd3_queue_entry +{ + struct _dnbd3_queue_entry *next; + uint64_t handle; // Our handle for this entry + uint64_t from; // First byte offset of requested block (ie. 4096) + uint64_t to; // Last byte + 1 of requested block (ie. 8192, if request len is 4096, resulting in bytes 4096-8191) + dnbd3_queue_client_t *clients; #ifdef _DEBUG - ticks entered; // When this request entered the queue (for debugging) + ticks entered; // When this request entered the queue (for debugging) #endif - uint8_t hopCount; // How many hops this request has already taken across proxies -} dnbd3_queued_request_t; + uint8_t hopCount; // How many hops this request has already taken across proxies + bool sent; // Already sent to uplink? +} dnbd3_queue_entry_t; typedef struct _ns { @@ -91,12 +100,12 @@ struct _dnbd3_uplink bool cycleDetected; // connection cycle between proxies detected for current remote server int nextReplicationIndex; // Which index in the cache map we should start looking for incomplete blocks at // If BGR == BGR_HASHBLOCK, -1 means "currently no incomplete block" - uint64_t replicationHandle; // Handle of pending replication request atomic_uint_fast64_t bytesReceived; // Number of bytes received by the uplink since startup. atomic_uint_fast64_t bytesReceivedLastSave; // Number of bytes received when we last saved the cache map int queueLen; // length of queue uint32_t idleTime; // How many seconds the uplink was idle (apart from keep-alives) - dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE]; + dnbd3_queue_entry_t *queue; + atomic_uint_fast32_t queueId; dnbd3_alt_local_t altData[SERVER_MAX_ALTS]; }; @@ -156,6 +165,7 @@ struct _dnbd3_client atomic_uint_fast64_t bytesSent; // Byte counter for this client. dnbd3_image_t * _Atomic image; // Image in use by this client, or NULL during handshake int sock; + _Atomic uint8_t relayedCount; // How many requests are in-flight to the uplink server bool isServer; // true if a server in proxy mode, false if real client dnbd3_host_t host; char hostName[HOSTNAMELEN]; // inet_ntop version of host @@ -242,6 +252,11 @@ extern atomic_int _backgroundReplication; */ extern atomic_int _bgrMinClients; +/** + * How many in-flight replication requests we should target (per uplink) + */ +extern atomic_int _bgrWindowSize; + /** * (In proxy mode): If connecting client is a proxy, and the requested image * is not known locally, should we ask our known alt servers for it? diff --git a/src/server/image.c b/src/server/image.c index 86b6374..81ec479 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -340,7 +340,6 @@ dnbd3_image_t* image_byId(int imgId) dnbd3_image_t* image_get(char *name, uint16_t revision, bool ensureFdOpen) { int i; - const char *removingText = _removeMissingImages ? ", removing from list" : ""; dnbd3_image_t *candidate = NULL; // Simple sanity check const size_t slen = strlen( name ); @@ -1895,7 +1894,7 @@ static void* saveLoadAllCacheMaps(void* nix UNUSED) // We're not replicating this image, if there's a cache map, reload // it periodically, since we might read from a shared storage that // another server instance is writing to. - if ( full || !cache->unchanged && !image->problem.read ) { + if ( full || ( !cache->unchanged && !image->problem.read ) ) { logadd( LOG_DEBUG2, "Reloading cache map of %s:%d", PIMG(image) ); dnbd3_cache_map_t *onDisk = image_loadCacheMap(image->path, image->virtualFilesize); if ( onDisk == NULL ) { diff --git a/src/server/image.h b/src/server/image.h index 4614c74..b23711b 100644 --- a/src/server/image.h +++ b/src/server/image.h @@ -51,6 +51,50 @@ bool image_ensureDiskSpaceLocked(uint64_t size, bool force); bool image_saveCacheMap(dnbd3_image_t *image); +/** + * Check if given range is cached. Be careful when using this function because: + * 1) you need to hold a reference to the cache map + * 2) start and end are assumed to be 4k aligned + * 3) start and end are not checked to be in bounds (we don't know the image in this context) + */ +static inline bool image_isRangeCachedUnsafe(dnbd3_cache_map_t *cache, uint64_t start, uint64_t end) +{ + const uint64_t firstByteInMap = start >> 15; + const uint64_t lastByteInMap = (end - 1) >> 15; + const uint8_t fb = (uint8_t)(0xff << ((start >> 12) & 7)); + const uint8_t lb = (uint8_t)(~(0xff << ((((end - 1) >> 12) & 7) + 1))); + uint64_t pos; + uint8_t b; + bool isCached; + if ( firstByteInMap == lastByteInMap ) { // Single byte to check, much simpler + b = cache->map[firstByteInMap]; + isCached = ( b & ( fb & lb ) ) == ( fb & lb ); + } else { + isCached = true; + atomic_thread_fence( memory_order_acquire ); + // First byte + if ( isCached ) { + b = atomic_load_explicit( &cache->map[firstByteInMap], memory_order_relaxed ); + isCached = ( ( b & fb ) == fb ); + } + // Last byte + if ( isCached ) { + b = atomic_load_explicit( &cache->map[lastByteInMap], memory_order_relaxed ); + isCached = ( ( b & lb ) == lb ); + } + // Middle, must be all bits set (0xff) + if ( isCached ) { + for ( pos = firstByteInMap + 1; pos < lastByteInMap; ++pos ) { + if ( atomic_load_explicit( &cache->map[pos], memory_order_relaxed ) != 0xff ) { + isCached = false; + break; + } + } + } + } + return isCached; +} + // one byte in the map covers 8 4kib blocks, so 32kib per byte // "+ (1 << 15) - 1" is required to account for the last bit of // the image that is smaller than 32kib diff --git a/src/server/net.c b/src/server/net.c index 954cb8a..9ba9dbc 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -197,6 +197,7 @@ void* net_handleNewConnection(void *clientPtr) client->hostName[HOSTNAMELEN-1] = '\0'; mutex_unlock( &client->lock ); client->bytesSent = 0; + client->relayedCount = 0; if ( !addToList( client ) ) { freeClientStruct( client ); @@ -344,41 +345,18 @@ void* net_handleNewConnection(void *clientPtr) // This is a proxyed image, check if we need to relay the request... const uint64_t start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); const uint64_t end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); - const uint64_t firstByteInMap = start >> 15; - const uint64_t lastByteInMap = (end - 1) >> 15; - const uint8_t fb = (uint8_t)(0xff << ((start >> 12) & 7)); - const uint8_t lb = (uint8_t)(~(0xff << ((((end - 1) >> 12) & 7) + 1))); - uint64_t pos; - uint8_t b; - bool isCached; - if ( firstByteInMap == lastByteInMap ) { // Single byte to check, much simpler - b = cache->map[firstByteInMap]; - isCached = ( b & ( fb & lb ) ) == ( fb & lb ); - } else { - isCached = true; - atomic_thread_fence( memory_order_acquire ); - // First byte - if ( isCached ) { - b = atomic_load_explicit( &cache->map[firstByteInMap], memory_order_relaxed ); - isCached = ( ( b & fb ) == fb ); - } - // Last byte - if ( isCached ) { - b = atomic_load_explicit( &cache->map[lastByteInMap], memory_order_relaxed ); - isCached = ( ( b & lb ) == lb ); - } - // Middle, must be all bits set (0xff) - if ( isCached ) { - for ( pos = firstByteInMap + 1; pos < lastByteInMap; ++pos ) { - if ( atomic_load_explicit( &cache->map[pos], memory_order_relaxed ) != 0xff ) { - isCached = false; - break; - } + if ( !image_isRangeCachedUnsafe( cache, start, end ) ) { + if ( unlikely( client->relayedCount > 250 ) ) { + logadd( LOG_DEBUG1, "Client is overloading uplink; throttling" ); + for ( int i = 0; i < 100 && client->relayedCount > 200; ++i ) { + usleep( 10000 ); + } + if ( client->relayedCount > 250 ) { + logadd( LOG_WARNING, "Could not lower client's uplink backlog; dropping client" ); + goto exit_client_cleanup; } } - } - if ( !isCached ) { - if ( !uplink_request( client, request.handle, offset, request.size, request.hops ) ) { + if ( !uplink_request( NULL, client, request.handle, offset, request.size, request.hops ) ) { logadd( LOG_DEBUG1, "Could not relay uncached request from %s to upstream proxy for image %s:%d", client->hostName, image->name, image->rid ); goto exit_client_cleanup; diff --git a/src/server/reference.h b/src/server/reference.h index 4eda546..75a681f 100644 --- a/src/server/reference.h +++ b/src/server/reference.h @@ -39,6 +39,11 @@ static inline ref *ref_get( weakref *weakref ) return ref; } +static inline void ref_inc( ref *ref ) +{ + ++ref->count; +} + static inline void ref_put( ref *ref ) { if ( --ref->count == 0 ) { diff --git a/src/server/uplink.c b/src/server/uplink.c index 7c7cd1c..188bf06 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -8,6 +8,7 @@ #include "../shared/protocol.h" #include "../shared/timing.h" #include "../shared/crc32.h" +#include "threadpool.h" #include "reference.h" #include @@ -21,30 +22,6 @@ #define MAP_BYTES_PER_HASH_BLOCK (int)( HASH_BLOCK_SIZE / FILE_BYTES_PER_MAP_BYTE ) #define MAP_INDEX_HASH_START_MASK ( ~(int)( MAP_BYTES_PER_HASH_BLOCK - 1 ) ) -#define REP_NONE ( (uint64_t)0xffffffffffffffff ) - -// Status of request in queue - -// Slot is free, can be used. -// Must only be set in uplink_handle_receive() or uplink_remove_client() -#define ULR_FREE 0 -// Slot has been filled with a request that hasn't been sent to the upstream server yet, matching request can safely rely on reuse. -// Must only be set in uplink_request() -#define ULR_NEW 1 -// Slot is occupied, reply has not yet been received, matching request can safely rely on reuse. -// Must only be set in uplink_mainloop() or uplink_request() -#define ULR_PENDING 2 -// Slot is being processed, do not consider for hop on. -// Must only be set in uplink_handle_receive() -#define ULR_PROCESSING 3 - -static const char *const NAMES_ULR[4] = { - [ULR_FREE] = "ULR_FREE", - [ULR_NEW] = "ULR_NEW", - [ULR_PENDING] = "ULR_PENDING", - [ULR_PROCESSING] = "ULR_PROCESSING", -}; - static atomic_uint_fast64_t totalBytesReceived = 0; static void cancelAllRequests(dnbd3_uplink_t *uplink); @@ -59,6 +36,15 @@ static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink); static bool uplink_reopenCacheFd(dnbd3_uplink_t *uplink, const bool force); static bool uplink_connectionShouldShutdown(dnbd3_uplink_t *uplink); static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew); +static int numWantedReplicationRequests(dnbd3_uplink_t *uplink); +static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle); +static void *prefetchForClient(void *data); + +typedef struct { + dnbd3_uplink_t *uplink; + uint64_t start; + uint32_t length; +} prefetch_request_t; // ############ uplink connection handling @@ -106,6 +92,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version uplink->bytesReceived = 0; uplink->bytesReceivedLastSave = 0; uplink->idleTime = SERVER_UPLINK_IDLE_TIMEOUT - 90; + uplink->queue = NULL; uplink->queueLen = 0; uplink->cacheFd = -1; uplink->signal = signal_new(); @@ -113,7 +100,6 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version logadd( LOG_WARNING, "Error creating signal. Uplink unavailable." ); goto failure; } - uplink->replicationHandle = REP_NONE; mutex_lock( &uplink->rttLock ); mutex_lock( &uplink->sendMutex ); uplink->current.fd = -1; @@ -175,9 +161,9 @@ bool uplink_shutdown(dnbd3_image_t *image) } cancelAllRequests( uplink ); ref_setref( &image->uplinkref, NULL ); - ref_put( &uplink->reference ); mutex_unlock( &uplink->queueLock ); bool retval = ( exp && image->users == 0 ); + ref_put( &uplink->reference ); mutex_unlock( &image->lock ); return retval; } @@ -188,12 +174,21 @@ bool uplink_shutdown(dnbd3_image_t *image) */ static void cancelAllRequests(dnbd3_uplink_t *uplink) { - for ( int i = 0; i < uplink->queueLen; ++i ) { - if ( uplink->queue[i].status != ULR_FREE ) { - net_sendReply( uplink->queue[i].client, CMD_ERROR, uplink->queue[i].handle ); - uplink->queue[i].status = ULR_FREE; + dnbd3_queue_entry_t *it = uplink->queue; + while ( it != NULL ) { + dnbd3_queue_client_t *cit = it->clients; + while ( cit != NULL ) { + net_sendReply( cit->client, CMD_ERROR, cit->handle ); + cit->client->relayedCount--; + dnbd3_queue_client_t *next = cit->next; + free( cit ); + cit = next; } + dnbd3_queue_entry_t *next = it->next; + free( it ); + it = next; } + uplink->queue = NULL; uplink->queueLen = 0; uplink->image->problem.queue = false; } @@ -234,39 +229,54 @@ static void uplink_free(ref *ref) */ void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client) { + if ( client->relayedCount == 0 ) + return; mutex_lock( &uplink->queueLock ); - for (int i = uplink->queueLen - 1; i >= 0; --i) { - if ( uplink->queue[i].client == client ) { - // Make sure client doesn't get destroyed while we're sending it data - mutex_lock( &client->sendMutex ); - mutex_unlock( &client->sendMutex ); - uplink->queue[i].client = NULL; - uplink->queue[i].status = ULR_FREE; + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + for ( dnbd3_queue_client_t **cit = &it->clients; *cit != NULL; cit = &(**cit).next ) { + if ( (**cit).client == client ) { + --client->relayedCount; + dnbd3_queue_client_t *entry = *cit; + *cit = (**cit).next; + free( entry ); + } } - if ( uplink->queue[i].client == NULL && uplink->queueLen == i + 1 ) uplink->queueLen--; } mutex_unlock( &uplink->queueLock ); + if ( unlikely( client->relayedCount != 0 ) ) { + logadd( LOG_DEBUG1, "Client has relayedCount == %"PRIu8" on disconnect..", client->relayedCount ); + int i; + for ( i = 0; i < 1000 && client->relayedCount != 0; ++i ) { + usleep( 10000 ); + } + if ( client->relayedCount != 0 ) { + logadd( LOG_WARNING, "Client relayedCount still %"PRIu8" after sleeping!", client->relayedCount ); + } + } } /** - * Request a chunk of data through an uplink server - * Locks on: image.lock, uplink.queueLock + * Request a chunk of data through an uplink server. Either uplink or client has to be non-NULL. + * If client is NULL, this is assumed to be a background replication request. + * Locks on: uplink.queueLock, uplink.sendMutex */ -bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) +bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) { - if ( client == NULL || client->image == NULL ) - return false; + bool getUplink = ( uplink == NULL ); + assert( client != NULL || uplink != NULL ); if ( length > (uint32_t)_maxPayload ) { logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length ); return false; } - dnbd3_uplink_t * uplink = ref_get_uplink( &client->image->uplinkref ); - if ( unlikely( uplink == NULL ) ) { - uplink_init( client->image, -1, NULL, -1 ); + if ( getUplink ) { uplink = ref_get_uplink( &client->image->uplinkref ); - if ( uplink == NULL ) { - logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); - return false; + if ( unlikely( uplink == NULL ) ) { + uplink_init( client->image, -1, NULL, -1 ); + uplink = ref_get_uplink( &client->image->uplinkref ); + if ( uplink == NULL ) { + logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); + return false; + } } } if ( uplink->shutdown ) { @@ -275,163 +285,179 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin } // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain // This might be a false positive if there are multiple instances running on the same host (IP) - if ( hops != 0 && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) { + if ( client != NULL && hops != 0 + && isSameAddress( altservers_indexToHost( uplink->current.index ), &client->host ) ) { uplink->cycleDetected = true; signal_call( uplink->signal ); logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); goto fail_ref; } - int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise - int existingType = -1; // ULR_* type of existing request - int i; - int freeSlot = -1; - int firstUsedSlot = -1; - bool requestLoop = false; - const uint64_t end = start + length; - - mutex_lock( &uplink->queueLock ); - if ( uplink->shutdown ) { // Check again after locking to prevent lost requests - goto fail_lock; - } - for (i = 0; i < uplink->queueLen; ++i) { - // find free slot to place this request into - if ( uplink->queue[i].status == ULR_FREE ) { - if ( freeSlot == -1 || existingType != ULR_PROCESSING ) { - freeSlot = i; - } - continue; - } - if ( firstUsedSlot == -1 ) { - firstUsedSlot = i; - } - // find existing request to attach to - if ( uplink->queue[i].from > start || uplink->queue[i].to < end ) - continue; // Range not suitable - // Detect potential proxy cycle. New request hopcount is greater, range is same, old request has already been sent -> suspicious - if ( hops > uplink->queue[i].hopCount && uplink->queue[i].from == start && uplink->queue[i].to == end && uplink->queue[i].status == ULR_PENDING ) { - requestLoop = true; - break; - } - if ( foundExisting == -1 || existingType == ULR_PROCESSING ) { - foundExisting = i; - existingType = uplink->queue[i].status; - } - } - if ( unlikely( requestLoop ) ) { - uplink->cycleDetected = true; - signal_call( uplink->signal ); - logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. Incoming hop-count is %" PRIu8 ".", hops ); - goto fail_lock; - } - if ( freeSlot < firstUsedSlot && firstUsedSlot < 10 && existingType != ULR_PROCESSING ) { - freeSlot = -1; // Not attaching to existing request, make it use a higher slot - } - if ( freeSlot == -1 ) { - if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) { - logadd( LOG_WARNING, "Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." ); + struct { + uint64_t handle, start, end; + } req; + do { + const uint64_t end = start + length; + dnbd3_queue_entry_t *request = NULL, *last = NULL; + bool isNew; + mutex_lock( &uplink->queueLock ); + if ( uplink->shutdown ) { // Check again after locking to prevent lost requests goto fail_lock; } - freeSlot = uplink->queueLen++; - if ( freeSlot > SERVER_UPLINK_QUEUELEN_THRES ) { - uplink->image->problem.queue = true; + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + if ( it->from <= start && it->to >= end ) { + // Matching range, attach + request = it; + break; + } + if ( it->next == NULL ) { + // Not matching, last in list, remember + last = it; + break; + } } - } - // Do not send request to uplink server if we have a matching pending request AND the request either has the - // status ULR_NEW/PENDING OR we found a free slot with LOWER index than the one we attach to. Otherwise - // explicitly send this request to the uplink server. The second condition mentioned here is to prevent - // a race condition where the reply for the outstanding request already arrived and the uplink thread - // is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might - // already have passed the index of the free slot we determined, but not reached the existing request we just found above. - if ( foundExisting != -1 && existingType == ULR_PROCESSING && freeSlot > foundExisting ) { - foundExisting = -1; // -1 means "send request" - } -#ifdef _DEBUG - if ( foundExisting != -1 ) { - logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, NAMES_ULR[existingType], foundExisting, freeSlot ); - logadd( LOG_DEBUG2, "Original %" PRIu64 "-%" PRIu64 " (%p)\n" - "New %" PRIu64 "-%" PRIu64 " (%p)\n", - uplink->queue[foundExisting].from, uplink->queue[foundExisting].to, (void*)uplink->queue[foundExisting].client, - start, end, (void*)client ); - } -#endif - // Fill structure - uplink->queue[freeSlot].from = start; - uplink->queue[freeSlot].to = end; - uplink->queue[freeSlot].handle = handle; - uplink->queue[freeSlot].client = client; - //int old = uplink->queue[freeSlot].status; - uplink->queue[freeSlot].status = ( foundExisting == -1 ? ULR_NEW : - ( existingType == ULR_NEW ? ULR_PENDING : existingType ) ); - uplink->queue[freeSlot].hopCount = hops; + dnbd3_queue_client_t **c; + if ( request == NULL ) { + // No existing request to attach to + if ( uplink->queueLen >= UPLINK_MAX_QUEUE ) { + logadd( LOG_WARNING, "Uplink queue is full, consider increasing UPLINK_MAX_QUEUE. Dropping client..." ); + goto fail_lock; + } + uplink->queueLen++; + if ( uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { + uplink->image->problem.queue = true; + } + request = malloc( sizeof(*request) ); + if ( last == NULL ) { + uplink->queue = request; + } else { + last->next = request; + } + request->next = NULL; + request->handle = ++uplink->queueId; + request->from = start & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + request->to = (end + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); #ifdef _DEBUG - timing_get( &uplink->queue[freeSlot].entered ); - //logadd( LOG_DEBUG2 %p] Inserting request at slot %d, was %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 "\n", (void*)uplink, freeSlot, old, uplink->queue[freeSlot].status, uplink->queue[freeSlot, ".handle, start, end ); + timing_get( &request->entered ); #endif - mutex_unlock( &uplink->queueLock ); + request->hopCount = hops; + request->sent = true; // Optimistic; would be set to false on failure + if ( client == NULL ) { + // BGR + request->clients = NULL; + } else { + c = &request->clients; + } + isNew = true; + } else if ( client == NULL ) { + // Replication request that maches existing request. Do nothing + isNew = false; + } else { + // Existing request. Check if potential cycle + if ( hops > request->hopCount + 1 && request->from == start && request->to == end ) { + logadd( LOG_DEBUG1, "Request cycle detected on uplink for %s:%d", PIMG(uplink->image) ); + goto fail_lock; + } + // Count number if clients, get tail of list + int count = 0; + c = &request->clients; + while ( *c != NULL ) { + c = &(**c).next; + if ( ++count >= UPLINK_MAX_CLIENTS_PER_REQUEST ) { + logadd( LOG_DEBUG2, "Won't accept more than %d clients per request, dropping client", count ); + goto fail_lock; + } + } + isNew = false; + } + req.handle = request->handle; + req.start = request->from; + req.end = request->to; + if ( client != NULL ) { + *c = malloc( sizeof( *request->clients ) ); + (**c).next = NULL; + (**c).handle = handle; + (**c).from = start; + (**c).to = end; + (**c).client = client; + client->relayedCount++; + } + mutex_unlock( &uplink->queueLock ); - if ( foundExisting != -1 ) { - ref_put( &uplink->reference ); - return true; // Attached to pending request, do nothing - } + if ( !isNew ) { + goto success_ref; // Attached to pending request, do nothing + } + } while (0); - // See if we can fire away the request - if ( unlikely( mutex_trylock( &uplink->sendMutex ) != 0 ) ) { - logadd( LOG_DEBUG2, "Could not trylock send mutex, queueing uplink request" ); + // Fire away the request + mutex_lock( &uplink->sendMutex ); + if ( unlikely( uplink->current.fd == -1 ) ) { + uplink->image->problem.uplink = true; + markRequestUnsent( uplink, req.handle ); + mutex_unlock( &uplink->sendMutex ); + logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" ); } else { - if ( unlikely( uplink->current.fd == -1 ) ) { + if ( hops < 200 ) ++hops; + const bool ret = dnbd3_get_block( uplink->current.fd, req.start, req.end - req.start, + req.handle, COND_HOPCOUNT( uplink->current.version, hops ) ); + if ( unlikely( !ret ) ) { + markRequestUnsent( uplink, req.handle ); uplink->image->problem.uplink = true; mutex_unlock( &uplink->sendMutex ); - logadd( LOG_DEBUG2, "Cannot do direct uplink request: No socket open" ); + logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing (%"PRIu64")", req.handle ); } else { - const uint64_t reqStart = uplink->queue[freeSlot].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); - const uint32_t reqSize = (uint32_t)(((uplink->queue[freeSlot].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); - if ( hops < 200 ) ++hops; - const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) ); - if ( unlikely( !ret ) ) { - uplink->image->problem.uplink = true; - mutex_unlock( &uplink->sendMutex ); - logadd( LOG_DEBUG2, "Could not send out direct uplink request, queueing" ); - } else { - // Direct send succeeded, update queue entry from NEW to PENDING, so the request won't be sent again - int state; - mutex_unlock( &uplink->sendMutex ); - mutex_lock( &uplink->queueLock ); - if ( !uplink->shutdown && uplink->queue[freeSlot].handle == handle && uplink->queue[freeSlot].client == client ) { - state = uplink->queue[freeSlot].status; - if ( uplink->queue[freeSlot].status == ULR_NEW ) { - uplink->queue[freeSlot].status = ULR_PENDING; - } - } else { - state = -1; - } - mutex_unlock( &uplink->queueLock ); - if ( state == -1 ) { - logadd( LOG_DEBUG2, "Direct uplink request queue entry gone after sending and re-locking queue. *shrug*" ); - } else if ( state == ULR_NEW ) { - //logadd( LOG_DEBUG2, "Direct uplink request" ); - } else { - logadd( LOG_DEBUG2, "Direct uplink request queue entry changed to %s afte sending (expected ULR_NEW).", NAMES_ULR[uplink->queue[freeSlot].status] ); - } - ref_put( &uplink->reference ); - return true; - } - // Fall through to waking up sender thread + // OK + mutex_unlock( &uplink->sendMutex ); + goto success_ref; } + // Fall through to waking up sender thread } if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); } - ref_put( &uplink->reference ); + +success_ref: + if ( client != NULL ) { + // Was from client -- potential prefetch + uint32_t len = MIN( uplink->image->virtualFilesize - req.end, req.end - req.start ); + if ( len > 0 ) { + prefetch_request_t *job = malloc( sizeof( *job ) ); + job->start = req.end; + job->length = len; + job->uplink = uplink; + ref_inc( &uplink->reference ); // Hold one for the thread, thread will return it + threadpool_run( &prefetchForClient, (void*)job ); + } + } + if ( getUplink ) { + ref_put( &uplink->reference ); + } return true; fail_lock: mutex_unlock( &uplink->queueLock ); fail_ref: - ref_put( &uplink->reference ); + if ( getUplink ) { + ref_put( &uplink->reference ); + } return false; } +static void *prefetchForClient(void *data) +{ + prefetch_request_t *job = (prefetch_request_t*)data; + dnbd3_cache_map_t *cache = ref_get_cachemap( job->uplink->image ); + if ( cache != NULL ) { + if ( !image_isRangeCachedUnsafe( cache, job->start, job->start + job->length ) ) { + uplink_request( job->uplink, NULL, ++job->uplink->queueId, job->start, job->length, 0 ); + } + ref_put( &cache->reference ); + } + ref_put( &job->uplink->reference ); + free( job ); + return NULL; +} + /** * Uplink thread. * Locks are irrelevant as this is never called from another function @@ -443,7 +469,7 @@ static void* uplink_mainloop(void *data) #define EV_COUNT (2) struct pollfd events[EV_COUNT]; dnbd3_uplink_t * const uplink = (dnbd3_uplink_t*)data; - int numSocks, i, waitTime; + int numSocks, waitTime; int altCheckInterval = SERVER_RTT_INTERVAL_INIT; int rttTestResult; uint32_t discoverFailCount = 0; @@ -478,7 +504,7 @@ static void* uplink_mainloop(void *data) declare_now; waitTime = (int)timing_diffMs( &now, &nextAltCheck ); if ( waitTime < 100 ) waitTime = 100; - if ( waitTime > 10000 ) waitTime = 10000; + else if ( waitTime > 10000 ) waitTime = 10000; } events[EV_SOCKET].fd = uplink->current.fd; numSocks = poll( events, EV_COUNT, waitTime ); @@ -505,7 +531,6 @@ static void* uplink_mainloop(void *data) mutex_unlock( &uplink->rttLock ); discoverFailCount = 0; if ( fd != -1 ) close( fd ); - uplink->replicationHandle = REP_NONE; uplink->image->problem.uplink = false; uplink->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received buffer[0] = '@'; @@ -559,11 +584,11 @@ static void* uplink_mainloop(void *data) } declare_now; uint32_t timepassed = timing_diff( &lastKeepalive, &now ); - if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) { + if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL || ( timepassed >= 2 && uplink->idleTime < _bgrWindowSize ) ) { lastKeepalive = now; uplink->idleTime += timepassed; // Keep-alive - if ( uplink->current.fd != -1 && uplink->replicationHandle == REP_NONE ) { + if ( uplink->current.fd != -1 && uplink->queueLen < _bgrWindowSize ) { // Send keep-alive if nothing is happening, and try to trigger background rep. if ( !uplink_sendKeepalive( uplink ) || !uplink_sendReplicationRequest( uplink ) ) { uplink_connectionFailed( uplink, true ); @@ -612,19 +637,16 @@ static void* uplink_mainloop(void *data) ticks deadline; timing_set( &deadline, &now, -10 ); mutex_lock( &uplink->queueLock ); - for (i = 0; i < uplink->queueLen; ++i) { - if ( uplink->queue[i].status != ULR_FREE && timing_reached( &uplink->queue[i].entered, &deadline ) ) { - snprintf( buffer, sizeof(buffer), "[DEBUG %p] Starving request slot %d detected:\n" - "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)uplink, i, uplink->queue[i].client->image->name, - uplink->queue[i].from, uplink->queue[i].to, uplink->queue[i].status ); - uplink->queue[i].entered = now; + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + if ( timing_reached( &it->entered, &deadline ) ) { + logadd( LOG_WARNING, "Starving request detected:" + " (from %" PRIu64 " to %" PRIu64 ", sent: %d) %s:%d", + it->from, it->to, (int)it->sent, PIMG(uplink->image) ); + it->entered = now; #ifdef _DEBUG_RESEND_STARVING - uplink->queue[i].status = ULR_NEW; + it->sent = false; resend = true; #endif - mutex_unlock( &uplink->queueLock ); - logadd( LOG_WARNING, "%s", buffer ); - mutex_lock( &uplink->queueLock ); } } mutex_unlock( &uplink->queueLock ); @@ -667,37 +689,54 @@ cleanup: ; */ static void uplink_sendRequests(dnbd3_uplink_t *uplink, bool newOnly) { - // Scan for new requests - int j; + // Scan for new requests, or optionally, (re)send all + // Build a buffer, so if there aren't too many requests, we can send them after + // unlocking the queue again. Otherwise we need flushes during iteration, which + // is no ideal, but in that case the uplink is probably overwhelmed anyways. + // Try 125 as that's exactly 300bytes, usually 2*MTU. +#define MAX_RESEND_BATCH 125 + dnbd3_request_t reqs[MAX_RESEND_BATCH]; + int count = 0; mutex_lock( &uplink->queueLock ); - for (j = 0; j < uplink->queueLen; ++j) { - if ( uplink->queue[j].status != ULR_NEW && (newOnly || uplink->queue[j].status != ULR_PENDING) ) continue; - uplink->queue[j].status = ULR_PENDING; - uint8_t hops = uplink->queue[j].hopCount; - const uint64_t reqStart = uplink->queue[j].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); - const uint32_t reqSize = (uint32_t)(((uplink->queue[j].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); - /* - logadd( LOG_DEBUG2, "[%p] Sending slot %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 " (%" PRIu64 "-%" PRIu64 ")", - (void*)uplink, j, uplink->queue[j].status, uplink->queue[j].handle, uplink->queue[j].from, uplink->queue[j].to, reqStart, reqStart+reqSize ); - */ - mutex_unlock( &uplink->queueLock ); - if ( hops < 200 ) ++hops; - mutex_lock( &uplink->sendMutex ); - const bool ret = dnbd3_get_block( uplink->current.fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( uplink->current.version, hops ) ); - if ( likely( ret ) ) { - mutex_unlock( &uplink->sendMutex ); - } else { - // Non-critical - if the connection dropped or the server was changed - // the thread will re-send this request as soon as the connection - // is reestablished. - uplink->image->problem.uplink = true; + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + if ( newOnly && it->sent ) + continue; + it->sent = true; + dnbd3_request_t *hdr = &reqs[count++]; + hdr->magic = dnbd3_packet_magic; + hdr->cmd = CMD_GET_BLOCK; + hdr->size = it->to - it->from; + hdr->offset_small = it->from; + hdr->hops = it->hopCount + 1; + hdr->handle = it->handle; + fixup_request( *hdr ); + if ( count == MAX_RESEND_BATCH ) { + bool ok = false; + logadd( LOG_DEBUG2, "BLOCKING resend of %d", count ); + count = 0; + mutex_lock( &uplink->sendMutex ); + if ( uplink->current.fd != -1 ) { + ok = ( sock_sendAll( uplink->current.fd, reqs, DNBD3_REQUEST_SIZE * MAX_RESEND_BATCH, 3 ) + == DNBD3_REQUEST_SIZE * MAX_RESEND_BATCH ); + } mutex_unlock( &uplink->sendMutex ); - logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" ); - return; + if ( !ok ) { + uplink->image->problem.uplink = true; + break; + } } - mutex_lock( &uplink->queueLock ); } mutex_unlock( &uplink->queueLock ); + if ( count != 0 ) { + mutex_lock( &uplink->sendMutex ); + if ( uplink->current.fd != -1 ) { + uplink->image->problem.uplink = + ( sock_sendAll( uplink->current.fd, reqs, DNBD3_REQUEST_SIZE * count, 3 ) + != DNBD3_REQUEST_SIZE * count ); + } + mutex_unlock( &uplink->sendMutex ); + } +#undef MAX_RESEND_BATCH } /** @@ -720,71 +759,73 @@ static bool uplink_sendReplicationRequest(dnbd3_uplink_t *uplink) return false; // Should never be called in this state, consider send error if ( _backgroundReplication == BGR_DISABLED || uplink->cacheFd == -1 ) return true; // Don't do background replication - if ( uplink->nextReplicationIndex == -1 || uplink->replicationHandle != REP_NONE ) - return true; // Already a replication request on the wire, or no more blocks to replicate + if ( uplink->nextReplicationIndex == -1 ) + return true; // No more blocks to replicate dnbd3_image_t * const image = uplink->image; if ( image->users < _bgrMinClients ) return true; // Not enough active users + const int numNewRequests = numWantedReplicationRequests( uplink ); + if ( numNewRequests <= 0 ) + return true; // Already sufficient amount of requests on the wire dnbd3_cache_map_t *cache = ref_get_cachemap( image ); - if ( cache == NULL || image->users ) { + if ( cache == NULL ) { // No cache map (=image complete) - ref_put( &cache->reference ); return true; } const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); const int lastBlockIndex = mapBytes - 1; - int endByte; - if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks - endByte = uplink->nextReplicationIndex + mapBytes; - } else { // Hashblock based: Only look for match in current hash block - endByte = ( uplink->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK; - if ( endByte > mapBytes ) { - endByte = mapBytes; + for ( int bc = 0; bc < numNewRequests; ++bc ) { + int endByte; + if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks + endByte = uplink->nextReplicationIndex + mapBytes; + } else { // Hashblock based: Only look for match in current hash block + endByte = ( uplink->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & MAP_INDEX_HASH_START_MASK; + if ( endByte > mapBytes ) { + endByte = mapBytes; + } } - } - atomic_thread_fence( memory_order_acquire ); - int replicationIndex = -1; - for ( int j = uplink->nextReplicationIndex; j < endByte; ++j ) { - const int i = j % ( mapBytes ); // Wrap around for BGR_FULL - if ( atomic_load_explicit( &cache->map[i], memory_order_relaxed ) != 0xff - && ( i != lastBlockIndex || !uplink->replicatedLastBlock ) ) { - // Found incomplete one - replicationIndex = i; + atomic_thread_fence( memory_order_acquire ); + int replicationIndex = -1; + for ( int j = uplink->nextReplicationIndex; j < endByte; ++j ) { + const int i = j % ( mapBytes ); // Wrap around for BGR_FULL + if ( atomic_load_explicit( &cache->map[i], memory_order_relaxed ) != 0xff + && ( i != lastBlockIndex || !uplink->replicatedLastBlock ) ) { + // Found incomplete one + replicationIndex = i; + break; + } + } + if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) { + // Nothing left in current block, find next one + replicationIndex = uplink_findNextIncompleteHashBlock( uplink, endByte ); + } + if ( replicationIndex == -1 ) { + // Replication might be complete, uplink_mainloop should take care.... + uplink->nextReplicationIndex = -1; break; } + const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE; + const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE ); + const uint64_t handle = ++uplink->queueId; + if ( !uplink_request( uplink, NULL, handle, offset, size, 0 ) ) { + logadd( LOG_DEBUG1, "Error sending background replication request to uplink server (%s:%d)", + PIMG(uplink->image) ); + ref_put( &cache->reference ); + return false; + } + if ( replicationIndex == lastBlockIndex ) { + uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks + } + uplink->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter + if ( _backgroundReplication == BGR_HASHBLOCK + && uplink->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) { + // Just crossed a hash block boundary, look for new candidate starting at this very index + uplink->nextReplicationIndex = uplink_findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex ); + if ( uplink->nextReplicationIndex == -1 ) + break; + } } ref_put( &cache->reference ); - if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) { - // Nothing left in current block, find next one - replicationIndex = uplink_findNextIncompleteHashBlock( uplink, endByte ); - } - if ( replicationIndex == -1 ) { - // Replication might be complete, uplink_mainloop should take care.... - uplink->nextReplicationIndex = -1; - return true; - } - const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE; - uplink->replicationHandle = offset; - const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE ); - mutex_lock( &uplink->sendMutex ); - bool sendOk = dnbd3_get_block( uplink->current.fd, offset, size, uplink->replicationHandle, COND_HOPCOUNT( uplink->current.version, 1 ) ); - if ( likely( sendOk ) ) { - mutex_unlock( &uplink->sendMutex ); - } else { - uplink->image->problem.uplink = true; - mutex_unlock( &uplink->sendMutex ); - logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" ); - return false; - } - if ( replicationIndex == lastBlockIndex ) { - uplink->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks - } - uplink->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter - if ( _backgroundReplication == BGR_HASHBLOCK - && uplink->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) { - // Just crossed a hash block boundary, look for new candidate starting at this very index - uplink->nextReplicationIndex = uplink_findNextIncompleteHashBlock( uplink, uplink->nextReplicationIndex ); - } return true; } @@ -845,7 +886,7 @@ static int uplink_findNextIncompleteHashBlock(dnbd3_uplink_t *uplink, const int static void uplink_handleReceive(dnbd3_uplink_t *uplink) { dnbd3_reply_t inReply, outReply; - int ret, i; + int ret; for (;;) { ret = dnbd3_read_reply( uplink->current.fd, &inReply, false ); if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !uplink->shutdown ) ) continue; @@ -881,13 +922,34 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) } // Payload read completely // Bail out if we're not interested - if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) ) continue; + if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) ) + continue; // Is a legit block reply - struct iovec iov[2]; - const uint64_t start = inReply.handle; - const uint64_t end = inReply.handle + inReply.size; totalBytesReceived += inReply.size; uplink->bytesReceived += inReply.size; + // Get entry from queue + dnbd3_queue_entry_t *entry; + mutex_lock( &uplink->queueLock ); + for ( entry = uplink->queue; entry != NULL; entry = entry->next ) { + if ( entry->handle == inReply.handle ) + break; + } + if ( entry == NULL ) { + mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock! + logadd( LOG_DEBUG1, "Received block reply on uplink, but handle %"PRIu64" is unknown (%s:%d)", + inReply.handle, PIMG(uplink->image) ); + continue; + } + const uint64_t start = entry->from; + const uint64_t end = entry->to; + mutex_unlock( &uplink->queueLock ); // Do not dereference pointer after unlock! + // We don't remove the entry from the list here yet, to slightly increase the chance of other + // clients attaching to this request while we write the data to disk + if ( end - start != inReply.size ) { + logadd( LOG_WARNING, "Received payload length does not match! (is: %"PRIu32", expect: %u, %s:%d)", + inReply.size, (unsigned int)( end - start ), PIMG(uplink->image) ); + } + struct iovec iov[2]; // 1) Write to cache file if ( unlikely( uplink->cacheFd == -1 ) ) { uplink_reopenCacheFd( uplink, false ); @@ -934,98 +996,76 @@ static void uplink_handleReceive(dnbd3_uplink_t *uplink) PIMG(uplink->image), err ); } } - // 2) Figure out which clients are interested in it - // Mark as ULR_PROCESSING, since we unlock repeatedly in the second loop - // below; this prevents uplink_request() from attaching to this request - // by populating a slot with index greater than the highest matching - // request with ULR_PROCESSING (assuming there is no ULR_PENDING or ULR_NEW - // where it's fine if the index is greater) + bool found = false; + dnbd3_queue_entry_t **it; mutex_lock( &uplink->queueLock ); - for (i = 0; i < uplink->queueLen; ++i) { - dnbd3_queued_request_t * const req = &uplink->queue[i]; - assert( req->status != ULR_PROCESSING ); - if ( req->status != ULR_PENDING && req->status != ULR_NEW ) continue; - assert( req->client != NULL ); - if ( req->from >= start && req->to <= end ) { // Match :-) - req->status = ULR_PROCESSING; - } - } - // 3) Send to interested clients - iterate backwards so request collaboration works, and - // so we can decrease queueLen on the fly while iterating. Should you ever change this to start - // from 0, you also need to change the "attach to existing request"-logic in uplink_request() - outReply.magic = dnbd3_packet_magic; - bool served = false; - for ( i = uplink->queueLen - 1; i >= 0; --i ) { - dnbd3_queued_request_t * const req = &uplink->queue[i]; - if ( req->status == ULR_PROCESSING ) { - size_t bytesSent = 0; - assert( req->from >= start && req->to <= end ); - dnbd3_client_t * const client = req->client; - outReply.cmd = CMD_GET_BLOCK; - outReply.handle = req->handle; - outReply.size = (uint32_t)( req->to - req->from ); - iov[0].iov_base = &outReply; - iov[0].iov_len = sizeof outReply; - iov[1].iov_base = uplink->recvBuffer + (req->from - start); - iov[1].iov_len = outReply.size; - fixup_reply( outReply ); - req->status = ULR_FREE; - req->client = NULL; - served = true; - mutex_lock( &client->sendMutex ); - mutex_unlock( &uplink->queueLock ); - if ( client->sock != -1 ) { - ssize_t sent = writev( client->sock, iov, 2 ); - if ( sent > (ssize_t)sizeof outReply ) { - bytesSent = (size_t)sent - sizeof outReply; - } - } - if ( bytesSent != 0 ) { - client->bytesSent += bytesSent; - } - mutex_unlock( &client->sendMutex ); - mutex_lock( &uplink->queueLock ); - if ( i > uplink->queueLen ) { - i = uplink->queueLen; // Might have been set to 0 by cancelAllRequests - } + for ( it = &uplink->queue; *it != NULL; it = &(**it).next ) { + if ( *it == entry && entry->handle == inReply.handle ) { // ABA check + assert( found == false ); + *it = (**it).next; + found = true; + uplink->queueLen--; + break; } - if ( req->status == ULR_FREE && i == uplink->queueLen - 1 ) uplink->queueLen--; } if ( uplink->queueLen < SERVER_UPLINK_QUEUELEN_THRES ) { uplink->image->problem.queue = false; } mutex_unlock( &uplink->queueLock ); -#ifdef _DEBUG - if ( !served && start != uplink->replicationHandle ) { - logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)uplink, uplink->image->name, start, end ); + if ( !found ) { + logadd( LOG_DEBUG1, "Replication request vanished from queue after writing to disk (%s:%d)", + PIMG(uplink->image) ); + continue; } -#endif - if ( start == uplink->replicationHandle ) { - // Was our background replication - uplink->replicationHandle = REP_NONE; - // Try to remove from fs cache if no client was interested in this data - if ( !served && uplink->cacheFd != -1 ) { - posix_fadvise( uplink->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED ); + outReply.magic = dnbd3_packet_magic; + dnbd3_queue_client_t *next; + for ( dnbd3_queue_client_t *c = entry->clients; c != NULL; c = next ) { + size_t bytesSent = 0; + assert( c->from >= start && c->to <= end ); + dnbd3_client_t * const client = c->client; + outReply.cmd = CMD_GET_BLOCK; + outReply.handle = c->handle; + outReply.size = (uint32_t)( c->to - c->from ); + iov[0].iov_base = &outReply; + iov[0].iov_len = sizeof outReply; + iov[1].iov_base = uplink->recvBuffer + (c->from - start); + iov[1].iov_len = outReply.size; + fixup_reply( outReply ); + mutex_lock( &client->sendMutex ); + if ( client->sock != -1 ) { + ssize_t sent = writev( client->sock, iov, 2 ); + if ( sent > (ssize_t)sizeof outReply ) { + bytesSent = (size_t)sent - sizeof outReply; + } + if ( bytesSent != 0 ) { + client->bytesSent += bytesSent; + } } + mutex_unlock( &client->sendMutex ); + client->relayedCount--; + next = c->next; + free( c ); } - if ( served ) { + if ( entry->clients != NULL ) { // Was some client -- reset idle counter uplink->idleTime = 0; // Re-enable replication if disabled if ( uplink->nextReplicationIndex == -1 ) { uplink->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK; } + } else { + if ( uplink->cacheFd != -1 ) { + // Try to remove from fs cache if no client was interested in this data + posix_fadvise( uplink->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED ); + } } + free( entry ); + } // main receive loop + // Trigger background replication if applicable + if ( !uplink_sendReplicationRequest( uplink ) ) { + goto error_cleanup; } - if ( uplink->replicationHandle == REP_NONE ) { - mutex_lock( &uplink->queueLock ); - const bool rep = ( uplink->queueLen == 0 ); - mutex_unlock( &uplink->queueLock ); - if ( rep ) { - if ( !uplink_sendReplicationRequest( uplink ) ) - goto error_cleanup; - } - } + // Normal end return; // Error handling from failed receive or message parsing error_cleanup: ; @@ -1046,7 +1086,6 @@ static void uplink_connectionFailed(dnbd3_uplink_t *uplink, bool findNew) close( uplink->current.fd ); uplink->current.fd = -1; mutex_unlock( &uplink->sendMutex ); - uplink->replicationHandle = REP_NONE; if ( _backgroundReplication == BGR_FULL && uplink->nextReplicationIndex == -1 ) { uplink->nextReplicationIndex = 0; } @@ -1156,3 +1195,39 @@ bool uplink_getHostString(dnbd3_uplink_t *uplink, char *buffer, size_t len) return false; return altservers_toString( current, buffer, len ); } + +/** + * Get number of replication requests that should be sent right now to + * meet the configured bgrWindowSize. Returns 0 if any client requests + * are pending + */ +static int numWantedReplicationRequests(dnbd3_uplink_t *uplink) +{ + int ret = MIN( _bgrWindowSize, uplink->idleTime + 1 ); + if ( uplink->queueLen == 0 ) + return ret; + mutex_lock( &uplink->queueLock ); + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + if ( it->clients == NULL ) { + ret--; + } else { + ret = 0; // Do not allow BGR if client requests are being handled + break; + } + } + mutex_unlock( &uplink->queueLock ); + return ret; +} + +static void markRequestUnsent(dnbd3_uplink_t *uplink, uint64_t handle) +{ + mutex_lock( &uplink->queueLock ); + for ( dnbd3_queue_entry_t *it = uplink->queue; it != NULL; it = it->next ) { + if ( it->handle == handle ) { + it->sent = false; + break; + } + } + mutex_unlock( &uplink->queueLock ); +} + diff --git a/src/server/uplink.h b/src/server/uplink.h index 49ff0b4..8f69b05 100644 --- a/src/server/uplink.h +++ b/src/server/uplink.h @@ -12,7 +12,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version void uplink_removeClient(dnbd3_uplink_t *uplink, dnbd3_client_t *client); -bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hopCount); +bool uplink_request(dnbd3_uplink_t *uplink, dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops); bool uplink_shutdown(dnbd3_image_t *image); diff --git a/src/serverconfig.h b/src/serverconfig.h index 5c7301d..31708de 100644 --- a/src/serverconfig.h +++ b/src/serverconfig.h @@ -13,7 +13,8 @@ #define SERVER_BAD_UPLINK_MAX 20 // Hard block server if it failed this many times #define SERVER_BAD_UPLINK_LOCAL_BLOCK 10 // If a server didn't supply the requested image this many times, block it for some time #define SERVER_BAD_UPLINK_IGNORE 180 // How many seconds is a server ignored -#define SERVER_MAX_UPLINK_QUEUE 1500 // Maximum number of queued requests per uplink +#define UPLINK_MAX_QUEUE 500 // Maximum number of queued requests per uplink +#define UPLINK_MAX_CLIENTS_PER_REQUEST 32 // Maximum number of clients that can attach to one uplink request #define SERVER_UPLINK_QUEUELEN_THRES 900 // Threshold where we start dropping incoming clients #define SERVER_MAX_PENDING_ALT_CHECKS 500 // Length of queue for pending alt checks requested by uplinks -- cgit v1.2.3-55-g7522