summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorSimon Rettberg2015-01-09 18:09:12 +0100
committerSimon Rettberg2015-01-09 18:09:12 +0100
commit895ac724d0fea228a1695348bd76b05c4c4f18a7 (patch)
treec5799ee8df6df57e1d752546e71f50079ca7fe84 /src
parent[SERVER] Support blocking signals, saves a syscall in threadpool (diff)
downloaddnbd3-895ac724d0fea228a1695348bd76b05c4c4f18a7.tar.gz
dnbd3-895ac724d0fea228a1695348bd76b05c4c4f18a7.tar.xz
dnbd3-895ac724d0fea228a1695348bd76b05c4c4f18a7.zip
[SERVER] Refactoring, extending protocol.h
Diffstat (limited to 'src')
-rw-r--r--src/server/protocol.h35
-rw-r--r--src/server/uplink.c35
2 files changed, 40 insertions, 30 deletions
diff --git a/src/server/protocol.h b/src/server/protocol.h
index 6d7af40..c0eda5b 100644
--- a/src/server/protocol.h
+++ b/src/server/protocol.h
@@ -4,16 +4,35 @@
#include "../types.h"
#include "../serialize.h"
-#define FLAGS8_SERVER 1
+#define FLAGS8_SERVER (1)
-static inline bool dnbd3_get_reply(int sock, dnbd3_reply_t *reply)
+#define REPLY_OK (0)
+#define REPLY_ERRNO (-1)
+#define REPLY_AGAIN (-2)
+#define REPLY_INTR (-3)
+#define REPLY_CLOSED (-4)
+#define REPLY_INCOMPLETE (-5)
+#define REPLY_WRONGMAGIC (-6)
+
+static inline int dnbd3_read_reply(int sock, dnbd3_reply_t *reply, bool wait)
{
- if ( recv( sock, reply, sizeof(*reply), MSG_WAITALL ) != sizeof(*reply) ) {
- return false;
+ int ret = recv( sock, reply, sizeof(*reply), (wait ? MSG_WAITALL : MSG_DONTWAIT) | MSG_NOSIGNAL );
+ if ( ret == 0 ) return REPLY_CLOSED;
+ if ( ret < 0 ) {
+ if ( ret == EAGAIN || ret == EWOULDBLOCK ) return REPLY_AGAIN;
+ if ( ret == EINTR ) return REPLY_INTR;
+ return REPLY_ERRNO;
}
+ if ( !wait && ret != sizeof(*reply) ) ret += recv( sock, reply + ret, sizeof(*reply) - ret, MSG_WAITALL | MSG_NOSIGNAL );
+ if ( ret != sizeof(*reply) ) return REPLY_INCOMPLETE;
fixup_reply( *reply );
- if ( reply->magic != dnbd3_packet_magic ) return false;
- return true;
+ if ( reply->magic != dnbd3_packet_magic ) return REPLY_WRONGMAGIC;
+ return REPLY_OK;
+}
+
+static inline bool dnbd3_get_reply(int sock, dnbd3_reply_t *reply)
+{
+ return dnbd3_read_reply( sock, reply, true ) == REPLY_OK;
}
static inline bool dnbd3_select_image(int sock, char *lower_name, uint16_t rid, uint8_t flags8)
@@ -74,7 +93,7 @@ static inline bool dnbd3_get_crc32(int sock, uint32_t *master, void *buffer, siz
reply.size -= 4;
if ( reply.cmd != CMD_GET_CRC32 || reply.size > *bufferLen ) return false;
*bufferLen = reply.size;
- if ( recv( sock, master, sizeof(uint32_t), MSG_WAITALL ) != sizeof(uint32_t) ) return false;
+ if ( recv( sock, master, sizeof(uint32_t), MSG_WAITALL | MSG_NOSIGNAL ) != sizeof(uint32_t) ) return false;
int done = 0;
while ( done < reply.size ) {
const int ret = recv( sock, (char*)buffer + done, reply.size - done, 0 );
@@ -101,7 +120,7 @@ static inline bool dnbd3_select_image_reply(serialized_buffer_t *buffer, int soc
return false;
}
// receive reply payload
- if ( recv( sock, buffer, reply.size, MSG_WAITALL ) != reply.size ) {
+ if ( recv( sock, buffer, reply.size, MSG_WAITALL | MSG_NOSIGNAL ) != reply.size ) {
return false;
}
// handle/check reply payload
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 82c1f5a..d1ae48e 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -362,7 +362,7 @@ static void* uplink_mainloop(void *data)
#ifdef _DEBUG
if ( link->fd != -1 && !link->shutdown ) {
bool resend = false;
- time_t deadline = time( NULL ) - 10;
+ time_t deadline = time( NULL ) - 15;
spin_lock( &link->queueLock );
for (i = 0; i < link->queueLen; ++i) {
if ( link->queue[i].status != ULR_FREE && link->queue[i].entered < deadline ) {
@@ -489,36 +489,29 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
dnbd3_reply_t inReply, outReply;
int ret, i;
for (;;) {
- ret = recv( link->fd, &inReply, sizeof inReply, MSG_DONTWAIT | MSG_NOSIGNAL );
- if ( ret < 0 ) {
- const int err = errno;
- if ( err == EAGAIN || err == EWOULDBLOCK || err == EINTR ) break; // OK cases
- goto error_cleanup;
- }
- if ( ret == 0 ) {
+ ret = dnbd3_read_reply( link->fd, &inReply, false );
+ if ( ret == REPLY_INTR && !_shutdown && !link->shutdown ) continue;
+ if ( ret == REPLY_AGAIN ) break;
+ if ( ret == REPLY_CLOSED ) {
memlogf( "[INFO] Uplink: Remote host hung up (%s)", link->image->path );
goto error_cleanup;
}
- if ( ret != sizeof inReply ) ret += recv( link->fd, &inReply + ret, sizeof(inReply) - ret, MSG_WAITALL | MSG_NOSIGNAL );
- if ( ret != sizeof inReply ) {
- const int err = errno;
- memlogf( "[INFO] Lost connection to uplink server for %s (header %d/%d, e=%d)", link->image->path, ret, (int)sizeof(inReply),
- err );
+ if ( ret == REPLY_WRONGMAGIC ) {
+ memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path );
goto error_cleanup;
}
- fixup_reply( inReply );
- if ( inReply.magic != dnbd3_packet_magic ) {
- memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path );
+ if ( ret != REPLY_OK ) {
+ memlogf( "[INFO] Uplink: Connection error (%s)", link->image->path );
goto error_cleanup;
}
if ( inReply.size > 9000000 ) { // TODO: Configurable
memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path );
goto error_cleanup;
}
+
if ( link->recvBufferLen < inReply.size ) {
- if ( link->recvBuffer != NULL ) free( link->recvBuffer );
- link->recvBufferLen = MIN(9000000, inReply.size + 8192); // XXX dont miss occurrence
- link->recvBuffer = malloc( link->recvBufferLen );
+ link->recvBufferLen = MIN(9000000, inReply.size + 65536); // XXX dont miss occurrence
+ link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen );
}
uint32_t done = 0;
while ( done < inReply.size ) {
@@ -538,9 +531,7 @@ static void uplink_handleReceive(dnbd3_connection_t *link)
const uint64_t end = inReply.handle + inReply.size;
// 1) Write to cache file
assert( link->image->cacheFd != -1 );
- iov[0].iov_base = link->recvBuffer;
- iov[0].iov_len = inReply.size;
- ret = (int)pwritev( link->image->cacheFd, iov, 1, start );
+ ret = (int)pwrite( link->image->cacheFd, link->recvBuffer, inReply.size, start );
if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, true );
// 2) Figure out which clients are interested in it
spin_lock( &link->queueLock );