summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorSimon Rettberg2013-08-26 18:59:22 +0200
committerSimon Rettberg2013-08-26 18:59:22 +0200
commitcfb0fba59db0937a00ff04b03aaa28ca671fe4d7 (patch)
treee24109886f706b96f87903fab9e786153f628e9f /src/server/uplink.c
parent[SERVER] WIP: On-the-fly image cloning (diff)
downloaddnbd3-cfb0fba59db0937a00ff04b03aaa28ca671fe4d7.tar.gz
dnbd3-cfb0fba59db0937a00ff04b03aaa28ca671fe4d7.tar.xz
dnbd3-cfb0fba59db0937a00ff04b03aaa28ca671fe4d7.zip
[SERVER] On-the-fly transparent proxying
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c76
1 files changed, 42 insertions, 34 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 38b1415..dc2874e 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -30,7 +30,7 @@ static int uplink_send_keepalive(const int fd);
* image. Uplinks run in their own thread.
* Locks on: _images[].lock
*/
-int uplink_init(dnbd3_image_t *image)
+int uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host)
{
dnbd3_connection_t *link = NULL;
assert( image != NULL );
@@ -45,8 +45,14 @@ int uplink_init(dnbd3_image_t *image)
link->queueLen = 0;
link->fd = -1;
link->signal = -1;
- link->betterFd = -1;
- link->rttTestResult = RTT_IDLE;
+ if ( sock >= 0 ) {
+ link->betterFd = sock;
+ link->betterServer = *host;
+ link->rttTestResult = RTT_DOCHANGE;
+ } else {
+ link->betterFd = -1;
+ link->rttTestResult = RTT_IDLE;
+ }
link->recvBufferLen = 0;
link->shutdown = FALSE;
spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE );
@@ -83,6 +89,7 @@ void uplink_shutdown(dnbd3_image_t *image)
/**
* Remove given client from uplink request queue
+ * Locks on: uplink.queueLock, client.sendMutex
*/
void uplink_removeClient(dnbd3_connection_t *uplink, dnbd3_client_t *client)
{
@@ -201,6 +208,35 @@ static void* uplink_mainloop(void *data)
}
}
while ( !_shutdown && !link->shutdown ) {
+ // Check if server switch is in order
+ if ( link->rttTestResult == RTT_DOCHANGE ) {
+ link->rttTestResult = RTT_IDLE;
+ // The rttTest worker thread has finished our request.
+ // And says it's better to switch to another server
+ const int fd = link->fd;
+ link->fd = link->betterFd;
+ if ( fd != -1 ) close( fd );
+ // Re-send all pending requests
+ uplink_send_requests( link, FALSE );
+ link->betterFd = -1;
+ link->currentServer = link->betterServer;
+ link->image->working = TRUE;
+ buffer[0] = '@';
+ if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) {
+ printf( "[DEBUG] Now connected to %s\n", buffer + 1 );
+ setThreadName( buffer );
+ }
+ memset( &ev, 0, sizeof(ev) );
+ ev.events = EPOLLIN;
+ ev.data.fd = link->fd;
+ if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) {
+ memlogf( "[WARNING] adding uplink to epoll set failed" );
+ goto cleanup;
+ }
+ nextAltCheck = time( NULL ) + altCheckInterval;
+ // The rtt worker already did the handshake for our image, so there's nothing
+ // more to do here
+ }
// epoll()
if ( link->fd == -1 ) {
waitTime = 2000;
@@ -216,7 +252,8 @@ static void* uplink_mainloop(void *data)
usleep( 10000 );
continue;
}
- for (i = 0; i < numSocks; ++i) { // Check all events
+ // Check all events
+ for (i = 0; i < numSocks; ++i) {
if ( (events[i].events & (EPOLLERR | EPOLLHUP)) || !(events[i].events & EPOLLIN) ) {
if ( events[i].data.fd == link->signal ) {
memlogf( "[WARNING] epoll error on signal-pipe!" );
@@ -259,35 +296,6 @@ static void* uplink_mainloop(void *data)
}
}
// Done handling epoll sockets
- // Check if server switch is in order
- if ( link->rttTestResult == RTT_DOCHANGE ) {
- link->rttTestResult = RTT_IDLE;
- // The rttTest worker thread has finished our request.
- // And says it's better to switch to another server
- const int fd = link->fd;
- link->fd = link->betterFd;
- if ( fd != -1 ) close( fd );
- // Re-send all pending requests
- uplink_send_requests( link, FALSE );
- link->betterFd = -1;
- link->currentServer = link->betterServer;
- link->image->working = TRUE;
- buffer[0] = '@';
- if ( host_to_string( &link->currentServer, buffer + 1, sizeof(buffer) - 1 ) ) {
- printf( "[DEBUG] Now connected to %s\n", buffer + 1 );
- setThreadName( buffer );
- }
- memset( &ev, 0, sizeof(ev) );
- ev.events = EPOLLIN;
- ev.data.fd = link->fd;
- if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) {
- memlogf( "[WARNING] adding uplink to epoll set failed" );
- goto cleanup;
- }
- nextAltCheck = time( NULL ) + altCheckInterval;
- // The rtt worker already did the handshake for our image, so there's nothing
- // more to do here
- }
// See if we should trigger a RTT measurement
if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) {
const time_t now = time( NULL );
@@ -315,7 +323,7 @@ static void* uplink_mainloop(void *data)
}
} else {
// Not complete - do measurement
- altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous)
+ altservers_findUplink( link ); // This will set RTT_INPROGRESS (synchronous)
// Also send a keepalive packet to the currently connected server
if ( link->fd != -1 ) {
if ( !uplink_send_keepalive( link->fd ) ) {