summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorSimon Rettberg2013-08-12 18:04:39 +0200
committerSimon Rettberg2013-08-12 18:04:39 +0200
commita1dd0acdbdd6a9b70f9d7aa447e323f2072c650a (patch)
treeea67a4cc5c3dd003184bb13ccfaf3fa90c42720c /src/server/uplink.c
parentI'm stupid (diff)
downloaddnbd3-a1dd0acdbdd6a9b70f9d7aa447e323f2072c650a.tar.gz
dnbd3-a1dd0acdbdd6a9b70f9d7aa447e323f2072c650a.tar.xz
dnbd3-a1dd0acdbdd6a9b70f9d7aa447e323f2072c650a.zip
[SERVER] Improve proxy mode, implement integrity check in proxy mode
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c262
1 files changed, 165 insertions, 97 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 4dbe75a..6d05f94 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -10,6 +10,7 @@
#include <string.h>
#include <sys/epoll.h>
#include <sys/errno.h>
+#include <sys/eventfd.h>
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>
@@ -19,6 +20,7 @@
static void* uplink_mainloop(void *data);
static void uplink_send_requests(dnbd3_connection_t *link, int newOnly);
static void uplink_handle_receive(dnbd3_connection_t *link);
+static int uplink_send_keepalive(const int fd);
// ############ uplink connection handling
@@ -79,6 +81,24 @@ void uplink_shutdown(dnbd3_image_t *image)
}
/**
+ * Remove given client from uplink request queue
+ */
+void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client)
+{
+ spin_lock( &uplink->queueLock );
+ for (int i = 0; i < uplink->queueLen; ++i) {
+ if ( uplink->queue[i].client == client ) {
+ // Lock on the send mutex as the uplink thread might just be writing to the client
+ pthread_mutex_lock( &client->sendMutex );
+ uplink->queue[i].client = NULL;
+ uplink->queue[i].status = ULR_FREE;
+ pthread_mutex_unlock( &client->sendMutex );
+ }
+ }
+ spin_unlock( &uplink->queueLock );
+}
+
+/**
* Request a chunk of data through an uplink server
*/
int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length)
@@ -118,10 +138,14 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint
uplink->queue[freeSlot].handle = handle;
uplink->queue[freeSlot].client = client;
uplink->queue[freeSlot].status = (foundExisting ? ULR_PENDING : ULR_NEW);
+#ifdef _DEBUG
+ uplink->queue[freeSlot].entered = time( NULL );
+#endif
spin_unlock( &uplink->queueLock );
if ( !foundExisting ) {
- write( uplink->signal, "", 1 );
+ static uint64_t counter = 1;
+ write( uplink->signal, &counter, sizeof(uint64_t) );
}
return TRUE;
}
@@ -135,7 +159,7 @@ static void* uplink_mainloop(void *data)
const int MAXEVENTS = 3;
struct epoll_event ev, events[MAXEVENTS];
dnbd3_connection_t *link = (dnbd3_connection_t*)data;
- int fdEpoll = -1, fdPipe = -1;
+ int fdEpoll = -1;
int numSocks, i, waitTime;
int altCheckInterval = SERVER_RTT_DELAY_INIT;
int bFree = FALSE;
@@ -151,27 +175,24 @@ static void* uplink_mainloop(void *data)
goto cleanup;
}
{
- int pipes[2];
- if ( pipe( pipes ) < 0 ) {
+ link->signal = eventfd( 0, EFD_NONBLOCK );
+ if ( link->signal < 0 ) {
memlogf( "[WARNING] error creating pipe. Uplink unavailable." );
goto cleanup;
}
- sock_set_nonblock( pipes[0] );
- sock_set_nonblock( pipes[1] );
- fdPipe = pipes[0];
- link->signal = pipes[1];
memset( &ev, 0, sizeof(ev) );
ev.events = EPOLLIN;
- ev.data.fd = fdPipe;
- if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, fdPipe, &ev ) < 0 ) {
- memlogf( "[WARNING] adding signal-pipe to epoll set failed" );
+ ev.data.fd = link->signal;
+ if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->signal, &ev ) < 0 ) {
+ memlogf( "[WARNING] adding eventfd to epoll set failed" );
goto cleanup;
}
}
while ( !_shutdown && !link->shutdown ) {
// epoll()
if ( link->fd == -1 ) {
- waitTime = 1500;
+ waitTime = 2000;
+ nextAltCheck = 0;
} else {
waitTime = (time( NULL ) - nextAltCheck) * 1000;
if ( waitTime < 1500 ) waitTime = 1500;
@@ -185,7 +206,7 @@ static void* uplink_mainloop(void *data)
}
for (i = 0; i < numSocks; ++i) { // Check all events
if ( (events[i].events & (EPOLLERR | EPOLLHUP)) || !(events[i].events & EPOLLIN) ) {
- if ( events[i].data.fd == fdPipe ) {
+ if ( events[i].data.fd == link->signal ) {
memlogf( "[WARNING] epoll error on signal-pipe!" );
goto cleanup;
}
@@ -195,23 +216,23 @@ static void* uplink_mainloop(void *data)
printf( "[DEBUG] Uplink gone away, panic!\n" );
nextAltCheck = 0;
} else {
- printf( "[DEBUG] Error on unknown FD in uplink epoll" );
+ printf( "[DEBUG] Error on unknown FD in uplink epoll\n" );
close( events[i].data.fd );
}
continue;
}
// No error, handle normally
- if ( events[i].data.fd == fdPipe ) {
+ if ( events[i].data.fd == link->signal ) {
int ret;
do {
- ret = read( fdPipe, buffer, sizeof buffer );
+ ret = read( link->signal, buffer, sizeof buffer );
} while ( ret > 0 ); // Throw data away, this is just used for waking this thread up
if ( ret == 0 ) {
- memlogf( "[WARNING] Signal pipe of uplink for %s closed! Things will break!", link->image->lower_name );
+ memlogf( "[WARNING] Eventfd of uplink for %s closed! Things will break!", link->image->lower_name );
}
ret = errno;
if ( ret != EAGAIN && ret != EWOULDBLOCK && ret != EBUSY && ret != EINTR ) {
- memlogf( "[WARNING] Errno %d on pipe-read on uplink for %s! Things will break!", ret, link->image->lower_name );
+ memlogf( "[WARNING] Errno %d on eventfd on uplink for %s! Things will break!", ret, link->image->lower_name );
}
if ( link->fd != -1 ) {
uplink_send_requests( link, TRUE );
@@ -263,8 +284,9 @@ static void* uplink_mainloop(void *data)
// It seems it's time for a check
if ( image_isComplete( link->image ) ) {
// Quit work if image is complete
+ memlogf( "[INFO] Replication of %s complete.", link->image->lower_name );
if ( spin_trylock( &link->image->lock ) == 0 ) {
- image_markComplete(link->image);
+ image_markComplete( link->image );
link->image->uplink = NULL;
link->shutdown = TRUE;
free( link->recvBuffer );
@@ -280,11 +302,34 @@ static void* uplink_mainloop(void *data)
} else {
// Not complete- do measurement
altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous)
+ // Also send a keepalive packet to the currently connected server
+ if ( link->fd != -1 ) {
+ if ( !uplink_send_keepalive( link->fd ) ) {
+ printf( "[DEBUG] Error sending keep-alive to uplink\n" );
+ const int fd = link->fd;
+ link->fd = -1;
+ close( fd );
+ }
+ }
}
altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX);
nextAltCheck = now + altCheckInterval;
}
}
+#ifdef _DEBUG
+ if ( link->fd != -1 ) {
+ time_t deadline = time( NULL ) - 10;
+ spin_lock( &link->queueLock );
+ for (i = 0; i < link->queueLen; ++i) {
+ if ( link->queue[i].status != ULR_FREE && link->queue[i].entered < deadline ) {
+ printf( "[DEBUG WARNING] Starving request detected:\n"
+ "%s\n(from %" PRIu64 " to %" PRIu64 "\n", link->queue[i].client->image->lower_name, link->queue[i].from,
+ link->queue[i].to );
+ }
+ }
+ spin_unlock( &link->queueLock );
+ }
+#endif
}
cleanup: ;
const int fd = link->fd;
@@ -293,7 +338,6 @@ static void* uplink_mainloop(void *data)
link->signal = -1;
if ( fd != -1 ) close( fd );
if ( signal != -1 ) close( signal );
- if ( fdPipe != -1 ) close( fdPipe );
if ( fdEpoll != -1 ) close( fdEpoll );
// Wait for the RTT check to finish/fail if it's in progress
while ( link->rttTestResult == RTT_INPROGRESS )
@@ -324,7 +368,7 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly)
// Non-critical - if the connection dropped or the server was changed
// the thread will re-send this request as soon as the connection
// is reestablished.
- printf( "[DEBUG] Error sending request to uplink server!" );
+ printf( "[DEBUG] Error sending request to uplink server!\n" );
}
spin_lock( &link->queueLock );
}
@@ -337,90 +381,114 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly)
*/
static void uplink_handle_receive(dnbd3_connection_t *link)
{
- dnbd3_reply_t reply;
+ dnbd3_reply_t inReply, outReply;
int ret, i;
- ret = recv( link->fd, &reply, sizeof reply, MSG_WAITALL );
- if ( ret != sizeof reply ) {
- memlogf( "[INFO] Lost connection to uplink server for %s", link->image->path );
- goto error_cleanup;
- }
- fixup_reply( reply );
- if ( reply.magic != dnbd3_packet_magic ) {
- memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path );
- goto error_cleanup;
- }
- if ( reply.size > 9000000 ) {
- memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path );
- goto error_cleanup;
- }
- if ( link->recvBufferLen < reply.size ) {
- if ( link->recvBuffer != NULL ) free( link->recvBuffer );
- link->recvBufferLen = MIN(9000000, reply.size + 8192);
- link->recvBuffer = malloc( link->recvBufferLen );
- }
- uint32_t done = 0;
- while ( done < reply.size ) {
- ret = recv( link->fd, link->recvBuffer + done, reply.size - done, 0 );
- if ( ret <= 0 ) {
- memlogf( "[INFO] Lost connection to uplink server of", link->image->path );
+ for (;;) {
+ ret = recv( link->fd, &inReply, sizeof inReply, MSG_DONTWAIT );
+ if ( ret < 0 ) {
+ const int err = errno;
+ if ( err == EAGAIN || err == EWOULDBLOCK || err == EINTR ) return; // OK cases
goto error_cleanup;
}
- done += ret;
- }
- // Payload read completely
- // Bail out if we're not interested
- if ( reply.cmd != CMD_GET_BLOCK ) return;
- // Is a legit block reply
- const uint64_t start = reply.handle;
- const uint64_t end = reply.handle + reply.size;
- // 1) Write to cache file
- assert( link->image->cacheFd != -1 );
- if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) {
- memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path );
- } else {
- ret = (int)write( link->image->cacheFd, link->recvBuffer, reply.size );
- if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, TRUE );
- }
- // 2) Figure out which clients are interested in it
- struct iovec iov[2];
- spin_lock( &link->queueLock );
- for (i = 0; i < link->queueLen; ++i) {
- dnbd3_queued_request_t * const req = &link->queue[i];
- assert( req->status != ULR_PROCESSING );
- if ( req->status != ULR_PENDING ) continue;
- if ( req->from >= start && req->to <= end ) { // Match :-)
- req->status = ULR_PROCESSING;
+ if ( ret == 0 ) {
+ memlogf( "[INFO] Uplink: Remote host hung up (%s)", link->image->path );
+ goto error_cleanup;
}
- }
- // 3) Send to interested clients
- reply.magic = dnbd3_packet_magic; // !! re-using reply struct - do not read from it after here
- for (i = link->queueLen - 1; i >= 0; --i) {
- dnbd3_queued_request_t * const req = &link->queue[i];
- if ( req->status != ULR_PROCESSING ) continue;
- assert( req->from >= start && req->to <= end );
- reply.cmd = CMD_GET_BLOCK;
- reply.handle = req->handle;
- reply.size = req->to - req->from;
- iov[0].iov_base = &reply;
- iov[0].iov_len = sizeof reply;
- iov[1].iov_base = link->recvBuffer + (req->from - start);
- iov[1].iov_len = reply.size;
- fixup_reply( reply );
- spin_unlock( &link->queueLock );
- // send: Don't care about errors here, let the client
- // connection thread deal with it if something goes wrong
- pthread_mutex_lock( &req->client->sendMutex );
- writev( req->client->sock, iov, 2 );
- pthread_mutex_unlock( &req->client->sendMutex );
+ if ( ret != sizeof inReply ) ret += recv( link->fd, &inReply + ret, sizeof(inReply) - ret, MSG_WAITALL );
+ if ( ret != sizeof inReply ) {
+ const int err = errno;
+ memlogf( "[INFO] Lost connection to uplink server for %s (header %d/%d, e=%d)", link->image->path, ret, (int)sizeof(inReply),
+ err );
+ goto error_cleanup;
+ }
+ fixup_reply( inReply );
+ if ( inReply.magic != dnbd3_packet_magic ) {
+ memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path );
+ goto error_cleanup;
+ }
+ if ( inReply.size > 9000000 ) {
+ memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path );
+ goto error_cleanup;
+ }
+ if ( link->recvBufferLen < inReply.size ) {
+ if ( link->recvBuffer != NULL ) free( link->recvBuffer );
+ link->recvBufferLen = MIN(9000000, inReply.size + 8192);
+ link->recvBuffer = malloc( link->recvBufferLen );
+ }
+ uint32_t done = 0;
+ while ( done < inReply.size ) {
+ ret = recv( link->fd, link->recvBuffer + done, inReply.size - done, 0 );
+ if ( ret <= 0 ) {
+ memlogf( "[INFO] Lost connection to uplink server of %s (payload)", link->image->path );
+ goto error_cleanup;
+ }
+ done += ret;
+ }
+ // Payload read completely
+ // Bail out if we're not interested
+ if ( inReply.cmd != CMD_GET_BLOCK ) return;
+ // Is a legit block reply
+ const uint64_t start = inReply.handle;
+ const uint64_t end = inReply.handle + inReply.size;
+ // 1) Write to cache file
+ assert( link->image->cacheFd != -1 );
+ if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) {
+ memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path );
+ } else {
+ ret = (int)write( link->image->cacheFd, link->recvBuffer, inReply.size );
+ if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, TRUE );
+ }
+ // 2) Figure out which clients are interested in it
+ struct iovec iov[2];
spin_lock( &link->queueLock );
- req->status = ULR_FREE;
- if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--;
+ for (i = 0; i < link->queueLen; ++i) {
+ dnbd3_queued_request_t * const req = &link->queue[i];
+ assert( req->status != ULR_PROCESSING );
+ if ( req->status != ULR_PENDING ) continue;
+ if ( req->from >= start && req->to <= end ) { // Match :-)
+ req->status = ULR_PROCESSING;
+ }
+ }
+ // 3) Send to interested clients
+ outReply.magic = dnbd3_packet_magic;
+ for (i = link->queueLen - 1; i >= 0; --i) {
+ dnbd3_queued_request_t * const req = &link->queue[i];
+ if ( req->status != ULR_PROCESSING ) continue;
+ assert( req->from >= start && req->to <= end );
+ outReply.cmd = CMD_GET_BLOCK;
+ outReply.handle = req->handle;
+ outReply.size = req->to - req->from;
+ iov[0].iov_base = &outReply;
+ iov[0].iov_len = sizeof outReply;
+ iov[1].iov_base = link->recvBuffer + (req->from - start);
+ iov[1].iov_len = outReply.size;
+ fixup_reply( outReply );
+ pthread_mutex_lock( &req->client->sendMutex );
+ spin_unlock( &link->queueLock );
+ writev( req->client->sock, iov, 2 );
+ pthread_mutex_unlock( &req->client->sendMutex );
+ spin_lock( &link->queueLock );
+ if ( req->status == ULR_PROCESSING ) req->status = ULR_FREE; // Might have changed in the meantime
+ if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--;
+ }
+ spin_unlock( &link->queueLock );
}
- spin_unlock( &link->queueLock );
- return;
error_cleanup: ;
const int fd = link->fd;
link->fd = -1;
if ( fd != -1 ) close( fd );
- return;
+}
+
+/**
+ * Send keep alive request to server
+ */
+static int uplink_send_keepalive(const int fd)
+{
+ static dnbd3_request_t request = { 0, 0, 0, 0, 0 };
+ if ( request.magic == 0 ) {
+ request.magic = dnbd3_packet_magic;
+ request.cmd = CMD_KEEPALIVE;
+ fixup_request( request );
+ }
+ return send( fd, &request, sizeof(request), 0 ) == sizeof(request);
}