From 627645acc074eab7a3694a267bc2a643d8b3e57a Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Fri, 5 Feb 2016 15:05:30 +0100 Subject: First steps in make signals more abstract from the underlying mechanism; replace epoll with poll. We now don't assume that a signal equals a single fd (eventfd on Linux). The next step would be to create a version of signal.c that uses a pipe internally, so it can be used on other platforms, like *BSD. This is also the reason epoll was replaced with poll in uplink.c --- src/server/uplink.c | 121 +++++++++++++++++++++------------------------------- 1 file changed, 48 insertions(+), 73 deletions(-) (limited to 'src/server/uplink.c') diff --git a/src/server/uplink.c b/src/server/uplink.c index 3f14266..d37cb4f 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -12,7 +12,6 @@ #include #include #include -#include #include #include #include @@ -21,6 +20,7 @@ #include #include #include +#include static uint64_t totalBytesReceived = 0; @@ -75,7 +75,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host) link->bytesReceived = 0; link->queueLen = 0; link->fd = -1; - link->signal = -1; + link->signal = NULL; link->replicationHandle = 0; spin_lock( &link->rttLock ); if ( sock >= 0 ) { @@ -232,7 +232,9 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin spin_unlock( &uplink->queueLock ); if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed - signal_call( uplink->signal ); + if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) { + logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno ); + } } return true; } @@ -243,37 +245,30 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin */ static void* uplink_mainloop(void *data) { - const int MAXEVENTS = 3; - struct epoll_event ev, events[MAXEVENTS]; +#define EV_SIGNAL (0) +#define EV_SOCKET (1) +#define EV_COUNT (2) + struct pollfd events[EV_COUNT]; dnbd3_connection_t *link = (dnbd3_connection_t*)data; - int fdEpoll = -1; int numSocks, i, waitTime; int altCheckInterval = SERVER_RTT_DELAY_INIT; int discoverFailCount = 0; time_t nextAltCheck = 0, nextKeepalive = 0; char buffer[200]; + memset( events, 0, sizeof(events) ); // assert( link != NULL ); setThreadName( "idle-uplink" ); blockNoncriticalSignals(); // - fdEpoll = epoll_create( 2 ); - if ( fdEpoll == -1 ) { - logadd( LOG_WARNING, "epoll_create failed. Uplink unavailable." ); - goto cleanup; - } link->signal = signal_new(); - if ( link->signal < 0 ) { - logadd( LOG_WARNING, "error creating pipe. Uplink unavailable." ); - goto cleanup; - } - memset( &ev, 0, sizeof(ev) ); - ev.events = EPOLLIN; - ev.data.fd = link->signal; - if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->signal, &ev ) < 0 ) { - logadd( LOG_WARNING, "adding eventfd to epoll set failed" ); + if ( link->signal == NULL ) { + logadd( LOG_WARNING, "error creating signal. Uplink unavailable." ); goto cleanup; } + events[EV_SIGNAL].events = POLLIN; + events[EV_SIGNAL].fd = signal_getWaitFd( link->signal ); + events[EV_SOCKET].fd = -1; while ( !_shutdown && !link->shutdown ) { // Check if server switch is in order spin_lock( &link->rttLock ); @@ -305,71 +300,52 @@ static void* uplink_mainloop(void *data) // Re-send all pending requests uplink_sendRequests( link, false ); uplink_sendReplicationRequest( link ); - memset( &ev, 0, sizeof(ev) ); - ev.events = EPOLLIN | EPOLLRDHUP | EPOLLPRI; - ev.data.fd = link->fd; - if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) { - logadd( LOG_WARNING, "adding uplink to epoll set failed" ); - goto cleanup; - } + events[EV_SOCKET].events = POLLIN | POLLRDHUP; + events[EV_SOCKET].fd = link->fd; nextAltCheck = time( NULL ) + altCheckInterval; // The rtt worker already did the handshake for our image, so there's nothing // more to do here } - // epoll() + // poll() waitTime = (time( NULL ) - nextAltCheck) * 1000; if ( waitTime < 1500 ) waitTime = 1500; if ( waitTime > 5000 ) waitTime = 5000; - numSocks = epoll_wait( fdEpoll, events, MAXEVENTS, waitTime ); + numSocks = poll( events, EV_COUNT, waitTime ); if ( _shutdown || link->shutdown ) goto cleanup; if ( numSocks == -1 ) { // Error? if ( errno == EINTR ) continue; - logadd( LOG_DEBUG1, "epoll_wait() error %d", (int)errno); + logadd( LOG_DEBUG1, "poll() error %d", (int)errno ); usleep( 10000 ); continue; } - // Check all events - for ( i = 0; i < numSocks; ++i ) { - // Check for errors.... - if ( (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) || !(events[i].events & EPOLLIN) ) { - if ( events[i].data.fd == link->signal ) { - logadd( LOG_WARNING, "epoll error on signal in uplink_mainloop!" ); - goto cleanup; - } - if ( events[i].data.fd == link->fd ) { - link->fd = -1; - close( events[i].data.fd ); - logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" ); - } else if ( events[i].data.fd == link->signal ) { - logadd( LOG_DEBUG1, "Error on uplink signal fd!\n" ); - } else { - logadd( LOG_DEBUG1, "Error on unknown FD in uplink epoll\n" ); - close( events[i].data.fd ); - } - continue; + // Check events + // Signal + if ( (events[EV_SIGNAL].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) { + logadd( LOG_WARNING, "poll error on signal in uplink_mainloop!" ); + goto cleanup; + } else if ( (events[EV_SIGNAL].revents & POLLIN) ) { + // signal triggered -> pending requests + if ( signal_clear( link->signal ) == SIGNAL_ERROR ) { + logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", link->image->name ); } - // No error, handle normally - if ( events[i].data.fd == link->signal ) { - // Event on the signal fd -> a client requests data - if ( signal_clear( link->signal ) == SIGNAL_ERROR ) { - logadd( LOG_WARNING, "Errno on eventfd on uplink for %s! Things will break!", link->image->name ); - } - if ( link->fd != -1 ) { - // Uplink seems fine, relay requests to it... - uplink_sendRequests( link, true ); - } - } else if ( events[i].data.fd == link->fd ) { - uplink_handleReceive( link ); - if ( link->fd == -1 ) nextAltCheck = 0; - if ( _shutdown || link->shutdown ) goto cleanup; - } else { - logadd( LOG_DEBUG1, "Sanity check: unknown FD ready on epoll! Closing...\n" ); - close( events[i].data.fd ); + if ( link->fd != -1 ) { + // Uplink seems fine, relay requests to it... + uplink_sendRequests( link, true ); } } - // Done handling epoll sockets - const time_t now = time( NULL ); + // Uplink socket + if ( (events[EV_SOCKET].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) { + link->fd = -1; + close( events[EV_SOCKET].fd ); + events[EV_SOCKET].fd = -1; + logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" ); + } else if ( (events[EV_SOCKET].revents & POLLIN) ) { + uplink_handleReceive( link ); + if ( link->fd == -1 ) nextAltCheck = 0; + if ( _shutdown || link->shutdown ) goto cleanup; + } // Send keep alive if nothing is happening + const time_t now = time( NULL ); if ( link->fd != -1 && link->replicationHandle == 0 && now > nextKeepalive ) { nextKeepalive = now + 20; if ( !uplink_sendKeepalive( link->fd ) ) { @@ -426,7 +402,7 @@ static void* uplink_mainloop(void *data) } spin_unlock( &link->queueLock ); if ( resend ) - uplink_sendRequests( link, true ); + uplink_sendRequests( link, true ); } #endif } @@ -436,9 +412,9 @@ static void* uplink_mainloop(void *data) spin_lock( &link->queueLock ); link->image->uplink = NULL; const int fd = link->fd; - const int signal = link->signal; + const dnbd3_signal_t* signal = link->signal; link->fd = -1; - link->signal = -1; + link->signal = NULL; if ( !link->shutdown ) { link->shutdown = true; thread_detach( link->thread ); @@ -446,8 +422,7 @@ static void* uplink_mainloop(void *data) spin_unlock( &link->image->lock ); spin_unlock( &link->queueLock ); if ( fd != -1 ) close( fd ); - if ( signal != -1 ) signal_close( signal ); - if ( fdEpoll != -1 ) close( fdEpoll ); + if ( signal != NULL ) signal_close( signal ); // Wait for the RTT check to finish/fail if it's in progress while ( link->rttTestResult == RTT_INPROGRESS ) usleep( 10000 ); -- cgit v1.2.3-55-g7522