From 8b65d18653bb7a5c7aba714de0767a1e93ef78c1 Mon Sep 17 00:00:00 2001 From: sr Date: Fri, 26 Jul 2013 18:42:52 +0200 Subject: [SERVER] Still working on the uplink... Almost there --- src/server/globals.h | 24 ++++--- src/server/image.c | 10 ++- src/server/net.c | 166 ++++++++++++++++++-------------------------- src/server/server.c | 12 +++- src/server/uplink.c | 190 +++++++++++++++++++++++++++++++++++++-------------- src/server/uplink.h | 4 +- src/types.h | 4 +- 7 files changed, 245 insertions(+), 165 deletions(-) (limited to 'src') diff --git a/src/server/globals.h b/src/server/globals.h index 106531e..7f47288 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -10,22 +10,26 @@ typedef struct _dnbd3_connection dnbd3_connection_t; typedef struct _dnbd3_image dnbd3_image_t; +typedef struct _dnbd3_client dnbd3_client_t; // Slot is free, can be used. // Must only be set in uplink_handle_receive() or uplink_remove_client() #define ULR_FREE 0 -// Slot is occupied, reply has not yet been received, matching request can safely rely on reuse. +// Slot has been filled with a request that hasn't been sent to the upstream server yet, matching request can safely rely on reuse. // Must only be set in uplink_request() -#define ULR_PENDING 1 +#define ULR_NEW 1 +// Slot is occupied, reply has not yet been received, matching request can safely rely on reuse. +// Must only be set in uplink_mainloop() +#define ULR_PENDING 2 // Slot is being processed, do not consider for hop on. // Must only be set in uplink_handle_receive() -#define ULR_PROCESSING 2 +#define ULR_PROCESSING 3 typedef struct { uint64_t handle; // Client defined handle to pass back in reply uint64_t from; // First byte offset of requested block (ie. 4096) volatile uint32_t to; // Last byte + 1 of requested block (ie. 8192, if request len is 4096, resulting in bytes 4096-8191) - volatile int socket; // Socket to send reply to + dnbd3_client_t * volatile client; // Client to send reply to volatile int status; // status of this entry: ULR_* } dnbd3_queued_request_t; @@ -38,9 +42,9 @@ struct _dnbd3_connection int fd; // socket fd to remote server int signal; // write end of pipe used to wake up the process pthread_t thread; // thread holding the connection - pthread_spinlock_t lock; // lock for synchronization on request queue etc. + pthread_spinlock_t queueLock; // lock for synchronization on request queue etc. dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE]; - volatile int queuelen; // length of queue + volatile int queueLen; // length of queue dnbd3_image_t *image; // image that this uplink is used for do not call get/release for this pointer dnbd3_host_t currentServer; // Current server we're connected to volatile int rttTestResult; // RTT_* @@ -48,7 +52,7 @@ struct _dnbd3_connection int betterFd; // Active connection to better server, ready to use uint8_t *recvBuffer; // Buffer for receiving payload int recvBufferLen; // Len of ^^ - volatile int shutdown; // bool to signal thread to stop + volatile int shutdown; // bool to signal thread to stop, must only be set from uplink_shutdown() }; typedef struct @@ -99,7 +103,7 @@ struct _dnbd3_image pthread_spinlock_t lock; }; -typedef struct +struct _dnbd3_client { int sock; dnbd3_host_t host; @@ -107,8 +111,8 @@ typedef struct pthread_t thread; dnbd3_image_t *image; pthread_spinlock_t lock; - //GSList *sendqueue; // list of dnbd3_binstring_t* -} dnbd3_client_t; + pthread_mutex_t sendMutex; +}; // ####################################################### diff --git a/src/server/image.c b/src/server/image.c index abecda5..2a24092 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -106,6 +106,7 @@ void image_update_cachemap(dnbd3_image_t *image, uint64_t start, uint64_t end, c while ( pos < end ) { const int block = pos / HASH_BLOCK_SIZE; // TODO: Actually queue the hash block for checking as soon as there's a worker for that + (void)block; pos += HASH_BLOCK_SIZE; } } @@ -250,7 +251,7 @@ dnbd3_image_t* image_free(dnbd3_image_t *image) free( image->crc32 ); free( image->path ); free( image->lower_name ); - image->uplink = uplink_shutdown( image->uplink ); + uplink_shutdown( image ); if ( image->cacheFd != -1 ) close( image->cacheFd ); spin_destroy( &image->lock ); // @@ -482,6 +483,10 @@ static int image_try_load(char *base, char *path) } else if ( existing->crc32 != NULL && crc32list != NULL && memcmp( existing->crc32, crc32list, sizeof(uint32_t) * hashBlocks ) != 0 ) { memlogf( "[WARNING] CRC32 list of image '%s' has changed.", path ); + } else if ( existing->crc32 == NULL && crc32list != NULL ) { + memlogf( "[INFO] Found CRC-32 list for already loaded image, adding...", path ); + existing->crc32 = crc32list; + crc32list = NULL; } else { function_return = TRUE; goto load_error; @@ -515,7 +520,8 @@ static int image_try_load(char *base, char *path) free( image->cache_map ); image->cache_map = NULL; image->working = TRUE; - } else { + } + if ( image->cache_map != NULL ) { image->working = FALSE; image->cacheFd = open( path, O_WRONLY ); if ( image->cacheFd < 0 ) { diff --git a/src/server/net.c b/src/server/net.c index 16b202f..1383454 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -27,6 +27,7 @@ #include #include #include +#include #include "sockhelper.h" #include "helper.h" @@ -123,15 +124,7 @@ void *net_client_handler(void *dnbd3_client) serialized_buffer_t payload; char *image_name; uint16_t rid, client_version; - - /* - char map_x, bit_mask; - uint64_t map_y; - uint64_t todo_size = 0; - uint64_t todo_offset = 0; - uint64_t cur_offset = 0; - uint64_t last_offset = 0; - */ + uint64_t start, end; dnbd3_server_entry_t server_list[NUMBER_SERVERS]; @@ -175,7 +168,6 @@ void *net_client_handler(void *dnbd3_client) if ( send_reply( client->sock, &reply, &payload ) ) { client->image = image; if ( !client->is_server ) image->atime = time( NULL ); - bOk = TRUE; } } @@ -216,109 +208,85 @@ void *net_client_handler(void *dnbd3_client) break; } + if ( request.size != 0 && image->cache_map != NULL ) { + // This is a proxyed image, check if we need to relay the request... + start = request.offset & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + end = (request.offset + request.size + DNBD3_BLOCK_SIZE - 1) & ~(uint64_t)(DNBD3_BLOCK_SIZE - 1); + int isCached = TRUE; + spin_lock( &image->lock ); + // Check again as we only aquired the lock just now + if ( image->cache_map != NULL ) { + // First byte + uint64_t pos = start; + const uint64_t firstByte = start >> 15; + const uint64_t lastByte = (end - 1) >> 15; + do { + const int map_x = (pos >> 12) & 7; // mod 8 + const uint8_t bit_mask = 0b00000001 << map_x; + if ( (image->cache_map[firstByte] & bit_mask) == 0 ) { + isCached = FALSE; + break; + } + pos += DNBD3_BLOCK_SIZE; + } while ( firstByte == (pos >> 15) ); + // Middle - quick checking + if ( isCached ) { + pos = firstByte + 1; + while ( pos < lastByte ) { + if ( image->cache_map[pos] != 0xff ) { + isCached = FALSE; + break; + } + } + } + // Last byte + if ( isCached ) { + pos = lastByte << 15; + while ( pos < end ) { + assert( lastByte == (pos >> 15) ); + const int map_x = (pos >> 12) & 7; // mod 8 + const uint8_t bit_mask = 0b00000001 << map_x; + if ( (image->cache_map[lastByte] & bit_mask) == 0 ) { + isCached = FALSE; + break; + } + pos += DNBD3_BLOCK_SIZE; + } + } + } + spin_unlock( &image->lock ); + if ( !isCached ) { + if ( !uplink_request( client, request.handle, request.offset, request.size ) ) { + printf( "[DEBUG] Could not relay uncached request to upstream proxy\n" ); + goto exit_client_cleanup; + } + break; // DONE + } + } + reply.cmd = CMD_GET_BLOCK; reply.size = request.size; reply.handle = request.handle; fixup_reply( reply ); - if ( send( client->sock, &reply, sizeof(dnbd3_reply_t), MSG_MORE ) != sizeof(dnbd3_reply_t) ) { + pthread_mutex_lock( &client->sendMutex ); + // Send reply header + if ( send( client->sock, &reply, sizeof(dnbd3_reply_t), (request.size == 0 ? 0 : MSG_MORE) ) != sizeof(dnbd3_reply_t) ) { + pthread_mutex_unlock( &client->sendMutex ); printf( "[DEBUG] Sending CMD_GET_BLOCK header failed\n" ); goto exit_client_cleanup; } - if ( request.size == 0 ) { // Request for 0 bytes, done after sending header - send( client->sock, &reply, 0, 0 ); // Since we used MSG_MORE above... - break; - } - - // no cache map means image is complete - if ( image->cache_map == NULL ) { + if ( request.size != 0 ) { + // Send payload if request length > 0 const ssize_t ret = sendfile( client->sock, image_file, (off_t *)&request.offset, request.size ); if ( ret != request.size ) { + pthread_mutex_unlock( &client->sendMutex ); printf( "[ERROR] sendfile failed (image to net %d/%d)\n", (int)ret, (int)request.size ); goto exit_client_cleanup; } - break; } - - printf( "[DEBUG] Caching/Proxying not implemented yet!\n" ); - goto exit_client_cleanup; - - /* - - // caching is on - dirty = 0; - todo_size = 0; - todo_offset = request.offset; - cur_offset = request.offset; - last_offset = request.offset + request.size; - - // first make sure the whole requested part is in the local cache file - while(cur_offset < last_offset) - { - map_y = cur_offset >> 15; // div 32768 - map_x = (cur_offset >> 12) & 7; // (X div 4096) mod 8 - bit_mask = 0b00000001 << (map_x); - - cur_offset += 4096; - - if ((image->cache_map[map_y] & bit_mask) != 0) // cache hit - { - if (todo_size != 0) // fetch missing chunks - { - lseek(image_cache, todo_offset, SEEK_SET); - if (sendfile(image_cache, image_file, (off_t *) &todo_offset, todo_size) != todo_size) - { - if (image->file == NULL) - printf("[ERROR] Device was closed when local copy was incomplete."); - printf("[ERROR] sendfile failed (copy to cache 1)\n"); - goto exit_client_cleanup; - } - todo_size = 0; - dirty = 1; - } - todo_offset = cur_offset; - } - else - { - todo_size += 4096; - } - } - - // whole request was missing - if (todo_size != 0) - { - lseek(image_cache, todo_offset, SEEK_SET); - if (sendfile(image_cache, image_file, (off_t *) &todo_offset, todo_size) != todo_size) - { - printf("[ERROR] sendfile failed (copy to cache 2)\n"); - goto exit_client_cleanup; - } - dirty = 1; - } - - if (dirty) // cache map needs to be updated as something was missing locally - { - // set 1 in cache map for whole request - cur_offset = request.offset; - while(cur_offset < last_offset) - { - map_y = cur_offset >> 15; - map_x = (cur_offset >> 12) & 7; // mod 8 - bit_mask = 0b00000001 << (map_x); - image->cache_map[map_y] |= bit_mask; - cur_offset += 4096; - } - } - - // send data to client - if (sendfile(client->sock, image_cache, (off_t *) &request.offset, request.size) != request.size) - { - memlogf("[ERROR] sendfile failed (cache to net)\n"); - close(client->sock); - client->sock = -1; - } - */ + pthread_mutex_unlock( &client->sendMutex ); break; case CMD_GET_SERVERS: @@ -377,5 +345,5 @@ void *net_client_handler(void *dnbd3_client) if ( image_file != -1 ) close( image_file ); dnbd3_remove_client( client ); client = dnbd3_free_client( client ); - pthread_exit( (void *)0 ); + return NULL ; } diff --git a/src/server/server.c b/src/server/server.c index 575d9c4..cc7a76a 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -28,6 +28,7 @@ #include #include #include +#include #include "../types.h" #include "../version.h" @@ -147,7 +148,7 @@ int main(int argc, char *argv[]) static const struct option longOpts[] = { { "file", required_argument, NULL, 'f' }, { "delay", required_argument, NULL, 'd' }, { "nodaemon", no_argument, NULL, 'n' }, { "reload", no_argument, NULL, 'r' }, { "stop", no_argument, NULL, 's' }, { "info", no_argument, NULL, 'i' }, { "help", no_argument, NULL, 'H' }, { "version", no_argument, NULL, 'v' }, { "crc", required_argument, - NULL, 'crc4' }, { 0, 0, 0, 0 } }; + NULL, 'crc4' }, { "assert", no_argument, NULL, 'asrt' }, { 0, 0, 0, 0 } }; opt = getopt_long( argc, argv, optString, longOpts, &longIndex ); @@ -187,6 +188,11 @@ int main(int argc, char *argv[]) break; case 'crc4': return image_generate_crc_file( optarg ) ? 0 : EXIT_FAILURE; + case 'asrt': + printf("Testing a failing assertion:\n"); + assert( 4 == 5 ); + printf("Assertion 4 == 5 seems to hold. ;-)\n"); + return EXIT_SUCCESS; } opt = getopt_long( argc, argv, optString, longOpts, &longIndex ); } @@ -312,6 +318,7 @@ dnbd3_client_t* dnbd3_init_client(struct sockaddr_storage *client, int fd) } dnbd3_client->sock = fd; spin_init( &dnbd3_client->lock, PTHREAD_PROCESS_PRIVATE ); + pthread_mutex_init( &dnbd3_client->sendMutex, NULL ); return dnbd3_client; } @@ -352,6 +359,9 @@ dnbd3_client_t* dnbd3_free_client(dnbd3_client_t *client) client->image = NULL; spin_unlock( &client->lock ); spin_destroy( &client->lock ); + pthread_mutex_lock( &client->sendMutex ); + pthread_mutex_unlock( &client->sendMutex ); + pthread_mutex_destroy( &client->sendMutex ); free( client ); return NULL ; } diff --git a/src/server/uplink.c b/src/server/uplink.c index 0116fda..ab23b70 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -18,6 +18,7 @@ int _num_alts = 0; pthread_spinlock_t _alts_lock; static void* uplink_mainloop(void *data); +static void uplink_send_requests(dnbd3_connection_t *link, int newOnly); static void uplink_handle_receive(dnbd3_connection_t *link); /** @@ -103,14 +104,14 @@ int uplink_init(dnbd3_image_t *image) } link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) ); link->image = image; - link->queuelen = 0; + link->queueLen = 0; link->fd = -1; link->signal = -1; link->betterFd = -1; link->rttTestResult = RTT_IDLE; link->recvBufferLen = 0; link->shutdown = FALSE; - spin_init( &link->lock, PTHREAD_PROCESS_PRIVATE ); + spin_init( &link->queueLock, PTHREAD_PROCESS_PRIVATE ); if ( 0 != pthread_create( &(link->thread), NULL, &uplink_mainloop, (void *)(uintptr_t)link ) ) { memlogf( "[ERROR] Could not start thread for new client." ); goto failure; @@ -124,15 +125,61 @@ int uplink_init(dnbd3_image_t *image) return FALSE; } -dnbd3_connection_t* uplink_shutdown(dnbd3_connection_t *uplink) +void uplink_shutdown(dnbd3_image_t *image) { - assert( uplink != NULL ); - if ( uplink->shutdown ) return NULL ; + assert( image != NULL ); + if ( image->uplink == NULL || image->uplink->shutdown ) return; + dnbd3_connection_t * const uplink = image->uplink; + image->uplink = NULL; uplink->shutdown = TRUE; - if ( uplink->signal != -1 ) write( uplink->signal, uplink, 1 ); + if ( uplink->signal != -1 ) write( uplink->signal, "", 1 ); pthread_join( uplink->thread, NULL ); + spin_lock( &uplink->queueLock ); + spin_unlock( &uplink->queueLock ); + spin_destroy( &uplink->queueLock ); free( uplink ); - return NULL ; +} + +/** + * Request a chunk of data through an uplink server + */ +int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length) +{ + if ( client == NULL || client->image == NULL || client->image->uplink == NULL ) return FALSE; + dnbd3_connection_t * const uplink = client->image->uplink; + int foundExisting = FALSE; // Is there a pending request that is a superset of our range? + int i; + int freeSlot = -1; + const uint64_t end = start + length; + + spin_lock( &uplink->queueLock ); + for (i = 0; i < uplink->queueLen; ++i) { + if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) freeSlot = i; + if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue; + if ( uplink->queue[i].from <= start && uplink->queue[i].to >= end ) { + foundExisting = TRUE; + break; + } + } + if ( freeSlot == -1 ) { + if ( uplink->queueLen >= SERVER_MAX_UPLINK_QUEUE ) { + spin_unlock( &uplink->queueLock ); + memlogf( "[WARNING] Uplink queue is full, consider increasing SERVER_MAX_UPLINK_QUEUE. Dropping client..." ); + return FALSE; + } + freeSlot = uplink->queueLen++; + } + uplink->queue[freeSlot].from = start; + uplink->queue[freeSlot].to = end; + uplink->queue[freeSlot].handle = handle; + uplink->queue[freeSlot].client = client; + uplink->queue[freeSlot].status = (foundExisting ? ULR_PENDING : ULR_NEW); + spin_unlock( &uplink->queueLock ); + + if ( !foundExisting ) { + write( uplink->signal, "", 1 ); + } + return TRUE; } /** @@ -151,7 +198,7 @@ static void* uplink_mainloop(void *data) char buffer[100]; // assert( link != NULL ); - assert( link->queuelen == 0 ); + assert( link->queueLen == 0 ); // fdEpoll = epoll_create( 2 ); if ( fdEpoll == -1 ) { @@ -177,24 +224,6 @@ static void* uplink_mainloop(void *data) } } while ( !_shutdown && !link->shutdown ) { - if ( link->rttTestResult == RTT_DOCHANGE ) { - link->rttTestResult = RTT_IDLE; - // The rttTest worker thread has finished our request. - // And says it's better to switch to another server - if ( link->fd != -1 ) close( link->fd ); - link->fd = link->betterFd; - link->betterFd = -1; - link->currentServer = link->betterServer; - memset( &ev, 0, sizeof(ev) ); - ev.events = EPOLLIN; - ev.data.fd = link->fd; - if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) { - memlogf( "[WARNING] adding uplink to epoll set failed" ); - goto cleanup; - } - // The rtt worker already did the handshake for our image, so there's nothing - // more to do here - } // epoll() if ( link->fd == -1 ) { waitTime = 1500; @@ -215,11 +244,14 @@ static void* uplink_mainloop(void *data) memlogf( "[WARNING] epoll error on signal-pipe!" ); goto cleanup; } - close( events[i].data.fd ); if ( events[i].data.fd == link->fd ) { link->fd = -1; + close( events[i].data.fd ); printf( "[DEBUG] Uplink gone away, panic!\n" ); nextAltCheck = 0; + } else { + printf( "[DEBUG] Error on unknown FD in uplink epoll" ); + close( events[i].data.fd ); } continue; } @@ -227,6 +259,9 @@ static void* uplink_mainloop(void *data) if ( events[i].data.fd == fdPipe ) { while ( read( fdPipe, buffer, sizeof buffer ) > 0 ) { } // Throw data away, this is just used for waking this thread up + if ( link->fd != -1 ) { + uplink_send_requests( link, TRUE ); + } } else if ( events[i].data.fd == link->fd ) { uplink_handle_receive( link ); if ( link->fd == -1 ) nextAltCheck = 0; @@ -235,17 +270,71 @@ static void* uplink_mainloop(void *data) close( events[i].data.fd ); } } + // Done handling epoll sockets + // Check if server switch is in order + if ( link->rttTestResult == RTT_DOCHANGE ) { + link->rttTestResult = RTT_IDLE; + // The rttTest worker thread has finished our request. + // And says it's better to switch to another server + const int fd = link->fd; + link->fd = link->betterFd; + if ( fd != -1 ) close( fd ); + // Re-send all pending requests + uplink_send_requests( link, FALSE ); + link->betterFd = -1; + link->currentServer = link->betterServer; + memset( &ev, 0, sizeof(ev) ); + ev.events = EPOLLIN; + ev.data.fd = link->fd; + if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) { + memlogf( "[WARNING] adding uplink to epoll set failed" ); + goto cleanup; + } + nextAltCheck = time( NULL ) + altCheckInterval; + // The rtt worker already did the handshake for our image, so there's nothing + // more to do here + } } cleanup: ; - if ( link->fd != -1 ) close( link->fd ); + const int fd = link->fd; + const int signal = link->signal; link->fd = -1; - if ( link->signal != -1 ) close( link->signal ); link->signal = -1; + if ( fd != -1 ) close( fd ); + if ( signal != -1 ) close( signal ); if ( fdPipe != -1 ) close( fdPipe ); if ( fdEpoll != -1 ) close( fdEpoll ); return NULL ; } +static void uplink_send_requests(dnbd3_connection_t *link, int newOnly) +{ + // Scan for new requests + int j; + dnbd3_request_t request; + request.magic = dnbd3_packet_magic; + spin_lock( &link->queueLock ); + for (j = 0; j < link->queueLen; ++j) { + if ( link->queue[j].status != ULR_NEW && (newOnly || link->queue[j].status != ULR_PENDING) ) continue; + link->queue[j].status = ULR_PENDING; + request.handle = link->queue[j].handle; + request.cmd = CMD_GET_BLOCK; + request.offset = link->queue[j].from; + request.size = link->queue[j].to - link->queue[j].from; + spin_unlock( &link->queueLock ); + fixup_request( request ); + const int ret = write( link->fd, &request, sizeof request ); + if ( ret != sizeof(request) ) { + // Non-critical - if the connection dropped or the server was changed + // the thread will re-send this request as soon as the connection + // is reestablished. + printf( "[DEBUG] Error sending request to uplink server!" ); + } + spin_lock( &link->queueLock ); + } + spin_unlock( &link->queueLock ); +} + /** * Receive data from uplink server and process/dispatch * Locks on: link.lock, indirectly on images[].lock @@ -279,12 +368,20 @@ static void uplink_handle_receive(dnbd3_connection_t *link) done += ret; } // Payload read completely - // 1) Figure out which clients are interested in it const uint64_t start = reply.handle; const uint64_t end = reply.handle + reply.size; + // 1) Write to cache file + assert( link->image->cacheFd != -1 ); + if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) { + memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path ); + } else { + ret = (int)write( link->image->cacheFd, link->recvBuffer, reply.size ); + if ( ret > 0 ) image_update_cachemap( link->image, start, start + ret, TRUE ); + } + // 2) Figure out which clients are interested in it struct iovec iov[2]; - spin_lock( &link->lock ); - for (i = 0; i < link->queuelen; ++i) { + spin_lock( &link->queueLock ); + for (i = 0; i < link->queueLen; ++i) { dnbd3_queued_request_t * const req = &link->queue[i]; assert( req->status != ULR_PROCESSING ); if ( req->status != ULR_PENDING ) continue; @@ -292,19 +389,9 @@ static void uplink_handle_receive(dnbd3_connection_t *link) req->status = ULR_PROCESSING; } } - spin_unlock( &link->lock ); - // 2) Write to cache file - assert( link->image->cacheFd != -1 ); - if ( lseek( link->image->cacheFd, start, SEEK_SET ) != start ) { - memlogf( "[ERROR] lseek() failed when writing to cache for %s", link->image->path ); - } else { - ret = (int)write( link->image->cacheFd, link->recvBuffer, reply.size ); - if ( ret > 0 ) image_update_cachemap( link->image, start, start + ret, TRUE); - } // 3) Send to interested clients reply.magic = dnbd3_packet_magic; // !! re-using reply struct - do not read from it after here - spin_lock( &link->lock ); - for (i = link->queuelen - 1; i >= 0; --i) { + for (i = link->queueLen - 1; i >= 0; --i) { dnbd3_queued_request_t * const req = &link->queue[i]; if ( req->status != ULR_PROCESSING ) continue; assert( req->from >= start && req->to <= end ); @@ -316,18 +403,21 @@ static void uplink_handle_receive(dnbd3_connection_t *link) iov[1].iov_base = link->recvBuffer + (req->from - start); iov[1].iov_len = reply.size; fixup_reply( reply ); - spin_unlock( &link->lock ); + spin_unlock( &link->queueLock ); // send: Don't care about errors here, let the client - // connection thread deal with it if something goes wrong here - writev( req->socket, iov, 2 ); - spin_lock( &link->lock ); + // connection thread deal with it if something goes wrong + pthread_mutex_lock( &req->client->sendMutex ); + writev( req->client->sock, iov, 2 ); + pthread_mutex_unlock( &req->client->sendMutex ); + spin_lock( &link->queueLock ); req->status = ULR_FREE; - if ( i > 20 && i == link->queuelen - 1 ) link->queuelen--; + if ( i > 20 && i == link->queueLen - 1 ) link->queueLen--; } - spin_unlock( &link->lock ); + spin_unlock( &link->queueLock ); return; error_cleanup: ; - close( link->fd ); + const int fd = link->fd; link->fd = -1; + if ( fd != -1 ) close( fd ); return; } diff --git a/src/server/uplink.h b/src/server/uplink.h index 82412b4..f6917be 100644 --- a/src/server/uplink.h +++ b/src/server/uplink.h @@ -14,6 +14,8 @@ int uplink_net_closeness(dnbd3_host_t *host1, dnbd3_host_t *host2); int uplink_init(dnbd3_image_t *image); -dnbd3_connection_t* uplink_shutdown( dnbd3_connection_t *uplink); +int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length); + +void uplink_shutdown(dnbd3_image_t *image); #endif /* UPLINK_H_ */ diff --git a/src/types.h b/src/types.h index 56f328b..8800b53 100644 --- a/src/types.h +++ b/src/types.h @@ -27,10 +27,10 @@ #endif #ifndef TRUE -#define TRUE (1) +#define TRUE 1 #endif #ifndef FALSE -#define FALSE (0) +#define FALSE 0 #endif #ifndef MIN -- cgit v1.2.3-55-g7522