summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/server/altservers.c42
-rw-r--r--src/server/globals.c2
-rw-r--r--src/server/signal.c46
-rw-r--r--src/server/signal.h42
-rw-r--r--src/server/uplink.c40
5 files changed, 118 insertions, 54 deletions
diff --git a/src/server/altservers.c b/src/server/altservers.c
index 106e07a..7781e69 100644
--- a/src/server/altservers.c
+++ b/src/server/altservers.c
@@ -6,6 +6,7 @@
#include "helper.h"
#include "globals.h"
#include "image.h"
+#include "signal.h"
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>
@@ -19,7 +20,7 @@
static dnbd3_connection_t *pending[SERVER_MAX_PENDING_ALT_CHECKS];
static pthread_spinlock_t pendingLockProduce; // Lock for adding something to pending. (NULL -> nonNULL)
-static pthread_mutex_t pendingLockConsume = PTHREAD_MUTEX_INITIALIZER; // Lock for removegin something (nunNULL -> NULL)
+static pthread_mutex_t pendingLockConsume = PTHREAD_MUTEX_INITIALIZER; // Lock for removing something (nonNULL -> NULL)
static int signalPipe = -1;
static dnbd3_alt_server_t altServers[SERVER_MAX_ALTS];
@@ -51,7 +52,7 @@ void altservers_init()
void altservers_shutdown()
{
if ( !initDone ) return;
- write( signalPipe, "", 1 ); // Wake altservers thread up
+ signal_call( signalPipe ); // Wake altservers thread up
thread_join( altThread, NULL );
}
@@ -147,7 +148,7 @@ void altservers_findUplink(dnbd3_connection_t *uplink)
pending[i] = uplink;
uplink->rttTestResult = RTT_INPROGRESS;
spin_unlock( &pendingLockProduce );
- write( signalPipe, "", 1 ); // Wake altservers thread up
+ signal_call( signalPipe ); // Wake altservers thread up
return;
}
// End of loop - no free slot
@@ -341,7 +342,7 @@ static void *altservers_main(void *data)
const int MAXEVENTS = 3;
const int ALTS = 4;
struct epoll_event ev, events[MAXEVENTS];
- int readPipe = -1, fdEpoll = -1;
+ int fdEpoll = -1;
int numSocks, ret, itLink, itAlt, numAlts;
int found;
char buffer[DNBD3_BLOCK_SIZE ];
@@ -365,19 +366,15 @@ static void *altservers_main(void *data)
goto cleanup;
}
{
- int pipes[2];
- if ( pipe( pipes ) < 0 ) {
- memlogf( "[WARNING] error creating pipe. Uplink unavailable." );
+ signalPipe = signal_new();
+ if ( signalPipe < 0 ) {
+ memlogf( "[WARNING] error creating signal object. Uplink feature unavailable." );
goto cleanup;
}
- sock_set_nonblock( pipes[0] );
- sock_set_nonblock( pipes[1] );
- readPipe = pipes[0];
- signalPipe = pipes[1];
memset( &ev, 0, sizeof(ev) );
ev.events = EPOLLIN;
- ev.data.fd = readPipe;
- if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, readPipe, &ev ) < 0 ) {
+ ev.data.fd = signalPipe;
+ if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, signalPipe, &ev ) < 0 ) {
memlogf( "[WARNING] adding read-signal-pipe to epoll set failed" );
goto cleanup;
}
@@ -392,18 +389,10 @@ static void *altservers_main(void *data)
usleep( 100000 );
}
if ( _shutdown ) goto cleanup;
- // Empty pipe
- do {
- ret = read( readPipe, buffer, sizeof buffer );
- } while ( ret == sizeof buffer ); // Throw data away, this is just used for waking this thread up
- if ( ret == 0 ) {
- memlogf( "[WARNING] Signal pipe of alservers_main closed! Things will break!" );
- }
- if ( ret < 0 ) {
- ret = errno;
- if ( ret != EAGAIN && ret != EWOULDBLOCK && ret != EBUSY && ret != EINTR ) {
- memlogf( "[WARNING] Errno %d on pipe-read on alservers_main! Things will break!", ret );
- }
+ // Clear signal
+ ret = signal_clear( signalPipe );
+ if ( ret == SIGNAL_ERROR ) {
+ memlogf( "[WARNING] Error on signal_clear on alservers_main! Things will break!" );
}
// Work your way through the queue
for (itLink = 0; itLink < SERVER_MAX_PENDING_ALT_CHECKS; ++itLink) {
@@ -548,8 +537,7 @@ static void *altservers_main(void *data)
}
cleanup: ;
if ( fdEpoll != -1 ) close( fdEpoll );
- if ( readPipe != -1 ) close( readPipe );
- if ( signalPipe != -1 ) close( signalPipe );
+ if ( signalPipe != -1 ) signal_close( signalPipe );
signalPipe = -1;
return NULL ;
}
diff --git a/src/server/globals.c b/src/server/globals.c
index 37bbf90..0f26169 100644
--- a/src/server/globals.c
+++ b/src/server/globals.c
@@ -16,7 +16,7 @@ int _isProxy = FALSE;
int _proxyPrivateOnly = FALSE;
int _uplinkTimeout = 1250;
int _clientTimeout = 15000;
-int _backgroundReplication = FALSE;
+int _backgroundReplication = TRUE;
#define SAVE_TO_VAR_STR(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) { if (_ ## kk != NULL) free(_ ## kk); _ ## kk = strdup(value); } } while (0)
#define SAVE_TO_VAR_BOOL(ss, kk) do { if (strcmp(section, #ss) == 0 && strcmp(key, #kk) == 0) _ ## kk = atoi(value) != 0 || strcmp(value, "true") == 0 || strcmp(value, "True") == 0 || strcmp(value, "TRUE") == 0; } while (0)
diff --git a/src/server/signal.c b/src/server/signal.c
new file mode 100644
index 0000000..f0988b9
--- /dev/null
+++ b/src/server/signal.c
@@ -0,0 +1,46 @@
+#include "signal.h"
+#include <sys/eventfd.h>
+#include <poll.h>
+#include <inttypes.h>
+#include <errno.h>
+#include <unistd.h>
+
+int signal_new()
+{
+ return eventfd( 0, EFD_NONBLOCK );
+}
+
+int signal_call(int signalFd)
+{
+ static uint64_t one = 1;
+ return write( signalFd, &one, sizeof one ) == sizeof one;
+}
+
+int signal_wait(int signalFd, int timeoutMs)
+{
+ struct pollfd ps = {
+ .fd = signalFd,
+ .events = POLLIN
+ };
+ int ret = poll( &ps, 1, timeoutMs );
+ if ( ret == 0 ) return SIGNAL_TIMEOUT;
+ if ( ret == -1 ) return SIGNAL_ERROR;
+ if ( ps.revents & ( POLLERR | POLLNVAL ) ) return SIGNAL_ERROR;
+ return signal_clear( signalFd );
+}
+
+int signal_clear(int signalFd)
+{
+ uint64_t ret;
+ if ( read( signalFd, &ret, sizeof ret ) != sizeof ret ) {
+ if ( errno == EAGAIN ) return 0;
+ return SIGNAL_ERROR;
+ }
+ return (int)ret;
+}
+
+void signal_close(int signalFd)
+{
+ close( signalFd );
+}
+
diff --git a/src/server/signal.h b/src/server/signal.h
new file mode 100644
index 0000000..0504274
--- /dev/null
+++ b/src/server/signal.h
@@ -0,0 +1,42 @@
+#ifndef _SIGNAL_H_
+#define _SIGNAL_H_
+
+#define SIGNAL_OK (0)
+#define SIGNAL_TIMEOUT (-2)
+#define SIGNAL_ERROR (-1)
+
+/**
+ * Create a new signal fd (eventfd), nonblocking.
+ * @return >= 0 on success, which is the fd; < 0 on error
+ */
+int signal_new();
+
+/**
+ * Trigger the given signal, so a wait or clear call will succeed.
+ * @return SIGNAL_OK on success, SIGNAL_ERROR on error
+ */
+int signal_call(int signalFd);
+
+/**
+ * Wait for given signal, with an optional timeout.
+ * If timeout == 0, wait forever.
+ * @return > 0 telling how many times the signal was called,
+ * SIGNAL_TIMEOUT if the timeout was reached,
+ * SIGNAL_ERROR if some error occured
+ */
+int signal_wait(int signalFd, int timeoutMs);
+
+/**
+ * Clears any pending signals on this signal fd.
+ * @return number of signals that were pending,
+ * SIGNAL_ERROR if some error occured
+ */
+int signal_clear(int signalFd);
+
+/**
+ * Close the given signal.
+ */
+void signal_close(int signalFd);
+
+#endif
+
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 5b05873..cfe959c 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -7,13 +7,13 @@
#include "altservers.h"
#include "helper.h"
#include "protocol.h"
+#include "signal.h"
#include <pthread.h>
#include <sys/socket.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/errno.h>
-#include <sys/eventfd.h>
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>
@@ -103,8 +103,7 @@ void uplink_shutdown(dnbd3_image_t *image)
}
image->uplink = NULL;
uplink->shutdown = TRUE;
- static uint64_t counter = 1;
- if ( uplink->signal != -1 ) write( uplink->signal, &counter, sizeof(counter) );
+ if ( uplink->signal != -1 ) signal_call( uplink->signal );
pthread_t thread = uplink->thread;
spin_unlock( &uplink->queueLock );
spin_unlock( &image->lock );
@@ -190,12 +189,10 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint
#ifdef _DEBUG
uplink->queue[freeSlot].entered = time( NULL );
#endif
- const int signalFd = uplink->signal;
spin_unlock( &uplink->queueLock );
if ( foundExisting == -1 ) { // Only wake up uplink thread if the request needs to be relayed
- static uint64_t counter = 1;
- write( signalFd, &counter, sizeof(counter) );
+ signal_call( uplink->signal );
}
return TRUE;
}
@@ -225,7 +222,7 @@ static void* uplink_mainloop(void *data)
memlogf( "[WARNING] epoll_create failed. Uplink unavailable." );
goto cleanup;
}
- link->signal = eventfd( 0, EFD_NONBLOCK );
+ link->signal = signal_new();
if ( link->signal < 0 ) {
memlogf( "[WARNING] error creating pipe. Uplink unavailable." );
goto cleanup;
@@ -264,7 +261,7 @@ static void* uplink_mainloop(void *data)
setThreadName( buffer );
}
memset( &ev, 0, sizeof(ev) );
- ev.events = EPOLLIN;
+ ev.events = EPOLLIN | EPOLLRDHUP | EPOLLPRI;
ev.data.fd = link->fd;
if ( epoll_ctl( fdEpoll, EPOLL_CTL_ADD, link->fd, &ev ) < 0 ) {
memlogf( "[WARNING] adding uplink to epoll set failed" );
@@ -288,9 +285,9 @@ static void* uplink_mainloop(void *data)
// Check all events
for (i = 0; i < numSocks; ++i) {
// Check for errors....
- if ( (events[i].events & (EPOLLERR | EPOLLHUP)) || !(events[i].events & EPOLLIN) ) {
+ if ( (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) || !(events[i].events & EPOLLIN) ) {
if ( events[i].data.fd == link->signal ) {
- memlogf( "[WARNING] epoll error on signal-pipe!" );
+ memlogf( "[WARNING] epoll error on signal in uplink_mainloop!" );
goto cleanup;
}
if ( events[i].data.fd == link->fd ) {
@@ -307,18 +304,8 @@ static void* uplink_mainloop(void *data)
// No error, handle normally
if ( events[i].data.fd == link->signal ) {
// Event on the signal fd -> a client requests data
- int ret;
- do {
- ret = read( link->signal, buffer, sizeof buffer );
- } while ( ret == sizeof buffer ); // Throw data away, this is just used for waking this thread up
- if ( ret == 0 ) {
- memlogf( "[WARNING] Eventfd of uplink for %s closed! Things will break!", link->image->lower_name );
- }
- if ( ret < 0 ) {
- ret = errno;
- if ( ret != EAGAIN && ret != EWOULDBLOCK && ret != EBUSY && ret != EINTR ) {
- memlogf( "[WARNING] Errno %d on eventfd on uplink for %s! Things will break!", ret, link->image->lower_name );
- }
+ if ( signal_clear( link->signal ) == SIGNAL_ERROR ) {
+ memlogf( "[WARNING] Errno on eventfd on uplink for %s! Things will break!", link->image->lower_name );
}
if ( link->fd != -1 ) {
// Uplink seems fine, relay requests to it...
@@ -372,6 +359,7 @@ static void* uplink_mainloop(void *data)
discoverFailCount++;
nextAltCheck = time( NULL ) + (discoverFailCount < 5 ? altCheckInterval : SERVER_RTT_DELAY_FAILED);
}
+ // TODO: If background replication is disabled, send keepalive every 30 seconds or so
#ifdef _DEBUG
if ( link->fd != -1 && !link->shutdown ) {
int resend = FALSE;
@@ -411,7 +399,7 @@ static void* uplink_mainloop(void *data)
spin_unlock( &link->image->lock );
spin_unlock( &link->queueLock );
if ( fd != -1 ) close( fd );
- if ( signal != -1 ) close( signal );
+ if ( signal != -1 ) signal_close( signal );
if ( fdEpoll != -1 ) close( fdEpoll );
// Wait for the RTT check to finish/fail if it's in progress
while ( link->rttTestResult == RTT_INPROGRESS )
@@ -536,13 +524,13 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path );
goto error_cleanup;
}
- if ( inReply.size > 9000000 ) {
+ if ( inReply.size > 9000000 ) { // TODO: Configurable
memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path );
goto error_cleanup;
}
if ( link->recvBufferLen < inReply.size ) {
if ( link->recvBuffer != NULL ) free( link->recvBuffer );
- link->recvBufferLen = MIN(9000000, inReply.size + 8192);
+ link->recvBufferLen = MIN(9000000, inReply.size + 8192); // XXX dont miss occurrence
link->recvBuffer = malloc( link->recvBufferLen );
}
uint32_t done = 0;
@@ -599,7 +587,7 @@ static void uplink_handle_receive(dnbd3_connection_t *link)
req->status = ULR_FREE;
pthread_mutex_lock( &client->sendMutex );
spin_unlock( &link->queueLock );
- writev( client->sock, iov, 2 );
+ if ( client->sock != -1 ) writev( client->sock, iov, 2 );
pthread_mutex_unlock( &client->sendMutex );
spin_lock( &link->queueLock );
}