diff options
author | Frederic Robra | 2019-06-25 17:03:28 +0200 |
---|---|---|
committer | Frederic Robra | 2019-06-25 17:03:28 +0200 |
commit | 43e57ce5e11e9052f5a7db66f2e8613f1784f919 (patch) | |
tree | c5e1372a160b2601f61b18d617b71799b06b02ae /src/server | |
download | dnbd3-ng-43e57ce5e11e9052f5a7db66f2e8613f1784f919.tar.gz dnbd3-ng-43e57ce5e11e9052f5a7db66f2e8613f1784f919.tar.xz dnbd3-ng-43e57ce5e11e9052f5a7db66f2e8613f1784f919.zip |
first version of dnbd3-ng
Diffstat (limited to 'src/server')
32 files changed, 8272 insertions, 0 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c new file mode 100644 index 0000000..b91ceab --- /dev/null +++ b/src/server/altservers.c @@ -0,0 +1,612 @@ +#include "altservers.h" +#include "locks.h" +#include "helper.h" +#include "image.h" +#include "fileutil.h" +#include "../shared/protocol.h" +#include "../shared/timing.h" +#include "../serverconfig.h" +#include <assert.h> +#include <inttypes.h> +#include <jansson.h> + +#define LOG(lvl, msg, ...) logadd(lvl, msg " (%s:%d)", __VA_ARGS__, image->name, (int)image->rid) +#define LOG_GOTO(jumplabel, lvl, ...) do { LOG(lvl, __VA_ARGS__); goto jumplabel; } while (0); +#define ERROR_GOTO(jumplabel, ...) LOG_GOTO(jumplabel, LOG_ERROR, __VA_ARGS__) + +static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS]; +static pthread_spinlock_t pendingLockWrite; // Lock for adding something to pending. (NULL -> nonNULL) +static pthread_mutex_t pendingLockConsume = PTHREAD_MUTEX_INITIALIZER; // Lock for removing something (nonNULL -> NULL) +static dnbd3_signal_t* runSignal = NULL; + +static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS]; +static int numAltServers = 0; +static pthread_spinlock_t altServersLock; + +static pthread_t altThread; + +static void *altservers_main(void *data); +static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const unsigned int rtt); + +void altservers_init() +{ + srand( (unsigned int)time( NULL ) ); + // Init spinlock + spin_init( &pendingLockWrite, PTHREAD_PROCESS_PRIVATE ); + spin_init( &altServersLock, PTHREAD_PROCESS_PRIVATE ); + // Init signal + runSignal = signal_new(); + if ( runSignal == NULL ) { + logadd( LOG_ERROR, "Error creating signal object. Uplink feature unavailable." 
); + exit( EXIT_FAILURE ); + } + memset( altServers, 0, SERVER_MAX_ALTS * sizeof(dnbd3_alt_server_t) ); + if ( 0 != thread_create( &altThread, NULL, &altservers_main, (void *)NULL ) ) { + logadd( LOG_ERROR, "Could not start altservers connector thread" ); + exit( EXIT_FAILURE ); + } + // Init waiting links queue -- this is currently a global static array so + // it will already be zero, but in case we refactor later do it explicitly + // while also holding the write lock so thread sanitizer is happy + spin_lock( &pendingLockWrite ); + for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { + pending[i] = NULL; + } + spin_unlock( &pendingLockWrite ); +} + +void altservers_shutdown() +{ + if ( runSignal == NULL ) return; + signal_call( runSignal ); // Wake altservers thread up + thread_join( altThread, NULL ); +} + +static void addalt(int argc, char **argv, void *data) +{ + char *shost; + dnbd3_host_t host; + bool isPrivate = false; + bool isClientOnly = false; + if ( argv[0][0] == '#' ) return; + for (shost = argv[0]; *shost != '\0'; ) { // Trim left and scan for "-" prefix + if ( *shost == '-' ) isPrivate = true; + else if ( *shost == '+' ) isClientOnly = true; + else if ( *shost != ' ' && *shost != '\t' ) break; + shost++; + } + if ( !parse_address( shost, &host ) ) { + logadd( LOG_WARNING, "Invalid entry in alt-servers file ignored: '%s'", shost ); + return; + } + if ( argc == 1 ) argv[1] = ""; + if ( altservers_add( &host, argv[1], isPrivate, isClientOnly ) ) { + (*(int*)data)++; + } +} + +int altservers_load() +{ + int count = 0; + char *name; + if ( asprintf( &name, "%s/%s", _configDir, "alt-servers" ) == -1 ) return -1; + file_loadLineBased( name, 1, 2, &addalt, (void*)&count ); + free( name ); + logadd( LOG_DEBUG1, "Added %d alt servers\n", count ); + return count; +} + +bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate, const int isClientOnly) +{ + int i, freeSlot = -1; + spin_lock( &altServersLock ); + for (i = 0; i < 
numAltServers; ++i) { + if ( isSameAddressPort( &altServers[i].host, host ) ) { + spin_unlock( &altServersLock ); + return false; + } else if ( freeSlot == -1 && altServers[i].host.type == 0 ) { + freeSlot = i; + } + } + if ( freeSlot == -1 ) { + if ( numAltServers >= SERVER_MAX_ALTS ) { + logadd( LOG_WARNING, "Cannot add another alt server, maximum of %d already reached.", (int)SERVER_MAX_ALTS ); + spin_unlock( &altServersLock ); + return false; + } + freeSlot = numAltServers++; + } + altServers[freeSlot].host = *host; + altServers[freeSlot].isPrivate = isPrivate; + altServers[freeSlot].isClientOnly = isClientOnly; + if ( comment != NULL ) snprintf( altServers[freeSlot].comment, COMMENT_LENGTH, "%s", comment ); + spin_unlock( &altServersLock ); + return true; +} + +/** + * ONLY called from the passed uplink's main thread + */ +void altservers_findUplink(dnbd3_connection_t *uplink) +{ + int i; + // if betterFd != -1 it means the uplink is supposed to switch to another + // server. As this function here is called by the uplink thread, it can + // never be that the uplink is supposed to switch, but instead calls + // this function. 
+ assert( uplink->betterFd == -1 ); + spin_lock( &pendingLockWrite ); + // it is however possible that an RTT measurement is currently in progress, + // so check for that case and do nothing if one is in progress + if ( uplink->rttTestResult == RTT_INPROGRESS ) { + for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { + if ( pending[i] != uplink ) continue; + // Yep, measuring right now + spin_unlock( &pendingLockWrite ); + return; + } + } + // Find free slot for measurement + for (i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { + if ( pending[i] != NULL ) continue; + pending[i] = uplink; + uplink->rttTestResult = RTT_INPROGRESS; + spin_unlock( &pendingLockWrite ); + signal_call( runSignal ); // Wake altservers thread up + return; + } + // End of loop - no free slot + spin_unlock( &pendingLockWrite ); + logadd( LOG_WARNING, "No more free RTT measurement slots, ignoring a request..." ); +} + +/** + * The given uplink is about to disappear, so remove it from any queues + */ +void altservers_removeUplink(dnbd3_connection_t *uplink) +{ + pthread_mutex_lock( &pendingLockConsume ); + spin_lock( &pendingLockWrite ); + for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) { + if ( pending[i] == uplink ) { + uplink->rttTestResult = RTT_NOT_REACHABLE; + pending[i] = NULL; + } + } + spin_unlock( &pendingLockWrite ); + pthread_mutex_unlock( &pendingLockConsume ); +} + +/** + * Get <size> known (working) alt servers, ordered by network closeness + * (by finding the smallest possible subnet) + * Private servers are excluded, so this is what you want to call to + * get a list of servers you can tell a client about + */ +int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size) +{ + if ( host == NULL || host->type == 0 || numAltServers == 0 || output == NULL || size <= 0 ) return 0; + int i, j; + int count = 0; + int scores[size]; + int score; + spin_lock( &altServersLock ); + if ( size > numAltServers ) size = numAltServers; + for (i = 0; i < 
numAltServers; ++i) { + if ( altServers[i].host.type == 0 ) continue; // Slot is empty + if ( altServers[i].isPrivate ) continue; // Do not tell clients about private servers + if ( host->type == altServers[i].host.type ) { + score = altservers_netCloseness( host, &altServers[i].host ) - altServers[i].numFails; + } else { + score = -( altServers[i].numFails + 128 ); // Wrong address family + } + if ( count == 0 ) { + // Trivial - this is the first entry + output[0].host = altServers[i].host; + output[0].failures = 0; + scores[0] = score; + count++; + } else { + // Other entries already exist, insert in proper position + for (j = 0; j < size; ++j) { + if ( j < count && score <= scores[j] ) continue; + if ( j > count ) break; // Should never happen but just in case... + if ( j < count && j + 1 < size ) { + // Check if we're in the middle and need to move other entries... + memmove( &output[j + 1], &output[j], sizeof(dnbd3_server_entry_t) * (size - j - 1) ); + memmove( &scores[j + 1], &scores[j], sizeof(int) * (size - j - 1) ); + } + if ( count < size ) { + count++; + } + output[j].host = altServers[i].host; + output[j].failures = 0; + scores[j] = score; + break; + } + } + } + spin_unlock( &altServersLock ); + return count; +} + +/** + * Get <size> alt servers. If there are more alt servers than + * requested, random servers will be picked. + * This function is suited for finding uplink servers as + * it includes private servers and ignores any "client only" servers + */ +int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency) +{ + if ( size <= 0 ) return 0; + int count = 0, i; + ticks now; + timing_get( &now ); + spin_lock( &altServersLock ); + // Flip first server in list with a random one every time this is called + if ( numAltServers > 1 ) { + const dnbd3_alt_server_t tmp = altServers[0]; + do { + i = rand() % numAltServers; + } while ( i == 0 ); + altServers[0] = altServers[i]; + altServers[i] = tmp; + } + // We iterate over the list twice. 
First run adds servers with 0 failures only, + // second one also considers those that failed (not too many times) + if ( size > numAltServers ) size = numAltServers; + for (i = 0; i < numAltServers * 2; ++i) { + dnbd3_alt_server_t *srv = &altServers[i % numAltServers]; + if ( srv->host.type == 0 ) continue; // Slot is empty + if ( _proxyPrivateOnly && !srv->isPrivate ) continue; // Config says to consider private alt-servers only? ignore! + if ( srv->isClientOnly ) continue; + bool first = ( i < numAltServers ); + if ( first ) { + if ( srv->numFails > 0 ) continue; + } else { + if ( srv->numFails == 0 ) continue; // Already added in first iteration + if ( !emergency && srv->numFails > SERVER_BAD_UPLINK_THRES // server failed X times in a row + && timing_diff( &srv->lastFail, &now ) < SERVER_BAD_UPLINK_IGNORE ) continue; // and last fail was not too long ago? ignore! + if ( !emergency ) srv->numFails--; + } + // server seems ok, include in output and decrease its fail counter + output[count++] = srv->host; + if ( count >= size ) break; + } + spin_unlock( &altServersLock ); + return count; +} + +json_t* altservers_toJson() +{ + json_t *list = json_array(); + + spin_lock( &altServersLock ); + char host[100]; + const int count = numAltServers; + dnbd3_alt_server_t src[count]; + memcpy( src, altServers, sizeof(src) ); + spin_unlock( &altServersLock ); + for (int i = 0; i < count; ++i) { + json_t *rtts = json_array(); + for (int j = 0; j < SERVER_RTT_PROBES; ++j) { + json_array_append_new( rtts, json_integer( src[i].rtt[ (j + src[i].rttIndex + 1) % SERVER_RTT_PROBES ] ) ); + } + sock_printHost( &src[i].host, host, sizeof(host) ); + json_t *server = json_pack( "{ss,ss,so,sb,sb,si}", + "comment", src[i].comment, + "host", host, + "rtt", rtts, + "isPrivate", (int)src[i].isPrivate, + "isClientOnly", (int)src[i].isClientOnly, + "numFails", src[i].numFails + ); + json_array_append_new( list, server ); + } + return list; +} + +/** + * Update rtt history of given server - 
returns the new average for that server + */ +static unsigned int altservers_updateRtt(const dnbd3_host_t * const host, const unsigned int rtt) +{ + unsigned int avg = rtt; + int i; + spin_lock( &altServersLock ); + for (i = 0; i < numAltServers; ++i) { + if ( !isSameAddressPort( host, &altServers[i].host ) ) continue; + altServers[i].rtt[++altServers[i].rttIndex % SERVER_RTT_PROBES] = rtt; +#if SERVER_RTT_PROBES == 5 + avg = (altServers[i].rtt[0] + altServers[i].rtt[1] + altServers[i].rtt[2] + + altServers[i].rtt[3] + altServers[i].rtt[4]) / SERVER_RTT_PROBES; +#else +#warning You might want to change the code in altservers_update_rtt if you changed SERVER_RTT_PROBES + avg = 0; + for (int j = 0; j < SERVER_RTT_PROBES; ++j) { + avg += altServers[i].rtt[j]; + } + avg /= SERVER_RTT_PROBES; +#endif + // If we got a new rtt value, server must be working + if ( altServers[i].numFails > 0 ) { + altServers[i].numFails--; + } + break; + } + spin_unlock( &altServersLock ); + return avg; +} + +/** + * Determine how close two addresses are to each other by comparing the number of + * matching bits from the left of the address. Does not count individual bits but + * groups of 4 for speed. + * Return: Closeness - higher number means closer + */ +int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2) +{ + if ( host1 == NULL || host2 == NULL || host1->type != host2->type ) return -1; + int retval = 0; + const int max = host1->type == HOST_IP4 ? 4 : 16; + for (int i = 0; i < max; ++i) { + if ( (host1->addr[i] & 0xf0) != (host2->addr[i] & 0xf0) ) return retval; + ++retval; + if ( (host1->addr[i] & 0x0f) != (host2->addr[i] & 0x0f) ) return retval; + ++retval; + } + return retval; +} + +/** + * Called if an uplink server failed during normal uplink operation. This unit keeps + * track of how often servers fail, and consider them disabled for some time if they + * fail too many times. 
+ */ +void altservers_serverFailed(const dnbd3_host_t * const host) +{ + int i; + int foundIndex = -1, lastOk = -1; + ticks now; + timing_get( &now ); + spin_lock( &altServersLock ); + for (i = 0; i < numAltServers; ++i) { + if ( foundIndex == -1 ) { + // Looking for the failed server in list + if ( isSameAddressPort( host, &altServers[i].host ) ) { + foundIndex = i; + } + } else if ( altServers[i].host.type != 0 && altServers[i].numFails == 0 ) { + lastOk = i; + } + } + // Do only increase counter if last fail was not too recent. This is + // to prevent the counter from increasing rapidly if many images use the + // same uplink. If there's a network hickup, all uplinks will call this + // function and would increase the counter too quickly, disabling the server. + if ( foundIndex != -1 && timing_diff( &altServers[foundIndex].lastFail, &now ) > SERVER_RTT_INTERVAL_INIT ) { + altServers[foundIndex].numFails += SERVER_UPLINK_FAIL_INCREASE; + altServers[foundIndex].lastFail = now; + if ( lastOk != -1 ) { + // Make sure non-working servers are put at the end of the list, so they're less likely + // to get picked when testing servers for uplink connections. + const dnbd3_alt_server_t tmp = altServers[foundIndex]; + altServers[foundIndex] = altServers[lastOk]; + altServers[lastOk] = tmp; + } + } + spin_unlock( &altServersLock ); +} +/** + * Mainloop of this module. It will wait for requests by uplinks to find a + * suitable uplink server for them. If found, it will tell the uplink about + * the best server found. Currently the RTT history is kept per server and + * not per uplink, so if many images use the same uplink server, the history + * will update quite quickly. Needs to be improved some time, ie. by only + * updating the rtt if the last update was at least X seconds ago. 
+ */ +static void *altservers_main(void *data UNUSED) +{ + const int ALTS = 4; + int ret, itLink, itAlt, numAlts; + bool found; + char buffer[DNBD3_BLOCK_SIZE ]; + dnbd3_reply_t reply; + dnbd3_host_t servers[ALTS + 1]; + serialized_buffer_t serialized; + struct timespec start, end; + ticks nextCloseUnusedFd; + + setThreadName( "altserver-check" ); + blockNoncriticalSignals(); + timing_gets( &nextCloseUnusedFd, 900 ); + // LOOP + while ( !_shutdown ) { + // Wait 5 seconds max. + ret = signal_wait( runSignal, 5000 ); + if ( _shutdown ) goto cleanup; + if ( ret == SIGNAL_ERROR ) { + if ( errno == EAGAIN || errno == EINTR ) continue; + logadd( LOG_WARNING, "Error %d on signal_clear on alservers_main! Things will break!", errno ); + usleep( 100000 ); + } + // Work your way through the queue + for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) { + spin_lock( &pendingLockWrite ); + if ( pending[itLink] == NULL ) { + spin_unlock( &pendingLockWrite ); + continue; // Check once before locking, as a mutex is expensive + } + spin_unlock( &pendingLockWrite ); + pthread_mutex_lock( &pendingLockConsume ); + spin_lock( &pendingLockWrite ); + dnbd3_connection_t * const uplink = pending[itLink]; + spin_unlock( &pendingLockWrite ); + if ( uplink == NULL ) { // Check again after locking + pthread_mutex_unlock( &pendingLockConsume ); + continue; + } + dnbd3_image_t * const image = image_lock( uplink->image ); + if ( image == NULL ) { // Check again after locking + uplink->rttTestResult = RTT_NOT_REACHABLE; + spin_lock( &pendingLockWrite ); + pending[itLink] = NULL; + spin_unlock( &pendingLockWrite ); + pthread_mutex_unlock( &pendingLockConsume ); + logadd( LOG_DEBUG1, "Image has gone away that was queued for RTT measurement" ); + continue; + } + LOG( LOG_DEBUG2, "[%d] Running alt check", itLink ); + assert( uplink->rttTestResult == RTT_INPROGRESS ); + // Now get 4 alt servers + numAlts = altservers_getListForUplink( servers, ALTS, uplink->fd == -1 ); + if ( uplink->fd 
!= -1 ) { + // Add current server if not already in list + found = false; + for (itAlt = 0; itAlt < numAlts; ++itAlt) { + if ( !isSameAddressPort( &uplink->currentServer, &servers[itAlt] ) ) continue; + found = true; + break; + } + if ( !found ) servers[numAlts++] = uplink->currentServer; + } + // Test them all + int bestSock = -1; + int bestIndex = -1; + int bestProtocolVersion = -1; + unsigned long bestRtt = RTT_UNREACHABLE; + unsigned long currentRtt = RTT_UNREACHABLE; + for (itAlt = 0; itAlt < numAlts; ++itAlt) { + usleep( 1000 ); // Wait a very short moment for the network to recover (we might be doing lots of measurements...) + // Connect + clock_gettime( BEST_CLOCK_SOURCE, &start ); + int sock = sock_connect( &servers[itAlt], 750, 1000 ); + if ( sock < 0 ) continue; + // Select image ++++++++++++++++++++++++++++++ + if ( !dnbd3_select_image( sock, image->name, image->rid, SI_SERVER_FLAGS ) ) { + goto server_failed; + } + // See if selecting the image succeeded ++++++++++++++++++++++++++++++ + uint16_t protocolVersion, rid; + uint64_t imageSize; + char *name; + if ( !dnbd3_select_image_reply( &serialized, sock, &protocolVersion, &name, &rid, &imageSize ) ) { + goto server_image_not_available; + } + if ( protocolVersion < MIN_SUPPORTED_SERVER ) goto server_failed; + if ( name == NULL || strcmp( name, image->name ) != 0 ) { + ERROR_GOTO( server_failed, "[RTT] Server offers image '%s'", name ); + } + if ( rid != image->rid ) { + ERROR_GOTO( server_failed, "[RTT] Server provides rid %d", (int)rid ); + } + if ( imageSize != image->virtualFilesize ) { + ERROR_GOTO( server_failed, "[RTT] Remote size: %" PRIu64 ", expected: %" PRIu64, imageSize, image->virtualFilesize ); + } + // Request first block (NOT random!) 
++++++++++++++++++++++++++++++ + if ( !dnbd3_get_block( sock, 0, DNBD3_BLOCK_SIZE, 0, COND_HOPCOUNT( protocolVersion, 1 ) ) ) { + LOG_GOTO( server_failed, LOG_DEBUG1, "[RTT%d] Could not request first block", itLink ); + } + // See if requesting the block succeeded ++++++++++++++++++++++ + if ( !dnbd3_get_reply( sock, &reply ) ) { + LOG_GOTO( server_failed, LOG_DEBUG1, "[RTT%d] Received corrupted reply header after CMD_GET_BLOCK", itLink ); + } + // check reply header + if ( reply.cmd != CMD_GET_BLOCK || reply.size != DNBD3_BLOCK_SIZE ) { + ERROR_GOTO( server_failed, "[RTT] Reply to first block request is %" PRIu32 " bytes", reply.size ); + } + if ( recv( sock, buffer, DNBD3_BLOCK_SIZE, MSG_WAITALL ) != DNBD3_BLOCK_SIZE ) { + ERROR_GOTO( server_failed, "[RTT%d] Could not read first block payload", itLink ); + } + clock_gettime( BEST_CLOCK_SOURCE, &end ); + // Measurement done - everything fine so far + spin_lock( &uplink->rttLock ); + const bool isCurrent = isSameAddressPort( &servers[itAlt], &uplink->currentServer ); + // Penaltize rtt if this was a cycle; this will treat this server with lower priority + // in the near future too, so we prevent alternating between two servers that are both + // part of a cycle and have the lowest latency. + const unsigned int rtt = (unsigned int)((end.tv_sec - start.tv_sec) * 1000000 + + (end.tv_nsec - start.tv_nsec) / 1000 + + ( (isCurrent && uplink->cycleDetected) ? 
1000000 : 0 )); // µs + unsigned int avg = altservers_updateRtt( &servers[itAlt], rtt ); + // If a cycle was detected, or we lost connection to the current (last) server, penaltize it one time + if ( ( uplink->cycleDetected || uplink->fd == -1 ) && isCurrent ) avg = (avg * 2) + 50000; + spin_unlock( &uplink->rttLock ); + if ( uplink->fd != -1 && isCurrent ) { + // Was measuring current server + currentRtt = avg; + close( sock ); + } else if ( avg < bestRtt ) { + // Was another server, update "best" + if ( bestSock != -1 ) close( bestSock ); + bestSock = sock; + bestRtt = avg; + bestIndex = itAlt; + bestProtocolVersion = protocolVersion; + } else { + // Was too slow, ignore + close( sock ); + } + // We're done, call continue + continue; + // Jump here if anything went wrong + // This will cleanup and continue + server_failed: ; + altservers_serverFailed( &servers[itAlt] ); + server_image_not_available: ; + close( sock ); + } + // Done testing all servers. See if we should switch + if ( bestSock != -1 && (uplink->fd == -1 || (bestRtt < 10000000 && RTT_THRESHOLD_FACTOR(currentRtt) > bestRtt)) ) { + // yep + if ( currentRtt > 10000000 || uplink->fd == -1 ) { + LOG( LOG_DEBUG1, "Change - best: %luµs, current: -", bestRtt ); + } else { + LOG( LOG_DEBUG1, "Change - best: %luµs, current: %luµs", bestRtt, currentRtt ); + } + sock_setTimeout( bestSock, _uplinkTimeout ); + spin_lock( &uplink->rttLock ); + uplink->betterFd = bestSock; + uplink->betterServer = servers[bestIndex]; + uplink->betterVersion = bestProtocolVersion; + uplink->rttTestResult = RTT_DOCHANGE; + spin_unlock( &uplink->rttLock ); + signal_call( uplink->signal ); + } else if ( bestSock == -1 && currentRtt == RTT_UNREACHABLE ) { + // No server was reachable + spin_lock( &uplink->rttLock ); + uplink->rttTestResult = RTT_NOT_REACHABLE; + spin_unlock( &uplink->rttLock ); + } else { + // nope + if ( bestSock != -1 ) close( bestSock ); + spin_lock( &uplink->rttLock ); + uplink->rttTestResult = RTT_DONTCHANGE; + 
uplink->cycleDetected = false; // It's a lie, but prevents rtt measurement triggering again right away + spin_unlock( &uplink->rttLock ); + if ( !image->working ) { + image->working = true; + LOG( LOG_DEBUG1, "[%d] No better alt server found, enabling again", itLink ); + } + } + image_release( image ); + // end of loop over all pending uplinks + spin_lock( &pendingLockWrite ); + pending[itLink] = NULL; + spin_unlock( &pendingLockWrite ); + pthread_mutex_unlock( &pendingLockConsume ); + } + // Save cache maps of all images if applicable + declare_now; + // TODO: Has nothing to do with alt servers really, maybe move somewhere else? + if ( _closeUnusedFd && timing_reached( &nextCloseUnusedFd, &now ) ) { + timing_gets( &nextCloseUnusedFd, 900 ); + image_closeUnusedFd(); + } + } + cleanup: ; + if ( runSignal != NULL ) signal_close( runSignal ); + runSignal = NULL; + return NULL ; +} + diff --git a/src/server/altservers.h b/src/server/altservers.h new file mode 100644 index 0000000..7b7b46d --- /dev/null +++ b/src/server/altservers.h @@ -0,0 +1,30 @@ +#ifndef _ALTSERVERS_H_ +#define _ALTSERVERS_H_ + +#include "globals.h" + +struct json_t; + +void altservers_init(); + +void altservers_shutdown(); + +int altservers_load(); + +bool altservers_add(dnbd3_host_t *host, const char *comment, const int isPrivate, const int isClientOnly); + +void altservers_findUplink(dnbd3_connection_t *uplink); + +void altservers_removeUplink(dnbd3_connection_t *uplink); + +int altservers_getListForClient(dnbd3_host_t *host, dnbd3_server_entry_t *output, int size); + +int altservers_getListForUplink(dnbd3_host_t *output, int size, int emergency); + +int altservers_netCloseness(dnbd3_host_t *host1, dnbd3_host_t *host2); + +void altservers_serverFailed(const dnbd3_host_t * const host); + +struct json_t* altservers_toJson(); + +#endif /* UPLINK_CONNECTOR_H_ */ diff --git a/src/server/fileutil.c b/src/server/fileutil.c new file mode 100644 index 0000000..336ab68 --- /dev/null +++ 
b/src/server/fileutil.c @@ -0,0 +1,128 @@ +#include "fileutil.h" +#include "helper.h" + +#include <fcntl.h> +#include <unistd.h> +#include <stdio.h> +#include <errno.h> +#include <stdlib.h> +#include <assert.h> +#include <string.h> +#include <sys/stat.h> +#include <sys/statvfs.h> + +bool file_isReadable(char *file) +{ + int fd = open( file, O_RDONLY ); + if ( fd < 0 ) return false; + close( fd ); + return true; +} + +bool file_isWritable(char *file) +{ + int fd = open( file, O_WRONLY ); + if ( fd >= 0 ) { + close( fd ); + return true; + } + fd = open( file, O_WRONLY | O_CREAT, 0600 ); + if ( fd < 0 ) return false; + close( fd ); + remove( file ); + return true; +} + +bool mkdir_p(const char* path) +{ + assert( path != NULL ); + if ( *path == '\0' ) return true; + char buffer[strlen( path ) + 1]; + strcpy( buffer, path ); + char *current = buffer; + char *slash; + while ( (slash = strchr( current, '/' )) != NULL ) { + *slash = '\0'; + if ( *buffer != '\0' && mkdir( buffer, 0755 ) != 0 && errno != EEXIST ) return false; + *slash = '/'; + current = slash + 1; + } + if ( mkdir( buffer, 0755 ) != 0 && errno != EEXIST ) return false; + return true; +} + +bool file_alloc(int fd, uint64_t offset, uint64_t size) +{ +#ifdef __linux__ + if ( fallocate( fd, 0, offset, size ) == 0 ) return true; // fast way +#elif defined(__FreeBSD__) + if ( posix_fallocate( fd, offset, size ) == 0 ) return true; // slow way +#endif + return false; +} + +bool file_setSize(int fd, uint64_t size) +{ + if ( ftruncate( fd, size ) == 0 ) return true; + + // Try really hard... 
image loading logic relies on the file + // having the proper apparent size + uint8_t byte = 0; + pread( fd, &byte, 1, size - 1 ); + if ( pwrite( fd, &byte, 1, size - 1 ) == 1 ) return true; + return false; +} + +bool file_freeDiskSpace(const char * const path, uint64_t *total, uint64_t *avail) +{ + struct statvfs fiData; + if ( statvfs( path, &fiData ) < 0 ) { + return false; + } + if ( avail != NULL ) { + *avail = ((uint64_t)fiData.f_bavail * (uint64_t)fiData.f_frsize); + } + if ( total != NULL ) { + *total = ((uint64_t)fiData.f_blocks * (uint64_t)fiData.f_frsize); + } + return true; +} + +time_t file_lastModification(const char * const file) +{ + struct stat st; + if ( stat( file, &st ) != 0 ) return 0; + return st.st_mtime; +} + +int file_loadLineBased(const char * const file, int minFields, int maxFields, void (*cb)(int argc, char **argv, void *data), void *data) +{ + char buffer[1000], *line; + char *items[20]; + int count = 0, itemCount; + + if ( file == NULL || cb == NULL ) return -1; + FILE *fp = fopen( file, "r" ); + if ( fp == NULL ) return -1; + while ( fgets( buffer, sizeof(buffer), fp ) != NULL ) { + itemCount = 0; + for (line = buffer; *line != '\0' && itemCount < 20; ) { // Trim left and scan for "-" prefix + while ( *line == ' ' || *line == '\t' ) ++line; + if ( *line == '\r' || *line == '\n' || *line == '\0' ) break; // Ignore empty lines + items[itemCount++] = line; + if ( itemCount >= maxFields ) { + trim_right( line ); + break; + } + while ( *line != '\0' && *line != ' ' && *line != '\t' && *line != '\r' && *line != '\n' ) ++line; + if ( *line != '\0' ) *line++ = '\0'; + } + if ( itemCount >= minFields ) { + cb( itemCount, items, data ); + count++; + } + } + fclose( fp ); + return count; +} + diff --git a/src/server/fileutil.h b/src/server/fileutil.h new file mode 100644 index 0000000..fcb5c20 --- /dev/null +++ b/src/server/fileutil.h @@ -0,0 +1,17 @@ +#ifndef _FILEUTIL_H_ +#define _FILEUTIL_H_ + +#include <stdint.h> +#include <stdbool.h> 
+#include <time.h> + +bool file_isReadable(char *file); +bool file_isWritable(char *file); +bool mkdir_p(const char* path); +bool file_alloc(int fd, uint64_t offset, uint64_t size); +bool file_setSize(int fd, uint64_t size); +bool file_freeDiskSpace(const char * const path, uint64_t *total, uint64_t *avail); +time_t file_lastModification(const char * const file); +int file_loadLineBased(const char * const file, int minFields, int maxFields, void (*cb)(int argc, char **argv, void *data), void *data); + +#endif /* FILEUTIL_H_ */ diff --git a/src/server/globals.c b/src/server/globals.c new file mode 100644 index 0000000..c9b9411 --- /dev/null +++ b/src/server/globals.c @@ -0,0 +1,321 @@ +#include "globals.h" +#include "ini.h" +#include "../shared/log.h" +#include <string.h> +#include <stdlib.h> +#include <inttypes.h> +#include <limits.h> +#include <sys/resource.h> +#include <errno.h> + +char *_configDir = NULL; +atomic_bool _shutdown = false; +// [dnbd3] +atomic_int _listenPort = PORT; +char *_basePath = NULL; +atomic_int _serverPenalty = 0; +atomic_int _clientPenalty = 0; +atomic_bool _isProxy = false; +atomic_int _backgroundReplication = BGR_FULL; +atomic_int _bgrMinClients = 0; +atomic_bool _lookupMissingForProxy = true; +atomic_bool _sparseFiles = false; +atomic_bool _removeMissingImages = true; +atomic_int _uplinkTimeout = SOCKET_TIMEOUT_UPLINK; +atomic_int _clientTimeout = SOCKET_TIMEOUT_CLIENT; +atomic_bool _closeUnusedFd = false; +atomic_bool _vmdkLegacyMode = false; +// Not really needed anymore since we have '+' and '-' in alt-servers +atomic_bool _proxyPrivateOnly = false; +// [limits] +atomic_int _maxClients = SERVER_MAX_CLIENTS; +atomic_int _maxImages = SERVER_MAX_IMAGES; +atomic_int _maxPayload = 9000000; // 9MB +atomic_uint_fast64_t _maxReplicationSize = (uint64_t)100000000000LL; + +/** + * True when loading config the first time. Consecutive loads will + * ignore certain values which cannot be changed safely at runtime. 
+ */ +static atomic_bool initialLoad = true; +static pthread_mutex_t loadLock = PTHREAD_MUTEX_INITIALIZER; + +#define IS_TRUE(value) (atoi(value) != 0 || strcmp(value, "true") == 0 || strcmp(value, "True") == 0 || strcmp(value, "TRUE") == 0) +#define SAVE_TO_VAR_STR(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) { if (_ ## kk != NULL) free(_ ## kk); _ ## kk = strdup(value); } } while (0) +#define SAVE_TO_VAR_BOOL(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) _ ## kk = IS_TRUE(value); } while (0) +#define SAVE_TO_VAR_INT(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) parse32(value, &_ ## kk, #ss); } while (0) +#define SAVE_TO_VAR_UINT(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) parse32u(value, &_ ## kk, #ss); } while (0) +#define SAVE_TO_VAR_UINT64(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) parse64u(value, &_ ## kk, #ss); } while (0) + +static void sanitizeFixedConfig(); + +static void handleMaskString( const char *value, void(*func)(logmask_t) ); + +static const char* units = "KMGTPEZY"; + +static bool parse64(const char *in, atomic_int_fast64_t *out, const char *optname); +static bool parse64u(const char *in, atomic_uint_fast64_t *out, const char *optname); +static bool parse32(const char *in, atomic_int *out, const char *optname) UNUSED; +static bool parse32u(const char *in, atomic_int *out, const char *optname); + +static int ini_handler(void *custom UNUSED, const char* section, const char* key, const char* value) +{ + if ( initialLoad ) { + if ( _basePath == NULL ) SAVE_TO_VAR_STR( dnbd3, basePath ); + SAVE_TO_VAR_BOOL( dnbd3, vmdkLegacyMode ); + SAVE_TO_VAR_UINT( dnbd3, listenPort ); + SAVE_TO_VAR_UINT( limits, maxClients ); + SAVE_TO_VAR_UINT( limits, maxImages ); + } + SAVE_TO_VAR_BOOL( dnbd3, isProxy ); + SAVE_TO_VAR_BOOL( dnbd3, proxyPrivateOnly ); + SAVE_TO_VAR_INT( dnbd3, bgrMinClients ); + SAVE_TO_VAR_BOOL( dnbd3, lookupMissingForProxy 
); + SAVE_TO_VAR_BOOL( dnbd3, sparseFiles ); + SAVE_TO_VAR_BOOL( dnbd3, removeMissingImages ); + SAVE_TO_VAR_BOOL( dnbd3, closeUnusedFd ); + SAVE_TO_VAR_UINT( dnbd3, serverPenalty ); + SAVE_TO_VAR_UINT( dnbd3, clientPenalty ); + SAVE_TO_VAR_UINT( dnbd3, uplinkTimeout ); + SAVE_TO_VAR_UINT( dnbd3, clientTimeout ); + SAVE_TO_VAR_UINT( limits, maxPayload ); + SAVE_TO_VAR_UINT64( limits, maxReplicationSize ); + if ( strcmp( section, "dnbd3" ) == 0 && strcmp( key, "backgroundReplication" ) == 0 ) { + if ( strcmp( value, "hashblock" ) == 0 ) { + _backgroundReplication = BGR_HASHBLOCK; + } else if ( IS_TRUE( value ) ) { + _backgroundReplication = BGR_FULL; + } else { + _backgroundReplication = BGR_DISABLED; + } + } + if ( strcmp( section, "logging" ) == 0 && strcmp( key, "fileMask" ) == 0 ) handleMaskString( value, &log_setFileMask ); + if ( strcmp( section, "logging" ) == 0 && strcmp( key, "consoleMask" ) == 0 ) handleMaskString( value, &log_setConsoleMask ); + if ( strcmp( section, "logging" ) == 0 && strcmp( key, "consoleTimestamps" ) == 0 ) log_setConsoleTimestamps( IS_TRUE(value) ); + if ( strcmp( section, "logging" ) == 0 && strcmp( key, "file" ) == 0 ) { + if ( log_openLogFile( value ) ) { + logadd( LOG_INFO, "Opened log file %s", value ); + } else { + logadd( LOG_ERROR, "Could not open log file %s", value ); + exit( EXIT_FAILURE ); + } + } + return 1; +} + +void globals_loadConfig() +{ + char *name = NULL; + asprintf( &name, "%s/%s", _configDir, CONFIG_FILENAME ); + if ( name == NULL ) return; + if ( pthread_mutex_trylock( &loadLock ) != 0 ) { + logadd( LOG_INFO, "Ignoring config reload request due to already running reload" ); + return; + } + ini_parse( name, &ini_handler, NULL ); + free( name ); + if ( initialLoad ) { + sanitizeFixedConfig(); + } + if ( _backgroundReplication == BGR_FULL && _sparseFiles && _bgrMinClients < 5 ) { + logadd( LOG_WARNING, "Ignoring 'sparseFiles=true' since backgroundReplication is set to true and bgrMinClients is too low" ); + 
_sparseFiles = false; + } + // Dump config as interpreted + char buffer[2000]; + globals_dumpConfig( buffer, sizeof(buffer) ); + logadd( LOG_DEBUG1, "Effective configuration:\n%s", buffer ); + initialLoad = false; + pthread_mutex_unlock( &loadLock ); +} + +static void sanitizeFixedConfig() +{ + // Validate settings after loading: + // base path for images valid? + if ( _basePath == NULL || _basePath[0] == '\0' ) { + logadd( LOG_WARNING, "No/empty basePath in " CONFIG_FILENAME ); + free( _basePath ); + _basePath = NULL; + } else if ( _basePath[0] != '/' ) { + logadd( LOG_WARNING, "basePath must be absolute!" ); + free( _basePath ); + _basePath = NULL; + } else { + char *end = _basePath + strlen( _basePath ) - 1; + while ( end >= _basePath && *end == '/' ) { + *end-- = '\0'; + } + } + // listen port + if ( _listenPort < 1 || _listenPort > 65535 ) { + logadd( LOG_ERROR, "listenPort must be 1-65535, but is %d", _listenPort ); + exit( EXIT_FAILURE ); + } + // Cap to hard limit + if ( _maxClients > SERVER_MAX_CLIENTS ) _maxClients = SERVER_MAX_CLIENTS; + if ( _maxImages > SERVER_MAX_IMAGES ) _maxImages = SERVER_MAX_IMAGES; + // Consider rlimits + struct rlimit limit; + if ( getrlimit( RLIMIT_NOFILE, &limit ) != 0 ) { + logadd( LOG_DEBUG1, "getrlimit failed, errno %d", errno ); + } else { + const rlim_t required = (rlim_t)( _maxClients + _maxImages * ( _isProxy ? 
2 : 1 ) + 50 ); + if ( limit.rlim_cur != RLIM_INFINITY && limit.rlim_cur < required ) { + rlim_t current = limit.rlim_cur; + if ( required <= limit.rlim_max || limit.rlim_max == RLIM_INFINITY ) { + limit.rlim_cur = required; + } else { + limit.rlim_cur = limit.rlim_max; + } + if ( current != limit.rlim_cur && setrlimit( RLIMIT_NOFILE, &limit ) == 0 ) { + current = limit.rlim_cur; + logadd( LOG_INFO, "LIMIT_NOFILE (ulimit -n) soft limit increased to %d", (int)current ); + } + if ( current < required ) { + logadd( LOG_WARNING, "This process can only have %d open file handles," + " which is not enough for the selected maxClients and maxImages counts." + " Consider increasing the limit to at least %d (RLIMIT_NOFILE, ulimit -n)" + " to support the current configuration. maxClients and maxImages have" + " been lowered for this session.", (int)current, (int)required ); + do { + if ( _maxClients > 500 && _maxImages > 150 ) { + _maxImages -= _maxImages / 20 + 1; + _maxClients -= _maxClients / 20 + 1; + } else if ( _maxImages > 100 ) { + _maxImages -= _maxImages / 20 + 1; + if ( _maxClients > 200 ) _maxClients -= _maxClients / 25 + 1; + } else { + break; + } + } while ( (rlim_t)( _maxClients + _maxImages * ( _isProxy ? 
2 : 1 ) + 50 ) > current ); + } + } + } +} + +#define SETLOGBIT(name) do { if ( strstr( value, #name ) != NULL ) mask |= LOG_ ## name; } while (0) +static void handleMaskString( const char *value, void(*func)(logmask_t) ) +{ + logmask_t mask = 0; + SETLOGBIT( ERROR ); + SETLOGBIT( WARNING ); + SETLOGBIT( MINOR ); + SETLOGBIT( INFO ); + SETLOGBIT( DEBUG1 ); + SETLOGBIT( DEBUG2 ); + (*func)( mask ); +} + +static bool parse64(const char *in, atomic_int_fast64_t *out, const char *optname) +{ + if ( *in == '\0' ) { + logadd( LOG_WARNING, "Ignoring empty numeric setting '%s'", optname ); + return false; + } + char *end; + long long int num = strtoll( in, &end, 10 ); + if ( end == in ) { + logadd( LOG_WARNING, "Ignoring value '%s' for '%s': Not a number", in, optname ); + return false; + } + int exp, base = 1024; + while ( *end == ' ' ) end++; + if ( *end == '\0' ) { + exp = 0; + } else { + char *pos = strchr( units, *end > 'Z' ? (*end - 32) : *end ); + if ( pos == NULL ) { + logadd( LOG_ERROR, "Invalid unit '%s' for '%s'", end, optname ); + return false; + } + exp = (int)( pos - units ) + 1; + end++; + if ( *end == 'B' || *end == 'b' ) { + base = 1000; + } + } + while ( exp-- > 0 ) num *= base; + *out = (int64_t)num; + return true; +} + +static bool parse64u(const char *in, atomic_uint_fast64_t *out, const char *optname) +{ + atomic_int_fast64_t v; + if ( !parse64( in, &v, optname ) ) return false; + if ( v < 0 ) { + logadd( LOG_WARNING, "Ignoring value '%s' for '%s': Cannot be negative", in, optname ); + return false; + } + *out = (uint64_t)v; + return true; +} + +static bool parse32(const char *in, atomic_int *out, const char *optname) +{ + atomic_int_fast64_t v; + if ( !parse64( in, &v, optname ) ) return false; + if ( v < INT_MIN || v > INT_MAX ) { + logadd( LOG_WARNING, "'%s' must be between %d and %d, but is '%s'", optname, (int)INT_MIN, (int)INT_MAX, in ); + return false; + } + *out = (int)v; + return true; +} + +static bool parse32u(const char *in, atomic_int 
*out, const char *optname) +{ + atomic_int_fast64_t v; + if ( !parse64( in, &v, optname ) ) return false; + if ( v < 0 || v > INT_MAX ) { + logadd( LOG_WARNING, "'%s' must be between %d and %d, but is '%s'", optname, (int)0, (int)INT_MAX, in ); + return false; + } + *out = (int)v; + return true; +} + +#define P_ARG(...) do { \ + int r = snprintf(buffer, rem, __VA_ARGS__); \ + if ( r < 0 || (size_t)r >= rem ) return size - 1; \ + rem -= r; \ + buffer += r; \ +} while (0) +#define PVAR(var,type,cast) P_ARG(#var "=%" type "\n", (cast) _ ## var) +#define PINT(var) PVAR(var, "d", int) +#define PUINT64(var) PVAR(var, PRIu64, uint64_t) +#define PSTR(var) PVAR(var, "s", const char*) +#define PBOOL(var) P_ARG(#var "=%s\n", _ ## var ? "true" : "false") + +size_t globals_dumpConfig(char *buffer, size_t size) +{ + size_t rem = size; + P_ARG("[dnbd3]\n"); + PINT(listenPort); + PSTR(basePath); + PINT(serverPenalty); + PINT(clientPenalty); + PBOOL(isProxy); + if ( _backgroundReplication == BGR_HASHBLOCK ) { + P_ARG("backgroundReplication=hashblock\n"); + } else { + PBOOL(backgroundReplication); + } + PINT(bgrMinClients); + PBOOL(lookupMissingForProxy); + PBOOL(sparseFiles); + PBOOL(removeMissingImages); + PINT(uplinkTimeout); + PINT(clientTimeout); + PBOOL(closeUnusedFd); + PBOOL(vmdkLegacyMode); + PBOOL(proxyPrivateOnly); + P_ARG("[limits]\n"); + PINT(maxClients); + PINT(maxImages); + PINT(maxPayload); + PUINT64(maxReplicationSize); + return size - rem; +} + diff --git a/src/server/globals.h b/src/server/globals.h new file mode 100644 index 0000000..2b30bc2 --- /dev/null +++ b/src/server/globals.h @@ -0,0 +1,277 @@ +#ifndef _GLOBALS_H_ +#define _GLOBALS_H_ + +#include "../types.h" +#include "../shared/fdsignal.h" +#include "../serverconfig.h" +#include <stdint.h> +#include <stdatomic.h> +#include <time.h> +#include <pthread.h> + +typedef struct timespec ticks; + +// ######### All structs/types used by the server ######## + +typedef struct _dnbd3_connection dnbd3_connection_t; 
+typedef struct _dnbd3_image dnbd3_image_t; +typedef struct _dnbd3_client dnbd3_client_t; + +// Slot is free, can be used. +// Must only be set in uplink_handle_receive() or uplink_remove_client() +#define ULR_FREE 0 +// Slot has been filled with a request that hasn't been sent to the upstream server yet, matching request can safely rely on reuse. +// Must only be set in uplink_request() +#define ULR_NEW 1 +// Slot is occupied, reply has not yet been received, matching request can safely rely on reuse. +// Must only be set in uplink_mainloop() or uplink_request() +#define ULR_PENDING 2 +// Slot is being processed, do not consider for hop on. +// Must only be set in uplink_handle_receive() +#define ULR_PROCESSING 3 +typedef struct +{ + uint64_t handle; // Client defined handle to pass back in reply + uint64_t from; // First byte offset of requested block (ie. 4096) + uint64_t to; // Last byte + 1 of requested block (ie. 8192, if request len is 4096, resulting in bytes 4096-8191) + dnbd3_client_t * client; // Client to send reply to + int status; // status of this entry: ULR_* +#ifdef _DEBUG + ticks entered; // When this request entered the queue (for debugging) +#endif + uint8_t hopCount; // How many hops this request has already taken across proxies +} dnbd3_queued_request_t; + +#define RTT_IDLE 0 // Not in progress +#define RTT_INPROGRESS 1 // In progess, not finished +#define RTT_DONTCHANGE 2 // Finished, but no better alternative found +#define RTT_DOCHANGE 3 // Finished, better alternative written to .betterServer + .betterFd +#define RTT_NOT_REACHABLE 4 // No uplink was reachable +struct _dnbd3_connection +{ + int fd; // socket fd to remote server + int version; // remote server protocol version + dnbd3_signal_t* signal; // used to wake up the process + pthread_t thread; // thread holding the connection + pthread_spinlock_t queueLock; // lock for synchronization on request queue etc. 
+ dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer + dnbd3_host_t currentServer; // Current server we're connected to + pthread_spinlock_t rttLock; // When accessing rttTestResult, betterFd or betterServer + int rttTestResult; // RTT_* + int cacheFd; // used to write to the image, in case it is relayed. ONLY USE FROM UPLINK THREAD! + int betterVersion; // protocol version of better server + int betterFd; // Active connection to better server, ready to use + dnbd3_host_t betterServer; // The better server + uint8_t *recvBuffer; // Buffer for receiving payload + uint32_t recvBufferLen; // Len of ^^ + volatile bool shutdown; // signal this thread to stop, must only be set from uplink_shutdown() or cleanup in uplink_mainloop() + bool replicatedLastBlock; // bool telling if the last block has been replicated yet + bool cycleDetected; // connection cycle between proxies detected for current remote server + int nextReplicationIndex; // Which index in the cache map we should start looking for incomplete blocks at + // If BGR == BGR_HASHBLOCK, -1 means "currently no incomplete block" + uint64_t replicationHandle; // Handle of pending replication request + atomic_uint_fast64_t bytesReceived; // Number of bytes received by the uplink since startup. + int queueLen; // length of queue + uint32_t idleTime; // How many seconds the uplink was idle (apart from keep-alives) + dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE]; +}; + +typedef struct +{ + char comment[COMMENT_LENGTH]; + dnbd3_host_t host; + unsigned int rtt[SERVER_RTT_PROBES]; + unsigned int rttIndex; + bool isPrivate, isClientOnly; + ticks lastFail; + int numFails; +} dnbd3_alt_server_t; + +typedef struct +{ + uint8_t host[16]; + int bytes; + int bitMask; + int permissions; +} dnbd3_access_rule_t; + +/** + * Image struct. 
An image path could be something like + * /mnt/images/rz/zfs/Windows7 ZfS.vmdk.r1 + * and the name would then be + * rz/zfs/windows7 zfs.vmdk + */ +struct _dnbd3_image +{ + char *path; // absolute path of the image + char *name; // public name of the image (usually relative path minus revision ID) + dnbd3_connection_t *uplink; // pointer to a server connection + uint8_t *cache_map; // cache map telling which parts are locally cached, NULL if complete + uint64_t virtualFilesize; // virtual size of image (real size rounded up to multiple of 4k) + uint64_t realFilesize; // actual file size on disk + ticks atime; // last access time + ticks lastWorkCheck; // last time a non-working image has been checked + ticks nextCompletenessEstimate; // next time the completeness estimate should be updated + uint32_t *crc32; // list of crc32 checksums for each 16MiB block in image + uint32_t masterCrc32; // CRC-32 of the crc-32 list + int readFd; // used to read the image. Used from multiple threads, so use atomic operations (pread et al) + int completenessEstimate; // Completeness estimate in percent + int users; // clients currently using this image + int id; // Unique ID of this image. Only unique in the context of this running instance of DNBD3-Server + bool working; // true if image exists and completeness is == 100% or a working upstream proxy is connected + uint16_t rid; // revision of image + pthread_spinlock_t lock; +}; + +struct _dnbd3_client +{ +#define HOSTNAMELEN (48) + atomic_uint_fast64_t bytesSent; // Byte counter for this client. 
+ dnbd3_image_t *image; // Image in use by this client, or NULL during handshake + int sock; + bool isServer; // true if a server in proxy mode, false if real client + dnbd3_host_t host; + char hostName[HOSTNAMELEN]; // inet_ntop version of host + pthread_mutex_t sendMutex; // Held while writing to sock if image is incomplete (since uplink uses socket too) + pthread_spinlock_t lock; +}; + +// ####################################################### +#define CONFIG_FILENAME "server.conf" + +/** + * Base directory where the configuration files reside. Will never have a trailing slash. + */ +extern char *_configDir; + +/** + * Base directory where all images are stored in. Will never have a trailing slash. + */ +extern char *_basePath; + +/** + * Whether or not simple *.vmdk files should be treated as revision 1 + */ +extern atomic_bool _vmdkLegacyMode; + +/** + * How much artificial delay should we add when a server connects to us? + */ +extern atomic_int _serverPenalty; + +/** + * How much artificial delay should we add when a client connects to us? + */ +extern atomic_int _clientPenalty; + +/** + * Is server shutting down? + */ +extern atomic_bool _shutdown; + +/** + * Is server allowed to provide images in proxy mode? + */ +extern atomic_bool _isProxy; + +/** + * Only use servers as upstream proxy which are private? + */ +extern atomic_bool _proxyPrivateOnly; + +/** + * Whether to remove missing images from image list on SIGHUP + */ +extern atomic_bool _removeMissingImages; + +/** + * Read timeout when waiting for or sending data on an uplink + */ +extern atomic_int _uplinkTimeout; + +/** + * Read timeout when waiting for or sending data from/to client + */ +extern atomic_int _clientTimeout; + +/** + * If true, images with no active client will have their fd closed after some + * idle time. + */ +extern atomic_bool _closeUnusedFd; + +/** + * Should we replicate incomplete images in the background? 
+ * Otherwise, only blocks that were explicitly requested will be cached. + */ +extern atomic_int _backgroundReplication; +#define BGR_DISABLED (0) +#define BGR_FULL (1) +#define BGR_HASHBLOCK (2) + +/** + * Minimum connected clients for background replication to kick in + */ +extern atomic_int _bgrMinClients; + +/** + * (In proxy mode): If connecting client is a proxy, and the requested image + * is not known locally, should we ask our known alt servers for it? + * Otherwise the request is rejected. + */ +extern atomic_bool _lookupMissingForProxy; + +/** + * Should we preallocate proxied images right at the start to make + * sure we can cache it entirely, or rather create sparse files + * with holes in them? With sparse files, we just keep writing + * cached blocks to disk until it is full, and only then will we + * start to delete old images. This might be a bit flaky so use + * only in space restricted environments. Also make sure your + * file system actually supports sparse files / files with holes + * in them, or you might get really shitty performance. + * This setting will have no effect if background replication is + * turned on. + */ +extern atomic_bool _sparseFiles; + +/** + * Port to listen on (default: #define PORT (5003)) + */ +extern atomic_int _listenPort; + +/** + * Max number of DNBD3 clients we accept + */ +extern atomic_int _maxClients; + +/** + * Max number of Images we support (in baseDir) + */ +extern atomic_int _maxImages; + +/** + * Maximum payload length we accept on uplinks and thus indirectly + * from clients in case the requested range is not cached locally. + * Usually this isn't even a megabyte for "real" clients (blockdev + * or fuse). + */ +extern atomic_int _maxPayload; + +/** + * If in proxy mode, don't replicate images that are + * larger than this according to the uplink server. + */ +extern atomic_uint_fast64_t _maxReplicationSize; + +/** + * Load the server configuration. 
+ */ +void globals_loadConfig(); + +/** + * Dump the effective configuration in use to given buffer. + */ +size_t globals_dumpConfig(char *buffer, size_t size); + +#endif /* GLOBALS_H_ */ diff --git a/src/server/helper.c b/src/server/helper.c new file mode 100644 index 0000000..2dbc3ea --- /dev/null +++ b/src/server/helper.c @@ -0,0 +1,146 @@ +#include "helper.h" +#include <arpa/inet.h> +#include <stdlib.h> +#include <signal.h> +#include <stdio.h> +#include <errno.h> +#include <sys/socket.h> + +#ifdef HAVE_THREAD_NAMES +#include <sys/prctl.h> // For thread names +#endif + +/** + * Parse IPv4 or IPv6 address in string representation to a suitable format usable by the BSD socket library + * !! Contents of 'string' might be modified by this function !! + * + * @param string eg. "1.2.3.4" or "2a01::10:5", optially with port appended, eg "1.2.3.4:6666" or "[2a01::10:5]:6666" + * @param host pointer to dnbd3_host_t that will be filled with the following data: + * .type will contain either HOST_IP4 or HOST_IP6 + * .addr will contain the address in network representation + * .port will contain the port in network representation, defaulting to #define PORT if none was given + * @return true on success, false in failure. 
contents of af, addr and port are undefined in the latter case + */ +bool parse_address(char *string, dnbd3_host_t *host) +{ + struct in_addr v4; + struct in6_addr v6; + + memset( host, 0, sizeof(*host) ); + // Try IPv4 without port + if ( 1 == inet_pton( AF_INET, string, &v4 ) ) { + host->type = HOST_IP4; + memcpy( host->addr, &v4, 4 ); + host->port = htons( PORT ); + return true; + } + // Try IPv6 without port + if ( 1 == inet_pton( AF_INET6, string, &v6 ) ) { + host->type = HOST_IP6; + memcpy( host->addr, &v6, 16 ); + host->port = htons( PORT ); + return true; + } + + // Scan for port + char *portpos = NULL, *ptr = string; + while ( *ptr ) { + if ( *ptr == ':' ) portpos = ptr; + ++ptr; + } + if ( portpos == NULL ) return false; // No port in string + // Consider IP being surrounded by [ ] + if ( *string == '[' && *(portpos - 1) == ']' ) { + ++string; + *(portpos - 1) = '\0'; + } + *portpos++ = '\0'; + int p = atoi( portpos ); + if ( p < 1 || p > 65535 ) return false; // Invalid port + host->port = htons( (uint16_t)p ); + + // Try IPv4 with port + if ( 1 == inet_pton( AF_INET, string, &v4 ) ) { + host->type = HOST_IP4; + memcpy( host->addr, &v4, 4 ); + return true; + } + // Try IPv6 with port + if ( 1 == inet_pton( AF_INET6, string, &v6 ) ) { + host->type = HOST_IP6; + memcpy( host->addr, &v6, 16 ); + return true; + } + + // FAIL + return false; +} + +/** + * Convert a host and port (network byte order) to printable representation. + * Worst case required buffer len is 48, eg. 
[1234:1234:1234:1234:1234:1234:1234:1234]:12345 (+ \0) + * Returns true on success, false on error + */ +bool host_to_string(const dnbd3_host_t *host, char *target, size_t targetlen) +{ + // Worst case: Port 5 chars, ':' to separate ip and port 1 char, terminating null 1 char = 7, [] for IPv6 + if ( targetlen < 10 ) return false; + if ( host->type == HOST_IP6 ) { + *target++ = '['; + inet_ntop( AF_INET6, host->addr, target, (socklen_t)targetlen - 10 ); + target += strlen( target ); + *target++ = ']'; + } else if ( host->type == HOST_IP4 ) { + inet_ntop( AF_INET, host->addr, target, (socklen_t)targetlen - 8 ); + target += strlen( target ); + } else { + snprintf( target, targetlen, "<?addrtype=%d>", (int)host->type ); + return false; + } + *target = '\0'; + if ( host->port != 0 ) { + // There are still at least 7 bytes left in the buffer, port is at most 5 bytes + ':' + '\0' = 7 + snprintf( target, 7, ":%d", (int)ntohs( host->port ) ); + } + return true; +} + +void remove_trailing_slash(char *string) +{ + char *ptr = string + strlen( string ) - 1; + while ( ptr >= string && *ptr == '/' ) + *ptr-- = '\0'; +} + +void trim_right(char * const string) +{ + char *end = string + strlen( string ) - 1; + while ( end >= string && (*end == '\r' || *end == '\n' || *end == ' ' || *end == '\t') ) + *end-- = '\0'; +} + +void setThreadName(const char *name) +{ + char newName[16]; + if ( strlen( name ) > 15 ) { + snprintf( newName, sizeof(newName), "%s", name ); + newName[15] = '\0'; + name = newName; + } +#ifdef HAVE_THREAD_NAMES + prctl( PR_SET_NAME, (unsigned long)name, 0, 0, 0 ); +#endif + //TODO: On FreeBSD set threadname with pthread_setname_np +} + +void blockNoncriticalSignals() +{ + sigset_t sigmask; + sigemptyset( &sigmask ); + sigaddset( &sigmask, SIGUSR1 ); + sigaddset( &sigmask, SIGUSR2 ); + sigaddset( &sigmask, SIGHUP ); + sigaddset( &sigmask, SIGPIPE ); + pthread_sigmask( SIG_BLOCK, &sigmask, NULL ); +} + diff --git a/src/server/helper.h b/src/server/helper.h new file 
mode 100644 index 0000000..102cb36 --- /dev/null +++ b/src/server/helper.h @@ -0,0 +1,42 @@ +#ifndef HELPER_H_ +#define HELPER_H_ + +#include "server.h" +#include "../shared/log.h" +#include "../types.h" +#include <netinet/in.h> +#include <string.h> +#include <unistd.h> + +bool parse_address(char *string, dnbd3_host_t *host); +bool host_to_string(const dnbd3_host_t *host, char *target, size_t targetlen); +void remove_trailing_slash(char *string); +void trim_right(char * const string); +void setThreadName(const char *name); +void blockNoncriticalSignals(); + +static inline bool isSameAddress(const dnbd3_host_t * const a, const dnbd3_host_t * const b) +{ + return (a->type == b->type) && (0 == memcmp( a->addr, b->addr, (a->type == HOST_IP4 ? 4 : 16) )); +} + +static inline bool isSameAddressPort(const dnbd3_host_t * const a, const dnbd3_host_t * const b) +{ + return (a->type == b->type) && (a->port == b->port) && (0 == memcmp( a->addr, b->addr, (a->type == HOST_IP4 ? 4 : 16) )); +} + +/** + * Test whether string ends in suffix. 
+ * @return true if string =~ /suffix$/ + */ +static inline bool strend(char *string, char *suffix) +{ + if ( string == NULL ) return false; + if ( suffix == NULL || *suffix == '\0' ) return true; + const size_t len1 = strlen( string ); + const size_t len2 = strlen( suffix ); + if ( len2 > len1 ) return false; + return strcmp( string + len1 - len2, suffix ) == 0; +} + +#endif diff --git a/src/server/image.c b/src/server/image.c new file mode 100644 index 0000000..061f9a3 --- /dev/null +++ b/src/server/image.c @@ -0,0 +1,1794 @@ +#include "image.h" +#include "helper.h" +#include "fileutil.h" +#include "uplink.h" +#include "locks.h" +#include "integrity.h" +#include "altservers.h" +#include "../shared/protocol.h" +#include "../shared/timing.h" +#include "../shared/crc32.h" + +#include <assert.h> +#include <fcntl.h> +#include <sys/stat.h> +#include <dirent.h> +#include <inttypes.h> +#include <glob.h> +#include <jansson.h> + +#define PATHLEN (2000) +#define NONWORKING_RECHECK_INTERVAL_SECONDS (60) + +// ########################################## + +static dnbd3_image_t *_images[SERVER_MAX_IMAGES]; +static int _num_images = 0; + +static pthread_spinlock_t imageListLock; +static pthread_mutex_t remoteCloneLock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t reloadLock = PTHREAD_MUTEX_INITIALIZER; +#define NAMELEN 500 +#define CACHELEN 20 +typedef struct +{ + char name[NAMELEN]; + uint16_t rid; + ticks deadline; +} imagecache; +static imagecache remoteCloneCache[CACHELEN]; + +// ########################################## + +static bool isForbiddenExtension(const char* name); +static dnbd3_image_t* image_remove(dnbd3_image_t *image); +static dnbd3_image_t* image_free(dnbd3_image_t *image); +static bool image_load_all_internal(char *base, char *path); +static bool image_addToList(dnbd3_image_t *image); +static bool image_load(char *base, char *path, int withUplink); +static bool image_clone(int sock, char *name, uint16_t revision, uint64_t imageSize); +static bool 
image_calcBlockCrc32(const int fd, const size_t block, const uint64_t realFilesize, uint32_t *crc); +static bool image_ensureDiskSpace(uint64_t size, bool force); + +static uint8_t* image_loadCacheMap(const char * const imagePath, const int64_t fileSize); +static uint32_t* image_loadCrcList(const char * const imagePath, const int64_t fileSize, uint32_t *masterCrc); +static bool image_checkRandomBlocks(const int count, int fdImage, const int64_t fileSize, uint32_t * const crc32list, uint8_t * const cache_map); + +// ########################################## + +void image_serverStartup() +{ + srand( (unsigned int)time( NULL ) ); + spin_init( &imageListLock, PTHREAD_PROCESS_PRIVATE ); +} + +/** + * Update cache-map of given image for the given byte range + * start (inclusive) - end (exclusive) + * Locks on: images[].lock + */ +void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, const bool set) +{ + assert( image != NULL ); + // This should always be block borders due to how the protocol works, but better be safe + // than accidentally mark blocks as cached when they really aren't entirely cached. 
+ assert( end <= image->virtualFilesize ); + assert( start <= end ); + if ( set ) { + // If we set as cached, move "inwards" in case we're not at 4k border + end &= ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + start = (uint64_t)(start + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + } else { + // If marking as NOT cached, move "outwards" in case we're not at 4k border + start &= ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + end = (uint64_t)(end + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + } + if ( start >= end ) + return; + bool setNewBlocks = false; + uint64_t pos = start; + spin_lock( &image->lock ); + if ( image->cache_map == NULL ) { + // Image seems already complete + if ( set ) { + // This makes no sense + spin_unlock( &image->lock ); + logadd( LOG_DEBUG1, "image_updateCachemap(true) with no cache_map: %s", image->path ); + return; + } + // Recreate a cache map, set it to all 1 initially as we assume the image was complete + const int byteSize = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); + image->cache_map = malloc( byteSize ); + memset( image->cache_map, 0xff, byteSize ); + } + while ( pos < end ) { + const size_t map_y = (int)( pos >> 15 ); + const int map_x = (int)( (pos >> 12) & 7 ); // mod 8 + const int bit_mask = 1 << map_x; + if ( set ) { + if ( (image->cache_map[map_y] & bit_mask) == 0 ) setNewBlocks = true; + image->cache_map[map_y] |= (uint8_t)bit_mask; + } else { + image->cache_map[map_y] &= (uint8_t)~bit_mask; + } + pos += DNBD3_BLOCK_SIZE; + } + if ( setNewBlocks && image->crc32 != NULL ) { + // If setNewBlocks is set, at least one of the blocks was not cached before, so queue all hash blocks + // for checking, even though this might lead to checking some hash block again, if it was + // already complete and the block range spanned at least two hash blocks. 
+ // First set start and end to borders of hash blocks + start &= ~(uint64_t)(HASH_BLOCK_SIZE - 1); + end = (end + HASH_BLOCK_SIZE - 1) & ~(uint64_t)(HASH_BLOCK_SIZE - 1); + pos = start; + while ( pos < end ) { + if ( image->cache_map == NULL ) break; + const int block = (int)( pos / HASH_BLOCK_SIZE ); + if ( image_isHashBlockComplete( image->cache_map, block, image->realFilesize ) ) { + spin_unlock( &image->lock ); + integrity_check( image, block ); + spin_lock( &image->lock ); + } + pos += HASH_BLOCK_SIZE; + } + } + spin_unlock( &image->lock ); +} + +/** + * Returns true if the given image is complete. + * Also frees cache_map and deletes it on disk + * if it hasn't been complete before + * Locks on: image.lock + */ +bool image_isComplete(dnbd3_image_t *image) +{ + assert( image != NULL ); + spin_lock( &image->lock ); + if ( image->virtualFilesize == 0 ) { + spin_unlock( &image->lock ); + return false; + } + if ( image->cache_map == NULL ) { + spin_unlock( &image->lock ); + return true; + } + bool complete = true; + int j; + const int map_len_bytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); + for (j = 0; j < map_len_bytes - 1; ++j) { + if ( image->cache_map[j] != 0xFF ) { + complete = false; + break; + } + } + if ( complete ) { // Every block except the last one is complete + // Last one might need extra treatment if it's not a full byte + const int blocks_in_last_byte = (image->virtualFilesize >> 12) & 7; + uint8_t last_byte = 0; + if ( blocks_in_last_byte == 0 ) { + last_byte = 0xFF; + } else { + for (j = 0; j < blocks_in_last_byte; ++j) + last_byte |= (uint8_t)(1 << j); + } + complete = ((image->cache_map[map_len_bytes - 1] & last_byte) == last_byte); + } + if ( !complete ) { + spin_unlock( &image->lock ); + return false; + } + char mapfile[PATHLEN] = ""; + free( image->cache_map ); + image->cache_map = NULL; + snprintf( mapfile, PATHLEN, "%s.map", image->path ); + spin_unlock( &image->lock ); + unlink( mapfile ); + return true; +} + +/** + * Make sure 
readFd is open, useful when closeUnusedFd is active. + * This function assumes you called image_lock first, so its known + * to be active and the fd won't be closed halfway through the + * function. + * Does not update atime, so the fd might be closed again very soon. + * Since the caller should have image_lock()ed first, it could do + * a quick operation on it before calling image_release which + * guarantees that the fd will not be closed meanwhile. + */ +bool image_ensureOpen(dnbd3_image_t *image) +{ + if ( image->readFd != -1 ) return image; + int newFd = open( image->path, O_RDONLY ); + if ( newFd != -1 ) { + // Check size + const off_t flen = lseek( newFd, 0, SEEK_END ); + if ( flen == -1 ) { + logadd( LOG_WARNING, "Could not seek to end of %s (errno %d)", image->path, errno ); + close( newFd ); + newFd = -1; + } else if ( (uint64_t)flen != image->realFilesize ) { + logadd( LOG_WARNING, "Size of active image with closed fd changed from %" PRIu64 " to %" PRIu64, image->realFilesize, (uint64_t)flen ); + close( newFd ); + newFd = -1; + } + } + if ( newFd == -1 ) { + spin_lock( &image->lock ); + image->working = false; + spin_unlock( &image->lock ); + return false; + } + spin_lock( &image->lock ); + if ( image->readFd == -1 ) { + image->readFd = newFd; + spin_unlock( &image->lock ); + } else { + // There was a race while opening the file (happens cause not locked cause blocking), we lost the race so close new fd and proceed + spin_unlock( &image->lock ); + close( newFd ); + } + return image->readFd != -1; +} + +/** + * Get an image by name+rid. This function increases a reference counter, + * so you HAVE TO CALL image_release for every image_get() call at some + * point... + * Locks on: imageListLock, _images[].lock + */ +dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking) +{ + int i; + const char *removingText = _removeMissingImages ? 
", removing from list" : ""; + dnbd3_image_t *candidate = NULL; + // Simple sanity check + const size_t slen = strlen( name ); + if ( slen == 0 || name[slen - 1] == '/' || name[0] == '/' ) return NULL ; + // Go through array + spin_lock( &imageListLock ); + for (i = 0; i < _num_images; ++i) { + dnbd3_image_t * const image = _images[i]; + if ( image == NULL || strcmp( image->name, name ) != 0 ) continue; + if ( revision == image->rid ) { + candidate = image; + break; + } else if ( revision == 0 && (candidate == NULL || candidate->rid < image->rid) ) { + candidate = image; + } + } + + // Not found + if ( candidate == NULL ) { + spin_unlock( &imageListLock ); + return NULL ; + } + + spin_lock( &candidate->lock ); + spin_unlock( &imageListLock ); + candidate->users++; + spin_unlock( &candidate->lock ); + + // Found, see if it works +// TODO: Also make sure a non-working image still has old fd open but created a new one and removed itself from the list +// TODO: But remember size-changed images forever + if ( candidate->working || checkIfWorking ) { + // Is marked working, but might not have an fd open + if ( !image_ensureOpen( candidate ) ) { + spin_lock( &candidate->lock ); + timing_get( &candidate->lastWorkCheck ); + spin_unlock( &candidate->lock ); + if ( _removeMissingImages ) { + candidate = image_remove( candidate ); // No release here, the image is still returned and should be released by caller + } + return candidate; + } + } + + if ( !checkIfWorking ) return candidate; // Not interested in re-cechking working state + + // ...not working... 
+ + // Don't re-check too often + spin_lock( &candidate->lock ); + bool check; + declare_now; + check = timing_diff( &candidate->lastWorkCheck, &now ) > NONWORKING_RECHECK_INTERVAL_SECONDS; + if ( check ) { + candidate->lastWorkCheck = now; + } + spin_unlock( &candidate->lock ); + if ( !check ) { + return candidate; + } + + // reaching this point means: + // 1) We should check if the image is working, it might or might not be in working state right now + // 2) The image is open for reading (or at least was at some point, the fd might be stale if images lie on an NFS share etc.) + // 3) We made sure not to re-check this image too often + + // Common for ro and rw images: Size check, read check + const off_t len = lseek( candidate->readFd, 0, SEEK_END ); + bool reload = false; + if ( len == -1 ) { + logadd( LOG_WARNING, "lseek() on %s failed (errno=%d)%s.", candidate->path, errno, removingText ); + reload = true; + } else if ( (uint64_t)len != candidate->realFilesize ) { + logadd( LOG_DEBUG1, "Size of %s changed at runtime, keeping disabled! Expected: %" PRIu64 ", found: %" PRIu64 + ". Try sending SIGHUP to server if you know what you're doing.", + candidate->path, candidate->realFilesize, (uint64_t)len ); + } else { + // Seek worked, file size is same, now see if we can read from file + char buffer[100]; + if ( pread( candidate->readFd, buffer, sizeof(buffer), 0 ) == -1 ) { + logadd( LOG_DEBUG2, "Reading first %d bytes from %s failed (errno=%d)%s.", + (int)sizeof(buffer), candidate->path, errno, removingText ); + reload = true; + } else if ( !candidate->working ) { + // Seems everything is fine again \o/ + candidate->working = true; + logadd( LOG_INFO, "Changed state of %s:%d to 'working'", candidate->name, candidate->rid ); + } + } + + if ( reload ) { + // Could not access the image with exising fd - mark for reload which will re-open the file. + // make a copy of the image struct but keep the old one around. 
If/When it's not being used + // anymore, it will be freed automatically. + dnbd3_image_t *img = calloc( sizeof(dnbd3_image_t), 1 ); + img->path = strdup( candidate->path ); + img->name = strdup( candidate->name ); + img->virtualFilesize = candidate->virtualFilesize; + img->realFilesize = candidate->realFilesize; + img->atime = now; + img->masterCrc32 = candidate->masterCrc32; + img->readFd = -1; + img->rid = candidate->rid; + img->users = 1; + img->working = false; + spin_init( &img->lock, PTHREAD_PROCESS_PRIVATE ); + if ( candidate->crc32 != NULL ) { + const size_t mb = IMGSIZE_TO_HASHBLOCKS( candidate->virtualFilesize ) * sizeof(uint32_t); + img->crc32 = malloc( mb ); + memcpy( img->crc32, candidate->crc32, mb ); + } + spin_lock( &candidate->lock ); + if ( candidate->cache_map != NULL ) { + const size_t mb = IMGSIZE_TO_MAPBYTES( candidate->virtualFilesize ); + img->cache_map = malloc( mb ); + memcpy( img->cache_map, candidate->cache_map, mb ); + } + spin_unlock( &candidate->lock ); + if ( image_addToList( img ) ) { + image_release( candidate ); + candidate = img; + } else { + img->users = 0; + image_free( img ); + } + // readFd == -1 and working == FALSE at this point, + // this function needs some splitting up for handling as we need to run most + // of the above code again. for now we know that the next call for this + // name:rid will get ne newly inserted "img" and try to re-open the file. + } + + // Check if image is incomplete, handle + if ( candidate->cache_map != NULL ) { + if ( candidate->uplink == NULL ) { + uplink_init( candidate, -1, NULL, -1 ); + } + } + + return candidate; // We did all we can, hopefully it's working +} + +/** + * Lock the image by increasing its users count + * Returns the image on success, NULL if it is not found in the image list + * Every call to image_lock() needs to be followed by a call to image_release() at some point. 
+ * Locks on: imageListLock, _images[].lock + */ +dnbd3_image_t* image_lock(dnbd3_image_t *image) // TODO: get rid, fix places that do image->users-- +{ + if ( image == NULL ) return NULL ; + int i; + spin_lock( &imageListLock ); + for (i = 0; i < _num_images; ++i) { + if ( _images[i] == image ) { + spin_lock( &image->lock ); + spin_unlock( &imageListLock ); + image->users++; + spin_unlock( &image->lock ); + return image; + } + } + spin_unlock( &imageListLock ); + return NULL ; +} + +/** + * Release given image. This will decrease the reference counter of the image. + * If the usage counter reaches 0 and the image is not in the images array + * anymore, the image will be freed + * Locks on: imageListLock, _images[].lock + */ +dnbd3_image_t* image_release(dnbd3_image_t *image) +{ + if ( image == NULL ) return NULL; + spin_lock( &imageListLock ); + spin_lock( &image->lock ); + assert( image->users > 0 ); + image->users--; + bool inUse = image->users != 0; + spin_unlock( &image->lock ); + if ( inUse ) { // Still in use, do nothing + spin_unlock( &imageListLock ); + return NULL; + } + // Getting here means we decreased the usage counter to zero + // If the image is not in the images list anymore, we're + // responsible for freeing it + for (int i = 0; i < _num_images; ++i) { + if ( _images[i] == image ) { // Found, do nothing + spin_unlock( &imageListLock ); + return NULL; + } + } + spin_unlock( &imageListLock ); + // So it wasn't in the images list anymore either, get rid of it + if ( !inUse ) image = image_free( image ); + return NULL; +} + +/** + * Returns true if the given file name ends in one of our meta data + * file extensions. Used to prevent loading them as images. 
+ */ +static bool isForbiddenExtension(const char* name) +{ + const size_t len = strlen( name ); + if ( len < 4 ) return false; + const char *ptr = name + len - 4; + if ( strcmp( ptr, ".crc" ) == 0 ) return true; // CRC list + if ( strcmp( ptr, ".map" ) == 0 ) return true; // cache map for incomplete images + if ( len < 5 ) return false; + --ptr; + if ( strcmp( ptr, ".meta" ) == 0 ) return true; // Meta data (currently not in use) + return false; +} + +/** + * Remove image from images array. Only free it if it has + * no active users and was actually in the list. + * Locks on: imageListLock, image[].lock + * @return NULL if image was also freed, image otherwise + */ +static dnbd3_image_t* image_remove(dnbd3_image_t *image) +{ + bool mustFree = false; + spin_lock( &imageListLock ); + spin_lock( &image->lock ); + for ( int i = _num_images - 1; i >= 0; --i ) { + if ( _images[i] == image ) { + _images[i] = NULL; + mustFree = ( image->users == 0 ); + } + if ( _images[i] == NULL && i + 1 == _num_images ) _num_images--; + } + spin_unlock( &image->lock ); + spin_unlock( &imageListLock ); + if ( mustFree ) image = image_free( image ); + return image; +} + +/** + * Kill all uplinks + */ +void image_killUplinks() +{ + int i; + spin_lock( &imageListLock ); + for (i = 0; i < _num_images; ++i) { + if ( _images[i] == NULL ) continue; + spin_lock( &_images[i]->lock ); + if ( _images[i]->uplink != NULL ) { + spin_lock( &_images[i]->uplink->queueLock ); + if ( !_images[i]->uplink->shutdown ) { + thread_detach( _images[i]->uplink->thread ); + _images[i]->uplink->shutdown = true; + } + spin_unlock( &_images[i]->uplink->queueLock ); + signal_call( _images[i]->uplink->signal ); + } + spin_unlock( &_images[i]->lock ); + } + spin_unlock( &imageListLock ); +} + +/** + * Load all images in given path recursively. + * Pass NULL to use path from config. 
+ */ +bool image_loadAll(char *path) +{ + bool ret; + char imgPath[PATHLEN]; + int imgId; + dnbd3_image_t *imgHandle; + + if ( path == NULL ) path = _basePath; + if ( pthread_mutex_trylock( &reloadLock ) != 0 ) { + logadd( LOG_MINOR, "Could not (re)load image list, already in progress." ); + return false; + } + if ( _removeMissingImages ) { + // Check if all loaded images still exist on disk + logadd( LOG_INFO, "Checking for vanished images" ); + spin_lock( &imageListLock ); + for ( int i = _num_images - 1; i >= 0; --i ) { + if ( _shutdown ) break; + if ( _images[i] == NULL ) { + if ( i + 1 == _num_images ) _num_images--; + continue; + } + imgId = _images[i]->id; + snprintf( imgPath, PATHLEN, "%s", _images[i]->path ); + spin_unlock( &imageListLock ); // isReadable hits the fs; unlock + // Check if fill can still be opened for reading + ret = file_isReadable( imgPath ); + // Lock again, see if image is still there, free if required + spin_lock( &imageListLock ); + if ( ret || i >= _num_images || _images[i] == NULL || _images[i]->id != imgId ) continue; + // Image needs to be removed + imgHandle = _images[i]; + _images[i] = NULL; + if ( i + 1 == _num_images ) _num_images--; + spin_lock( &imgHandle->lock ); + const bool freeImg = ( imgHandle->users == 0 ); + spin_unlock( &imgHandle->lock ); + // We unlocked, but the image has been removed from the list already, so + // there's no way the users-counter can increase at this point. 
+ if ( freeImg ) { + // Image is not in use anymore, free the dangling entry immediately + spin_unlock( &imageListLock ); // image_free might do several fs operations; unlock + image_free( imgHandle ); + spin_lock( &imageListLock ); + } + } + spin_unlock( &imageListLock ); + if ( _shutdown ) { + pthread_mutex_unlock( &reloadLock ); + return true; + } + } + // Now scan for new images + logadd( LOG_INFO, "Scanning for new or modified images" ); + ret = image_load_all_internal( path, path ); + pthread_mutex_unlock( &reloadLock ); + logadd( LOG_INFO, "Finished scanning %s", path ); + return ret; +} + +/** + * Free all images we have, but only if they're not in use anymore. + * Locks on imageListLock + * @return true if all images have been freed + */ +bool image_tryFreeAll() +{ + spin_lock( &imageListLock ); + for (int i = _num_images - 1; i >= 0; --i) { + if ( _images[i] != NULL && _images[i]->users == 0 ) { // XXX Data race... + dnbd3_image_t *image = _images[i]; + _images[i] = NULL; + spin_unlock( &imageListLock ); + image = image_free( image ); + spin_lock( &imageListLock ); + } + if ( i + 1 == _num_images && _images[i] == NULL ) _num_images--; + } + spin_unlock( &imageListLock ); + return _num_images == 0; +} + +/** + * Free image. DOES NOT check if it's in use. 
+ * Indirectly locks on imageListLock, image.lock, uplink.queueLock + */ +static dnbd3_image_t* image_free(dnbd3_image_t *image) +{ + assert( image != NULL ); + if ( !_shutdown ) { + logadd( LOG_INFO, "Freeing image %s:%d", image->name, (int)image->rid ); + } + // + uplink_shutdown( image ); + spin_lock( &image->lock ); + free( image->cache_map ); + free( image->crc32 ); + free( image->path ); + free( image->name ); + image->cache_map = NULL; + image->crc32 = NULL; + image->path = NULL; + image->name = NULL; + spin_unlock( &image->lock ); + if ( image->readFd != -1 ) close( image->readFd ); + spin_destroy( &image->lock ); + // + memset( image, 0, sizeof(*image) ); + free( image ); + return NULL ; +} + +bool image_isHashBlockComplete(const uint8_t * const cacheMap, const uint64_t block, const uint64_t realFilesize) +{ + if ( cacheMap == NULL ) return true; + const uint64_t end = (block + 1) * HASH_BLOCK_SIZE; + if ( end <= realFilesize ) { + // Trivial case: block in question is not the last block (well, or image size is multiple of HASH_BLOCK_SIZE) + const int startCacheIndex = (int)( ( block * HASH_BLOCK_SIZE ) / ( DNBD3_BLOCK_SIZE * 8 ) ); + const int endCacheIndex = startCacheIndex + (int)( HASH_BLOCK_SIZE / ( DNBD3_BLOCK_SIZE * 8 ) ); + for ( int i = startCacheIndex; i < endCacheIndex; ++i ) { + if ( cacheMap[i] != 0xff ) { + return false; + } + } + } else { + // Special case: Checking last block, which is smaller than HASH_BLOCK_SIZE + for (uint64_t mapPos = block * HASH_BLOCK_SIZE; mapPos < realFilesize; mapPos += DNBD3_BLOCK_SIZE ) { + const size_t map_y = (size_t)( mapPos >> 15 ); + const int map_x = (int)( (mapPos >> 12) & 7 ); // mod 8 + const int mask = 1 << map_x; + if ( (cacheMap[map_y] & mask) == 0 ) return false; + } + } + return true; +} + +/** + * Load all images in the given path recursively, + * consider *base the base path that is to be cut off + */ +static bool image_load_all_internal(char *base, char *path) +{ +#define SUBDIR_LEN 150 + assert( 
path != NULL ); + assert( *path == '/' ); + struct dirent entry, *entryPtr; + const size_t pathLen = strlen( path ); + char subpath[PATHLEN]; + struct stat st; + DIR * const dir = opendir( path ); + + if ( dir == NULL ) { + logadd( LOG_ERROR, "Could not opendir '%s' for loading", path ); + return false; + } + + while ( !_shutdown && (entryPtr = readdir( dir )) != NULL ) { + entry = *entryPtr; + if ( strcmp( entry.d_name, "." ) == 0 || strcmp( entry.d_name, ".." ) == 0 ) continue; + if ( strlen( entry.d_name ) > SUBDIR_LEN ) { + logadd( LOG_WARNING, "Skipping entry %s: Too long (max %d bytes)", entry.d_name, (int)SUBDIR_LEN ); + continue; + } + if ( entry.d_name[0] == '/' || path[pathLen - 1] == '/' ) { + snprintf( subpath, PATHLEN, "%s%s", path, entry.d_name ); + } else { + snprintf( subpath, PATHLEN, "%s/%s", path, entry.d_name ); + } + if ( stat( subpath, &st ) < 0 ) { + logadd( LOG_WARNING, "stat() for '%s' failed. Ignoring....", subpath ); + continue; + } + if ( S_ISDIR( st.st_mode ) ) { + image_load_all_internal( base, subpath ); // Recurse + } else if ( !isForbiddenExtension( subpath ) ) { + image_load( base, subpath, true ); // Load image if possible + } + } + closedir( dir ); + return true; +#undef SUBDIR_LEN +} + +/** + */ +static bool image_addToList(dnbd3_image_t *image) +{ + int i; + static int imgIdCounter = 0; // Used to assign unique numeric IDs to images + spin_lock( &imageListLock ); + // Now we're locked, assign unique ID to image (unique for this running server instance!) + image->id = ++imgIdCounter; + for ( i = 0; i < _num_images; ++i ) { + if ( _images[i] != NULL ) continue; + _images[i] = image; + break; + } + if ( i >= _num_images ) { + if ( _num_images >= _maxImages ) { + spin_unlock( &imageListLock ); + return false; + } + _images[_num_images++] = image; + } + spin_unlock( &imageListLock ); + return true; +} + +/** + * Load image from given path. This will check if the image is + * already loaded and updates its information in that case. 
+ * Note that this is NOT THREAD SAFE so make sure its always + * called on one thread only. + */ +static bool image_load(char *base, char *path, int withUplink) +{ + int revision = -1; + struct stat st; + uint8_t *cache_map = NULL; + uint32_t *crc32list = NULL; + dnbd3_image_t *existing = NULL; + int fdImage = -1; + bool function_return = false; // Return false by default + assert( base != NULL ); + assert( path != NULL ); + assert( *path == '/' ); + assert( strncmp( path, base, strlen(base)) == 0 ); + assert( base[strlen(base) - 1] != '/' ); + assert( strlen(path) > strlen(base) ); + char *lastSlash = strrchr( path, '/' ); + char *fileName = lastSlash + 1; + char imgName[strlen( path )]; + const size_t fileNameLen = strlen( fileName ); + + // Copy virtual path (relative path in "base") + char * const virtBase = path + strlen( base ) + 1; + assert( *virtBase != '/' ); + char *src = virtBase, *dst = imgName; + while ( src <= lastSlash ) { + *dst++ = *src++; + } + *dst = '\0'; + + do { + // Parse file name for revision + // Try to parse *.r<ID> syntax + size_t i; + for (i = fileNameLen - 1; i > 1; --i) { + if ( fileName[i] < '0' || fileName[i] > '9' ) break; + } + if ( i != fileNameLen - 1 && fileName[i] == 'r' && fileName[i - 1] == '.' ) { + revision = atoi( fileName + i + 1 ); + src = fileName; + while ( src < fileName + i - 1 ) { + *dst++ = *src++; + } + *dst = '\0'; + } + } while (0); + + // Legacy mode enabled and no rid extracted from filename? + if ( _vmdkLegacyMode && revision == -1 ) { + fdImage = open( path, O_RDONLY ); // Check if it exists + if ( fdImage == -1 ) goto load_error; + // Yes, simply append full file name and set rid to 1 + strcat( dst, fileName ); + revision = 1; + } + // Did we get anything? 
+ if ( revision <= 0 || revision >= 65536 ) { + logadd( LOG_WARNING, "Image '%s' has invalid revision ID %d", path, revision ); + goto load_error; + } + + // Get pointer to already existing image if possible + existing = image_get( imgName, (uint16_t)revision, true ); + + // ### Now load the actual image related data ### + if ( fdImage == -1 ) { + fdImage = open( path, O_RDONLY ); + } + if ( fdImage == -1 ) { + logadd( LOG_ERROR, "Could not open '%s' for reading...", path ); + goto load_error; + } + // Determine file size + const off_t seekret = lseek( fdImage, 0, SEEK_END ); + if ( seekret < 0 ) { + logadd( LOG_ERROR, "Could not seek to end of file '%s'", path ); + goto load_error; + } else if ( seekret == 0 ) { + logadd( LOG_WARNING, "Empty image file '%s'", path ); + goto load_error; + } + const uint64_t realFilesize = (uint64_t)seekret; + const uint64_t virtualFilesize = ( realFilesize + (DNBD3_BLOCK_SIZE - 1) ) & ~(DNBD3_BLOCK_SIZE - 1); + if ( realFilesize != virtualFilesize ) { + logadd( LOG_DEBUG1, "Image size of '%s' is %" PRIu64 ", virtual size: %" PRIu64, path, realFilesize, virtualFilesize ); + } + + // 1. Allocate memory for the cache map if the image is incomplete + cache_map = image_loadCacheMap( path, virtualFilesize ); + + // XXX: Maybe try sha-256 or 512 first if you're paranoid (to be implemented) + + // 2. Load CRC-32 list of image + bool doFullCheck = false; + uint32_t masterCrc = 0; + const int hashBlockCount = IMGSIZE_TO_HASHBLOCKS( virtualFilesize ); + crc32list = image_loadCrcList( path, virtualFilesize, &masterCrc ); + + // Check CRC32 + if ( crc32list != NULL ) { + if ( !image_checkRandomBlocks( 4, fdImage, realFilesize, crc32list, cache_map ) ) { + logadd( LOG_ERROR, "quick crc32 check of %s failed. 
Data corruption?", path ); + doFullCheck = true; + } + } + + // Compare data just loaded to identical image we apparently already loaded + if ( existing != NULL ) { + if ( existing->realFilesize != realFilesize ) { + logadd( LOG_WARNING, "Size of image '%s:%d' has changed.", existing->name, (int)existing->rid ); + // Image will be replaced below + } else if ( existing->crc32 != NULL && crc32list != NULL + && memcmp( existing->crc32, crc32list, sizeof(uint32_t) * hashBlockCount ) != 0 ) { + logadd( LOG_WARNING, "CRC32 list of image '%s:%d' has changed.", existing->name, (int)existing->rid ); + logadd( LOG_WARNING, "The image will be reloaded, but you should NOT replace existing images while the server is running." ); + logadd( LOG_WARNING, "Actually even if it's not running this should never be done. Use a new RID instead!" ); + // Image will be replaced below + } else if ( existing->crc32 == NULL && crc32list != NULL ) { + logadd( LOG_INFO, "Found CRC-32 list for already loaded image '%s:%d', adding...", existing->name, (int)existing->rid ); + existing->crc32 = crc32list; + existing->masterCrc32 = masterCrc; + crc32list = NULL; + function_return = true; + goto load_error; // Keep existing + } else if ( existing->cache_map != NULL && cache_map == NULL ) { + // Just ignore that fact, if replication is really complete the cache map will be removed anyways + logadd( LOG_INFO, "Image '%s:%d' has no cache map on disk!", existing->name, (int)existing->rid ); + function_return = true; + goto load_error; // Keep existing + } else { + // Nothing changed about the existing image, so do nothing + logadd( LOG_DEBUG1, "Did not change" ); + function_return = true; + goto load_error; // Keep existing + } + // Remove existing image from images array, so it will be replaced by the reloaded image + existing = image_remove( existing ); + existing = image_release( existing ); + } + + // Load fresh image + dnbd3_image_t *image = calloc( 1, sizeof(dnbd3_image_t) ); + image->path = 
strdup( path ); + image->name = strdup( imgName ); + image->cache_map = cache_map; + image->crc32 = crc32list; + image->masterCrc32 = masterCrc; + image->uplink = NULL; + image->realFilesize = realFilesize; + image->virtualFilesize = virtualFilesize; + image->rid = (uint16_t)revision; + image->users = 0; + image->readFd = -1; + image->working = (image->cache_map == NULL ); + timing_get( &image->nextCompletenessEstimate ); + image->completenessEstimate = -1; + spin_init( &image->lock, PTHREAD_PROCESS_PRIVATE ); + int32_t offset; + if ( stat( path, &st ) == 0 ) { + // Negatively offset atime by file modification time + offset = (int32_t)( st.st_mtime - time( NULL ) ); + if ( offset > 0 ) offset = 0; + } else { + offset = 0; + } + timing_gets( &image->atime, offset ); + + // Prevent freeing in cleanup + cache_map = NULL; + crc32list = NULL; + + // Get rid of cache map if image is complete + if ( image->cache_map != NULL ) { + image_isComplete( image ); + } + + // Image is definitely incomplete, initialize uplink worker + if ( image->cache_map != NULL ) { + image->working = false; + if ( withUplink ) { + uplink_init( image, -1, NULL, -1 ); + } + } + + // ### Reaching this point means loading succeeded + image->readFd = fdImage; + if ( image_addToList( image ) ) { + // Keep fd for reading + fdImage = -1; + } else { + logadd( LOG_ERROR, "Image list full: Could not add image %s", path ); + image->readFd = -1; // Keep fdImage instead, will be closed below + image = image_free( image ); + goto load_error; + } + logadd( LOG_DEBUG1, "Loaded image '%s:%d'\n", image->name, (int)image->rid ); + // CRC errors found... 
+ if ( doFullCheck ) { + logadd( LOG_INFO, "Queueing full CRC32 check for '%s:%d'\n", image->name, (int)image->rid ); + integrity_check( image, -1 ); + } + + function_return = true; + + // Clean exit: +load_error: ; + if ( existing != NULL ) existing = image_release( existing ); + if ( crc32list != NULL ) free( crc32list ); + if ( cache_map != NULL ) free( cache_map ); + if ( fdImage != -1 ) close( fdImage ); + return function_return; +} + +static uint8_t* image_loadCacheMap(const char * const imagePath, const int64_t fileSize) +{ + uint8_t *retval = NULL; + char mapFile[strlen( imagePath ) + 10 + 1]; + sprintf( mapFile, "%s.map", imagePath ); + int fdMap = open( mapFile, O_RDONLY ); + if ( fdMap >= 0 ) { + const int map_size = IMGSIZE_TO_MAPBYTES( fileSize ); + retval = calloc( 1, map_size ); + const ssize_t rd = read( fdMap, retval, map_size ); + if ( map_size != rd ) { + logadd( LOG_WARNING, "Could only read %d of expected %d bytes of cache map of '%s'", (int)rd, (int)map_size, imagePath ); + // Could not read complete map, that means the rest of the image file will be considered incomplete + } + close( fdMap ); + // Later on we check if the hash map says the image is complete + } + return retval; +} + +static uint32_t* image_loadCrcList(const char * const imagePath, const int64_t fileSize, uint32_t *masterCrc) +{ + assert( masterCrc != NULL ); + uint32_t *retval = NULL; + const int hashBlocks = IMGSIZE_TO_HASHBLOCKS( fileSize ); + // Currently this should only prevent accidental corruption (esp. 
regarding transparent proxy mode) + // but maybe later on you want better security + char hashFile[strlen( imagePath ) + 10 + 1]; + sprintf( hashFile, "%s.crc", imagePath ); + int fdHash = open( hashFile, O_RDONLY ); + if ( fdHash >= 0 ) { + off_t fs = lseek( fdHash, 0, SEEK_END ); + if ( fs < (hashBlocks + 1) * 4 ) { + logadd( LOG_WARNING, "Ignoring crc32 list for '%s' as it is too short", imagePath ); + } else { + if ( pread( fdHash, masterCrc, sizeof(uint32_t), 0 ) != sizeof(uint32_t) ) { + logadd( LOG_WARNING, "Error reading first crc32 of '%s'", imagePath ); + } else { + const size_t crcFileLen = hashBlocks * sizeof(uint32_t); + size_t pos = 0; + retval = calloc( hashBlocks, sizeof(uint32_t) ); + while ( pos < crcFileLen ) { + ssize_t ret = pread( fdHash, retval + pos, crcFileLen - pos, pos + sizeof(uint32_t) /* skip master-crc */ ); + if ( ret == -1 ) { + if ( errno == EINTR || errno == EAGAIN ) continue; + } + if ( ret <= 0 ) break; + pos += ret; + } + if ( pos != crcFileLen ) { + free( retval ); + retval = NULL; + logadd( LOG_WARNING, "Could not read crc32 list of '%s'", imagePath ); + } else { + uint32_t lists_crc = crc32( 0, NULL, 0 ); + lists_crc = crc32( lists_crc, (uint8_t*)retval, hashBlocks * sizeof(uint32_t) ); + lists_crc = net_order_32( lists_crc ); + if ( lists_crc != *masterCrc ) { + free( retval ); + retval = NULL; + logadd( LOG_WARNING, "CRC-32 of CRC-32 list mismatch. CRC-32 list of '%s' might be corrupted.", imagePath ); + } + } + } + } + close( fdHash ); + } + return retval; +} + +static bool image_checkRandomBlocks(const int count, int fdImage, const int64_t realFilesize, uint32_t * const crc32list, uint8_t * const cache_map) +{ + // This checks the first block and (up to) count - 1 random blocks for corruption + // via the known crc32 list. This is very sloppy and is merely supposed to detect + // accidental corruption due to broken dnbd3-proxy functionality or file system + // corruption. 
+ assert( count > 0 ); + const int hashBlocks = IMGSIZE_TO_HASHBLOCKS( realFilesize ); + int blocks[count + 1]; + int index = 0, j; + int block; + if ( image_isHashBlockComplete( cache_map, 0, realFilesize ) ) blocks[index++] = 0; + int tries = count * 5; // Try only so many times to find a non-duplicate complete block + while ( index + 1 < count && --tries > 0 ) { + block = rand() % hashBlocks; // Random block + for ( j = 0; j < index; ++j ) { // Random block already in list? + if ( blocks[j] == block ) goto while_end; + } + // Block complete? If yes, add to list + if ( image_isHashBlockComplete( cache_map, block, realFilesize ) ) blocks[index++] = block; +while_end: ; + } + blocks[MIN(index, count)] = -1; // End of array has to be marked by a -1 + return image_checkBlocksCrc32( fdImage, crc32list, blocks, realFilesize ); // Return result of check +} + +/** + * Create a new image with the given image name and revision id in _basePath + * Returns true on success, false otherwise + */ +bool image_create(char *image, int revision, uint64_t size) +{ + assert( image != NULL ); + assert( size >= DNBD3_BLOCK_SIZE ); + if ( revision <= 0 ) { + logadd( LOG_ERROR, "revision id invalid: %d", revision ); + return false; + } + char path[PATHLEN], cache[PATHLEN]; + char *lastSlash = strrchr( image, '/' ); + if ( lastSlash == NULL ) { + snprintf( path, PATHLEN, "%s/%s.r%d", _basePath, image, revision ); + } else { + *lastSlash = '\0'; + snprintf( path, PATHLEN, "%s/%s", _basePath, image ); + mkdir_p( path ); + *lastSlash = '/'; + snprintf( path, PATHLEN, "%s/%s.r%d", _basePath, image, revision ); + } + snprintf( cache, PATHLEN, "%s.map", path ); + size = (size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + const int mapsize = IMGSIZE_TO_MAPBYTES(size); + // Write files + int fdImage = -1, fdCache = -1; + fdImage = open( path, O_RDWR | O_TRUNC | O_CREAT, 0644 ); + fdCache = open( cache, O_RDWR | O_TRUNC | O_CREAT, 0644 ); + if ( fdImage < 0 ) { + logadd( 
LOG_ERROR, "Could not open %s for writing.", path ); + goto failure_cleanup; + } + if ( fdCache < 0 ) { + logadd( LOG_ERROR, "Could not open %s for writing.", cache ); + goto failure_cleanup; + } + // Try cache map first + if ( !file_alloc( fdCache, 0, mapsize ) && !file_setSize( fdCache, mapsize ) ) { + const int err = errno; + logadd( LOG_DEBUG1, "Could not allocate %d bytes for %s (errno=%d)", mapsize, cache, err ); + } + // Now write image + if ( !_sparseFiles && !file_alloc( fdImage, 0, size ) ) { + logadd( LOG_ERROR, "Could not allocate %" PRIu64 " bytes for %s (errno=%d)", size, path, errno ); + logadd( LOG_ERROR, "It is highly recommended to use a file system that supports preallocating disk" + " space without actually writing all zeroes to the block device." ); + logadd( LOG_ERROR, "If you cannot fix this, try setting sparseFiles=true, but don't expect" + " divine performance during replication." ); + goto failure_cleanup; + } else if ( _sparseFiles && !file_setSize( fdImage, size ) ) { + logadd( LOG_ERROR, "Could not create sparse file of %" PRIu64 " bytes for %s (errno=%d)", size, path, errno ); + logadd( LOG_ERROR, "Make sure you have enough disk space, check directory permissions, fs errors etc." ); + goto failure_cleanup; + } + close( fdImage ); + close( fdCache ); + return true; + // +failure_cleanup: ; + if ( fdImage >= 0 ) close( fdImage ); + if ( fdCache >= 0 ) close( fdCache ); + remove( path ); + remove( cache ); + return false; +} + +static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision, const size_t len); +static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requestedRid); + +/** + * Does the same as image_get, but if the image is not known locally, or if + * revision 0 is requested, it will: + * a) Try to clone it from an authoritative dnbd3 server, if + * the server is running in proxy mode. 
+ * b) Try to load it from disk by constructing the appropriate file name, if not + * running in proxy mode. + * + * If the return value is not NULL, + * image_release needs to be called on the image at some point. + * Locks on: remoteCloneLock, imageListLock, _images[].lock + */ +dnbd3_image_t* image_getOrLoad(char * const name, const uint16_t revision) +{ + // specific revision - try shortcut + if ( revision != 0 ) { + dnbd3_image_t *image = image_get( name, revision, true ); + if ( image != NULL ) return image; + } + const size_t len = strlen( name ); + // Sanity check + if ( len == 0 || name[len - 1] == '/' || name[0] == '/' + || name[0] == '.' || strstr( name, "/." ) != NULL ) return NULL; + // Call specific function depending on whether this is a proxy or not + if ( _isProxy ) { + return loadImageProxy( name, revision, len ); + } else { + return loadImageServer( name, revision ); + } +} + +/** + * Called if specific rid is not loaded, or if rid is 0 (some version might be loaded locally, + * but we should check if there's a higher rid on a remote server). + */ +static dnbd3_image_t *loadImageProxy(char * const name, const uint16_t revision, const size_t len) +{ + // Already existing locally? + dnbd3_image_t *image = NULL; + if ( revision == 0 ) { + image = image_get( name, revision, true ); + } + + // Doesn't exist or is rid 0, try remote if not already tried it recently + declare_now; + char *cmpname = name; + int useIndex = -1, fallbackIndex = 0; + if ( len >= NAMELEN ) cmpname += 1 + len - NAMELEN; + pthread_mutex_lock( &remoteCloneLock ); + for (int i = 0; i < CACHELEN; ++i) { + if ( remoteCloneCache[i].rid == revision && strcmp( cmpname, remoteCloneCache[i].name ) == 0 ) { + useIndex = i; + if ( timing_reached( &remoteCloneCache[i].deadline, &now ) ) break; + pthread_mutex_unlock( &remoteCloneLock ); // Was recently checked... 
+ return image; + } + if ( timing_1le2( &remoteCloneCache[i].deadline, &remoteCloneCache[fallbackIndex].deadline ) ) { + fallbackIndex = i; + } + } + // Re-check to prevent two clients at the same time triggering this, + // but only if rid != 0, since we would just get an old rid then + if ( revision != 0 ) { + if ( image == NULL ) image = image_get( name, revision, true ); + if ( image != NULL ) { + pthread_mutex_unlock( &remoteCloneLock ); + return image; + } + } + // Reaching this point means we should contact an authority server + serialized_buffer_t serialized; + // Mark as recently checked + if ( useIndex == -1 ) { + useIndex = fallbackIndex; + } + timing_set( &remoteCloneCache[useIndex].deadline, &now, SERVER_REMOTE_IMAGE_CHECK_CACHETIME ); + snprintf( remoteCloneCache[useIndex].name, NAMELEN, "%s", cmpname ); + remoteCloneCache[useIndex].rid = revision; + pthread_mutex_unlock( &remoteCloneLock ); + + // Get some alt servers and try to get the image from there +#define REP_NUM_SRV (8) + dnbd3_host_t servers[REP_NUM_SRV]; + int uplinkSock = -1; + dnbd3_host_t uplinkServer; + const int count = altservers_getListForUplink( servers, REP_NUM_SRV, false ); + uint16_t remoteProtocolVersion; + uint16_t remoteRid = revision; + uint64_t remoteImageSize; + struct sockaddr_storage sa; + socklen_t salen; + poll_list_t *cons = sock_newPollList(); + logadd( LOG_DEBUG2, "Trying to clone %s:%d from %d hosts", name, (int)revision, count ); + for (int i = 0; i < count + 5; ++i) { // "i < count + 5" for 5 additional iterations, waiting on pending connects + char *remoteName; + bool ok = false; + int sock; + if ( i >= count ) { + sock = sock_multiConnect( cons, NULL, 100, 1000 ); + if ( sock == -2 ) break; + } else { + if ( log_hasMask( LOG_DEBUG2 ) ) { + char host[50]; + size_t len = sock_printHost( &servers[i], host, sizeof(host) ); + host[len] = '\0'; + logadd( LOG_DEBUG2, "Trying to replicate from %s", host ); + } + sock = sock_multiConnect( cons, &servers[i], 100, 1000 ); + 
} + if ( sock == -1 || sock == -2 ) continue; + salen = sizeof(sa); + if ( getpeername( sock, (struct sockaddr*)&sa, &salen ) == -1 ) { + logadd( LOG_MINOR, "getpeername on successful connection failed!? (errno=%d)", errno ); + goto server_fail; + } + if ( !dnbd3_select_image( sock, name, revision, SI_SERVER_FLAGS ) ) goto server_fail; + if ( !dnbd3_select_image_reply( &serialized, sock, &remoteProtocolVersion, &remoteName, &remoteRid, &remoteImageSize ) ) goto server_fail; + if ( remoteProtocolVersion < MIN_SUPPORTED_SERVER || remoteRid == 0 ) goto server_fail; + if ( revision != 0 && remoteRid != revision ) goto server_fail; // Want specific revision but uplink supplied different rid + if ( revision == 0 && image != NULL && image->rid >= remoteRid ) goto server_fail; // Not actually a failure: Highest remote rid is <= highest local rid - don't clone! + if ( remoteImageSize < DNBD3_BLOCK_SIZE || remoteName == NULL || strcmp( name, remoteName ) != 0 ) goto server_fail; + if ( remoteImageSize > _maxReplicationSize ) { + logadd( LOG_MINOR, "Won't proxy '%s:%d': Larger than maxReplicationSize", name, (int)revision ); + goto server_fail; + } + pthread_mutex_lock( &reloadLock ); + // Ensure disk space entirely if not using sparse files, otherwise just make sure we have some room at least + if ( _sparseFiles ) { + ok = image_ensureDiskSpace( 2ull * 1024 * 1024 * 1024, false ); // 2GiB, maybe configurable one day + } else { + ok = image_ensureDiskSpace( remoteImageSize + ( 10 * 1024 * 1024 ), false ); // some extra space for cache map etc. 
+ } + ok = ok && image_clone( sock, name, remoteRid, remoteImageSize ); // This sets up the file+map+crc and loads the img + pthread_mutex_unlock( &reloadLock ); + if ( !ok ) goto server_fail; + + // Cloning worked :-) + uplinkSock = sock; + if ( !sock_sockaddrToDnbd3( (struct sockaddr*)&sa, &uplinkServer ) ) { + uplinkServer.type = 0; + } + break; + +server_fail: ; + close( sock ); + } + sock_destroyPollList( cons ); + + // If we still have a pointer to a local image, release the reference + if ( image != NULL ) image_release( image ); + // If everything worked out, this call should now actually return the image + image = image_get( name, remoteRid, false ); + if ( image != NULL && uplinkSock != -1 ) { + // If so, init the uplink and pass it the socket + sock_setTimeout( uplinkSock, _uplinkTimeout ); + if ( !uplink_init( image, uplinkSock, &uplinkServer, remoteProtocolVersion ) ) { + close( uplinkSock ); + } else { + // Clumsy busy wait, but this should only take as long as it takes to start a thread, so is it really worth using a signalling mechanism? + int i = 0; + while ( !image->working && ++i < 100 ) + usleep( 2000 ); + } + } else if ( uplinkSock != -1 ) { + close( uplinkSock ); + } + return image; +} + +/** + * Called if specific rid is not loaded, or if rid is 0, in which case we check on + * disk which revision is latest. 
+ */ +static dnbd3_image_t *loadImageServer(char * const name, const uint16_t requestedRid) +{ + char imageFile[PATHLEN] = ""; + uint16_t detectedRid = 0; + + if ( requestedRid != 0 ) { + snprintf( imageFile, PATHLEN, "%s/%s.r%d", _basePath, name, (int)requestedRid ); + detectedRid = requestedRid; + } else { + glob_t g; + snprintf( imageFile, PATHLEN, "%s/%s.r*", _basePath, name ); + const int ret = glob( imageFile, GLOB_NOSORT | GLOB_MARK, NULL, &g ); + imageFile[0] = '\0'; + if ( ret == 0 ) { + long int best = 0; + for ( size_t i = 0; i < g.gl_pathc; ++i ) { + const char * const path = g.gl_pathv[i]; + const char * rev = strrchr( path, 'r' ); + if ( rev == NULL || rev == path || *(rev - 1) != '.' ) continue; + rev++; + if ( *rev < '0' || *rev > '9' ) continue; + char *err = NULL; + long int val = strtol( rev, &err, 10 ); + if ( err == NULL || *err != '\0' ) continue; + if ( val > best ) { + best = val; + snprintf( imageFile, PATHLEN, "%s", g.gl_pathv[i] ); + } + } + if ( best > 0 && best < 65536 ) { + detectedRid = (uint16_t)best; + } + } + globfree( &g ); + } + if ( _vmdkLegacyMode && requestedRid <= 1 + && !isForbiddenExtension( name ) + && ( detectedRid == 0 || !file_isReadable( imageFile ) ) ) { + snprintf( imageFile, PATHLEN, "%s/%s", _basePath, name ); + detectedRid = 1; + } + logadd( LOG_DEBUG2, "Trying to load %s:%d ( -> %d) as %s", name, (int)requestedRid, (int)detectedRid, imageFile ); + // No file was determined, or it doesn't seem to exist/be readable + if ( detectedRid == 0 ) { + logadd( LOG_DEBUG2, "Not found, bailing out" ); + return image_get( name, requestedRid, true ); + } + if ( !_vmdkLegacyMode && requestedRid == 0 ) { + // rid 0 requested - check if detected rid is readable, decrease rid if not until we reach 0 + while ( detectedRid != 0 ) { + dnbd3_image_t *image = image_get( name, detectedRid, true ); + if ( image != NULL ) { + // globbed rid already loaded, return + return image; + } + if ( file_isReadable( imageFile ) ) { + // globbed rid 
is + break; + } + logadd( LOG_DEBUG2, "%s: rid %d globbed but not readable, trying lower rid...", name, (int)detectedRid ); + detectedRid--; + snprintf( imageFile, PATHLEN, "%s/%s.r%d", _basePath, name, requestedRid ); + } + } + + // Now lock on the loading mutex, then check again if the image exists (we're multi-threaded) + pthread_mutex_lock( &reloadLock ); + dnbd3_image_t* image = image_get( name, detectedRid, true ); + if ( image != NULL ) { + // The image magically appeared in the meantime + logadd( LOG_DEBUG2, "Magically appeared" ); + pthread_mutex_unlock( &reloadLock ); + return image; + } + // Still not loaded, let's try to do so + logadd( LOG_DEBUG2, "Calling load" ); + image_load( _basePath, imageFile, false ); + pthread_mutex_unlock( &reloadLock ); + // If loading succeeded, this will return the image + logadd( LOG_DEBUG2, "Calling get" ); + return image_get( name, requestedRid, true ); +} + +/** + * Prepare a cloned image: + * 1. Allocate empty image file and its cache map + * 2. Use passed socket to request the crc32 list and save it to disk + * 3. 
Load the image from disk + * Returns: true on success, false otherwise + */ +static bool image_clone(int sock, char *name, uint16_t revision, uint64_t imageSize) +{ + // Allocate disk space and create cache map + if ( !image_create( name, revision, imageSize ) ) return false; + // CRC32 + const size_t len = strlen( _basePath ) + strlen( name ) + 20; + char crcFile[len]; + snprintf( crcFile, len, "%s/%s.r%d.crc", _basePath, name, (int)revision ); + if ( !file_isReadable( crcFile ) ) { + // Get crc32list from remote server + size_t crc32len = IMGSIZE_TO_HASHBLOCKS(imageSize) * sizeof(uint32_t); + uint32_t masterCrc; + uint8_t *crc32list = malloc( crc32len ); + if ( !dnbd3_get_crc32( sock, &masterCrc, crc32list, &crc32len ) ) { + free( crc32list ); + return false; + } + if ( crc32len != 0 ) { + uint32_t lists_crc = crc32( 0, NULL, 0 ); + lists_crc = crc32( lists_crc, (uint8_t*)crc32list, crc32len ); + lists_crc = net_order_32( lists_crc ); + if ( lists_crc != masterCrc ) { + logadd( LOG_WARNING, "OTF-Clone: Corrupted CRC-32 list. ignored. (%s)", name ); + } else { + int fd = open( crcFile, O_WRONLY | O_CREAT, 0644 ); + write( fd, &masterCrc, sizeof(uint32_t) ); + write( fd, crc32list, crc32len ); + close( fd ); + } + } + free( crc32list ); + } + // HACK: Chop of ".crc" to get the image file name + crcFile[strlen( crcFile ) - 4] = '\0'; + return image_load( _basePath, crcFile, false ); +} + +/** + * Generate the crc32 block list file for the given file. + * This function wants a plain file name instead of a dnbd3_image_t, + * as it can be used directly from the command line. 
+ */ +bool image_generateCrcFile(char *image) +{ + int fdCrc = -1; + uint32_t crc; + char crcFile[strlen( image ) + 4 + 1]; + int fdImage = open( image, O_RDONLY ); + + if ( fdImage == -1 ) { + logadd( LOG_ERROR, "Could not open %s.", image ); + return false; + } + + const int64_t fileLen = lseek( fdImage, 0, SEEK_END ); + if ( fileLen <= 0 ) { + logadd( LOG_ERROR, "Error seeking to end, or file is empty." ); + goto cleanup_fail; + } + + struct stat sst; + sprintf( crcFile, "%s.crc", image ); + if ( stat( crcFile, &sst ) == 0 ) { + logadd( LOG_ERROR, "CRC File for %s already exists! Delete it first if you want to regen.", image ); + goto cleanup_fail; + } + + fdCrc = open( crcFile, O_RDWR | O_CREAT, 0644 ); + if ( fdCrc == -1 ) { + logadd( LOG_ERROR, "Could not open CRC File %s for writing..", crcFile ); + goto cleanup_fail; + } + // CRC of all CRCs goes first. Don't know it yet, write 4 bytes dummy data. + if ( write( fdCrc, crcFile, sizeof(crc) ) != sizeof(crc) ) { + logadd( LOG_ERROR, "Write error" ); + goto cleanup_fail; + } + + printf( "Generating CRC32" ); + fflush( stdout ); + const int blockCount = IMGSIZE_TO_HASHBLOCKS( fileLen ); + for ( int i = 0; i < blockCount; ++i ) { + if ( !image_calcBlockCrc32( fdImage, i, fileLen, &crc ) ) { + goto cleanup_fail; + } + if ( write( fdCrc, &crc, sizeof(crc) ) != sizeof(crc) ) { + printf( "\nWrite error writing crc file: %d\n", errno ); + goto cleanup_fail; + } + putchar( '.' ); + fflush( stdout ); + } + close( fdImage ); + fdImage = -1; + printf( "done!\n" ); + + logadd( LOG_INFO, "Generating master-crc..." 
); + fflush( stdout ); + // File is written - read again to calc master crc + if ( lseek( fdCrc, 4, SEEK_SET ) != 4 ) { + logadd( LOG_ERROR, "Could not seek to beginning of crc list in file" ); + goto cleanup_fail; + } + char buffer[400]; + int blocksToGo = blockCount; + crc = crc32( 0, NULL, 0 ); + while ( blocksToGo > 0 ) { + const int numBlocks = MIN( (int)( sizeof(buffer) / sizeof(crc) ), blocksToGo ); + if ( read( fdCrc, buffer, numBlocks * sizeof(crc) ) != numBlocks * (int)sizeof(crc) ) { + logadd( LOG_ERROR, "Could not re-read from crc32 file" ); + goto cleanup_fail; + } + crc = crc32( crc, (uint8_t*)buffer, numBlocks * sizeof(crc) ); + blocksToGo -= numBlocks; + } + crc = net_order_32( crc ); + if ( pwrite( fdCrc, &crc, sizeof(crc), 0 ) != sizeof(crc) ) { + logadd( LOG_ERROR, "Could not write master crc to file" ); + goto cleanup_fail; + } + logadd( LOG_INFO, "CRC-32 file successfully generated." ); + fflush( stdout ); + return true; + +cleanup_fail:; + if ( fdImage != -1 ) close( fdImage ); + if ( fdCrc != -1 ) close( fdCrc ); + return false; +} + +json_t* image_getListAsJson() +{ + json_t *imagesJson = json_array(); + json_t *jsonImage; + int i; + char uplinkName[100] = { 0 }; + uint64_t bytesReceived; + int users, completeness, idleTime; + declare_now; + + spin_lock( &imageListLock ); + for ( i = 0; i < _num_images; ++i ) { + if ( _images[i] == NULL ) continue; + dnbd3_image_t *image = _images[i]; + spin_lock( &image->lock ); + spin_unlock( &imageListLock ); + users = image->users; + idleTime = (int)timing_diff( &image->atime, &now ); + completeness = image_getCompletenessEstimate( image ); + if ( image->uplink == NULL ) { + bytesReceived = 0; + uplinkName[0] = '\0'; + } else { + bytesReceived = image->uplink->bytesReceived; + if ( image->uplink->fd == -1 || !host_to_string( &image->uplink->currentServer, uplinkName, sizeof(uplinkName) ) ) { + uplinkName[0] = '\0'; + } + } + image->users++; // Prevent freeing after we unlock + spin_unlock( &image->lock 
); + + jsonImage = json_pack( "{sisssisisisisI}", + "id", image->id, // id, name, rid never change, so access them without locking + "name", image->name, + "rid", (int) image->rid, + "users", users, + "complete", completeness, + "idle", idleTime, + "size", (json_int_t)image->virtualFilesize ); + if ( bytesReceived != 0 ) { + json_object_set_new( jsonImage, "bytesReceived", json_integer( (json_int_t) bytesReceived ) ); + } + if ( uplinkName[0] != '\0' ) { + json_object_set_new( jsonImage, "uplinkServer", json_string( uplinkName ) ); + } + json_array_append_new( imagesJson, jsonImage ); + + image = image_release( image ); // Since we did image->users++; + spin_lock( &imageListLock ); + } + spin_unlock( &imageListLock ); + return imagesJson; +} + +/** + * Get completeness of an image in percent. Only estimated, not exact. + * Returns: 0-100 + * DOES NOT LOCK, so make sure to do so before calling + */ +int image_getCompletenessEstimate(dnbd3_image_t * const image) +{ + assert( image != NULL ); + if ( image->cache_map == NULL ) return image->working ? 100 : 0; + declare_now; + if ( !timing_reached( &image->nextCompletenessEstimate, &now ) ) { + // Since this operation is relatively expensive, we cache the result for a while + return image->completenessEstimate; + } + int i; + int percent = 0; + const int len = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); + if ( len == 0 ) return 0; + for ( i = 0; i < len; ++i ) { + if ( image->cache_map[i] == 0xff ) { + percent += 100; + } else if ( image->cache_map[i] != 0 ) { + percent += 50; + } + } + image->completenessEstimate = percent / len; + timing_set( &image->nextCompletenessEstimate, &now, 8 + rand() % 32 ); + return image->completenessEstimate; +} + +/** + * Check the CRC-32 of the given blocks. The array "blocks" is of variable length. + * !! pass -1 as the last block so the function knows when to stop !! + * Does NOT check whether block index is within image. 
+ * Returns true or false + */ +bool image_checkBlocksCrc32(const int fd, uint32_t *crc32list, const int *blocks, const uint64_t realFilesize) +{ + while ( *blocks != -1 ) { + uint32_t crc; + if ( !image_calcBlockCrc32( fd, *blocks, realFilesize, &crc ) ) { + return false; + } + if ( crc != crc32list[*blocks] ) { + logadd( LOG_WARNING, "Block %d is %x, should be %x", *blocks, crc, crc32list[*blocks] ); + return false; + } + blocks++; + } + return true; +} + +/** + * Calc CRC-32 of block. Value is returned as little endian. + */ +static bool image_calcBlockCrc32(const int fd, const size_t block, const uint64_t realFilesize, uint32_t *crc) +{ + // Make buffer 4k aligned in case fd has O_DIRECT set +#define BSIZE 262144 + char rawBuffer[BSIZE + DNBD3_BLOCK_SIZE]; + char * const buffer = (char*)( ( (uintptr_t)rawBuffer + ( DNBD3_BLOCK_SIZE - 1 ) ) & ~( DNBD3_BLOCK_SIZE - 1 ) ); + // How many bytes to read from the input file + const uint64_t bytesFromFile = MIN( HASH_BLOCK_SIZE, realFilesize - ( block * HASH_BLOCK_SIZE) ); + // Determine how many bytes we had to read if the file size were a multiple of 4k + // This might be the same value if the real file's size is a multiple of 4k + const uint64_t vbs = ( ( realFilesize + ( DNBD3_BLOCK_SIZE - 1 ) ) & ~( DNBD3_BLOCK_SIZE - 1 ) ) - ( block * HASH_BLOCK_SIZE ); + const uint64_t virtualBytesFromFile = MIN( HASH_BLOCK_SIZE, vbs ); + const off_t readPos = (int64_t)block * HASH_BLOCK_SIZE; + size_t bytes = 0; + assert( vbs >= bytesFromFile ); + *crc = crc32( 0, NULL, 0 ); + // Calculate the crc32 by reading data from the file + while ( bytes < bytesFromFile ) { + const size_t n = (size_t)MIN( BSIZE, bytesFromFile - bytes ); + const ssize_t r = pread( fd, buffer, n, readPos + bytes ); + if ( r <= 0 ) { + logadd( LOG_WARNING, "CRC: Read error (errno=%d)", errno ); + return false; + } + *crc = crc32( *crc, (uint8_t*)buffer, r ); + bytes += (size_t)r; + } + // If the virtual file size is different, keep going using nullbytes + 
if ( bytesFromFile < virtualBytesFromFile ) { + memset( buffer, 0, BSIZE ); + bytes = (size_t)( virtualBytesFromFile - bytesFromFile ); + while ( bytes != 0 ) { + const size_t len = MIN( BSIZE, bytes ); + *crc = crc32( *crc, (uint8_t*)buffer, len ); + bytes -= len; + } + } + *crc = net_order_32( *crc ); + return true; +#undef BSIZE +} + +/** + * Call image_ensureDiskSpace (below), but aquire + * reloadLock first. + */ +bool image_ensureDiskSpaceLocked(uint64_t size, bool force) +{ + bool ret; + pthread_mutex_lock( &reloadLock ); + ret = image_ensureDiskSpace( size, force ); + pthread_mutex_unlock( &reloadLock ); + return ret; +} + +/** + * Make sure at least size bytes are available in _basePath. + * Will delete old images to make room for new ones. + * TODO: Store last access time of images. Currently the + * last access time is reset to the file modification time + * on server restart. Thus it will + * currently only delete images if server uptime is > 10 hours. + * This can be overridden by setting force to true, in case + * free space is desperately needed. + * Return true iff enough space is available. false in random other cases + */ +static bool image_ensureDiskSpace(uint64_t size, bool force) +{ + for ( int maxtries = 0; maxtries < 20; ++maxtries ) { + uint64_t available; + if ( !file_freeDiskSpace( _basePath, NULL, &available ) ) { + const int e = errno; + logadd( LOG_WARNING, "Could not get free disk space (errno %d), will assume there is enough space left... 
;-)\n", e ); + return true; + } + if ( available > size ) return true; + if ( !force && dnbd3_serverUptime() < 10 * 3600 ) { + logadd( LOG_INFO, "Only %dMiB free, %dMiB requested, but server uptime < 10 hours...", (int)(available / (1024ll * 1024ll)), + (int)(size / (1024 * 1024)) ); + return false; + } + logadd( LOG_INFO, "Only %dMiB free, %dMiB requested, freeing an image...", (int)(available / (1024ll * 1024ll)), + (int)(size / (1024 * 1024)) ); + // Find least recently used image + dnbd3_image_t *oldest = NULL; + int i; // XXX improve locking + for (i = 0; i < _num_images; ++i) { + if ( _images[i] == NULL ) continue; + dnbd3_image_t *current = image_lock( _images[i] ); + if ( current == NULL ) continue; + if ( current->users == 1 ) { // Just from the lock above + if ( oldest == NULL || timing_1le2( ¤t->atime, &oldest->atime ) ) { + // Oldest access time so far + oldest = current; + } + } + current = image_release( current ); + } + declare_now; + if ( oldest == NULL || ( !_sparseFiles && timing_diff( &oldest->atime, &now ) < 86400 ) ) { + if ( oldest == NULL ) { + logadd( LOG_INFO, "All images are currently in use :-(" ); + } else { + logadd( LOG_INFO, "Won't free any image, all have been in use in the past 24 hours :-(" ); + } + return false; + } + oldest = image_lock( oldest ); + if ( oldest == NULL ) continue; // Image freed in the meantime? 
Try again + logadd( LOG_INFO, "'%s:%d' has to go!", oldest->name, (int)oldest->rid ); + char *filename = strdup( oldest->path ); + oldest = image_remove( oldest ); + oldest = image_release( oldest ); + unlink( filename ); + size_t len = strlen( filename ) + 10; + char buffer[len]; + snprintf( buffer, len, "%s.map", filename ); + unlink( buffer ); + snprintf( buffer, len, "%s.crc", filename ); + unlink( buffer ); + snprintf( buffer, len, "%s.meta", filename ); + unlink( buffer ); + free( filename ); + } + return false; +} + +void image_closeUnusedFd() +{ + int fd, i; + ticks deadline; + timing_gets( &deadline, -UNUSED_FD_TIMEOUT ); + char imgstr[300]; + spin_lock( &imageListLock ); + for (i = 0; i < _num_images; ++i) { + dnbd3_image_t * const image = _images[i]; + if ( image == NULL ) + continue; + spin_lock( &image->lock ); + spin_unlock( &imageListLock ); + if ( image->users == 0 && image->uplink == NULL && timing_reached( &image->atime, &deadline ) ) { + snprintf( imgstr, sizeof(imgstr), "%s:%d", image->name, (int)image->rid ); + fd = image->readFd; + image->readFd = -1; + } else { + fd = -1; + } + spin_unlock( &image->lock ); + if ( fd != -1 ) { + close( fd ); + logadd( LOG_DEBUG1, "Inactive fd closed for %s", imgstr ); + } + spin_lock( &imageListLock ); + } + spin_unlock( &imageListLock ); +} + +/* + void image_find_latest() + { + // Not in array or most recent rid is requested, try file system + if (revision != 0) { + // Easy case - specific RID + char + } else { + // Determine base directory where the image in question has to reside. + // Eg, the _basePath is "/srv/", requested image is "rz/ubuntu/default-13.04" + // Then searchPath has to be set to "/srv/rz/ubuntu" + char searchPath[strlen(_basePath) + len + 1]; + char *lastSlash = strrchr(name, '/'); + char *baseName; // Name of the image. 
In the example above, it will be "default-13.04" + if ( lastSlash == NULL ) { + *searchPath = '\0'; + baseName = name; + } else { + char *from = name, *to = searchPath; + while (from < lastSlash) *to++ = *from++; + *to = '\0'; + baseName = lastSlash + 1; + } + // Now we have the search path in our real file system and the expected image name. + // The revision naming sceme is <IMAGENAME>.r<RID>, so if we're looking for revision 13, + // our example image has to be named default-13.04.r13 + } + } + */ diff --git a/src/server/image.h b/src/server/image.h new file mode 100644 index 0000000..4668eff --- /dev/null +++ b/src/server/image.h @@ -0,0 +1,63 @@ +#ifndef _IMAGE_H_ +#define _IMAGE_H_ + +#include "globals.h" + +struct json_t; + +void image_serverStartup(); + +bool image_isComplete(dnbd3_image_t *image); + +bool image_isHashBlockComplete(const uint8_t * const cacheMap, const uint64_t block, const uint64_t fileSize); + +void image_updateCachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, const bool set); + +void image_markComplete(dnbd3_image_t *image); + +bool image_ensureOpen(dnbd3_image_t *image); + +dnbd3_image_t* image_get(char *name, uint16_t revision, bool checkIfWorking); + +bool image_reopenCacheFd(dnbd3_image_t *image, const bool force); + +dnbd3_image_t* image_getOrLoad(char *name, uint16_t revision); + +dnbd3_image_t* image_lock(dnbd3_image_t *image); + +dnbd3_image_t* image_release(dnbd3_image_t *image); + +bool image_checkBlocksCrc32(int fd, uint32_t *crc32list, const int *blocks, const uint64_t fileSize); + +void image_killUplinks(); + +bool image_loadAll(char *path); + +bool image_tryFreeAll(); + +bool image_create(char *image, int revision, uint64_t size); + +bool image_generateCrcFile(char *image); + +struct json_t* image_getListAsJson(); + +int image_getCompletenessEstimate(dnbd3_image_t * const image); + +void image_closeUnusedFd(); + +bool image_ensureDiskSpaceLocked(uint64_t size, bool force); + +// one byte in the map covers 8 4kib 
blocks, so 32kib per byte +// "+ (1 << 15) - 1" is required to account for the last bit of +// the image that is smaller than 32kib +// this would be the case whenever the image file size is not a +// multiple of 32kib (= the number of blocks is not divisible by 8) +// ie: if the image is 49152 bytes and you do 49152 >> 15 you get 1, +// but you actually need 2 bytes to have a complete cache map +#define IMGSIZE_TO_MAPBYTES(bytes) ((int)(((bytes) + (1 << 15) - 1) >> 15)) + +// calculate number of hash blocks in file. One hash block is 16MiB +#define HASH_BLOCK_SIZE ((int64_t)(1 << 24)) +#define IMGSIZE_TO_HASHBLOCKS(bytes) ((int)(((bytes) + HASH_BLOCK_SIZE - 1) / HASH_BLOCK_SIZE)) + +#endif diff --git a/src/server/ini.c b/src/server/ini.c new file mode 100644 index 0000000..216543b --- /dev/null +++ b/src/server/ini.c @@ -0,0 +1,164 @@ +/* inih -- simple .INI file parser + + inih is released under the New BSD license (see LICENSE.txt). Go to the project + home page for more info: + + http://code.google.com/p/inih/ + + */ + +#include "ini.h" + +#include <ctype.h> +#include <string.h> + +#if !INI_USE_STACK +#include <stdlib.h> +#endif + +#define MAX_SECTION 50 +#define MAX_NAME 50 + +/* Strip whitespace chars off end of given string, in place. Return s. */ +static char* rstrip(char* s) +{ + char* p = s + strlen( s ); + while ( p > s && isspace((unsigned char)(*--p))) + *p = '\0'; + return s; +} + +/* Return pointer to first non-whitespace char in given string. */ +static char* lskip(const char* s) +{ + while ( *s && isspace((unsigned char)(*s))) + s++; + return (char*)s; +} + +/* Return pointer to first char c or ';' comment in given string, or pointer to + null at end of string if neither found. ';' must be prefixed by a whitespace + character to register as a comment. 
*/ +static char* find_char_or_comment(const char* s, char c) +{ + int was_whitespace = 0; + while ( *s && *s != c && !(was_whitespace && *s == ';') ) { + was_whitespace = isspace((unsigned char)(*s)); + s++; + } + return (char*)s; +} + +/* Version of strncpy that ensures dest (size bytes) is null-terminated. */ +static char* strncpy0(char* dest, const char* src, size_t size) +{ + strncpy( dest, src, size ); + dest[size - 1] = '\0'; + return dest; +} + +/* See documentation in header file. */ +int ini_parse_file(FILE* file, int (*handler)(void*, const char*, const char*, const char*), void* user) +{ + /* Uses a fair bit of stack (use heap instead if you need to) */ +#if INI_USE_STACK + char line[INI_MAX_LINE]; +#else + char* line; +#endif + char section[MAX_SECTION] = ""; + char prev_name[MAX_NAME] = ""; + + char* start; + char* end; + char* name; + char* value; + int lineno = 0; + int error = 0; + +#if !INI_USE_STACK + line = (char*)malloc( INI_MAX_LINE ); + if ( !line ) { + return -2; + } +#endif + + /* Scan through file line by line */ + while ( fgets( line, INI_MAX_LINE, file ) != NULL ) { + lineno++; + + start = line; +#if INI_ALLOW_BOM + if (lineno == 1 && (unsigned char)start[0] == 0xEF && + (unsigned char)start[1] == 0xBB && + (unsigned char)start[2] == 0xBF) { + start += 3; + } +#endif + start = lskip( rstrip( start ) ); + + if ( *start == ';' || *start == '#' ) { + /* Per Python ConfigParser, allow '#' comments at start of line */ + } +#if INI_ALLOW_MULTILINE + else if (*prev_name && *start && start > line) { + /* Non-black line with leading whitespace, treat as continuation + of previous name's value (as per Python ConfigParser). 
*/ + if (!handler(user, section, prev_name, start) && !error) + error = lineno; + } +#endif + else if ( *start == '[' ) { + /* A "[section]" line */ + end = find_char_or_comment( start + 1, ']' ); + if ( *end == ']' ) { + *end = '\0'; + strncpy0( section, start + 1, sizeof(section) ); + *prev_name = '\0'; + } else if ( !error ) { + /* No ']' found on section line */ + error = lineno; + } + } else if ( *start && *start != ';' ) { + /* Not a comment, must be a name[=:]value pair */ + end = find_char_or_comment( start, '=' ); + if ( *end != '=' ) { + end = find_char_or_comment( start, ':' ); + } + if ( *end == '=' || *end == ':' ) { + *end = '\0'; + name = rstrip( start ); + value = lskip( end + 1 ); + end = find_char_or_comment( value, '\0' ); + if ( *end == ';' ) *end = '\0'; + rstrip( value ); + + /* Valid name[=:]value pair found, call handler */ + strncpy0( prev_name, name, sizeof(prev_name) ); + if ( !handler( user, section, name, value ) && !error ) error = lineno; + } else if ( !error ) { + /* No '=' or ':' found on name[=:]value line */ + error = lineno; + } + } + } + +#if !INI_USE_STACK + free( line ); +#endif + + return error; +} + +/* See documentation in header file. */ +int ini_parse(const char* filename, int (*handler)(void*, const char*, const char*, const char*), void* user) +{ + FILE* file; + int error; + + file = fopen( filename, "r" ); + if ( !file ) return -1; + error = ini_parse_file( file, handler, user ); + fclose( file ); + return error; +} diff --git a/src/server/ini.h b/src/server/ini.h new file mode 100644 index 0000000..06f1123 --- /dev/null +++ b/src/server/ini.h @@ -0,0 +1,66 @@ +/* inih -- simple .INI file parser + + inih is released under the New BSD license (see LICENSE.txt). 
Go to the project + home page for more info: + + http://code.google.com/p/inih/ + + */ + +#ifndef __INI_H__ +#define __INI_H__ + +/* Make this header file easier to include in C++ code */ +#ifdef __cplusplus +extern "C" { +#endif + +#include <stdio.h> + +/* Parse given INI-style file. May have [section]s, name=value pairs + (whitespace stripped), and comments starting with ';' (semicolon). Section + is "" if name=value pair parsed before any section heading. name:value + pairs are also supported as a concession to Python's ConfigParser. + + For each name=value pair parsed, call handler function with given user + pointer as well as section, name, and value (data only valid for duration + of handler call). Handler should return nonzero on success, zero on error. + + Returns 0 on success, line number of first error on parse error (doesn't + stop on first error), -1 on file open error, or -2 on memory allocation + error (only when INI_USE_STACK is zero). + */ +int ini_parse(const char* filename, int (*handler)(void* user, const char* section, const char* name, const char* value), void* user); + +/* Same as ini_parse(), but takes a FILE* instead of filename. This doesn't + close the file when it's finished -- the caller must do that. */ +int ini_parse_file(FILE* file, int (*handler)(void* user, const char* section, const char* name, const char* value), void* user); + +/* Nonzero to allow multi-line value parsing, in the style of Python's + ConfigParser. If allowed, ini_parse() will call the handler with the same + name for each subsequent line parsed. */ +#ifndef INI_ALLOW_MULTILINE +#define INI_ALLOW_MULTILINE 1 +#endif + +/* Nonzero to allow a UTF-8 BOM sequence (0xEF 0xBB 0xBF) at the start of + the file. See http://code.google.com/p/inih/issues/detail?id=21 */ +#ifndef INI_ALLOW_BOM +#define INI_ALLOW_BOM 1 +#endif + +/* Nonzero to use stack, zero to use heap (malloc/free). 
*/ +#ifndef INI_USE_STACK +#define INI_USE_STACK 1 +#endif + +/* Maximum line length for any line in INI file. */ +#ifndef INI_MAX_LINE +#define INI_MAX_LINE 200 +#endif + +#ifdef __cplusplus +} +#endif + +#endif /* __INI_H__ */ diff --git a/src/server/integrity.c b/src/server/integrity.c new file mode 100644 index 0000000..88b7487 --- /dev/null +++ b/src/server/integrity.c @@ -0,0 +1,274 @@ +#include "integrity.h" + +#include "helper.h" +#include "locks.h" +#include "image.h" +#include "uplink.h" + +#include <assert.h> +#include <sys/syscall.h> +#include <sys/resource.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include <fcntl.h> + +#define CHECK_QUEUE_SIZE 200 + +#define CHECK_ALL (0x7fffffff) + +typedef struct +{ + dnbd3_image_t *image; // Image to check + int block; // Block to check + int count; // How many blocks to check starting at .block +} queue_entry; + +static pthread_t thread; +static queue_entry checkQueue[CHECK_QUEUE_SIZE]; +static pthread_mutex_t integrityQueueLock; +static pthread_cond_t queueSignal; +static int queueLen = -1; +static volatile bool bRunning = false; + +static void* integrity_main(void *data); + +/** + * Initialize the integrity check thread + */ +void integrity_init() +{ + assert( queueLen == -1 ); + pthread_mutex_init( &integrityQueueLock, NULL ); + pthread_cond_init( &queueSignal, NULL ); + pthread_mutex_lock( &integrityQueueLock ); + queueLen = 0; + pthread_mutex_unlock( &integrityQueueLock ); + bRunning = true; + if ( 0 != thread_create( &thread, NULL, &integrity_main, (void *)NULL ) ) { + bRunning = false; + logadd( LOG_WARNING, "Could not start integrity check thread. Corrupted images will not be detected." 
); + return; + } +} + +void integrity_shutdown() +{ + assert( queueLen != -1 ); + logadd( LOG_DEBUG1, "Shutting down integrity checker...\n" ); + pthread_mutex_lock( &integrityQueueLock ); + pthread_cond_signal( &queueSignal ); + pthread_mutex_unlock( &integrityQueueLock ); + thread_join( thread, NULL ); + while ( bRunning ) + usleep( 10000 ); + pthread_mutex_destroy( &integrityQueueLock ); + pthread_cond_destroy( &queueSignal ); + logadd( LOG_DEBUG1, "Integrity checker exited normally.\n" ); +} + +/** + * Schedule an integrity check on the given image for the given hash block. + * It is not checked whether the block is completely cached locally, so + * make sure it is before calling, otherwise it will result in falsely + * detected corruption. + */ +void integrity_check(dnbd3_image_t *image, int block) +{ + if ( !bRunning ) { + logadd( LOG_MINOR, "Ignoring check request; thread not running..." ); + return; + } + int i, freeSlot = -1; + pthread_mutex_lock( &integrityQueueLock ); + for (i = 0; i < queueLen; ++i) { + if ( freeSlot == -1 && checkQueue[i].image == NULL ) { + freeSlot = i; + } else if ( checkQueue[i].image == image + && checkQueue[i].block <= block && checkQueue[i].block + checkQueue[i].count >= block ) { + // Already queued check dominates this one, or at least lies directly before this block + if ( checkQueue[i].block + checkQueue[i].count == block ) { + // It's directly before this one; expand range + checkQueue[i].count += 1; + } + logadd( LOG_DEBUG2, "Attaching to existing check request (%d/%d) (%d +%d)", i, queueLen, checkQueue[i].block, checkQueue[i].count ); + pthread_mutex_unlock( &integrityQueueLock ); + return; + } + } + if ( freeSlot == -1 ) { + if ( queueLen >= CHECK_QUEUE_SIZE ) { + pthread_mutex_unlock( &integrityQueueLock ); + logadd( LOG_INFO, "Check queue full, discarding check request...\n" ); + return; + } + freeSlot = queueLen++; + } + checkQueue[freeSlot].image = image; + if ( block == -1 ) { + checkQueue[freeSlot].block = 0; + 
checkQueue[freeSlot].count = CHECK_ALL; + } else { + checkQueue[freeSlot].block = block; + checkQueue[freeSlot].count = 1; + } + pthread_cond_signal( &queueSignal ); + pthread_mutex_unlock( &integrityQueueLock ); +} + +static void* integrity_main(void * data UNUSED) +{ + int i; + uint8_t *buffer = NULL; + size_t bufferSize = 0; + setThreadName( "image-check" ); + blockNoncriticalSignals(); +#if defined(linux) || defined(__linux) + // Setting nice of this thread - this is not POSIX conforming, so check if other platforms support this. + // POSIX says that setpriority() should set the nice value of all threads belonging to the current process, + // but on linux you can do this per thread. + pid_t tid = (pid_t)syscall( SYS_gettid ); + setpriority( PRIO_PROCESS, tid, 10 ); +#endif + pthread_mutex_lock( &integrityQueueLock ); + while ( !_shutdown ) { + if ( queueLen == 0 ) { + pthread_cond_wait( &queueSignal, &integrityQueueLock ); + } + for (i = queueLen - 1; i >= 0; --i) { + if ( _shutdown ) break; + dnbd3_image_t * const image = image_lock( checkQueue[i].image ); + if ( checkQueue[i].count == 0 || image == NULL ) { + checkQueue[i].image = image_release( image ); + if ( i + 1 == queueLen ) queueLen--; + continue; + } + // We have the image. 
Call image_release() some time + const int qCount = checkQueue[i].count; + bool foundCorrupted = false; + spin_lock( &image->lock ); + if ( image->crc32 != NULL && image->realFilesize != 0 ) { + int blocks[2] = { checkQueue[i].block, -1 }; + pthread_mutex_unlock( &integrityQueueLock ); + // Make copy of crc32 list as it might go away + const uint64_t fileSize = image->realFilesize; + const int numHashBlocks = IMGSIZE_TO_HASHBLOCKS(fileSize); + const size_t required = numHashBlocks * sizeof(uint32_t); + if ( buffer == NULL || required > bufferSize ) { + bufferSize = required; + if ( buffer != NULL ) free( buffer ); + buffer = malloc( bufferSize ); + } + memcpy( buffer, image->crc32, required ); + spin_unlock( &image->lock ); + // Open for direct I/O if possible; this prevents polluting the fs cache + int fd = open( image->path, O_RDONLY | O_DIRECT ); + bool direct = fd != -1; + if ( unlikely( !direct ) ) { + // Try unbuffered; flush to disk for that + logadd( LOG_DEBUG1, "O_DIRECT failed for %s", image->path ); + image_ensureOpen( image ); + fd = image->readFd; + } + int checkCount = MIN( qCount, 5 ); + if ( fd != -1 ) { + while ( blocks[0] < numHashBlocks && !_shutdown ) { + const uint64_t start = blocks[0] * HASH_BLOCK_SIZE; + const uint64_t end = MIN( (uint64_t)(blocks[0] + 1) * HASH_BLOCK_SIZE, image->virtualFilesize ); + bool complete = true; + if ( qCount == CHECK_ALL ) { + // When checking full image, skip incomplete blocks, otherwise assume block is complete + spin_lock( &image->lock ); + complete = image_isHashBlockComplete( image->cache_map, blocks[0], fileSize ); + spin_unlock( &image->lock ); + } +#if defined(linux) || defined(__linux) + if ( sync_file_range( fd, start, end - start, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER ) == -1 ) { +#else + if ( fsync( fd ) == -1 ) { +#endif + logadd( LOG_ERROR, "Cannot flush %s for integrity check", image->path ); + exit( 1 ); + } + // Use direct I/O only if read length is 
multiple of 4096 to be on the safe side + int tfd; + if ( direct && ( end % DNBD3_BLOCK_SIZE ) == 0 ) { + // Suitable for direct io + tfd = fd; + } else if ( !image_ensureOpen( image ) ) { + logadd( LOG_WARNING, "Cannot open %s for reading", image->path ); + break; + } else { + tfd = image->readFd; + // Evict from cache so we have to re-read, making sure data was properly stored + posix_fadvise( fd, start, end - start, POSIX_FADV_DONTNEED ); + } + if ( complete && !image_checkBlocksCrc32( tfd, (uint32_t*)buffer, blocks, fileSize ) ) { + logadd( LOG_WARNING, "Hash check for block %d of %s failed!", blocks[0], image->name ); + image_updateCachemap( image, start, end, false ); + // If this is not a full check, queue one + if ( qCount != CHECK_ALL ) { + logadd( LOG_INFO, "Queueing full check for %s", image->name ); + integrity_check( image, -1 ); + } + foundCorrupted = true; + } + blocks[0]++; // Increase before break, so it always points to the next block to check after loop + if ( complete && --checkCount == 0 ) break; + } + if ( direct ) { + close( fd ); + } + } + pthread_mutex_lock( &integrityQueueLock ); + assert( checkQueue[i].image == image ); + if ( qCount != CHECK_ALL ) { + // Not a full check; update the counter + checkQueue[i].count -= ( blocks[0] - checkQueue[i].block ); + if ( checkQueue[i].count < 0 ) { + logadd( LOG_WARNING, "BUG! checkQueue counter ran negative" ); + } + } + if ( checkCount > 0 || checkQueue[i].count <= 0 || fd == -1 ) { + // Done with this task as nothing left, OR we don't have an fd to read from + if ( fd == -1 ) { + logadd( LOG_WARNING, "Cannot hash check %s: bad fd", image->path ); + } + checkQueue[i].image = NULL; + if ( i + 1 == queueLen ) queueLen--; + // Mark as working again if applicable + if ( !foundCorrupted ) { + spin_lock( &image->lock ); + if ( image->uplink != NULL ) { // TODO: image_determineWorkingState() helper? 
+ image->working = image->uplink->fd != -1 && image->readFd != -1; + } + spin_unlock( &image->lock ); + } + } else { + // Still more blocks to go... + checkQueue[i].block = blocks[0]; + } + } else { + spin_unlock( &image->lock ); + } + if ( foundCorrupted ) { + // Something was fishy, make sure uplink exists + spin_lock( &image->lock ); + image->working = false; + bool restart = image->uplink == NULL || image->uplink->shutdown; + spin_unlock( &image->lock ); + if ( restart ) { + uplink_shutdown( image ); + uplink_init( image, -1, NULL, -1 ); + } + } + // Release :-) + image_release( image ); + } + } + pthread_mutex_unlock( &integrityQueueLock ); + if ( buffer != NULL ) free( buffer ); + bRunning = false; + return NULL; +} + diff --git a/src/server/integrity.h b/src/server/integrity.h new file mode 100644 index 0000000..c3c2b44 --- /dev/null +++ b/src/server/integrity.h @@ -0,0 +1,12 @@ +#ifndef _INTEGRITY_H_ +#define _INTEGRITY_H_ + +#include "globals.h" + +void integrity_init(); + +void integrity_shutdown(); + +void integrity_check(dnbd3_image_t *image, int block); + +#endif /* INTEGRITY_H_ */ diff --git a/src/server/locks.c b/src/server/locks.c new file mode 100644 index 0000000..71a1845 --- /dev/null +++ b/src/server/locks.c @@ -0,0 +1,306 @@ +/* + * locks.c + * + * Created on: 16.07.2013 + * Author: sr + */ + +#include "locks.h" +#include "helper.h" +#include "../shared/timing.h" + +#ifdef _DEBUG +#define MAXLOCKS (SERVER_MAX_CLIENTS * 2 + SERVER_MAX_ALTS + 200 + SERVER_MAX_IMAGES) +#define MAXTHREADS (SERVER_MAX_CLIENTS + 100) +#define LOCKLEN 60 +typedef struct +{ + void *lock; + ticks locktime; + char locked; + pthread_t thread; + int lockId; + char name[LOCKLEN]; + char where[LOCKLEN]; +} debug_lock_t; + +typedef struct +{ + pthread_t tid; + ticks time; + char name[LOCKLEN]; + char where[LOCKLEN]; + +} debug_thread_t; + +int debugThreadCount = 0; + +static debug_lock_t locks[MAXLOCKS]; +static debug_thread_t threads[MAXTHREADS]; +static int init_done = 0; 
+static pthread_spinlock_t initdestory; +static int lockId = 0; +static pthread_t watchdog = 0; +static dnbd3_signal_t* watchdogSignal = NULL; + +static void *debug_thread_watchdog(void *something); + +int debug_spin_init(const char *name, const char *file, int line, pthread_spinlock_t *lock, int shared) +{ + if ( !init_done ) { + memset( locks, 0, MAXLOCKS * sizeof(debug_lock_t) ); + memset( threads, 0, MAXTHREADS * sizeof(debug_thread_t) ); + pthread_spin_init( &initdestory, PTHREAD_PROCESS_PRIVATE ); + init_done = 1; + } + int first = -1; + pthread_spin_lock( &initdestory ); + for (int i = 0; i < MAXLOCKS; ++i) { + if ( locks[i].lock == lock ) { + logadd( LOG_ERROR, "Lock %p (%s) already initialized (%s:%d)\n", (void*)lock, name, file, line ); + exit( 4 ); + } + if ( first == -1 && locks[i].lock == NULL ) first = i; + } + if ( first == -1 ) { + logadd( LOG_ERROR, "No more free debug locks (%s:%d)\n", file, line ); + pthread_spin_unlock( &initdestory ); + debug_dump_lock_stats(); + exit( 4 ); + } + locks[first].lock = (void*)lock; + locks[first].locked = 0; + snprintf( locks[first].name, LOCKLEN, "%s", name ); + snprintf( locks[first].where, LOCKLEN, "I %s:%d", file, line ); + pthread_spin_unlock( &initdestory ); + return pthread_spin_init( lock, shared ); +} + +int debug_spin_lock(const char *name, const char *file, int line, pthread_spinlock_t *lock) +{ + debug_lock_t *l = NULL; + pthread_spin_lock( &initdestory ); + for (int i = 0; i < MAXLOCKS; ++i) { + if ( locks[i].lock == lock ) { + l = &locks[i]; + break; + } + } + pthread_spin_unlock( &initdestory ); + if ( l == NULL ) { + logadd( LOG_ERROR, "Tried to lock uninitialized lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); + debug_dump_lock_stats(); + exit( 4 ); + } + debug_thread_t *t = NULL; + pthread_spin_lock( &initdestory ); + for (int i = 0; i < MAXTHREADS; ++i) { + if ( threads[i].tid != 0 ) continue; + threads[i].tid = pthread_self(); + timing_get( &threads[i].time ); + snprintf( 
threads[i].name, LOCKLEN, "%s", name ); + snprintf( threads[i].where, LOCKLEN, "%s:%d", file, line ); + t = &threads[i]; + break; + } + pthread_spin_unlock( &initdestory ); + if ( t == NULL ) { + logadd( LOG_ERROR, "Lock sanity check: Too many waiting threads for lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); + exit( 4 ); + } + const int retval = pthread_spin_lock( lock ); + pthread_spin_lock( &initdestory ); + t->tid = 0; + pthread_spin_unlock( &initdestory ); + if ( l->locked ) { + logadd( LOG_ERROR, "Lock sanity check: lock %p (%s) already locked at %s:%d\n", (void*)lock, name, file, line ); + exit( 4 ); + } + l->locked = 1; + timing_get( &l->locktime ); + l->thread = pthread_self(); + snprintf( l->where, LOCKLEN, "L %s:%d", file, line ); + pthread_spin_lock( &initdestory ); + l->lockId = ++lockId; + pthread_spin_unlock( &initdestory ); + return retval; +} + +int debug_spin_trylock(const char *name, const char *file, int line, pthread_spinlock_t *lock) +{ + debug_lock_t *l = NULL; + pthread_spin_lock( &initdestory ); + for (int i = 0; i < MAXLOCKS; ++i) { + if ( locks[i].lock == lock ) { + l = &locks[i]; + break; + } + } + pthread_spin_unlock( &initdestory ); + if ( l == NULL ) { + logadd( LOG_ERROR, "Tried to lock uninitialized lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); + debug_dump_lock_stats(); + exit( 4 ); + } + debug_thread_t *t = NULL; + pthread_spin_lock( &initdestory ); + for (int i = 0; i < MAXTHREADS; ++i) { + if ( threads[i].tid != 0 ) continue; + threads[i].tid = pthread_self(); + timing_get( &threads[i].time ); + snprintf( threads[i].name, LOCKLEN, "%s", name ); + snprintf( threads[i].where, LOCKLEN, "%s:%d", file, line ); + t = &threads[i]; + break; + } + pthread_spin_unlock( &initdestory ); + if ( t == NULL ) { + logadd( LOG_ERROR, "Lock sanity check: Too many waiting threads for %p (%s) at %s:%d\n", (void*)lock, name, file, line ); + exit( 4 ); + } + const int retval = pthread_spin_trylock( lock ); + pthread_spin_lock( 
&initdestory ); + t->tid = 0; + pthread_spin_unlock( &initdestory ); + if ( retval == 0 ) { + if ( l->locked ) { + logadd( LOG_ERROR, "Lock sanity check: lock %p (%s) already locked at %s:%d\n", (void*)lock, name, file, line ); + exit( 4 ); + } + l->locked = 1; + timing_get( &l->locktime ); + l->thread = pthread_self(); + snprintf( l->where, LOCKLEN, "L %s:%d", file, line ); + pthread_spin_lock( &initdestory ); + l->lockId = ++lockId; + pthread_spin_unlock( &initdestory ); + } + return retval; +} + +int debug_spin_unlock(const char *name, const char *file, int line, pthread_spinlock_t *lock) +{ + debug_lock_t *l = NULL; + pthread_spin_lock( &initdestory ); + for (int i = 0; i < MAXLOCKS; ++i) { + if ( locks[i].lock == lock ) { + l = &locks[i]; + break; + } + } + pthread_spin_unlock( &initdestory ); + if ( l == NULL ) { + logadd( LOG_ERROR, "Tried to unlock uninitialized lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); + exit( 4 ); + } + if ( !l->locked ) { + logadd( LOG_ERROR, "Unlock sanity check: lock %p (%s) not locked at %s:%d\n", (void*)lock, name, file, line ); + exit( 4 ); + } + l->locked = 0; + l->thread = 0; + snprintf( l->where, LOCKLEN, "U %s:%d", file, line ); + int retval = pthread_spin_unlock( lock ); + return retval; +} + +int debug_spin_destroy(const char *name, const char *file, int line, pthread_spinlock_t *lock) +{ + pthread_spin_lock( &initdestory ); + for (int i = 0; i < MAXLOCKS; ++i) { + if ( locks[i].lock == lock ) { + if ( locks[i].locked ) { + logadd( LOG_ERROR, "Tried to destroy lock %p (%s) at %s:%d when it is still locked\n", (void*)lock, name, file, line ); + exit( 4 ); + } + locks[i].lock = NULL; + snprintf( locks[i].where, LOCKLEN, "D %s:%d", file, line ); + pthread_spin_unlock( &initdestory ); + return pthread_spin_destroy( lock ); + } + } + logadd( LOG_ERROR, "Tried to destroy non-existent lock %p (%s) at %s:%d\n", (void*)lock, name, file, line ); + exit( 4 ); +} + +void debug_dump_lock_stats() +{ + declare_now; + 
pthread_spin_lock( &initdestory ); + printf( "\n **** LOCKS ****\n\n" ); + for (int i = 0; i < MAXLOCKS; ++i) { + if ( locks[i].lock == NULL ) continue; + if ( locks[i].locked ) { + printf( "* *** %s ***\n" + "* Where: %s\n" + "* When: %d secs ago\n" + "* Locked: %d\n" + "* Serial: %d\n" + "* Thread: %d\n", locks[i].name, locks[i].where, (int)timing_diff( &locks[i].locktime, &now ), (int)locks[i].locked, locks[i].lockId, + (int)locks[i].thread ); + } else { + printf( "* *** %s ***\n" + "* Where: %s\n" + "* Locked: %d\n", locks[i].name, locks[i].where, (int)locks[i].locked ); + } + } + printf( "\n **** WAITING THREADS ****\n\n" ); + for (int i = 0; i < MAXTHREADS; ++i) { + if ( threads[i].tid == 0 ) continue; + printf( "* *** Thread %d ***\n" + "* Lock: %s\n" + "* Where: %s\n" + "* How long: %d secs\n", (int)threads[i].tid, threads[i].name, threads[i].where, (int)timing_diff( &threads[i].time, &now ) ); + } + pthread_spin_unlock( &initdestory ); +} + +static void *debug_thread_watchdog(void *something UNUSED) +{ + setThreadName( "debug-watchdog" ); + while ( !_shutdown ) { + if ( init_done ) { + declare_now; + pthread_spin_lock( &initdestory ); + for (int i = 0; i < MAXTHREADS; ++i) { + if ( threads[i].tid == 0 ) continue; + const uint32_t diff = timing_diff( &threads[i].time, &now ); + if ( diff > 6 && diff < 100000 ) { + printf( "\n\n +++++++++ DEADLOCK ++++++++++++\n\n" ); + pthread_spin_unlock( &initdestory ); + debug_dump_lock_stats(); + exit( 99 ); + } + } + pthread_spin_unlock( &initdestory ); + } + if ( watchdogSignal == NULL || signal_wait( watchdogSignal, 5000 ) == SIGNAL_ERROR ) sleep( 5 ); + } + return NULL ; +} + +#endif + +void debug_locks_start_watchdog() +{ +#ifdef _DEBUG + watchdogSignal = signal_new(); + if ( 0 != thread_create( &watchdog, NULL, &debug_thread_watchdog, (void *)NULL ) ) { + logadd( LOG_ERROR, "Could not start debug-lock watchdog." 
/**
 * Stop the debug-lock watchdog thread (no-op unless built with _DEBUG).
 * Sets _shutdown, wakes the watchdog through its signal if one exists, joins
 * the thread and releases the signal.
 */
void debug_locks_stop_watchdog()
{
#ifdef _DEBUG
	_shutdown = true;
	printf( "Killing debug watchdog...\n" );
	if ( watchdogSignal != NULL ) {
		// initdestory is only initialised by the first debug_spin_init() call;
		// the watchdog thread applies the same init_done guard before locking it.
		const int haveGuard = init_done;
		if ( haveGuard ) pthread_spin_lock( &initdestory );
		signal_call( watchdogSignal );
		if ( haveGuard ) pthread_spin_unlock( &initdestory );
	}
	// Without a signal the watchdog notices _shutdown after its fallback sleep
	thread_join( watchdog, NULL );
	if ( watchdogSignal != NULL ) {
		signal_close( watchdogSignal );
		watchdogSignal = NULL;
	}
#endif
}
CREATE, "%d @ %s:%d\n", debugThreadCount, __FILE__, (int)__LINE__), debug_thread_create(thread, attr, routine, arg)) +static inline pthread_t debug_thread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void*), void *arg) +{ + int i; + if (attr == NULL || pthread_attr_getdetachstate(attr, &i) != 0 || i == PTHREAD_CREATE_JOINABLE) { + ++debugThreadCount; + } + return pthread_create( thread, attr, start_routine, arg ); +} + +#define thread_detach(thread) (logadd( LOG_THREAD DETACH, "%d @ %s:%d\n", debugThreadCount, __FILE__, __LINE__), debug_thread_detach(thread)) +static inline int debug_thread_detach(pthread_t thread) +{ + const int ret = pthread_detach(thread); + if (ret == 0) { + --debugThreadCount; + } else { + logadd( LOG_THREAD DETACH, "Tried to detach invalid thread (error %d)\n", (int)errno); + exit(1); + } + return ret; +} +#define thread_join(thread,value) (logadd( LOG_THREAD JOIN, "%d @ %s:%d\n", debugThreadCount, __FILE__, __LINE__), debug_thread_join(thread,value)) +static inline int debug_thread_join(pthread_t thread, void **value_ptr) +{ + const int ret = pthread_join(thread, value_ptr); + if (ret == 0) { + --debugThreadCount; + } else { + logadd( LOG_THREAD JOIN, "Tried to join invalid thread (error %d)\n", (int)errno); + exit(1); + } + return ret; +} + +#else + +#define thread_create(thread,attr,routine,param) pthread_create( thread, attr, routine, param ) +#define thread_detach(thread) pthread_detach( thread ) +#define thread_join(thread,value) pthread_join( thread, value ) + +#endif + +void debug_locks_start_watchdog(); +void debug_locks_stop_watchdog(); + +#endif /* LOCKS_H_ */ diff --git a/src/server/net.c b/src/server/net.c new file mode 100644 index 0000000..00e88e0 --- /dev/null +++ b/src/server/net.c @@ -0,0 +1,731 @@ +/* + * This file is part of the Distributed Network Block Device 3 + * + * Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de> + * + * This file may be licensed under the terms of of the + * 
GNU General Public License Version 2 (the ``GPL''). + * + * Software distributed under the License is distributed + * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either + * express or implied. See the GPL for the specific language + * governing rights and limitations. + * + * You should have received a copy of the GPL along with this + * program. If not, go to http://www.gnu.org/licenses/gpl.html + * or write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + */ + +#include "helper.h" +#include "image.h" +#include "uplink.h" +#include "locks.h" +#include "rpc.h" +#include "altservers.h" + +#include "../shared/sockhelper.h" +#include "../shared/timing.h" +#include "../shared/protocol.h" +#include "../serialize.h" + +#include <assert.h> + +#ifdef __linux__ +#include <sys/sendfile.h> +#endif +#ifdef __FreeBSD__ +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/uio.h> +#endif +#include <jansson.h> +#include <inttypes.h> +#include <stdatomic.h> + +static dnbd3_client_t *_clients[SERVER_MAX_CLIENTS]; +static int _num_clients = 0; +static pthread_spinlock_t _clients_lock; + +static char nullbytes[500]; + +static atomic_uint_fast64_t totalBytesSent = 0; + +// Adding and removing clients -- list management +static bool addToList(dnbd3_client_t *client); +static void removeFromList(dnbd3_client_t *client); +static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client); + +static inline bool recv_request_header(int sock, dnbd3_request_t *request) +{ + ssize_t ret, fails = 0; +#ifdef AFL_MODE + sock = 0; +#endif + // Read request header from socket + while ( ( ret = recv( sock, request, sizeof(*request), MSG_WAITALL ) ) != sizeof(*request) ) { + if ( errno == EINTR && ++fails < 10 ) continue; + if ( ret >= 0 || ++fails > SOCKET_TIMEOUT_CLIENT_RETRIES ) return false; + if ( errno == EAGAIN ) continue; + logadd( LOG_DEBUG2, "Error receiving request: Could not read message header (%d/%d, e=%d)\n", 
(int)ret, (int)sizeof(*request), errno ); + return false; + } + // Make sure all bytes are in the right order (endianness) + fixup_request( *request ); + if ( request->magic != dnbd3_packet_magic ) { + logadd( LOG_DEBUG2, "Magic in client request incorrect (cmd: %d, len: %d)\n", (int)request->cmd, (int)request->size ); + return false; + } + // Payload sanity check + if ( request->cmd != CMD_GET_BLOCK && request->size > MAX_PAYLOAD ) { + logadd( LOG_WARNING, "Client tries to send a packet of type %d with %d bytes payload. Dropping client.", (int)request->cmd, (int)request->size ); + return false; + } + return true; +} + +static inline bool recv_request_payload(int sock, uint32_t size, serialized_buffer_t *payload) +{ +#ifdef AFL_MODE + sock = 0; +#endif + if ( size == 0 ) { + logadd( LOG_ERROR, "Called recv_request_payload() to receive 0 bytes" ); + return false; + } + if ( size > MAX_PAYLOAD ) { + logadd( LOG_ERROR, "Called recv_request_payload() for more bytes than the passed buffer could hold!" ); + return false; + } + if ( sock_recv( sock, payload->buffer, size ) != (ssize_t)size ) { + logadd( LOG_DEBUG1, "Could not receive request payload of length %d\n", (int)size ); + return false; + } + // Prepare payload buffer for reading + serializer_reset_read( payload, size ); + return true; +} + +/** + * Send reply with optional payload. payload can be null. The caller has to + * acquire the sendMutex first. 
+ */ +static inline bool send_reply(int sock, dnbd3_reply_t *reply, void *payload) +{ + const uint32_t size = reply->size; + fixup_reply( *reply ); + if ( sock_sendAll( sock, reply, sizeof(dnbd3_reply_t), 1 ) != sizeof(dnbd3_reply_t) ) { + logadd( LOG_DEBUG1, "Sending reply header to client failed" ); + return false; + } + if ( size != 0 && payload != NULL ) { + if ( sock_sendAll( sock, payload, size, 1 ) != (ssize_t)size ) { + logadd( LOG_DEBUG1, "Sending payload of %"PRIu32" bytes to client failed", size ); + return false; + } + } + return true; +} + +/** + * Send given amount of null bytes. The caller has to acquire the sendMutex first. + */ +static inline bool sendPadding( const int fd, uint32_t bytes ) +{ + ssize_t ret; + while ( bytes >= sizeof(nullbytes) ) { + ret = sock_sendAll( fd, nullbytes, sizeof(nullbytes), 2 ); + if ( ret <= 0 ) + return false; + bytes -= (uint32_t)ret; + } + return sock_sendAll( fd, nullbytes, bytes, 2 ) == (ssize_t)bytes; +} + +void net_init() +{ + spin_init( &_clients_lock, PTHREAD_PROCESS_PRIVATE ); +} + +void* net_handleNewConnection(void *clientPtr) +{ + dnbd3_client_t * const client = (dnbd3_client_t *)clientPtr; + dnbd3_request_t request; + + // Await data from client. Since this is a fresh connection, we expect data right away + sock_setTimeout( client->sock, _clientTimeout ); + do { +#ifdef AFL_MODE + const int ret = (int)recv( 0, &request, sizeof(request), MSG_WAITALL ); +#else + const int ret = (int)recv( client->sock, &request, sizeof(request), MSG_WAITALL ); +#endif + // It's expected to be a real dnbd3 client + // Check request for validity. This implicitly dictates that all HTTP requests are more than 24 bytes... 
+ if ( ret != (int)sizeof(request) ) { + logadd( LOG_DEBUG2, "Error receiving request: Could not read message header (%d/%d, e=%d)", (int)ret, (int)sizeof(request), errno ); + goto fail_preadd; + } + + if ( request.magic != dnbd3_packet_magic ) { + // Let's see if this looks like an HTTP request + if ( ((char*)&request)[0] == 'G' || ((char*)&request)[0] == 'P' ) { + // Close enough... + rpc_sendStatsJson( client->sock, &client->host, &request, ret ); + } else { + logadd( LOG_DEBUG1, "Magic in client handshake incorrect" ); + } + goto fail_preadd; + } + // Magic OK, untangle byte order if required + fixup_request( request ); + if ( request.cmd != CMD_SELECT_IMAGE ) { + logadd( LOG_WARNING, "Client sent != CMD_SELECT_IMAGE in handshake (got cmd=%d, size=%d), dropping client.", (int)request.cmd, (int)request.size ); + goto fail_preadd; + } + } while (0); + // Fully init client struct + spin_init( &client->lock, PTHREAD_PROCESS_PRIVATE ); + pthread_mutex_init( &client->sendMutex, NULL ); + + spin_lock( &client->lock ); + host_to_string( &client->host, client->hostName, HOSTNAMELEN ); + client->hostName[HOSTNAMELEN-1] = '\0'; + spin_unlock( &client->lock ); + client->bytesSent = 0; + + if ( !addToList( client ) ) { + freeClientStruct( client ); + logadd( LOG_WARNING, "Could not add new client to list when connecting" ); + return NULL; + } + + dnbd3_reply_t reply; + + dnbd3_image_t *image = NULL; + int image_file = -1; + + int num; + bool bOk = false; + bool hasName = false; + + serialized_buffer_t payload; + uint16_t rid, client_version; + uint64_t start, end; + + dnbd3_server_entry_t server_list[NUMBER_SERVERS]; + + // Set to zero to make valgrind happy + memset( &reply, 0, sizeof(reply) ); + memset( &payload, 0, sizeof(payload) ); + reply.magic = dnbd3_packet_magic; + + // Receive first packet's payload + if ( recv_request_payload( client->sock, request.size, &payload ) ) { + char *image_name; + client_version = serializer_get_uint16( &payload ); + image_name = 
serializer_get_string( &payload ); + rid = serializer_get_uint16( &payload ); + const uint8_t flags = serializer_get_uint8( &payload ); + client->isServer = ( flags & FLAGS8_SERVER ); + if ( request.size < 3 || !image_name || client_version < MIN_SUPPORTED_CLIENT ) { + if ( client_version < MIN_SUPPORTED_CLIENT ) { + logadd( LOG_DEBUG1, "Client %s too old", client->hostName ); + } else { + logadd( LOG_DEBUG1, "Incomplete handshake received from %s", client->hostName ); + } + } else { + if ( !client->isServer || !_isProxy ) { + // Is a normal client, or we're not proxy + image = image_getOrLoad( image_name, rid ); + } else if ( _backgroundReplication != BGR_FULL && ( flags & FLAGS8_BG_REP ) ) { + // We're a proxy, client is another proxy, we don't do BGR, but connecting proxy does... + // Reject, as this would basically force this proxy to do BGR too. + image = image_get( image_name, rid, true ); + if ( image != NULL && image->cache_map != NULL ) { + // Only exception is if the image is complete locally + image = image_release( image ); + } + } else if ( _lookupMissingForProxy ) { + // No BGR mismatch and we're told to lookup missing images on a known uplink server + // if the requesting client is a proxy + image = image_getOrLoad( image_name, rid ); + } else { + // No BGR mismatch, but don't lookup if image is unknown locally + image = image_get( image_name, rid, true ); + } + spin_lock( &client->lock ); + client->image = image; + spin_unlock( &client->lock ); + if ( image == NULL ) { + //logadd( LOG_DEBUG1, "Client requested non-existent image '%s' (rid:%d), rejected\n", image_name, (int)rid ); + } else if ( !image->working ) { + logadd( LOG_DEBUG1, "Client %s requested non-working image '%s' (rid:%d), rejected\n", + client->hostName, image_name, (int)rid ); + } else { + bool penalty; + // Image is fine so far, but occasionally drop a client if the uplink for the image is clogged or unavailable + bOk = true; + if ( image->cache_map != NULL ) { + spin_lock( 
&image->lock ); + if ( image->uplink == NULL || image->uplink->cacheFd == -1 || image->uplink->queueLen > SERVER_UPLINK_QUEUELEN_THRES ) { + bOk = ( rand() % 4 ) == 1; + } + penalty = bOk && image->uplink != NULL && image->uplink->cacheFd == -1; + spin_unlock( &image->lock ); + if ( penalty ) { // Wait 100ms if local caching is not working so this + usleep( 100000 ); // server gets a penalty and is less likely to be selected + } + } + if ( bOk ) { + spin_lock( &image->lock ); + image_file = image->readFd; + if ( !client->isServer ) { + // Only update immediately if this is a client. Servers are handled on disconnect. + timing_get( &image->atime ); + } + spin_unlock( &image->lock ); + serializer_reset_write( &payload ); + serializer_put_uint16( &payload, client_version < 3 ? client_version : PROTOCOL_VERSION ); // XXX: Since messed up fuse client was messed up before :( + serializer_put_string( &payload, image->name ); + serializer_put_uint16( &payload, (uint16_t)image->rid ); + serializer_put_uint64( &payload, image->virtualFilesize ); + reply.cmd = CMD_SELECT_IMAGE; + reply.size = serializer_get_written_length( &payload ); + if ( !send_reply( client->sock, &reply, &payload ) ) { + bOk = false; + } + } + } + } + } + + if ( bOk ) { + // add artificial delay if applicable + if ( client->isServer && _serverPenalty != 0 ) { + usleep( _serverPenalty ); + } else if ( !client->isServer && _clientPenalty != 0 ) { + usleep( _clientPenalty ); + } + // client handling mainloop + while ( recv_request_header( client->sock, &request ) ) { + if ( _shutdown ) break; + switch ( request.cmd ) { + + case CMD_GET_BLOCK:; + const uint64_t offset = request.offset_small; // Copy to full uint64 to prevent repeated masking + if ( offset >= image->virtualFilesize ) { + // Sanity check + logadd( LOG_WARNING, "Client %s requested non-existent block", client->hostName ); + reply.size = 0; + reply.cmd = CMD_ERROR; + send_reply( client->sock, &reply, NULL ); + break; + } + if ( offset + 
request.size > image->virtualFilesize ) { + // Sanity check + logadd( LOG_WARNING, "Client %s requested data block that extends beyond image size", client->hostName ); + reply.size = 0; + reply.cmd = CMD_ERROR; + send_reply( client->sock, &reply, NULL ); + break; + } + + if ( request.size != 0 && image->cache_map != NULL ) { + // This is a proxyed image, check if we need to relay the request... + start = offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + end = (offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + bool isCached = true; + spin_lock( &image->lock ); + // Check again as we only aquired the lock just now + if ( image->cache_map != NULL ) { + const uint64_t firstByteInMap = start >> 15; + const uint64_t lastByteInMap = (end - 1) >> 15; + uint64_t pos; + // Middle - quick checking + if ( isCached ) { + pos = firstByteInMap + 1; + while ( pos < lastByteInMap ) { + if ( image->cache_map[pos] != 0xff ) { + isCached = false; + break; + } + ++pos; + } + } + // First byte + if ( isCached ) { + pos = start; + do { + const int map_x = (pos >> 12) & 7; // mod 8 + const uint8_t bit_mask = (uint8_t)( 1 << map_x ); + if ( (image->cache_map[firstByteInMap] & bit_mask) == 0 ) { + isCached = false; + break; + } + pos += DNBD3_BLOCK_SIZE; + } while ( firstByteInMap == (pos >> 15) && pos < end ); + } + // Last byte - only check if request spans multiple bytes in cache map + if ( isCached && firstByteInMap != lastByteInMap ) { + pos = lastByteInMap << 15; + while ( pos < end ) { + assert( lastByteInMap == (pos >> 15) ); + const int map_x = (pos >> 12) & 7; // mod 8 + const uint8_t bit_mask = (uint8_t)( 1 << map_x ); + if ( (image->cache_map[lastByteInMap] & bit_mask) == 0 ) { + isCached = false; + break; + } + pos += DNBD3_BLOCK_SIZE; + } + } + } + spin_unlock( &image->lock ); + if ( !isCached ) { + if ( !uplink_request( client, request.handle, offset, request.size, request.hops ) ) { + logadd( LOG_DEBUG1, "Could not relay uncached request from 
%s to upstream proxy, disabling image %s:%d", + client->hostName, image->name, image->rid ); + image->working = false; + goto exit_client_cleanup; + } + break; // DONE, exit request.cmd switch + } + } + + reply.cmd = CMD_GET_BLOCK; + reply.size = request.size; + reply.handle = request.handle; + + fixup_reply( reply ); + const bool lock = image->uplink != NULL; + if ( lock ) pthread_mutex_lock( &client->sendMutex ); + // Send reply header + if ( send( client->sock, &reply, sizeof(dnbd3_reply_t), (request.size == 0 ? 0 : MSG_MORE) ) != sizeof(dnbd3_reply_t) ) { + if ( lock ) pthread_mutex_unlock( &client->sendMutex ); + logadd( LOG_DEBUG1, "Sending CMD_GET_BLOCK reply header to %s failed", client->hostName ); + goto exit_client_cleanup; + } + + if ( request.size != 0 ) { + // Send payload if request length > 0 + size_t done = 0; + off_t foffset = (off_t)offset; + size_t realBytes; + if ( offset + request.size <= image->realFilesize ) { + realBytes = request.size; + } else { + realBytes = (size_t)(image->realFilesize - offset); + } + while ( done < realBytes ) { + // TODO: Should we consider EOPNOTSUPP on BSD for sendfile and fallback to read/write? + // Linux would set EINVAL or ENOSYS instead, which it unfortunately also does for a couple of other failures :/ + // read/write would kill performance anyways so a fallback would probably be of little use either way. +#ifdef AFL_MODE + char buf[1000]; + size_t cnt = realBytes - done; + if ( cnt > 1000 ) { + cnt = 1000; + } + const ssize_t sent = pread( image_file, buf, cnt, foffset ); + if ( sent > 0 ) { + //write( client->sock, buf, sent ); // This is not verified in any way, so why even do it... 
+ } else { + const int err = errno; +#elif defined(__linux__) + const ssize_t sent = sendfile( client->sock, image_file, &foffset, realBytes - done ); + if ( sent <= 0 ) { + const int err = errno; +#elif defined(__FreeBSD__) + off_t sent; + const int ret = sendfile( image_file, client->sock, foffset, realBytes - done, NULL, &sent, 0 ); + if ( ret == -1 || sent == 0 ) { + const int err = errno; + if ( ret == -1 ) { + if ( err == EAGAIN || err == EINTR ) { // EBUSY? manpage doesn't explicitly mention *sent here.. But then again we dont set the according flag anyways + done += sent; + continue; + } + sent = -1; + } +#endif + if ( lock ) pthread_mutex_unlock( &client->sendMutex ); + if ( sent == -1 ) { + if ( err != EPIPE && err != ECONNRESET && err != ESHUTDOWN + && err != EAGAIN && err != EWOULDBLOCK ) { + logadd( LOG_DEBUG1, "sendfile to %s failed (image to net. sent %d/%d, errno=%d)", + client->hostName, (int)done, (int)realBytes, err ); + } + if ( err == EBADF || err == EFAULT || err == EINVAL || err == EIO ) { + logadd( LOG_INFO, "Disabling %s:%d", image->name, image->rid ); + image->working = false; + } + } + goto exit_client_cleanup; + } + done += sent; + } + if ( request.size > (uint32_t)realBytes ) { + if ( !sendPadding( client->sock, request.size - (uint32_t)realBytes ) ) { + if ( lock ) pthread_mutex_unlock( &client->sendMutex ); + goto exit_client_cleanup; + } + } + } + if ( lock ) pthread_mutex_unlock( &client->sendMutex ); + // Global per-client counter + client->bytesSent += request.size; // Increase counter for statistics. 
+ break; + + case CMD_GET_SERVERS: + // Build list of known working alt servers + num = altservers_getListForClient( &client->host, server_list, NUMBER_SERVERS ); + reply.cmd = CMD_GET_SERVERS; + reply.size = (uint32_t)( num * sizeof(dnbd3_server_entry_t) ); + pthread_mutex_lock( &client->sendMutex ); + send_reply( client->sock, &reply, server_list ); + pthread_mutex_unlock( &client->sendMutex ); + goto set_name; + break; + + case CMD_KEEPALIVE: + reply.cmd = CMD_KEEPALIVE; + reply.size = 0; + pthread_mutex_lock( &client->sendMutex ); + send_reply( client->sock, &reply, NULL ); + pthread_mutex_unlock( &client->sendMutex ); +set_name: ; + if ( !hasName ) { + hasName = true; + setThreadName( client->hostName ); + } + break; + + case CMD_SET_CLIENT_MODE: + client->isServer = false; + break; + + case CMD_GET_CRC32: + reply.cmd = CMD_GET_CRC32; + pthread_mutex_lock( &client->sendMutex ); + if ( image->crc32 == NULL ) { + reply.size = 0; + send_reply( client->sock, &reply, NULL ); + } else { + const uint32_t size = reply.size = (uint32_t)( (IMGSIZE_TO_HASHBLOCKS(image->realFilesize) + 1) * sizeof(uint32_t) ); + send_reply( client->sock, &reply, NULL ); + send( client->sock, &image->masterCrc32, sizeof(uint32_t), MSG_MORE ); + send( client->sock, image->crc32, size - sizeof(uint32_t), 0 ); + } + pthread_mutex_unlock( &client->sendMutex ); + break; + + default: + logadd( LOG_ERROR, "Unknown command from client %s: %d", client->hostName, (int)request.cmd ); + break; + + } + } + } +exit_client_cleanup: ; + // First remove from list, then add to counter to prevent race condition + removeFromList( client ); + totalBytesSent += client->bytesSent; + // Access time, but only if client didn't just probe + if ( image != NULL ) { + spin_lock( &image->lock ); + if ( client->bytesSent > DNBD3_BLOCK_SIZE * 10 ) { + timing_get( &image->atime ); + } + spin_unlock( &image->lock ); + } + freeClientStruct( client ); // This will also call image_release on client->image + return NULL ; 
+fail_preadd: ; + close( client->sock ); + free( client ); + return NULL; +} + +/** + * Get list of all clients. + */ +struct json_t* net_getListAsJson() +{ + json_t *jsonClients = json_array(); + json_t *clientStats; + int imgId, isServer; + uint64_t bytesSent; + char host[HOSTNAMELEN]; + host[HOSTNAMELEN-1] = '\0'; + + spin_lock( &_clients_lock ); + for ( int i = 0; i < _num_clients; ++i ) { + dnbd3_client_t * const client = _clients[i]; + if ( client == NULL || client->image == NULL ) + continue; + spin_lock( &client->lock ); + // Unlock so we give other threads a chance to access the client list. + // We might not get an atomic snapshot of the currently connected clients, + // but that doesn't really make a difference anyways. + spin_unlock( &_clients_lock ); + strncpy( host, client->hostName, HOSTNAMELEN - 1 ); + imgId = client->image->id; + isServer = (int)client->isServer; + bytesSent = client->bytesSent; + spin_unlock( &client->lock ); + clientStats = json_pack( "{sssisisI}", + "address", host, + "imageId", imgId, + "isServer", isServer, + "bytesSent", (json_int_t)bytesSent ); + json_array_append_new( jsonClients, clientStats ); + spin_lock( &_clients_lock ); + } + spin_unlock( &_clients_lock ); + return jsonClients; +} + +/** + * Get number of clients connected, total bytes sent, or both. + * we don't unlock the list while iterating or we might get an + * incorrect result if a client is disconnecting while iterating. 
+ */ +void net_getStats(int *clientCount, int *serverCount, uint64_t *bytesSent) +{ + int cc = 0, sc = 0; + uint64_t bs = 0; + + spin_lock( &_clients_lock ); + for ( int i = 0; i < _num_clients; ++i ) { + const dnbd3_client_t * const client = _clients[i]; + if ( client == NULL || client->image == NULL ) + continue; + if ( client->isServer ) { + sc += 1; + } else { + cc += 1; + } + bs += client->bytesSent; + } + spin_unlock( &_clients_lock ); + if ( clientCount != NULL ) { + *clientCount = cc; + } + if ( serverCount != NULL ) { + *serverCount = sc; + } + if ( bytesSent != NULL ) { + *bytesSent = totalBytesSent + bs; + } +} + +void net_disconnectAll() +{ + int i; + spin_lock( &_clients_lock ); + for (i = 0; i < _num_clients; ++i) { + if ( _clients[i] == NULL ) continue; + dnbd3_client_t * const client = _clients[i]; + spin_lock( &client->lock ); + if ( client->sock >= 0 ) shutdown( client->sock, SHUT_RDWR ); + spin_unlock( &client->lock ); + } + spin_unlock( &_clients_lock ); +} + +void net_waitForAllDisconnected() +{ + int retries = 10, count, i; + do { + count = 0; + spin_lock( &_clients_lock ); + for (i = 0; i < _num_clients; ++i) { + if ( _clients[i] == NULL ) continue; + count++; + } + spin_unlock( &_clients_lock ); + if ( count != 0 ) { + logadd( LOG_INFO, "%d clients still active...\n", count ); + sleep( 1 ); + } + } while ( count != 0 && --retries > 0 ); + _num_clients = 0; +} + +/* +++ + * Client list. + * + * Adding and removing clients. + */ + +/** + * Remove a client from the clients array + * Locks on: _clients_lock + */ +static void removeFromList(dnbd3_client_t *client) +{ + int i; + spin_lock( &_clients_lock ); + for ( i = _num_clients - 1; i >= 0; --i ) { + if ( _clients[i] == client ) { + _clients[i] = NULL; + } + if ( _clients[i] == NULL && i + 1 == _num_clients ) --_num_clients; + } + spin_unlock( &_clients_lock ); +} + +/** + * Free the client struct recursively. + * !! 
Make sure to call this function after removing the client from _dnbd3_clients !! + * Locks on: _clients[].lock, _images[].lock + * might call functions that lock on _images, _image[], uplink.queueLock, client.sendMutex + */ +static dnbd3_client_t* freeClientStruct(dnbd3_client_t *client) +{ + spin_lock( &client->lock ); + pthread_mutex_lock( &client->sendMutex ); + if ( client->sock != -1 ) close( client->sock ); + client->sock = -1; + pthread_mutex_unlock( &client->sendMutex ); + if ( client->image != NULL ) { + spin_lock( &client->image->lock ); + if ( client->image->uplink != NULL ) uplink_removeClient( client->image->uplink, client ); + spin_unlock( &client->image->lock ); + client->image = image_release( client->image ); + } + spin_unlock( &client->lock ); + spin_destroy( &client->lock ); + pthread_mutex_destroy( &client->sendMutex ); + free( client ); + return NULL ; +} + +//###// + +/** + * Add client to the clients array. + * Locks on: _clients_lock + */ +static bool addToList(dnbd3_client_t *client) +{ + int i; + spin_lock( &_clients_lock ); + for (i = 0; i < _num_clients; ++i) { + if ( _clients[i] != NULL ) continue; + _clients[i] = client; + spin_unlock( &_clients_lock ); + return true; + } + if ( _num_clients >= _maxClients ) { + spin_unlock( &_clients_lock ); + logadd( LOG_ERROR, "Maximum number of clients reached!" ); + return false; + } + _clients[_num_clients++] = client; + spin_unlock( &_clients_lock ); + return true; +} + diff --git a/src/server/net.h b/src/server/net.h new file mode 100644 index 0000000..6813b49 --- /dev/null +++ b/src/server/net.h @@ -0,0 +1,40 @@ +/* + * This file is part of the Distributed Network Block Device 3 + * + * Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de> + * + * This file may be licensed under the terms of of the + * GNU General Public License Version 2 (the ``GPL''). 
+ * + * Software distributed under the License is distributed + * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either + * express or implied. See the GPL for the specific language + * governing rights and limitations. + * + * You should have received a copy of the GPL along with this + * program. If not, go to http://www.gnu.org/licenses/gpl.html + * or write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + */ + +#ifndef NET_H_ +#define NET_H_ + +#include "globals.h" + +struct json_t; + +void net_init(); + +void* net_handleNewConnection(void *clientPtr); + +struct json_t* net_getListAsJson(); + +void net_getStats(int *clientCount, int *serverCount, uint64_t *bytesSent); + +void net_disconnectAll(); + +void net_waitForAllDisconnected(); + +#endif /* NET_H_ */ diff --git a/src/server/picohttpparser/README.md b/src/server/picohttpparser/README.md new file mode 100644 index 0000000..cb32f58 --- /dev/null +++ b/src/server/picohttpparser/README.md @@ -0,0 +1,116 @@ +PicoHTTPParser +============= + +Copyright (c) 2009-2014 [Kazuho Oku](https://github.com/kazuho), [Tokuhiro Matsuno](https://github.com/tokuhirom), [Daisuke Murase](https://github.com/typester), [Shigeo Mitsunari](https://github.com/herumi) + +PicoHTTPParser is a tiny, primitive, fast HTTP request/response parser. + +Unlike most parsers, it is stateless and does not allocate memory by itself. +All it does is accept a pointer to a buffer and an output structure, and set up the pointers in the latter to point at the necessary portions of the buffer. + +The code is widely deployed within Perl applications through popular modules that use it, including [Plack](https://metacpan.org/pod/Plack), [Starman](https://metacpan.org/pod/Starman), [Starlet](https://metacpan.org/pod/Starlet), [Furl](https://metacpan.org/pod/Furl). It is also the HTTP/1 parser of [H2O](https://github.com/h2o/h2o). + +Check out [test.c] to find out how to use the parser.
+ +The software is dual-licensed under the Perl License or the MIT License. + +Usage +----- + +The library exposes four functions: `phr_parse_request`, `phr_parse_response`, `phr_parse_headers`, `phr_decode_chunked`. + +### phr_parse_request + +The example below reads an HTTP request from socket `sock` using `read(2)`, parses it using `phr_parse_request`, and prints the details. + +```c +char buf[4096], *method, *path; +int pret, minor_version; +struct phr_header headers[100]; +size_t buflen = 0, prevbuflen = 0, method_len, path_len, num_headers; +ssize_t rret; + +while (1) { + /* read the request */ + while ((rret = read(sock, buf + buflen, sizeof(buf) - buflen)) == -1 && errno == EINTR) + ; + if (rret <= 0) + return IOError; + prevbuflen = buflen; + buflen += rret; + /* parse the request */ + num_headers = sizeof(headers) / sizeof(headers[0]); + pret = phr_parse_request(buf, buflen, &method, &method_len, &path, &path_len, + &minor_version, headers, &num_headers, prevbuflen); + if (pret > 0) + break; /* successfully parsed the request */ + else if (pret == -1) + return ParseError; + /* request is incomplete, continue the loop */ + assert(pret == -2); + if (buflen == sizeof(buf)) + return RequestIsTooLongError; +} + +printf("request is %d bytes long\n", pret); +printf("method is %.*s\n", (int)method_len, method); +printf("path is %.*s\n", (int)path_len, path); +printf("HTTP version is 1.%d\n", minor_version); +printf("headers:\n"); +for (i = 0; i != num_headers; ++i) { + printf("%.*s: %.*s\n", (int)headers[i].name_len, headers[i].name, + (int)headers[i].value_len, headers[i].value); +} +``` + +### phr_parse_response, phr_parse_headers + +`phr_parse_response` and `phr_parse_headers` provide similar interfaces as `phr_parse_request`. `phr_parse_response` parses an HTTP response, and `phr_parse_headers` parses the headers only. + +### phr_decode_chunked + +The example below decodes incoming data in chunked-encoding. The data is decoded in-place. 
+ +```c +struct phr_chunked_decoder decoder = {}; /* zero-clear */ +char *buf = malloc(4096); +size_t size = 0, capacity = 4096, rsize; +ssize_t rret, pret; + +/* set consume_trailer to 1 to discard the trailing header, or the application + * should call phr_parse_headers to parse the trailing header */ +decoder.consume_trailer = 1; + +do { + /* expand the buffer if necessary */ + if (size == capacity) { + capacity *= 2; + buf = realloc(buf, capacity); + assert(buf != NULL); + } + /* read */ + while ((rret = read(sock, buf + size, capacity - size)) == -1 && errno == EINTR) + ; + if (rret <= 0) + return IOError; + /* decode */ + rsize = rret; + pret = phr_decode_chunked(&decoder, buf + size, &rsize); + if (pret == -1) + return ParseError; + size += rsize; +} while (pret == -2); + +/* successfully decoded the chunked data */ +assert(pret >= 0); +printf("decoded data is at %p (%zu bytes)\n", buf, size); +``` + +Benchmark +--------- + +![benchmark results](http://i.gyazo.com/a85c18d3162dfb46b485bb41e0ad443a.png) + +The benchmark code is from [fukamachi/fast-http@6b91103](https://github.com/fukamachi/fast-http/tree/6b9110347c7a3407310c08979aefd65078518478). + +The internals of picohttpparser have been described to some extent in [my blog entry](http://blog.kazuhooku.com/2014/11/the-internals-h2o-or-how-to-write-fast.html). diff --git a/src/server/picohttpparser/picohttpparser.c b/src/server/picohttpparser/picohttpparser.c new file mode 100644 index 0000000..cfa05ef --- /dev/null +++ b/src/server/picohttpparser/picohttpparser.c @@ -0,0 +1,620 @@ +/* + * Copyright (c) 2009-2014 Kazuho Oku, Tokuhiro Matsuno, Daisuke Murase, + * Shigeo Mitsunari + * + * The software is licensed under either the MIT License (below) or the Perl + * license.
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ + +#include <assert.h> +#include <stddef.h> +#include <string.h> +#ifdef __SSE4_2__ +#ifdef _MSC_VER +#include <nmmintrin.h> +#else +#include <x86intrin.h> +#endif +#endif +#include "picohttpparser.h" + +/* $Id$ */ + +#if __GNUC__ >= 3 +#define likely(x) __builtin_expect(!!(x), 1) +#define unlikely(x) __builtin_expect(!!(x), 0) +#else +#define likely(x) (x) +#define unlikely(x) (x) +#endif + +#ifdef _MSC_VER +#define ALIGNED(n) _declspec(align(n)) +#else +#define ALIGNED(n) __attribute__((aligned(n))) +#endif + +#define IS_PRINTABLE_ASCII(c) ((unsigned char)(c)-040u < 0137u) + +#define CHECK_EOF() \ + if (buf == buf_end) { \ + *ret = -2; \ + return NULL; \ + } + +#define EXPECT_CHAR_NO_CHECK(ch) \ + if (*buf++ != ch) { \ + *ret = -1; \ + return NULL; \ + } + +#define EXPECT_CHAR(ch) \ + CHECK_EOF(); \ + EXPECT_CHAR_NO_CHECK(ch); + +#define ADVANCE_TOKEN(tok, toklen) \ + do { \ + const char *tok_start = buf; \ + static const char ALIGNED(16) ranges2[] = "\000\040\177\177"; \ + int found2; \ + buf = findchar_fast(buf, buf_end, ranges2, sizeof(ranges2) - 1, &found2); \ + if (!found2) { \ + CHECK_EOF(); \ + } \ + while (1) { \ + if (*buf == ' ') { \ + break; \ + } else if (unlikely(!IS_PRINTABLE_ASCII(*buf))) { \ + if ((unsigned char)*buf < '\040' || *buf == '\177') { \ + *ret = -1; \ + return NULL; \ + } \ + } \ + ++buf; \ + CHECK_EOF(); \ + } \ + tok = tok_start; \ + toklen = buf - tok_start; \ + } while (0) + +static const char *token_char_map = "\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" + "\0\1\0\1\1\1\1\1\0\0\1\1\0\1\1\0\1\1\1\1\1\1\1\1\1\1\0\0\0\0\0\0" + "\0\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\0\0\0\1\1" + "\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\0\1\0\1\0" + "\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" + "\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" + "\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" + 
"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"; + +static const char *findchar_fast(const char *buf, const char *buf_end, const char *ranges, size_t ranges_size, int *found) +{ + *found = 0; +#if __SSE4_2__ + if (likely(buf_end - buf >= 16)) { + __m128i ranges16 = _mm_loadu_si128((const __m128i *)ranges); + + size_t left = (buf_end - buf) & ~15; + do { + __m128i b16 = _mm_loadu_si128((const __m128i *)buf); + int r = _mm_cmpestri(ranges16, ranges_size, b16, 16, _SIDD_LEAST_SIGNIFICANT | _SIDD_CMP_RANGES | _SIDD_UBYTE_OPS); + if (unlikely(r != 16)) { + buf += r; + *found = 1; + break; + } + buf += 16; + left -= 16; + } while (likely(left != 0)); + } +#else + /* suppress unused parameter warning */ + (void)buf_end; + (void)ranges; + (void)ranges_size; +#endif + return buf; +} + +static const char *get_token_to_eol(const char *buf, const char *buf_end, struct string *token, int *ret) +{ + const char *token_start = buf; + +#ifdef __SSE4_2__ + static const char ranges1[] = "\0\010" + /* allow HT */ + "\012\037" + /* allow SP and up to but not including DEL */ + "\177\177" + /* allow chars w. 
MSB set */ + ; + int found; + buf = findchar_fast(buf, buf_end, ranges1, sizeof(ranges1) - 1, &found); + if (found) + goto FOUND_CTL; +#else + /* find non-printable char within the next 8 bytes, this is the hottest code; manually inlined */ + while (likely(buf_end - buf >= 8)) { +#define DOIT() \ + do { \ + if (unlikely(!IS_PRINTABLE_ASCII(*buf))) \ + goto NonPrintable; \ + ++buf; \ + } while (0) + DOIT(); + DOIT(); + DOIT(); + DOIT(); + DOIT(); + DOIT(); + DOIT(); + DOIT(); +#undef DOIT + continue; + NonPrintable: + if ((likely((unsigned char)*buf < '\040') && likely(*buf != '\011')) || unlikely(*buf == '\177')) { + goto FOUND_CTL; + } + ++buf; + } +#endif + for (;; ++buf) { + CHECK_EOF(); + if (unlikely(!IS_PRINTABLE_ASCII(*buf))) { + if ((likely((unsigned char)*buf < '\040') && likely(*buf != '\011')) || unlikely(*buf == '\177')) { + goto FOUND_CTL; + } + } + } +FOUND_CTL: + if (likely(*buf == '\015')) { + ++buf; + EXPECT_CHAR('\012'); + token->l = buf - 2 - token_start; + } else if (*buf == '\012') { + token->l = buf - token_start; + ++buf; + } else { + *ret = -1; + return NULL; + } + token->s = token_start; + + return buf; +} + +static const char *is_complete(const char *buf, const char *buf_end, size_t last_len, int *ret) +{ + int ret_cnt = 0; + buf = last_len < 3 ? 
buf : buf + last_len - 3; + + while (1) { + CHECK_EOF(); + if (*buf == '\015') { + ++buf; + CHECK_EOF(); + EXPECT_CHAR('\012'); + ++ret_cnt; + } else if (*buf == '\012') { + ++buf; + ++ret_cnt; + } else { + ++buf; + ret_cnt = 0; + } + if (ret_cnt == 2) { + return buf; + } + } + + *ret = -2; + return NULL; +} + +#define PARSE_INT(valp_, mul_) \ + if (*buf < '0' || '9' < *buf) { \ + buf++; \ + *ret = -1; \ + return NULL; \ + } \ + *(valp_) = (mul_) * (*buf++ - '0'); + +#define PARSE_INT_3(valp_) \ + do { \ + int res_ = 0; \ + PARSE_INT(&res_, 100) \ + *valp_ = res_; \ + PARSE_INT(&res_, 10) \ + *valp_ += res_; \ + PARSE_INT(&res_, 1) \ + *valp_ += res_; \ + } while (0) + +/* returned pointer is always within [buf, buf_end), or null */ +static const char *parse_http_version(const char *buf, const char *buf_end, int *minor_version, int *ret) +{ + /* we want at least [HTTP/1.<two chars>] to try to parse */ + if (buf_end - buf < 9) { + *ret = -2; + return NULL; + } + EXPECT_CHAR_NO_CHECK('H'); + EXPECT_CHAR_NO_CHECK('T'); + EXPECT_CHAR_NO_CHECK('T'); + EXPECT_CHAR_NO_CHECK('P'); + EXPECT_CHAR_NO_CHECK('/'); + EXPECT_CHAR_NO_CHECK('1'); + EXPECT_CHAR_NO_CHECK('.'); + PARSE_INT(minor_version, 1); + return buf; +} + +static const char *parse_headers(const char *buf, const char *buf_end, struct phr_header *headers, size_t *num_headers, + size_t max_headers, int *ret) +{ + for (;; ++*num_headers) { + CHECK_EOF(); + if (*buf == '\015') { + ++buf; + EXPECT_CHAR('\012'); + break; + } else if (*buf == '\012') { + ++buf; + break; + } + if (*num_headers == max_headers) { + *ret = -1; + return NULL; + } + if (!(*num_headers != 0 && (*buf == ' ' || *buf == '\t'))) { + /* parsing name, but do not discard SP before colon, see + * http://www.mozilla.org/security/announce/2006/mfsa2006-33.html */ + headers[*num_headers].name.s = buf; + static const char ALIGNED(16) ranges1[] = "\x00 " /* control chars and up to SP */ + "\"\"" /* 0x22 */ + "()" /* 0x28,0x29 */ + ",," /* 0x2c */ + "//" /* 
0x2f */ + ":@" /* 0x3a-0x40 */ + "[]" /* 0x5b-0x5d */ + "{\377"; /* 0x7b-0xff */ + int found; + buf = findchar_fast(buf, buf_end, ranges1, sizeof(ranges1) - 1, &found); + if (!found) { + CHECK_EOF(); + } + while (1) { + if (*buf == ':') { + break; + } else if (!token_char_map[(unsigned char)*buf]) { + *ret = -1; + return NULL; + } + ++buf; + CHECK_EOF(); + } + if ((headers[*num_headers].name.l = buf - headers[*num_headers].name.s) == 0) { + *ret = -1; + return NULL; + } + ++buf; + for (;; ++buf) { + CHECK_EOF(); + if (!(*buf == ' ' || *buf == '\t')) { + break; + } + } + } else { + headers[*num_headers].name.s = NULL; + headers[*num_headers].name.l = 0; + } + if ((buf = get_token_to_eol(buf, buf_end, &headers[*num_headers].value, ret)) == NULL) { + return NULL; + } + } + return buf; +} + +static const char *parse_request(const char *buf, const char *buf_end, struct string *method, struct string *path, + int *minor_version, struct phr_header *headers, size_t *num_headers, + size_t max_headers, int *ret) +{ + /* skip first empty line (some clients add CRLF after POST content) */ + CHECK_EOF(); + if (*buf == '\015') { + ++buf; + EXPECT_CHAR('\012'); + } else if (*buf == '\012') { + ++buf; + } + + /* parse request line */ + ADVANCE_TOKEN(method->s, method->l); + ++buf; + ADVANCE_TOKEN(path->s, path->l); + ++buf; + if ((buf = parse_http_version(buf, buf_end, minor_version, ret)) == NULL) { + return NULL; + } + if (*buf == '\015') { + ++buf; + EXPECT_CHAR('\012'); + } else if (*buf == '\012') { + ++buf; + } else { + *ret = -1; + return NULL; + } + + return parse_headers(buf, buf_end, headers, num_headers, max_headers, ret); +} + +int phr_parse_request(const char *buf_start, size_t len, struct string *method, struct string *path, + int *minor_version, struct phr_header *headers, size_t *num_headers, size_t last_len) +{ + const char *buf = buf_start, *buf_end = buf_start + len; + size_t max_headers = *num_headers; + int r; + + method->s = NULL; + method->l = 0; + path->s = 
NULL; + path->l = 0; + *minor_version = -1; + *num_headers = 0; + + /* if last_len != 0, check if the request is complete (a fast countermeasure + againt slowloris */ + if (last_len != 0 && is_complete(buf, buf_end, last_len, &r) == NULL) { + return r; + } + + if ((buf = parse_request(buf, buf_end, method, path, minor_version, headers, num_headers, max_headers, + &r)) == NULL) { + return r; + } + + return (int)(buf - buf_start); +} + +static const char *parse_response(const char *buf, const char *buf_end, int *minor_version, int *status, struct string *msg, + struct phr_header *headers, size_t *num_headers, size_t max_headers, int *ret) +{ + /* parse "HTTP/1.x" */ + if ((buf = parse_http_version(buf, buf_end, minor_version, ret)) == NULL) { + return NULL; + } + /* skip space */ + if (*buf++ != ' ') { + *ret = -1; + return NULL; + } + /* parse status code, we want at least [:digit:][:digit:][:digit:]<other char> to try to parse */ + if (buf_end - buf < 4) { + *ret = -2; + return NULL; + } + PARSE_INT_3(status); + + /* skip space */ + if (*buf++ != ' ') { + *ret = -1; + return NULL; + } + /* get message */ + if ((buf = get_token_to_eol(buf, buf_end, msg, ret)) == NULL) { + return NULL; + } + + return parse_headers(buf, buf_end, headers, num_headers, max_headers, ret); +} + +int phr_parse_response(const char *buf_start, size_t len, int *minor_version, int *status, struct string *msg, + struct phr_header *headers, size_t *num_headers, size_t last_len) +{ + const char *buf = buf_start, *buf_end = buf + len; + size_t max_headers = *num_headers; + int r; + + *minor_version = -1; + *status = 0; + msg->s = NULL; + msg->l = 0; + *num_headers = 0; + + /* if last_len != 0, check if the response is complete (a fast countermeasure + against slowloris */ + if (last_len != 0 && is_complete(buf, buf_end, last_len, &r) == NULL) { + return r; + } + + if ((buf = parse_response(buf, buf_end, minor_version, status, msg, headers, num_headers, max_headers, &r)) == NULL) { + return r; + } 
+ + return (int)(buf - buf_start); +} + +int phr_parse_headers(const char *buf_start, size_t len, struct phr_header *headers, size_t *num_headers, size_t last_len) +{ + const char *buf = buf_start, *buf_end = buf + len; + size_t max_headers = *num_headers; + int r; + + *num_headers = 0; + + /* if last_len != 0, check if the response is complete (a fast countermeasure + against slowloris */ + if (last_len != 0 && is_complete(buf, buf_end, last_len, &r) == NULL) { + return r; + } + + if ((buf = parse_headers(buf, buf_end, headers, num_headers, max_headers, &r)) == NULL) { + return r; + } + + return (int)(buf - buf_start); +} + +enum { + CHUNKED_IN_CHUNK_SIZE, + CHUNKED_IN_CHUNK_EXT, + CHUNKED_IN_CHUNK_DATA, + CHUNKED_IN_CHUNK_CRLF, + CHUNKED_IN_TRAILERS_LINE_HEAD, + CHUNKED_IN_TRAILERS_LINE_MIDDLE +}; + +static int decode_hex(int ch) +{ + if ('0' <= ch && ch <= '9') { + return ch - '0'; + } else if ('A' <= ch && ch <= 'F') { + return ch - 'A' + 0xa; + } else if ('a' <= ch && ch <= 'f') { + return ch - 'a' + 0xa; + } else { + return -1; + } +} + +ssize_t phr_decode_chunked(struct phr_chunked_decoder *decoder, char *buf, size_t *_bufsz) +{ + size_t dst = 0, src = 0, bufsz = *_bufsz; + ssize_t ret = -2; /* incomplete */ + + while (1) { + switch (decoder->_state) { + case CHUNKED_IN_CHUNK_SIZE: + for (;; ++src) { + int v; + if (src == bufsz) + goto Exit; + if ((v = decode_hex(buf[src])) == -1) { + if (decoder->_hex_count == 0) { + ret = -1; + goto Exit; + } + break; + } + if (decoder->_hex_count == sizeof(size_t) * 2) { + ret = -1; + goto Exit; + } + decoder->bytes_left_in_chunk = decoder->bytes_left_in_chunk * 16 + v; + ++decoder->_hex_count; + } + decoder->_hex_count = 0; + decoder->_state = CHUNKED_IN_CHUNK_EXT; + /* fallthru */ + case CHUNKED_IN_CHUNK_EXT: + /* RFC 7230 A.2 "Line folding in chunk extensions is disallowed" */ + for (;; ++src) { + if (src == bufsz) + goto Exit; + if (buf[src] == '\012') + break; + } + ++src; + if (decoder->bytes_left_in_chunk == 0) { + 
if (decoder->consume_trailer) { + decoder->_state = CHUNKED_IN_TRAILERS_LINE_HEAD; + break; + } else { + goto Complete; + } + } + decoder->_state = CHUNKED_IN_CHUNK_DATA; + /* fallthru */ + case CHUNKED_IN_CHUNK_DATA: { + size_t avail = bufsz - src; + if (avail < decoder->bytes_left_in_chunk) { + if (dst != src) + memmove(buf + dst, buf + src, avail); + src += avail; + dst += avail; + decoder->bytes_left_in_chunk -= avail; + goto Exit; + } + if (dst != src) + memmove(buf + dst, buf + src, decoder->bytes_left_in_chunk); + src += decoder->bytes_left_in_chunk; + dst += decoder->bytes_left_in_chunk; + decoder->bytes_left_in_chunk = 0; + decoder->_state = CHUNKED_IN_CHUNK_CRLF; + } + /* fallthru */ + case CHUNKED_IN_CHUNK_CRLF: + for (;; ++src) { + if (src == bufsz) + goto Exit; + if (buf[src] != '\015') + break; + } + if (buf[src] != '\012') { + ret = -1; + goto Exit; + } + ++src; + decoder->_state = CHUNKED_IN_CHUNK_SIZE; + break; + case CHUNKED_IN_TRAILERS_LINE_HEAD: + for (;; ++src) { + if (src == bufsz) + goto Exit; + if (buf[src] != '\015') + break; + } + if (buf[src++] == '\012') + goto Complete; + decoder->_state = CHUNKED_IN_TRAILERS_LINE_MIDDLE; + /* fallthru */ + case CHUNKED_IN_TRAILERS_LINE_MIDDLE: + for (;; ++src) { + if (src == bufsz) + goto Exit; + if (buf[src] == '\012') + break; + } + ++src; + decoder->_state = CHUNKED_IN_TRAILERS_LINE_HEAD; + break; + default: + assert(!"decoder is corrupt"); + } + } + +Complete: + ret = bufsz - src; +Exit: + if (dst != src) + memmove(buf + dst, buf + src, bufsz - src); + *_bufsz = dst; + return ret; +} + +int phr_decode_chunked_is_in_data(struct phr_chunked_decoder *decoder) +{ + return decoder->_state == CHUNKED_IN_CHUNK_DATA; +} + +#undef CHECK_EOF +#undef EXPECT_CHAR +#undef ADVANCE_TOKEN diff --git a/src/server/picohttpparser/picohttpparser.h b/src/server/picohttpparser/picohttpparser.h new file mode 100644 index 0000000..b315795 --- /dev/null +++ b/src/server/picohttpparser/picohttpparser.h @@ -0,0 +1,92 @@ +/* 
+ * Copyright (c) 2009-2014 Kazuho Oku, Tokuhiro Matsuno, Daisuke Murase, + * Shigeo Mitsunari + * + * The software is licensed under either the MIT License (below) or the Perl + * license. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ + +#ifndef picohttpparser_h +#define picohttpparser_h + +#include <sys/types.h> + +#ifdef _MSC_VER +#define ssize_t intptr_t +#endif + +/* $Id$ */ + +#ifdef __cplusplus +extern "C" { +#endif + +struct string { + const char *s; + size_t l; +}; + +/* contains name and value of a header (name == NULL if is a continuing line + * of a multiline header */ +struct phr_header { + struct string name; + struct string value; +}; + +/* returns number of bytes consumed if successful, -2 if request is partial, + * -1 if failed */ +int phr_parse_request(const char *buf, size_t len, struct string *method, struct string *path, + int *minor_version, struct phr_header *headers, size_t *num_headers, size_t last_len); + +/* ditto */ +int phr_parse_response(const char *_buf, size_t len, int *minor_version, int *status, struct string *msg, + struct phr_header *headers, size_t *num_headers, size_t last_len); + +/* ditto */ +int phr_parse_headers(const char *buf, size_t len, struct phr_header *headers, size_t *num_headers, size_t last_len); + +/* should be zero-filled before start */ +struct phr_chunked_decoder { + size_t bytes_left_in_chunk; /* number of bytes left in current chunk */ + char consume_trailer; /* if trailing headers should be consumed */ + char _hex_count; + char _state; +}; + +/* the function rewrites the buffer given as (buf, bufsz) removing the chunked- + * encoding headers. When the function returns without an error, bufsz is + * updated to the length of the decoded data available. Applications should + * repeatedly call the function while it returns -2 (incomplete) every time + * supplying newly arrived data. If the end of the chunked-encoded data is + * found, the function returns a non-negative number indicating the number of + * octets left undecoded at the tail of the supplied buffer. Returns -1 on + * error. 
+ */ +ssize_t phr_decode_chunked(struct phr_chunked_decoder *decoder, char *buf, size_t *bufsz); + +/* returns if the chunked decoder is in middle of chunked data */ +int phr_decode_chunked_is_in_data(struct phr_chunked_decoder *decoder); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/server/rpc.c b/src/server/rpc.c new file mode 100644 index 0000000..1ea09cb --- /dev/null +++ b/src/server/rpc.c @@ -0,0 +1,504 @@ +#include "rpc.h" +#include "helper.h" +#include "net.h" +#include "uplink.h" +#include "locks.h" +#include "image.h" +#include "altservers.h" +#include "../shared/sockhelper.h" +#include "fileutil.h" +#include "picohttpparser/picohttpparser.h" +#include "urldecode.h" + +#include <jansson.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <unistd.h> + +#if JANSSON_VERSION_HEX < 0x020600 +#define json_stringn_nocheck(a,b) json_string_nocheck(a) +#endif + +#define ACL_ALL 0x7fffffff +#define ACL_STATS 1 +#define ACL_CLIENT_LIST 2 +#define ACL_IMAGE_LIST 4 +#define ACL_CONFIG 8 +#define ACL_LOG 16 +#define ACL_ALTSERVERS 32 + +#define HTTP_CLOSE 4 +#define HTTP_KEEPALIVE 9 + +// Make sure compiler does not reserve more space for static strings than required (or rather, does not tell so in sizeof calls) +// TODO Might be time for a dedicated string.h +_Static_assert( sizeof("test") == 5 && sizeof("test2") == 6, "Stringsize messup :/" ); +#define STRCMP(str,chr) ( (str).s != NULL && (str).l == sizeof(chr)-1 && strncmp( (str).s, (chr), MIN((str).l, sizeof(chr)-1) ) == 0 ) +#define STRSTART(str,chr) ( (str).s != NULL && (str).l >= sizeof(chr)-1 && strncmp( (str).s, (chr), MIN((str).l, sizeof(chr)-1) ) == 0 ) +#define SETSTR(name,value) do { name.s = value; name.l = sizeof(value)-1; } while (0) +#define DEFSTR(name,value) static struct string name = { .s = value, .l = sizeof(value)-1 }; +#define chartolower(c) ((char)( (c) >= 'A' && (c) <= 'Z' ? 
(c) + ('a'-'A') : (c) )) + +DEFSTR(STR_CONNECTION, "connection") +DEFSTR(STR_CLOSE, "close") +DEFSTR(STR_QUERY, "/query") +DEFSTR(STR_Q, "q") + +static inline bool equals(struct string *s1,struct string *s2) +{ + if ( s1->s == NULL ) { + return s2->s == NULL; + } else if ( s2->s == NULL || s1->l != s2->l ) { + return false; + } + return memcmp( s1->s, s2->s, s1->l ) == 0; +} + +static inline bool iequals(struct string *cmpMixed, struct string *cmpLower) +{ + if ( cmpMixed->s == NULL ) { + return cmpLower->s == NULL; + } else if ( cmpLower->s == NULL || cmpMixed->l != cmpLower->l ) { + return false; + } + for ( size_t i = 0; i < cmpMixed->l; ++i ) { + if ( chartolower( cmpMixed->s[i] ) != cmpLower->s[i] ) return false; + } + return true; +} + +#define MAX_ACLS 100 +static int aclCount = 0; +static dnbd3_access_rule_t aclRules[MAX_ACLS]; +static json_int_t randomRunId; +static pthread_spinlock_t aclLock; +#define MAX_CLIENTS 50 +#define CUTOFF_START 40 +static pthread_spinlock_t statusLock; +static struct { + int count; + bool overloaded; +} status; + +static bool handleStatus(int sock, int permissions, struct field *fields, size_t fields_num, int keepAlive); +static bool sendReply(int sock, const char *status, const char *ctype, const char *payload, ssize_t plen, int keepAlive); +static void parsePath(struct string *path, struct string *file, struct field *getv, size_t *getc); +static bool hasHeaderValue(struct phr_header *headers, size_t numHeaders, struct string *name, struct string *value); +static int getacl(dnbd3_host_t *host); +static void addacl(int argc, char **argv, void *data); +static void loadAcl(); + +void rpc_init() +{ + spin_init( &aclLock, PTHREAD_PROCESS_PRIVATE ); + spin_init( &statusLock, PTHREAD_PROCESS_PRIVATE ); + randomRunId = (((json_int_t)getpid()) << 16) | (json_int_t)time(NULL); + // </guard> + if ( sizeof(randomRunId) > 4 ) { + int fd = open( "/dev/urandom", O_RDONLY ); + if ( fd != -1 ) { + uint32_t bla = 1; + read( fd, &bla, 4 ); + 
randomRunId = (randomRunId << 32) | bla; + } + close( fd ); + } + loadAcl(); +} + +#define UPDATE_LOADSTATE(cnt) do { \ + if ( cnt < (CUTOFF_START/2) ) { \ + if ( status.overloaded ) status.overloaded = false; \ + } else if ( cnt > CUTOFF_START ) { \ + if ( !status.overloaded ) status.overloaded = true; \ + } \ +} while (0) + +void rpc_sendStatsJson(int sock, dnbd3_host_t* host, const void* data, const int dataLen) +{ + int permissions = getacl( host ); + if ( permissions == 0 ) { + sendReply( sock, "403 Forbidden", "text/plain", "Access denied", -1, HTTP_CLOSE ); + return; + } + do { + spin_lock( &statusLock ); + const int curCount = ++status.count; + UPDATE_LOADSTATE( curCount ); + spin_unlock( &statusLock ); + if ( curCount > MAX_CLIENTS ) { + sendReply( sock, "503 Service Temporarily Unavailable", "text/plain", "Too many HTTP clients", -1, HTTP_CLOSE ); + goto func_return; + } + } while (0); + char headerBuf[3000]; + if ( dataLen > 0 ) { + // We call this function internally with a maximum data len of sizeof(dnbd3_request_t) so no bounds checking + memcpy( headerBuf, data, dataLen ); + } + size_t hoff = dataLen; + bool hasName = false; + bool ok; + int keepAlive = HTTP_KEEPALIVE; + do { + // Read request from client + struct phr_header headers[100]; + size_t numHeaders, prevLen = 0, consumed; + struct string method, path; + int minorVersion; + do { + // Parse before calling recv, there might be a complete pipelined request in the buffer already + // If the request is incomplete, we allow exactly one additional recv() to complete it. + // This should suffice for real world scenarios as I don't know of any + // HTTP client that sends the request headers in multiple packets. 
Even + // with pipelining this should not break as we re-enter this loop after + // processing the requests one by one, so a potential partial request in the + // buffer will get another recv() (blocking mode) + // The alternative would be manual tracking of idle/request time to protect + // against never ending requests (slowloris) + int pret; + if ( hoff >= sizeof(headerBuf) ) goto func_return; // Request too large + if ( hoff != 0 ) { + numHeaders = 100; + pret = phr_parse_request( headerBuf, hoff, &method, &path, &minorVersion, headers, &numHeaders, prevLen ); + } else { + // Nothing in buffer yet, just set to -2 which is the phr goto func_return code for "partial request" + pret = -2; + } + if ( pret > 0 ) { + // > 0 means parsing completed without error + consumed = (size_t)pret; + break; + } + // Reaching here means partial request or parse error + if ( pret == -2 ) { // Partial, keep reading + prevLen = hoff; +#ifdef AFL_MODE + ssize_t ret = recv( 0, headerBuf + hoff, sizeof(headerBuf) - hoff, 0 ); +#else + ssize_t ret = recv( sock, headerBuf + hoff, sizeof(headerBuf) - hoff, 0 ); +#endif + if ( ret == 0 ) goto func_return; + if ( ret == -1 ) { + if ( errno == EINTR ) continue; + if ( errno != EAGAIN && errno != EWOULDBLOCK ) { + sendReply( sock, "500 Internal Server Error", "text/plain", "Server made a boo-boo", -1, HTTP_CLOSE ); + } + goto func_return; // Timeout or unknown error + } + hoff += ret; + } else { // Parse error + sendReply( sock, "400 Bad Request", "text/plain", "Server cannot understand what you're trying to say", -1, HTTP_CLOSE ); + goto func_return; + } + } while ( true ); + if ( keepAlive == HTTP_KEEPALIVE ) { + // Only keep the connection alive (and indicate so) if the client seems to support this + if ( minorVersion == 0 || hasHeaderValue( headers, numHeaders, &STR_CONNECTION, &STR_CLOSE ) ) { + keepAlive = HTTP_CLOSE; + } else { // And if there aren't too many active HTTP sessions + spin_lock( &statusLock ); + if ( status.overloaded ) 
keepAlive = HTTP_CLOSE; + spin_unlock( &statusLock ); + } + } + if ( method.s != NULL && path.s != NULL ) { + // Basic data filled from request parser + // Handle stuff + struct string file; + struct field getv[10]; + size_t getc = 10; + parsePath( &path, &file, getv, &getc ); + if ( method.s && method.s[0] == 'P' ) { + // POST only methods + } + // Don't care if GET or POST + if ( equals( &file, &STR_QUERY ) ) { + ok = handleStatus( sock, permissions, getv, getc, keepAlive ); + } else { + ok = sendReply( sock, "404 Not found", "text/plain", "Nothing", -1, keepAlive ); + } + if ( !ok ) break; + } + // hoff might be beyond end if the client sent another request (burst) + const ssize_t extra = hoff - consumed; + if ( extra > 0 ) { + memmove( headerBuf, headerBuf + consumed, extra ); + } + hoff = extra; + if ( !hasName ) { + hasName = true; + setThreadName( "HTTP" ); + } + } while (true); +func_return:; + do { + spin_lock( &statusLock ); + const int curCount = --status.count; + UPDATE_LOADSTATE( curCount ); + spin_unlock( &statusLock ); + } while (0); +} + +void rpc_sendErrorMessage(int sock, const char* message) +{ + static const char *encoded = NULL; + static size_t len; + if ( encoded == NULL ) { + json_t *tmp = json_pack( "{ss}", "errorMsg", message ); + encoded = json_dumps( tmp, 0 ); + json_decref( tmp ); + len = strlen( encoded ); + } + sendReply( sock, "200 Somewhat OK", "application/json", encoded, len, HTTP_CLOSE ); +} + +static bool handleStatus(int sock, int permissions, struct field *fields, size_t fields_num, int keepAlive) +{ + bool ok; + bool stats = false, images = false, clients = false, space = false; + bool logfile = false, config = false, altservers = false; +#define SETVAR(var) if ( !var && STRCMP(fields[i].value, #var) ) var = true + for (size_t i = 0; i < fields_num; ++i) { + if ( !equals( &fields[i].name, &STR_Q ) ) continue; + SETVAR(stats); + else SETVAR(space); + else SETVAR(images); + else SETVAR(clients); + else SETVAR(logfile); + else 
SETVAR(config); + else SETVAR(altservers); + } +#undef SETVAR + if ( ( stats || space ) && !(permissions & ACL_STATS) ) { + return sendReply( sock, "403 Forbidden", "text/plain", "No permission to access statistics", -1, keepAlive ); + } + if ( images && !(permissions & ACL_IMAGE_LIST) ) { + return sendReply( sock, "403 Forbidden", "text/plain", "No permission to access image list", -1, keepAlive ); + } + if ( clients && !(permissions & ACL_CLIENT_LIST) ) { + return sendReply( sock, "403 Forbidden", "text/plain", "No permission to access client list", -1, keepAlive ); + } + if ( logfile && !(permissions & ACL_LOG) ) { + return sendReply( sock, "403 Forbidden", "text/plain", "No permission to access log", -1, keepAlive ); + } + if ( config && !(permissions & ACL_CONFIG) ) { + return sendReply( sock, "403 Forbidden", "text/plain", "No permission to access config", -1, keepAlive ); + } + if ( altservers && !(permissions & ACL_ALTSERVERS) ) { + return sendReply( sock, "403 Forbidden", "text/plain", "No permission to access altservers", -1, keepAlive ); + } + + json_t *statisticsJson; + if ( stats ) { + int clientCount, serverCount; + uint64_t bytesSent; + const uint64_t bytesReceived = uplink_getTotalBytesReceived(); + net_getStats( &clientCount, &serverCount, &bytesSent ); + statisticsJson = json_pack( "{sIsIsisisIsI}", + "bytesReceived", (json_int_t) bytesReceived, + "bytesSent", (json_int_t) bytesSent, + "clientCount", clientCount, + "serverCount", serverCount, + "uptime", (json_int_t) dnbd3_serverUptime(), + "runId", randomRunId ); + } else { + statisticsJson = json_pack( "{sI}", + "runId", randomRunId ); + } + if ( space ) { + uint64_t spaceTotal = 0, spaceAvail = 0; + file_freeDiskSpace( _basePath, &spaceTotal, &spaceAvail ); + json_object_set_new( statisticsJson, "spaceTotal", json_integer( spaceTotal ) ); + json_object_set_new( statisticsJson, "spaceFree", json_integer( spaceAvail ) ); + } + if ( clients ) { + json_object_set_new( statisticsJson, "clients", 
net_getListAsJson() ); + } + if ( images ) { + json_object_set_new( statisticsJson, "images", image_getListAsJson() ); + } + if ( logfile ) { + char logbuf[4000]; + ssize_t len = log_fetch( logbuf, sizeof(logbuf) ); + json_t *val; + if ( len <= 0 ) { + val = json_null(); + } else { + val = json_stringn_nocheck( logbuf, (size_t)len ); + + } + json_object_set_new( statisticsJson, "logfile", val ); + } + if ( config ) { + char buf[2000]; + size_t len = globals_dumpConfig( buf, sizeof(buf) ); + json_object_set_new( statisticsJson, "config", json_stringn_nocheck( buf, len ) ); + } + if ( altservers ) { + json_object_set_new( statisticsJson, "altservers", altservers_toJson() ); + } + + char *jsonString = json_dumps( statisticsJson, 0 ); + json_decref( statisticsJson ); + ok = sendReply( sock, "200 OK", "application/json", jsonString, -1, keepAlive ); + free( jsonString ); + return ok; +} + +static bool sendReply(int sock, const char *status, const char *ctype, const char *payload, ssize_t plen, int keepAlive) +{ + if ( plen == -1 ) plen = strlen( payload ); + char buffer[600]; + const char *connection = ( keepAlive == HTTP_KEEPALIVE ) ? "Keep-Alive" : "Close"; + int hlen = snprintf(buffer, sizeof(buffer), "HTTP/1.1 %s\r\n" + "Connection: %s\r\n" + "Content-Type: %s; charset=utf-8\r\n" + "Content-Length: %u\r\n" + "\r\n", + status, connection, ctype, (unsigned int)plen ); + if ( hlen < 0 || hlen >= (int)sizeof(buffer) ) return false; // Truncated + if ( send( sock, buffer, hlen, MSG_MORE ) != hlen ) return false; + if ( !sock_sendAll( sock, payload, plen, 10 ) ) return false; + if ( keepAlive == HTTP_CLOSE ) { + // Wait for flush + shutdown( sock, SHUT_WR ); +#ifdef AFL_MODE + sock = 0; +#endif + while ( read( sock, buffer, sizeof buffer ) > 0 ); + return false; + } + return true; +} + +static void parsePath(struct string *path, struct string *file, struct field *getv, size_t *getc) +{ + size_t i = 0; + while ( i < path->l && path->s[i] != '?' 
) ++i; + if ( i == path->l ) { + *getc = 0; + *file = *path; + return; + } + file->s = path->s; + file->l = i; + ++i; + path->s += i; + path->l -= i; + urldecode( path, getv, getc ); + path->s -= i; + path->l += i; +} + +static bool hasHeaderValue(struct phr_header *headers, size_t numHeaders, struct string *name, struct string *value) +{ + for (size_t i = 0; i < numHeaders; ++i) { + if ( !iequals( &headers[i].name, name ) ) continue; + if ( iequals( &headers[i].value, value ) ) return true; + } + return false; +} + +static int getacl(dnbd3_host_t *host) +{ + if ( aclCount == 0 ) return 0x7fffff; // For now compat mode - no rules defined == all access + for (int i = 0; i < aclCount; ++i) { + if ( aclRules[i].bytes == 0 && aclRules[i].bitMask == 0 ) return aclRules[i].permissions; + if ( memcmp( aclRules[i].host, host->addr, aclRules[i].bytes ) != 0 ) continue; + if ( aclRules[i].bitMask != 0 && aclRules[i].host[aclRules[i].bytes] != ( host->addr[aclRules[i].bytes] & aclRules[i].bitMask ) ) continue; + return aclRules[i].permissions; + } +#ifdef AFL_MODE + return 0x7fffff; +#else + return 0; +#endif +} + +#define SETBIT(x) else if ( strcmp( argv[i], #x ) == 0 ) mask |= ACL_ ## x + +static void addacl(int argc, char **argv, void *data UNUSED) +{ + if ( argv[0][0] == '#' ) return; + spin_lock( &aclLock ); + if ( aclCount >= MAX_ACLS ) { + logadd( LOG_WARNING, "Too many ACL rules, ignoring %s", argv[0] ); + goto unlock_end; + } + int mask = 0; + for (int i = 1; i < argc; ++i) { + if (false) {} + SETBIT(ALL); + SETBIT(STATS); + SETBIT(CLIENT_LIST); + SETBIT(IMAGE_LIST); + else logadd( LOG_WARNING, "Invalid ACL flag '%s' for %s", argv[i], argv[0] ); + } + if ( mask == 0 ) { + logadd( LOG_INFO, "Ignoring empty rule for %s", argv[0] ); + goto unlock_end; + } + dnbd3_host_t host; + char *slash = strchr( argv[0], '/' ); + if ( slash != NULL ) { + *slash++ = '\0'; + } + if ( !parse_address( argv[0], &host ) ) goto unlock_end; + long int bits; + if ( slash != NULL ) { + char 
*last; + bits = strtol( slash, &last, 10 ); + if ( last == slash ) slash = NULL; + if ( host.type == HOST_IP4 && bits > 32 ) bits = 32; + if ( bits > 128 ) bits = 128; + } + if ( slash == NULL ) { + if ( host.type == HOST_IP4 ) { + bits = 32; + } else { + bits = 128; + } + } + memcpy( aclRules[aclCount].host, host.addr, 16 ); + aclRules[aclCount].bytes = (int)( bits / 8 ); + aclRules[aclCount].bitMask = 0; + aclRules[aclCount].permissions = mask; + bits %= 8; + if ( bits != 0 ) { + for (long int i = 0; i < bits; ++i) { + aclRules[aclCount].bitMask = ( aclRules[aclCount].bitMask >> 1 ) | 0x80; + } + aclRules[aclCount].host[aclRules[aclCount].bytes] &= (uint8_t)aclRules[aclCount].bitMask; + } + // We now have .bytes set to the number of bytes to memcmp. + // In case we have an odd bitmask, .bitMask will be != 0, so when comparing, + // we need AND the host[.bytes] of the address to compare with the value + // in .bitMask, and compate it, otherwise, a simple memcmp will do. + aclCount++; +unlock_end:; + spin_unlock( &aclLock ); +} + +static void loadAcl() +{ + static bool inProgress = false; + char *fn; + if ( asprintf( &fn, "%s/%s", _configDir, "rpc.acl" ) == -1 ) return; + spin_lock( &aclLock ); + if ( inProgress ) { + spin_unlock( &aclLock ); + return; + } + aclCount = 0; + inProgress = true; + spin_unlock( &aclLock ); + file_loadLineBased( fn, 1, 20, &addacl, NULL ); + spin_lock( &aclLock ); + inProgress = false; + spin_unlock( &aclLock ); + free( fn ); + logadd( LOG_INFO, "%d HTTPRPC ACL rules loaded", (int)aclCount ); +} + diff --git a/src/server/rpc.h b/src/server/rpc.h new file mode 100644 index 0000000..285242c --- /dev/null +++ b/src/server/rpc.h @@ -0,0 +1,10 @@ +#ifndef _RPC_H_ +#define _RPC_H_ + +struct dnbd3_host_t; + +void rpc_init(); +void rpc_sendStatsJson(int sock, struct dnbd3_host_t* host, const void *data, const int dataLen); +void rpc_sendErrorMessage(int sock, const char* message); + +#endif diff --git a/src/server/serialize.c 
b/src/server/serialize.c new file mode 100644 index 0000000..4934132 --- /dev/null +++ b/src/server/serialize.c @@ -0,0 +1,5 @@ +#include <stdio.h> +#include <string.h> +#include <stdint.h> + +#include "../serialize.c" diff --git a/src/server/server.c b/src/server/server.c new file mode 100644 index 0000000..10ab208 --- /dev/null +++ b/src/server/server.c @@ -0,0 +1,495 @@ + /* + * This file is part of the Distributed Network Block Device 3 + * + * Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de> + * + * This file may be licensed under the terms of of the + * GNU General Public License Version 2 (the ``GPL''). + * + * Software distributed under the License is distributed + * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either + * express or implied. See the GPL for the specific language + * governing rights and limitations. + * + * You should have received a copy of the GPL along with this + * program. If not, go to http://www.gnu.org/licenses/gpl.html + * or write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ * + */ + +#include "server.h" +#include "helper.h" + +#include "locks.h" +#include "image.h" +#include "uplink.h" +#include "net.h" +#include "altservers.h" +#include "integrity.h" +#include "threadpool.h" +#include "rpc.h" + +#include "../version.h" +#include "../shared/sockhelper.h" +#include "../shared/timing.h" + +#include <signal.h> +#include <getopt.h> +#include <assert.h> + +#define LONGOPT_CRC4 1000 +#define LONGOPT_ASSERT 1001 +#define LONGOPT_CREATE 1002 +#define LONGOPT_REVISION 1003 +#define LONGOPT_SIZE 1004 +#define LONGOPT_ERRORMSG 1005 + +static poll_list_t *listeners = NULL; + +/** + * Time the server was started + */ +static ticks startupTime; +static bool sigReload = false, sigLogCycle = false; + +/** + * Copied to in signal handler so we can print info + * later on + */ +static siginfo_t lastSignal; + +void printSignal(); + +static poll_list_t* setupNetwork(char *bindAddress); + +static dnbd3_client_t* dnbd3_prepareClient(struct sockaddr_storage *client, int fd); + +static void dnbd3_handleSignal(int signum); + +static void dnbd3_handleSignal2(int signum, siginfo_t *info, void *data); + +static void* server_asyncImageListLoad(void *data); + +/** + * Print help text for usage instructions + */ +void dnbd3_printHelp(char *argv_0) +{ + printf( "Version: %s\n\n", VERSION_STRING ); + printf( "Usage: %s [OPTIONS]...\n", argv_0 ); + printf( "Start the DNBD3 server\n" ); + printf( "-c or --config Configuration directory (default /etc/dnbd3-server/)\n" ); + printf( "-n or --nodaemon Start server in foreground\n" ); + printf( "-b or --bind Local Address to bind to\n" ); + printf( "-h or --help Show this help text and quit\n" ); + printf( "-v or --version Show version and quit\n" ); + printf( "\nManagement functions:\n" ); + printf( "--crc [image-file] Generate crc block list for given image\n" ); + printf( "--create [image-name] --revision [rid] --size [filesize]\n" + "\tCreate a local empty image file with a zeroed cache-map for the specified image\n" 
); + printf( "--errormsg [text] Just serve given error message via HTTP, no service otherwise\n" ); + printf( "\n" ); + exit( 0 ); +} + +/** + * Print version information + */ +void dnbd3_printVersion() +{ + printf( "Version: %s\n", VERSION_STRING ); + exit( 0 ); +} + +/** + * Clean up structs, connections, write out data, then exit + */ +void dnbd3_cleanup() +{ + int retries; + + _shutdown = true; + logadd( LOG_INFO, "Cleanup..." ); + + if ( listeners != NULL ) sock_destroyPollList( listeners ); + listeners = NULL; + + // Kill connection to all clients + net_disconnectAll(); + + // Disable threadpool + threadpool_close(); + + // Terminate the altserver checking thread + altservers_shutdown(); + + // Terminate all uplinks + image_killUplinks(); + + // Terminate integrity checker + integrity_shutdown(); + + // Wait for clients to disconnect + net_waitForAllDisconnected(); + + // Watchdog not needed anymore + debug_locks_stop_watchdog(); + + // Clean up images + retries = 5; + while ( !image_tryFreeAll() && --retries > 0 ) { + logadd( LOG_INFO, "Waiting for images to free...\n" ); + sleep( 1 ); + } + + free( _basePath ); + free( _configDir ); + exit( EXIT_SUCCESS ); +} + +/** + * Program entry point + */ +int main(int argc, char *argv[]) +{ + int demonize = 1; + int opt = 0; + int longIndex = 0; + char *paramCreate = NULL; + char *bindAddress = NULL; + char *errorMsg = NULL; + int64_t paramSize = -1; + int paramRevision = -1; + static const char *optString = "b:c:d:hnv?"; + static const struct option longOpts[] = { + { "config", required_argument, NULL, 'c' }, + { "nodaemon", no_argument, NULL, 'n' }, + { "reload", no_argument, NULL, 'r' }, + { "help", no_argument, NULL, 'h' }, + { "version", no_argument, NULL, 'v' }, + { "bind", required_argument, NULL, 'b' }, + { "crc", required_argument, NULL, LONGOPT_CRC4 }, + { "assert", no_argument, NULL, LONGOPT_ASSERT }, + { "create", required_argument, NULL, LONGOPT_CREATE }, + { "revision", required_argument, NULL, 
LONGOPT_REVISION }, + { "size", required_argument, NULL, LONGOPT_SIZE }, + { "errormsg", required_argument, NULL, LONGOPT_ERRORMSG }, + { 0, 0, 0, 0 } + }; + + opt = getopt_long( argc, argv, optString, longOpts, &longIndex ); + + while ( opt != -1 ) { + switch ( opt ) { + case 'c': + _configDir = strdup( optarg ); + break; + case 'n': + demonize = 0; + break; + case 'h': + case '?': + dnbd3_printHelp( argv[0] ); + break; + case 'v': + dnbd3_printVersion(); + break; + case 'b': + bindAddress = strdup( optarg ); + break; + case LONGOPT_CRC4: + return image_generateCrcFile( optarg ) ? 0 : EXIT_FAILURE; + case LONGOPT_ASSERT: + printf( "Testing a failing assertion:\n" ); + assert( 4 == 5 ); + printf( "Assertion 4 == 5 seems to hold. ;-)\n" ); + return EXIT_SUCCESS; + case LONGOPT_CREATE: + paramCreate = strdup( optarg ); + break; + case LONGOPT_REVISION: + paramRevision = atoi( optarg ); + break; + case LONGOPT_SIZE: + paramSize = strtoll( optarg, NULL, 10 ); + break; + case LONGOPT_ERRORMSG: + errorMsg = strdup( optarg ); + break; + } + opt = getopt_long( argc, argv, optString, longOpts, &longIndex ); + } + + // Load general config + + if ( _configDir == NULL ) _configDir = strdup( "/etc/dnbd3-server" ); + globals_loadConfig(); + if ( _basePath == NULL && errorMsg == NULL ) { + logadd( LOG_ERROR, "Aborting, set proper basePath in %s/%s", _configDir, CONFIG_FILENAME ); + exit( EXIT_FAILURE ); + } + + timing_setBase(); + timing_get( &startupTime ); + +#ifdef AFL_MODE + // ###### AFL + // + image_serverStartup(); + net_init(); + uplink_globalsInit(); + rpc_init(); + if ( !image_loadAll( NULL ) || _shutdown ) { + fprintf( stderr, "Error loading images\n" ); + exit( 3 ); + } + { + struct sockaddr_storage client; + memset( &client, 0, sizeof client ); + client.ss_family = AF_INET; + dnbd3_client_t *dnbd3_client = dnbd3_prepareClient( &client, 1 ); + if ( dnbd3_client == NULL ) { + fprintf( stderr, "New client failed\n" ); + exit( 1 ); + } +#ifdef __AFL_HAVE_MANUAL_CONTROL + 
__AFL_INIT(); +#endif + net_handleNewConnection( dnbd3_client ); + exit( 0 ); + } + // + // ###### AFL END +#endif + + + // One-shots first: + + if ( paramCreate != NULL ) { + return image_create( paramCreate, paramRevision, paramSize ) ? 0 : EXIT_FAILURE; + } + + // No one-shot detected, normal server operation or errormsg serving + if ( demonize ) { + logadd( LOG_INFO, "Forking into background, see log file for further information" ); + daemon( 1, 0 ); + } + if ( errorMsg != NULL ) { + setupNetwork( bindAddress ); + logadd( LOG_INFO, "Running errormsg server" ); + while ( true ) { + const int fd = sock_accept( listeners, NULL, NULL ); + if ( fd >= 0 ) { + rpc_sendErrorMessage( fd, errorMsg ); + } else { + const int err = errno; + if ( err == EINTR || err == EAGAIN ) continue; + logadd( LOG_ERROR, "Client accept failure (err=%d)", err ); + usleep( 10000 ); // 10ms + } + } + exit( 0 ); + } + image_serverStartup(); + altservers_init(); + integrity_init(); + net_init(); + uplink_globalsInit(); + rpc_init(); + logadd( LOG_INFO, "DNBD3 server starting.... Machine type: " ENDIAN_MODE ); + + if ( altservers_load() < 0 ) { + logadd( LOG_WARNING, "Could not load alt-servers. Does the file exist in %s?", _configDir ); + } + +#ifdef _DEBUG + debug_locks_start_watchdog(); +#endif + + // setup signal handler + struct sigaction sa; + memset( &sa, 0, sizeof(sa) ); + sa.sa_sigaction = dnbd3_handleSignal2; + sa.sa_flags = SA_SIGINFO; + //sa.sa_mask = ; + sigaction( SIGTERM, &sa, NULL ); + sigaction( SIGINT, &sa, NULL ); + sigaction( SIGUSR1, &sa, NULL ); + sigaction( SIGHUP, &sa, NULL ); + sigaction( SIGUSR2, &sa, NULL ); + signal( SIGPIPE, SIG_IGN ); + + logadd( LOG_INFO, "Loading images...." ); + // Load all images in base path + if ( !image_loadAll( NULL ) || _shutdown ) { + if ( _shutdown ) { + logadd( LOG_ERROR, "Received shutdown request while loading images." ); + } else { + logadd( LOG_ERROR, "Could not load images." 
); + } + free( bindAddress ); + dnbd3_cleanup(); + return _shutdown ? 0 : 1; + } + + // Give other threads some time to start up before accepting connections + usleep( 100000 ); + + // setup network + listeners = setupNetwork( bindAddress ); + + // Initialize thread pool + if ( !threadpool_init( 8 ) ) { + logadd( LOG_ERROR, "Could not init thread pool!\n" ); + exit( EXIT_FAILURE ); + } + + logadd( LOG_INFO, "Server is ready. (%s)", VERSION_STRING ); + + // +++++++++++++++++++++++++++++++++++++++++++++++++++ main loop + struct sockaddr_storage client; + socklen_t len; + int fd; + while ( !_shutdown ) { + // Handle signals + printSignal(); + if ( sigReload ) { + sigReload = false; + logadd( LOG_INFO, "SIGHUP received, re-scanning image directory" ); + threadpool_run( &server_asyncImageListLoad, NULL ); + } + if ( sigLogCycle ) { + sigLogCycle = false; + logadd( LOG_INFO, "SIGUSR2 received, reopening log file..." ); + if ( log_openLogFile( NULL ) ) + logadd( LOG_INFO, "Log file has been reopened." ); + else + logadd( LOG_WARNING, "Could not cycle log file." ); + } + // + len = sizeof(client); + fd = sock_accept( listeners, &client, &len ); + if ( fd < 0 ) { + const int err = errno; + if ( err == EINTR || err == EAGAIN ) continue; + logadd( LOG_ERROR, "Client accept failure (err=%d)", err ); + usleep( 10000 ); // 10ms + continue; + } + + dnbd3_client_t *dnbd3_client = dnbd3_prepareClient( &client, fd ); + if ( dnbd3_client == NULL ) { + close( fd ); + continue; + } + + if ( !threadpool_run( &net_handleNewConnection, (void *)dnbd3_client ) ) { + logadd( LOG_ERROR, "Could not start thread for new connection." 
); + free( dnbd3_client ); + continue; + } + } + printSignal(); + free( bindAddress ); + dnbd3_cleanup(); + return 0; +} + +void printSignal() +{ + if ( lastSignal.si_signo != 0 ) { + logadd( LOG_INFO, "Signal %d (via %d) by pid %u, uid %u", + lastSignal.si_signo, lastSignal.si_code, + (unsigned int)lastSignal.si_pid, (unsigned int)lastSignal.si_uid ); + if ( lastSignal.si_pid != 0 ) { + char buffer[500], path[100]; + snprintf( path, sizeof(path), "/proc/%u/exe", (unsigned int)lastSignal.si_pid ); + ssize_t len = readlink( path, buffer, sizeof(buffer) ); + if ( len > 0 ) { + logadd( LOG_INFO, "%u is %.*s", (unsigned int)lastSignal.si_pid, (int)len, buffer ); + } + } + lastSignal.si_signo = 0; + } +} + +static poll_list_t* setupNetwork(char *bindAddress) +{ + listeners = sock_newPollList(); + if ( listeners == NULL ) { + logadd( LOG_ERROR, "Didnt get a poll list!" ); + exit( EXIT_FAILURE ); + } + if ( !sock_listen( listeners, bindAddress, (uint16_t)_listenPort ) ) { + logadd( LOG_ERROR, "Could not listen on any local interface." ); + exit( EXIT_FAILURE ); + } + return listeners; +} + +/** + * Initialize and partially populate the client struct - called when an incoming + * connection is accepted. As this might be an HTTP request we don't initialize the + * locks, that would happen later once we know. + */ +static dnbd3_client_t* dnbd3_prepareClient(struct sockaddr_storage *client, int fd) +{ + dnbd3_client_t *dnbd3_client = calloc( 1, sizeof(dnbd3_client_t) ); + if ( dnbd3_client == NULL ) { // This will never happen thanks to memory overcommit + logadd( LOG_ERROR, "Could not alloc dnbd3_client_t for new client." 
); + return NULL; + } + + if ( client->ss_family == AF_INET ) { + struct sockaddr_in *v4 = (struct sockaddr_in *)client; + dnbd3_client->host.type = HOST_IP4; + memcpy( dnbd3_client->host.addr, &(v4->sin_addr), 4 ); + dnbd3_client->host.port = v4->sin_port; + } else if ( client->ss_family == AF_INET6 ) { + struct sockaddr_in6 *v6 = (struct sockaddr_in6 *)client; + dnbd3_client->host.type = HOST_IP6; + memcpy( dnbd3_client->host.addr, &(v6->sin6_addr), 16 ); + dnbd3_client->host.port = v6->sin6_port; + } else { + logadd( LOG_ERROR, "New client has unknown address family %d, disconnecting...", (int)client->ss_family ); + free( dnbd3_client ); + return NULL; + } + dnbd3_client->sock = fd; + return dnbd3_client; +} + +static void dnbd3_handleSignal(int signum) +{ + if ( _shutdown ) return; + if ( signum == SIGINT || signum == SIGTERM ) { + _shutdown = true; + } else if ( signum == SIGUSR1 || signum == SIGHUP ) { + sigReload = true; + } else if ( signum == SIGUSR2 ) { + sigLogCycle = true; + } +} + +static void dnbd3_handleSignal2(int signum, siginfo_t *info, void *data UNUSED) +{ + memcpy( &lastSignal, info, sizeof(siginfo_t) ); + dnbd3_handleSignal( signum ); +} + +uint32_t dnbd3_serverUptime() +{ + ticks now; + timing_get( &now ); + return timing_diff( &startupTime, &now ); +} + +static void* server_asyncImageListLoad(void *data UNUSED) +{ + setThreadName( "img-list-loader" ); + globals_loadConfig(); + image_loadAll( NULL ); + return NULL; +} + diff --git a/src/server/server.h b/src/server/server.h new file mode 100644 index 0000000..bab8421 --- /dev/null +++ b/src/server/server.h @@ -0,0 +1,34 @@ +/* + * This file is part of the Distributed Network Block Device 3 + * + * Copyright(c) 2011-2012 Johann Latocha <johann@latocha.de> + * + * This file may be licensed under the terms of of the + * GNU General Public License Version 2 (the ``GPL''). 
+ * + * Software distributed under the License is distributed + * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either + * express or implied. See the GPL for the specific language + * governing rights and limitations. + * + * You should have received a copy of the GPL along with this + * program. If not, go to http://www.gnu.org/licenses/gpl.html + * or write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + */ + +#ifndef SERVER_H_ +#define SERVER_H_ + +#include "globals.h" +#include "../types.h" + +void dnbd3_cleanup(); +uint32_t dnbd3_serverUptime(); + +#if !defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64 +#error Please set _FILE_OFFSET_BITS to 64 in your makefile/configuration +#endif + +#endif /* SERVER_H_ */ diff --git a/src/server/threadpool.c b/src/server/threadpool.c new file mode 100644 index 0000000..b55fe19 --- /dev/null +++ b/src/server/threadpool.c @@ -0,0 +1,126 @@ +#include "threadpool.h" +#include "globals.h" +#include "helper.h" +#include "locks.h" + +typedef struct _entry_t { + struct _entry_t *next; + pthread_t thread; + dnbd3_signal_t* signal; + void *(*startRoutine)(void *); + void * arg; +} entry_t; + +static void *threadpool_worker(void *entryPtr); + +static pthread_attr_t threadAttrs; + +static int maxIdleThreads = -1; +static entry_t *pool = NULL; +static pthread_spinlock_t poolLock; + +bool threadpool_init(int maxIdle) +{ + if ( maxIdle < 0 || maxIdleThreads >= 0 ) return false; + spin_init( &poolLock, PTHREAD_PROCESS_PRIVATE ); + maxIdleThreads = maxIdle; + pthread_attr_init( &threadAttrs ); + pthread_attr_setdetachstate( &threadAttrs, PTHREAD_CREATE_DETACHED ); + return true; +} + +void threadpool_close() +{ + _shutdown = true; + if ( maxIdleThreads < 0 ) return; + spin_lock( &poolLock ); + maxIdleThreads = -1; + entry_t *ptr = pool; + while ( ptr != NULL ) { + entry_t *current = ptr; + ptr = ptr->next; + signal_call( current->signal ); + } + spin_unlock( &poolLock 
); + spin_destroy( &poolLock ); +} + +bool threadpool_run(void *(*startRoutine)(void *), void *arg) +{ + spin_lock( &poolLock ); + entry_t *entry = pool; + if ( entry != NULL ) pool = entry->next; + spin_unlock( &poolLock ); + if ( entry == NULL ) { + entry = (entry_t*)malloc( sizeof(entry_t) ); + if ( entry == NULL ) { + logadd( LOG_WARNING, "Could not alloc entry_t for new thread\n" ); + return false; + } + entry->signal = signal_newBlocking(); + if ( entry->signal == NULL ) { + logadd( LOG_WARNING, "Could not create signal for new thread pool thread\n" ); + free( entry ); + return false; + } + if ( 0 != thread_create( &(entry->thread), &threadAttrs, threadpool_worker, (void*)entry ) ) { + logadd( LOG_WARNING, "Could not create new thread for thread pool\n" ); + signal_close( entry->signal ); + free( entry ); + return false; + } + } + entry->next = NULL; + entry->startRoutine = startRoutine; + entry->arg = arg; + signal_call( entry->signal ); + return true; +} + +/** + * This is a worker thread of our thread pool. + */ +static void *threadpool_worker(void *entryPtr) +{ + blockNoncriticalSignals(); + entry_t *entry = (entry_t*)entryPtr; + for ( ;; ) { + // Wait for signal from outside that we have work to do + int ret = signal_clear( entry->signal ); + if ( _shutdown ) break; + if ( ret > 0 ) { + if ( entry->startRoutine == NULL ) { + logadd( LOG_DEBUG1, "Worker woke up but has no work to do!" 
); + continue; + } + // Start assigned work + (*entry->startRoutine)( entry->arg ); + // Reset vars for safety + entry->startRoutine = NULL; + entry->arg = NULL; + if ( _shutdown ) break; + // Put thread back into pool if there are less than maxIdleThreds threads, just die otherwise + int threadCount = 0; + spin_lock( &poolLock ); + entry_t *ptr = pool; + while ( ptr != NULL ) { + threadCount++; + ptr = ptr->next; + } + if ( threadCount >= maxIdleThreads ) { + spin_unlock( &poolLock ); + break; + } + entry->next = pool; + pool = entry; + spin_unlock( &poolLock ); + setThreadName( "[pool]" ); + } else { + logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret ); + } + } + signal_close( entry->signal ); + free( entry ); + return NULL; +} + diff --git a/src/server/threadpool.h b/src/server/threadpool.h new file mode 100644 index 0000000..15dd151 --- /dev/null +++ b/src/server/threadpool.h @@ -0,0 +1,29 @@ +#ifndef _THREADPOOL_H_ +#define _THREADPOOL_H_ + +#include "../types.h" + +/** + * Initialize the thread pool. This must be called before using + * threadpool_run, and must only be called once. + * @param maxIdleThreadCount maximum number of idle threads in the pool + * @return true if initialized successfully + */ +bool threadpool_init(int maxIdleThreadCount); + +/** + * Shut down threadpool. + * Only call if it has been initialized before. + */ +void threadpool_close(); + +/** + * Run a thread using the thread pool. 
+ * @param startRoutine function to run in new thread + * @param arg argument to pass to thead + * @return true if thread was started + */ +bool threadpool_run(void *(*startRoutine)(void *), void *arg); + +#endif + diff --git a/src/server/uplink.c b/src/server/uplink.c new file mode 100644 index 0000000..31b220d --- /dev/null +++ b/src/server/uplink.c @@ -0,0 +1,1034 @@ +#include "uplink.h" +#include "helper.h" +#include "locks.h" +#include "image.h" +#include "altservers.h" +#include "../shared/sockhelper.h" +#include "../shared/protocol.h" +#include "../shared/timing.h" +#include "../shared/crc32.h" + +#include <assert.h> +#include <inttypes.h> +#include <fcntl.h> +#include <poll.h> +#include <unistd.h> +#include <stdatomic.h> + +#define FILE_BYTES_PER_MAP_BYTE ( DNBD3_BLOCK_SIZE * 8 ) +#define MAP_BYTES_PER_HASH_BLOCK (int)( HASH_BLOCK_SIZE / FILE_BYTES_PER_MAP_BYTE ) +#define MAP_INDEX_HASH_START_MASK ( ~(int)( MAP_BYTES_PER_HASH_BLOCK - 1 ) ) + +#define REP_NONE ( (uint64_t)0xffffffffffffffff ) + +static atomic_uint_fast64_t totalBytesReceived = 0; + +static void* uplink_mainloop(void *data); +static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly); +static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const int lastBlockIndex); +static void uplink_handleReceive(dnbd3_connection_t *link); +static int uplink_sendKeepalive(const int fd); +static void uplink_addCrc32(dnbd3_connection_t *uplink); +static void uplink_sendReplicationRequest(dnbd3_connection_t *link); +static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force); +static bool uplink_saveCacheMap(dnbd3_connection_t *link); +static bool uplink_connectionShouldShutdown(dnbd3_connection_t *link); +static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew); + +// ############ uplink connection handling + +void uplink_globalsInit() +{ +} + +uint64_t uplink_getTotalBytesReceived() +{ + return (uint64_t)totalBytesReceived; +} + +/** + * 
Create and initialize an uplink instance for the given + * image. Uplinks run in their own thread. + * Locks on: _images[].lock + */ +bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version) +{ + if ( !_isProxy || _shutdown ) return false; + dnbd3_connection_t *link = NULL; + assert( image != NULL ); + spin_lock( &image->lock ); + if ( image->uplink != NULL && !image->uplink->shutdown ) { + spin_unlock( &image->lock ); + if ( sock >= 0 ) close( sock ); + return true; // There's already an uplink, so should we consider this success or failure? + } + if ( image->cache_map == NULL ) { + logadd( LOG_WARNING, "Uplink was requested for image %s, but it is already complete", image->name ); + goto failure; + } + link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) ); + spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE ); + spin_init( &link->rttLock, PTHREAD_PROCESS_PRIVATE ); + link->image = image; + link->bytesReceived = 0; + link->idleTime = 0; + link->queueLen = 0; + link->fd = -1; + link->cacheFd = -1; + link->signal = NULL; + link->replicationHandle = REP_NONE; + spin_lock( &link->rttLock ); + link->cycleDetected = false; + if ( sock >= 0 ) { + link->betterFd = sock; + link->betterServer = *host; + link->rttTestResult = RTT_DOCHANGE; + link->betterVersion = version; + } else { + link->betterFd = -1; + link->rttTestResult = RTT_IDLE; + } + spin_unlock( &link->rttLock ); + link->recvBufferLen = 0; + link->shutdown = false; + if ( 0 != thread_create( &(link->thread), NULL, &uplink_mainloop, (void *)link ) ) { + logadd( LOG_ERROR, "Could not start thread for new uplink." ); + goto failure; + } + spin_unlock( &image->lock ); + return true; +failure: ; + if ( link != NULL ) { + free( link ); + link = image->uplink = NULL; + } + spin_unlock( &image->lock ); + return false; +} + +/** + * Locks on image.lock, uplink.lock + * Calling it multiple times, even concurrently, will + * not break anything. 
+ */ +void uplink_shutdown(dnbd3_image_t *image) +{ + bool join = false; + pthread_t thread; + assert( image != NULL ); + spin_lock( &image->lock ); + if ( image->uplink == NULL ) { + spin_unlock( &image->lock ); + return; + } + dnbd3_connection_t * const uplink = image->uplink; + spin_lock( &uplink->queueLock ); + if ( !uplink->shutdown ) { + uplink->shutdown = true; + signal_call( uplink->signal ); + thread = uplink->thread; + join = true; + } + spin_unlock( &uplink->queueLock ); + bool wait = image->uplink != NULL; + spin_unlock( &image->lock ); + if ( join ) thread_join( thread, NULL ); + while ( wait ) { + usleep( 5000 ); + spin_lock( &image->lock ); + wait = image->uplink != NULL && image->uplink->shutdown; + spin_unlock( &image->lock ); + } +} + +/** + * Remove given client from uplink request queue + * Locks on: uplink.queueLock + */ +void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) +{ + spin_lock( &uplink->queueLock ); + for (int i = uplink->queueLen - 1; i >= 0; --i) { + if ( uplink->queue[i].client == client ) { + uplink->queue[i].client = NULL; + uplink->queue[i].status = ULR_FREE; + } + if ( uplink->queue[i].client == NULL && uplink->queueLen == i + 1 ) uplink->queueLen--; + } + spin_unlock( &uplink->queueLock ); +} + +/** + * Request a chunk of data through an uplink server + * Locks on: image.lock, uplink.queueLock + */ +bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hops) +{ + if ( client == NULL || client->image == NULL ) return false; + if ( length > (uint32_t)_maxPayload ) { + logadd( LOG_WARNING, "Cannot relay request by client; length of %" PRIu32 " exceeds maximum payload", length ); + return false; + } + spin_lock( &client->image->lock ); + if ( client->image->uplink == NULL ) { + spin_unlock( &client->image->lock ); + logadd( LOG_DEBUG1, "Uplink request for image with no uplink" ); + return false; + } + dnbd3_connection_t * const uplink = 
client->image->uplink; + if ( uplink->shutdown ) { + spin_unlock( &client->image->lock ); + logadd( LOG_DEBUG1, "Uplink request for image with uplink shutting down" ); + return false; + } + // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain + // This might be a false positive if there are multiple instances running on the same host (IP) + if ( hops != 0 && isSameAddress( &uplink->currentServer, &client->host ) ) { + spin_unlock( &client->image->lock ); + logadd( LOG_WARNING, "Proxy cycle detected (same host)." ); + spin_lock( &uplink->rttLock ); + uplink->cycleDetected = true; + spin_unlock( &uplink->rttLock ); + signal_call( uplink->signal ); + return false; + } + + int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise + int existingType = -1; // ULR_* type of existing request + int i; + int freeSlot = -1; + bool requestLoop = false; + const uint64_t end = start + length; + + spin_lock( &uplink->queueLock ); + spin_unlock( &client->image->lock ); + for (i = 0; i < uplink->queueLen; ++i) { + if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) { + freeSlot = i; + continue; + } + if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue; + if ( uplink->queue[i].from <= start && uplink->queue[i].to >= end ) { + if ( hops > uplink->queue[i].hopCount && uplink->queue[i].from == start && uplink->queue[i].to == end ) { + requestLoop = true; + break; + } + if ( foundExisting == -1 || existingType == ULR_PENDING ) { + foundExisting = i; + existingType = uplink->queue[i].status; + if ( freeSlot != -1 ) break; + } + } + } + if ( requestLoop ) { + spin_unlock( &uplink->queueLock ); + logadd( LOG_WARNING, "Rejecting relay of request to upstream proxy because of possible cyclic proxy chain. 
Incoming hop-count is %" PRIu8 ".", hops ); + spin_lock( &uplink->rttLock ); + uplink->cycleDetected = true; + spin_unlock( &uplink->rttLock ); + signal_call( uplink->signal ); + return false; + } + if ( freeSlot == -1 ) { + if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) { + spin_unlock( &uplink->queueLock ); + logadd( LOG_WARNING, "Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." ); + return false; + } + freeSlot = uplink->queueLen++; + } + // Do not send request to uplink server if we have a matching pending request AND the request either has the + // status ULR_NEW OR we found a free slot with LOWER index than the one we attach to. Otherwise + // explicitly send this request to the uplink server. The second condition mentioned here is to prevent + // a race condition where the reply for the outstanding request already arrived and the uplink thread + // is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might + // already have passed the index of the free slot we determined, but not reached the existing request we just found above. + if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1; // -1 means "send request" +#ifdef _DEBUG + if ( foundExisting != -1 ) { + logadd( LOG_DEBUG2, "%p (%s) Found existing request of type %s at slot %d, attaching in slot %d.\n", (void*)uplink, uplink->image->name, existingType == ULR_NEW ? 
"ULR_NEW" : "ULR_PENDING", foundExisting, freeSlot ); + logadd( LOG_DEBUG2, "Original %" PRIu64 "-%" PRIu64 " (%p)\n" + "New %" PRIu64 "-%" PRIu64 " (%p)\n", + uplink->queue[foundExisting].from, uplink->queue[foundExisting].to, (void*)uplink->queue[foundExisting].client, + start, end, (void*)client ); + } +#endif + // Fill structure + uplink->queue[freeSlot].from = start; + uplink->queue[freeSlot].to = end; + uplink->queue[freeSlot].handle = handle; + uplink->queue[freeSlot].client = client; + //int old = uplink->queue[freeSlot].status; + uplink->queue[freeSlot].status = (foundExisting == -1 ? ULR_NEW : ULR_PENDING); + uplink->queue[freeSlot].hopCount = hops; +#ifdef _DEBUG + timing_get( &uplink->queue[freeSlot].entered ); + //logadd( LOG_DEBUG2 %p] Inserting request at slot %d, was %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 "\n", (void*)uplink, freeSlot, old, uplink->queue[freeSlot].status, uplink->queue[freeSlot, ".handle, start, end ); +#endif + spin_unlock( &uplink->queueLock ); + + if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed + if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { + logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); + } + } + return true; +} + +/** + * Uplink thread. 
+ * Locks are irrelevant as this is never called from another function + */ +static void* uplink_mainloop(void *data) +{ +#define EV_SIGNAL (0) +#define EV_SOCKET (1) +#define EV_COUNT (2) + struct pollfd events[EV_COUNT]; + dnbd3_connection_t * const link = (dnbd3_connection_t*)data; + int numSocks, i, waitTime; + int altCheckInterval = SERVER_RTT_INTERVAL_INIT; + uint32_t discoverFailCount = 0; + uint32_t unsavedSeconds = 0; + ticks nextAltCheck, lastKeepalive; + char buffer[200]; + memset( events, 0, sizeof(events) ); + timing_get( &nextAltCheck ); + lastKeepalive = nextAltCheck; + // + assert( link != NULL ); + setThreadName( "idle-uplink" ); + blockNoncriticalSignals(); + // Make sure file is open for writing + if ( !uplink_reopenCacheFd( link, false ) ) { + // It might have failed - still offer proxy mode, we just can't cache + logadd( LOG_WARNING, "Cannot open cache file %s for writing (errno=%d); will just proxy traffic without caching!", link->image->path, errno ); + } + // + link->signal = signal_new(); + if ( link->signal == NULL ) { + logadd( LOG_WARNING, "error creating signal. Uplink unavailable." ); + goto cleanup; + } + events[EV_SIGNAL].events = POLLIN; + events[EV_SIGNAL].fd = signal_getWaitFd( link->signal ); + events[EV_SOCKET].fd = -1; + while ( !_shutdown && !link->shutdown ) { + // poll() + spin_lock( &link->rttLock ); + waitTime = link->rttTestResult == RTT_DOCHANGE ? 0 : -1; + spin_unlock( &link->rttLock ); + if ( waitTime == 0 ) { + // Nothing + } else if ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) { + waitTime = 1000; + } else { + declare_now; + waitTime = (int)timing_diffMs( &now, &nextAltCheck ); + if ( waitTime < 100 ) waitTime = 100; + if ( waitTime > 5000 ) waitTime = 5000; + } + events[EV_SOCKET].fd = link->fd; + numSocks = poll( events, EV_COUNT, waitTime ); + if ( _shutdown || link->shutdown ) goto cleanup; + if ( numSocks == -1 ) { // Error? 
+ if ( errno == EINTR ) continue; + logadd( LOG_DEBUG1, "poll() error %d", (int)errno ); + usleep( 10000 ); + continue; + } + // Check if server switch is in order + spin_lock( &link->rttLock ); + if ( link->rttTestResult != RTT_DOCHANGE ) { + spin_unlock( &link->rttLock ); + } else { + link->rttTestResult = RTT_IDLE; + // The rttTest worker thread has finished our request. + // And says it's better to switch to another server + const int fd = link->fd; + link->fd = link->betterFd; + link->betterFd = -1; + link->currentServer = link->betterServer; + link->version = link->betterVersion; + link->cycleDetected = false; + spin_unlock( &link->rttLock ); + discoverFailCount = 0; + if ( fd != -1 ) close( fd ); + link->replicationHandle = REP_NONE; + link->image->working = true; + link->replicatedLastBlock = false; // Reset this to be safe - request could've been sent but reply was never received + buffer[0] = '@'; + if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) { + logadd( LOG_DEBUG1, "(Uplink %s) Now connected to %s\n", link->image->name, buffer + 1 ); + setThreadName( buffer ); + } + // If we don't have a crc32 list yet, see if the new server has one + if ( link->image->crc32 == NULL ) { + uplink_addCrc32( link ); + } + // Re-send all pending requests + uplink_sendRequests( link, false ); + uplink_sendReplicationRequest( link ); + events[EV_SOCKET].events = POLLIN | POLLRDHUP; + timing_gets( &nextAltCheck, altCheckInterval ); + // The rtt worker already did the handshake for our image, so there's nothing + // more to do here + } + // Check events + // Signal + if ( (events[EV_SIGNAL].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) { + logadd( LOG_WARNING, "poll error on signal in uplink_mainloop!" ); + goto cleanup; + } else if ( (events[EV_SIGNAL].revents & POLLIN) ) { + // signal triggered -> pending requests + if ( signal_clear( link->signal ) == SIGNAL_ERROR ) { + logadd( LOG_WARNING, "Errno on signal on uplink for %s! 
Things will break!", link->image->name ); + } + if ( link->fd != -1 ) { + // Uplink seems fine, relay requests to it... + uplink_sendRequests( link, true ); + } else { // No uplink; maybe it was shutdown since it was idle for too long + link->idleTime = 0; + } + } + // Uplink socket + if ( (events[EV_SOCKET].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) { + uplink_connectionFailed( link, true ); + logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" ); + setThreadName( "panic-uplink" ); + } else if ( (events[EV_SOCKET].revents & POLLIN) ) { + uplink_handleReceive( link ); + if ( _shutdown || link->shutdown ) goto cleanup; + } + declare_now; + uint32_t timepassed = timing_diff( &lastKeepalive, &now ); + if ( timepassed >= SERVER_UPLINK_KEEPALIVE_INTERVAL ) { + lastKeepalive = now; + link->idleTime += timepassed; + unsavedSeconds += timepassed; + if ( unsavedSeconds > 240 || ( unsavedSeconds > 60 && link->idleTime >= 20 && link->idleTime <= 70 ) ) { + // fsync/save every 4 minutes, or every 60 seconds if link is idle + unsavedSeconds = 0; + uplink_saveCacheMap( link ); + } + // Keep-alive + if ( link->fd != -1 && link->replicationHandle == REP_NONE ) { + // Send keep-alive if nothing is happening + if ( uplink_sendKeepalive( link->fd ) ) { + // Re-trigger periodically, in case it requires a minimum user count + uplink_sendReplicationRequest( link ); + } else { + uplink_connectionFailed( link, true ); + logadd( LOG_DEBUG1, "Error sending keep-alive, panic!\n" ); + setThreadName( "panic-uplink" ); + } + } + // Don't keep link established if we're idle for too much + if ( link->fd != -1 && uplink_connectionShouldShutdown( link ) ) { + close( link->fd ); + link->fd = events[EV_SOCKET].fd = -1; + link->cycleDetected = false; + if ( link->recvBufferLen != 0 ) { + link->recvBufferLen = 0; + free( link->recvBuffer ); + link->recvBuffer = NULL; + } + logadd( LOG_DEBUG1, "Closing idle uplink for image %s:%d", link->image->name, (int)link->image->rid ); + setThreadName( 
"idle-uplink" ); + } + } + // See if we should trigger an RTT measurement + spin_lock( &link->rttLock ); + const int rttTestResult = link->rttTestResult; + spin_unlock( &link->rttLock ); + if ( rttTestResult == RTT_IDLE || rttTestResult == RTT_DONTCHANGE ) { + if ( timing_reached( &nextAltCheck, &now ) || ( link->fd == -1 && !uplink_connectionShouldShutdown( link ) ) || link->cycleDetected ) { + // It seems it's time for a check + if ( image_isComplete( link->image ) ) { + // Quit work if image is complete + logadd( LOG_INFO, "Replication of %s complete.", link->image->name ); + setThreadName( "finished-uplink" ); + goto cleanup; + } else if ( !uplink_connectionShouldShutdown( link ) ) { + // Not complete - do measurement + altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous) + if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) { + link->nextReplicationIndex = 0; + } + } + altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_INTERVAL_MAX); + timing_set( &nextAltCheck, &now, altCheckInterval ); + } + } else if ( rttTestResult == RTT_NOT_REACHABLE ) { + spin_lock( &link->rttLock ); + link->rttTestResult = RTT_IDLE; + spin_unlock( &link->rttLock ); + discoverFailCount++; + timing_set( &nextAltCheck, &now, (discoverFailCount < SERVER_RTT_BACKOFF_COUNT ? 
altCheckInterval : SERVER_RTT_INTERVAL_FAILED) ); + } +#ifdef _DEBUG + if ( link->fd != -1 && !link->shutdown ) { + bool resend = false; + ticks deadline; + timing_set( &deadline, &now, -10 ); + spin_lock( &link->queueLock ); + for (i = 0; i < link->queueLen; ++i) { + if ( link->queue[i].status != ULR_FREE && timing_reached( &link->queue[i].entered, &deadline ) ) { + snprintf( buffer, sizeof(buffer), "[DEBUG %p] Starving request slot %d detected:\n" + "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", (void*)link, i, link->queue[i].client->image->name, + link->queue[i].from, link->queue[i].to, link->queue[i].status ); + link->queue[i].entered = now; +#ifdef _DEBUG_RESEND_STARVING + link->queue[i].status = ULR_NEW; + resend = true; +#endif + spin_unlock( &link->queueLock ); + logadd( LOG_WARNING, "%s", buffer ); + spin_lock( &link->queueLock ); + } + } + spin_unlock( &link->queueLock ); + if ( resend ) + uplink_sendRequests( link, true ); + } +#endif + } + cleanup: ; + altservers_removeUplink( link ); + uplink_saveCacheMap( link ); + spin_lock( &link->image->lock ); + if ( link->image->uplink == link ) { + link->image->uplink = NULL; + } + spin_lock( &link->queueLock ); + const int fd = link->fd; + const dnbd3_signal_t* signal = link->signal; + link->fd = -1; + link->signal = NULL; + if ( !link->shutdown ) { + link->shutdown = true; + thread_detach( link->thread ); + } + // Do not access link->image after unlocking, since we set + // image->uplink to NULL. 
Acquire with image_lock first, + // like done below when checking whether to re-init uplink + spin_unlock( &link->image->lock ); + spin_unlock( &link->queueLock ); + if ( fd != -1 ) close( fd ); + if ( signal != NULL ) signal_close( signal ); + // Wait for the RTT check to finish/fail if it's in progress + while ( link->rttTestResult == RTT_INPROGRESS ) + usleep( 10000 ); + if ( link->betterFd != -1 ) { + close( link->betterFd ); + } + spin_destroy( &link->queueLock ); + spin_destroy( &link->rttLock ); + free( link->recvBuffer ); + link->recvBuffer = NULL; + if ( link->cacheFd != -1 ) { + close( link->cacheFd ); + } + dnbd3_image_t *image = image_lock( link->image ); + free( link ); // !!! + if ( image != NULL ) { + if ( !_shutdown && image->cache_map != NULL ) { + // Ingegrity checker must have found something in the meantime + uplink_init( image, -1, NULL, 0 ); + } + image_release( image ); + } + return NULL ; +} + +static void uplink_sendRequests(dnbd3_connection_t *link, bool newOnly) +{ + // Scan for new requests + int j; + spin_lock( &link->queueLock ); + for (j = 0; j < link->queueLen; ++j) { + if ( link->queue[j].status != ULR_NEW && (newOnly || link->queue[j].status != ULR_PENDING) ) continue; + link->queue[j].status = ULR_PENDING; + uint8_t hops = link->queue[j].hopCount; + const uint64_t reqStart = link->queue[j].from & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + const uint32_t reqSize = (uint32_t)(((link->queue[j].to + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1)) - reqStart); + /* + logadd( LOG_DEBUG2, "[%p] Sending slot %d, now %d, handle %" PRIu64 ", Range: %" PRIu64 "-%" PRIu64 " (%" PRIu64 "-%" PRIu64 ")", + (void*)link, j, link->queue[j].status, link->queue[j].handle, link->queue[j].from, link->queue[j].to, reqStart, reqStart+reqSize ); + */ + spin_unlock( &link->queueLock ); + if ( hops < 200 ) ++hops; + const int ret = dnbd3_get_block( link->fd, reqStart, reqSize, reqStart, COND_HOPCOUNT( link->version, hops ) ); + if ( !ret ) { + // 
Non-critical - if the connection dropped or the server was changed + // the thread will re-send this request as soon as the connection + // is reestablished. + logadd( LOG_DEBUG1, "Error forwarding request to uplink server!\n" ); + altservers_serverFailed( &link->currentServer ); + return; + } + spin_lock( &link->queueLock ); + } + spin_unlock( &link->queueLock ); +} + +/** + * Send a block request to an uplink server without really having + * any client that needs that data. This will be used for background replication. + * + * We'll go through the cache map of the image and look for bytes that don't have + * all bits set. We then request the corresponding 8 blocks of 4kb from the uplink + * server. This means we might request data we already have, but it makes + * the code simpler. Worst case would be only one bit is zero, which means + * 4kb are missing, but we will request 32kb. + */ +static void uplink_sendReplicationRequest(dnbd3_connection_t *link) +{ + if ( link == NULL || link->fd == -1 ) return; + if ( _backgroundReplication == BGR_DISABLED || link->cacheFd == -1 ) return; // Don't do background replication + if ( link->nextReplicationIndex == -1 || link->replicationHandle != REP_NONE ) + return; + dnbd3_image_t * const image = link->image; + if ( image->virtualFilesize < DNBD3_BLOCK_SIZE ) return; + spin_lock( &image->lock ); + if ( image == NULL || image->cache_map == NULL || image->users < _bgrMinClients ) { + // No cache map (=image complete), or replication pending, or not enough users, do nothing + spin_unlock( &image->lock ); + return; + } + const int mapBytes = IMGSIZE_TO_MAPBYTES( image->virtualFilesize ); + const int lastBlockIndex = mapBytes - 1; + int endByte; + if ( _backgroundReplication == BGR_FULL ) { // Full mode: consider all blocks + endByte = link->nextReplicationIndex + mapBytes; + } else { // Hashblock based: Only look for match in current hash block + endByte = ( link->nextReplicationIndex + MAP_BYTES_PER_HASH_BLOCK ) & 
MAP_INDEX_HASH_START_MASK; + if ( endByte > mapBytes ) { + endByte = mapBytes; + } + } + int replicationIndex = -1; + for ( int j = link->nextReplicationIndex; j < endByte; ++j ) { + const int i = j % ( mapBytes ); // Wrap around for BGR_FULL + if ( image->cache_map[i] != 0xff && ( i != lastBlockIndex || !link->replicatedLastBlock ) ) { + // Found incomplete one + replicationIndex = i; + break; + } + } + spin_unlock( &image->lock ); + if ( replicationIndex == -1 && _backgroundReplication == BGR_HASHBLOCK ) { + // Nothing left in current block, find next one + replicationIndex = uplink_findNextIncompleteHashBlock( link, endByte ); + } + if ( replicationIndex == -1 ) { + // Replication might be complete, uplink_mainloop should take care.... + link->nextReplicationIndex = -1; + return; + } + const uint64_t offset = (uint64_t)replicationIndex * FILE_BYTES_PER_MAP_BYTE; + link->replicationHandle = offset; + const uint32_t size = (uint32_t)MIN( image->virtualFilesize - offset, FILE_BYTES_PER_MAP_BYTE ); + if ( !dnbd3_get_block( link->fd, offset, size, link->replicationHandle, COND_HOPCOUNT( link->version, 1 ) ) ) { + logadd( LOG_DEBUG1, "Error sending background replication request to uplink server!\n" ); + return; + } + if ( replicationIndex == lastBlockIndex ) { + link->replicatedLastBlock = true; // Special treatment, last byte in map could represent less than 8 blocks + } + link->nextReplicationIndex = replicationIndex + 1; // Remember last incomplete offset for next time so we don't play Schlemiel the painter + if ( _backgroundReplication == BGR_HASHBLOCK + && link->nextReplicationIndex % MAP_BYTES_PER_HASH_BLOCK == 0 ) { + // Just crossed a hash block boundary, look for new candidate starting at this very index + link->nextReplicationIndex = uplink_findNextIncompleteHashBlock( link, link->nextReplicationIndex ); + } +} + +/** + * find next index into cache_map that corresponds to the beginning + * of a hash block which is neither completely empty nor completely + * 
replicated yet. Returns -1 if no match. + */ +static int uplink_findNextIncompleteHashBlock(dnbd3_connection_t *link, const int startMapIndex) +{ + int retval = -1; + spin_lock( &link->image->lock ); + const int mapBytes = IMGSIZE_TO_MAPBYTES( link->image->virtualFilesize ); + const uint8_t *cache_map = link->image->cache_map; + if ( cache_map != NULL ) { + int j; + const int start = ( startMapIndex & MAP_INDEX_HASH_START_MASK ); + for (j = 0; j < mapBytes; ++j) { + const int i = ( start + j ) % mapBytes; + const bool isFull = cache_map[i] == 0xff || ( i + 1 == mapBytes && link->replicatedLastBlock ); + const bool isEmpty = cache_map[i] == 0; + if ( !isEmpty && !isFull ) { + // Neither full nor empty, replicate + if ( retval == -1 ) { + retval = i; + } + break; + } + if ( ( i & MAP_INDEX_HASH_START_MASK ) == i ) { + // Reset state if we just crossed into the next hash chunk + retval = ( isEmpty ) ? ( i ) : ( -1 ); + } else if ( isFull ) { + if ( retval != -1 ) { + // It's a full one, previous one was empty -> replicate + break; + } + } else if ( isEmpty ) { + if ( retval == -1 ) { // Previous one was full -> replicate + retval = i; + break; + } + } + } + if ( j == mapBytes ) { // Nothing found, loop ran until end + retval = -1; + } + } + spin_unlock( &link->image->lock ); + return retval; +} + +/** + * Receive data from uplink server and process/dispatch + * Locks on: link.lock, images[].lock + */ +static void uplink_handleReceive(dnbd3_connection_t *link) +{ + dnbd3_reply_t inReply, outReply; + int ret, i; + for (;;) { + ret = dnbd3_read_reply( link->fd, &inReply, false ); + if ( unlikely( ret == REPLY_INTR ) && likely( !_shutdown && !link->shutdown ) ) continue; + if ( ret == REPLY_AGAIN ) break; + if ( unlikely( ret == REPLY_CLOSED ) ) { + logadd( LOG_INFO, "Uplink: Remote host hung up (%s)", link->image->path ); + goto error_cleanup; + } + if ( unlikely( ret == REPLY_WRONGMAGIC ) ) { + logadd( LOG_WARNING, "Uplink server's packet did not start with 
dnbd3_packet_magic (%s)", link->image->path ); + goto error_cleanup; + } + if ( unlikely( ret != REPLY_OK ) ) { + logadd( LOG_INFO, "Uplink: Connection error %d (%s)", ret, link->image->path ); + goto error_cleanup; + } + if ( unlikely( inReply.size > (uint32_t)_maxPayload ) ) { + logadd( LOG_WARNING, "Pure evil: Uplink server sent too much payload (%" PRIu32 ") for %s", inReply.size, link->image->path ); + goto error_cleanup; + } + + if ( unlikely( link->recvBufferLen < inReply.size ) ) { + link->recvBufferLen = MIN((uint32_t)_maxPayload, inReply.size + 65536); + link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen ); + if ( link->recvBuffer == NULL ) { + logadd( LOG_ERROR, "Out of memory when trying to allocate receive buffer for uplink" ); + exit( 1 ); + } + } + if ( unlikely( (uint32_t)sock_recv( link->fd, link->recvBuffer, inReply.size ) != inReply.size ) ) { + logadd( LOG_INFO, "Lost connection to uplink server of %s (payload)", link->image->path ); + goto error_cleanup; + } + // Payload read completely + // Bail out if we're not interested + if ( unlikely( inReply.cmd != CMD_GET_BLOCK ) ) continue; + // Is a legit block reply + struct iovec iov[2]; + const uint64_t start = inReply.handle; + const uint64_t end = inReply.handle + inReply.size; + totalBytesReceived += inReply.size; + link->bytesReceived += inReply.size; + // 1) Write to cache file + if ( unlikely( link->cacheFd == -1 ) ) { + uplink_reopenCacheFd( link, false ); + } + if ( likely( link->cacheFd != -1 ) ) { + int err = 0; + bool tryAgain = true; // Allow one retry in case we run out of space or the write fd became invalid + uint32_t done = 0; + ret = 0; + while ( done < inReply.size ) { + ret = (int)pwrite( link->cacheFd, link->recvBuffer + done, inReply.size - done, start + done ); + if ( unlikely( ret == -1 ) ) { + err = errno; + if ( err == EINTR ) continue; + if ( err == ENOSPC || err == EDQUOT ) { + // try to free 256MiB + if ( !tryAgain || !image_ensureDiskSpaceLocked( 256ull * 
1024 * 1024, true ) ) break; + tryAgain = false; + continue; // Success, retry write + } + if ( err == EBADF || err == EINVAL || err == EIO ) { + if ( !tryAgain || !uplink_reopenCacheFd( link, true ) ) + break; + tryAgain = false; + continue; // Write handle to image successfully re-opened, try again + } + logadd( LOG_DEBUG1, "Error trying to cache data for %s:%d -- errno=%d", link->image->name, (int)link->image->rid, err ); + break; + } + if ( unlikely( ret <= 0 || (uint32_t)ret > inReply.size - done ) ) { + logadd( LOG_WARNING, "Unexpected return value %d from pwrite to %s:%d", ret, link->image->name, (int)link->image->rid ); + break; + } + done += (uint32_t)ret; + } + if ( likely( done > 0 ) ) { + image_updateCachemap( link->image, start, start + done, true ); + } + if ( unlikely( ret == -1 && ( err == EBADF || err == EINVAL || err == EIO ) ) ) { + logadd( LOG_WARNING, "Error writing received data for %s:%d (errno=%d); disabling caching.", + link->image->name, (int)link->image->rid, err ); + } + } + // 2) Figure out which clients are interested in it + spin_lock( &link->queueLock ); + for (i = 0; i < link->queueLen; ++i) { + dnbd3_queued_request_t * const req = &link->queue[i]; + assert( req->status != ULR_PROCESSING ); + if ( req->status != ULR_PENDING && req->status != ULR_NEW ) continue; + assert( req->client != NULL ); + if ( req->from >= start && req->to <= end ) { // Match :-) + req->status = ULR_PROCESSING; + } + } + // 3) Send to interested clients - iterate backwards so request collaboration works, and + // so we can decrease queueLen on the fly while iterating. 
Should you ever change this to start + // from 0, you also need to change the "attach to existing request"-logic in uplink_request() + outReply.magic = dnbd3_packet_magic; + bool served = false; + for ( i = link->queueLen - 1; i >= 0; --i ) { + dnbd3_queued_request_t * const req = &link->queue[i]; + if ( req->status == ULR_PROCESSING ) { + size_t bytesSent = 0; + assert( req->from >= start && req->to <= end ); + dnbd3_client_t * const client = req->client; + outReply.cmd = CMD_GET_BLOCK; + outReply.handle = req->handle; + outReply.size = (uint32_t)( req->to - req->from ); + iov[0].iov_base = &outReply; + iov[0].iov_len = sizeof outReply; + iov[1].iov_base = link->recvBuffer + (req->from - start); + iov[1].iov_len = outReply.size; + fixup_reply( outReply ); + req->status = ULR_FREE; + req->client = NULL; + served = true; + pthread_mutex_lock( &client->sendMutex ); + spin_unlock( &link->queueLock ); + if ( client->sock != -1 ) { + ssize_t sent = writev( client->sock, iov, 2 ); + if ( sent > (ssize_t)sizeof outReply ) { + bytesSent = (size_t)sent - sizeof outReply; + } + } + pthread_mutex_unlock( &client->sendMutex ); + if ( bytesSent != 0 ) { + client->bytesSent += bytesSent; + } + spin_lock( &link->queueLock ); + } + if ( req->status == ULR_FREE && i == link->queueLen - 1 ) link->queueLen--; + } + spin_unlock( &link->queueLock ); +#ifdef _DEBUG + if ( !served && start != link->replicationHandle ) { + logadd( LOG_DEBUG2, "%p, %s -- Unmatched reply: %" PRIu64 " to %" PRIu64, (void*)link, link->image->name, start, end ); + } +#endif + if ( start == link->replicationHandle ) { + // Was our background replication + link->replicationHandle = REP_NONE; + // Try to remove from fs cache if no client was interested in this data + if ( !served && link->cacheFd != -1 ) { + posix_fadvise( link->cacheFd, start, inReply.size, POSIX_FADV_DONTNEED ); + } + } + if ( served ) { + // Was some client -- reset idle counter + link->idleTime = 0; + // Re-enable replication if disabled + if 
( link->nextReplicationIndex == -1 ) { + link->nextReplicationIndex = (int)( start / FILE_BYTES_PER_MAP_BYTE ) & MAP_INDEX_HASH_START_MASK; + } + } + } + if ( link->replicationHandle == REP_NONE ) { + spin_lock( &link->queueLock ); + const bool rep = ( link->queueLen == 0 ); + spin_unlock( &link->queueLock ); + if ( rep ) uplink_sendReplicationRequest( link ); + } + return; + // Error handling from failed receive or message parsing + error_cleanup: ; + uplink_connectionFailed( link, true ); +} + +static void uplink_connectionFailed(dnbd3_connection_t *link, bool findNew) +{ + if ( link->fd == -1 ) + return; + altservers_serverFailed( &link->currentServer ); + close( link->fd ); + link->fd = -1; + link->replicationHandle = REP_NONE; + if ( _backgroundReplication == BGR_FULL && link->nextReplicationIndex == -1 ) { + link->nextReplicationIndex = 0; + } + if ( !findNew ) + return; + spin_lock( &link->rttLock ); + bool bail = link->rttTestResult == RTT_INPROGRESS || link->betterFd != -1; + spin_unlock( &link->rttLock ); + if ( bail ) + return; + altservers_findUplink( link ); +} + +/** + * Send keep alive request to server + */ +static int uplink_sendKeepalive(const int fd) +{ + static dnbd3_request_t request = { 0 }; + if ( request.magic == 0 ) { + request.magic = dnbd3_packet_magic; + request.cmd = CMD_KEEPALIVE; + fixup_request( request ); + } + return send( fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request); +} + +static void uplink_addCrc32(dnbd3_connection_t *uplink) +{ + dnbd3_image_t *image = uplink->image; + if ( image == NULL || image->virtualFilesize == 0 ) return; + size_t bytes = IMGSIZE_TO_HASHBLOCKS( image->virtualFilesize ) * sizeof(uint32_t); + uint32_t masterCrc; + uint32_t *buffer = malloc( bytes ); + if ( !dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes ) || bytes == 0 ) { + free( buffer ); + return; + } + uint32_t lists_crc = crc32( 0, NULL, 0 ); + lists_crc = crc32( lists_crc, (uint8_t*)buffer, bytes ); + lists_crc = 
net_order_32( lists_crc ); + if ( lists_crc != masterCrc ) { + logadd( LOG_WARNING, "Received corrupted crc32 list from uplink server (%s)!", uplink->image->name ); + free( buffer ); + return; + } + uplink->image->masterCrc32 = masterCrc; + uplink->image->crc32 = buffer; + const size_t len = strlen( uplink->image->path ) + 30; + char path[len]; + snprintf( path, len, "%s.crc", uplink->image->path ); + const int fd = open( path, O_WRONLY | O_CREAT, 0644 ); + if ( fd >= 0 ) { + write( fd, &masterCrc, sizeof(uint32_t) ); + write( fd, buffer, bytes ); + close( fd ); + } +} + +/** + * Open the given image's main image file in + * rw mode, assigning it to the cacheFd struct member. + * + * @param force If cacheFd was previously assigned a file descriptor (not == -1), + * it will be closed first. Otherwise, nothing will happen and true will be returned + * immediately. + */ +static bool uplink_reopenCacheFd(dnbd3_connection_t *link, const bool force) +{ + if ( link->cacheFd != -1 ) { + if ( !force ) return true; + close( link->cacheFd ); + } + link->cacheFd = open( link->image->path, O_WRONLY | O_CREAT, 0644 ); + return link->cacheFd != -1; +} + +/** + * Saves the cache map of the given image. + * Return true on success. + * Locks on: imageListLock, image.lock + */ +static bool uplink_saveCacheMap(dnbd3_connection_t *link) +{ + dnbd3_image_t *image = link->image; + assert( image != NULL ); + + if ( link->cacheFd != -1 ) { + if ( fsync( link->cacheFd ) == -1 ) { + // A failing fsync means we have no guarantee that any data + // since the last fsync (or open if none) has been saved. Apart + // from keeping the cache_map from the last successful fsync + // around and restoring it there isn't much we can do to recover + // a consistent state. Bail out. 
+ logadd( LOG_ERROR, "fsync() on image file %s failed with errno %d", image->path, errno ); + logadd( LOG_ERROR, "Bailing out immediately" ); + exit( 1 ); + } + } + + if ( image->cache_map == NULL ) return true; + logadd( LOG_DEBUG2, "Saving cache map of %s:%d", image->name, (int)image->rid ); + spin_lock( &image->lock ); + // Lock and get a copy of the cache map, as it could be freed by another thread that is just about to + // figure out that this image's cache copy is complete + if ( image->cache_map == NULL || image->virtualFilesize < DNBD3_BLOCK_SIZE ) { + spin_unlock( &image->lock ); + return true; + } + const size_t size = IMGSIZE_TO_MAPBYTES(image->virtualFilesize); + uint8_t *map = malloc( size ); + memcpy( map, image->cache_map, size ); + // Unlock. Use path and cacheFd without locking. path should never change after initialization of the image, + // cacheFd is owned by the uplink thread and we don't want to hold a spinlock during I/O + spin_unlock( &image->lock ); + assert( image->path != NULL ); + char mapfile[strlen( image->path ) + 4 + 1]; + strcpy( mapfile, image->path ); + strcat( mapfile, ".map" ); + + int fd = open( mapfile, O_WRONLY | O_CREAT, 0644 ); + if ( fd == -1 ) { + const int err = errno; + free( map ); + logadd( LOG_WARNING, "Could not open file to write cache map to disk (errno=%d) file %s", err, mapfile ); + return false; + } + + size_t done = 0; + while ( done < size ) { + const ssize_t ret = write( fd, map, size - done ); + if ( ret == -1 ) { + if ( errno == EINTR ) continue; + logadd( LOG_WARNING, "Could not write cache map (errno=%d) file %s", errno, mapfile ); + break; + } + if ( ret <= 0 ) { + logadd( LOG_WARNING, "Unexpected return value %d for write() to %s", (int)ret, mapfile ); + break; + } + done += (size_t)ret; + } + if ( fsync( fd ) == -1 ) { + logadd( LOG_WARNING, "fsync() on image map %s failed with errno %d", mapfile, errno ); + } + close( fd ); + free( map ); + return true; +} + +static bool 
uplink_connectionShouldShutdown(dnbd3_connection_t *link) +{ + return ( link->idleTime > SERVER_UPLINK_IDLE_TIMEOUT && _backgroundReplication != BGR_FULL ); +} + diff --git a/src/server/uplink.h b/src/server/uplink.h new file mode 100644 index 0000000..2b41dfc --- /dev/null +++ b/src/server/uplink.h @@ -0,0 +1,19 @@ +#ifndef _UPLINK_H_ +#define _UPLINK_H_ + +#include "globals.h" +#include "../types.h" + +void uplink_globalsInit(); + +uint64_t uplink_getTotalBytesReceived(); + +bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host, int version); + +void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client); + +bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length, uint8_t hopCount); + +void uplink_shutdown(dnbd3_image_t *image); + +#endif /* UPLINK_H_ */ diff --git a/src/server/urldecode.c b/src/server/urldecode.c new file mode 100644 index 0000000..4553097 --- /dev/null +++ b/src/server/urldecode.c @@ -0,0 +1,61 @@ +#include "urldecode.h" +#include <stdlib.h> +#include <ctype.h> + +#define hex2int(a) do { \ + if ( a >= 'a' ) { \ + a = (char)(a - ( 'a' - 'A' - 10 )); \ + } else if ( a > 'F' ) { \ + goto normie; \ + } else if ( a >= 'A' ) { \ + a = (char)(a - ( 'A' - 10 )); \ + } else if ( a < '0' || a > '9' ) { \ + goto normie; \ + } else { \ + a = (char)(a - '0'); \ + } \ +} while (0) + +void urldecode(struct string* str, struct field *out, size_t *out_num) +{ + char *src = (char*)str->s; + char *dst = src; + const char * const end = str->s + str->l; + char a, b; + size_t max_out = *out_num; + *out_num = 0; + do { + if ( *out_num == max_out ) return; + out->name.s = dst; + while ( src < end && *src != '=' ) { + *dst++ = *src++; + } + if ( src == end ) return; + out->name.l = (size_t)( dst - out->name.s ); + ++src; + out->value.s = ++dst; + while ( src < end && *src != '&' ) { + if ( *src == '%' && src + 2 < end ) { + if ( src[1] > 'f' || src[2] > 'f' ) goto normie; + a = src[1]; + hex2int(a); + 
b = src[2]; + hex2int(b); + *dst++ = (char)( (16 * a) + b ); + src += 3; + } else if (*src == '+') { + *dst++ = (char)' '; + ++src; + } else { + normie:; + *dst++ = *src++; + } + } + out->value.l = (size_t)( dst - out->value.s ); + out++; + (*out_num)++; + if ( src++ >= end ) return; + ++dst; + } while ( 1 ); +} + diff --git a/src/server/urldecode.h b/src/server/urldecode.h new file mode 100644 index 0000000..e27f8f8 --- /dev/null +++ b/src/server/urldecode.h @@ -0,0 +1,19 @@ +#ifndef _URLENCODE_H_ +#define _URLENCODE_H_ + +#include "picohttpparser/picohttpparser.h" + +struct field { + struct string name; + struct string value; +}; + +/** + * decode given x-form-urlencoded string. Breaks constness rules by + * casting the const char* s from str to char* and modifying it, then + * populating out with pointers into it, so make sure the memory + * is actually writable. + */ +void urldecode(struct string* str, struct field *out, size_t *out_num); + +#endif |