From 706ef832d30d2797103d5a49e9e4f4ee0a509411 Mon Sep 17 00:00:00 2001
From: sr
Date: Thu, 25 Jul 2013 22:00:10 +0200
Subject: ...Working on proxy mode...

---
 src/server/globals.h |  47 +++++++++-
 src/server/image.c   |  42 ++++++++-
 src/server/net.c     |  58 ++++++++-----
 src/server/server.c  |   5 +-
 src/server/uplink.c  | 241 +++++++++++++++++++++++++++++++++++++++++++++++++--
 src/server/uplink.h  |   2 +
 6 files changed, 356 insertions(+), 39 deletions(-)

(limited to 'src/server')

diff --git a/src/server/globals.h b/src/server/globals.h
index 4d1ee4b..3e5af8b 100644
--- a/src/server/globals.h
+++ b/src/server/globals.h
@@ -8,10 +8,47 @@
 
 // ######### All structs/types used by the server ########
 
+typedef struct _dnbd3_connection dnbd3_connection_t;
+typedef struct _dnbd3_image dnbd3_image_t;
+
+// Slot is free, can be used.
+// Must only be set in uplink_handle_receive() or uplink_remove_client()
+#define ULR_FREE 0
+// Slot is occupied, reply has not yet been received, matching request can safely rely on reuse.
+// Must only be set in uplink_request()
+#define ULR_PENDING 1
+// Slot is being processed, do not consider for hop on.
+// Must only be set in uplink_handle_receive()
+#define ULR_PROCESSING 2
 typedef struct
 {
+	uint64_t handle; // Client defined handle to pass back in reply
+	uint64_t from; // First byte offset of requested block (ie. 4096)
+	volatile uint64_t to; // Last byte + 1 of requested block (ie. 8192, if request len is 4096, resulting in bytes 4096-8191); 64 bit like .from, so offsets beyond 4 GiB are not truncated
+	volatile int socket; // Socket to send reply to
+	volatile int status; // Status of this entry: one of the ULR_* values above
+} dnbd3_queued_request_t;
+
+#define RTT_IDLE 0 // Not in progress
+#define RTT_INPROGRESS 1 // In progress, not finished
+#define RTT_DONTCHANGE 2 // Finished, but no better alternative found
+#define RTT_DOCHANGE 3 // Finished, better alternative written to .betterServer + .betterFd
+struct _dnbd3_connection
+{
+	int fd; // socket fd to remote server, -1 while not connected
+	int signal; // write end of pipe used to wake up the uplink thread
+	pthread_t thread; // thread holding the connection
+	pthread_spinlock_t lock; // lock for synchronization on request queue etc.
+	dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE];
+	volatile int queuelen; // number of slots of .queue currently in use (free slots may exist below this index)
+	dnbd3_image_t *image; // image that this uplink is used for; do not call get/release for this pointer
+	dnbd3_host_t currentServer; // Current server we're connected to
+	volatile int rttTestResult; // One of the RTT_* values above
+	dnbd3_host_t betterServer; // The better server (valid when rttTestResult == RTT_DOCHANGE)
+	int betterFd; // Active connection to better server, ready to use
+	uint8_t *recvBuffer; // Buffer for receiving payload
+	int recvBufferLen; // Allocated size of recvBuffer in bytes
+};
 
 typedef struct
 {
-	int fd;
-} dnbd3_connection_t;
@@ -29,6 +66,7 @@ typedef struct
 	char comment[COMMENT_LENGTH];
 	time_t last_told;
 	dnbd3_host_t host;
+	int rtt[];
 } dnbd3_alt_server_t;
 
 typedef struct
@@ -44,7 +82,7 @@ typedef struct
  * and the lower_name would then be
  * rz/zfs/windows7 zfs.vmdk
  */
