summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorsr2013-07-25 23:44:18 +0200
committersr2013-07-25 23:44:18 +0200
commitc4d2c6c6753fd2c41c3db1877c5c46613dc45510 (patch)
treeda8fabd3a41935ab8650500d217c6555887a3da1 /src/server/uplink.c
parent...Working on proxy mode... (diff)
downloaddnbd3-c4d2c6c6753fd2c41c3db1877c5c46613dc45510.tar.gz
dnbd3-c4d2c6c6753fd2c41c3db1877c5c46613dc45510.tar.xz
dnbd3-c4d2c6c6753fd2c41c3db1877c5c46613dc45510.zip
Work in progress: uplink
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c44
1 files changed, 34 insertions, 10 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 92cf944..0116fda 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -2,6 +2,7 @@
#include "locks.h"
#include "memlog.h"
#include "sockhelper.h"
+#include "image.h"
#include <pthread.h>
#include <sys/socket.h>
#include <string.h>
@@ -108,6 +109,7 @@ int uplink_init(dnbd3_image_t *image)
link->betterFd = -1;
link->rttTestResult = RTT_IDLE;
link->recvBufferLen = 0;
+ link->shutdown = FALSE;
spin_init( &link->lock, PTHREAD_PROCESS_PRIVATE );
if ( 0 != pthread_create( &(link->thread), NULL, &uplink_mainloop, (void *)(uintptr_t)link ) ) {
memlogf( "[ERROR] Could not start thread for new client." );
@@ -122,13 +124,21 @@ int uplink_init(dnbd3_image_t *image)
return FALSE;
}
-void uplink_shutdown(dnbd3_connection_t *uplink)
+dnbd3_connection_t* uplink_shutdown(dnbd3_connection_t *uplink)
{
assert( uplink != NULL );
- if ( uplink->fd != -1 ) close( uplink->fd );
+ if ( uplink->shutdown ) return NULL ;
+ uplink->shutdown = TRUE;
+ if ( uplink->signal != -1 ) write( uplink->signal, uplink, 1 );
pthread_join( uplink->thread, NULL );
+ free( uplink );
+ return NULL ;
}
+/**
+ * Uplink thread.
+ * Locks are irrelevant as this is never called from another function
+ */
static void* uplink_mainloop(void *data)
{
const int MAXEVENTS = 3;
@@ -166,7 +176,7 @@ static void* uplink_mainloop(void *data)
goto cleanup;
}
}
- while ( !_shutdown ) {
+ while ( !_shutdown && !link->shutdown ) {
if ( link->rttTestResult == RTT_DOCHANGE ) {
link->rttTestResult = RTT_IDLE;
// The rttTest worker thread has finished our request.
@@ -193,6 +203,7 @@ static void* uplink_mainloop(void *data)
if ( waitTime < 1500 ) waitTime = 1500;
}
numSocks = epoll_wait( fdEpoll, events, MAXEVENTS, waitTime );
+ if ( _shutdown || link->shutdown ) break;
if ( numSocks < 0 ) { // Error?
memlogf( "[DEBUG] epoll_wait() error %d", (int)errno);
usleep( 10000 );
@@ -235,18 +246,22 @@ static void* uplink_mainloop(void *data)
return NULL ;
}
+/**
+ * Receive data from uplink server and process/dispatch
+ * Locks on: link.lock, indirectly on images[].lock
+ */
static void uplink_handle_receive(dnbd3_connection_t *link)
{
dnbd3_reply_t reply;
int ret, i;
ret = recv( link->fd, &reply, sizeof reply, MSG_WAITALL );
if ( ret != sizeof reply ) {
- memlogf( "[INFO] Lost connection to uplink server." );
+ memlogf( "[INFO] Lost connection to uplink server for %s", link->image->path );
goto error_cleanup;
}
fixup_reply( reply );
if ( reply.size > 9000000 ) {
- memlogf( "[WARNING] Pure evil: Uplink server sent too much payload!" );
+ memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path );
goto error_cleanup;
}
if ( link->recvBufferLen < reply.size ) {
@@ -258,19 +273,16 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
while ( done < reply.size ) {
ret = recv( link->fd, link->recvBuffer + done, reply.size - done, 0 );
if ( ret <= 0 ) {
- memlogf( "[INFO] Lost connection to uplink server" );
+ memlogf( "[INFO] Lost connection to uplink server of", link->image->path );
goto error_cleanup;
}
done += ret;
}
// Payload read completely
- // 1) Write to cache file
- assert( link->image->cacheFd != -1 );
- // 2) Figure out which clients are interested in it
+ // 1) Figure out which clients are interested in it
const uint64_t start = reply.handle;
const uint64_t end = reply.handle + reply.size;
struct iovec iov[2];
- reply.magic = dnbd3_packet_magic;
spin_lock( &link->lock );
for (i = 0; i < link->queuelen; ++i) {
dnbd3_queued_request_t * const req = &link->queue[i];
@@ -280,6 +292,18 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
req->status = ULR_PROCESSING;
}
}
+ spin_unlock( &link->lock );
+ // 2) Write to cache file
+ assert( link->image->cacheFd != -1 );
+ if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) {
+ memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path );
+ } else {
+ ret = (int)write( link->image->cacheFd, link->recvBuffer, reply.size );
+ if ( ret > 0 ) image_update_cachemap( link->image, start, start + ret, TRUE);
+ }
+ // 3) Send to interested clients
+ reply.magic = dnbd3_packet_magic; // !! re-using reply struct - do not read from it after here
+ spin_lock( &link->lock );
for (i = link->queuelen - 1; i >= 0; --i) {
dnbd3_queued_request_t * const req = &link->queue[i];
if ( req->status != ULR_PROCESSING ) continue;