From 7b8e3ffcbccf6d02bfcb0c9c9c2d259362357fb8 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Wed, 31 Dec 2014 17:19:20 +0100 Subject: [SERVER] Create compilation unit for wait/signalling logic (using eventfd) --- src/server/uplink.c | 40 ++++++++++++++-------------------------- 1 file changed, 14 insertions(+), 26 deletions(-) (limited to 'src/server/uplink.c') diff --git a/src/server/uplink.c b/src/server/uplink.c index 5b05873..cfe959c 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -7,13 +7,13 @@ #include "altservers.h" #include "helper.h" #include "protocol.h" +#include "signal.h" #include #include #include #include #include -#include #include #include #include @@ -103,8 +103,7 @@ void uplink_shutdown(dnbd3_image_t *image) } image->uplink = NULL; uplink->shutdown = TRUE; - static uint64_t counter = 1; - if ( uplink->signal != -1 ) write( uplink->signal, &counter, sizeof(counter) ); + if ( uplink->signal != -1 ) signal_call( uplink->signal ); pthread_t thread = uplink->thread; spin_unlock( &uplink->queueLock ); spin_unlock( &image->lock ); @@ -190,12 +189,10 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint #ifdef _DEBUG uplink->queue[freeSlot].entered = time( NULL ); #endif - const int signalFd = uplink->signal; spin_unlock( &uplink->queueLock ); if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed - static uint64_t counter = 1; - write( signalFd, &counter, sizeof(counter) ); + signal_call( uplink->signal ); } return TRUE; } @@ -225,7 +222,7 @@ static void* uplink_mainloop(void *data) memlogf( "[WARNING] epoll_create failed. Uplink unavailable." ); goto cleanup; } - link->signal = eventfd( 0, EFD_NONBLOCK ); + link->signal = signal_new(); if ( link->signal < 0 ) { memlogf( "[WARNING] error creating pipe. Uplink unavailable." ); goto cleanup; @@ -264,7 +261,7 @@ static void* uplink_mainloop(void *data) setThreadName( buffer ); } memset( &ev, 0, sizeof(ev) ); - ev.events = EPOLLIN; + ev.events = EPOLLIN | EPOLLRDHUP | EPOLLPRI; ev.data.fd = link->fd; if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) { memlogf( "[WARNING] adding uplink to epoll set failed" ); @@ -288,9 +285,9 @@ static void* uplink_mainloop(void *data) // Check all events for (i = 0; i < numSocks; ++i) { // Check for errors.... - if ( (events[i].events & (EPOLLERR | EPOLLHUP)) || !(events[i].events & EPOLLIN) ) { + if ( (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) || !(events[i].events & EPOLLIN) ) { if ( events[i].data.fd == link->signal ) { - memlogf( "[WARNING] epoll error on signal-pipe!" ); + memlogf( "[WARNING] epoll error on signal in uplink_mainloop!" ); goto cleanup; } if ( events[i].data.fd == link->fd ) { @@ -307,18 +304,8 @@ static void* uplink_mainloop(void *data) // No error, handle normally if ( events[i].data.fd == link->signal ) { // Event on the signal fd -> a client requests data - int ret; - do { - ret = read( link->signal, buffer, sizeof buffer ); - } while ( ret == sizeof buffer ); // Throw data away, this is just used for waking this thread up - if ( ret == 0 ) { - memlogf( "[WARNING] Eventfd of uplink for %s closed! Things will break!", link->image->lower_name ); - } - if ( ret < 0 ) { - ret = errno; - if ( ret != EAGAIN && ret != EWOULDBLOCK && ret != EBUSY && ret != EINTR ) { - memlogf( "[WARNING] Errno %d on eventfd on uplink for %s! Things will break!", ret, link->image->lower_name ); - } + if ( signal_clear( link->signal ) == SIGNAL_ERROR ) { + memlogf( "[WARNING] Errno on eventfd on uplink for %s! Things will break!", link->image->lower_name ); } if ( link->fd != -1 ) { // Uplink seems fine, relay requests to it... @@ -372,6 +359,7 @@ static void* uplink_mainloop(void *data) discoverFailCount++; nextAltCheck = time( NULL ) + (discoverFailCount < 5 ? altCheckInterval : SERVER_RTT_DELAY_FAILED); } + // TODO: If background replication is disabled, send keepalive every 30 seconds or so #ifdef _DEBUG if ( link->fd != -1 && !link->shutdown ) { int resend = FALSE; @@ -411,7 +399,7 @@ static void* uplink_mainloop(void *data) spin_unlock( &link->image->lock ); spin_unlock( &link->queueLock ); if ( fd != -1 ) close( fd ); - if ( signal != -1 ) close( signal ); + if ( signal != -1 ) signal_close( signal ); if ( fdEpoll != -1 ) close( fdEpoll ); // Wait for the RTT check to finish/fail if it's in progress while ( link->rttTestResult == RTT_INPROGRESS ) @@ -536,13 +524,13 @@ static void uplink_handle_receive(dnbd3_connection_t *link) memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path ); goto error_cleanup; } - if ( inReply.size > 9000000 ) { + if ( inReply.size > 9000000 ) { // TODO: Configurable memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path ); goto error_cleanup; } if ( link->recvBufferLen < inReply.size ) { if ( link->recvBuffer != NULL ) free( link->recvBuffer ); - link->recvBufferLen = MIN(9000000, inReply.size + 8192); + link->recvBufferLen = MIN(9000000, inReply.size + 8192); // XXX dont miss occurrence link->recvBuffer = malloc( link->recvBufferLen ); } uint32_t done = 0; @@ -599,7 +587,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link) req->status = ULR_FREE; pthread_mutex_lock( &client->sendMutex ); spin_unlock( &link->queueLock ); - writev( client->sock, iov, 2 ); + if ( client->sock != -1 ) writev( client->sock, iov, 2 ); pthread_mutex_unlock( &client->sendMutex ); spin_lock( &link->queueLock ); } -- cgit v1.2.3-55-g7522