#include "uplink.h" #include "locks.h" #include "memlog.h" #include "sockhelper.h" #include "image.h" #include "helper.h" #include "altservers.h" #include "helper.h" #include "protocol.h" #include #include #include #include #include #include #include #include #include #include #include #include #include static void* uplink_mainloop(void *data); static void uplink_send_requests(dnbd3_connection_t *link, int newOnly); static void uplink_handle_receive(dnbd3_connection_t *link); static int uplink_send_keepalive(const int fd); static void uplink_addCrc32(dnbd3_connection_t *uplink); static void uplink_sendReplicationRequest(dnbd3_connection_t *link); // ############ uplink connection handling /** * Create and initialize an uplink instance for the given * image. Uplinks run in their own thread. * Locks on: _images[].lock */ int uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) { if ( !_isProxy ) return FALSE; dnbd3_connection_t *link = NULL; assert( image != NULL ); spin_lock( &image->lock ); if ( image->uplink != NULL ) { spin_unlock( &image->lock ); return TRUE; } if ( image->cache_map == NULL ) { memlogf( "[WARNING] Uplink was requested for image %s, but it is already complete", image->lower_name ); goto failure; } link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) ); link->image = image; link->queueLen = 0; link->fd = -1; link->signal = -1; if ( sock >= 0 ) { link->betterFd = sock; link->betterServer = *host; link->rttTestResult = RTT_DOCHANGE; } else { link->betterFd = -1; link->rttTestResult = RTT_IDLE; } link->recvBufferLen = 0; link->shutdown = FALSE; spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE ); if ( 0 != pthread_create( &(link->thread), NULL, &uplink_mainloop, (void *)(uintptr_t)link ) ) { memlogf( "[ERROR] Could not start thread for new client." ); goto failure; } spin_unlock( &image->lock ); return TRUE; failure: ; if ( link != NULL ) free( link ); link = image->uplink = NULL; spin_unlock( &image->lock ); return FALSE; } /** * Locks on image.lock, uplink.lock */ void uplink_shutdown(dnbd3_image_t *image) { assert( image != NULL ); spin_lock( &image->lock ); if ( image->uplink == NULL || image->uplink->shutdown ) { spin_unlock( &image->lock ); return; } dnbd3_connection_t * const uplink = image->uplink; spin_lock( &uplink->queueLock ); if ( uplink->shutdown ) { spin_unlock( &uplink->queueLock ); spin_unlock( &image->lock ); return; } image->uplink = NULL; uplink->shutdown = TRUE; spin_unlock( &uplink->queueLock ); spin_unlock( &image->lock ); if ( uplink->signal != -1 ) write( uplink->signal, "", 1 ); if ( uplink->image != NULL ) { pthread_join( uplink->thread, NULL ); } } /** * Remove given client from uplink request queue * Locks on: uplink.queueLock, client.sendMutex */ void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) { spin_lock( &uplink->queueLock ); for (int i = uplink->queueLen - 1; i >= 0; --i) { if ( uplink->queue[i].client == client ) { uplink->queue[i].client = NULL; uplink->queue[i].status = ULR_FREE; if ( i > 20 && uplink->queueLen == i + 1 ) uplink->queueLen--; } } spin_unlock( &uplink->queueLock ); } /** * Request a chunk of data through an uplink server * Locks on: image.lock, uplink.queueLock */ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length) { if ( client == NULL || client->image == NULL ) return FALSE; spin_lock( &client->image->lock ); if ( client->image->uplink == NULL ) { spin_unlock( &client->image->lock ); printf( "[DEBUG] Uplink request for image with no uplink (%s)\n", client->image->lower_name ); return FALSE; } dnbd3_connection_t * const uplink = client->image->uplink; // Check if the client is the same host as the uplink. If so assume this is a circular proxy chain if ( isSameAddress( &uplink->currentServer, &client->host ) ) { spin_unlock( &client->image->lock ); printf( "[DEBUG] Proxy cycle detected.\n" ); return FALSE; } int foundExisting = -1; // Index of a pending request that is a superset of our range, -1 otherwise int existingType = -1; // ULR_* type of existing request int i; int freeSlot = -1; const uint64_t end = start + length; spin_lock( &uplink->queueLock ); spin_unlock( &client->image->lock ); for (i = 0; i < uplink->queueLen; ++i) { if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) freeSlot = i; if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue; if ( (foundExisting == -1 || existingType == ULR_PENDING) && uplink->queue[i].from <= start && uplink->queue[i].to >= end ) { foundExisting = i; existingType = uplink->queue[i].status; break; } } if ( freeSlot == -1 ) { if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) { spin_unlock( &uplink->queueLock ); memlogf( "[WARNING] Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." ); return FALSE; } freeSlot = uplink->queueLen++; } // Do not send request to uplink server if we have a matching pending request AND the request either has the // status ULR_NEW OR we found a free slot with LOWER index than the one we attach to. Otherwise // explicitly send this request to the uplink server. The second condition mentioned here is to prevent // a race condition where the reply for the outstanding request already arrived and the uplink thread // is currently traversing the request queue. As it is processing the queue from highest to lowest index, it might // already have passed the index of the free slot we determined, but not reached the existing request we just found above. if ( foundExisting != -1 && existingType != ULR_NEW && freeSlot > foundExisting ) foundExisting = -1; // Fill structure uplink->queue[freeSlot].from = start; uplink->queue[freeSlot].to = end; uplink->queue[freeSlot].handle = handle; uplink->queue[freeSlot].client = client; uplink->queue[freeSlot].status = (foundExisting == -1 ? ULR_NEW : ULR_PENDING); #ifdef _DEBUG uplink->queue[freeSlot].entered = time( NULL ); #endif const int signalFd = uplink->signal; spin_unlock( &uplink->queueLock ); if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed static uint64_t counter = 1; write( signalFd, &counter, sizeof(uint64_t) ); } return TRUE; } /** * Uplink thread. * Locks are irrelevant as this is never called from another function */ static void* uplink_mainloop(void *data) { const int MAXEVENTS = 3; struct epoll_event ev, events[MAXEVENTS]; dnbd3_connection_t *link = (dnbd3_connection_t*)data; int fdEpoll = -1; int numSocks, i, waitTime; int altCheckInterval = SERVER_RTT_DELAY_INIT; int discoverFailCount = 0; time_t nextAltCheck = 0; char buffer[200]; // assert( link != NULL ); assert( link->queueLen == 0 ); setThreadName( "idle-uplink" ); blockNoncriticalSignals(); // fdEpoll = epoll_create( 2 ); if ( fdEpoll == -1 ) { memlogf( "[WARNING] epoll_create failed. Uplink unavailable." ); goto cleanup; } link->signal = eventfd( 0, EFD_NONBLOCK ); if ( link->signal < 0 ) { memlogf( "[WARNING] error creating pipe. Uplink unavailable." ); goto cleanup; } memset( &ev, 0, sizeof(ev) ); ev.events = EPOLLIN; ev.data.fd = link->signal; if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->signal, &ev ) < 0 ) { memlogf( "[WARNING] adding eventfd to epoll set failed" ); goto cleanup; } while ( !_shutdown && !link->shutdown ) { // Check if server switch is in order if ( link->rttTestResult == RTT_DOCHANGE ) { link->rttTestResult = RTT_IDLE; discoverFailCount = 0; // The rttTest worker thread has finished our request. // And says it's better to switch to another server const int fd = link->fd; link->fd = link->betterFd; if ( fd != -1 ) close( fd ); // If we don't have a crc32 list yet, see if the new server has one if ( link->image->crc32 == NULL ) { uplink_addCrc32( link ); } // Re-send all pending requests uplink_send_requests( link, FALSE ); link->betterFd = -1; link->currentServer = link->betterServer; link->image->working = TRUE; buffer[0] = '@'; if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) { printf( "[DEBUG] Now connected to %s\n", buffer + 1 ); setThreadName( buffer ); } memset( &ev, 0, sizeof(ev) ); ev.events = EPOLLIN; ev.data.fd = link->fd; if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) { memlogf( "[WARNING] adding uplink to epoll set failed" ); goto cleanup; } nextAltCheck = time( NULL ) + altCheckInterval; // The rtt worker already did the handshake for our image, so there's nothing // more to do here } // epoll() waitTime = (time( NULL ) - nextAltCheck) * 1000; if ( waitTime < 1500 ) waitTime = 1500; if ( waitTime > 5000 ) waitTime = 5000; numSocks = epoll_wait( fdEpoll, events, MAXEVENTS, waitTime ); if ( _shutdown || link->shutdown ) goto cleanup; if ( numSocks < 0 ) { // Error? memlogf( "[DEBUG] epoll_wait() error %d", (int)errno); usleep( 10000 ); continue; } // Check all events for (i = 0; i < numSocks; ++i) { // Check for errors.... if ( (events[i].events & (EPOLLERR | EPOLLHUP)) || !(events[i].events & EPOLLIN) ) { if ( events[i].data.fd == link->signal ) { memlogf( "[WARNING] epoll error on signal-pipe!" ); goto cleanup; } if ( events[i].data.fd == link->fd ) { link->fd = -1; close( events[i].data.fd ); printf( "[DEBUG] Uplink gone away, panic!\n" ); nextAltCheck = 0; } else { printf( "[DEBUG] Error on unknown FD in uplink epoll\n" ); close( events[i].data.fd ); } continue; } // No error, handle normally if ( events[i].data.fd == link->signal ) { // Event on the signal fd -> a client requests data int ret; do { ret = read( link->signal, buffer, sizeof buffer ); } while ( ret == sizeof buffer ); // Throw data away, this is just used for waking this thread up if ( ret == 0 ) { memlogf( "[WARNING] Eventfd of uplink for %s closed! Things will break!", link->image->lower_name ); } if ( ret < 0 ) { ret = errno; if ( ret != EAGAIN && ret != EWOULDBLOCK && ret != EBUSY && ret != EINTR ) { memlogf( "[WARNING] Errno %d on eventfd on uplink for %s! Things will break!", ret, link->image->lower_name ); } } if ( link->fd != -1 ) { // Uplink seems fine, relay requests to it... uplink_send_requests( link, TRUE ); } } else if ( events[i].data.fd == link->fd ) { uplink_handle_receive( link ); if ( link->fd == -1 ) nextAltCheck = 0; if ( _shutdown || link->shutdown ) goto cleanup; } else { printf( "[DEBUG] Sanity check: unknown FD ready on epoll! Closing...\n" ); close( events[i].data.fd ); } } // Done handling epoll sockets // Replicate missing blocks from the image so the proxy will eventually have a full copy uplink_sendReplicationRequest( link ); // See if we should trigger an RTT measurement if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) { const time_t now = time( NULL ); if ( now + SERVER_RTT_DELAY_FAILED < nextAltCheck ) { // This probably means the system time was changed - handle this case properly by capping the timeout nextAltCheck = now + SERVER_RTT_DELAY_FAILED; } else if ( now >= nextAltCheck ) { // It seems it's time for a check if ( image_isComplete( link->image ) ) { // Quit work if image is complete memlogf( "[INFO] Replication of %s complete.", link->image->lower_name ); if ( spin_trylock( &link->image->lock ) == 0 ) { image_markComplete( link->image ); spin_lock( &link->queueLock ); if ( !link->shutdown ) { link->image->uplink = NULL; link->shutdown = TRUE; pthread_detach( link->thread ); } spin_unlock( &link->queueLock ); spin_unlock( &link->image->lock ); goto cleanup; } } else { // Not complete - do measurement altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous) // Also send a keepalive packet to the currently connected server if ( link->fd != -1 ) { if ( !uplink_send_keepalive( link->fd ) ) { printf( "[DEBUG] Error sending keep-alive to uplink\n" ); altservers_serverFailed( &link->currentServer ); const int fd = link->fd; link->fd = -1; close( fd ); } } } altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX); nextAltCheck = now + altCheckInterval; } } else if ( link->rttTestResult == RTT_NOT_REACHABLE ) { link->rttTestResult = RTT_IDLE; discoverFailCount++; nextAltCheck = time( NULL ) + (discoverFailCount < 5 ? altCheckInterval : SERVER_RTT_DELAY_FAILED); } #ifdef _DEBUG if ( link->fd != -1 && !link->shutdown ) { time_t deadline = time( NULL ) - 10; spin_lock( &link->queueLock ); for (i = 0; i < link->queueLen; ++i) { if ( link->queue[i].status != ULR_FREE && link->queue[i].entered < deadline ) { snprintf( buffer, sizeof(buffer), "[DEBUG WARNING] Starving request detected:\n" "%s\n(from %" PRIu64 " to %" PRIu64 ", status: %d)\n", link->queue[i].client->image->lower_name, link->queue[i].from, link->queue[i].to, link->queue[i].status ); spin_unlock( &link->queueLock ); printf("%s", buffer); spin_lock( &link->queueLock ); } } spin_unlock( &link->queueLock ); } #endif } cleanup: ; spin_lock( &link->image->lock ); spin_lock( &link->queueLock ); if (link->image != NULL) link->image->uplink = NULL; spin_unlock( &link->image->lock ); const int fd = link->fd; const int signal = link->signal; link->fd = -1; link->signal = -1; spin_unlock( &link->queueLock ); if ( fd != -1 ) close( fd ); if ( signal != -1 ) close( signal ); if ( fdEpoll != -1 ) close( fdEpoll ); // Wait for the RTT check to finish/fail if it's in progress while ( link->rttTestResult == RTT_INPROGRESS ) usleep( 10000 ); if ( link->betterFd != -1 ) close( link->betterFd ); spin_destroy( &link->queueLock ); free( link->recvBuffer ); link->recvBuffer = NULL; free( link ); return NULL ; } static void uplink_send_requests(dnbd3_connection_t *link, int newOnly) { // Scan for new requests int j; dnbd3_request_t request; request.magic = dnbd3_packet_magic; spin_lock( &link->queueLock ); for (j = 0; j < link->queueLen; ++j) { if ( link->queue[j].status != ULR_NEW && (newOnly || link->queue[j].status != ULR_PENDING) ) continue; link->queue[j].status = ULR_PENDING; request.handle = link->queue[j].from; // HACK: Store offset in handle too, as it won't be included in the reply request.cmd = CMD_GET_BLOCK; request.offset = link->queue[j].from; request.size = link->queue[j].to - link->queue[j].from; spin_unlock( &link->queueLock ); fixup_request( request ); const int ret = send( link->fd, &request, sizeof request, MSG_NOSIGNAL ); if ( ret != sizeof(request) ) { // Non-critical - if the connection dropped or the server was changed // the thread will re-send this request as soon as the connection // is reestablished. printf( "[DEBUG] Error forwarding request to uplink server!\n" ); altservers_serverFailed( &link->currentServer ); return; } spin_lock( &link->queueLock ); } spin_unlock( &link->queueLock ); } /** * Sent a block request to an uplink server without really having * any client that needs that data. This will be used for background replication */ static void uplink_sendReplicationRequest(dnbd3_connection_t *link) { if ( link == NULL || link->fd == -1 ) return; dnbd3_image_t * const image = link->image; if ( image == NULL || image->cache_map == NULL || image->filesize < DNBD3_BLOCK_SIZE ) return; const time_t now = time( NULL ); if ( now <= link->lastReplication + 1 ) return; link->lastReplication = now; spin_lock( &image->lock ); if ( image == NULL || image->cache_map == NULL ) { spin_unlock( &image->lock ); return; } dnbd3_request_t request; request.magic = dnbd3_packet_magic; int sent = 0; const size_t len = IMGSIZE_TO_MAPBYTES( image->filesize ) - 1; for (int i = 0; i <= len; ++i) { if ( image->cache_map == NULL || link->fd == -1 || sent > 20 ) break; if ( image->cache_map[i] == 0xff || (i == len && link->replicatedLastBlock) ) continue; if ( i == len ) link->replicatedLastBlock = TRUE; spin_unlock( &image->lock ); // Unlocked - do not break or continue here... ++sent; request.cmd = CMD_GET_BLOCK; request.offset = request.handle = (uint64_t)i * DNBD3_BLOCK_SIZE * (uint64_t)8; request.size = DNBD3_BLOCK_SIZE * (uint64_t)8; if ( request.offset + request.size > image->filesize ) { request.size = image->filesize - request.offset; } fixup_request( request ); const int ret = send( link->fd, &request, sizeof request, MSG_NOSIGNAL ); if ( ret != sizeof(request) ) { printf( "[DEBUG] Error sending background replication request to uplink server!\n" ); return; } // Lock again... spin_lock( &image->lock ); } spin_unlock( &image->lock ); } /** * Receive data from uplink server and process/dispatch * Locks on: link.lock, indirectly on images[].lock */ static void uplink_handle_receive(dnbd3_connection_t *link) { dnbd3_reply_t inReply, outReply; int ret, i; for (;;) { ret = recv( link->fd, &inReply, sizeof inReply, MSG_DONTWAIT | MSG_NOSIGNAL ); if ( ret < 0 ) { const int err = errno; if ( err == EAGAIN || err == EWOULDBLOCK || err == EINTR ) return; // OK cases goto error_cleanup; } if ( ret == 0 ) { memlogf( "[INFO] Uplink: Remote host hung up (%s)", link->image->path ); goto error_cleanup; } if ( ret != sizeof inReply ) ret += recv( link->fd, &inReply + ret, sizeof(inReply) - ret, MSG_WAITALL | MSG_NOSIGNAL ); if ( ret != sizeof inReply ) { const int err = errno; memlogf( "[INFO] Lost connection to uplink server for %s (header %d/%d, e=%d)", link->image->path, ret, (int)sizeof(inReply), err ); goto error_cleanup; } fixup_reply( inReply ); if ( inReply.magic != dnbd3_packet_magic ) { memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path ); goto error_cleanup; } if ( inReply.size > 9000000 ) { memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path ); goto error_cleanup; } if ( link->recvBufferLen < inReply.size ) { if ( link->recvBuffer != NULL ) free( link->recvBuffer ); link->recvBufferLen = MIN(9000000, inReply.size + 8192); link->recvBuffer = malloc( link->recvBufferLen ); } uint32_t done = 0; while ( done < inReply.size ) { ret = recv( link->fd, link->recvBuffer + done, inReply.size - done, MSG_NOSIGNAL ); if ( ret <= 0 ) { memlogf( "[INFO] Lost connection to uplink server of %s (payload)", link->image->path ); goto error_cleanup; } done += ret; } // Payload read completely // Bail out if we're not interested if ( inReply.cmd != CMD_GET_BLOCK ) return; // Is a legit block reply const uint64_t start = inReply.handle; const uint64_t end = inReply.handle + inReply.size; // 1) Write to cache file assert( link->image->cacheFd != -1 ); if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) { memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path ); } else { ret = (int)write( link->image->cacheFd, link->recvBuffer, inReply.size ); if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, TRUE ); } // 2) Figure out which clients are interested in it struct iovec iov[2]; spin_lock( &link->queueLock ); for (i = 0; i < link->queueLen; ++i) { dnbd3_queued_request_t * const req = &link->queue[i]; assert( req->status != ULR_PROCESSING || req->status != ULR_NEW ); if ( req->status != ULR_PENDING ) continue; if ( req->from >= start && req->to <= end ) { // Match :-) req->status = ULR_PROCESSING; } } // 3) Send to interested clients - iterate backwards so request collaboration works, and // so we can decrease queueLen on the fly while iterating. Should you ever change this to start // from 0, you also need to change the "attach to existing request"-logic in uplink_request() outReply.magic = dnbd3_packet_magic; for (i = link->queueLen - 1; i >= 0; --i) { dnbd3_queued_request_t * const req = &link->queue[i]; if ( req->status != ULR_PROCESSING ) continue; assert( req->from >= start && req->to <= end ); dnbd3_client_t * const client = req->client; outReply.cmd = CMD_GET_BLOCK; outReply.handle = req->handle; outReply.size = req->to - req->from; iov[0].iov_base = &outReply; iov[0].iov_len = sizeof outReply; iov[1].iov_base = link->recvBuffer + (req->from - start); iov[1].iov_len = outReply.size; fixup_reply( outReply ); req->status = ULR_FREE; pthread_mutex_lock( &client->sendMutex ); spin_unlock( &link->queueLock ); writev( client->sock, iov, 2 ); pthread_mutex_unlock( &client->sendMutex ); spin_lock( &link->queueLock ); if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--; } spin_unlock( &link->queueLock ); } error_cleanup: ; altservers_serverFailed( &link->currentServer ); const int fd = link->fd; link->fd = -1; if ( fd != -1 ) close( fd ); } /** * Send keep alive request to server */ static int uplink_send_keepalive(const int fd) { static dnbd3_request_t request = { 0, 0, 0, 0, 0 }; if ( request.magic == 0 ) { request.magic = dnbd3_packet_magic; request.cmd = CMD_KEEPALIVE; fixup_request( request ); } return send( fd, &request, sizeof(request), MSG_NOSIGNAL ) == sizeof(request); } static void uplink_addCrc32(dnbd3_connection_t *uplink) { dnbd3_image_t *image = uplink->image; if ( image == NULL || image->filesize == 0 ) return; size_t bytes = IMGSIZE_TO_HASHBLOCKS(image->filesize) * sizeof(uint32_t); uint32_t masterCrc; uint32_t *buffer = malloc( bytes ); if ( !dnbd3_get_crc32( uplink->fd, &masterCrc, buffer, &bytes ) || bytes == 0 ) { free( buffer ); return; } uint32_t lists_crc = crc32( 0L, Z_NULL, 0 ); lists_crc = crc32( lists_crc, (Bytef*)buffer, bytes ); if ( lists_crc != masterCrc ) { memlogf( "[WARNING] Received corrupted crc32 list from uplink server (%s)!", uplink->image->lower_name ); free( buffer ); return; } uplink->image->masterCrc32 = masterCrc; uplink->image->crc32 = buffer; const size_t len = strlen( uplink->image->path ) + 30; char path[len]; snprintf( path, len, "%s.crc", uplink->image->path ); const int fd = open( path, O_WRONLY | O_CREAT, 0640 ); if ( fd >= 0 ) { write( fd, &masterCrc, sizeof(uint32_t) ); write( fd, buffer, bytes ); close( fd ); } }