From cfb0fba59db0937a00ff04b03aaa28ca671fe4d7 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 26 Aug 2013 18:59:22 +0200 Subject: [SERVER] On-the-fly transparent proxying --- src/server/uplink.c | 76 +++++++++++++++++++++++++++++------------------------ 1 file changed, 42 insertions(+), 34 deletions(-) (limited to 'src/server/uplink.c') diff --git a/src/server/uplink.c b/src/server/uplink.c index 38b1415..dc2874e 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -30,7 +30,7 @@ static int uplink_send_keepalive(const int fd); * image. Uplinks run in their own thread. * Locks on: _images[].lock */ -int uplink_init(dnbd3_image_t *image) +int uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) { dnbd3_connection_t *link = NULL; assert( image != NULL ); @@ -45,8 +45,14 @@ int uplink_init(dnbd3_image_t *image) link->queueLen = 0; link->fd = -1; link->signal = -1; - link->betterFd = -1; - link->rttTestResult = RTT_IDLE; + if ( sock >= 0 ) { + link->betterFd = sock; + link->betterServer = *host; + link->rttTestResult = RTT_DOCHANGE; + } else { + link->betterFd = -1; + link->rttTestResult = RTT_IDLE; + } link->recvBufferLen = 0; link->shutdown = FALSE; spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE ); @@ -83,6 +89,7 @@ void uplink_shutdown(dnbd3_image_t *image) /** * Remove given client from uplink request queue + * Locks on: uplink.queueLock, client.sendMutex */ void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client) { @@ -201,6 +208,35 @@ static void* uplink_mainloop(void *data) } } while ( !_shutdown && !link->shutdown ) { + // Check if server switch is in order + if ( link->rttTestResult == RTT_DOCHANGE ) { + link->rttTestResult = RTT_IDLE; + // The rttTest worker thread has finished our request. + // And says it's better to switch to another server + const int fd = link->fd; + link->fd = link->betterFd; + if ( fd != -1 ) close( fd ); + // Re-send all pending requests + uplink_send_requests( link, FALSE ); + link->betterFd = -1; + link->currentServer = link->betterServer; + link->image->working = TRUE; + buffer[0] = '@'; + if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) { + printf( "[DEBUG] Now connected to %s\n", buffer + 1 ); + setThreadName( buffer ); + } + memset( &ev, 0, sizeof(ev) ); + ev.events = EPOLLIN; + ev.data.fd = link->fd; + if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) { + memlogf( "[WARNING] adding uplink to epoll set failed" ); + goto cleanup; + } + nextAltCheck = time( NULL ) + altCheckInterval; + // The rtt worker already did the handshake for our image, so there's nothing + // more to do here + } // epoll() if ( link->fd == -1 ) { waitTime = 2000; @@ -216,7 +252,8 @@ static void* uplink_mainloop(void *data) usleep( 10000 ); continue; } - for (i = 0; i < numSocks; ++i) { // Check all events + // Check all events + for (i = 0; i < numSocks; ++i) { if ( (events[i].events & (EPOLLERR | EPOLLHUP)) || !(events[i].events & EPOLLIN) ) { if ( events[i].data.fd == link->signal ) { memlogf( "[WARNING] epoll error on signal-pipe!" ); @@ -259,35 +296,6 @@ static void* uplink_mainloop(void *data) } } // Done handling epoll sockets - // Check if server switch is in order - if ( link->rttTestResult == RTT_DOCHANGE ) { - link->rttTestResult = RTT_IDLE; - // The rttTest worker thread has finished our request. - // And says it's better to switch to another server - const int fd = link->fd; - link->fd = link->betterFd; - if ( fd != -1 ) close( fd ); - // Re-send all pending requests - uplink_send_requests( link, FALSE ); - link->betterFd = -1; - link->currentServer = link->betterServer; - link->image->working = TRUE; - buffer[0] = '@'; - if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) { - printf( "[DEBUG] Now connected to %s\n", buffer + 1 ); - setThreadName( buffer ); - } - memset( &ev, 0, sizeof(ev) ); - ev.events = EPOLLIN; - ev.data.fd = link->fd; - if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) { - memlogf( "[WARNING] adding uplink to epoll set failed" ); - goto cleanup; - } - nextAltCheck = time( NULL ) + altCheckInterval; - // The rtt worker already did the handshake for our image, so there's nothing - // more to do here - } // See if we should trigger a RTT measurement if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) { const time_t now = time( NULL ); @@ -315,7 +323,7 @@ static void* uplink_mainloop(void *data) } } else { // Not complete - do measurement - altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous) + altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous) // Also send a keepalive packet to the currently connected server if ( link->fd != -1 ) { if ( !uplink_send_keepalive( link->fd ) ) { -- cgit v1.2.3-55-g7522