From a1dd0acdbdd6a9b70f9d7aa447e323f2072c650a Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 12 Aug 2013 18:04:39 +0200 Subject: [SERVER] Improve proxy mode, implement integrity check in proxy mode --- src/server/uplink.c | 262 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 165 insertions(+), 97 deletions(-) (limited to 'src/server/uplink.c') diff --git a/src/server/uplink.c b/src/server/uplink.c index 4dbe75a..6d05f94 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -19,6 +20,7 @@ static void* uplink_mainloop(void *data); static void uplink_send_requests(dnbd3_connection_t *link, int newOnly); static void uplink_handle_receive(dnbd3_connection_t *link); +static int uplink_send_keepalive(const int fd); // ############ uplink connection handling @@ -78,6 +80,24 @@ void uplink_shutdown(dnbd3_image_t *image) free( uplink ); } +/** + * Remove given client from uplink request queue + */ +void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) +{ + spin_lock( &uplink->queueLock ); + for (int i = 0; i < uplink->queueLen; ++i) { + if ( uplink->queue[i].client == client ) { + // Lock on the send mutex as the uplink thread might just be writing to the client + pthread_mutex_lock( &client->sendMutex ); + uplink->queue[i].client = NULL; + uplink->queue[i].status = ULR_FREE; + pthread_mutex_unlock( &client->sendMutex ); + } + } + spin_unlock( &uplink->queueLock ); +} + /** * Request a chunk of data through an uplink server */ @@ -118,10 +138,14 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint uplink->queue[freeSlot].handle = handle; uplink->queue[freeSlot].client = client; uplink->queue[freeSlot].status = (foundExisting ? ULR_PENDING : ULR_NEW); +#ifdef _DEBUG + uplink->queue[freeSlot].entered = time( NULL ); +#endif spin_unlock( &uplink->queueLock ); if ( !foundExisting ) { - write( uplink->signal, "", 1 ); + static uint64_t counter = 1; + write( uplink->signal, &counter, sizeof(uint64_t) ); } return TRUE; } @@ -135,7 +159,7 @@ static void* uplink_mainloop(void *data) const int MAXEVENTS = 3; struct epoll_event ev, events[MAXEVENTS]; dnbd3_connection_t *link = (dnbd3_connection_t*)data; - int fdEpoll = -1, fdPipe = -1; + int fdEpoll = -1; int numSocks, i, waitTime; int altCheckInterval = SERVER_RTT_DELAY_INIT; int bFree = FALSE; @@ -151,27 +175,24 @@ static void* uplink_mainloop(void *data) goto cleanup; } { - int pipes[2]; - if ( pipe( pipes ) < 0 ) { + link->signal = eventfd( 0, EFD_NONBLOCK ); + if ( link->signal < 0 ) { memlogf( "[WARNING] error creating pipe. Uplink unavailable." ); goto cleanup; } - sock_set_nonblock( pipes[0] ); - sock_set_nonblock( pipes[1] ); - fdPipe = pipes[0]; - link->signal = pipes[1]; memset( &ev, 0, sizeof(ev) ); ev.events = EPOLLIN; - ev.data.fd = fdPipe; - if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, fdPipe, &ev ) < 0 ) { - memlogf( "[WARNING] adding signal-pipe to epoll set failed" ); + ev.data.fd = link->signal; + if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->signal, &ev ) < 0 ) { + memlogf( "[WARNING] adding eventfd to epoll set failed" ); goto cleanup; } } while ( !_shutdown && !link->shutdown ) { // epoll() if ( link->fd == -1 ) { - waitTime = 1500; + waitTime = 2000; + nextAltCheck = 0; } else { waitTime = (time( NULL ) - nextAltCheck) * 1000; if ( waitTime < 1500 ) waitTime = 1500; @@ -185,7 +206,7 @@ static void* uplink_mainloop(void *data) } for (i = 0; i < numSocks; ++i) { // Check all events if ( (events[i].events & (EPOLLERR | EPOLLHUP)) || !(events[i].events & EPOLLIN) ) { - if ( events[i].data.fd == fdPipe ) { + if ( events[i].data.fd == link->signal ) { memlogf( "[WARNING] epoll error on signal-pipe!" ); goto cleanup; } @@ -195,23 +216,23 @@ static void* uplink_mainloop(void *data) printf( "[DEBUG] Uplink gone away, panic!\n" ); nextAltCheck = 0; } else { - printf( "[DEBUG] Error on unknown FD in uplink epoll" ); + printf( "[DEBUG] Error on unknown FD in uplink epoll\n" ); close( events[i].data.fd ); } continue; } // No error, handle normally - if ( events[i].data.fd == fdPipe ) { + if ( events[i].data.fd == link->signal ) { int ret; do { - ret = read( fdPipe, buffer, sizeof buffer ); + ret = read( link->signal, buffer, sizeof buffer ); } while ( ret > 0 ); // Throw data away, this is just used for waking this thread up if ( ret == 0 ) { - memlogf( "[WARNING] Signal pipe of uplink for %s closed! Things will break!", link->image->lower_name ); + memlogf( "[WARNING] Eventfd of uplink for %s closed! Things will break!", link->image->lower_name ); } ret = errno; if ( ret != EAGAIN && ret != EWOULDBLOCK && ret != EBUSY && ret != EINTR ) { - memlogf( "[WARNING] Errno %d on pipe-read on uplink for %s! Things will break!", ret, link->image->lower_name ); + memlogf( "[WARNING] Errno %d on eventfd on uplink for %s! Things will break!", ret, link->image->lower_name ); } if ( link->fd != -1 ) { uplink_send_requests( link, TRUE ); @@ -263,8 +284,9 @@ static void* uplink_mainloop(void *data) // It seems it's time for a check if ( image_isComplete( link->image ) ) { // Quit work if image is complete + memlogf( "[INFO] Replication of %s complete.", link->image->lower_name ); if ( spin_trylock( &link->image->lock ) == 0 ) { - image_markComplete(link->image); + image_markComplete( link->image ); link->image->uplink = NULL; link->shutdown = TRUE; free( link->recvBuffer ); @@ -280,11 +302,34 @@ static void* uplink_mainloop(void *data) } else { // Not complete- do measurement altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous) + // Also send a keepalive packet to the currently connected server + if ( link->fd != -1 ) { + if ( !uplink_send_keepalive( link->fd ) ) { + printf( "[DEBUG] Error sending keep-alive to uplink\n" ); + const int fd = link->fd; + link->fd = -1; + close( fd ); + } + } } altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX); nextAltCheck = now + altCheckInterval; } } +#ifdef _DEBUG + if ( link->fd != -1 ) { + time_t deadline = time( NULL ) - 10; + spin_lock( &link->queueLock ); + for (i = 0; i < link->queueLen; ++i) { + if ( link->queue[i].status != ULR_FREE && link->queue[i].entered < deadline ) { + printf( "[DEBUG WARNING] Starving request detected:\n" + "%s\n(from %" PRIu64 " to %" PRIu64 "\n", link->queue[i].client->image->lower_name, link->queue[i].from, + link->queue[i].to ); + } + } + spin_unlock( &link->queueLock ); + } +#endif } cleanup: ; const int fd = link->fd; @@ -293,7 +338,6 @@ static void* uplink_mainloop(void *data) link->signal = -1; if ( fd != -1 ) close( fd ); if ( signal != -1 ) close( signal ); - if ( fdPipe != -1 ) close( fdPipe ); if ( fdEpoll != -1 ) close( fdEpoll ); // Wait for the RTT check to finish/fail if it's in progress while ( link->rttTestResult == RTT_INPROGRESS ) @@ -324,7 +368,7 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly) // Non-critical - if the connection dropped or the server was changed // the thread will re-send this request as soon as the connection // is reestablished. - printf( "[DEBUG] Error sending request to uplink server!" ); + printf( "[DEBUG] Error sending request to uplink server!\n" ); } spin_lock( &link->queueLock ); } @@ -337,90 +381,114 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly) */ static void uplink_handle_receive(dnbd3_connection_t *link) { - dnbd3_reply_t reply; + dnbd3_reply_t inReply, outReply; int ret, i; - ret = recv( link->fd, &reply, sizeof reply, MSG_WAITALL ); - if ( ret != sizeof reply ) { - memlogf( "[INFO] Lost connection to uplink server for %s", link->image->path ); - goto error_cleanup; - } - fixup_reply( reply ); - if ( reply.magic != dnbd3_packet_magic ) { - memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path ); - goto error_cleanup; - } - if ( reply.size > 9000000 ) { - memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path ); - goto error_cleanup; - } - if ( link->recvBufferLen < reply.size ) { - if ( link->recvBuffer != NULL ) free( link->recvBuffer ); - link->recvBufferLen = MIN(9000000, reply.size + 8192); - link->recvBuffer = malloc( link->recvBufferLen ); - } - uint32_t done = 0; - while ( done < reply.size ) { - ret = recv( link->fd, link->recvBuffer + done, reply.size - done, 0 ); - if ( ret <= 0 ) { - memlogf( "[INFO] Lost connection to uplink server of", link->image->path ); + for (;;) { + ret = recv( link->fd, &inReply, sizeof inReply, MSG_DONTWAIT ); + if ( ret < 0 ) { + const int err = errno; + if ( err == EAGAIN || err == EWOULDBLOCK || err == EINTR ) return; // OK cases goto error_cleanup; } - done += ret; - } - // Payload read completely - // Bail out if we're not interested - if ( reply.cmd != CMD_GET_BLOCK ) return; - // Is a legit block reply - const uint64_t start = reply.handle; - const uint64_t end = reply.handle + reply.size; - // 1) Write to cache file - assert( link->image->cacheFd != -1 ); - if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) { - memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path ); - } else { - ret = (int)write( link->image->cacheFd, link->recvBuffer, reply.size ); - if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, TRUE ); - } - // 2) Figure out which clients are interested in it - struct iovec iov[2]; - spin_lock( &link->queueLock ); - for (i = 0; i < link->queueLen; ++i) { - dnbd3_queued_request_t * const req = &link->queue[i]; - assert( req->status != ULR_PROCESSING ); - if ( req->status != ULR_PENDING ) continue; - if ( req->from >= start && req->to <= end ) { // Match :-) - req->status = ULR_PROCESSING; + if ( ret == 0 ) { + memlogf( "[INFO] Uplink: Remote host hung up (%s)", link->image->path ); + goto error_cleanup; } - } - // 3) Send to interested clients - reply.magic = dnbd3_packet_magic; // !! re-using reply struct - do not read from it after here - for (i = link->queueLen - 1; i >= 0; --i) { - dnbd3_queued_request_t * const req = &link->queue[i]; - if ( req->status != ULR_PROCESSING ) continue; - assert( req->from >= start && req->to <= end ); - reply.cmd = CMD_GET_BLOCK; - reply.handle = req->handle; - reply.size = req->to - req->from; - iov[0].iov_base = &reply; - iov[0].iov_len = sizeof reply; - iov[1].iov_base = link->recvBuffer + (req->from - start); - iov[1].iov_len = reply.size; - fixup_reply( reply ); - spin_unlock( &link->queueLock ); - // send: Don't care about errors here, let the client - // connection thread deal with it if something goes wrong - pthread_mutex_lock( &req->client->sendMutex ); - writev( req->client->sock, iov, 2 ); - pthread_mutex_unlock( &req->client->sendMutex ); + if ( ret != sizeof inReply ) ret += recv( link->fd, &inReply + ret, sizeof(inReply) - ret, MSG_WAITALL ); + if ( ret != sizeof inReply ) { + const int err = errno; + memlogf( "[INFO] Lost connection to uplink server for %s (header %d/%d, e=%d)", link->image->path, ret, (int)sizeof(inReply), + err ); + goto error_cleanup; + } + fixup_reply( inReply ); + if ( inReply.magic != dnbd3_packet_magic ) { + memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path ); + goto error_cleanup; + } + if ( inReply.size > 9000000 ) { + memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path ); + goto error_cleanup; + } + if ( link->recvBufferLen < inReply.size ) { + if ( link->recvBuffer != NULL ) free( link->recvBuffer ); + link->recvBufferLen = MIN(9000000, inReply.size + 8192); + link->recvBuffer = malloc( link->recvBufferLen ); + } + uint32_t done = 0; + while ( done < inReply.size ) { + ret = recv( link->fd, link->recvBuffer + done, inReply.size - done, 0 ); + if ( ret <= 0 ) { + memlogf( "[INFO] Lost connection to uplink server of %s (payload)", link->image->path ); + goto error_cleanup; + } + done += ret; + } + // Payload read completely + // Bail out if we're not interested + if ( inReply.cmd != CMD_GET_BLOCK ) return; + // Is a legit block reply + const uint64_t start = inReply.handle; + const uint64_t end = inReply.handle + inReply.size; + // 1) Write to cache file + assert( link->image->cacheFd != -1 ); + if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) { + memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path ); + } else { + ret = (int)write( link->image->cacheFd, link->recvBuffer, inReply.size ); + if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, TRUE ); + } + // 2) Figure out which clients are interested in it + struct iovec iov[2]; spin_lock( &link->queueLock ); - req->status = ULR_FREE; - if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--; + for (i = 0; i < link->queueLen; ++i) { + dnbd3_queued_request_t * const req = &link->queue[i]; + assert( req->status != ULR_PROCESSING ); + if ( req->status != ULR_PENDING ) continue; + if ( req->from >= start && req->to <= end ) { // Match :-) + req->status = ULR_PROCESSING; + } + } + // 3) Send to interested clients + outReply.magic = dnbd3_packet_magic; + for (i = link->queueLen - 1; i >= 0; --i) { + dnbd3_queued_request_t * const req = &link->queue[i]; + if ( req->status != ULR_PROCESSING ) continue; + assert( req->from >= start && req->to <= end ); + outReply.cmd = CMD_GET_BLOCK; + outReply.handle = req->handle; + outReply.size = req->to - req->from; + iov[0].iov_base = &outReply; + iov[0].iov_len = sizeof outReply; + iov[1].iov_base = link->recvBuffer + (req->from - start); + iov[1].iov_len = outReply.size; + fixup_reply( outReply ); + pthread_mutex_lock( &req->client->sendMutex ); + spin_unlock( &link->queueLock ); + writev( req->client->sock, iov, 2 ); + pthread_mutex_unlock( &req->client->sendMutex ); + spin_lock( &link->queueLock ); + if ( req->status == ULR_PROCESSING ) req->status = ULR_FREE; // Might have changed in the meantime + if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--; + } + spin_unlock( &link->queueLock ); } - spin_unlock( &link->queueLock ); - return; error_cleanup: ; const int fd = link->fd; link->fd = -1; if ( fd != -1 ) close( fd ); - return; +} + +/** + * Send keep alive request to server + */ +static int uplink_send_keepalive(const int fd) +{ + static dnbd3_request_t request = { 0, 0, 0, 0, 0 }; + if ( request.magic == 0 ) { + request.magic = dnbd3_packet_magic; + request.cmd = CMD_KEEPALIVE; + fixup_request( request ); + } + return send( fd, &request, sizeof(request), 0 ) == sizeof(request); } -- cgit v1.2.3-55-g7522