summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorsr2013-07-26 18:42:52 +0200
committersr2013-07-26 18:42:52 +0200
commit8b65d18653bb7a5c7aba714de0767a1e93ef78c1 (patch)
tree207212bdedaa918c2dc84005fe54826db40b26a6 /src/server/uplink.c
parentWork in progress: uplink (diff)
downloaddnbd3-8b65d18653bb7a5c7aba714de0767a1e93ef78c1.tar.gz
dnbd3-8b65d18653bb7a5c7aba714de0767a1e93ef78c1.tar.xz
dnbd3-8b65d18653bb7a5c7aba714de0767a1e93ef78c1.zip
[SERVER] Still working on the uplink... Almost there
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c190
1 files changed, 140 insertions, 50 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 0116fda..ab23b70 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -18,6 +18,7 @@ int _num_alts = 0;
pthread_spinlock_t _alts_lock;
static void* uplink_mainloop(void *data);
+static void uplink_send_requests(dnbd3_connection_t *link, int newOnly);
static void uplink_handle_receive(dnbd3_connection_t *link);
/**
@@ -103,14 +104,14 @@ int uplink_init(dnbd3_image_t *image)
}
link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) );
link->image = image;
- link->queuelen = 0;
+ link->queueLen = 0;
link->fd = -1;
link->signal = -1;
link->betterFd = -1;
link->rttTestResult = RTT_IDLE;
link->recvBufferLen = 0;
link->shutdown = FALSE;
- spin_init( &link->lock, PTHREAD_PROCESS_PRIVATE );
+ spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE );
if ( 0 != pthread_create( &(link->thread), NULL, &uplink_mainloop, (void *)(uintptr_t)link ) ) {
memlogf( "[ERROR] Could not start thread for new client." );
goto failure;
@@ -124,15 +125,61 @@ int uplink_init(dnbd3_image_t *image)
return FALSE;
}
-dnbd3_connection_t* uplink_shutdown(dnbd3_connection_t *uplink)
+void uplink_shutdown(dnbd3_image_t *image)
{
- assert( uplink != NULL );
- if ( uplink->shutdown ) return NULL ;
+ assert( image != NULL );
+ if ( image->uplink == NULL || image->uplink->shutdown ) return;
+ dnbd3_connection_t * const uplink = image->uplink;
+ image->uplink = NULL;
uplink->shutdown = TRUE;
- if ( uplink->signal != -1 ) write( uplink->signal, uplink, 1 );
+ if ( uplink->signal != -1 ) write( uplink->signal, "", 1 );
pthread_join( uplink->thread, NULL );
+ spin_lock( &uplink->queueLock );
+ spin_unlock( &uplink->queueLock );
+ spin_destroy( &uplink->queueLock );
free( uplink );
- return NULL ;
+}
+
+/**
+ * Request a chunk of data through an uplink server
+ */
+int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length)
+{
+ if ( client == NULL || client->image == NULL || client->image->uplink == NULL ) return FALSE;
+ dnbd3_connection_t * const uplink = client->image->uplink;
+ int foundExisting = FALSE; // Is there a pending request that is a superset of our range?
+ int i;
+ int freeSlot = -1;
+ const uint64_t end = start + length;
+
+ spin_lock( &uplink->queueLock );
+ for (i = 0; i < uplink->queueLen; ++i) {
+ if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) freeSlot = i;
+ if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue;
+ if ( uplink->queue[i].from <= start && uplink->queue[i].to >= end ) {
+ foundExisting = TRUE;
+ break;
+ }
+ }
+ if ( freeSlot == -1 ) {
+ if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) {
+ spin_unlock( &uplink->queueLock );
+ memlogf( "[WARNING] Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." );
+ return FALSE;
+ }
+ freeSlot = uplink->queueLen++;
+ }
+ uplink->queue[freeSlot].from = start;
+ uplink->queue[freeSlot].to = end;
+ uplink->queue[freeSlot].handle = handle;
+ uplink->queue[freeSlot].client = client;
+ uplink->queue[freeSlot].status = (foundExisting ? ULR_PENDING : ULR_NEW);
+ spin_unlock( &uplink->queueLock );
+
+ if ( !foundExisting ) {
+ write( uplink->signal, "", 1 );
+ }
+ return TRUE;
}
/**
@@ -151,7 +198,7 @@ static void* uplink_mainloop(void *data)
char buffer[100];
//
assert( link != NULL );
- assert( link->queuelen == 0 );
+ assert( link->queueLen == 0 );
//
fdEpoll = epoll_create( 2 );
if ( fdEpoll == -1 ) {
@@ -177,24 +224,6 @@ static void* uplink_mainloop(void *data)
}
}
while ( !_shutdown && !link->shutdown ) {
- if ( link->rttTestResult == RTT_DOCHANGE ) {
- link->rttTestResult = RTT_IDLE;
- // The rttTest worker thread has finished our request.
- // And says it's better to switch to another server
- if ( link->fd != -1 ) close( link->fd );
- link->fd = link->betterFd;
- link->betterFd = -1;
- link->currentServer = link->betterServer;
- memset( &ev, 0, sizeof(ev) );
- ev.events = EPOLLIN;
- ev.data.fd = link->fd;
- if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) {
- memlogf( "[WARNING] adding uplink to epoll set failed" );
- goto cleanup;
- }
- // The rtt worker already did the handshake for our image, so there's nothing
- // more to do here
- }
// epoll()
if ( link->fd == -1 ) {
waitTime = 1500;
@@ -215,11 +244,14 @@ static void* uplink_mainloop(void *data)
memlogf( "[WARNING] epoll error on signal-pipe!" );
goto cleanup;
}
- close( events[i].data.fd );
if ( events[i].data.fd == link->fd ) {
link->fd = -1;
+ close( events[i].data.fd );
printf( "[DEBUG] Uplink gone away, panic!\n" );
nextAltCheck = 0;
+ } else {
+ printf( "[DEBUG] Error on unknown FD in uplink epoll" );
+ close( events[i].data.fd );
}
continue;
}
@@ -227,6 +259,9 @@ static void* uplink_mainloop(void *data)
if ( events[i].data.fd == fdPipe ) {
while ( read( fdPipe, buffer, sizeof buffer ) > 0 ) {
} // Throw data away, this is just used for waking this thread up
+ if ( link->fd != -1 ) {
+ uplink_send_requests( link, TRUE );
+ }
} else if ( events[i].data.fd == link->fd ) {
uplink_handle_receive( link );
if ( link->fd == -1 ) nextAltCheck = 0;
@@ -235,17 +270,71 @@ static void* uplink_mainloop(void *data)
close( events[i].data.fd );
}
}
+ // Done handling epoll sockets
+ // Check if server switch is in order
+ if ( link->rttTestResult == RTT_DOCHANGE ) {
+ link->rttTestResult = RTT_IDLE;
+ // The rttTest worker thread has finished our request.
+ // And says it's better to switch to another server
+ const int fd = link->fd;
+ link->fd = link->betterFd;
+ if ( fd != -1 ) close( fd );
+ // Re-send all pending requests
+ uplink_send_requests( link, FALSE );
+ link->betterFd = -1;
+ link->currentServer = link->betterServer;
+ memset( &ev, 0, sizeof(ev) );
+ ev.events = EPOLLIN;
+ ev.data.fd = link->fd;
+ if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) {
+ memlogf( "[WARNING] adding uplink to epoll set failed" );
+ goto cleanup;
+ }
+ nextAltCheck = time( NULL ) + altCheckInterval;
+ // The rtt worker already did the handshake for our image, so there's nothing
+ // more to do here
+ }
}
cleanup: ;
- if ( link->fd != -1 ) close( link->fd );
+ const int fd = link->fd;
+ const int signal = link->signal;
link->fd = -1;
- if ( link->signal != -1 ) close( link->signal );
link->signal = -1;
+ if ( fd != -1 ) close( fd );
+ if ( signal != -1 ) close( signal );
if ( fdPipe != -1 ) close( fdPipe );
if ( fdEpoll != -1 ) close( fdEpoll );
return NULL ;
}
+static void uplink_send_requests(dnbd3_connection_t *link, int newOnly)
+{
+ // Scan for new requests
+ int j;
+ dnbd3_request_t request;
+ request.magic = dnbd3_packet_magic;
+ spin_lock( &link->queueLock );
+ for (j = 0; j < link->queueLen; ++j) {
+ if ( link->queue[j].status != ULR_NEW && (newOnly || link->queue[j].status != ULR_PENDING) ) continue;
+ link->queue[j].status = ULR_PENDING;
+ request.handle = link->queue[j].handle;
+ request.cmd = CMD_GET_BLOCK;
+ request.offset = link->queue[j].from;
+ request.size = link->queue[j].to - link->queue[j].from;
+ spin_unlock( &link->queueLock );
+ fixup_request( request );
+ const int ret = write( link->fd, &request, sizeof request );
+ if ( ret != sizeof(request) ) {
+ // Non-critical - if the connection dropped or the server was changed
+ // the thread will re-send this request as soon as the connection
+ // is reestablished.
+ printf( "[DEBUG] Error sending request to uplink server!" );
+ }
+ spin_lock( &link->queueLock );
+ }
+ spin_unlock( &link->queueLock );
+}
+
/**
* Receive data from uplink server and process/dispatch
* Locks on: link.lock, indirectly on images[].lock
@@ -279,12 +368,20 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
done += ret;
}
// Payload read completely
- // 1) Figure out which clients are interested in it
const uint64_t start = reply.handle;
const uint64_t end = reply.handle + reply.size;
+ // 1) Write to cache file
+ assert( link->image->cacheFd != -1 );
+ if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) {
+ memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path );
+ } else {
+ ret = (int)write( link->image->cacheFd, link->recvBuffer, reply.size );
+ if ( ret > 0 ) image_update_cachemap( link->image, start, start + ret, TRUE );
+ }
+ // 2) Figure out which clients are interested in it
struct iovec iov[2];
- spin_lock( &link->lock );
- for (i = 0; i < link->queuelen; ++i) {
+ spin_lock( &link->queueLock );
+ for (i = 0; i < link->queueLen; ++i) {
dnbd3_queued_request_t * const req = &link->queue[i];
assert( req->status != ULR_PROCESSING );
if ( req->status != ULR_PENDING ) continue;
@@ -292,19 +389,9 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
req->status = ULR_PROCESSING;
}
}
- spin_unlock( &link->lock );
- // 2) Write to cache file
- assert( link->image->cacheFd != -1 );
- if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) {
- memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path );
- } else {
- ret = (int)write( link->image->cacheFd, link->recvBuffer, reply.size );
- if ( ret > 0 ) image_update_cachemap( link->image, start, start + ret, TRUE);
- }
// 3) Send to interested clients
reply.magic = dnbd3_packet_magic; // !! re-using reply struct - do not read from it after here
- spin_lock( &link->lock );
- for (i = link->queuelen - 1; i >= 0; --i) {
+ for (i = link->queueLen - 1; i >= 0; --i) {
dnbd3_queued_request_t * const req = &link->queue[i];
if ( req->status != ULR_PROCESSING ) continue;
assert( req->from >= start && req->to <= end );
@@ -316,18 +403,21 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
iov[1].iov_base = link->recvBuffer + (req->from - start);
iov[1].iov_len = reply.size;
fixup_reply( reply );
- spin_unlock( &link->lock );
+ spin_unlock( &link->queueLock );
// send: Don't care about errors here, let the client
- // connection thread deal with it if something goes wrong here
- writev( req->socket, iov, 2 );
- spin_lock( &link->lock );
+ // connection thread deal with it if something goes wrong
+ pthread_mutex_lock( &req->client->sendMutex );
+ writev( req->client->sock, iov, 2 );
+ pthread_mutex_unlock( &req->client->sendMutex );
+ spin_lock( &link->queueLock );
req->status = ULR_FREE;
- if ( i > 20 && i == link->queuelen - 1 ) link->queuelen--;
+ if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--;
}
- spin_unlock( &link->lock );
+ spin_unlock( &link->queueLock );
return;
error_cleanup: ;
- close( link->fd );
+ const int fd = link->fd;
link->fd = -1;
+ if ( fd != -1 ) close( fd );
return;
}