From 627645acc074eab7a3694a267bc2a643d8b3e57a Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Fri, 5 Feb 2016 15:05:30 +0100 Subject: First steps in make signals more abstract from the underlying mechanism; replace epoll with poll. We now don't assume that a signal equals a single fd (eventfd on Linux). The next step would be to create a version of signal.c that uses a pipe internally, so it can be used on other platforms, like *BSD. This is also the reason epoll was replaced with poll in uplink.c --- src/server/altservers.c | 21 ++++----- src/server/globals.h | 3 +- src/server/locks.c | 4 +- src/server/threadpool.c | 18 +++---- src/server/uplink.c | 121 +++++++++++++++++++----------------------------- 5 files changed, 71 insertions(+), 96 deletions(-) (limited to 'src/server') diff --git a/src/server/altservers.c b/src/server/altservers.c index e4a7dca..4ebf2f6 100644 --- a/src/server/altservers.c +++ b/src/server/altservers.c @@ -21,7 +21,7 @@ static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS]; static pthread_spinlock_t pendingLockWrite; // Lock for adding something to pending. (NULL -> nonNULL) static pthread_mutex_t pendingLockConsume = PTHREAD_MUTEX_INITIALIZER; // Lock for removing something (nonNULL -> NULL) -static int signalFd = -1; +static dnbd3_signal_t* runSignal = NULL; static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS]; static int numAltServers = 0; @@ -53,7 +53,7 @@ void altservers_init() void altservers_shutdown() { if ( !initDone ) return; - signal_call( signalFd ); // Wake altservers thread up + signal_call( runSignal ); // Wake altservers thread up thread_join( altThread, NULL ); } @@ -154,7 +154,7 @@ void altservers_findUplink(dnbd3_connection_t *uplink) pending[i] = uplink; uplink->rttTestResult = RTT_INPROGRESS; spin_unlock( &pendingLockWrite ); - signal_call( signalFd ); // Wake altservers thread up + signal_call( runSignal ); // Wake altservers thread up return; } // End of loop - no free slot @@ -365,16 +365,16 @@ static void *altservers_main(void *data UNUSED) for (int i = 0; i < SERVER_MAX_PENDING_ALT_CHECKS; ++i) pending[i] = NULL; spin_unlock( &pendingLockWrite ); - // Init signal-pipe - signalFd = signal_new(); - if ( signalFd < 0 ) { + // Init signal + runSignal = signal_new(); + if ( runSignal == NULL ) { logadd( LOG_WARNING, "error creating signal object. Uplink feature unavailable." ); goto cleanup; } // LOOP while ( !_shutdown ) { // Wait 5 seconds max. - ret = signal_wait( signalFd, 5000 ); + ret = signal_wait( runSignal, 5000 ); if ( _shutdown ) goto cleanup; if ( ret == SIGNAL_ERROR ) { if ( errno == EAGAIN || errno == EINTR ) continue; @@ -511,8 +511,7 @@ static void *altservers_main(void *data UNUSED) uplink->betterServer = servers[bestIndex]; uplink->rttTestResult = RTT_DOCHANGE; spin_unlock( &uplink->rttLock ); - static uint64_t counter = 1; - write( uplink->signal, &counter, sizeof(counter) ); + signal_call( uplink->signal ); } else if (bestSock == -1) { // No server was reachable spin_lock( &uplink->rttLock ); @@ -540,8 +539,8 @@ static void *altservers_main(void *data UNUSED) } } cleanup: ; - if ( signalFd != -1 ) signal_close( signalFd ); - signalFd = -1; + if ( runSignal != NULL ) signal_close( runSignal ); + runSignal = NULL; return NULL ; } diff --git a/src/server/globals.h b/src/server/globals.h index b0380fa..6f9652b 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -2,6 +2,7 @@ #define _GLOBALS_H_ #include "../types.h" +#include "../shared/signal.h" #include #include #include @@ -42,7 +43,7 @@ typedef struct struct _dnbd3_connection { int fd; // socket fd to remote server - int signal; // eventfd used to wake up the process + dnbd3_signal_t* signal; // used to wake up the process pthread_t thread; // thread holding the connection pthread_spinlock_t queueLock; // lock for synchronization on request queue etc. dnbd3_queued_request_t queue[SERVER_MAX_UPLINK_QUEUE]; diff --git a/src/server/locks.c b/src/server/locks.c index bc8b18e..294f862 100644 --- a/src/server/locks.c +++ b/src/server/locks.c @@ -50,7 +50,7 @@ static int init_done = 0; static pthread_spinlock_t initdestory; static int lockId = 0; static pthread_t watchdog = 0; -static int watchdogSignal = -1; +static dnbd3_signal_t* watchdogSignal = NULL; static void *debug_thread_watchdog(void *something); @@ -278,7 +278,7 @@ static void *debug_thread_watchdog(void *something UNUSED) } pthread_spin_unlock( &initdestory ); } - if ( watchdogSignal == -1 || signal_wait( watchdogSignal, 5000 ) == SIGNAL_ERROR ) sleep( 5 ); + if ( watchdogSignal == NULL || signal_wait( watchdogSignal, 5000 ) == SIGNAL_ERROR ) sleep( 5 ); } return NULL ; } diff --git a/src/server/threadpool.c b/src/server/threadpool.c index b1c46a3..12cbe03 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -10,7 +10,7 @@ typedef struct _entry_t { struct _entry_t *next; pthread_t thread; - int signalFd; + dnbd3_signal_t* signal; void *(*startRoutine)(void *); void * arg; } entry_t; @@ -43,7 +43,7 @@ void threadpool_close() while ( ptr != NULL ) { entry_t *current = ptr; ptr = ptr->next; - signal_call( current->signalFd ); + signal_call( current->signal ); } spin_unlock( &poolLock ); spin_destroy( &poolLock ); @@ -61,15 +61,15 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg) logadd( LOG_WARNING, "Could not alloc entry_t for new thread\n" ); return false; } - entry->signalFd = signal_newBlocking(); - if ( entry->signalFd < 0 ) { - logadd( LOG_WARNING, "Could not create signalFd for new thread pool thread\n" ); + entry->signal = signal_newBlocking(); + if ( entry->signal == NULL ) { + logadd( LOG_WARNING, "Could not create signal for new thread pool thread\n" ); free( entry ); return false; } if ( 0 != thread_create( &(entry->thread), &threadAttrs, threadpool_worker, (void*)entry ) ) { logadd( LOG_WARNING, "Could not create new thread for thread pool\n" ); - signal_close( entry->signalFd ); + signal_close( entry->signal ); free( entry ); return false; } @@ -77,7 +77,7 @@ bool threadpool_run(void *(*startRoutine)(void *), void *arg) entry->next = NULL; entry->startRoutine = startRoutine; entry->arg = arg; - signal_call( entry->signalFd ); + signal_call( entry->signal ); return true; } @@ -90,7 +90,7 @@ static void *threadpool_worker(void *entryPtr) entry_t *entry = (entry_t*)entryPtr; for ( ;; ) { // Wait for signal from outside that we have work to do - int ret = signal_clear( entry->signalFd ); + int ret = signal_clear( entry->signal ); if ( _shutdown ) break; if ( ret > 0 ) { if ( entry->startRoutine == NULL ) { @@ -123,7 +123,7 @@ static void *threadpool_worker(void *entryPtr) logadd( LOG_DEBUG1, "Unexpected return value %d for signal_wait in threadpool worker!", ret ); } } - signal_close( entry->signalFd ); + signal_close( entry->signal ); free( entry ); return NULL; } diff --git a/src/server/uplink.c b/src/server/uplink.c index 3f14266..d37cb4f 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -12,7 +12,6 @@ #include #include #include -#include #include #include #include @@ -21,6 +20,7 @@ #include #include #include +#include static uint64_t totalBytesReceived = 0; @@ -75,7 +75,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) link->bytesReceived = 0; link->queueLen = 0; link->fd = -1; - link->signal = -1; + link->signal = NULL; link->replicationHandle = 0; spin_lock( &link->rttLock ); if ( sock >= 0 ) { @@ -232,7 +232,9 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin spin_unlock( &uplink->queueLock ); if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed - signal_call( uplink->signal ); + if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { + logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); + } } return true; } @@ -243,37 +245,30 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin */ static void* uplink_mainloop(void *data) { - const int MAXEVENTS = 3; - struct epoll_event ev, events[MAXEVENTS]; +#define EV_SIGNAL (0) +#define EV_SOCKET (1) +#define EV_COUNT (2) + struct pollfd events[EV_COUNT]; dnbd3_connection_t *link = (dnbd3_connection_t*)data; - int fdEpoll = -1; int numSocks, i, waitTime; int altCheckInterval = SERVER_RTT_DELAY_INIT; int discoverFailCount = 0; time_t nextAltCheck = 0, nextKeepalive = 0; char buffer[200]; + memset( events, 0, sizeof(events) ); // assert( link != NULL ); setThreadName( "idle-uplink" ); blockNoncriticalSignals(); // - fdEpoll = epoll_create( 2 ); - if ( fdEpoll == -1 ) { - logadd( LOG_WARNING, "epoll_create failed. Uplink unavailable." ); - goto cleanup; - } link->signal = signal_new(); - if ( link->signal < 0 ) { - logadd( LOG_WARNING, "error creating pipe. Uplink unavailable." ); - goto cleanup; - } - memset( &ev, 0, sizeof(ev) ); - ev.events = EPOLLIN; - ev.data.fd = link->signal; - if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->signal, &ev ) < 0 ) { - logadd( LOG_WARNING, "adding eventfd to epoll set failed" ); + if ( link->signal == NULL ) { + logadd( LOG_WARNING, "error creating signal. Uplink unavailable." ); goto cleanup; } + events[EV_SIGNAL].events = POLLIN; + events[EV_SIGNAL].fd = signal_getWaitFd( link->signal ); + events[EV_SOCKET].fd = -1; while ( !_shutdown && !link->shutdown ) { // Check if server switch is in order spin_lock( &link->rttLock ); @@ -305,71 +300,52 @@ static void* uplink_mainloop(void *data) // Re-send all pending requests uplink_sendRequests( link, false ); uplink_sendReplicationRequest( link ); - memset( &ev, 0, sizeof(ev) ); - ev.events = EPOLLIN | EPOLLRDHUP | EPOLLPRI; - ev.data.fd = link->fd; - if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) { - logadd( LOG_WARNING, "adding uplink to epoll set failed" ); - goto cleanup; - } + events[EV_SOCKET].events = POLLIN | POLLRDHUP; + events[EV_SOCKET].fd = link->fd; nextAltCheck = time( NULL ) + altCheckInterval; // The rtt worker already did the handshake for our image, so there's nothing // more to do here } - // epoll() + // poll() waitTime = (time( NULL ) - nextAltCheck) * 1000; if ( waitTime < 1500 ) waitTime = 1500; if ( waitTime > 5000 ) waitTime = 5000; - numSocks = epoll_wait( fdEpoll, events, MAXEVENTS, waitTime ); + numSocks = poll( events, EV_COUNT, waitTime ); if ( _shutdown || link->shutdown ) goto cleanup; if ( numSocks == -1 ) { // Error? if ( errno == EINTR ) continue; - logadd( LOG_DEBUG1, "epoll_wait() error %d", (int)errno); + logadd( LOG_DEBUG1, "poll() error %d", (int)errno ); usleep( 10000 ); continue; } - // Check all events - for ( i = 0; i < numSocks; ++i ) { - // Check for errors.... - if ( (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) || !(events[i].events & EPOLLIN) ) { - if ( events[i].data.fd == link->signal ) { - logadd( LOG_WARNING, "epoll error on signal in uplink_mainloop!" ); - goto cleanup; - } - if ( events[i].data.fd == link->fd ) { - link->fd = -1; - close( events[i].data.fd ); - logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" ); - } else if ( events[i].data.fd == link->signal ) { - logadd( LOG_DEBUG1, "Error on uplink signal fd!\n" ); - } else { - logadd( LOG_DEBUG1, "Error on unknown FD in uplink epoll\n" ); - close( events[i].data.fd ); - } - continue; + // Check events + // Signal + if ( (events[EV_SIGNAL].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) { + logadd( LOG_WARNING, "poll error on signal in uplink_mainloop!" ); + goto cleanup; + } else if ( (events[EV_SIGNAL].revents & POLLIN) ) { + // signal triggered -> pending requests + if ( signal_clear( link->signal ) == SIGNAL_ERROR ) { + logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", link->image->name ); } - // No error, handle normally - if ( events[i].data.fd == link->signal ) { - // Event on the signal fd -> a client requests data - if ( signal_clear( link->signal ) == SIGNAL_ERROR ) { - logadd( LOG_WARNING, "Errno on eventfd on uplink for %s! Things will break!", link->image->name ); - } - if ( link->fd != -1 ) { - // Uplink seems fine, relay requests to it... - uplink_sendRequests( link, true ); - } - } else if ( events[i].data.fd == link->fd ) { - uplink_handleReceive( link ); - if ( link->fd == -1 ) nextAltCheck = 0; - if ( _shutdown || link->shutdown ) goto cleanup; - } else { - logadd( LOG_DEBUG1, "Sanity check: unknown FD ready on epoll! Closing...\n" ); - close( events[i].data.fd ); + if ( link->fd != -1 ) { + // Uplink seems fine, relay requests to it... + uplink_sendRequests( link, true ); } } - // Done handling epoll sockets - const time_t now = time( NULL ); + // Uplink socket + if ( (events[EV_SOCKET].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) { + link->fd = -1; + close( events[EV_SOCKET].fd ); + events[EV_SOCKET].fd = -1; + logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" ); + } else if ( (events[EV_SOCKET].revents & POLLIN) ) { + uplink_handleReceive( link ); + if ( link->fd == -1 ) nextAltCheck = 0; + if ( _shutdown || link->shutdown ) goto cleanup; + } // Send keep alive if nothing is happening + const time_t now = time( NULL ); if ( link->fd != -1 && link->replicationHandle == 0 && now > nextKeepalive ) { nextKeepalive = now + 20; if ( !uplink_sendKeepalive( link->fd ) ) { @@ -426,7 +402,7 @@ static void* uplink_mainloop(void *data) } spin_unlock( &link->queueLock ); if ( resend ) - uplink_sendRequests( link, true ); + uplink_sendRequests( link, true ); } #endif } @@ -436,9 +412,9 @@ static void* uplink_mainloop(void *data) spin_lock( &link->queueLock ); link->image->uplink = NULL; const int fd = link->fd; - const int signal = link->signal; + const dnbd3_signal_t* signal = link->signal; link->fd = -1; - link->signal = -1; + link->signal = NULL; if ( !link->shutdown ) { link->shutdown = true; thread_detach( link->thread ); @@ -446,8 +422,7 @@ static void* uplink_mainloop(void *data) spin_unlock( &link->image->lock ); spin_unlock( &link->queueLock ); if ( fd != -1 ) close( fd ); - if ( signal != -1 ) signal_close( signal ); - if ( fdEpoll != -1 ) close( fdEpoll ); + if ( signal != NULL ) signal_close( signal ); // Wait for the RTT check to finish/fail if it's in progress while ( link->rttTestResult == RTT_INPROGRESS ) usleep( 10000 ); -- cgit v1.2.3-55-g7522