summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r-- src/config.h 4
-rw-r--r-- src/server/globals.c 2
-rw-r--r-- src/server/globals.h 6
-rw-r--r-- src/server/helper.h 2
-rw-r--r-- src/server/image.c 226
-rw-r--r-- src/server/net.c 192
-rw-r--r-- src/server/uplink.c 18
-rw-r--r-- src/shared/sockhelper.c 2
8 files changed, 318 insertions, 134 deletions
diff --git a/src/config.h b/src/config.h
index 3ebfd54..f5b550c 100644
--- a/src/config.h
+++ b/src/config.h
@@ -28,10 +28,10 @@
#define SERVER_MAX_CLIENTS 400
#define SERVER_MAX_IMAGES 5000
#define SERVER_MAX_ALTS 250
-#define SERVER_MAX_UPLINK_QUEUE 1500
#define SERVER_MAX_UPLINK_FAILS 8 // How many times may a server fail until it is considered bad
#define SERVER_BAD_UPLINK_IGNORE 120 // How many seconds is a server considered bad?
-#define SERVER_UPLINK_QUEUELEN_THRES 900
+#define SERVER_MAX_UPLINK_QUEUE 1500 // Maximum number of queued requests per uplink
+#define SERVER_UPLINK_QUEUELEN_THRES 900 // Threshold where we start dropping incoming clients
#define SERVER_MAX_PENDING_ALT_CHECKS 50
#define SERVER_CACHE_MAP_SAVE_INTERVAL 90
diff --git a/src/server/globals.c b/src/server/globals.c
index 1b0e8f3..5a7de89 100644
--- a/src/server/globals.c
+++ b/src/server/globals.c
@@ -12,6 +12,7 @@ bool _vmdkLegacyMode = false;
volatile bool _shutdown = false;
int _serverPenalty = 0;
int _clientPenalty = 0;
+bool _removeMissingImages = true;
bool _isProxy = false;
bool _proxyPrivateOnly = false;
bool _backgroundReplication = true;
@@ -32,6 +33,7 @@ static int ini_handler(void *custom UNUSED, const char* section, const char* key
SAVE_TO_VAR_BOOL( dnbd3, isProxy );
SAVE_TO_VAR_BOOL( dnbd3, proxyPrivateOnly );
SAVE_TO_VAR_BOOL( dnbd3, backgroundReplication );
+ SAVE_TO_VAR_BOOL( dnbd3, removeMissingImages );
SAVE_TO_VAR_INT( dnbd3, serverPenalty );
SAVE_TO_VAR_INT( dnbd3, clientPenalty );
SAVE_TO_VAR_INT( dnbd3, uplinkTimeout );
diff --git a/src/server/globals.h b/src/server/globals.h
index 575f3ab..0bf34de 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -111,6 +111,7 @@ struct _dnbd3_image
int users; // clients currently using this image
int id; // Unique ID of this image. Only unique in the context of this running instance of DNBD3-Server
time_t atime; // last access time
+ time_t lastWorkCheck; // last time a non-working image has been checked
bool working; // true if image exists and completeness is == 100% or a working upstream proxy is connected
pthread_spinlock_t lock;
};
@@ -170,6 +171,11 @@ extern bool _isProxy;
extern bool _proxyPrivateOnly;
/**
+ * Whether to remove missing images from image list on SIGHUP
+ */
+extern bool _removeMissingImages;
+
+/**
* Read timeout when waiting for or sending data on an uplink
*/
extern int _uplinkTimeout;
diff --git a/src/server/helper.h b/src/server/helper.h
index 4ab2134..389a98c 100644
--- a/src/server/helper.h
+++ b/src/server/helper.h
@@ -33,7 +33,7 @@ static inline bool isSameAddressPort(const dnbd3_host_t * const a, const dnbd3_h
* Test whether string ends in suffix.
* @return true if string =~ /suffix$/
*/
-static inline int strend(char *string, char *suffix)
+static inline bool strend(char *string, char *suffix)
{
if ( string == NULL ) return false;
if ( suffix == NULL || *suffix == '\0' ) return true;
diff --git a/src/server/image.c b/src/server/image.c
index fb3f8ba..d4df26d 100644
--- a/src/server/image.c
+++ b/src/server/image.c
@@ -26,6 +26,7 @@
#include <glob.h>
#define PATHLEN (2000)
+#define NONWORKING_RECHECK_INTERVAL_SECONDS (60)
// ##########################################
@@ -44,10 +45,10 @@ typedef struct
time_t deadline;
} imagecache;
static imagecache remoteCloneCache[CACHELEN];
-static int remoteCloneCacheIndex = 0;
// ##########################################
+static void image_remove(dnbd3_image_t *image);
static dnbd3_image_t* image_free(dnbd3_image_t *image);
static bool image_isHashBlockComplete(uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize);
static bool image_load_all_internal(char *base, char *path);
@@ -283,18 +284,65 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
candidate->users++;
spin_unlock( &candidate->lock );
- if ( !checkIfWorking ) return candidate;
+ if ( !checkIfWorking ) return candidate; // Found, but not interested in working state
// Found, see if it works
- if ( !candidate->working && candidate->cache_map != NULL && candidate->uplink == NULL && file_isWritable( candidate->path ) ) {
- // Not working and has file + cache-map, try to init uplink (uplink_init will check if proxy mode is enabled)
- uplink_init( candidate, -1, NULL );
- } else if ( candidate->working && candidate->uplink != NULL && candidate->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
- // To many pending uplink requests. We take that as a hint that the uplink is clogged or no working uplink server
- // exists, so "working" is changed to false for now. Should a new uplink server be found the uplink thread will
- // set this back to true some time.
- candidate->working = false;
+
+ if ( candidate->working ) {
+ // Last known state was "working", see if that should change
+ if ( candidate->readFd == -1 ) {
+ candidate->working = false;
+ }
+ } else { // ...not working...
+ // Don't re-check too often
+ spin_lock( &candidate->lock );
+ bool check;
+ const time_t now = time( NULL );
+ check = ( now - candidate->lastWorkCheck ) > NONWORKING_RECHECK_INTERVAL_SECONDS;
+ if ( check ) {
+ candidate->lastWorkCheck = now;
+ }
+ spin_unlock( &candidate->lock );
+ if ( !check ) {
+ return candidate;
+ }
+ // Check if the local file exists, has the right size, and is readable (writable for incomplete image)
+ if ( candidate->cache_map != NULL ) {
+ // -- Incomplete - rw check
+ if ( candidate->cacheFd == -1 ) { // Make sure file is open for writing
+ candidate->cacheFd = open( candidate->path, O_RDWR );
+ // It might have failed - still offer proxy mode, we just can't cache
+ if ( candidate->cacheFd == -1 ) {
+ logadd( LOG_WARNING, "Cannot re-open %s for writing - replication disabled", candidate->path );
+ }
+ }
+ if ( candidate->uplink == NULL && candidate->cacheFd != -1 ) {
+ uplink_init( candidate, -1, NULL );
+ }
+ }
+ // Common for ro and rw images
+ const off_t len = lseek( candidate->readFd, 0, SEEK_END );
+ if ( len == -1 ) {
+ logadd( LOG_WARNING, "lseek() on %s failed (errno=%d), removing image", candidate->path, errno );
+ image_remove( candidate ); // No release here, the image is still returned and should be released by caller
+ } else if ( (uint64_t)len != candidate->realFilesize ) {
+ logadd( LOG_DEBUG1, "Size of %s changed at runtime, keeping disabled! Expected: %" PRIu64 ", found: %" PRIu64
+ ". Try sending SIGHUP to server if you know what you're doing.",
+ candidate->path, candidate->realFilesize, (uint64_t)len );
+ } else {
+ // Seek worked, file size is same, now see if we can read from file
+ char buffer[100];
+ if ( pread( candidate->readFd, buffer, sizeof(buffer), 0 ) == -1 ) {
+ logadd( LOG_DEBUG2, "Reading first %d bytes from %s failed (errno=%d), removing image",
+ (int)sizeof(buffer), candidate->path, errno );
+ image_remove( candidate );
+ } else {
+ // Seems everything is fine again \o/
+ candidate->working = true;
+ }
+ }
}
+
return candidate; // Success :-)
}
@@ -304,7 +352,7 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking)
* Every call to image_lock() needs to be followed by a call to image_release() at some point.
* Locks on: imageListLock, _images[].lock
*/
-dnbd3_image_t* image_lock(dnbd3_image_t *image)
+dnbd3_image_t* image_lock(dnbd3_image_t *image) // TODO: get rid, fix places that do image->users--
{
if ( image == NULL ) return NULL ;
int i;
@@ -361,7 +409,7 @@ dnbd3_image_t* image_release(dnbd3_image_t *image)
* no active users
* Locks on: imageListLock, image[].lock
*/
-void image_remove(dnbd3_image_t *image)
+static void image_remove(dnbd3_image_t *image)
{
bool wasInList = false;
spin_lock( &imageListLock );
@@ -399,13 +447,46 @@ void image_killUplinks()
/**
* Load all images in given path recursively.
* Pass NULL to use path from config.
- * NOT THREAD SAFE, make sure this is only running
- * on one thread at a time!
*/
bool image_loadAll(char *path)
{
bool ret;
+ char imgPath[PATHLEN];
+ int imgId;
+ dnbd3_image_t *imgHandle;
+
if ( path == NULL ) path = _basePath;
+ if ( _removeMissingImages ) {
+ // Check if all loaded images still exist on disk
+ logadd( LOG_DEBUG1, "Checking for vanished images" );
+ spin_lock( &imageListLock );
+ for (int i = _num_images - 1; i >= 0; --i) {
+ if ( _images[i] == NULL ) {
+ if ( i + 1 == _num_images ) _num_images--;
+ continue;
+ }
+ imgId = _images[i]->id;
+ snprintf( imgPath, PATHLEN, "%s", _images[i]->path );
+ spin_unlock( &imageListLock ); // isReadable hits the fs; unlock
+ // Check if file can still be opened for reading
+ ret = file_isReadable( imgPath );
+ // Lock again, see if image is still there, free if required
+ spin_lock( &imageListLock );
+ if ( ret || i >= _num_images || _images[i] == NULL || _images[i]->id != imgId ) continue;
+ // Image needs to be removed
+ imgHandle = _images[i];
+ _images[i] = NULL;
+ if ( i + 1 == _num_images ) _num_images--;
+ if ( imgHandle->users != 0 ) continue; // Still in use, do not free (last releasing user will trigger)
+ // Image is not in use anymore, free the dangling entry immediately
+ spin_unlock( &imageListLock ); // image_free might do several fs operations; unlock
+ image_free( imgHandle );
+ spin_lock( &imageListLock );
+ }
+ spin_unlock( &imageListLock );
+ }
+ // Now scan for new images
+ logadd( LOG_DEBUG1, "Scanning for new or modified images" );
pthread_mutex_lock( &reloadLock );
ret = image_load_all_internal( path, path );
pthread_mutex_unlock( &reloadLock );
@@ -438,6 +519,9 @@ bool image_tryFreeAll()
static dnbd3_image_t* image_free(dnbd3_image_t *image)
{
assert( image != NULL );
+ if ( !_shutdown ) {
+ logadd( LOG_INFO, "Freeing image %s:%d", image->lower_name, (int)image->rid );
+ }
//
image_saveCacheMap( image );
uplink_shutdown( image );
@@ -511,7 +595,7 @@ static bool image_load_all_internal(char *base, char *path)
logadd( LOG_WARNING, "stat() for '%s' failed. Ignoring....", subpath );
continue;
}
- if ( S_ISDIR( st.st_mode )) {
+ if ( S_ISDIR( st.st_mode ) ) {
image_load_all_internal( base, subpath ); // Recurse
} else {
image_load( base, subpath, true ); // Load image if possible
@@ -531,7 +615,7 @@ static bool image_load_all_internal(char *base, char *path)
static bool image_load(char *base, char *path, int withUplink)
{
static int imgIdCounter = 0; // Used to assign unique numeric IDs to images
- int i, revision;
+ int i, revision = -1;
struct stat st;
uint8_t *cache_map = NULL;
uint32_t *crc32list = NULL;
@@ -563,7 +647,7 @@ static bool image_load(char *base, char *path, int withUplink)
// Easy - legacy mode, simply append full file name and set rid to 1
strcat( dst, fileName );
revision = 1;
- } else {
+ } else if ( !_vmdkLegacyMode ) {
// Try to parse *.r<ID> syntax
for (i = fileNameLen - 1; i > 1; --i) {
if ( fileName[i] < '0' || fileName[i] > '9' ) break;
@@ -578,7 +662,7 @@ static bool image_load(char *base, char *path, int withUplink)
}
*dst = '\0';
}
- if ( revision <= 0 ) {
+ if ( revision <= 0 || revision >= 65536 ) {
logadd( LOG_WARNING, "Image '%s' has invalid revision ID %d", path, revision );
goto load_error;
}
@@ -612,7 +696,7 @@ static bool image_load(char *base, char *path, int withUplink)
// 1. Allocate memory for the cache map if the image is incomplete
cache_map = image_loadCacheMap( path, virtualFilesize );
- // TODO: Maybe try sha-256 or 512 first if you're paranoid (to be implemented)
+ // XXX: Maybe try sha-256 or 512 first if you're paranoid (to be implemented)
// 2. Load CRC-32 list of image
uint32_t masterCrc;
@@ -630,34 +714,34 @@ static bool image_load(char *base, char *path, int withUplink)
// Compare data just loaded to identical image we apparently already loaded
if ( existing != NULL ) {
if ( existing->realFilesize != realFilesize ) {
- // Image will be replaced below
logadd( LOG_WARNING, "Size of image '%s:%d' has changed.", existing->lower_name, (int)existing->rid );
+ // Image will be replaced below
} else if ( existing->crc32 != NULL && crc32list != NULL
&& memcmp( existing->crc32, crc32list, sizeof(uint32_t) * hashBlockCount ) != 0 ) {
- // Image will be replaced below
logadd( LOG_WARNING, "CRC32 list of image '%s:%d' has changed.", existing->lower_name, (int)existing->rid );
logadd( LOG_WARNING, "The image will be reloaded, but you should NOT replace existing images while the server is running." );
logadd( LOG_WARNING, "Actually even if it's not running this should never be done. Use a new RID instead!" );
+ // Image will be replaced below
} else if ( existing->crc32 == NULL && crc32list != NULL ) {
logadd( LOG_INFO, "Found CRC-32 list for already loaded image '%s:%d', adding...", existing->lower_name, (int)existing->rid );
existing->crc32 = crc32list;
existing->masterCrc32 = masterCrc;
crc32list = NULL;
function_return = true;
- goto load_error;
+ goto load_error; // Keep existing
} else if ( existing->cache_map != NULL && cache_map == NULL ) {
// Just ignore that fact, if replication is really complete the cache map will be removed anyways
logadd( LOG_INFO, "Image '%s:%d' has no cache map on disk!", existing->lower_name, (int)existing->rid );
function_return = true;
- goto load_error;
+ goto load_error; // Keep existing
} else {
// Nothing changed about the existing image, so do nothing
function_return = true;
- goto load_error;
+ goto load_error; // Keep existing
}
- // Remove image from images array
- image_release( existing );
+ // Remove existing image from images array, so it will be replaced by the reloaded image
image_remove( existing );
+ image_release( existing );
existing = NULL;
}
@@ -714,7 +798,7 @@ static bool image_load(char *base, char *path, int withUplink)
spin_lock( &imageListLock );
// Now we're locked, assign unique ID to image (unique for this running server instance!)
image->id = ++imgIdCounter;
- for (i = 0; i < _num_images; ++i) {
+ for ( i = 0; i < _num_images; ++i ) {
if ( _images[i] != NULL ) continue;
_images[i] = image;
break;
@@ -727,7 +811,7 @@ static bool image_load(char *base, char *path, int withUplink)
goto load_error;
}
_images[_num_images++] = image;
- logadd( LOG_DEBUG1, "Loaded image '%s'\n", image->lower_name );
+ logadd( LOG_DEBUG1, "Loaded image '%s:%d'\n", image->lower_name, (int)image->rid );
}
// Keep fd for reading
image->readFd = fdImage;
@@ -888,7 +972,7 @@ bool image_create(char *image, int revision, uint64_t size)
close( fdCache );
return true;
//
- failure_cleanup: ;
+failure_cleanup: ;
if ( fdImage >= 0 ) close( fdImage );
if ( fdCache >= 0 ) close( fdCache );
remove( path );
@@ -913,8 +997,11 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste
*/
dnbd3_image_t* image_getOrLoad(char * const name, const uint16_t revision)
{
- // not proxy, specific revision - nothing to do
- if ( !_isProxy && revision != 0 ) return image_get( name, revision, true );
+ // specific revision - try shortcut
+ if ( revision != 0 ) {
+ dnbd3_image_t *image = image_get( name, revision, true );
+ if ( image != NULL ) return image;
+ }
const size_t len = strlen( name );
// Sanity check
if ( len == 0 || name[len - 1] == '/' || name[0] == '/'
@@ -935,14 +1022,15 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
{
int i;
// Already existing locally?
- dnbd3_image_t *image = image_get( name, revision, true );
- // exists and specific revision - nothing to do
- if ( image != NULL && revision != 0 ) return image;
+ dnbd3_image_t *image = NULL;
+ if ( revision == 0 ) {
+ image = image_get( name, revision, true );
+ }
// Doesn't exist or is rid 0, try remote if not already tried it recently
const time_t now = time( NULL );
char *cmpname = name;
- int useIndex = -1;
+ int useIndex = -1, fallbackIndex = 0;
if ( len >= NAMELEN ) cmpname += 1 + len - NAMELEN;
pthread_mutex_lock( &remoteCloneLock );
for (i = 0; i < CACHELEN; ++i) {
@@ -952,6 +1040,9 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
pthread_mutex_unlock( &remoteCloneLock ); // Was recently checked...
return image;
}
+ if ( remoteCloneCache[i].deadline < remoteCloneCache[fallbackIndex].deadline ) {
+ fallbackIndex = i;
+ }
}
// Re-check to prevent two clients at the same time triggering this,
// but only if rid != 0, since we would just get an old rid then
@@ -966,11 +1057,13 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
serialized_buffer_t serialized;
// Mark as recently checked
if ( useIndex == -1 ) {
- useIndex = remoteCloneCacheIndex = (remoteCloneCacheIndex + 1) % CACHELEN;
+ useIndex = fallbackIndex;
}
remoteCloneCache[useIndex].deadline = now + SERVER_REMOTE_IMAGE_CHECK_CACHETIME;
snprintf( remoteCloneCache[useIndex].name, NAMELEN, "%s", cmpname );
remoteCloneCache[useIndex].rid = revision;
+ pthread_mutex_unlock( &remoteCloneLock );
+
// Get some alt servers and try to get the image from there
dnbd3_host_t servers[4];
int uplinkSock = -1;
@@ -980,26 +1073,32 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision,
uint16_t remoteRid = revision;
uint64_t remoteImageSize;
for (i = 0; i < count; ++i) {
+ char *remoteName;
+ bool ok = false;
int sock = sock_connect( &servers[i], 750, _uplinkTimeout );
- if ( sock < 0 ) continue;
+ if ( sock == -1 ) continue;
if ( !dnbd3_select_image( sock, name, revision, FLAGS8_SERVER ) ) goto server_fail;
- char *remoteName;
if ( !dnbd3_select_image_reply( &serialized, sock, &remoteProtocolVersion, &remoteName, &remoteRid, &remoteImageSize ) ) goto server_fail;
if ( remoteProtocolVersion < MIN_SUPPORTED_SERVER || remoteRid == 0 ) goto server_fail;
- if ( revision != 0 && remoteRid != revision ) goto server_fail;
+ if ( revision != 0 && remoteRid != revision ) goto server_fail; // Want specific revision but uplink supplied different rid
if ( revision == 0 && image != NULL && image->rid >= remoteRid ) goto server_fail; // Not actually a failure: Highest remote rid is <= highest local rid - don't clone!
if ( remoteImageSize < DNBD3_BLOCK_SIZE || remoteName == NULL || strcmp( name, remoteName ) != 0 ) goto server_fail;
if ( remoteImageSize > SERVER_MAX_PROXY_IMAGE_SIZE ) goto server_fail;
- if ( !image_ensureDiskSpace( remoteImageSize ) ) goto server_fail;
- if ( !image_clone( sock, name, remoteRid, remoteImageSize ) ) goto server_fail; // This sets up the file+map+crc
+ pthread_mutex_lock( &reloadLock );
+ ok = image_ensureDiskSpace( remoteImageSize )
+ && image_clone( sock, name, remoteRid, remoteImageSize ); // This sets up the file+map+crc and loads the img
+ pthread_mutex_unlock( &reloadLock );
+ if ( !ok ) goto server_fail;
+
// Cloning worked :-)
uplinkSock = sock;
uplinkServer = &servers[i];
break;
- server_fail: ;
+
+server_fail: ;
close( sock );
}
- pthread_mutex_unlock( &remoteCloneLock );
+
// If we still have a pointer to a local image, release the reference
if ( image != NULL ) image_release( image );
// If everything worked out, this call should now actually return the image
@@ -1030,9 +1129,10 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste
uint16_t detectedRid = 0;
if ( _vmdkLegacyMode ) {
- // TODO
- assert( 0 );
- detectedRid = requestedRid;
+ if ( strend( name, ".vmdk" ) ) {
+ snprintf( imageFile, PATHLEN, "%s/%s", _basePath, name );
+ detectedRid = MAX( 1, requestedRid );
+ }
} else if ( requestedRid != 0 ) {
snprintf( imageFile, PATHLEN, "%s/%s.r%d", _basePath, name, requestedRid );
detectedRid = requestedRid;
@@ -1044,8 +1144,9 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste
if ( ret == 0 ) {
long int best = 0;
for ( size_t i = 0; i < g.gl_pathc; ++i ) {
- char *rev = strrchr( g.gl_pathv[i], 'r' );
- if ( rev == NULL ) continue;
+ const char * const path = g.gl_pathv[i];
+ const char * rev = strrchr( path, 'r' );
+ if ( rev == NULL || rev == path || *(rev - 1) != '.' ) continue;
rev++;
if ( *rev < '0' || *rev > '9' ) continue;
char *err = NULL;
@@ -1062,22 +1163,45 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste
}
globfree( &g );
}
+ logadd( LOG_DEBUG2, "Trying to load %s:%d ( -> %d) as %s", name, (int)requestedRid, (int)detectedRid, imageFile );
// No file was determined, or it doesn't seem to exist/be readable
- if ( imageFile[0] == '\0' || !file_isReadable( imageFile ) ) { // XXX glob fallback to rid-1? Rework above
+ if ( imageFile[0] == '\0' ) {
+ logadd( LOG_DEBUG2, "Not found, bailing out" );
return image_get( name, detectedRid, true );
}
+ if ( requestedRid == 0 ) {
+ // rid 0 requested - check if detected rid is readable, decrease rid if not until we reach 0
+ while ( detectedRid != 0 ) {
+ dnbd3_image_t *image = image_get( name, detectedRid, true );
+ if ( image != NULL ) {
+ // globbed rid already loaded, return
+ return image;
+ }
+ if ( file_isReadable( imageFile ) ) {
+ // globbed rid is readable on disk, stop searching and load it
+ break;
+ }
+ logadd( LOG_DEBUG2, "%s: rid %d globbed but not readable, trying lower rid...", name, (int)detectedRid );
+ detectedRid--;
+ snprintf( imageFile, PATHLEN, "%s/%s.r%d", _basePath, name, requestedRid );
+ }
+ }
+
// Now lock on the loading mutex, then check again if the image exists (we're multi-threaded)
pthread_mutex_lock( &reloadLock );
dnbd3_image_t* image = image_get( name, detectedRid, true );
if ( image != NULL ) {
// The image magically appeared in the meantime
+ logadd( LOG_DEBUG2, "Magically appeared" );
pthread_mutex_unlock( &reloadLock );
return image;
}
// Still not loaded, let's try to do so
+ logadd( LOG_DEBUG2, "Calling load" );
image_load( _basePath, imageFile, false );
pthread_mutex_unlock( &reloadLock );
// If loading succeeded, this will return the image
+ logadd( LOG_DEBUG2, "Calling get" );
return image_get( name, requestedRid, true );
}
@@ -1333,7 +1457,7 @@ static bool image_calcBlockCrc32(const int fd, const int block, const uint32_t r
*/
static bool image_ensureDiskSpace(uint64_t size)
{
- for (;;) {
+ for ( int maxtries = 0; maxtries < 20; ++maxtries ) {
const int64_t available = file_freeDiskSpace( _basePath );
if ( available == -1 ) {
const int e = errno;
@@ -1368,7 +1492,7 @@ static bool image_ensureDiskSpace(uint64_t size)
return false;
}
oldest = image_lock( oldest );
- if ( oldest == NULL ) return false;
+ if ( oldest == NULL ) continue; // Image freed in the meantime? Try again
logadd( LOG_INFO, "'%s:%d' has to go!", oldest->lower_name, (int)oldest->rid );
unlink( oldest->path );
size_t len = strlen( oldest->path ) + 5 + 1;
diff --git a/src/server/net.c b/src/server/net.c
index ad228f4..a467620 100644
--- a/src/server/net.c
+++ b/src/server/net.c
@@ -85,7 +85,7 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff
logadd( LOG_ERROR, "Called recv_request_payload() for more bytes than the passed buffer could hold!" );
return false;
}
- if ( recv( sock, payload->buffer, size, MSG_WAITALL ) != size ) {
+ if ( sock_recv( sock, payload->buffer, size ) != size ) {
logadd( LOG_DEBUG1, "Could not receive request payload of length %d\n", (int)size );
return false;
}
@@ -94,34 +94,35 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff
return true;
}
+/**
+ * Send reply with optional payload. payload can be null. The caller has to
+ * acquire the sendMutex first.
+ */
static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload)
{
- const unsigned int size = reply->size;
+ const size_t size = reply->size;
fixup_reply( *reply );
- if ( !payload || size == 0 ) {
- if ( send( sock, reply, sizeof(dnbd3_reply_t), 0 ) != sizeof(dnbd3_reply_t) ) {
- logadd( LOG_DEBUG1, "Send failed (header-only)\n" );
- return false;
- }
- } else {
- struct iovec iov[2];
- iov[0].iov_base = reply;
- iov[0].iov_len = sizeof(dnbd3_reply_t);
- iov[1].iov_base = payload;
- iov[1].iov_len = (size_t)size;
- if ( (size_t)writev( sock, iov, 2 ) != sizeof(dnbd3_reply_t) + size ) {
- logadd( LOG_DEBUG1, "Send failed (reply with payload of %u bytes)\n", size );
+ if ( sock_sendAll( sock, reply, sizeof(dnbd3_reply_t), 1 ) != sizeof(dnbd3_reply_t) ) {
+ logadd( LOG_DEBUG1, "Sending reply header to client failed" );
+ return false;
+ }
+ if ( size != 0 && payload != NULL ) {
+ if ( sock_sendAll( sock, payload, size, 1 ) != (ssize_t)size ) {
+ logadd( LOG_DEBUG1, "Sending payload of %u bytes to client failed", size );
return false;
}
}
return true;
}
+/**
+ * Send given amount of null bytes. The caller has to acquire the sendMutex first.
+ */
static inline bool sendPadding( const int fd, uint32_t bytes )
{
ssize_t ret;
while ( bytes >= sizeof(nullbytes) ) {
- ret = sock_sendAll( fd, nullbytes, sizeof(nullbytes), 1 );
+ ret = sock_sendAll( fd, nullbytes, sizeof(nullbytes), 2 );
if ( ret <= 0 )
return false;
bytes -= (uint32_t)ret;
@@ -131,8 +132,9 @@ static inline bool sendPadding( const int fd, uint32_t bytes )
uint64_t net_getTotalBytesSent()
{
+ // reads and writes to 64bit ints are not atomic on x86, so let's be safe and use locking
spin_lock( &statisticsSentLock );
- uint64_t tmp = totalBytesSent;
+ const uint64_t tmp = totalBytesSent;
spin_unlock( &statisticsSentLock );
return tmp;
}
@@ -156,10 +158,10 @@ void *net_client_handler(void *dnbd3_client)
bool hasName = false;
serialized_buffer_t payload;
- char *image_name;
uint16_t rid, client_version;
uint64_t start, end;
- char buffer[100];
+ uint32_t tempBytesSent = 0;
+ char hostPrintable[100];
dnbd3_server_entry_t server_list[NUMBER_SERVERS];
@@ -174,25 +176,38 @@ void *net_client_handler(void *dnbd3_client)
if ( recv_request_header( client->sock, &request ) ) {
if ( request.cmd != CMD_SELECT_IMAGE ) {
logadd( LOG_DEBUG1, "Client sent invalid handshake (%d). Dropping Client\n", (int)request.cmd );
- } else {
- if ( recv_request_payload( client->sock, request.size, &payload ) ) {
- client_version = serializer_get_uint16( &payload );
- image_name = serializer_get_string( &payload );
- rid = serializer_get_uint16( &payload );
- client->isServer = serializer_get_uint8( &payload );
- if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) {
- if ( client_version < MIN_SUPPORTED_CLIENT ) {
- logadd( LOG_DEBUG1, "Client too old\n" );
- } else {
- logadd( LOG_DEBUG1, "Incomplete handshake received\n" );
- }
+ } else if ( recv_request_payload( client->sock, request.size, &payload ) ) {
+ char *image_name;
+ client_version = serializer_get_uint16( &payload );
+ image_name = serializer_get_string( &payload );
+ rid = serializer_get_uint16( &payload );
+ client->isServer = serializer_get_uint8( &payload );
+ if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) {
+ if ( client_version < MIN_SUPPORTED_CLIENT ) {
+ logadd( LOG_DEBUG1, "Client too old\n" );
} else {
- client->image = image = image_getOrLoad( image_name, rid );
- if ( image == NULL ) {
- //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
- } else if ( !image->working ) {
- logadd( LOG_DEBUG1, "Client requested non-working image '%s' (rid:%d), rejected\n", image_name, (int)rid );
- } else {
+ logadd( LOG_DEBUG1, "Incomplete handshake received\n" );
+ }
+ } else {
+ client->image = image = image_getOrLoad( image_name, rid );
+ if ( image == NULL ) {
+ //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid );
+ } else if ( !image->working ) {
+ logadd( LOG_DEBUG1, "Client requested non-working image '%s' (rid:%d), rejected\n", image_name, (int)rid );
+ } else {
+ // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable
+ bOk = true;
+ if ( image->cache_map != NULL ) {
+ spin_lock( &image->lock );
+ if ( image->uplink == NULL || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) {
+ bOk = ( rand() % 4 ) == 1;
+ }
+ spin_unlock( &image->lock );
+ if ( image->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this
+ usleep( 100000 ); // server gets a penalty and is less likely to be selected
+ }
+ }
+ if ( bOk ) {
image_file = image->readFd;
serializer_reset_write( &payload );
serializer_put_uint16( &payload, PROTOCOL_VERSION );
@@ -201,9 +216,8 @@ void *net_client_handler(void *dnbd3_client)
serializer_put_uint64( &payload, image->virtualFilesize );
reply.cmd = CMD_SELECT_IMAGE;
reply.size = serializer_get_written_length( &payload );
- if ( send_reply( client->sock, &reply, &payload ) ) {
- if ( !client->isServer ) image->atime = time( NULL );
- bOk = true;
+ if ( !send_reply( client->sock, &reply, &payload ) ) {
+ bOk = false;
}
}
}
@@ -211,6 +225,11 @@ void *net_client_handler(void *dnbd3_client)
}
} else if ( strncmp( (char*)&request, "GET ", 4 ) == 0 || strncmp( (char*)&request, "POST ", 5 ) == 0 ) {
rpc_sendStatsJson( client->sock );
+ } else {
+ // Unknown request
+ if ( host_to_string( &client->host, hostPrintable, sizeof hostPrintable ) ) {
+ logadd( LOG_DEBUG1, "Client %s sent invalid handshake", hostPrintable );
+ }
}
if ( bOk ) {
@@ -251,23 +270,13 @@ void *net_client_handler(void *dnbd3_client)
spin_lock( &image->lock );
// Check again as we only aquired the lock just now
if ( image->cache_map != NULL ) {
- const uint64_t firstByte = start >> 15;
- const uint64_t lastByte = (end - 1) >> 15;
- // First byte
- uint64_t pos = start;
- do {
- const int map_x = (pos >> 12) & 7; // mod 8
- const uint8_t bit_mask = 1 << map_x;
- if ( (image->cache_map[firstByte] & bit_mask) == 0 ) {
- isCached = false;
- break;
- }
- pos += DNBD3_BLOCK_SIZE;
- } while ( firstByte == (pos >> 15) && pos < end );
+ const uint64_t firstByteInMap = start >> 15;
+ const uint64_t lastByteInMap = (end - 1) >> 15;
+ uint64_t pos;
// Middle - quick checking
if ( isCached ) {
- pos = firstByte + 1;
- while ( pos < lastByte ) {
+ pos = firstByteInMap + 1;
+ while ( pos < lastByteInMap ) {
if ( image->cache_map[pos] != 0xff ) {
isCached = false;
break;
@@ -275,14 +284,27 @@ void *net_client_handler(void *dnbd3_client)
++pos;
}
}
- // Last byte
- if ( isCached && firstByte != lastByte ) {
- pos = lastByte << 15;
+ // First byte
+ if ( isCached ) {
+ pos = start;
+ do {
+ const int map_x = (pos >> 12) & 7; // mod 8
+ const uint8_t bit_mask = 1 << map_x;
+ if ( (image->cache_map[firstByteInMap] & bit_mask) == 0 ) {
+ isCached = false;
+ break;
+ }
+ pos += DNBD3_BLOCK_SIZE;
+ } while ( firstByteInMap == (pos >> 15) && pos < end );
+ }
+ // Last byte - only check if request spans multiple bytes in cache map
+ if ( isCached && firstByteInMap != lastByteInMap ) {
+ pos = lastByteInMap << 15;
while ( pos < end ) {
- assert( lastByte == (pos >> 15) );
+ assert( lastByteInMap == (pos >> 15) );
const int map_x = (pos >> 12) & 7; // mod 8
const uint8_t bit_mask = 1 << map_x;
- if ( (image->cache_map[lastByte] & bit_mask) == 0 ) {
+ if ( (image->cache_map[lastByteInMap] & bit_mask) == 0 ) {
isCached = false;
break;
}
@@ -293,7 +315,9 @@ void *net_client_handler(void *dnbd3_client)
spin_unlock( &image->lock );
if ( !isCached ) {
if ( !uplink_request( client, request.handle, request.offset, request.size ) ) {
- logadd( LOG_DEBUG1, "Could not relay uncached request to upstream proxy\n" );
+ logadd( LOG_DEBUG1, "Could not relay un-cached request to upstream proxy, disabling image %s:%d",
+ image->lower_name, (int)image->rid );
+ image->working = false;
goto exit_client_cleanup;
}
break; // DONE
@@ -329,10 +353,13 @@ void *net_client_handler(void *dnbd3_client)
if ( ret <= 0 ) {
const int err = errno;
if ( lock ) pthread_mutex_unlock( &client->sendMutex );
- if ( ret < 0 && err != EPIPE && err != ECONNRESET )
- logadd( LOG_DEBUG1, "sendfile failed (image to net. sent %d/%d, errno=%d)\n",
- (int)done, (int)realBytes, err );
- if ( err == EBADF || err == EINVAL || err == EIO ) image->working = false;
+ if ( ret == -1 ) {
+ if ( err != EPIPE && err != ECONNRESET && err != ESHUTDOWN ) {
+ logadd( LOG_DEBUG1, "sendfile failed (image to net. sent %d/%d, errno=%d)\n",
+ (int)done, (int)realBytes, err );
+ }
+ if ( err != EAGAIN && err != EWOULDBLOCK ) image->working = false;
+ }
goto exit_client_cleanup;
}
done += ret;
@@ -343,9 +370,20 @@ void *net_client_handler(void *dnbd3_client)
goto exit_client_cleanup;
}
}
- client->bytesSent += request.size; // Increase counter for statistics.
}
if ( lock ) pthread_mutex_unlock( &client->sendMutex );
+ // Global per-client counter
+ spin_lock( &client->lock );
+ client->bytesSent += request.size; // Increase counter for statistics.
+ spin_unlock( &client->lock );
+ // Local counter that gets added to the global total bytes sent counter periodically
+ tempBytesSent += request.size;
+ if (tempBytesSent > 1024 * 1024 ) {
+ spin_lock( &statisticsSentLock );
+ totalBytesSent += tempBytesSent;
+ spin_unlock( &statisticsSentLock );
+ tempBytesSent = 0;
+ }
break;
case CMD_GET_SERVERS:
@@ -353,19 +391,25 @@ void *net_client_handler(void *dnbd3_client)
num = altservers_getMatching( &client->host, server_list, NUMBER_SERVERS );
reply.cmd = CMD_GET_SERVERS;
reply.size = num * sizeof(dnbd3_server_entry_t);
+ pthread_mutex_lock( &client->sendMutex );
send_reply( client->sock, &reply, server_list );
- client->isServer = false; // Only clients request list of servers
+ pthread_mutex_unlock( &client->sendMutex );
goto set_name;
break;
case CMD_KEEPALIVE:
reply.cmd = CMD_KEEPALIVE;
reply.size = 0;
+ pthread_mutex_lock( &client->sendMutex );
send_reply( client->sock, &reply, NULL );
+ pthread_mutex_unlock( &client->sendMutex );
+ image->atime = time( NULL );
set_name: ;
- if ( !hasName && host_to_string( &client->host, buffer, sizeof buffer ) ) {
+ if ( !hasName ) {
hasName = true;
- setThreadName( buffer );
+ if ( host_to_string( &client->host, hostPrintable, sizeof hostPrintable ) ) {
+ setThreadName( hostPrintable );
+ }
}
break;
@@ -376,19 +420,21 @@ set_name: ;
case CMD_GET_CRC32:
reply.cmd = CMD_GET_CRC32;
+ pthread_mutex_lock( &client->sendMutex );
if ( image->crc32 == NULL ) {
reply.size = 0;
send_reply( client->sock, &reply, NULL );
} else {
const int size = reply.size = (IMGSIZE_TO_HASHBLOCKS(image->realFilesize) + 1) * sizeof(uint32_t);
send_reply( client->sock, &reply, NULL );
- send( client->sock, &image->masterCrc32, sizeof(uint32_t), 0 );
+ send( client->sock, &image->masterCrc32, sizeof(uint32_t), MSG_MORE );
send( client->sock, image->crc32, size - sizeof(uint32_t), 0 );
}
+ pthread_mutex_unlock( &client->sendMutex );
break;
default:
- logadd( LOG_ERROR, "Unknown command: %d", (int)request.cmd );
+ logadd( LOG_ERROR, "Unknown command from client: %d", (int)request.cmd );
break;
}
@@ -397,9 +443,9 @@ set_name: ;
exit_client_cleanup: ;
dnbd3_removeClient( client );
spin_lock( &statisticsSentLock );
- totalBytesSent += client->bytesSent;// Add the amount of bytes sent by the client to the statistics of the server.
+ totalBytesSent += tempBytesSent; // Add the amount of bytes sent by the client to the statistics of the server.
spin_unlock( &statisticsSentLock );
- client = dnbd3_freeClient( client );
+ client = dnbd3_freeClient( client ); // This will also call image_release on client->image
return NULL ;
}
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 7fa3c96..29c42af 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -158,19 +158,19 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
spin_lock( &client->image->lock );
if ( client->image->uplink == NULL ) {
spin_unlock( &client->image->lock );
- logadd( LOG_DEBUG1, "Uplink request for image with no uplink (%s)\n", client->image->lower_name );
+ logadd( LOG_DEBUG1, "Uplink request for image with no uplink" );
return false;
}
dnbd3_connection_t * const uplink = client->image->uplink;
if ( uplink->shutdown ) {
spin_unlock( &client->image->lock );
- logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down (%s)\n", client->image->lower_name );
+ logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" );
return false;
}
// Check if the client is the same host as the uplink. If so assume this is a circular proxy chain
if ( isSameAddress( &uplink->currentServer, &client->host ) ) {
spin_unlock( &client->image->lock );
- logadd( LOG_DEBUG1, "Proxy cycle detected.\n" );
+ logadd( LOG_DEBUG1, "Proxy cycle detected" );
return false;
}
@@ -572,9 +572,15 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
const uint64_t end = inReply.handle + inReply.size;
link->bytesReceived += inReply.size;
// 1) Write to cache file
- assert( link->image->cacheFd != -1 );
- ret = (int)pwrite( link->image->cacheFd, link->recvBuffer, inReply.size, start );
- if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, true );
+ if ( link->image->cacheFd != -1 ) {
+ ret = (int)pwrite( link->image->cacheFd, link->recvBuffer, inReply.size, start );
+ if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, true );
+ if ( ret == -1 && ( errno == EBADF || errno == EINVAL || errno == EIO ) ) {
+ const int fd = link->image->cacheFd;
+ link->image->cacheFd = -1;
+ close( fd );
+ }
+ }
// 2) Figure out which clients are interested in it
spin_lock( &link->queueLock );
for (i = 0; i < link->queueLen; ++i) {
diff --git a/src/shared/sockhelper.c b/src/shared/sockhelper.c
index d4995db..e93d45c 100644
--- a/src/shared/sockhelper.c
+++ b/src/shared/sockhelper.c
@@ -303,7 +303,7 @@ ssize_t sock_sendAll(const int sock, void *buffer, const size_t len, int maxtrie
ssize_t ret = 0;
while ( done < len ) {
if ( maxtries >= 0 && --maxtries == -1 ) break;
- ret = write( sock, (char*)buffer + done, len - done );
+ ret = send( sock, (char*)buffer + done, len - done, MSG_NOSIGNAL );
if ( ret == -1 ) {
if ( errno == EINTR ) continue;
if ( errno == EAGAIN || errno == EWOULDBLOCK ) {