diff options
Diffstat (limited to 'src/server')
-rw-r--r-- | src/server/globals.c | 2 | ||||
-rw-r--r-- | src/server/globals.h | 6 | ||||
-rw-r--r-- | src/server/helper.h | 2 | ||||
-rw-r--r-- | src/server/image.c | 226 | ||||
-rw-r--r-- | src/server/net.c | 192 | ||||
-rw-r--r-- | src/server/uplink.c | 18 |
6 files changed, 315 insertions, 131 deletions
diff --git a/src/server/globals.c b/src/server/globals.c index 1b0e8f3..5a7de89 100644 --- a/src/server/globals.c +++ b/src/server/globals.c @@ -12,6 +12,7 @@ bool _vmdkLegacyMode = false; volatile bool _shutdown = false; int _serverPenalty = 0; int _clientPenalty = 0; +bool _removeMissingImages = true; bool _isProxy = false; bool _proxyPrivateOnly = false; bool _backgroundReplication = true; @@ -32,6 +33,7 @@ static int ini_handler(void *custom UNUSED, const char* section, const char* key SAVE_TO_VAR_BOOL( dnbd3, isProxy ); SAVE_TO_VAR_BOOL( dnbd3, proxyPrivateOnly ); SAVE_TO_VAR_BOOL( dnbd3, backgroundReplication ); + SAVE_TO_VAR_BOOL( dnbd3, removeMissingImages ); SAVE_TO_VAR_INT( dnbd3, serverPenalty ); SAVE_TO_VAR_INT( dnbd3, clientPenalty ); SAVE_TO_VAR_INT( dnbd3, uplinkTimeout ); diff --git a/src/server/globals.h b/src/server/globals.h index 575f3ab..0bf34de 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -111,6 +111,7 @@ struct _dnbd3_image int users; // clients currently using this image int id; // Unique ID of this image. Only unique in the context of this running instance of DNBD3-Server time_t atime; // last access time + time_t lastWorkCheck; // last time a non-working image has been checked bool working; // true if image exists and completeness is == 100% or a working upstream proxy is connected pthread_spinlock_t lock; }; @@ -170,6 +171,11 @@ extern bool _isProxy; extern bool _proxyPrivateOnly; /** + * Whether to remove missing images from image list on SIGHUP + */ +extern bool _removeMissingImages; + +/** * Read timeout when waiting for or sending data on an uplink */ extern int _uplinkTimeout; diff --git a/src/server/helper.h b/src/server/helper.h index 4ab2134..389a98c 100644 --- a/src/server/helper.h +++ b/src/server/helper.h @@ -33,7 +33,7 @@ static inline bool isSameAddressPort(const dnbd3_host_t * const a, const dnbd3_h * Test whether string ends in suffix. * @return true if string =~ /suffix$/ */ -static inline int strend(char *string, char *suffix) +static inline bool strend(char *string, char *suffix) { if ( string == NULL ) return false; if ( suffix == NULL || *suffix == '\0' ) return true; diff --git a/src/server/image.c b/src/server/image.c index fb3f8ba..d4df26d 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -26,6 +26,7 @@ #include <glob.h> #define PATHLEN (2000) +#define NONWORKING_RECHECK_INTERVAL_SECONDS (60) // ########################################## @@ -44,10 +45,10 @@ typedef struct time_t deadline; } imagecache; static imagecache remoteCloneCache[CACHELEN]; -static int remoteCloneCacheIndex = 0; // ########################################## +static void image_remove(dnbd3_image_t *image); static dnbd3_image_t* image_free(dnbd3_image_t *image); static bool image_isHashBlockComplete(uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize); static bool image_load_all_internal(char *base, char *path); @@ -283,18 +284,65 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) candidate->users++; spin_unlock( &candidate->lock ); - if ( !checkIfWorking ) return candidate; + if ( !checkIfWorking ) return candidate; // Found, but not interested in working state // Found, see if it works - if ( !candidate->working && candidate->cache_map != NULL && candidate->uplink == NULL && file_isWritable( candidate->path ) ) { - // Not working and has file + cache-map, try to init uplink (uplink_init will check if proxy mode is enabled) - uplink_init( candidate, -1, NULL ); - } else if ( candidate->working && candidate->uplink != NULL && candidate->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { - // To many pending uplink requests. We take that as a hint that the uplink is clogged or no working uplink server - // exists, so "working" is changed to false for now. Should a new uplink server be found the uplink thread will - // set this back to true some time. - candidate->working = false; + + if ( candidate->working ) { + // Last known state was "working", see if that should change + if ( candidate->readFd == -1 ) { + candidate->working = false; + } + } else { // ...not working... + // Don't re-check too often + spin_lock( &candidate->lock ); + bool check; + const time_t now = time( NULL ); + check = ( now - candidate->lastWorkCheck ) > NONWORKING_RECHECK_INTERVAL_SECONDS; + if ( check ) { + candidate->lastWorkCheck = now; + } + spin_unlock( &candidate->lock ); + if ( !check ) { + return candidate; + } + // Check if the local file exists, has the right size, and is readable (writable for incomplete image) + if ( candidate->cache_map != NULL ) { + // -- Incomplete - rw check + if ( candidate->cacheFd == -1 ) { // Make sure file is open for writing + candidate->cacheFd = open( candidate->path, O_RDWR ); + // It might have failed - still offer proxy mode, we just can't cache + if ( candidate->cacheFd == -1 ) { + logadd( LOG_WARNING, "Cannot re-open %s for writing - replication disabled", candidate->path ); + } + } + if ( candidate->uplink == NULL && candidate->cacheFd != -1 ) { + uplink_init( candidate, -1, NULL ); + } + } + // Common for ro and rw images + const off_t len = lseek( candidate->readFd, 0, SEEK_END ); + if ( len == -1 ) { + logadd( LOG_WARNING, "lseek() on %s failed (errno=%d), removing image", candidate->path, errno ); + image_remove( candidate ); // No release here, the image is still returned and should be released by caller + } else if ( (uint64_t)len != candidate->realFilesize ) { + logadd( LOG_DEBUG1, "Size of %s changed at runtime, keeping disabled! Expected: %" PRIu64 ", found: %" PRIu64 + ". Try sending SIGHUP to server if you know what you're doing.", + candidate->path, candidate->realFilesize, (uint64_t)len ); + } else { + // Seek worked, file size is same, now see if we can read from file + char buffer[100]; + if ( pread( candidate->readFd, buffer, sizeof(buffer), 0 ) == -1 ) { + logadd( LOG_DEBUG2, "Reading first %d bytes from %s failed (errno=%d), removing image", + (int)sizeof(buffer), candidate->path, errno ); + image_remove( candidate ); + } else { + // Seems everything is fine again \o/ + candidate->working = true; + } + } } + return candidate; // Success :-) } @@ -304,7 +352,7 @@ dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) * Every call to image_lock() needs to be followed by a call to image_release() at some point. * Locks on: imageListLock, _images[].lock */ -dnbd3_image_t* image_lock(dnbd3_image_t *image) +dnbd3_image_t* image_lock(dnbd3_image_t *image) // TODO: get rid, fix places that do image->users-- { if ( image == NULL ) return NULL ; int i; @@ -361,7 +409,7 @@ dnbd3_image_t* image_release(dnbd3_image_t *image) * no active users * Locks on: imageListLock, image[].lock */ -void image_remove(dnbd3_image_t *image) +static void image_remove(dnbd3_image_t *image) { bool wasInList = false; spin_lock( &imageListLock ); @@ -399,13 +447,46 @@ void image_killUplinks() /** * Load all images in given path recursively. * Pass NULL to use path from config. - * NOT THREAD SAFE, make sure this is only running - * on one thread at a time! */ bool image_loadAll(char *path) { bool ret; + char imgPath[PATHLEN]; + int imgId; + dnbd3_image_t *imgHandle; + if ( path == NULL ) path = _basePath; + if ( _removeMissingImages ) { + // Check if all loaded images still exist on disk + logadd( LOG_DEBUG1, "Checking for vanished images" ); + spin_lock( &imageListLock ); + for (int i = _num_images - 1; i >= 0; --i) { + if ( _images[i] == NULL ) { + if ( i + 1 == _num_images ) _num_images--; + continue; + } + imgId = _images[i]->id; + snprintf( imgPath, PATHLEN, "%s", _images[i]->path ); + spin_unlock( &imageListLock ); // isReadable hits the fs; unlock + // Check if fill can still be opened for reading + ret = file_isReadable( imgPath ); + // Lock again, see if image is still there, free if required + spin_lock( &imageListLock ); + if ( ret || i >= _num_images || _images[i] == NULL || _images[i]->id != imgId ) continue; + // Image needs to be removed + imgHandle = _images[i]; + _images[i] = NULL; + if ( i + 1 == _num_images ) _num_images--; + if ( imgHandle->users != 0 ) continue; // Still in use, do not free (last releasing user will trigger) + // Image is not in use anymore, free the dangling entry immediately + spin_unlock( &imageListLock ); // image_free might do several fs operations; unlock + image_free( imgHandle ); + spin_lock( &imageListLock ); + } + spin_unlock( &imageListLock ); + } + // Now scan for new images + logadd( LOG_DEBUG1, "Scanning for new or modified images" ); pthread_mutex_lock( &reloadLock ); ret = image_load_all_internal( path, path ); pthread_mutex_unlock( &reloadLock ); @@ -438,6 +519,9 @@ bool image_tryFreeAll() static dnbd3_image_t* image_free(dnbd3_image_t *image) { assert( image != NULL ); + if ( !_shutdown ) { + logadd( LOG_INFO, "Freeing image %s:%d", image->lower_name, (int)image->rid ); + } // image_saveCacheMap( image ); uplink_shutdown( image ); @@ -511,7 +595,7 @@ static bool image_load_all_internal(char *base, char *path) logadd( LOG_WARNING, "stat() for '%s' failed. Ignoring....", subpath ); continue; } - if ( S_ISDIR( st.st_mode )) { + if ( S_ISDIR( st.st_mode ) ) { image_load_all_internal( base, subpath ); // Recurse } else { image_load( base, subpath, true ); // Load image if possible @@ -531,7 +615,7 @@ static bool image_load_all_internal(char *base, char *path) static bool image_load(char *base, char *path, int withUplink) { static int imgIdCounter = 0; // Used to assign unique numeric IDs to images - int i, revision; + int i, revision = -1; struct stat st; uint8_t *cache_map = NULL; uint32_t *crc32list = NULL; @@ -563,7 +647,7 @@ static bool image_load(char *base, char *path, int withUplink) // Easy - legacy mode, simply append full file name and set rid to 1 strcat( dst, fileName ); revision = 1; - } else { + } else if ( !_vmdkLegacyMode ) { // Try to parse *.r<ID> syntax for (i = fileNameLen - 1; i > 1; --i) { if ( fileName[i] < '0' || fileName[i] > '9' ) break; @@ -578,7 +662,7 @@ static bool image_load(char *base, char *path, int withUplink) } *dst = '\0'; } - if ( revision <= 0 ) { + if ( revision <= 0 || revision >= 65536 ) { logadd( LOG_WARNING, "Image '%s' has invalid revision ID %d", path, revision ); goto load_error; } @@ -612,7 +696,7 @@ static bool image_load(char *base, char *path, int withUplink) // 1. Allocate memory for the cache map if the image is incomplete cache_map = image_loadCacheMap( path, virtualFilesize ); - // TODO: Maybe try sha-256 or 512 first if you're paranoid (to be implemented) + // XXX: Maybe try sha-256 or 512 first if you're paranoid (to be implemented) // 2. Load CRC-32 list of image uint32_t masterCrc; @@ -630,34 +714,34 @@ static bool image_load(char *base, char *path, int withUplink) // Compare data just loaded to identical image we apparently already loaded if ( existing != NULL ) { if ( existing->realFilesize != realFilesize ) { - // Image will be replaced below logadd( LOG_WARNING, "Size of image '%s:%d' has changed.", existing->lower_name, (int)existing->rid ); + // Image will be replaced below } else if ( existing->crc32 != NULL && crc32list != NULL && memcmp( existing->crc32, crc32list, sizeof(uint32_t) * hashBlockCount ) != 0 ) { - // Image will be replaced below logadd( LOG_WARNING, "CRC32 list of image '%s:%d' has changed.", existing->lower_name, (int)existing->rid ); logadd( LOG_WARNING, "The image will be reloaded, but you should NOT replace existing images while the server is running." ); logadd( LOG_WARNING, "Actually even if it's not running this should never be done. Use a new RID instead!" ); + // Image will be replaced below } else if ( existing->crc32 == NULL && crc32list != NULL ) { logadd( LOG_INFO, "Found CRC-32 list for already loaded image '%s:%d', adding...", existing->lower_name, (int)existing->rid ); existing->crc32 = crc32list; existing->masterCrc32 = masterCrc; crc32list = NULL; function_return = true; - goto load_error; + goto load_error; // Keep existing } else if ( existing->cache_map != NULL && cache_map == NULL ) { // Just ignore that fact, if replication is really complete the cache map will be removed anyways logadd( LOG_INFO, "Image '%s:%d' has no cache map on disk!", existing->lower_name, (int)existing->rid ); function_return = true; - goto load_error; + goto load_error; // Keep existing } else { // Nothing changed about the existing image, so do nothing function_return = true; - goto load_error; + goto load_error; // Keep existing } - // Remove image from images array - image_release( existing ); + // Remove existing image from images array, so it will be replaced by the reloaded image image_remove( existing ); + image_release( existing ); existing = NULL; } @@ -714,7 +798,7 @@ static bool image_load(char *base, char *path, int withUplink) spin_lock( &imageListLock ); // Now we're locked, assign unique ID to image (unique for this running server instance!) image->id = ++imgIdCounter; - for (i = 0; i < _num_images; ++i) { + for ( i = 0; i < _num_images; ++i ) { if ( _images[i] != NULL ) continue; _images[i] = image; break; @@ -727,7 +811,7 @@ static bool image_load(char *base, char *path, int withUplink) goto load_error; } _images[_num_images++] = image; - logadd( LOG_DEBUG1, "Loaded image '%s'\n", image->lower_name ); + logadd( LOG_DEBUG1, "Loaded image '%s:%d'\n", image->lower_name, (int)image->rid ); } // Keep fd for reading image->readFd = fdImage; @@ -888,7 +972,7 @@ bool image_create(char *image, int revision, uint64_t size) close( fdCache ); return true; // - failure_cleanup: ; +failure_cleanup: ; if ( fdImage >= 0 ) close( fdImage ); if ( fdCache >= 0 ) close( fdCache ); remove( path ); @@ -913,8 +997,11 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste */ dnbd3_image_t* image_getOrLoad(char * const name, const uint16_t revision) { - // not proxy, specific revision - nothing to do - if ( !_isProxy && revision != 0 ) return image_get( name, revision, true ); + // specific revision - try shortcut + if ( revision != 0 ) { + dnbd3_image_t *image = image_get( name, revision, true ); + if ( image != NULL ) return image; + } const size_t len = strlen( name ); // Sanity check if ( len == 0 || name[len - 1] == '/' || name[0] == '/' @@ -935,14 +1022,15 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision, { int i; // Already existing locally? - dnbd3_image_t *image = image_get( name, revision, true ); - // exists and specific revision - nothing to do - if ( image != NULL && revision != 0 ) return image; + dnbd3_image_t *image = NULL; + if ( revision == 0 ) { + image = image_get( name, revision, true ); + } // Doesn't exist or is rid 0, try remote if not already tried it recently const time_t now = time( NULL ); char *cmpname = name; - int useIndex = -1; + int useIndex = -1, fallbackIndex = 0; if ( len >= NAMELEN ) cmpname += 1 + len - NAMELEN; pthread_mutex_lock( &remoteCloneLock ); for (i = 0; i < CACHELEN; ++i) { @@ -952,6 +1040,9 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision, pthread_mutex_unlock( &remoteCloneLock ); // Was recently checked... return image; } + if ( remoteCloneCache[i].deadline < remoteCloneCache[fallbackIndex].deadline ) { + fallbackIndex = i; + } } // Re-check to prevent two clients at the same time triggering this, // but only if rid != 0, since we would just get an old rid then @@ -966,11 +1057,13 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision, serialized_buffer_t serialized; // Mark as recently checked if ( useIndex == -1 ) { - useIndex = remoteCloneCacheIndex = (remoteCloneCacheIndex + 1) % CACHELEN; + useIndex = fallbackIndex; } remoteCloneCache[useIndex].deadline = now + SERVER_REMOTE_IMAGE_CHECK_CACHETIME; snprintf( remoteCloneCache[useIndex].name, NAMELEN, "%s", cmpname ); remoteCloneCache[useIndex].rid = revision; + pthread_mutex_unlock( &remoteCloneLock ); + // Get some alt servers and try to get the image from there dnbd3_host_t servers[4]; int uplinkSock = -1; @@ -980,26 +1073,32 @@ static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision, uint16_t remoteRid = revision; uint64_t remoteImageSize; for (i = 0; i < count; ++i) { + char *remoteName; + bool ok = false; int sock = sock_connect( &servers[i], 750, _uplinkTimeout ); - if ( sock < 0 ) continue; + if ( sock == -1 ) continue; if ( !dnbd3_select_image( sock, name, revision, FLAGS8_SERVER ) ) goto server_fail; - char *remoteName; if ( !dnbd3_select_image_reply( &serialized, sock, &remoteProtocolVersion, &remoteName, &remoteRid, &remoteImageSize ) ) goto server_fail; if ( remoteProtocolVersion < MIN_SUPPORTED_SERVER || remoteRid == 0 ) goto server_fail; - if ( revision != 0 && remoteRid != revision ) goto server_fail; + if ( revision != 0 && remoteRid != revision ) goto server_fail; // Want specific revision but uplink supplied different rid if ( revision == 0 && image != NULL && image->rid >= remoteRid ) goto server_fail; // Not actually a failure: Highest remote rid is <= highest local rid - don't clone! if ( remoteImageSize < DNBD3_BLOCK_SIZE || remoteName == NULL || strcmp( name, remoteName ) != 0 ) goto server_fail; if ( remoteImageSize > SERVER_MAX_PROXY_IMAGE_SIZE ) goto server_fail; - if ( !image_ensureDiskSpace( remoteImageSize ) ) goto server_fail; - if ( !image_clone( sock, name, remoteRid, remoteImageSize ) ) goto server_fail; // This sets up the file+map+crc + pthread_mutex_lock( &reloadLock ); + ok = image_ensureDiskSpace( remoteImageSize ) + && image_clone( sock, name, remoteRid, remoteImageSize ); // This sets up the file+map+crc and loads the img + pthread_mutex_unlock( &reloadLock ); + if ( !ok ) goto server_fail; + // Cloning worked :-) uplinkSock = sock; uplinkServer = &servers[i]; break; - server_fail: ; + +server_fail: ; close( sock ); } - pthread_mutex_unlock( &remoteCloneLock ); + // If we still have a pointer to a local image, release the reference if ( image != NULL ) image_release( image ); // If everything worked out, this call should now actually return the image @@ -1030,9 +1129,10 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste uint16_t detectedRid = 0; if ( _vmdkLegacyMode ) { - // TODO - assert( 0 ); - detectedRid = requestedRid; + if ( strend( name, ".vmdk" ) ) { + snprintf( imageFile, PATHLEN, "%s/%s", _basePath, name ); + detectedRid = MAX( 1, requestedRid ); + } } else if ( requestedRid != 0 ) { snprintf( imageFile, PATHLEN, "%s/%s.r%d", _basePath, name, requestedRid ); detectedRid = requestedRid; @@ -1044,8 +1144,9 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste if ( ret == 0 ) { long int best = 0; for ( size_t i = 0; i < g.gl_pathc; ++i ) { - char *rev = strrchr( g.gl_pathv[i], 'r' ); - if ( rev == NULL ) continue; + const char * const path = g.gl_pathv[i]; + const char * rev = strrchr( path, 'r' ); + if ( rev == NULL || rev == path || *(rev - 1) != '.' ) continue; rev++; if ( *rev < '0' || *rev > '9' ) continue; char *err = NULL; @@ -1062,22 +1163,45 @@ static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requeste } globfree( &g ); } + logadd( LOG_DEBUG2, "Trying to load %s:%d ( -> %d) as %s", name, (int)requestedRid, (int)detectedRid, imageFile ); // No file was determined, or it doesn't seem to exist/be readable - if ( imageFile[0] == '\0' || !file_isReadable( imageFile ) ) { // XXX glob fallback to rid-1? Rework above + if ( imageFile[0] == '\0' ) { + logadd( LOG_DEBUG2, "Not found, bailing out" ); return image_get( name, detectedRid, true ); } + if ( requestedRid == 0 ) { + // rid 0 requested - check if detected rid is readable, decrease rid if not until we reach 0 + while ( detectedRid != 0 ) { + dnbd3_image_t *image = image_get( name, detectedRid, true ); + if ( image != NULL ) { + // globbed rid already loaded, return + return image; + } + if ( file_isReadable( imageFile ) ) { + // globbed rid is + break; + } + logadd( LOG_DEBUG2, "%s: rid %d globbed but not readable, trying lower rid...", name, (int)detectedRid ); + detectedRid--; + snprintf( imageFile, PATHLEN, "%s/%s.r%d", _basePath, name, requestedRid ); + } + } + // Now lock on the loading mutex, then check again if the image exists (we're multi-threaded) pthread_mutex_lock( &reloadLock ); dnbd3_image_t* image = image_get( name, detectedRid, true ); if ( image != NULL ) { // The image magically appeared in the meantime + logadd( LOG_DEBUG2, "Magically appeared" ); pthread_mutex_unlock( &reloadLock ); return image; } // Still not loaded, let's try to do so + logadd( LOG_DEBUG2, "Calling load" ); image_load( _basePath, imageFile, false ); pthread_mutex_unlock( &reloadLock ); // If loading succeeded, this will return the image + logadd( LOG_DEBUG2, "Calling get" ); return image_get( name, requestedRid, true ); } @@ -1333,7 +1457,7 @@ static bool image_calcBlockCrc32(const int fd, const int block, const uint32_t r */ static bool image_ensureDiskSpace(uint64_t size) { - for (;;) { + for ( int maxtries = 0; maxtries < 20; ++maxtries ) { const int64_t available = file_freeDiskSpace( _basePath ); if ( available == -1 ) { const int e = errno; @@ -1368,7 +1492,7 @@ static bool image_ensureDiskSpace(uint64_t size) return false; } oldest = image_lock( oldest ); - if ( oldest == NULL ) return false; + if ( oldest == NULL ) continue; // Image freed in the meantime? Try again logadd( LOG_INFO, "'%s:%d' has to go!", oldest->lower_name, (int)oldest->rid ); unlink( oldest->path ); size_t len = strlen( oldest->path ) + 5 + 1; diff --git a/src/server/net.c b/src/server/net.c index ad228f4..a467620 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -85,7 +85,7 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff logadd( LOG_ERROR, "Called recv_request_payload() for more bytes than the passed buffer could hold!" ); return false; } - if ( recv( sock, payload->buffer, size, MSG_WAITALL ) != size ) { + if ( sock_recv( sock, payload->buffer, size ) != size ) { logadd( LOG_DEBUG1, "Could not receive request payload of length %d\n", (int)size ); return false; } @@ -94,34 +94,35 @@ static inline bool recv_request_payload(int sock, uint32_t size, serialized_buff return true; } +/** + * Send reply with optional payload. payload can be null. The caller has to + * acquire the sendMutex first. + */ static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload) { - const unsigned int size = reply->size; + const size_t size = reply->size; fixup_reply( *reply ); - if ( !payload || size == 0 ) { - if ( send( sock, reply, sizeof(dnbd3_reply_t), 0 ) != sizeof(dnbd3_reply_t) ) { - logadd( LOG_DEBUG1, "Send failed (header-only)\n" ); - return false; - } - } else { - struct iovec iov[2]; - iov[0].iov_base = reply; - iov[0].iov_len = sizeof(dnbd3_reply_t); - iov[1].iov_base = payload; - iov[1].iov_len = (size_t)size; - if ( (size_t)writev( sock, iov, 2 ) != sizeof(dnbd3_reply_t) + size ) { - logadd( LOG_DEBUG1, "Send failed (reply with payload of %u bytes)\n", size ); + if ( sock_sendAll( sock, reply, sizeof(dnbd3_reply_t), 1 ) != sizeof(dnbd3_reply_t) ) { + logadd( LOG_DEBUG1, "Sending reply header to client failed" ); + return false; + } + if ( size != 0 && payload != NULL ) { + if ( sock_sendAll( sock, payload, size, 1 ) != (ssize_t)size ) { + logadd( LOG_DEBUG1, "Sending payload of %u bytes to client failed", size ); return false; } } return true; } +/** + * Send given amount of null bytes. The caller has to acquire the sendMutex first. + */ static inline bool sendPadding( const int fd, uint32_t bytes ) { ssize_t ret; while ( bytes >= sizeof(nullbytes) ) { - ret = sock_sendAll( fd, nullbytes, sizeof(nullbytes), 1 ); + ret = sock_sendAll( fd, nullbytes, sizeof(nullbytes), 2 ); if ( ret <= 0 ) return false; bytes -= (uint32_t)ret; @@ -131,8 +132,9 @@ static inline bool sendPadding( const int fd, uint32_t bytes ) uint64_t net_getTotalBytesSent() { + // reads and writes to 64bit ints are not atomic on x86, so let's be safe and use locking spin_lock( &statisticsSentLock ); - uint64_t tmp = totalBytesSent; + const uint64_t tmp = totalBytesSent; spin_unlock( &statisticsSentLock ); return tmp; } @@ -156,10 +158,10 @@ void *net_client_handler(void *dnbd3_client) bool hasName = false; serialized_buffer_t payload; - char *image_name; uint16_t rid, client_version; uint64_t start, end; - char buffer[100]; + uint32_t tempBytesSent = 0; + char hostPrintable[100]; dnbd3_server_entry_t server_list[NUMBER_SERVERS]; @@ -174,25 +176,38 @@ void *net_client_handler(void *dnbd3_client) if ( recv_request_header( client->sock, &request ) ) { if ( request.cmd != CMD_SELECT_IMAGE ) { logadd( LOG_DEBUG1, "Client sent invalid handshake (%d). Dropping Client\n", (int)request.cmd ); - } else { - if ( recv_request_payload( client->sock, request.size, &payload ) ) { - client_version = serializer_get_uint16( &payload ); - image_name = serializer_get_string( &payload ); - rid = serializer_get_uint16( &payload ); - client->isServer = serializer_get_uint8( &payload ); - if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) { - if ( client_version < MIN_SUPPORTED_CLIENT ) { - logadd( LOG_DEBUG1, "Client too old\n" ); - } else { - logadd( LOG_DEBUG1, "Incomplete handshake received\n" ); - } + } else if ( recv_request_payload( client->sock, request.size, &payload ) ) { + char *image_name; + client_version = serializer_get_uint16( &payload ); + image_name = serializer_get_string( &payload ); + rid = serializer_get_uint16( &payload ); + client->isServer = serializer_get_uint8( &payload ); + if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) { + if ( client_version < MIN_SUPPORTED_CLIENT ) { + logadd( LOG_DEBUG1, "Client too old\n" ); } else { - client->image = image = image_getOrLoad( image_name, rid ); - if ( image == NULL ) { - //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); - } else if ( !image->working ) { - logadd( LOG_DEBUG1, "Client requested non-working image '%s' (rid:%d), rejected\n", image_name, (int)rid ); - } else { + logadd( LOG_DEBUG1, "Incomplete handshake received\n" ); + } + } else { + client->image = image = image_getOrLoad( image_name, rid ); + if ( image == NULL ) { + //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); + } else if ( !image->working ) { + logadd( LOG_DEBUG1, "Client requested non-working image '%s' (rid:%d), rejected\n", image_name, (int)rid ); + } else { + // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable + bOk = true; + if ( image->cache_map != NULL ) { + spin_lock( &image->lock ); + if ( image->uplink == NULL || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { + bOk = ( rand() % 4 ) == 1; + } + spin_unlock( &image->lock ); + if ( image->cacheFd == -1 ) { // Wait 100ms if local caching is not working so this + usleep( 100000 ); // server gets a penalty and is less likely to be selected + } + } + if ( bOk ) { image_file = image->readFd; serializer_reset_write( &payload ); serializer_put_uint16( &payload, PROTOCOL_VERSION ); @@ -201,9 +216,8 @@ void *net_client_handler(void *dnbd3_client) serializer_put_uint64( &payload, image->virtualFilesize ); reply.cmd = CMD_SELECT_IMAGE; reply.size = serializer_get_written_length( &payload ); - if ( send_reply( client->sock, &reply, &payload ) ) { - if ( !client->isServer ) image->atime = time( NULL ); - bOk = true; + if ( !send_reply( client->sock, &reply, &payload ) ) { + bOk = false; } } } @@ -211,6 +225,11 @@ void *net_client_handler(void *dnbd3_client) } } else if ( strncmp( (char*)&request, "GET ", 4 ) == 0 || strncmp( (char*)&request, "POST ", 5 ) == 0 ) { rpc_sendStatsJson( client->sock ); + } else { + // Unknown request + if ( host_to_string( &client->host, hostPrintable, sizeof hostPrintable ) ) { + logadd( LOG_DEBUG1, "Client %s sent invalid handshake", hostPrintable ); + } } if ( bOk ) { @@ -251,23 +270,13 @@ void *net_client_handler(void *dnbd3_client) spin_lock( &image->lock ); // Check again as we only aquired the lock just now if ( image->cache_map != NULL ) { - const uint64_t firstByte = start >> 15; - const uint64_t lastByte = (end - 1) >> 15; - // First byte - uint64_t pos = start; - do { - const int map_x = (pos >> 12) & 7; // mod 8 - const uint8_t bit_mask = 1 << map_x; - if ( (image->cache_map[firstByte] & bit_mask) == 0 ) { - isCached = false; - break; - } - pos += DNBD3_BLOCK_SIZE; - } while ( firstByte == (pos >> 15) && pos < end ); + const uint64_t firstByteInMap = start >> 15; + const uint64_t lastByteInMap = (end - 1) >> 15; + uint64_t pos; // Middle - quick checking if ( isCached ) { - pos = firstByte + 1; - while ( pos < lastByte ) { + pos = firstByteInMap + 1; + while ( pos < lastByteInMap ) { if ( image->cache_map[pos] != 0xff ) { isCached = false; break; @@ -275,14 +284,27 @@ void *net_client_handler(void *dnbd3_client) ++pos; } } - // Last byte - if ( isCached && firstByte != lastByte ) { - pos = lastByte << 15; + // First byte + if ( isCached ) { + pos = start; + do { + const int map_x = (pos >> 12) & 7; // mod 8 + const uint8_t bit_mask = 1 << map_x; + if ( (image->cache_map[firstByteInMap] & bit_mask) == 0 ) { + isCached = false; + break; + } + pos += DNBD3_BLOCK_SIZE; + } while ( firstByteInMap == (pos >> 15) && pos < end ); + } + // Last byte - only check if request spans multiple bytes in cache map + if ( isCached && firstByteInMap != lastByteInMap ) { + pos = lastByteInMap << 15; while ( pos < end ) { - assert( lastByte == (pos >> 15) ); + assert( lastByteInMap == (pos >> 15) ); const int map_x = (pos >> 12) & 7; // mod 8 const uint8_t bit_mask = 1 << map_x; - if ( (image->cache_map[lastByte] & bit_mask) == 0 ) { + if ( (image->cache_map[lastByteInMap] & bit_mask) == 0 ) { isCached = false; break; } @@ -293,7 +315,9 @@ void *net_client_handler(void *dnbd3_client) spin_unlock( &image->lock ); if ( !isCached ) { if ( !uplink_request( client, request.handle, request.offset, request.size ) ) { - logadd( LOG_DEBUG1, "Could not relay uncached request to upstream proxy\n" ); + logadd( LOG_DEBUG1, "Could not relay un-cached request to upstream proxy, disabling image %s:%d", + image->lower_name, (int)image->rid ); + image->working = false; goto exit_client_cleanup; } break; // DONE @@ -329,10 +353,13 @@ void *net_client_handler(void *dnbd3_client) if ( ret <= 0 ) { const int err = errno; if ( lock ) pthread_mutex_unlock( &client->sendMutex ); - if ( ret < 0 && err != EPIPE && err != ECONNRESET ) - logadd( LOG_DEBUG1, "sendfile failed (image to net. sent %d/%d, errno=%d)\n", - (int)done, (int)realBytes, err ); - if ( err == EBADF || err == EINVAL || err == EIO ) image->working = false; + if ( ret == -1 ) { + if ( err != EPIPE && err != ECONNRESET && err != ESHUTDOWN ) { + logadd( LOG_DEBUG1, "sendfile failed (image to net. sent %d/%d, errno=%d)\n", + (int)done, (int)realBytes, err ); + } + if ( err != EAGAIN && err != EWOULDBLOCK ) image->working = false; + } goto exit_client_cleanup; } done += ret; @@ -343,9 +370,20 @@ void *net_client_handler(void *dnbd3_client) goto exit_client_cleanup; } } - client->bytesSent += request.size; // Increase counter for statistics. } if ( lock ) pthread_mutex_unlock( &client->sendMutex ); + // Global per-client counter + spin_lock( &client->lock ); + client->bytesSent += request.size; // Increase counter for statistics. + spin_unlock( &client->lock ); + // Local counter that gets added to the global total bytes sent counter periodically + tempBytesSent += request.size; + if (tempBytesSent > 1024 * 1024 ) { + spin_lock( &statisticsSentLock ); + totalBytesSent += tempBytesSent; + spin_unlock( &statisticsSentLock ); + tempBytesSent = 0; + } break; case CMD_GET_SERVERS: @@ -353,19 +391,25 @@ void *net_client_handler(void *dnbd3_client) num = altservers_getMatching( &client->host, server_list, NUMBER_SERVERS ); reply.cmd = CMD_GET_SERVERS; reply.size = num * sizeof(dnbd3_server_entry_t); + pthread_mutex_lock( &client->sendMutex ); send_reply( client->sock, &reply, server_list ); - client->isServer = false; // Only clients request list of servers + pthread_mutex_unlock( &client->sendMutex ); goto set_name; break; case CMD_KEEPALIVE: reply.cmd = CMD_KEEPALIVE; reply.size = 0; + pthread_mutex_lock( &client->sendMutex ); send_reply( client->sock, &reply, NULL ); + pthread_mutex_unlock( &client->sendMutex ); + image->atime = time( NULL ); set_name: ; - if ( !hasName && host_to_string( &client->host, buffer, sizeof buffer ) ) { + if ( !hasName ) { hasName = true; - setThreadName( buffer ); + if ( host_to_string( &client->host, hostPrintable, sizeof hostPrintable ) ) { + setThreadName( hostPrintable ); + } } break; @@ -376,19 +420,21 @@ set_name: ; case CMD_GET_CRC32: reply.cmd = CMD_GET_CRC32; + pthread_mutex_lock( &client->sendMutex ); if ( image->crc32 == NULL ) { reply.size = 0; send_reply( client->sock, &reply, NULL ); } else { const int size = reply.size = (IMGSIZE_TO_HASHBLOCKS(image->realFilesize) + 1) * sizeof(uint32_t); send_reply( client->sock, &reply, NULL ); - send( client->sock, &image->masterCrc32, sizeof(uint32_t), 0 ); + send( client->sock, &image->masterCrc32, sizeof(uint32_t), MSG_MORE ); send( client->sock, image->crc32, size - sizeof(uint32_t), 0 ); } + pthread_mutex_unlock( &client->sendMutex ); break; default: - logadd( LOG_ERROR, "Unknown command: %d", (int)request.cmd ); + logadd( LOG_ERROR, "Unknown command from client: %d", (int)request.cmd ); break; } @@ -397,9 +443,9 @@ set_name: ; exit_client_cleanup: ; dnbd3_removeClient( client ); spin_lock( &statisticsSentLock ); - totalBytesSent += client->bytesSent;// Add the amount of bytes sent by the client to the statistics of the server. + totalBytesSent += tempBytesSent; // Add the amount of bytes sent by the client to the statistics of the server. spin_unlock( &statisticsSentLock ); - client = dnbd3_freeClient( client ); + client = dnbd3_freeClient( client ); // This will also call image_release on client->image return NULL ; } diff --git a/src/server/uplink.c b/src/server/uplink.c index 7fa3c96..29c42af 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -158,19 +158,19 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin spin_lock( &client->image->lock ); if ( client->image->uplink == NULL ) { spin_unlock( &client->image->lock ); - logadd( LOG_DEBUG1, "Uplink request for image with no uplink (%s)\n", client->image->lower_name ); + logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); return false; } dnbd3_connection_t * const uplink = client->image->uplink; if ( uplink->shutdown ) { spin_unlock( &client->image->lock ); - logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down (%s)\n", client->image->lower_name ); + logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" ); return false; } // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain if ( isSameAddress( &uplink->currentServer, &client->host ) ) { spin_unlock( &client->image->lock ); - logadd( LOG_DEBUG1, "Proxy cycle detected.\n" ); + logadd( LOG_DEBUG1, "Proxy cycle detected" ); return false; } @@ -572,9 +572,15 @@ static void uplink_handleReceive(dnbd3_connection_t *link) const uint64_t end = inReply.handle + inReply.size; link->bytesReceived += inReply.size; // 1) Write to cache file - assert( link->image->cacheFd != -1 ); - ret = (int)pwrite( link->image->cacheFd, link->recvBuffer, inReply.size, start ); - if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, true ); + if ( link->image->cacheFd != -1 ) { + ret = (int)pwrite( link->image->cacheFd, link->recvBuffer, inReply.size, start ); + if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, true ); + if ( ret == -1 && ( errno == EBADF || errno == EINVAL || errno == EIO ) ) { + const int fd = link->image->cacheFd; + link->image->cacheFd = -1; + close( fd ); + } + } // 2) Figure out which clients are interested in it spin_lock( &link->queueLock ); for (i = 0; i < link->queueLen; ++i) { |