From 895ac724d0fea228a1695348bd76b05c4c4f18a7 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Fri, 9 Jan 2015 18:09:12 +0100 Subject: [SERVER] Refactoring, extending protocol.h --- src/server/protocol.h | 35 +++++++++++++++++++++++++++-------- src/server/uplink.c | 35 +++++++++++++---------------------- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/src/server/protocol.h b/src/server/protocol.h index 6d7af40..c0eda5b 100644 --- a/src/server/protocol.h +++ b/src/server/protocol.h @@ -4,16 +4,35 @@ #include "../types.h" #include "../serialize.h" -#define FLAGS8_SERVER 1 +#define FLAGS8_SERVER (1) -static inline bool dnbd3_get_reply(int sock, dnbd3_reply_t *reply) +#define REPLY_OK (0) +#define REPLY_ERRNO (-1) +#define REPLY_AGAIN (-2) +#define REPLY_INTR (-3) +#define REPLY_CLOSED (-4) +#define REPLY_INCOMPLETE (-5) +#define REPLY_WRONGMAGIC (-6) + +static inline int dnbd3_read_reply(int sock, dnbd3_reply_t *reply, bool wait) { - if ( recv( sock, reply, sizeof(*reply), MSG_WAITALL ) != sizeof(*reply) ) { - return false; + int ret = recv( sock, reply, sizeof(*reply), (wait ? MSG_WAITALL : MSG_DONTWAIT) | MSG_NOSIGNAL ); + if ( ret == 0 ) return REPLY_CLOSED; + if ( ret < 0 ) { + if ( ret == EAGAIN || ret == EWOULDBLOCK ) return REPLY_AGAIN; + if ( ret == EINTR ) return REPLY_INTR; + return REPLY_ERRNO; } + if ( !wait && ret != sizeof(*reply) ) ret += recv( sock, reply + ret, sizeof(*reply) - ret, MSG_WAITALL | MSG_NOSIGNAL ); + if ( ret != sizeof(*reply) ) return REPLY_INCOMPLETE; fixup_reply( *reply ); - if ( reply->magic != dnbd3_packet_magic ) return false; - return true; + if ( reply->magic != dnbd3_packet_magic ) return REPLY_WRONGMAGIC; + return REPLY_OK; +} + +static inline bool dnbd3_get_reply(int sock, dnbd3_reply_t *reply) +{ + return dnbd3_read_reply( sock, reply, true ) == REPLY_OK; } static inline bool dnbd3_select_image(int sock, char *lower_name, uint16_t rid, uint8_t flags8) @@ -74,7 +93,7 @@ static inline bool dnbd3_get_crc32(int sock, uint32_t *master, void *buffer, siz reply.size -= 4; if ( reply.cmd != CMD_GET_CRC32 || reply.size > *bufferLen ) return false; *bufferLen = reply.size; - if ( recv( sock, master, sizeof(uint32_t), MSG_WAITALL ) != sizeof(uint32_t) ) return false; + if ( recv( sock, master, sizeof(uint32_t), MSG_WAITALL | MSG_NOSIGNAL ) != sizeof(uint32_t) ) return false; int done = 0; while ( done < reply.size ) { const int ret = recv( sock, (char*)buffer + done, reply.size - done, 0 ); @@ -101,7 +120,7 @@ static inline bool dnbd3_select_image_reply(serialized_buffer_t *buffer, int soc return false; } // receive reply payload - if ( recv( sock, buffer, reply.size, MSG_WAITALL ) != reply.size ) { + if ( recv( sock, buffer, reply.size, MSG_WAITALL | MSG_NOSIGNAL ) != reply.size ) { return false; } // handle/check reply payload diff --git a/src/server/uplink.c b/src/server/uplink.c index 82c1f5a..d1ae48e 100644 --- a/src/server/uplink.c +++ b/src/server/uplink.c @@ -362,7 +362,7 @@ static void* uplink_mainloop(void *data) #ifdef _DEBUG if ( link->fd != -1 && !link->shutdown ) { bool resend = false; - time_t deadline = time( NULL ) - 10; + time_t deadline = time( NULL ) - 15; spin_lock( &link->queueLock ); for (i = 0; i < link->queueLen; ++i) { if ( link->queue[i].status != ULR_FREE && link->queue[i].entered < deadline ) { @@ -489,36 +489,29 @@ static void uplink_handleReceive(dnbd3_connection_t *link) dnbd3_reply_t inReply, outReply; int ret, i; for (;;) { - ret = recv( link->fd, &inReply, sizeof inReply, MSG_DONTWAIT | MSG_NOSIGNAL ); - if ( ret < 0 ) { - const int err = errno; - if ( err == EAGAIN || err == EWOULDBLOCK || err == EINTR ) break; // OK cases - goto error_cleanup; - } - if ( ret == 0 ) { + ret = dnbd3_read_reply( link->fd, &inReply, false ); + if ( ret == REPLY_INTR && !_shutdown && !link->shutdown ) continue; + if ( ret == REPLY_AGAIN ) break; + if ( ret == REPLY_CLOSED ) { memlogf( "[INFO] Uplink: Remote host hung up (%s)", link->image->path ); goto error_cleanup; } - if ( ret != sizeof inReply ) ret += recv( link->fd, &inReply + ret, sizeof(inReply) - ret, MSG_WAITALL | MSG_NOSIGNAL ); - if ( ret != sizeof inReply ) { - const int err = errno; - memlogf( "[INFO] Lost connection to uplink server for %s (header %d/%d, e=%d)", link->image->path, ret, (int)sizeof(inReply), - err ); + if ( ret == REPLY_WRONGMAGIC ) { + memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path ); goto error_cleanup; } - fixup_reply( inReply ); - if ( inReply.magic != dnbd3_packet_magic ) { - memlogf( "[WARNING] Uplink server's packet did not start with dnbd3_packet_magic (%s)", link->image->path ); + if ( ret != REPLY_OK ) { + memlogf( "[INFO] Uplink: Connection error (%s)", link->image->path ); goto error_cleanup; } if ( inReply.size > 9000000 ) { // TODO: Configurable memlogf( "[WARNING] Pure evil: Uplink server sent too much payload for %s", link->image->path ); goto error_cleanup; } + if ( link->recvBufferLen < inReply.size ) { - if ( link->recvBuffer != NULL ) free( link->recvBuffer ); - link->recvBufferLen = MIN(9000000, inReply.size + 8192); // XXX dont miss occurrence - link->recvBuffer = malloc( link->recvBufferLen ); + link->recvBufferLen = MIN(9000000, inReply.size + 65536); // XXX dont miss occurrence + link->recvBuffer = realloc( link->recvBuffer, link->recvBufferLen ); } uint32_t done = 0; while ( done < inReply.size ) { @@ -538,9 +531,7 @@ static void uplink_handleReceive(dnbd3_connection_t *link) const uint64_t end = inReply.handle + inReply.size; // 1) Write to cache file assert( link->image->cacheFd != -1 ); - iov[0].iov_base = link->recvBuffer; - iov[0].iov_len = inReply.size; - ret = (int)pwritev( link->image->cacheFd, iov, 1, start ); + ret = (int)pwrite( link->image->cacheFd, link->recvBuffer, inReply.size, start ); if ( ret > 0 ) image_updateCachemap( link->image, start, start + ret, true ); // 2) Figure out which clients are interested in it spin_lock( &link->queueLock ); -- cgit v1.2.3-55-g7522