summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
authorSimon Rettberg2014-12-31 17:19:20 +0100
committerSimon Rettberg2014-12-31 17:19:20 +0100
commit7b8e3ffcbccf6d02bfcb0c9c9c2d259362357fb8 (patch)
tree80dd759e320ff74bc8bb56fb9a9858c57f99320a /src/server/uplink.c
parent[SERVER] Add setting to enable/disable background replication, add comments t... (diff)
downloaddnbd3-7b8e3ffcbccf6d02bfcb0c9c9c2d259362357fb8.tar.gz
dnbd3-7b8e3ffcbccf6d02bfcb0c9c9c2d259362357fb8.tar.xz
dnbd3-7b8e3ffcbccf6d02bfcb0c9c9c2d259362357fb8.zip
[SERVER] Create compilation unit for wait/signalling logic (using eventfd)
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c40
1 files changed, 14 insertions, 26 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 5b05873..cfe959c 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -7,13 +7,13 @@
#include "altservers.h"
#include "helper.h"
#include "protocol.h"
+#include "signal.h"
#include <pthread.h>
#include <sys/socket.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/errno.h>
-#include <sys/eventfd.h>
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>
@@ -103,8 +103,7 @@ void uplink_shutdown(dnbd3_image_t *image)
}
image->uplink = NULL;
uplink->shutdown = TRUE;
- static uint64_t counter = 1;
- if ( uplink->signal != -1 ) write( uplink->signal, &counter, sizeof(counter) );
+ if ( uplink->signal != -1 ) signal_call( uplink->signal );
pthread_t thread = uplink->thread;
spin_unlock( &uplink->queueLock );
spin_unlock( &image->lock );
@@ -190,12 +189,10 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint
#ifdef _DEBUG
uplink->queue[freeSlot].entered = time( NULL );
#endif
- const int signalFd = uplink->signal;
spin_unlock( &uplink->queueLock );
if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed
- static uint64_t counter = 1;
- write( signalFd, &counter, sizeof(counter) );
+ signal_call( uplink->signal );
}
return TRUE;
}
@@ -225,7 +222,7 @@ static void* uplink_mainloop(void *data)
memlogf( "[WARNING] epoll_create failed. Uplink unavailable." );
goto cleanup;
}
- link->signal = eventfd( 0, EFD_NONBLOCK );
+ link->signal = signal_new();
if ( link->signal < 0 ) {
memlogf( "[WARNING] error creating pipe. Uplink unavailable." );
goto cleanup;
@@ -264,7 +261,7 @@ static void* uplink_mainloop(void *data)
setThreadName( buffer );
}
memset( &ev, 0, sizeof(ev) );
- ev.events = EPOLLIN;
+ ev.events = EPOLLIN | EPOLLRDHUP | EPOLLPRI;
ev.data.fd = link->fd;
if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) {
memlogf( "[WARNING] adding uplink to epoll set failed" );
@@ -288,9 +285,9 @@ static void* uplink_mainloop(void *data)
// Check all events
for (i = 0; i < numSocks; ++i) {
// Check for errors....
- if ( (events[i].events & (EPOLLERR | EPOLLHUP)) || !(events[i].events & EPOLLIN) ) {
+ if ( (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) || !(events[i].events & EPOLLIN) ) {
if ( events[i].data.fd == link->signal ) {
- memlogf( "[WARNING] epoll error on signal-pipe!" );
+ memlogf( "[WARNING] epoll error on signal in uplink_mainloop!" );
goto cleanup;
}
if ( events[i].data.fd == link->fd ) {
@@ -307,18 +304,8 @@ static void* uplink_mainloop(void *data)
// No error, handle normally
if ( events[i].data.fd == link->signal ) {
// Event on the signal fd -> a client requests data
- int ret;
- do {
- ret = read( link->signal, buffer, sizeof buffer );
- } while ( ret == sizeof buffer ); // Throw data away, this is just used for waking this thread up
- if ( ret == 0 ) {
- memlogf( "[WARNING] Eventfd of uplink for %s closed! Things will break!", link->image->lower_name );
- }
- if ( ret < 0 ) {
- ret = errno;
- if ( ret != EAGAIN && ret != EWOULDBLOCK && ret != EBUSY && ret != EINTR ) {
- memlogf( "[WARNING] Errno %d on eventfd on uplink for %s! Things will break!", ret, link->image->lower_name );
- }
+ if ( signal_clear( link->signal ) == SIGNAL_ERROR ) {
+ memlogf( "[WARNING] Errno on eventfd on uplink for %s! Things will break!", link->image->lower_name );
}
if ( link->fd != -1 ) {
// Uplink seems fine, relay requests to it...
@@ -372,6 +359,7 @@ static void* uplink_mainloop(void *data)
discoverFailCount++;
nextAltCheck = time( NULL ) + (discoverFailCount < 5 ? altCheckInterval : SERVER_RTT_DELAY_FAILED);
}
+ // TODO: If background replication is disabled, send keepalive every 30 seconds or so
#ifdef _DEBUG
if ( link->fd != -1 && !link->shutdown ) {
int resend = FALSE;
@@ -411,7 +399,7 @@ static void* uplink_mainloop(void *data)
spin_unlock( &link->image->lock );
spin_unlock( &link->queueLock );
if ( fd != -1 ) close( fd );
- if ( signal != -1 ) close( signal );
+ if ( signal != -1 ) signal_close( signal );
if ( fdEpoll != -1 ) close( fdEpoll );
// Wait for the RTT check to finish/fail if it's in progress
while ( link->rttTestResult == RTT_INPROGRESS )
@@ -536,13 +524,13 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path );
goto error_cleanup;
}
- if ( inReply.size > 9000000 ) {
+ if ( inReply.size > 9000000 ) { // TODO: Configurable
memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path );
goto error_cleanup;
}
if ( link->recvBufferLen < inReply.size ) {
if ( link->recvBuffer != NULL ) free( link->recvBuffer );
- link->recvBufferLen = MIN(9000000, inReply.size + 8192);
+ link->recvBufferLen = MIN(9000000, inReply.size + 8192); // XXX dont miss occurrence
link->recvBuffer = malloc( link->recvBufferLen );
}
uint32_t done = 0;
@@ -599,7 +587,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
req->status = ULR_FREE;
pthread_mutex_lock( &client->sendMutex );
spin_unlock( &link->queueLock );
- writev( client->sock, iov, 2 );
+ if ( client->sock != -1 ) writev( client->sock, iov, 2 );
pthread_mutex_unlock( &client->sendMutex );
spin_lock( &link->queueLock );
}