From acac3e9ac91505ab50321b465e7f44f277e9454f Mon Sep 17 00:00:00 2001 From: sr Date: Tue, 15 Jan 2013 13:11:58 +0100 Subject: [SERVER] Add socket helper module to simplify connection setup [SERVER] Add more comments to job.c --- src/server/job.c | 80 ++++++++++++++++++------------------------------- src/server/rpc.c | 20 ++----------- src/server/sockhelper.c | 64 +++++++++++++++++++++++++++++++++++++++ src/server/sockhelper.h | 23 ++++++++++++++ 4 files changed, 118 insertions(+), 69 deletions(-) create mode 100644 src/server/sockhelper.c create mode 100644 src/server/sockhelper.h diff --git a/src/server/job.c b/src/server/job.c index cc3065c..b9e17f6 100644 --- a/src/server/job.c +++ b/src/server/job.c @@ -13,9 +13,7 @@ #include #include #include -#include -#include -#include +#include "sockhelper.h" #include @@ -136,7 +134,7 @@ static void connect_proxy_images() msg.len = (uint16_t)sizeof(dnbd3_ioctl_t); msg.read_ahead_kb = DEFAULT_READ_AHEAD_KB; msg.is_server = TRUE; - for (n = 0 ;; ++n) + for (n = 0 ;; ++n) // Iterate over all images { pthread_spin_lock(&_spinlock); dnbd3_image_t *image = g_slist_nth_data(_dnbd3_images, n); @@ -146,7 +144,7 @@ static void connect_proxy_images() break; } if (image->working && image->cache_map && image->file) - { // Check if cache is complete + { // Image is relayed and already connected, check if cache is complete int complete = TRUE, j; const int map_len_bytes = IMGSIZE_TO_MAPBYTES(image->filesize); for (j = 0; j < map_len_bytes - 1; ++j) @@ -157,8 +155,8 @@ static void connect_proxy_images() break; } } - if (complete) - { + if (complete) // Every block except the last one is complete + { // Last one might need extra treatment if it's not a full byte const int blocks_in_last_byte = (image->filesize >> 12) & 7; uint8_t last_byte = 0; if (blocks_in_last_byte == 0) @@ -168,7 +166,7 @@ static void connect_proxy_images() last_byte |= (1 << j); complete = ((image->cache_map[map_len_bytes - 1] & last_byte) == last_byte); } - if (!complete) + if (!complete) // Image is not complete, finished handling it { pthread_spin_unlock(&_spinlock); continue; @@ -192,13 +190,14 @@ static void connect_proxy_images() continue; } if (image->working || image->file || image->low_name == NULL) - { // Nothing to do + { // Image is a local one, nothing to do pthread_spin_unlock(&_spinlock); continue; } + // Image is relayed and not connected yet char *devname = get_free_device(); if (devname == NULL) - { // All devices busy + { // All devices busy, can't connect pthread_spin_unlock(&_spinlock); continue; } @@ -209,15 +208,15 @@ static void connect_proxy_images() memcpy(servers, image->servers, sizeof(servers[0]) * NUMBER_SERVERS); pthread_spin_unlock(&_spinlock); int dh = open(devname, O_RDWR); - if (dh < 0) + if (dh < 0) // Open device so we can issue ioctls to it { pthread_spin_lock(&_spinlock); - return_free_device(devname); + return_free_device(devname); // Failed :-( pthread_spin_unlock(&_spinlock); continue; } for (s = 0; s < NUMBER_SERVERS; ++s) - { + { // Try to connect to any of the alt servers known for that image if (servers[s].host.type == 0) continue; // connect device @@ -229,7 +228,9 @@ static void connect_proxy_images() if (ioctl(dh, IOCTL_OPEN, &msg) < 0) continue; printf("[DEBUG] Connected! Adding alt servers...\n"); - // connected + // connected. we manually add all known alt servers to this + // device, so even if the initial server doesn't consider some + // of those as trusted servers, they are still used for failover/load balancing for (++s; s < NUMBER_SERVERS; ++s) { if (servers[s].host.type == 0) @@ -258,7 +259,10 @@ static void connect_proxy_images() long long oct = 0; int t, ret; for (t = 0; t < 10 && dh >= 0; ++t) - { // For some reason the ioctl might return 0 right after connecting + { // For some reason the getsize-ioctl might return 0 right after connecting + // No idea why this happen. Maybe the IOCTL_OPEN call returns before the + // connection is fully established, but I have no idea why. + // So let's retry a couple of times if it fails. ret = ioctl(dh, BLKGETSIZE64, &oct); if (ret == 0 && oct > 0) break; @@ -313,7 +317,7 @@ static void connect_proxy_images() if (isworking && !(alloc_cache && image->cache_file)) { image->working = TRUE; - if (!image->cache_file) + if (!image->cache_file) // This should be removed. proxy with no cache is completely pointless. memlogf("[WARNING] Proxy-Mode enabled without cache directory. This will most likely hurt performance."); goto continue_with_next_image; } @@ -325,6 +329,8 @@ static void connect_proxy_images() if (ch >= 0) { // Pre-allocate disk space + // TODO: Check if this has a performance impact on the rest of the server (ie. client lag) + // If so, do this gracefully by incrementing size and sleeping in between. printf("[DEBUG] Pre-allocating disk space...\n"); lseek(ch, fs - 1, SEEK_SET); write(ch, &ch, 1); @@ -345,8 +351,9 @@ static void connect_proxy_images() memlogf("[WARNING] Could not pre-allocate %s", cfname); } break; - } // <-- end loop over servers - // If this point is reached, replication was not successful + } // <-- end of loop over servers + // If this point is reached, setting up replication was not successful, + // so lock and free the filename allocated earlier, and return the device. pthread_spin_lock(&_spinlock); if (g_slist_find(_dnbd3_images, image) != NULL) { @@ -377,15 +384,9 @@ static void query_servers() { if (_trusted_servers == NULL) return; - struct timeval client_timeout, connect_timeout; - client_timeout.tv_sec = 0; - client_timeout.tv_usec = 500 * 1000; - connect_timeout.tv_sec = 1; - connect_timeout.tv_usec = 0; int client_sock, num; dnbd3_trusted_server_t *server; dnbd3_host_t host; - struct sockaddr_in addr4; char xmlbuffer[MAX_RPC_PAYLOAD]; for (num = 0;; ++num) { @@ -401,33 +402,10 @@ static void query_servers() host = server->host; // Copy host, in case server gets deleted by another thread pthread_spin_unlock(&_spinlock); // Connect - if (host.type != AF_INET) - { - printf("[DEBUG] Unsupported addr type '%d', ignoring trusted server.\n", (int)host.type); - continue; - } - // Create socket (Extend for IPv6) - if ((client_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) - { - printf("[DEBUG] Error creating server-to-server socket.\n"); + host.port = htons(ntohs(host.port) + 1); // RPC port is client port + 1 + client_sock = sock_connect(&host, 800, 600); + if (client_sock == -1) continue; - } - // Set host (IPv4) - memset(&addr4, 0, sizeof(addr4)); - addr4.sin_family = AF_INET; - memcpy(&addr4.sin_addr.s_addr, host.addr, 4); - addr4.sin_port = htons(ntohs(host.port) + 1); - // Connect to server - setsockopt(client_sock, SOL_SOCKET, SO_RCVTIMEO, &connect_timeout, sizeof(connect_timeout)); - setsockopt(client_sock, SOL_SOCKET, SO_SNDTIMEO, &connect_timeout, sizeof(connect_timeout)); - if (connect(client_sock, (struct sockaddr *)&addr4, sizeof(addr4)) < 0) - { - printf("[DEBUG] Could not connect to other server...\n"); - goto communication_error; - } - // Apply read/write timeout - setsockopt(client_sock, SOL_SOCKET, SO_RCVTIMEO, &client_timeout, sizeof(client_timeout)); - setsockopt(client_sock, SOL_SOCKET, SO_SNDTIMEO, &client_timeout, sizeof(client_timeout)); // // Send and receive info from server // Send message @@ -632,7 +610,7 @@ static char *create_cache_filename(char *name, int rid, char *buffer, int maxlen } FILE *fh; while ((fh = fopen(buffer, "rb"))) - { // Alter file name as long as a file by that name already exists + { // Alter file name as long as a file by that name already exists (el cheapo edition) fclose(fh); char *c = buffer + rand() % strlen(buffer); *c = rand() % 26 + 'A'; diff --git a/src/server/rpc.c b/src/server/rpc.c index 2cc688b..ffd386c 100644 --- a/src/server/rpc.c +++ b/src/server/rpc.c @@ -25,8 +25,6 @@ #include "memlog.h" #include "helper.h" -#include -#include #include #include #include @@ -35,10 +33,9 @@ #include #include #include -#include -#include #include #include +#include "sockhelper.h" #include #include @@ -608,19 +605,6 @@ void dnbd3_rpc_send(int cmd) LIBXML_TEST_VERSION struct sockaddr_in server; - struct timeval client_timeout; - - // Create socket - if ((client_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) - { - perror("ERROR: RPC socket"); - exit(EXIT_FAILURE); - } - - client_timeout.tv_sec = 4; - client_timeout.tv_usec = 0; - setsockopt(client_sock, SOL_SOCKET, SO_RCVTIMEO, &client_timeout, sizeof(client_timeout)); - setsockopt(client_sock, SOL_SOCKET, SO_SNDTIMEO, &client_timeout, sizeof(client_timeout)); memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; // IPv4 @@ -628,7 +612,7 @@ void dnbd3_rpc_send(int cmd) server.sin_port = htons(RPC_PORT); // set port number // Connect to server - if (connect(client_sock, (struct sockaddr *)&server, sizeof(server)) < 0) + if ((client_sock = sock_connect4(&server, 2000, 1000)) == -1) { perror("ERROR: RPC connect"); exit(EXIT_FAILURE); diff --git a/src/server/sockhelper.c b/src/server/sockhelper.c new file mode 100644 index 0000000..c72c74f --- /dev/null +++ b/src/server/sockhelper.c @@ -0,0 +1,64 @@ +#include "sockhelper.h" +#include +#include + +static inline int connect_shared(const int client_sock, void* addr, const int addrlen, int connect_ms, int rw_ms) +{ + struct timeval tv; + // Connect to server + tv.tv_sec = connect_ms / 1000; + tv.tv_usec = connect_ms * 1000; + setsockopt(client_sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + setsockopt(client_sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); + if (connect(client_sock, (struct sockaddr *)&addr, addrlen) == -1) + { + return -1; + } + // Apply read/write timeout + tv.tv_sec = rw_ms / 1000; + tv.tv_usec = rw_ms * 1000; + setsockopt(client_sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + setsockopt(client_sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); + return client_sock; +} + +int sock_connect4(struct sockaddr_in *addr, const int connect_ms, const int rw_ms) +{ + int client_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + if (client_sock == -1) return -1; + return connect_shared(client_sock, addr, sizeof(struct sockaddr_in), connect_ms, rw_ms); +} + +int sock_connect6(struct sockaddr_in6 *addr, const int connect_ms, const int rw_ms) +{ + int client_sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP); + if (client_sock == -1) return -1; + return connect_shared(client_sock, addr, sizeof(struct sockaddr_in6), connect_ms, rw_ms); +} + +int sock_connect(const dnbd3_host_t * const addr, const int connect_ms, const int rw_ms) +{ + if (addr->type == AF_INET) + { + // Set host (IPv4) + struct sockaddr_in addr4; + memset(&addr4, 0, sizeof(addr4)); + addr4.sin_family = AF_INET; + memcpy(&addr4.sin_addr.s_addr, addr->addr, 4); + addr4.sin_port = addr->port; + return sock_connect4(&addr4, connect_ms, rw_ms); + } + else if (addr->type == AF_INET6) + { + // Set host (IPv6) + struct sockaddr_in6 addr6; + memset(&addr6, 0, sizeof(addr6)); + addr6.sin6_family = AF_INET6; + memcpy(&addr6.sin6_addr.s6_addr, addr->addr, 16); + addr6.sin6_port = addr->port; + return sock_connect6(&addr6, connect_ms, rw_ms); + } + printf("[DEBUG] Unsupported address type: %d\n", (int)addr->type); + return -1; +} + diff --git a/src/server/sockhelper.h b/src/server/sockhelper.h new file mode 100644 index 0000000..28525c4 --- /dev/null +++ b/src/server/sockhelper.h @@ -0,0 +1,23 @@ +#ifndef SOCKHELPER_H_ +#define SOCKHELPER_H_ + +#include +#include "../types.h" +#include +#include +#include + +int sock_connect4(struct sockaddr_in *addr, const int connect_ms, const int rw_ms); + +int sock_connect6(struct sockaddr_in6 *addr, const int connect_ms, const int rw_ms); + +/** + * Connect to given dnbd3_host_t. + * @param addr - address of host to connect to + * @param connect_ms - timeout in milliseconds after which the connection attempt fails + * @param rw_ms - read/write timeout in milliseconds to apply on successful connect + * @return socket file descriptor, or -1 on error + */ +int sock_connect(const dnbd3_host_t * const addr, const int connect_ms, const int rw_ms); + +#endif /* SOCKHELPER_H_ */ -- cgit v1.2.3-55-g7522