From c4d2c6c6753fd2c41c3db1877c5c46613dc45510 Mon Sep 17 00:00:00 2001 From: sr Date: Thu, 25 Jul 2013 23:44:18 +0200 Subject: Work in progress: uplink --- src/server/uplink.c | 44 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 10 deletions(-) (limited to 'src/server/uplink.c') diff --git a/src/server/uplink.c b/src/server/uplink.c index 92cf944..0116fda 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -2,6 +2,7 @@ #include "locks.h" #include "memlog.h" #include "sockhelper.h" +#include "image.h" #include #include #include @@ -108,6 +109,7 @@ int uplink_init(dnbd3_image_t *image) link->betterFd = -1; link->rttTestResult = RTT_IDLE; link->recvBufferLen = 0; + link->shutdown = FALSE; spin_init( &link->lock, PTHREAD_PROCESS_PRIVATE ); if ( 0 != pthread_create( &(link->thread), NULL, &uplink_mainloop, (void *)(uintptr_t)link ) ) { memlogf( "[ERROR] Could not start thread for new client." ); @@ -122,13 +124,21 @@ int uplink_init(dnbd3_image_t *image) return FALSE; } -void uplink_shutdown(dnbd3_connection_t *uplink) +dnbd3_connection_t* uplink_shutdown(dnbd3_connection_t *uplink) { assert( uplink != NULL ); - if ( uplink->fd != -1 ) close( uplink->fd ); + if ( uplink->shutdown ) return NULL ; + uplink->shutdown = TRUE; + if ( uplink->signal != -1 ) write( uplink->signal, uplink, 1 ); pthread_join( uplink->thread, NULL ); + free( uplink ); + return NULL ; } +/** + * Uplink thread. + * Locks are irrelevant as this is never called from another function + */ static void* uplink_mainloop(void *data) { const int MAXEVENTS = 3; @@ -166,7 +176,7 @@ static void* uplink_mainloop(void *data) goto cleanup; } } - while ( !_shutdown ) { + while ( !_shutdown && !link->shutdown ) { if ( link->rttTestResult == RTT_DOCHANGE ) { link->rttTestResult = RTT_IDLE; // The rttTest worker thread has finished our request. @@ -193,6 +203,7 @@ static void* uplink_mainloop(void *data) if ( waitTime < 1500 ) waitTime = 1500; } numSocks = epoll_wait( fdEpoll, events, MAXEVENTS, waitTime ); + if ( _shutdown || link->shutdown ) break; if ( numSocks < 0 ) { // Error? memlogf( "[DEBUG] epoll_wait() error %d", (int)errno); usleep( 10000 ); @@ -235,18 +246,22 @@ static void* uplink_mainloop(void *data) return NULL ; } +/** + * Receive data from uplink server and process/dispatch + * Locks on: link.lock, indirectly on images[].lock + */ static void uplink_handle_receive(dnbd3_connection_t *link) { dnbd3_reply_t reply; int ret, i; ret = recv( link->fd, &reply, sizeof reply, MSG_WAITALL ); if ( ret != sizeof reply ) { - memlogf( "[INFO] Lost connection to uplink server." ); + memlogf( "[INFO] Lost connection to uplink server for %s", link->image->path ); goto error_cleanup; } fixup_reply( reply ); if ( reply.size > 9000000 ) { - memlogf( "[WARNING] Pure evil: Uplink server sent too much payload!" ); + memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path ); goto error_cleanup; } if ( link->recvBufferLen < reply.size ) { @@ -258,19 +273,16 @@ static void uplink_handle_receive(dnbd3_connection_t *link) while ( done < reply.size ) { ret = recv( link->fd, link->recvBuffer + done, reply.size - done, 0 ); if ( ret <= 0 ) { - memlogf( "[INFO] Lost connection to uplink server" ); + memlogf( "[INFO] Lost connection to uplink server of", link->image->path ); goto error_cleanup; } done += ret; } // Payload read completely - // 1) Write to cache file - assert( link->image->cacheFd != -1 ); - // 2) Figure out which clients are interested in it + // 1) Figure out which clients are interested in it const uint64_t start = reply.handle; const uint64_t end = reply.handle + reply.size; struct iovec iov[2]; - reply.magic = dnbd3_packet_magic; spin_lock( &link->lock ); for (i = 0; i < link->queuelen; ++i) { dnbd3_queued_request_t * const req = &link->queue[i]; @@ -280,6 +292,18 @@ static void uplink_handle_receive(dnbd3_connection_t *link) req->status = ULR_PROCESSING; } } + spin_unlock( &link->lock ); + // 2) Write to cache file + assert( link->image->cacheFd != -1 ); + if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) { + memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path ); + } else { + ret = (int)write( link->image->cacheFd, link->recvBuffer, reply.size ); + if ( ret > 0 ) image_update_cachemap( link->image, start, start + ret, TRUE); + } + // 3) Send to interested clients + reply.magic = dnbd3_packet_magic; // !! re-using reply struct - do not read from it after here + spin_lock( &link->lock ); for (i = link->queuelen - 1; i >= 0; --i) { dnbd3_queued_request_t * const req = &link->queue[i]; if ( req->status != ULR_PROCESSING ) continue; -- cgit v1.2.3-55-g7522