From 7b8e3ffcbccf6d02bfcb0c9c9c2d259362357fb8 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Wed, 31 Dec 2014 17:19:20 +0100 Subject: [SERVER] Create compilation unit for wait/signalling logic (using eventfd) --- src/server/altservers.c | 42 +++++++++++++++--------------------------- src/server/globals.c | 2 +- src/server/signal.c | 46 ++++++++++++++++++++++++++++++++++++++++++++++ src/server/signal.h | 42 ++++++++++++++++++++++++++++++++++++++++++ src/server/uplink.c | 40 ++++++++++++++-------------------------- 5 files changed, 118 insertions(+), 54 deletions(-) create mode 100644 src/server/signal.c create mode 100644 src/server/signal.h (limited to 'src') diff --git a/src/server/altservers.c b/src/server/altservers.c index 106e07a..7781e69 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -6,6 +6,7 @@ #include "helper.h" #include "globals.h" #include "image.h" +#include "signal.h" #include #include #include @@ -19,7 +20,7 @@ static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS]; static pthread_spinlock_t pendingLockProduce; // Lock for adding something to pending. (NULL -> nonNULL) -static pthread_mutex_t pendingLockConsume = PTHREAD_MUTEX_INITIALIZER; // Lock for removegin something (nunNULL -> NULL) +static pthread_mutex_t pendingLockConsume = PTHREAD_MUTEX_INITIALIZER; // Lock for removing something (nonNULL -> NULL) static int signalPipe = -1; static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS]; @@ -51,7 +52,7 @@ void altservers_init() void altservers_shutdown() { if ( !initDone ) return; - write( signalPipe, "", 1 ); // Wake altservers thread up + signal_call( signalPipe ); // Wake altservers thread up thread_join( altThread, NULL ); } @@ -147,7 +148,7 @@ void altservers_findUplink(dnbd3_connection_t *uplink) pending[i] = uplink; uplink->rttTestResult = RTT_INPROGRESS; spin_unlock( &pendingLockProduce ); - write( signalPipe, "", 1 ); // Wake altservers thread up + signal_call( signalPipe ); // Wake altservers thread up return; } // End of loop - no free slot @@ -341,7 +342,7 @@ static void *altservers_main(void *data) const int MAXEVENTS = 3; const int ALTS = 4; struct epoll_event ev, events[MAXEVENTS]; - int readPipe = -1, fdEpoll = -1; + int fdEpoll = -1; int numSocks, ret, itLink, itAlt, numAlts; int found; char buffer[DNBD3_BLOCK_SIZE ]; @@ -365,19 +366,15 @@ static void *altservers_main(void *data) goto cleanup; } { - int pipes[2]; - if ( pipe( pipes ) < 0 ) { - memlogf( "[WARNING] error creating pipe. Uplink unavailable." ); + signalPipe = signal_new(); + if ( signalPipe < 0 ) { + memlogf( "[WARNING] error creating signal object. Uplink feature unavailable." ); goto cleanup; } - sock_set_nonblock( pipes[0] ); - sock_set_nonblock( pipes[1] ); - readPipe = pipes[0]; - signalPipe = pipes[1]; memset( &ev, 0, sizeof(ev) ); ev.events = EPOLLIN; - ev.data.fd = readPipe; - if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, readPipe, &ev ) < 0 ) { + ev.data.fd = signalPipe; + if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, signalPipe, &ev ) < 0 ) { memlogf( "[WARNING] adding read-signal-pipe to epoll set failed" ); goto cleanup; } @@ -392,18 +389,10 @@ static void *altservers_main(void *data) usleep( 100000 ); } if ( _shutdown ) goto cleanup; - // Empty pipe - do { - ret = read( readPipe, buffer, sizeof buffer ); - } while ( ret == sizeof buffer ); // Throw data away, this is just used for waking this thread up - if ( ret == 0 ) { - memlogf( "[WARNING] Signal pipe of alservers_main closed! Things will break!" ); - } - if ( ret < 0 ) { - ret = errno; - if ( ret != EAGAIN && ret != EWOULDBLOCK && ret != EBUSY && ret != EINTR ) { - memlogf( "[WARNING] Errno %d on pipe-read on alservers_main! Things will break!", ret ); - } + // Clear signal + ret = signal_clear( signalPipe ); + if ( ret == SIGNAL_ERROR ) { + memlogf( "[WARNING] Error on signal_clear on alservers_main! Things will break!" ); } // Work your way through the queue for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) { @@ -548,8 +537,7 @@ static void *altservers_main(void *data) } cleanup: ; if ( fdEpoll != -1 ) close( fdEpoll ); - if ( readPipe != -1 ) close( readPipe ); - if ( signalPipe != -1 ) close( signalPipe ); + if ( signalPipe != -1 ) signal_close( signalPipe ); signalPipe = -1; return NULL ; } diff --git a/src/server/globals.c b/src/server/globals.c index 37bbf90..0f26169 100644 --- a/src/server/globals.c +++ b/src/server/globals.c @@ -16,7 +16,7 @@ int _isProxy = FALSE; int _proxyPrivateOnly = FALSE; int _uplinkTimeout = 1250; int _clientTimeout = 15000; -int _backgroundReplication = FALSE; +int _backgroundReplication = TRUE; #define SAVE_TO_VAR_STR(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) { if (_ ## kk != NULL) free(_ ## kk); _ ## kk = strdup(value); } } while (0) #define SAVE_TO_VAR_BOOL(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) _ ## kk = atoi(value) != 0 || strcmp(value, "true") == 0 || strcmp(value, "True") == 0 || strcmp(value, "TRUE") == 0; } while (0) diff --git a/src/server/signal.c b/src/server/signal.c new file mode 100644 index 0000000..f0988b9 --- /dev/null +++ b/src/server/signal.c @@ -0,0 +1,46 @@ +#include "signal.h" +#include +#include +#include +#include +#include + +int signal_new() +{ + return eventfd( 0, EFD_NONBLOCK ); +} + +int signal_call(int signalFd) +{ + static uint64_t one = 1; + return write( signalFd, &one, sizeof one ) == sizeof one; +} + +int signal_wait(int signalFd, int timeoutMs) +{ + struct pollfd ps = { + .fd = signalFd, + .events = POLLIN + }; + int ret = poll( &ps, 1, timeoutMs ); + if ( ret == 0 ) return SIGNAL_TIMEOUT; + if ( ret == -1 ) return SIGNAL_ERROR; + if ( ps.revents & ( POLLERR | POLLNVAL ) ) return SIGNAL_ERROR; + return signal_clear( signalFd ); +} + +int signal_clear(int signalFd) +{ + uint64_t ret; + if ( read( signalFd, &ret, sizeof ret ) != sizeof ret ) { + if ( errno == EAGAIN ) return 0; + return SIGNAL_ERROR; + } + return (int)ret; +} + +void signal_close(int signalFd) +{ + close( signalFd ); +} + diff --git a/src/server/signal.h b/src/server/signal.h new file mode 100644 index 0000000..0504274 --- /dev/null +++ b/src/server/signal.h @@ -0,0 +1,42 @@ +#ifndef _SIGNAL_H_ +#define _SIGNAL_H_ + +#define SIGNAL_OK (0) +#define SIGNAL_TIMEOUT (-2) +#define SIGNAL_ERROR (-1) + +/** + * Create a new signal fd (eventfd), nonblocking. + * @return >= 0 on success, which is the fd; < 0 on error + */ +int signal_new(); + +/** + * Trigger the given signal, so a wait or clear call will succeed. + * @return SIGNAL_OK on success, SIGNAL_ERROR on error + */ +int signal_call(int signalFd); + +/** + * Wait for given signal, with an optional timeout. + * If timeout == 0, wait forever. + * @return > 0 telling how many times the signal was called, + * SIGNAL_TIMEOUT if the timeout was reached, + * SIGNAL_ERROR if some error occured + */ +int signal_wait(int signalFd, int timeoutMs); + +/** + * Clears any pending signals on this signal fd. + * @return number of signals that were pending, + * SIGNAL_ERROR if some error occured + */ +int signal_clear(int signalFd); + +/** + * Close the given signal. + */ +void signal_close(int signalFd); + +#endif + diff --git a/src/server/uplink.c b/src/server/uplink.c index 5b05873..cfe959c 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -7,13 +7,13 @@ #include "altservers.h" #include "helper.h" #include "protocol.h" +#include "signal.h" #include #include #include #include #include -#include #include #include #include @@ -103,8 +103,7 @@ void uplink_shutdown(dnbd3_image_t *image) } image->uplink = NULL; uplink->shutdown = TRUE; - static uint64_t counter = 1; - if ( uplink->signal != -1 ) write( uplink->signal, &counter, sizeof(counter) ); + if ( uplink->signal != -1 ) signal_call( uplink->signal ); pthread_t thread = uplink->thread; spin_unlock( &uplink->queueLock ); spin_unlock( &image->lock ); @@ -190,12 +189,10 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint #ifdef _DEBUG uplink->queue[freeSlot].entered = time( NULL ); #endif - const int signalFd = uplink->signal; spin_unlock( &uplink->queueLock ); if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed - static uint64_t counter = 1; - write( signalFd, &counter, sizeof(counter) ); + signal_call( uplink->signal ); } return TRUE; } @@ -225,7 +222,7 @@ static void* uplink_mainloop(void *data) memlogf( "[WARNING] epoll_create failed. Uplink unavailable." ); goto cleanup; } - link->signal = eventfd( 0, EFD_NONBLOCK ); + link->signal = signal_new(); if ( link->signal < 0 ) { memlogf( "[WARNING] error creating pipe. Uplink unavailable." ); goto cleanup; @@ -264,7 +261,7 @@ static void* uplink_mainloop(void *data) setThreadName( buffer ); } memset( &ev, 0, sizeof(ev) ); - ev.events = EPOLLIN; + ev.events = EPOLLIN | EPOLLRDHUP | EPOLLPRI; ev.data.fd = link->fd; if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) { memlogf( "[WARNING] adding uplink to epoll set failed" ); @@ -288,9 +285,9 @@ static void* uplink_mainloop(void *data) // Check all events for (i = 0; i < numSocks; ++i) { // Check for errors.... - if ( (events[i].events & (EPOLLERR | EPOLLHUP)) || !(events[i].events & EPOLLIN) ) { + if ( (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) || !(events[i].events & EPOLLIN) ) { if ( events[i].data.fd == link->signal ) { - memlogf( "[WARNING] epoll error on signal-pipe!" ); + memlogf( "[WARNING] epoll error on signal in uplink_mainloop!" ); goto cleanup; } if ( events[i].data.fd == link->fd ) { @@ -307,18 +304,8 @@ static void* uplink_mainloop(void *data) // No error, handle normally if ( events[i].data.fd == link->signal ) { // Event on the signal fd -> a client requests data - int ret; - do { - ret = read( link->signal, buffer, sizeof buffer ); - } while ( ret == sizeof buffer ); // Throw data away, this is just used for waking this thread up - if ( ret == 0 ) { - memlogf( "[WARNING] Eventfd of uplink for %s closed! Things will break!", link->image->lower_name ); - } - if ( ret < 0 ) { - ret = errno; - if ( ret != EAGAIN && ret != EWOULDBLOCK && ret != EBUSY && ret != EINTR ) { - memlogf( "[WARNING] Errno %d on eventfd on uplink for %s! Things will break!", ret, link->image->lower_name ); - } + if ( signal_clear( link->signal ) == SIGNAL_ERROR ) { + memlogf( "[WARNING] Errno on eventfd on uplink for %s! Things will break!", link->image->lower_name ); } if ( link->fd != -1 ) { // Uplink seems fine, relay requests to it... @@ -372,6 +359,7 @@ static void* uplink_mainloop(void *data) discoverFailCount++; nextAltCheck = time( NULL ) + (discoverFailCount < 5 ? altCheckInterval : SERVER_RTT_DELAY_FAILED); } + // TODO: If background replication is disabled, send keepalive every 30 seconds or so #ifdef _DEBUG if ( link->fd != -1 && !link->shutdown ) { int resend = FALSE; @@ -411,7 +399,7 @@ static void* uplink_mainloop(void *data) spin_unlock( &link->image->lock ); spin_unlock( &link->queueLock ); if ( fd != -1 ) close( fd ); - if ( signal != -1 ) close( signal ); + if ( signal != -1 ) signal_close( signal ); if ( fdEpoll != -1 ) close( fdEpoll ); // Wait for the RTT check to finish/fail if it's in progress while ( link->rttTestResult == RTT_INPROGRESS ) @@ -536,13 +524,13 @@ static void uplink_handle_receive(dnbd3_connection_t *link) memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path ); goto error_cleanup; } - if ( inReply.size > 9000000 ) { + if ( inReply.size > 9000000 ) { // TODO: Configurable memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path ); goto error_cleanup; } if ( link->recvBufferLen < inReply.size ) { if ( link->recvBuffer != NULL ) free( link->recvBuffer ); - link->recvBufferLen = MIN(9000000, inReply.size + 8192); + link->recvBufferLen = MIN(9000000, inReply.size + 8192); // XXX dont miss occurrence link->recvBuffer = malloc( link->recvBufferLen ); } uint32_t done = 0; @@ -599,7 +587,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link) req->status = ULR_FREE; pthread_mutex_lock( &client->sendMutex ); spin_unlock( &link->queueLock ); - writev( client->sock, iov, 2 ); + if ( client->sock != -1 ) writev( client->sock, iov, 2 ); pthread_mutex_unlock( &client->sendMutex ); spin_lock( &link->queueLock ); } -- cgit v1.2.3-55-g7522