summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorSimon Rettberg2016-02-05 15:05:30 +0100
committerSimon Rettberg2016-02-05 15:05:30 +0100
commit627645acc074eab7a3694a267bc2a643d8b3e57a (patch)
tree5f61225803c369ab1295ce0ee36a33ae6cb51eb8 /src/server/uplink.c
parent[SERVER] BREAKING: Get rid of pseudo case-insensitivity (diff)
downloaddnbd3-627645acc074eab7a3694a267bc2a643d8b3e57a.tar.gz
dnbd3-627645acc074eab7a3694a267bc2a643d8b3e57a.tar.xz
dnbd3-627645acc074eab7a3694a267bc2a643d8b3e57a.zip
First steps in make signals more abstract from the underlying mechanism; replace epoll with poll.
We now don't assume that a signal equals a single fd (eventfd on Linux). The next step would be to create a version of signal.c that uses a pipe internally, so it can be used on other platforms, like *BSD. This is also the reason epoll was replaced with poll in uplink.c
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c121
1 files changed, 48 insertions, 73 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 3f14266..d37cb4f 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -12,7 +12,6 @@
#include <pthread.h>
#include <sys/socket.h>
#include <string.h>
-#include <sys/epoll.h>
#include <sys/errno.h>
#include <assert.h>
#include <unistd.h>
@@ -21,6 +20,7 @@
#include <inttypes.h>
#include <zlib.h>
#include <fcntl.h>
+#include <poll.h>
static uint64_t totalBytesReceived = 0;
@@ -75,7 +75,7 @@ bool uplink_init(dnbd3_image_t *image, int sock, dnbd3_host_t *host)
link->bytesReceived = 0;
link->queueLen = 0;
link->fd = -1;
- link->signal = -1;
+ link->signal = NULL;
link->replicationHandle = 0;
spin_lock( &link->rttLock );
if ( sock >= 0 ) {
@@ -232,7 +232,9 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
spin_unlock( &uplink->queueLock );
if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed
- signal_call( uplink->signal );
+ if ( signal_call( uplink->signal ) == SIGNAL_ERROR ) {
+ logadd( LOG_WARNING, "Cannot wake up uplink thread; errno=%d", (int)errno );
+ }
}
return true;
}
@@ -243,37 +245,30 @@ bool uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uin
*/
static void* uplink_mainloop(void *data)
{
- const int MAXEVENTS = 3;
- struct epoll_event ev, events[MAXEVENTS];
+#define EV_SIGNAL (0)
+#define EV_SOCKET (1)
+#define EV_COUNT (2)
+ struct pollfd events[EV_COUNT];
dnbd3_connection_t *link = (dnbd3_connection_t*)data;
- int fdEpoll = -1;
int numSocks, i, waitTime;
int altCheckInterval = SERVER_RTT_DELAY_INIT;
int discoverFailCount = 0;
time_t nextAltCheck = 0, nextKeepalive = 0;
char buffer[200];
+ memset( events, 0, sizeof(events) );
//
assert( link != NULL );
setThreadName( "idle-uplink" );
blockNoncriticalSignals();
//
- fdEpoll = epoll_create( 2 );
- if ( fdEpoll == -1 ) {
- logadd( LOG_WARNING, "epoll_create failed. Uplink unavailable." );
- goto cleanup;
- }
link->signal = signal_new();
- if ( link->signal < 0 ) {
- logadd( LOG_WARNING, "error creating pipe. Uplink unavailable." );
- goto cleanup;
- }
- memset( &ev, 0, sizeof(ev) );
- ev.events = EPOLLIN;
- ev.data.fd = link->signal;
- if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->signal, &ev ) < 0 ) {
- logadd( LOG_WARNING, "adding eventfd to epoll set failed" );
+ if ( link->signal == NULL ) {
+ logadd( LOG_WARNING, "error creating signal. Uplink unavailable." );
goto cleanup;
}
+ events[EV_SIGNAL].events = POLLIN;
+ events[EV_SIGNAL].fd = signal_getWaitFd( link->signal );
+ events[EV_SOCKET].fd = -1;
while ( !_shutdown && !link->shutdown ) {
// Check if server switch is in order
spin_lock( &link->rttLock );
@@ -305,71 +300,52 @@ static void* uplink_mainloop(void *data)
// Re-send all pending requests
uplink_sendRequests( link, false );
uplink_sendReplicationRequest( link );
- memset( &ev, 0, sizeof(ev) );
- ev.events = EPOLLIN | EPOLLRDHUP | EPOLLPRI;
- ev.data.fd = link->fd;
- if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) {
- logadd( LOG_WARNING, "adding uplink to epoll set failed" );
- goto cleanup;
- }
+ events[EV_SOCKET].events = POLLIN | POLLRDHUP;
+ events[EV_SOCKET].fd = link->fd;
nextAltCheck = time( NULL ) + altCheckInterval;
// The rtt worker already did the handshake for our image, so there's nothing
// more to do here
}
- // epoll()
+ // poll()
waitTime = (time( NULL ) - nextAltCheck) * 1000;
if ( waitTime < 1500 ) waitTime = 1500;
if ( waitTime > 5000 ) waitTime = 5000;
- numSocks = epoll_wait( fdEpoll, events, MAXEVENTS, waitTime );
+ numSocks = poll( events, EV_COUNT, waitTime );
if ( _shutdown || link->shutdown ) goto cleanup;
if ( numSocks == -1 ) { // Error?
if ( errno == EINTR ) continue;
- logadd( LOG_DEBUG1, "epoll_wait() error %d", (int)errno);
+ logadd( LOG_DEBUG1, "poll() error %d", (int)errno );
usleep( 10000 );
continue;
}
- // Check all events
- for ( i = 0; i < numSocks; ++i ) {
- // Check for errors....
- if ( (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) || !(events[i].events & EPOLLIN) ) {
- if ( events[i].data.fd == link->signal ) {
- logadd( LOG_WARNING, "epoll error on signal in uplink_mainloop!" );
- goto cleanup;
- }
- if ( events[i].data.fd == link->fd ) {
- link->fd = -1;
- close( events[i].data.fd );
- logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" );
- } else if ( events[i].data.fd == link->signal ) {
- logadd( LOG_DEBUG1, "Error on uplink signal fd!\n" );
- } else {
- logadd( LOG_DEBUG1, "Error on unknown FD in uplink epoll\n" );
- close( events[i].data.fd );
- }
- continue;
+ // Check events
+ // Signal
+ if ( (events[EV_SIGNAL].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) {
+ logadd( LOG_WARNING, "poll error on signal in uplink_mainloop!" );
+ goto cleanup;
+ } else if ( (events[EV_SIGNAL].revents & POLLIN) ) {
+ // signal triggered -> pending requests
+ if ( signal_clear( link->signal ) == SIGNAL_ERROR ) {
+ logadd( LOG_WARNING, "Errno on signal on uplink for %s! Things will break!", link->image->name );
}
- // No error, handle normally
- if ( events[i].data.fd == link->signal ) {
- // Event on the signal fd -> a client requests data
- if ( signal_clear( link->signal ) == SIGNAL_ERROR ) {
- logadd( LOG_WARNING, "Errno on eventfd on uplink for %s! Things will break!", link->image->name );
- }
- if ( link->fd != -1 ) {
- // Uplink seems fine, relay requests to it...
- uplink_sendRequests( link, true );
- }
- } else if ( events[i].data.fd == link->fd ) {
- uplink_handleReceive( link );
- if ( link->fd == -1 ) nextAltCheck = 0;
- if ( _shutdown || link->shutdown ) goto cleanup;
- } else {
- logadd( LOG_DEBUG1, "Sanity check: unknown FD ready on epoll! Closing...\n" );
- close( events[i].data.fd );
+ if ( link->fd != -1 ) {
+ // Uplink seems fine, relay requests to it...
+ uplink_sendRequests( link, true );
}
}
- // Done handling epoll sockets
- const time_t now = time( NULL );
+ // Uplink socket
+ if ( (events[EV_SOCKET].revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) ) {
+ link->fd = -1;
+ close( events[EV_SOCKET].fd );
+ events[EV_SOCKET].fd = -1;
+ logadd( LOG_DEBUG1, "Uplink gone away, panic!\n" );
+ } else if ( (events[EV_SOCKET].revents & POLLIN) ) {
+ uplink_handleReceive( link );
+ if ( link->fd == -1 ) nextAltCheck = 0;
+ if ( _shutdown || link->shutdown ) goto cleanup;
+ }
// Send keep alive if nothing is happening
+ const time_t now = time( NULL );
if ( link->fd != -1 && link->replicationHandle == 0 && now > nextKeepalive ) {
nextKeepalive = now + 20;
if ( !uplink_sendKeepalive( link->fd ) ) {
@@ -426,7 +402,7 @@ static void* uplink_mainloop(void *data)
}
spin_unlock( &link->queueLock );
if ( resend )
- uplink_sendRequests( link, true );
+ uplink_sendRequests( link, true );
}
#endif
}
@@ -436,9 +412,9 @@ static void* uplink_mainloop(void *data)
spin_lock( &link->queueLock );
link->image->uplink = NULL;
const int fd = link->fd;
- const int signal = link->signal;
+ const dnbd3_signal_t* signal = link->signal;
link->fd = -1;
- link->signal = -1;
+ link->signal = NULL;
if ( !link->shutdown ) {
link->shutdown = true;
thread_detach( link->thread );
@@ -446,8 +422,7 @@ static void* uplink_mainloop(void *data)
spin_unlock( &link->image->lock );
spin_unlock( &link->queueLock );
if ( fd != -1 ) close( fd );
- if ( signal != -1 ) signal_close( signal );
- if ( fdEpoll != -1 ) close( fdEpoll );
+ if ( signal != NULL ) signal_close( signal );
// Wait for the RTT check to finish/fail if it's in progress
while ( link->rttTestResult == RTT_INPROGRESS )
usleep( 10000 );