-typedef struct
+struct _dnbd3_image
 {
 	char *path; // absolute path of the image
 	char *lower_name; // relative path, all lowercase, minus revision ID
@@ -52,12 +90,13 @@ typedef struct
 	uint32_t *crc32; // list of crc32 checksums for each 16MiB block in image
 	dnbd3_connection_t *uplink; // pointer to a server connection
 	uint64_t filesize; // size of image
+	int cacheFd; // used to write to the image, in case it is relayed. ONLY USE FROM UPLINK THREAD!
int rid; // revision of image int users; // clients currently using this image time_t atime; // last access time char working; // TRUE if image exists and completeness is == 100% or a working upstream proxy is connected pthread_spinlock_t lock; -} dnbd3_image_t; +}; typedef struct { diff --git a/src/server/image.c b/src/server/image.c index 660186c..4bc26b4 100644 --- a/src/server/image.c +++ b/src/server/image.c @@ -335,6 +335,30 @@ static int image_try_load(char *base, char *path) memlogf( "[WARNING] Empty image file '%s'", path ); goto load_error; } + if ( fileSize % DNBD3_BLOCK_SIZE != 0 ) { + memlogf( "[INFO] Image size of '%s' is not a multiple of %d, fixing...", path, (int)DNBD3_BLOCK_SIZE ); + const int missing = DNBD3_BLOCK_SIZE - (fileSize % DNBD3_BLOCK_SIZE); + char buffer[missing]; + memset( buffer, 0, missing ); + int tmpFd = open( path, O_WRONLY | O_APPEND ); + int success = FALSE; + if ( tmpFd < 0 ) { + memlogf( "[WARNING] Can't open image for writing, can't fix." ); + } else if ( lseek( tmpFd, fileSize, SEEK_SET ) != fileSize ) { + memlogf( "[WARNING] lseek failed, can't fix." ); + } else if ( write( tmpFd, buffer, missing ) != missing ) { + memlogf( "[WARNING] write failed, can't fix." ); + } else { + success = TRUE; + } + if ( tmpFd >= 0 ) close( tmpFd ); + if ( success ) { + fileSize += missing; + } else { + fileSize -= (DNBD3_BLOCK_SIZE - missing); + } + + } // 1. 
Allocate memory for the cache map if the image is incomplete sprintf( mapFile, "%s.map", path ); int fdMap = open( mapFile, O_RDONLY ); @@ -346,6 +370,7 @@ static int image_try_load(char *base, char *path) memlogf( "[WARNING] Could only read %d of expected %d bytes of cache map of '%s'", (int)rd, (int)map_size, path ); } close( fdMap ); + // Later on we check if the hash map says the image is complete } // TODO: Maybe try sha-256 or 512 first if you're paranoid (to be implemented) const int hashBlocks = IMGSIZE_TO_HASHBLOCKS( fileSize ); @@ -439,6 +464,15 @@ static int image_try_load(char *base, char *path) free( image->cache_map ); image->cache_map = NULL; image->working = TRUE; + } else { + image->working = FALSE; + image->cacheFd = open( path, O_WRONLY ); + if ( image->cacheFd < 0 ) { + memlogf( "[ERROR] Could not open incomplete image %s for writing!", path ); + image = image_free( image ); + goto load_error; + } + uplink_init( image ); } // Prevent freeing in cleanup cache_map = NULL; @@ -496,7 +530,7 @@ int image_generate_crc_file(char *image) close( fdImage ); return FALSE; } - // CRC of all CRCs goes first. Don't know it yet, write 4 bytes dummy data. +// CRC of all CRCs goes first. Don't know it yet, write 4 bytes dummy data. if ( write( fdCrc, crcFile, 4 ) != 4 ) { printf( "Write error\n" ); close( fdImage ); @@ -547,7 +581,7 @@ int image_generate_crc_file(char *image) close( fdImage ); printf( "done!\nGenerating master-crc..." 
); fflush( stdout ); - // File is written - read again to calc master crc +// File is written - read again to calc master crc if ( lseek( fdCrc, 4, SEEK_SET ) != 4 ) { printf( "Could not seek to beginning of crc list in file\n" ); close( fdCrc ); @@ -587,13 +621,13 @@ static int image_check_blocks_crc32(int fd, uint32_t *crc32list, int *blocks) { char buffer[40000]; while ( *blocks != -1 ) { - if ( lseek( fd, *blocks * HASH_BLOCK_SIZE, SEEK_SET ) != *blocks * HASH_BLOCK_SIZE) { + if ( lseek( fd, *blocks * HASH_BLOCK_SIZE, SEEK_SET ) != *blocks * HASH_BLOCK_SIZE ) { memlogf( "Seek error" ); return FALSE; } uint32_t crc = crc32( 0L, Z_NULL, 0 ); int bytes = 0; - while ( bytes < HASH_BLOCK_SIZE) { + while ( bytes < HASH_BLOCK_SIZE ) { const int n = MIN(sizeof(buffer), HASH_BLOCK_SIZE - bytes); const int r = read( fd, buffer, n ); if ( r <= 0 ) { diff --git a/src/server/net.c b/src/server/net.c index ac82116..16b202f 100644 --- a/src/server/net.c +++ b/src/server/net.c @@ -61,7 +61,7 @@ static inline char recv_request_header(int sock, dnbd3_request_t *request) return FALSE; } #ifdef _DEBUG - if (_fake_delay) usleep(_fake_delay); + if ( _fake_delay ) usleep( _fake_delay ); #endif return TRUE; } @@ -125,19 +125,19 @@ void *net_client_handler(void *dnbd3_client) uint16_t rid, client_version; /* - char map_x, bit_mask; - uint64_t map_y; - uint64_t todo_size = 0; - uint64_t todo_offset = 0; - uint64_t cur_offset = 0; - uint64_t last_offset = 0; - */ + char map_x, bit_mask; + uint64_t map_y; + uint64_t todo_size = 0; + uint64_t todo_offset = 0; + uint64_t cur_offset = 0; + uint64_t last_offset = 0; + */ dnbd3_server_entry_t server_list[NUMBER_SERVERS]; // Set to zero to make valgrind happy - memset(&reply, 0, sizeof(reply)); - memset(&payload, 0, sizeof(payload)); + memset( &reply, 0, sizeof(reply) ); + memset( &payload, 0, sizeof(payload) ); reply.magic = dnbd3_packet_magic; // Receive first packet. 
This must be CMD_SELECT_IMAGE by protocol specification @@ -337,7 +337,17 @@ void *net_client_handler(void *dnbd3_client) break; case CMD_SET_CLIENT_MODE: - client->is_server = FALSE; + image->atime = time( NULL ); + break; + + case CMD_GET_CRC32: + reply.cmd = CMD_GET_CRC32; + if ( image->crc32 == NULL ) { + reply.size = 0; + } else { + reply.size = IMGSIZE_TO_HASHBLOCKS(image->filesize) * 4; + } + send_reply( client->sock, &reply, image->crc32 ); break; default: @@ -347,19 +357,19 @@ void *net_client_handler(void *dnbd3_client) } /* - // Check for messages that have been queued from another thread - while ( client->sendqueue != NULL ) { - dnbd3_binstring_t *message = NULL; - spin_lock( &client->lock ); - if ( client->sendqueue != NULL ) { - message = client->sendqueue->data; - client->sendqueue = g_slist_remove( client->sendqueue, message ); - } - spin_unlock( &client->lock ); - send_data( client->sock, message->data, message->len ); - free( message ); - } - */ + // Check for messages that have been queued from another thread + while ( client->sendqueue != NULL ) { + dnbd3_binstring_t *message = NULL; + spin_lock( &client->lock ); + if ( client->sendqueue != NULL ) { + message = client->sendqueue->data; + client->sendqueue = g_slist_remove( client->sendqueue, message ); + } + spin_unlock( &client->lock ); + send_data( client->sock, message->data, message->len ); + free( message ); + } + */ } } diff --git a/src/server/server.c b/src/server/server.c index d48bb25..575d9c4 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -323,10 +323,11 @@ void dnbd3_remove_client(dnbd3_client_t *client) { int i; spin_lock( &_clients_lock ); + const int cutoff = MAX(10, _num_clients / 2); for (i = _num_clients - 1; i >= 0; --i) { if ( _clients[i] != client ) continue; _clients[i] = NULL; - if ( i + 1 == _num_clients ) --_num_clients; + if ( i > cutoff && i + 1 == _num_clients ) --_num_clients; } spin_unlock( &_clients_lock ); } @@ -334,7 +335,7 @@ void 
dnbd3_remove_client(dnbd3_client_t *client) /** * Free the client struct recursively. * !! Make sure to call this function after removing the client from _dnbd3_clients !! - * Locks on: _clients[].lock + * Locks on: _clients[].lock, might call function that locks on _images and _image[] */ dnbd3_client_t* dnbd3_free_client(dnbd3_client_t *client) { diff --git a/src/server/uplink.c b/src/server/uplink.c index 07fe35b..92cf944 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -1,13 +1,24 @@ #include "uplink.h" #include "locks.h" +#include "memlog.h" +#include "sockhelper.h" #include #include #include +#include +#include +#include +#include +#include +#include dnbd3_alt_server_t *_alt_servers[SERVER_MAX_ALTS]; int _num_alts = 0; pthread_spinlock_t _alts_lock; +static void* uplink_mainloop(void *data); +static void uplink_handle_receive(dnbd3_connection_t *link); + /** * Get known (working) alt servers, ordered by network closeness * (by finding the smallest possible subnet) @@ -32,12 +43,12 @@ int uplink_get_matching_alt_servers(dnbd3_host_t *host, dnbd3_server_entry_t *ou const int dist = uplink_net_closeness( host, &_alt_servers[i]->host ); for (j = 0; j < size; ++j) { if ( j < count && dist <= distance[j] ) continue; - if (j > count) break; // Should never happen but just in case... + if ( j > count ) break; // Should never happen but just in case... if ( j < count ) { // Check if we're in the middle and need to move other entries... 
-				if (j + 1 < size) {
-					memmove(&output[j + 1], &output[j], sizeof(dnbd3_server_entry_t) * (size - j - 1));
-					memmove(&distance[j + 1], &distance[j], sizeof(int) * (size - j - 1));
+				if ( j + 1 < size ) {
+					memmove( &output[j + 1], &output[j], sizeof(dnbd3_server_entry_t) * (size - j - 1) );
+					memmove( &distance[j + 1], &distance[j], sizeof(int) * (size - j - 1) );
 				}
 			} else {
 				count++;
@@ -72,7 +83,227 @@ int uplink_net_closeness(dnbd3_host_t *host1, dnbd3_host_t *host2)
 	return retval;
 }
 
-void uplink_shutdown( dnbd3_connection_t *uplink)
+// ############ uplink connection handling
+
+/**
+ * Create and initialize an uplink instance for the given image and start
+ * its thread (uplink_mainloop). Locks on: _images[].lock
+ * Returns TRUE on success, FALSE if the image is complete or allocation/thread creation failed.
+ */
+int uplink_init(dnbd3_image_t *image)
+{
+	dnbd3_connection_t *link = NULL;
+	assert( image != NULL );
+	spin_lock( &image->lock );
+	assert( image->uplink == NULL );
+	if ( image->cache_map == NULL ) {
+		memlogf( "[WARNING] Uplink was requested for image %s, but it is already complete", image->lower_name );
+		goto failure;
+	}
+	link = image->uplink = calloc( 1, sizeof(dnbd3_connection_t) ); // zero-filled: queuelen, recvBuffer and recvBufferLen start at 0/NULL
+	if ( link == NULL ) {
+		memlogf( "[ERROR] Out of memory, could not create uplink for image %s", image->lower_name );
+		goto failure;
+	}
+	link->image = image;
+	link->fd = link->signal = link->betterFd = -1; // no socket, wake-up pipe or candidate connection yet
+	link->rttTestResult = RTT_IDLE;
+	spin_init( &link->lock, PTHREAD_PROCESS_PRIVATE );
+	if ( 0 != pthread_create( &(link->thread), NULL, &uplink_mainloop, (void *)(uintptr_t)link ) ) {
+		memlogf( "[ERROR] Could not start thread for new uplink." );
+		goto failure;
+	}
+	spin_unlock( &image->lock );
+	return TRUE;
+	failure: ;
+	free( link ); // free(NULL) is a no-op, so this also covers the paths where nothing was allocated
+	link = image->uplink = NULL;
+	spin_unlock( &image->lock );
+	return FALSE;
+}
+
+void uplink_shutdown(dnbd3_connection_t *uplink) // Closes the uplink socket (if open) and joins the uplink thread
+{
+	assert( uplink != NULL );
+	if ( uplink->fd != -1 ) close( uplink->fd ); // NOTE(review): the uplink thread may still be using this fd, and nothing here wakes it up - confirm
+	pthread_join( uplink->thread, NULL );
+}
+
+static void* uplink_mainloop(void *data) // Thread entry point; data is the dnbd3_connection_t* handed over by uplink_init()
 {
+	const int MAXEVENTS = 3;
+	struct epoll_event ev, events[MAXEVENTS];
+	dnbd3_connection_t *link = (dnbd3_connection_t*)data;
+	int fdEpoll = -1, fdPipe = -1;
+	int numSocks, i, waitTime;
+	int altCheckInterval = SERVER_RTT_DELAY_INIT; // NOTE(review): not read anywhere in this function yet
+	time_t nextAltCheck = 0;
+	char buffer[100];
+	// Sanity checks on the connection struct we were given
+	assert( link != NULL );
+	assert( link->queuelen == 0 );
+	// Set up the epoll instance and the wake-up pipe
+	fdEpoll = epoll_create( 2 );
+	if ( fdEpoll == -1 ) {
+		memlogf( "[WARNING] epoll_create failed. Uplink unavailable." );
+		goto cleanup;
+	}
+	{
+		int pipes[2];
+		if ( pipe( pipes ) < 0 ) {
+			memlogf( "[WARNING] error creating pipe. Uplink unavailable." );
+			goto cleanup;
+		}
+		sock_set_nonblock( pipes[0] );
+		sock_set_nonblock( pipes[1] );
+		fdPipe = pipes[0]; // read end stays private to this thread
+		link->signal = pipes[1]; // write end is published so other threads can wake us
+		memset( &ev, 0, sizeof(ev) );
+		ev.events = EPOLLIN;
+		ev.data.fd = fdPipe;
+		if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, fdPipe, &ev ) < 0 ) {
+			memlogf( "[WARNING] adding signal-pipe to epoll set failed" );
+			goto cleanup;
+		}
+	}
+	while ( !_shutdown ) {
+		if ( link->rttTestResult == RTT_DOCHANGE ) {
+			link->rttTestResult = RTT_IDLE;
+			// The rttTest worker thread has finished our request.
+			// And says it's better to switch to another server
+			if ( link->fd != -1 ) close( link->fd );
+			link->fd = link->betterFd;
+			link->betterFd = -1;
+			link->currentServer = link->betterServer;
+			memset( &ev, 0, sizeof(ev) );
+			ev.events = EPOLLIN;
+			ev.data.fd = link->fd;
+			if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) {
+				memlogf( "[WARNING] adding uplink to epoll set failed" );
+				goto cleanup;
+			}
+			// The rtt worker already did the handshake for our image, so there's nothing
+			// more to do here
+		}
+		// epoll()
+		if ( link->fd == -1 ) {
+			waitTime = 1500;
+		} else {
+			const time_t now = time( NULL ); // nextAltCheck <= now means the alt-server check is already due
+			waitTime = ( nextAltCheck <= now + 1 ) ? 1500 : (int)MIN( nextAltCheck - now, 5 ) * 1000; // remaining time in ms, clamped to 1500..5000 before narrowing to int; _shutdown and rttTestResult are only re-checked when epoll_wait() returns
+		}
+		numSocks = epoll_wait( fdEpoll, events, MAXEVENTS, waitTime );
+		if ( numSocks < 0 ) { // Error?
+			memlogf( "[DEBUG] epoll_wait() error %d", (int)errno);
+			usleep( 10000 );
+			continue;
+		}
+		for (i = 0; i < numSocks; ++i) { // Check all events
+			if ( (events[i].events & (EPOLLERR | EPOLLHUP)) || !(events[i].events & EPOLLIN) ) {
+				if ( events[i].data.fd == fdPipe ) {
+					memlogf( "[WARNING] epoll error on signal-pipe!" );
+					goto cleanup;
+				}
+				close( events[i].data.fd );
+				if ( events[i].data.fd == link->fd ) {
+					link->fd = -1;
+					printf( "[DEBUG] Uplink gone away, panic!\n" );
+					nextAltCheck = 0;
+				}
+				continue;
+			}
+			// No error, handle normally
+			if ( events[i].data.fd == fdPipe ) {
+				while ( read( fdPipe, buffer, sizeof buffer ) > 0 ) {
+				} // Throw data away, this is just used for waking this thread up
+			} else if ( events[i].data.fd == link->fd ) {
+				uplink_handle_receive( link );
+				if ( link->fd == -1 ) nextAltCheck = 0; // receive handler closed the connection
+			} else {
+				printf( "[DEBUG] Sanity check: unknown FD ready on epoll! Closing...\n" );
+				close( events[i].data.fd );
+			}
+		}
+	}
+	cleanup: ;
+	if ( link->fd != -1 ) close( link->fd );
+	link->fd = -1;
+	if ( link->signal != -1 ) close( link->signal );
+	link->signal = -1;
+	if ( fdPipe != -1 ) close( fdPipe );
+	if ( fdEpoll != -1 ) close( fdEpoll );
+	return NULL ;
+}
+
+static void uplink_handle_receive(dnbd3_connection_t *link) // Reads one reply (header + payload) from link->fd and forwards it to matching queued requests; on error closes link->fd and sets it to -1
+{
+	dnbd3_reply_t reply;
+	int ret, i;
+	ret = recv( link->fd, &reply, sizeof reply, MSG_WAITALL );
+	if ( ret != sizeof reply ) {
+		memlogf( "[INFO] Lost connection to uplink server." );
+		goto error_cleanup;
+	}
+	fixup_reply( reply );
+	if ( reply.size > 9000000 ) {
+		memlogf( "[WARNING] Pure evil: Uplink server sent too much payload!" );
+		goto error_cleanup;
+	}
+	if ( link->recvBufferLen < reply.size ) {
+		free( link->recvBuffer ); // free(NULL) is a no-op
+		link->recvBuffer = malloc( MIN(9000000, reply.size + 8192) ); // some headroom so slightly larger replies do not reallocate
+		link->recvBufferLen = ( link->recvBuffer == NULL ) ? 0 : MIN(9000000, reply.size + 8192); // keep length and pointer consistent
+		if ( link->recvBuffer == NULL ) memlogf( "[ERROR] Out of memory, could not allocate uplink receive buffer" );
+	}
+	if ( reply.size != 0 && link->recvBuffer == NULL ) goto error_cleanup; // no buffer to receive into: drop this uplink connection
+	for (uint32_t done = 0; done < reply.size; done += ret) { // no MSG_WAITALL here, so loop until the whole payload arrived
+		ret = recv( link->fd, link->recvBuffer + done, reply.size - done, 0 );
+		if ( ret <= 0 ) {
+			memlogf( "[INFO] Lost connection to uplink server" );
+			goto error_cleanup;
+		}
+	}
+	// Payload read completely
+	// 1) Write to cache file (NOTE(review): only the fd is checked below, no write happens in this function yet)
+	assert( link->image->cacheFd != -1 );
+	// 2) Figure out which clients are interested in it
+	const uint64_t start = reply.handle; // the reply's handle is treated as the byte offset of the payload
+	const uint64_t end = reply.handle + reply.size;
+	struct iovec iov[2];
+	reply.magic = dnbd3_packet_magic;
+	spin_lock( &link->lock );
+	for (i = 0; i < link->queuelen; ++i) { // First pass: mark every pending request fully covered by [start, end)
+		dnbd3_queued_request_t * const req = &link->queue[i];
+		assert( req->status != ULR_PROCESSING );
+		if ( req->status != ULR_PENDING ) continue;
+		if ( req->from >= start && req->to <= end ) { // Match :-)
+			req->status = ULR_PROCESSING;
+		}
+	}
+	for (i = link->queuelen - 1; i >= 0; --i) { // Second pass: answer the marked requests, last slot first so queuelen can shrink
+		dnbd3_queued_request_t * const req = &link->queue[i];
+		if ( req->status != ULR_PROCESSING ) continue;
+		assert( req->from >= start && req->to <= end );
+		reply.cmd = CMD_GET_BLOCK;
+		reply.handle = req->handle;
+		reply.size = req->to - req->from;
+		iov[0].iov_base = &reply;
+		iov[0].iov_len = sizeof reply;
+		iov[1].iov_base = link->recvBuffer + (req->from - start);
+		iov[1].iov_len = reply.size; // taken before fixup_reply() touches the header
+		fixup_reply( reply );
+		spin_unlock( &link->lock ); // do not hold the spinlock across the write to the client socket
+		// send: Don't care about errors here, let the client
+		// connection thread deal with it if something goes wrong here
+		writev( req->socket, iov, 2 );
+		spin_lock( &link->lock );
+		req->status = ULR_FREE;
+		if ( i > 20 && i == link->queuelen - 1 ) link->queuelen--;
+	}
+	spin_unlock( &link->lock );
+	return;
+	error_cleanup: ;
+	close( link->fd );
+	link->fd = -1;
 	return;
 }
diff --git a/src/server/uplink.h b/src/server/uplink.h
index 2a876ec..7c9878e 100644
--- a/src/server/uplink.h
+++ b/src/server/uplink.h
@@ -12,6 +12,8 @@ int uplink_get_matching_alt_servers(dnbd3_host_t *host, dnbd3_server_entry_t *ou
 
 int uplink_net_closeness(dnbd3_host_t *host1, dnbd3_host_t *host2);
 
+int uplink_init(dnbd3_image_t *image);
+
 void uplink_shutdown( dnbd3_connection_t *uplink);
 
 #endif /* UPLINK_H_ */
-- 
cgit v1.2.3-55-g7522