summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Simon Rettberg, 2015-12-15 17:45:44 +0100
committer: Simon Rettberg, 2015-12-15 17:45:44 +0100
commit: 72104f2e83fa724f9667c876dca17a2c5ee9b2a2 (patch)
tree: 38837580c70b390f0bc35c15d2bc4d0865a9f3c4
parent: [SERVER] Make listen port configurable (diff)
download: dnbd3-72104f2e83fa724f9667c876dca17a2c5ee9b2a2.tar.gz
dnbd3-72104f2e83fa724f9667c876dca17a2c5ee9b2a2.tar.xz
dnbd3-72104f2e83fa724f9667c876dca17a2c5ee9b2a2.zip
[SERVER] Remove non-working images from list, plus refactoring
Now that we can automatically load unknown images from disk on request, it makes sense to remove non-working images from the image list. On future requests, we will look for them on disk again, which is nice in case of temporary storage hiccups. Also, some more or less related locking has been refined (loading images, replicating images).
-rw-r--r-- src/config.h 4
-rw-r--r-- src/server/globals.c 2
-rw-r--r-- src/server/globals.h 6
-rw-r--r-- src/server/helper.h 2
-rw-r--r-- src/server/image.c 226
-rw-r--r-- src/server/net.c 192
-rw-r--r-- src/server/uplink.c 18
-rw-r--r-- src/shared/sockhelper.c 2
8 files changed, 318 insertions, 134 deletions
diff --git a/src/config.h b/src/config.h
index 3ebfd54..f5b550c 100644
--- a/src/config.h
+++ b/src/config.h
@@ -28,10 +28,10 @@
#define SERVER_MAX_CLIENTS 400
#define SERVER_MAX_IMAGES 5000
#define SERVER_MAX_ALTS 250
-#define SERVER_MAX_UPLINK_QUEUE 1500
#define SERVER_MAX_UPLINK_FAILS 8 // How many times may a server fail until it is considered bad
#define SERVER_BAD_UPLINK_IGNORE 120 // How many seconds is a server considered bad?
-#define SERVER_UPLINK_QUEUELEN_THRES 900
+#define SERVER_MAX_UPLINK_QUEUE 1500 // Maximum number of queued requests per uplink
+#define SERVER_UPLINK_QUEUELEN_THRES 900 // Threshold where we start dropping incoming clients
#define SERVER_MAX_PENDING_ALT_CHECKS 50
#define SERVER_CACHE_MAP_SAVE_INTERVAL 90
diff --git a/src/server/globals.c b/src/server/globals.c
index 1b0e8f3..5a7de89 100644
--- a/src/server/globals.c
+++ b/src/server/globals.c
@@ -12,6 +12,7 @@ bool _vmdkLegacyMode = false;
volatile bool _shutdown = false;
int _serverPenalty = 0;
int _clientPenalty = 0;
+bool _removeMissingImages = true;
bool _isProxy = false;
bool _proxyPrivateOnly = false;
bool _backgroundReplication = true;
@@ -32,6 +33,7 @@ static int ini_handler(void *custom UNUSED, const char* section, const char* key
SAVE_TO_VAR_BOOL( dnbd3, isProxy );
SAVE_TO_VAR_BOOL( dnbd3, proxyPrivateOnly );
SAVE_TO_VAR_BOOL( dnbd3, backgroundReplication );
+ SAVE_TO_VAR_BOOL( dnbd3, removeMissingImages );
SAVE_TO_VAR_INT( dnbd3, serverPenalty );
SAVE_TO_VAR_INT( dnbd3, clientPenalty );
SAVE_TO_VAR_INT( dnbd3, uplinkTimeout );
diff --git a/src/server/globals.h b/src/server/globals.h
index 575f3ab..0bf34de 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -111,6 +111,7 @@ struct _dnbd3_image
int users; // clients currently using this image
int id; // Unique ID of this image. Only unique in the context of this running instance of DNBD3-Server
time_t atime; // last access time
+ time_t lastWorkCheck; // last time a non-working image has been checked
bool working; // true if image exists and completeness is == 100% or a working upstream proxy is connected
pthread_spinlock_t lock;
};
@@ -170,6 +171,11 @@ extern bool _isProxy;
extern bool _proxyPrivateOnly;
/**
+ * Whether to remove missing images from image list on SIGHUP
+ */
+extern bool _removeMissingImages;
+
+/**
* Read timeout when waiting for or sending data on an uplink
*/
extern int _uplinkTimeout;
diff --git a/src/server/helper.h b/src/server/helper.h
index 4ab2134..389a98c 100644
--- a/src/server/helper.h
+++ b/src/server/helper.h
@@ -33,7 +33,7 @@ static inline bool isSameAddressPort(const dnbd3_host_t * const a, const dnbd3_h
* Test whether string ends in suffix.
* @return true if string =~ /suffix$/
*/
-static inline int strend(char *string, char *suffix)
+static inline bool strend(char *string, char *suffix)
{
if ( string == NULL ) return false;
if ( suffix == NULL || *suffix == '\0' ) return true;
diff --git a/src/server/image.c b/src/server/image.c
index fb3f8ba..d4df26d 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -26,6 +26,7 @@
#include <glob.h>
#define PATHLEN (2000)
+#define NONWORKING_RECHECK_INTERVAL_SECONDS (60)
// ##########################################
@@ -44,10 +45,10 @@ typedef struct
time_t deadline;
} imagecache;
static imagecache remoteCloneCache[CACHELEN];
-static int remoteCloneCacheIndex = 0;
// ##########################################
+static void image_remove(dnbd3_image_t *image);
static dnbd3_image_t* image_free(dnbd3_image_t *image);
static bool image_isHashBlockComplete(uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize);
static bool image_load_all_internal(char *base, char *path);
@@ -283,18 +284,65 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
candidate->users++;
spin_unlock( &candidate->lock );
- if ( !checkIfWorking ) return candidate;
+ if ( !checkIfWorking ) return candidate; // Found, but not interested in working state
// Found, see if it works
- if ( !candidate->working && candidate->cache_map != NULL && candidate->uplink == NULL && file_isWritable( candidate->path ) ) {
- // Not working and has file + cache-map, try to init uplink (uplink_init will check if proxy mode is enabled)
- uplink_init( candidate, -1, NULL );
- } else if ( candidate->working && candidate->uplink != NULL && candidate->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
- // To many pending uplink requests. We take that as a hint that the uplink is clogged or no working uplink server
- // exists, so "working" is changed to false for now. Should a new uplink server be found the uplink thread will
- // set this back to true some time.
- candidate->working = false;
+
+ if ( candidate->working ) {
+ // Last known state was "working", see if that should change
+ if ( candidate->readFd == -1 ) {
+ candidate->working = false;
+ }
+ } else { // ...not working...
+ // Don't re-check too often
+ spin_lock( &candidate->lock );
+ bool check;
+ const time_t now = time( NULL );
+ check = ( now - candidate->lastWorkCheck ) > NONWORKING_RECHECK_INTERVAL_SECONDS;
+ if ( check ) {
+ candidate->lastWorkCheck = now;
+ }
+ spin_unlock( &candidate->lock );
+ if ( !check ) {
+ return candidate;
+ }
+ // Check if the local file exists, has the right size, and is readable (writable for incomplete image)
+ if ( candidate->cache_map != NULL ) {
+ // -- Incomplete - rw check
+ if ( candidate->cacheFd == -1 ) { // Make sure file is open for writing
+ candidate->cacheFd = open( candidate->path, O_RDWR );
+ // It might have failed - still offer proxy mode, we just can't cache
+ if ( candidate->cacheFd == -1 ) {
+ logadd( LOG_WARNING, "Cannot re-open %s for writing - replication disabled", candidate->path );
+ }
+ }
+ if ( candidate->uplink == NULL && candidate->cacheFd != -1 ) {
+ uplink_init( candidate, -1, NULL );
+ }
+ }
+ // Common for ro and rw images
+ const off_t len = lseek( candidate->readFd, 0, SEEK_END );
+ if ( len == -1 ) {
+ logadd( LOG_WARNING, "lseek() on %s failed (errno=%d), removing image", candidate->path, errno );
+ image_remove( candidate ); // No release here, the image is still returned and should be released by caller
+ } else if ( (uint64_t)len != candidate->realFilesize ) {
+ logadd( LOG_DEBUG1, "Size of %s changed at runtime, keeping disabled! Expected: %" PRIu64 ", found: %" PRIu64
+ ". Try sending SIGHUP to server if you know what you're doing.",
+ candidate->path, candidate->realFilesize, (uint64_t)len );
+ } else {
+ // Seek worked, file size is same, now see if we can read from file
+ char buffer[100];
+ if ( pread( candidate->readFd, buffer, sizeof(buffer), 0 ) == -1 ) {
+ logadd( LOG_DEBUG2, "Reading first %d bytes from %s failed (errno=%d), removing image",
+ (int)sizeof(buffer), candidate->path, errno );
+ image_remove( candidate );
+ } else {
+ // Seems everything is fine again \o/
+ candidate->working = true;
+ }
+ }
}
+
return candidate; // Success :-)
}
@@ -304,7 +352,7 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
* Every call to image_lock() needs to be followed by a call to image_release() at some point.
* Locks on: imageListLock, _images[].lock
*/
-dnbd3_image_t* image_lock(dnbd3_image_t *image)
+dnbd3_image_t* image_lock(dnbd3_image_t *image) // TODO: get rid, fix places that do image->users--
{
if ( image == NULL ) return NULL ;
int i;
@@ -361,7 +409,7 @@ dnbd3_image_t* image_release(dnbd3_image_t *image)
* no active users
* Locks on: imageListLock, image[].lock
*/
-void image_remove(dnbd3_image_t *image)
+static void image_remove(dnbd3_image_t *image)
{
bool wasInList = false;
spin_lock( &imageListLock );
@@ -399,13 +447,46 @@ void image_killUplinks()
/**
* Load all images in given path recursively.
* Pass NULL to use path from config.
- * NOT THREAD SAFE, make sure this is only running
- * on one thread at a time!
*/
bool image_loadAll(char *path)
{
bool ret;
+ char imgPath[PATHLEN];
+ int imgId;
+ dnbd3_image_t *imgHandle;
+
if ( path == NULL ) path = _basePath;
+ if ( _removeMissingImages ) {
+ // Check if all loaded images still exist on disk
+ logadd( LOG_DEBUG1, "Checking for vanished images" );
+ spin_lock( &imageListLock );
+ for (int i = _num_images - 1; i >= 0; --i) {
+ if ( _images[i] == NULL ) {
+ if ( i + 1 == _num_images ) _num_images--;
+ continue;
+ }
+ imgId = _images[i]->id;
+ snprintf( imgPath, PATHLEN, "%s", _images[i]->path );
+ spin_unlock( &imageListLock ); // isReadable hits the fs; unlock
+ // Check if fill can still be opened for reading
+ ret = file_isReadable( imgPath );
+ // Lock again, see if image is still there, free if required
+ spin_lock( &imageListLock );
+ if ( ret || i >= _num_images || _images[i] == NULL || _images[i]->id != imgId ) continue;
+ // Image needs to be removed
+ imgHandle = _images[i];
+ _images[i] = NULL;
+ if ( i + 1 == _num_images ) _num_images--;
+ if ( imgHandle->users != 0 ) continue; // Still in use, do not free (last releasing user will trigger)
+ // Image is not in use anymore, free the dangling entry immediately
+ spin_unlock( &imageListLock ); // image_free might do several fs operations; unlock
+ image_free( imgHandle );
+ spin_lock( &imageListLock );
+ }
+ spin_unlock( &imageListLock );
+ }
+ // Now scan for new images
+ logadd( LOG_DEBUG1, "Scanning for new or modified images" );
pthread_mutex_lock( &reloadLock );
ret = image_load_all_internal( path, path );
pthread_mutex_unlock( &reloadLock );
@@ -438,6 +519,9 @@ bool image_tryFreeAll()
static dnbd3_image_t* image_free(dnbd3_image_t *image)
{
assert( image != NULL );
+ if ( !_shutdown ) {
+ logadd( LOG_INFO, "Freeing image %s:%d", image->lower_name, (int)image->rid );
+ }
//
image_saveCacheMap( image );
uplink_shutdown( image );
@@ -511,7 +595,7 @@ static bool image_load_all_internal(char *base, char *path)
logadd( LOG_WARNING, "stat() for '%s' failed. Ignoring....", subpath );
continue;
}
- if ( S_ISDIR( st.st_mode )) {
+ if ( S_ISDIR( st.st_mode ) ) {
image_load_all_internal( base, subpath ); // Recurse
} else {
image_load( base, subpath, true ); // Load image if possible
@@ -531,7 +615,7 @@ static bool image_load_all_internal(char *base, char *path)
static bool image_load(char *base, char *path, int withUplink)
{
static int imgIdCounter = 0; // Used to assign unique numeric IDs to images
- int i, revision;
+ int i, revision = -1;
struct stat st;
uint8_t *cache_map = NULL;
uint32_t *crc32list = NULL;
@@ -563,7 +647,7 @@ static bool image_load(char *base, char *path, int withUplink)
// Easy - legacy mode, simply append full file name and set rid to 1
strcat( dst, fileName );
revision = 1;
- } else {
+ } else if ( !_vmdkLegacyMode ) {
// Try to parse *.r<ID> syntax
for (i = fileNameLen - 1; i > 1; --i) {
if ( fileName[i] < '0' || fileName[i] > '9' ) break;
@@ -578,7 +662,7 @@ static bool image_load(char *base, char *path, int withUplink)
}
*dst = '\0';
}
- if ( revision <= 0 ) {
+ if ( revision <= 0 || revision >= 65536 ) {
logadd( LOG_WARNING, "Image '%s' has invalid revision ID %d", path, revision );
goto load_error;
}
@@ -612,7 +696,7 @@ static bool image_load(char *base, char *path, int withUplink)
// 1. Allocate memory for the cache map if the image is incomplete
cache_map = image_loadCacheMap( path, virtualFilesize );
- // TODO: Maybe try sha-256 or 512 first if you're paranoid (to be implemented)
+ // XXX: Maybe try sha-256 or 512 first if you're paranoid (to be implemented)
// 2. Load CRC-32 list of image
uint32_t masterCrc;
@@ -630,34 +714,34 @@ static bool image_load(char *base, char *path, int withUplink)
// Compare data just loaded to identical image we apparently already loaded
if ( existing != NULL ) {
if ( existing->realFilesize != realFilesize ) {
- // Image will be replaced below
logadd( LOG_WARNING, "Size of image '%s:%d' has changed.", existing->lower_name, (int)existing->rid );
+ // Image will be replaced below
} else if ( existing->crc32 != NULL && crc32list != NULL
&& memcmp( existing->crc32, crc32list, sizeof(uint32_t) * hashBlockCount ) != 0 ) {
- // Image will be replaced below
logadd( LOG_WARNING, "CRC32 list of image '%s:%d' has changed.", existing->lower_name, (int)existing->rid );
logadd( LOG_WARNING, "The image will be reloaded, but you should NOT replace existing images while the server is running." );
logadd( LOG_WARNING, "Actually even if it's not running this should never be done. Use a new RID instead!" );
+ // Image will be replaced below
} else if ( existing->crc32 == NULL && crc32list != NULL ) {
logadd( LOG_INFO, "Found CRC-32 list for already loaded image '%s:%d', adding...", existing->lower_name, (int)existing->rid );
existing->crc32 = crc32list;
existing->masterCrc32 = masterCrc;
crc32list = NULL;
function_return = true;
- goto load_error;
+ goto load_error; // Keep existing
} else if ( existing->cache_map != NULL && cache_map == NULL ) {
// Just ignore that fact, if replication is really complete the cache map will be removed anyways
logadd( LOG_INFO, "Image '%s:%d' has no cache map on disk!", existing->lower_name, (int)existing->rid );
function_return = true;
- goto load_error;
+ goto load_error; // Keep existing
} else {
// Nothing changed about the existing image, so do nothing
function_return = true;
- goto load_error;
+ goto load_error; // Keep existing
}
- // Remove image from images array
- image_release( existing );
+ // Remove existing image from images array, so it will be replaced by the reloaded image
image_remove( existing );
+ image_release( existing );
existing = NULL;
}
@@ -714,7 +798,7 @@ static bool image_load(char *base, char *path, int withUplink)
spin_lock( &imageListLock );
// Now we're locked, assign unique ID to image (unique for this running server instance!)
image->id = ++imgIdCounter;
- for (i = 0; i < _num_images; ++i) {
+ for ( i = 0; i < _num_images; ++i ) {
if ( _images[i] != NULL ) continue;
_images[i] = image;
break;
@@ -727,7 +811,7 @@ static bool image_load(char *base, char *path, int withUplink)
goto load_error;
}
_images[_num_images++] = image;
- logadd( LOG_DEBUG1, "Loaded image '%s'\n", image->lower_name );
+ logadd( LOG_DEBUG1, "Loaded image '%s:%d'\n", image->lower_name, (int)image->rid );
}
// Keep fd for reading
image->readFd = fdImage;
@@ -888,7 +972,7 @@ bool image_create(char *image, int revision, uint64_t size)
close( fdCache );
return true;
//
- failure_cleanup: ;
+failure_cleanup: ;
if ( fdImage >= 0 ) close( fdImage );
if ( fdCache >= 0 ) close( fdCache );
remove( path );
@@ -913,8 +997,11 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste
*/
dnbd3_image_t* image_getOrLoad(char * const name, const uint16_t revision)
{
- // not proxy, specific revision - nothing to do
- if ( !_isProxy && revision != 0 ) return image_get( name, revision, true );
+ // specific revision - try shortcut
+ if ( revision != 0 ) {
+ dnbd3_image_t *image = image_get( name, revision, true );
+ if ( image != NULL ) return image;
+ }
const size_t len = strlen( name );
// Sanity check
if ( len == 0 || name[len - 1] == '/' || name[0] == '/'
@@ -935,14 +1022,15 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
{
int i;
// Already existing locally?
- dnbd3_image_t *image = image_get( name, revision, true );
- // exists and specific revision - nothing to do
- if ( image != NULL && revision != 0 ) return image;
+ dnbd3_image_t *image = NULL;
+ if ( revision == 0 ) {
+ image = image_get( name, revision, true );
+ }
// Doesn't exist or is rid 0, try remote if not already tried it recently
const time_t now = time( NULL );
char *cmpname = name;
- int useIndex = -1;
+ int useIndex = -1, fallbackIndex = 0;
if ( len >= NAMELEN ) cmpname += 1 + len - NAMELEN;
pthread_mutex_lock( &remoteCloneLock );
for (i = 0; i < CACHELEN; ++i) {
@@ -952,6 +1040,9 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
pthread_mutex_unlock( &remoteCloneLock ); // Was recently checked...
return image;
}
+ if ( remoteCloneCache[i].deadline < remoteCloneCache[fallbackIndex].deadline ) {
+ fallbackIndex = i;
+ }
}
// Re-check to prevent two clients at the same time triggering this,
// but only if rid != 0, since we would just get an old rid then
@@ -966,11 +1057,13 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
serialized_buffer_t serialized;
// Mark as recently checked
if ( useIndex == -1 ) {
- useIndex = remoteCloneCacheIndex = (remoteCloneCacheIndex + 1) % CACHELEN;
+ useIndex = fallbackIndex;
}
remoteCloneCache[useIndex].deadline = now + SERVER_REMOTE_IMAGE_CHECK_CACHETIME;
snprintf( remoteCloneCache[useIndex].name, NAMELEN, "%s", cmpname );
remoteCloneCache[useIndex].rid = revision;
+ pthread_mutex_unlock( &remoteCloneLock );
+
// Get some alt servers and try to get the image from there
dnbd3_host_t servers[4];
int uplinkSock = -1;
@@ -980,26 +1073,32 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
uint16_t remoteRid = revision;
uint64_t remoteImageSize;
for (i = 0; i < count; ++i) {
+ char *remoteName;
+ bool ok = false;
int sock = sock_connect( &servers[i], 750, _uplinkTimeout );
- if ( sock < 0 ) continue;
+ if ( sock == -1 ) continue;
if ( !dnbd3_select_image( sock, name, revision, FLAGS8_SERVER ) ) goto server_fail;
- char *remoteName;
if ( !dnbd3_select_image_reply( &serialized, sock, &remoteProtocolVersion, &remoteName, &remoteRid, &remoteImageSize ) ) goto server_fail;
if ( remoteProtocolVersion < MIN_SUPPORTED_SERVER || remoteRid == 0 ) goto server_fail;
- if ( revision != 0 && remoteRid != revision ) goto server_fail;
+ if ( revision != 0 && remoteRid != revision ) goto server_fail; // Want specific revision but uplink supplied different rid
if ( revision == 0 && image != NULL && image->rid >= remoteRid ) goto server_fail; // Not actually a failure: Highest remote rid is <= highest local rid - don't clone!
if ( remoteImageSize < DNBD3_BLOCK_SIZE || remoteName == NULL || strcmp( name, remoteName ) != 0 ) goto server_fail;
if ( remoteImageSize > SERVER_MAX_PROXY_IMAGE_SIZE ) goto server_fail;
- if ( !image_ensureDiskSpace( remoteImageSize ) ) goto server_fail;
- if ( !image_clone( sock, name, remoteRid, remoteImageSize ) ) goto server_fail; // This sets up the file+map+crc
+ pthread_mutex_lock( &reloadLock );
+ ok = image_ensureDiskSpace( remoteImageSize )
+ && image_clone( sock, name, remoteRid, remoteImageSize ); // This sets up the file+map+crc and loads the img
+ pthread_mutex_unlock( &reloadLock );
+ if ( !ok ) goto server_fail;
+
// Cloning worked :-)
uplinkSock = sock;
uplinkServer = &servers[i];
break;
- server_fail: ;
+
+server_fail: ;
close( sock );
}
- pthread_mutex_unlock( &remoteCloneLock );
+
// If we still have a pointer to a local image, release the reference
if ( image != NULL ) image_release( image );
// If everything worked out, this call should now actually return the image
@@ -1030,9 +1129,10 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste
uint16_t detectedRid = 0;
if ( _vmdkLegacyMode ) {
- // TODO
- assert( 0 );
- detectedRid = requestedRid;
+ if ( strend( name, ".vmdk" ) ) {
+ snprintf( imageFile, PATHLEN, "%s/%s", _basePath, name );
+ detectedRid = MAX( 1, requestedRid );
+ }
} else if ( requestedRid != 0 ) {
snprintf( imageFile, PATHLEN, "%s/%s.r%d", _basePath, name, requestedRid );
detectedRid = requestedRid;
@@ -1044,8 +1144,9 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste
if ( ret == 0 ) {
long int best = 0;
for ( size_t i = 0; i < g.gl_pathc; ++i ) {
- char *rev = strrchr( g.gl_pathv[i], 'r' );
- if ( rev == NULL ) continue;
+ const char * const path = g.gl_pathv[i];
+ const char * rev = strrchr( path, 'r' );
+ if ( rev == NULL || rev == path || *(rev - 1) != '.' ) continue;
rev++;
if ( *rev < '0' || *rev > '9' ) continue;
char *err = NULL;
@@ -1062,22 +1163,45 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste
}
globfree( &g );
}
+ logadd( LOG_DEBUG2, "Trying to load %s:%d ( -> %d) as %s", name, (int)requestedRid, (int)detectedRid, imageFile );
// No file was determined, or it doesn't seem to exist/be readable
- if ( imageFile[0] == '\0' || !file_isReadable( imageFile ) ) { // XXX glob fallback to rid-1? Rework above
+ if ( imageFile[0] == '\0' ) {
+ logadd( LOG_DEBUG2, "Not found, bailing out" );
return image_get( name, detectedRid, true );
}
+ if ( requestedRid == 0 ) {
+ // rid 0 requested - check if detected rid is readable, decrease rid if not until we reach 0
+ while ( detectedRid != 0 ) {
+ dnbd3_image_t *image = image_get( name, detectedRid, true );
+ if ( image != NULL ) {
+ // globbed rid already loaded, return
+ return image;
+ }
+ if ( file_isReadable( imageFile ) ) {
+ // globbed rid is
+ break;
+ }
+ logadd( LOG_DEBUG2, "%s: rid %d globbed but not readable, trying lower rid...", name, (int)detectedRid );
+ detectedRid--;
+ snprintf( imageFile, PATHLEN, "%s/%s.r%d", _basePath, name, requestedRid );
+ }
+ }
+
// Now lock on the loading mutex, then check again if the image exists (we're multi-threaded)
pthread_mutex_lock( &reloadLock );
dnbd3_image_t* image = image_get( name, detectedRid, true );
if ( image != NULL ) {
// The image magically appeared in the meantime
+ logadd( LOG_DEBUG2, "Magically appeared" );
pthread_mutex_unlock( &reloadLock );
return image;
}
// Still not loaded, let's try to do so
+ logadd( LOG_DEBUG2, "Calling load" );
image_load( _basePath, imageFile, false );
pthread_mutex_unlock( &reloadLock );
// If loading succeeded, this will return the image
+ logadd( LOG_DEBUG2, "Calling get" );
return image_get( name, requestedRid, true );
}
@@ -1333,7 +1457,7 @@ static bool image_calcBlockCrc32(const int fd, const int block, const uint32_t r
*/
static bool image_ensureDiskSpace(uint64_t size)
{
- for (;;) {
+ for ( int maxtries = 0; maxtries < 20; ++maxtries ) {
const int64_t available = file_freeDiskSpace( _basePath );
if ( available == -1 ) {
const int e = errno;
@@ -1368,7 +1492,7 @@ static bool image_ensureDiskSpace(uint64_t size)
return false;
}
oldest = image_lock( oldest );
- if ( oldest == NULL ) return false;
+ if ( oldest == NULL ) continue; // Image freed in the meantime? Try again
logadd( LOG_INFO, "'%s:%d' has to go!", oldest->lower_name, (int)oldest->rid );
unlink( oldest->path );
size_t len = strlen( oldest->path ) + 5 + 1;
diff --git a/src/server/net.c b/src/server/net.c
index ad228f4..a467620 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -85,7 +85,7 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff
logadd( LOG_ERROR, "Called recv_request_payload() for more bytes than the passed buffer could hold!" );
return false;
}
- if ( recv( sock, payload->buffer, size, MSG_WAITALL ) != size ) {
+ if ( sock_recv( sock, payload->buffer, size ) != size ) {
logadd( LOG_DEBUG1, "Could not receive request payload of length %d\n", (int)size );
return false;
}
@@ -94,34 +94,35 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff
return true;
}
+/**
+ * Send reply with optional payload. payload can be null. The caller has to
+ * acquire the sendMutex first.
+ */
static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload)
{
- const unsigned int size = reply->size;
+ const size_t size = reply->size;
fixup_reply( *reply );
- if ( !payload || size == 0 ) {
- if ( send( sock, reply, sizeof(dnbd3_reply_t), 0 ) != sizeof(dnbd3_reply_t) ) {
- logadd( LOG_DEBUG1, "Send failed (header-only)\n" );
- return false;
- }
- } else {
- struct iovec iov[2];
- iov[0].iov_base = reply;
- iov[0].iov_len = sizeof(dnbd3_reply_t);
- iov[1].iov_base = payload;
- iov[1].iov_len = (size_t)size;
- if ( (size_t)writev( sock, iov, 2 ) != sizeof(dnbd3_reply_t) + size ) {
- logadd( LOG_DEBUG1, "Send failed (reply with payload of %u bytes)\n", size );
+ if ( sock_sendAll( sock, reply, sizeof(dnbd3_reply_t), 1 ) != sizeof(dnbd3_reply_t) ) {
+ logadd( LOG_DEBUG1, "Sending reply header to client failed" );
+ return false;
+ }
+ if ( size != 0 && payload != NULL ) {
+ if ( sock_sendAll( sock, payload, size, 1 ) != (ssize_t)size ) {
+ logadd( LOG_DEBUG1, "Sending payload of %u bytes to client failed", size );
return false;
}
}
return true;
}
+/**
+ * Send given amount of null bytes. The caller has to acquire the sendMutex first.
+ */
static inline bool sendPadding( const int fd, uint32_t bytes )
{
ssize_t ret;
while ( bytes >= sizeof(nullbytes) ) {
- ret = sock_sendAll( fd, nullbytes, sizeof(nullbytes), 1 );
+ ret = sock_sendAll( fd, nullbytes, sizeof(nullbytes), 2 );
if ( ret <= 0 )
return false;
bytes -= (uint32_t)ret;
@@ -131,8 +132,9 @@ static inline bool sendPadding( const int fd, uint32_t bytes )
uint64_t net_getTotalBytesSent()
{
+ // reads and writes to 64bit ints are not atomic on x86, so let's be safe and use locking
spin_lock( &statisticsSentLock );
- uint64_t tmp = totalBytesSent;
+ const uint64_t tmp = totalBytesSent;
spin_unlock( &statisticsSentLock );
return tmp;
}
@@ -156,10 +158,10 @@ void *net_client_handler(void *dnbd3_client)
bool hasName = false;
serialized_buffer_t payload;
- char *image_name;
uint16_t rid, client_version;
uint64_t start, end;
- char buffer[100];
+ uint32_t tempBytesSent = 0;
+ char hostPrintable[100];
dnbd3_server_entry_t server_list[NUMBER_SERVERS];
@@ -174,25 +176,38 @@ void *net_client_handler(void *dnbd3_client)
if ( recv_request_header( client->sock, &request ) ) {
if ( request.cmd != CMD_SELECT_IMAGE ) {
logadd( LOG_DEBUG1, "Client sent invalid handshake (%d). Dropping Client\n", (int)request.cmd );
- } else {
- if ( recv_request_payload( client->sock, request.size, &payload ) ) {
- client_version = serializer_get_uint16( &payload );
- image_name = serializer_get_string( &payload );
- rid = serializer_get_uint16( &payload );
- client->isServer = serializer_get_uint8( &payload );
- if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) {
- if ( client_version < MIN_SUPPORTED_CLIENT ) {
- logadd( LOG_DEBUG1, "Client too old\n" );
- } else {
- logadd( LOG_DEBUG1, "Incomplete handshake received\n" );
- }
+ } else if ( recv_request_payload( client->sock, request.size, &payload ) ) {
+ char *image_name;
+ client_version = serializer_get_uint16( &payload );
+ image_name = serializer_get_string( &payload );
+ rid = serializer_get_uint16( &payload );
+ client->isServer = serializer_get_uint8( &payload );
+ if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) {
+ if ( client_version < MIN_SUPPORTED_CLIENT ) {
+ logadd( LOG_DEBUG1, "Client too old\n" );
} else {
- client->image = image = image_getOrLoad( image_name, rid );
- if ( image == NULL ) {
- //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
- } else if ( !image->working ) {
- logadd( LOG_DEBUG1, "Client requested non-working image '%s' (rid:%d), rejected\n", image_name, (int)rid );
- } else {
+ logadd( LOG_DEBUG1, "Incomplete handshake received\n" );
+ }
+ } else {
+ client->image = image = image_getOrLoad( image_name, rid );
+ if ( image == NULL ) {
+ //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
+ } else if ( !image->working ) {
+ logadd( LOG_DEBUG1, "Client requested non-working image '%s' (rid:%d), rejected\n", image_name, (int)rid );
+ } else {
+ // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable
+ bOk = true;
+ if ( image->cache_map != NULL ) {
+ spin_lock( &image->lock );
+ if ( image->uplink == NULL || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
+ bOk = ( rand() % 4 ) == 1;
+ }
+ spin_unlock( &image->lock );
+ if ( image->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this
+ usleep( 100000 ); // server gets a penalty and is less likely to be selected
+ }
+ }
+ if ( bOk ) {
image_file = image->readFd;
serializer_reset_write( &payload );
serializer_put_uint16( &payload, PROTOCOL_VERSION );
@@ -201,9 +216,8 @@ void *net_client_handler(void *dnbd3_client)
serializer_put_uint64( &payload, image->virtualFilesize );
reply.cmd = CMD_SELECT_IMAGE;
reply.size = serializer_get_written_length( &payload );
- if ( send_reply( client->sock, &reply, &payload ) ) {
- if ( !client->isServer ) image->atime = time( NULL );
- bOk = true;
+ if ( !send_reply( client->sock, &reply, &payload ) ) {
+ bOk = false;
}
}
}
@@ -211,6 +225,11 @@ void *net_client_handler(void *dnbd3_client)
}
} else if ( strncmp( (char*)&request, "GET ", 4 ) == 0 || strncmp( (char*)&request, "POST ", 5 ) == 0 ) {
rpc_sendStatsJson( client->sock );
+ } else {
+ // Unknown request
+ if ( host_to_string( &client->host, hostPrintable, sizeof hostPrintable ) ) {
+ logadd( LOG_DEBUG1, "Client %s sent invalid handshake", hostPrintable );
+ }
}
if ( bOk ) {
@@ -251,23 +270,13 @@ void *net_client_handler(void *dnbd3_client)
spin_lock( &image->lock );
// Check again as we only aquired the lock just now
if ( image->cache_map != NULL ) {
- const uint64_t firstByte = start >> 15;
- const uint64_t lastByte = (end - 1) >> 15;
- // First byte
- uint64_t pos = start;
- do {
- const int map_x = (pos >> 12) & 7; // mod 8
- const uint8_t bit_mask = 1 << map_x;
- if ( (image->cache_map[firstByte] & bit_mask) == 0 ) {
- isCached = false;
- break;
- }
- pos += DNBD3_BLOCK_SIZE;
- } while ( firstByte == (pos >> 15) && pos < end );
+ const uint64_t firstByteInMap = start >> 15;
+ const uint64_t lastByteInMap = (end - 1) >> 15;
+ uint64_t pos;
// Middle - quick checking
if ( isCached ) {
- pos = firstByte + 1;
- while ( pos < lastByte ) {
+ pos = firstByteInMap + 1;
+ while ( pos < lastByteInMap ) {
if ( image->cache_map[pos] != 0xff ) {
isCached = false;
break;
@@ -275,14 +284,27 @@ void *net_client_handler(void *dnbd3_client)
++pos;
}
}
- // Last byte
- if ( isCached && firstByte != lastByte ) {
- pos = lastByte << 15;
+ // First byte
+ if ( isCached ) {
+ pos = start;
+ do {
+ const int map_x = (pos >> 12) & 7; // mod 8
+ const uint8_t bit_mask = 1 << map_x;
+ if ( (image->cache_map[firstByteInMap] & bit_mask) == 0 ) {
+ isCached = false;
+ break;
+ }
+ pos += DNBD3_BLOCK_SIZE;
+ } while ( firstByteInMap == (pos >> 15) && pos < end );
+ }
+ // Last byte - only check if request spans multiple bytes in cache map
+ if ( isCached && firstByteInMap != lastByteInMap ) {
+ pos = lastByteInMap << 15;
while ( pos < end ) {
- assert( lastByte == (pos >> 15) );
+ assert( lastByteInMap == (pos >> 15) );
const int map_x = (pos >> 12) & 7; // mod 8
const uint8_t bit_mask = 1 << map_x;
- if ( (image->cache_map[lastByte] & bit_mask) == 0 ) {
+ if ( (image->cache_map[lastByteInMap] & bit_mask) == 0 ) {
isCached = false;
break;
}
@@ -293,7 +315,9 @@ void *net_client_handler(void *dnbd3_client)
spin_unlock( &image->lock );
if ( !isCached ) {
if ( !uplink_request( client, request.handle, request.offset, request.size ) ) {
- logadd( LOG_DEBUG1, "Could not relay uncached request to upstream proxy\n" );
+ logadd( LOG_DEBUG1, "Could not relay un-cached request to upstream proxy, disabling image %s:%d",
+ image->lower_name, (int)image->rid );
+ image->working = false;
goto exit_client_cleanup;
}
break; // DONE
@@ -329,10 +353,13 @@ void *net_client_handler(void *dnbd3_client)
if ( ret <= 0 ) {
const int err = errno;
if ( lock ) pthread_mutex_unlock( &client->sendMutex );
- if ( ret < 0 && err != EPIPE && err != ECONNRESET )
- logadd( LOG_DEBUG1, "sendfile failed (image to net. sent %d/%d, errno=%d)\n",
- (int)done, (int)realBytes, err );
- if ( err == EBADF || err == EINVAL || err == EIO ) image->working = false;
+ if ( ret == -1 ) {
+ if ( err != EPIPE && err != ECONNRESET && err != ESHUTDOWN ) {
+ logadd( LOG_DEBUG1, "sendfile failed (image to net. sent %d/%d, errno=%d)\n",
+ (int)done, (int)realBytes, err );
+ }
+ if ( err != EAGAIN && err != EWOULDBLOCK ) image->working = false;
+ }
goto exit_client_cleanup;
}
done += ret;
@@ -343,9 +370,20 @@ void *net_client_handler(void *dnbd3_client)
goto exit_client_cleanup;
}
}
- client->bytesSent += request.size; // Increase counter for statistics.
}
if ( lock ) pthread_mutex_unlock( &client->sendMutex );
+ // Global per-client counter
+ spin_lock( &client->lock );
+ client->bytesSent += request.size; // Increase counter for statistics.
+ spin_unlock( &client->lock );
+ // Local counter that gets added to the global total bytes sent counter periodically
+ tempBytesSent += request.size;
+ if (tempBytesSent > 1024 * 1024 ) {
+ spin_lock( &statisticsSentLock );
+ totalBytesSent += tempBytesSent;
+ spin_unlock( &statisticsSentLock );
+ tempBytesSent = 0;
+ }
break;
case CMD_GET_SERVERS:
@@ -353,19 +391,25 @@ void *net_client_handler(void *dnbd3_client)
num = altservers_getMatching( &client->host, server_list, NUMBER_SERVERS );
reply.cmd = CMD_GET_SERVERS;
reply.size = num * sizeof(dnbd3_server_entry_t);
+ pthread_mutex_lock( &client->sendMutex );
send_reply( client->sock, &reply, server_list );
- client->isServer = false; // Only clients request list of servers
+ pthread_mutex_unlock( &client->sendMutex );
goto set_name;
break;
case CMD_KEEPALIVE:
reply.cmd = CMD_KEEPALIVE;
reply.size = 0;
+ pthread_mutex_lock( &client->sendMutex );
send_reply( client->sock, &reply, NULL );
+ pthread_mutex_unlock( &client->sendMutex );
+ image->atime = time( NULL );
set_name: ;
- if ( !hasName && host_to_string( &client->host, buffer, sizeof buffer ) ) {
+ if ( !hasName ) {
hasName = true;
- setThreadName( buffer );
+ if ( host_to_string( &client->host, hostPrintable, sizeof hostPrintable ) ) {
+ setThreadName( hostPrintable );
+ }
}
break;
@@ -376,19 +420,21 @@ set_name: ;
case CMD_GET_CRC32:
reply.cmd = CMD_GET_CRC32;
+ pthread_mutex_lock( &client->sendMutex );
if ( image->crc32 == NULL ) {
reply.size = 0;
send_reply( client->sock, &reply, NULL );
} else {
const int size = reply.size = (IMGSIZE_TO_HASHBLOCKS(image->realFilesize) + 1) * sizeof(uint32_t);
send_reply( client->sock, &reply, NULL );
- send( client->sock, &image->masterCrc32, sizeof(uint32_t), 0 );
+ send( client->sock, &image->masterCrc32, sizeof(uint32_t), MSG_MORE );
send( client->sock, image->crc32, size - sizeof(uint32_t), 0 );
}
+ pthread_mutex_unlock( &client->sendMutex );
break;
default:
- logadd( LOG_ERROR, "Unknown command: %d", (int)request.cmd );
+ logadd( LOG_ERROR, "Unknown command from client: %d", (int)request.cmd );
break;
}
@@ -397,9 +443,9 @@ set_name: ;
exit_client_cleanup: ;
dnbd3_removeClient( client );
spin_lock( &statisticsSentLock );
- totalBytesSent += client->bytesSent;// Add the amount of bytes sent by the client to the statistics of the server.
+ totalBytesSent += tempBytesSent; // Add the amount of bytes sent by the client to the statistics of the server.
spin_unlock( &statisticsSentLock );
- client = dnbd3_freeClient( client );
+ client = dnbd3_freeClient( client ); // This will also call image_release on client->image
return NULL ;
}
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 7fa3c96..29c42af 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -158,19 +158,19 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
spin_lock( &client->image->lock );
if ( client->image->uplink == NULL ) {
spin_unlock( &client->image->lock );
- logadd( LOG_DEBUG1, "Uplink request for image with no uplink (%s)\n", client->image->lower_name );
+ logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
return false;
}
dnbd3_connection_t * const uplink = client->image->uplink;
if ( uplink->shutdown ) {
spin_unlock( &client->image->lock );
- logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down (%s)\n", client->image->lower_name );
+ logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" );
return false;
}
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
if ( isSameAddress( &uplink->currentServer, &client->host ) ) {
spin_unlock( &client->image->lock );
- logadd( LOG_DEBUG1, "Proxy cycle detected.\n" );
+ logadd( LOG_DEBUG1, "Proxy cycle detected" );
return false;
}
@@ -572,9 +572,15 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
const uint64_t end = inReply.handle + inReply.size;
link->bytesReceived += inReply.size;
// 1) Write to cache file
- assert( link->image->cacheFd != -1 );
- ret = (int)pwrite( link->image->cacheFd, link->recvBuffer, inReply.size, start );
- if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, true );
+ if ( link->image->cacheFd != -1 ) {
+ ret = (int)pwrite( link->image->cacheFd, link->recvBuffer, inReply.size, start );
+ if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, true );
+ if ( ret == -1 && ( errno == EBADF || errno == EINVAL || errno == EIO ) ) {
+ const int fd = link->image->cacheFd;
+ link->image->cacheFd = -1;
+ close( fd );
+ }
+ }
// 2) Figure out which clients are interested in it
spin_lock( &link->queueLock );
for (i = 0; i < link->queueLen; ++i) {
diff --git a/src/shared/sockhelper.c b/src/shared/sockhelper.c
index d4995db..e93d45c 100644
--- a/src/shared/sockhelper.c
+++ b/src/shared/sockhelper.c
@@ -303,7 +303,7 @@ ssize_t sock_sendAll(const int sock, void *buffer, const size_t len, int maxtrie
ssize_t ret = 0;
while ( done < len ) {
if ( maxtries >= 0 && --maxtries == -1 ) break;
- ret = write( sock, (char*)buffer + done, len - done );
+ ret = send( sock, (char*)buffer + done, len - done, MSG_NOSIGNAL );
if ( ret == -1 ) {
if ( errno == EINTR ) continue;
if ( errno == EAGAIN || errno == EWOULDBLOCK ) {