#include "connection.h"
#include "helper.h"
#include "../clientconfig.h"
#include "../shared/protocol.h"
#include "../shared/fdsignal.h"
#include "../shared/sockhelper.h"
#include "../shared/log.h"

// NOTE(review): the names of the system headers were lost in this copy of the
// file (only bare "#include" tokens were left). The list below is reconstructed
// from what the code in this file uses - confirm against version control.
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <inttypes.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/socket.h>

/* Constants */
// A macro rather than a "static const" object, so that arrays sized with it
// are ordinary arrays and not variable-length arrays.
#define SHORTBUF ((size_t)100)
#define MAX_ALTS (16)
#define MAX_ALTS_ACTIVE (5)
#define MAX_HOSTS_PER_ADDRESS (2)
// If a server wasn't reachable this many times, we slowly start skipping it on measurements
static const int FAIL_BACKOFF_START_COUNT = 8;
#define RTT_COUNT (4)

/* Module variables */

// Init guard
static bool connectionInitDone = false;
static bool threadInitDone = false;
static pthread_mutex_t mutexInit = PTHREAD_MUTEX_INITIALIZER;
static bool keepRunning = true;

// List of pending requests
static struct {
	dnbd3_async_t *head;
	dnbd3_async_t *tail;
	pthread_spinlock_t lock;
} requests;

// Connection for the image
static struct {
	char *name;
	uint16_t rid;
	uint64_t size;
} image;

static struct {
	int sockFd;                  // -1 while there is no usable connection ("panic mode")
	pthread_mutex_t sendMutex;   // guards sockFd and every send on it
	dnbd3_signal_t* panicSignal; // wakes the background thread when the connection is lost
	dnbd3_host_t currentServer;
	uint64_t startupTime;        // nowMilli() at the time the current connection was established
} connection;

// Known alt servers
typedef struct _alt_server {
	dnbd3_host_t host;
	int consecutiveFails;
	int rtt;             // average over rtts[]
	int rtts[RTT_COUNT]; // ring buffer of the most recent measurements
	int rttIndex;        // ring buffer position of the latest measurement
	int bestCount;
} alt_server_t;
static alt_server_t altservers[MAX_ALTS];
static dnbd3_server_entry_t newservers[MAX_ALTS];
static pthread_spinlock_t altLock;

/* Static methods */

static void* connection_receiveThreadMain(void *sock);
static void* connection_backgroundThread(void *something);

static void addAltServers(void);
static void sortAltServers(void);
static void probeAltServers(void);
static void switchConnection(int sockFd, alt_server_t *srv);
static void requestAltServers(void);
static bool throwDataAway(int sockFd, uint32_t amount);

static void enqueueRequest(dnbd3_async_t *request);
static dnbd3_async_t* removeRequest(dnbd3_async_t *request);

static uint64_t nowMilli(void);
static uint64_t nowMicro(void);

/**
 * Resolve the space separated list of hosts, connect to the first one that
 * answers the image selection correctly and remember it as current server.
 * All resolved addresses are stored in the alt server list.
 *
 * @param hosts space separated list of host names/addresses
 * @param lowerImage name of the image to request
 * @param rid revision to request, 0 accepts whatever the server returns
 * @return true if a connection has been established (by this call), false otherwise
 */
bool connection_init(const char *hosts, const char *lowerImage, const uint16_t rid)
{
	int sock = -1;
	char host[SHORTBUF];
	size_t hlen;
	serialized_buffer_t buffer;
	uint16_t remoteVersion, remoteRid;
	char *remoteName;
	uint64_t remoteSize;
	struct sockaddr_storage sa;
	socklen_t salen;
	poll_list_t *cons = sock_newPollList();

	pthread_mutex_lock( &mutexInit );
	if ( !connectionInitDone && keepRunning ) {
		dnbd3_host_t tempHosts[MAX_HOSTS_PER_ADDRESS];
		const char *current, *end;
		int altIndex = 0;
		memset( altservers, 0, sizeof altservers );
		connection.sockFd = -1;
		current = hosts;
		do {
			// Get next host from string
			while ( *current == ' ' ) current++;
			end = strchr( current, ' ' );
			// snprintf's size includes the terminator, hence the + 1
			size_t len = (end == NULL ? SHORTBUF : (size_t)( end - current ) + 1);
			if ( len > SHORTBUF ) len = SHORTBUF;
			snprintf( host, len, "%s", current );
			// Trailing spaces (or an empty list) produce an empty token; nothing to resolve then
			if ( host[0] != '\0' ) {
				const int newHosts = sock_resolveToDnbd3Host( host, tempHosts, MAX_HOSTS_PER_ADDRESS );
				for ( int i = 0; i < newHosts; ++i ) {
					if ( altIndex >= MAX_ALTS )
						break;
					altservers[altIndex].host = tempHosts[i];
					altIndex += 1;
				}
			}
			// Only advance when there is a separator; "end + 1" on NULL is undefined
			if ( end != NULL ) {
				current = end + 1;
			}
		} while ( end != NULL && altIndex < MAX_ALTS );
		logadd( LOG_INFO, "Got %d servers from init call", altIndex );
		// Connect
		for ( int i = 0; i < altIndex + 5; ++i ) {
			if ( i >= altIndex ) {
				// Additional iteration - no corresponding slot in altservers, this
				// is just so we can make a final calls with longer timeout
				sock = sock_multiConnect( cons, NULL, 400, 1000 );
				if ( sock == -2 ) {
					logadd( LOG_ERROR, "Could not connect to any host" );
					sock = -1;
					break;
				}
			} else {
				if ( altservers[i].host.type == 0 )
					continue;
				// Try to connect - 100ms timeout
				sock = sock_multiConnect( cons, &altservers[i].host, 100, 1000 );
			}
			if ( sock == -2 || sock == -1 )
				continue;
			salen = sizeof(sa);
			if ( getpeername( sock, (struct sockaddr*)&sa, &salen ) == -1 ) {
				logadd( LOG_ERROR, "getpeername on successful connection failed!? (errno=%d)", errno );
				close( sock );
				sock = -1;
				continue;
			}
			hlen = sock_printable( (struct sockaddr*)&sa, salen, host, sizeof(host) );
			logadd( LOG_INFO, "Connected to %.*s", (int)hlen, host );
			if ( !dnbd3_select_image( sock, lowerImage, rid, 0 ) ) {
				logadd( LOG_ERROR, "Could not send select image" );
			} else if ( !dnbd3_select_image_reply( &buffer, sock, &remoteVersion, &remoteName, &remoteRid, &remoteSize ) ) {
				logadd( LOG_ERROR, "Could not read select image reply (%d)", errno );
			} else if ( rid != 0 && rid != remoteRid ) {
				logadd( LOG_ERROR, "rid mismatch (want: %d, got: %d)", (int)rid, (int)remoteRid );
			} else {
				char *nameCopy = strdup( remoteName );
				if ( nameCopy == NULL ) {
					// image.name is dereferenced by the probing code later on, so
					// a missing copy has to count as a failed attempt
					logadd( LOG_ERROR, "Out of memory while storing image name" );
				} else {
					logadd( LOG_INFO, "Requested: '%s:%d'", lowerImage, (int)rid );
					logadd( LOG_INFO, "Returned:  '%s:%d'", remoteName, (int)remoteRid );
					sock_setTimeout( sock, SOCKET_KEEPALIVE_TIMEOUT * 1000 );
					image.name = nameCopy;
					image.rid = remoteRid;
					image.size = remoteSize;
					if ( !sock_sockaddrToDnbd3( (struct sockaddr*)&sa, &connection.currentServer ) ) {
						logadd( LOG_ERROR, "sockaddr to dnbd3_host_t failed!?" );
						connection.currentServer.type = 0;
					}
					connection.panicSignal = signal_new();
					connection.startupTime = nowMilli();
					connection.sockFd = sock;
					requests.head = NULL;
					requests.tail = NULL;
					// NOTE(review): if this send fails, connection.sockFd is reset to -1
					// while this function still reports success - confirm this is intended
					requestAltServers();
					break;
				}
			}
			// Failed
			if ( sock != -1 ) {
				close( sock );
				sock = -1;
			}
		}
		if ( sock != -1 ) {
			connectionInitDone = true;
		}
	}
	pthread_mutex_unlock( &mutexInit );
	sock_destroyPollList( cons );
	return sock != -1;
}

/**
 * Initialize the locks and start the receive and background threads.
 * Requires a successful connection_init() and does its work only once.
 *
 * @return true if locks and both threads were set up, false otherwise
 */
bool connection_initThreads()
{
	pthread_mutex_lock( &mutexInit );
	if ( !keepRunning || !connectionInitDone || threadInitDone || connection.sockFd == -1 ) {
		pthread_mutex_unlock( &mutexInit );
		return false;
	}
	bool success = true;
	bool receiverStarted = false;
	pthread_t thread;
	threadInitDone = true;
	logadd( LOG_DEBUG1, "Initializing stuff" );
	if ( pthread_mutex_init( &connection.sendMutex, NULL ) != 0
			|| pthread_spin_init( &requests.lock, PTHREAD_PROCESS_PRIVATE ) != 0
			|| pthread_spin_init( &altLock, PTHREAD_PROCESS_PRIVATE ) != 0 ) {
		logadd( LOG_ERROR, "Mutex or spinlock init failure" );
		success = false;
	} else {
		if ( pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)connection.sockFd ) != 0 ) {
			logadd( LOG_ERROR, "Could not create receive thread" );
			success = false;
		} else {
			receiverStarted = true;
			if ( pthread_create( &thread, NULL, &connection_backgroundThread, NULL ) != 0 ) {
				logadd( LOG_ERROR, "Could not create background thread" );
				success = false;
			}
		}
	}
	if ( !success ) {
		if ( receiverStarted ) {
			// The receive thread closes its descriptor itself once reading fails.
			// Closing it here too would be a double close, so only shut it down.
			pthread_mutex_lock( &connection.sendMutex );
			if ( connection.sockFd != -1 ) {
				shutdown( connection.sockFd, SHUT_RDWR );
				connection.sockFd = -1;
			}
			pthread_mutex_unlock( &connection.sendMutex );
		} else {
			close( connection.sockFd );
			connection.sockFd = -1;
		}
	}
	pthread_mutex_unlock( &mutexInit );
	return success;
}

/**
 * @return size of the image as reported by the server during connection_init()
 */
uint64_t connection_getImageSize()
{
	return image.size;
}

/**
 * Queue a read request and, if a connection exists, send it to the server.
 * When sending fails, the request stays queued, the connection is marked as
 * lost and the background thread is woken up.
 *
 * @param request request to queue; its address is used as the request handle
 * @return false if connection_init() has not succeeded yet, true otherwise
 */
bool connection_read(dnbd3_async_t *request)
{
	if ( !connectionInitDone ) return false;
	pthread_mutex_lock( &connection.sendMutex );
	enqueueRequest( request );
	if ( connection.sockFd != -1 ) {
		if ( !dnbd3_get_block( connection.sockFd, request->offset, request->length, (uint64_t)(uintptr_t)request, 0 ) ) {
			shutdown( connection.sockFd, SHUT_RDWR );
			connection.sockFd = -1;
			pthread_mutex_unlock( &connection.sendMutex );
			signal_call( connection.panicSignal );
			return true;
		}
	}
	pthread_mutex_unlock( &connection.sendMutex );
	return true;
}

/**
 * Ask all worker threads to stop and shut down the current socket, if any.
 */
void connection_close()
{
	if ( keepRunning ) {
		logadd( LOG_INFO, "Tearing down dnbd3 connections and workers" );
	}
	pthread_mutex_lock( &mutexInit );
	keepRunning = false;
	if ( !connectionInitDone ) {
		pthread_mutex_unlock( &mutexInit );
		return;
	}
	pthread_mutex_unlock( &mutexInit );
	pthread_mutex_lock( &connection.sendMutex );
	if ( connection.sockFd != -1 ) {
		shutdown( connection.sockFd, SHUT_RDWR );
	}
	pthread_mutex_unlock( &connection.sendMutex );
}

/**
 * Write a human readable summary (image, connection time, one line per known
 * alt server) into the given buffer.
 *
 * @param buffer destination
 * @param len size of destination in bytes
 * @return number of bytes of the buffer that were used
 */
size_t connection_printStats(char *buffer, const size_t len)
{
	int ret;
	size_t remaining = len;
	if ( remaining > 0 ) {
		ret = snprintf( buffer, remaining, "Image:    %s\nRevision: %d\n\nCurrent connection time: %ds\n\n",
				image.name, (int)image.rid, (int)( (nowMilli() - connection.startupTime) / 1000 ) );
		if ( ret < 0 ) {
			ret = 0;
		}
		if ( (size_t)ret >= remaining ) {
			return len;
		}
		remaining -= ret;
		buffer += ret;
	}
	int i = -1;
	pthread_spin_lock( &altLock );
	while ( remaining > 3 && ++i < MAX_ALTS ) {
		if ( altservers[i].host.type == 0 )
			continue;
		// One marker character: current server, inactive server or regular one
		if ( isSameAddressPort( &connection.currentServer, &altservers[i].host ) ) {
			*buffer++ = '*';
		} else if ( i >= MAX_ALTS_ACTIVE ) {
			*buffer++ = '-';
		} else {
			*buffer++ = ' ';
		}
		// Account for the marker before handing out the rest of the buffer, so
		// sock_printHost is never told about more space than is actually left
		remaining -= 1;
		const size_t addrlen = sock_printHost( &altservers[i].host, buffer, remaining );
		if ( addrlen > remaining ) {
			// Should not happen; never let the unsigned counter wrap around
			remaining = 0;
			break;
		}
		remaining -= addrlen;
		buffer += addrlen;
		if ( remaining < 3 )
			break;
		int width = addrlen >= 35 ? 0 : 35 - (int)addrlen;
		char *unit;
		int value;
		if ( altservers[i].rtt > 5000 ) {
			unit = "ms ";
			value = altservers[i].rtt / 1000;
		} else {
			unit = "µs";
			value = altservers[i].rtt;
			width += 3;
		}
		ret = snprintf( buffer, remaining, "% *d %s Unreachable: % 4d BestCount: % 4d\n",
				width, value, unit, altservers[i].consecutiveFails, altservers[i].bestCount );
		if ( ret < 0 ) {
			ret = 0;
		}
		if ( (size_t)ret >= remaining ) {
			remaining = 0;
			break;
		}
		remaining -= ret;
		buffer += ret;
	}
	pthread_spin_unlock( &altLock );
	return len - remaining;
}

/**
 * Receive thread: reads replies from one socket until an error occurs or
 * shutdown is requested. Block replies are matched against the request queue,
 * server lists are stored in newservers, everything else is discarded.
 * The thread closes the socket it was started with before it exits.
 *
 * @param sockPtr the socket descriptor, passed by value inside the pointer
 */
static void* connection_receiveThreadMain(void *sockPtr)
{
	int sockFd = (int)(size_t)sockPtr;
	dnbd3_reply_t reply;
	pthread_detach( pthread_self() );

	while ( keepRunning ) {
		int ret;
		do {
			ret = dnbd3_read_reply( sockFd, &reply, true );
			if ( ret == REPLY_OK ) break;
		} while ( ret == REPLY_INTR || ret == REPLY_AGAIN );
		if ( ret != REPLY_OK ) {
			logadd( LOG_DEBUG1, "Error receiving reply on receiveThread (%d)", ret );
			goto fail;
		}

		if ( reply.cmd == CMD_GET_BLOCK ) {
			// Get block reply. find matching request
			// (the handle is the request's address, see connection_read)
			dnbd3_async_t *request = removeRequest( (dnbd3_async_t*)(uintptr_t)reply.handle );
			if ( request == NULL ) {
				logadd( LOG_WARNING, "WARNING BUG ALERT SOMETHING: Got block reply with no matching request" );
				if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) {
					logadd( LOG_DEBUG1, "....and choked on reply payload" );
					goto fail;
				}
			} else {
				// Found a match
				const ssize_t got = sock_recv( sockFd, request->buffer, request->length );
				if ( got != (ssize_t)request->length ) {
					logadd( LOG_DEBUG1, "receiving payload for a block reply failed" );
					// Put it back into the queue so it is not lost
					connection_read( request );
					goto fail;
				}
				// Success, wake up caller
				request->success = true;
				request->finished = true;
				signal_call( request->signal );
			}
		} else if ( reply.cmd == CMD_GET_SERVERS ) {
			// List of known alt servers
			dnbd3_server_entry_t entries[MAX_ALTS];
			const int count = MIN( reply.size / sizeof(dnbd3_server_entry_t), MAX_ALTS );
			const size_t relevantSize = sizeof(dnbd3_server_entry_t) * count;
			// Read what fits, discard whatever the server sent beyond that
			if ( sock_recv( sockFd, entries, relevantSize ) != (ssize_t)relevantSize
					|| !throwDataAway( sockFd, reply.size - (uint32_t)relevantSize ) ) {
				logadd( LOG_DEBUG1, "Error receiving list of alt servers." );
				goto fail;
			}
			pthread_spin_lock( &altLock );
			memcpy( newservers, entries, relevantSize );
			pthread_spin_unlock( &altLock );
		} else {
			// TODO: Handle the others?
			if ( reply.size != 0 && !throwDataAway( sockFd, reply.size ) ) {
				logadd( LOG_DEBUG1, "Could not throw %d bytes away on CMD %d", (int)reply.size, (int)reply.cmd );
				goto fail;
			}
		}
	}
	// Reached when keepRunning was cleared (the message is German for
	// "got kicked out of the loop")
	logadd( LOG_DEBUG1, "Aus der Schleife rausgeflogen! ARRRRRRRRRR" );
fail:;
	// Make sure noone is trying to use the socket for sending by locking,
	pthread_mutex_lock( &connection.sendMutex );
	// then just set the fd to -1, but only if it's the same fd as ours,
	// as someone could have established a new connection already
	if ( connection.sockFd == sockFd ) {
		connection.sockFd = -1;
		signal_call( connection.panicSignal );
	}
	pthread_mutex_unlock( &connection.sendMutex );
	// As we're the only reader, it's safe to close the socket now
	close( sockFd );
	return NULL;
}

/**
 * Background thread: periodically sends keepalive packets and probes the alt
 * servers. It sleeps on the panic signal, so it reacts right away when the
 * connection is lost.
 */
static void* connection_backgroundThread(void *something UNUSED)
{
	uint64_t nextKeepalive = 0;
	uint64_t nextRttCheck = 0;
	// Nobody keeps the thread handle, so it can never be joined
	pthread_detach( pthread_self() );

	while ( keepRunning ) {
		const uint64_t now = nowMilli();
		if ( now < nextKeepalive && now < nextRttCheck ) {
			int waitTime = (int)( MIN( nextKeepalive, nextRttCheck ) - now );
			int waitRes = signal_wait( connection.panicSignal, waitTime );
			if ( waitRes == SIGNAL_ERROR ) {
				logadd( LOG_WARNING, "Error waiting on signal in background thread! Errno = %d", errno );
			}
		}
		// Woken up, see what we have to do
		const bool panic = connection.sockFd == -1;
		// Check alt servers
		if ( panic || now >= nextRttCheck ) {
			addAltServers();
			sortAltServers();
			probeAltServers();
			if ( panic || connection.startupTime + ( STARTUP_MODE_DURATION * 1000ull ) > now ) {
				nextRttCheck = now + TIMER_INTERVAL_PROBE_STARTUP * 1000ull;
			} else {
				nextRttCheck = now + TIMER_INTERVAL_PROBE_NORMAL * 1000ull;
			}
		}
		// Send keepalive packet
		if ( now >= nextKeepalive ) {
			pthread_mutex_lock( &connection.sendMutex );
			if ( connection.sockFd != -1 ) {
				dnbd3_request_t request;
				request.magic = dnbd3_packet_magic;
				request.cmd = CMD_KEEPALIVE;
				request.handle = request.offset = request.size = 0;
				fixup_request( request );
				ssize_t ret = sock_sendAll( connection.sockFd, &request, sizeof request, 2 );
				if ( (size_t)ret != sizeof request ) {
					shutdown( connection.sockFd, SHUT_RDWR );
					connection.sockFd = -1;
					// Probe right away on the next iteration
					nextRttCheck = now;
				}
			}
			pthread_mutex_unlock( &connection.sendMutex );
			nextKeepalive = now + TIMER_INTERVAL_KEEPALIVE_PACKET * 1000ull;
		}
	}
	return NULL;
}

// Private quick helpers

/**
 * Merge the server list last received from the server (newservers) into
 * altservers. A new server goes into a free slot; if there is none, it
 * replaces the entry with the highest failure count above
 * FAIL_BACKOFF_START_COUNT. newservers is cleared afterwards.
 */
static void addAltServers(void)
{
	pthread_spin_lock( &altLock );
	for ( int nIdx = 0; nIdx < MAX_ALTS; ++nIdx ) {
		if ( newservers[nIdx].host.type == 0 )
			continue;
		// Got a new alt server, see if it's already known
		for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) {
			if ( isSameAddress( &newservers[nIdx].host, &altservers[eIdx].host ) ) {
				goto skip_server;
			}
		}
		// Not known yet, add - find free slot
		int slot = -1;
		for ( int eIdx = 0; eIdx < MAX_ALTS; ++eIdx ) {
			if ( altservers[eIdx].host.type == 0 ) {
				slot = eIdx; // free - bail out and use this one
				break;
			}
			// The first failing candidate has to be accepted too (slot == -1),
			// otherwise this branch could never select anything
			if ( altservers[eIdx].consecutiveFails > FAIL_BACKOFF_START_COUNT
					&& ( slot == -1 || altservers[slot].consecutiveFails < altservers[eIdx].consecutiveFails ) ) {
				// Replace an existing alt-server that failed recently if we got no more slots
				slot = eIdx;
			}
		}
		if ( slot != -1 ) {
			char txt[200];
			sock_printHost( &newservers[nIdx].host, txt, 200 );
			logadd( LOG_DEBUG1, "new server %s in slot %d", txt, slot );
			altservers[slot].consecutiveFails = 0;
			altservers[slot].bestCount = 0;
			altservers[slot].rtts[0] = RTT_UNREACHABLE;
			altservers[slot].rttIndex = 1;
			altservers[slot].host = newservers[nIdx].host;
		}
skip_server:;
	}
	memset( newservers, 0, sizeof(newservers) );
	pthread_spin_unlock( &altLock );
}

/**
 * Find a server at index >= MAX_ALTS_ACTIVE (one that isn't considered for switching over)
 * that has been inactive for a while, then look if there's an active server that's failed
 * a couple of times recently. Swap both if found.
 */
static void sortAltServers(void)
{
	int ac = 0;
	pthread_spin_lock( &altLock );
	for ( int ia = MAX_ALTS_ACTIVE; ia < MAX_ALTS; ++ia ) {
		alt_server_t * const inactive = &altservers[ia];
		if ( inactive->host.type == 0 || inactive->consecutiveFails > 0 )
			continue;
		// Advance to the next active slot that is empty or keeps failing
		while ( ac < MAX_ALTS_ACTIVE ) {
			if ( altservers[ac].host.type == 0 || altservers[ac].consecutiveFails > FAIL_BACKOFF_START_COUNT )
				break;
			ac++;
		}
		if ( ac == MAX_ALTS_ACTIVE )
			break;
		// Switch!
		alt_server_t * const active = &altservers[ac];
		dnbd3_host_t tmp = inactive->host;
		inactive->host = active->host;
		inactive->consecutiveFails = FAIL_BACKOFF_START_COUNT * 4;
		inactive->bestCount = 0;
		inactive->rtts[0] = RTT_UNREACHABLE;
		inactive->rttIndex = 1;
		active->host = tmp;
		active->consecutiveFails = 0;
		active->bestCount = 0;
		active->rtts[0] = RTT_UNREACHABLE;
		active->rttIndex = 1;
	}
	pthread_spin_unlock( &altLock );
}

/**
 * Measure the round trip time to the alt servers by selecting the image and
 * fetching one block from each of them.
 * In panic mode (no connection) all known servers are tried, the oldest
 * pending request is used as test block, and the first server that answers
 * becomes the new connection. Otherwise only the active servers are measured
 * and the connection is switched if another server is sufficiently better.
 */
static void probeAltServers(void)
{
	serialized_buffer_t buffer;
	dnbd3_reply_t reply;
	int bestIndex = -1;
	int currentIndex = -1;
	int bestSock = -1;
	int currentRtt = RTT_UNREACHABLE;
	uint16_t remoteRid, remoteProto;
	uint64_t remoteSize;
	char *remoteName;
	bool doSwitch;
	const bool panic = connection.sockFd == -1;
	uint64_t testOffset = 0;
	uint32_t testLength = RTT_BLOCK_SIZE;
	dnbd3_async_t *request = NULL;

	if ( panic ) {
		// Use the oldest pending request as test block, so the probe does useful work
		pthread_spin_lock( &requests.lock );
		if ( requests.head != NULL ) {
			request = requests.head;
			testOffset = requests.head->offset;
			testLength = requests.head->length;
		}
		pthread_spin_unlock( &requests.lock );
		if ( testOffset != 0 ) {
			logadd( LOG_DEBUG1, "Panic with pending %" PRIu64 ":%" PRIu32, testOffset, testLength );
		}
	}

	for ( int altIndex = 0; altIndex < (panic ? MAX_ALTS : MAX_ALTS_ACTIVE); ++altIndex ) {
		alt_server_t * const srv = &altservers[altIndex];
		if ( srv->host.type == 0 )
			continue;
		// Skip servers that keep failing with increasing probability
		if ( !panic && srv->consecutiveFails > FAIL_BACKOFF_START_COUNT
				&& rand() % srv->consecutiveFails >= FAIL_BACKOFF_START_COUNT ) {
			continue;
		}
		// Advance the ring buffer; wrap BEFORE the index is used so that it
		// always stays within rtts[0 .. RTT_COUNT-1]
		srv->rttIndex += 1;
		if ( srv->rttIndex < 0 || srv->rttIndex >= RTT_COUNT ) {
			srv->rttIndex = 0;
		}
		// Probe
		const uint64_t start = nowMicro();
		errno = 0;
		int sock = sock_connect( &srv->host, panic ? 1000 : 333, 1000 );
		if ( sock == -1 ) {
			logadd( LOG_DEBUG1, "Could not connect for probing. errno = %d", errno );
			goto fail;
		}
		if ( !dnbd3_select_image( sock, image.name, image.rid, 0 ) ) {
			logadd( LOG_DEBUG1, "probe: select_image failed" );
			goto fail;
		}
		if ( !dnbd3_select_image_reply( &buffer, sock, &remoteProto, &remoteName, &remoteRid, &remoteSize )) {
			logadd( LOG_DEBUG1, "probe: select image reply failed" );
			goto fail;
		}
		if ( remoteProto < MIN_SUPPORTED_SERVER ) {
			logadd( LOG_WARNING, "Unsupported remote version (local: %d, remote: %d)", (int)PROTOCOL_VERSION, (int)remoteProto );
			srv->consecutiveFails += 10;
			goto fail;
		}
		if ( remoteRid != image.rid || strcmp( remoteName, image.name ) != 0 ) {
			logadd( LOG_WARNING, "Remote rid or name mismatch (got '%s')", remoteName );
			srv->consecutiveFails += 10;
			goto fail;
		}
		if ( !dnbd3_get_block( sock, testOffset, testLength, 0, 0 ) ) {
			logadd( LOG_DEBUG1, "-> block request fail" );
			goto fail;
		}
		int a = dnbd3_get_reply( sock, &reply );
		if ( !a || reply.size != testLength ) {
			logadd( LOG_DEBUG1, "<- get block reply fail %d %d", a, (int)reply.size );
			goto fail;
		}
		if ( request != NULL && removeRequest( request ) != NULL ) {
			// Request successfully removed from queue
			const ssize_t ret = sock_recv( sock, request->buffer, request->length );
			if ( ret != (ssize_t)request->length ) {
				logadd( LOG_DEBUG1, "[RTT] receiving payload for a block reply failed" );
				// Failure, add to queue again
				connection_read( request );
				goto fail;
			}
			// Success, wake up caller
			logadd( LOG_DEBUG1, "[RTT] Successful direct probe" );
			request->success = true;
			request->finished = true;
			signal_call( request->signal );
		} else {
			// Wasn't a request that's in our request queue
			if ( !throwDataAway( sock, testLength ) ) {
				logadd( LOG_DEBUG1, "<- get block reply payload fail" );
				goto fail;
			}
		}

		// Yay, success
		// Panic mode? Just switch to server
		if ( panic ) {
			switchConnection( sock, srv );
			return;
		}

		// Non-panic mode:
		// Update stats of server
		const uint64_t end = nowMicro();
		srv->consecutiveFails = 0;
		srv->rtts[srv->rttIndex] = (int)(end - start);
		srv->rtt = 0;
		for ( int i = 0; i < RTT_COUNT; ++i ) {
			srv->rtt += srv->rtts[i];
		}
		srv->rtt /= RTT_COUNT;

		// Remember rtt if this server matches the current one
		if ( isSameAddressPort( &srv->host, &connection.currentServer ) ) {
			currentRtt = srv->rtt;
			currentIndex = altIndex;
		}

		// Keep socket open if this is currently the best one
		if ( bestIndex == -1 || altservers[bestIndex].rtt > srv->rtt ) {
			bestIndex = altIndex;
			if ( bestSock != -1 ) {
				close( bestSock );
			}
			bestSock = sock;
		} else {
			close( sock );
		}
		continue;

fail:;
		if ( sock != -1 ) {
			close( sock );
		}
		srv->rtts[srv->rttIndex] = RTT_UNREACHABLE;
		srv->consecutiveFails += 1;
	}

	doSwitch = false;
	if ( bestIndex != -1 ) {
		// Time-sensitive switch decision: If a server was best for some consecutive measurements,
		// we switch no matter how small the difference to the current server is
		pthread_spin_lock( &altLock );
		for ( int i = 0; i < MAX_ALTS_ACTIVE; ++i ) {
			if ( i == bestIndex ) {
				if ( altservers[i].bestCount < 50 ) {
					altservers[i].bestCount += 2;
				}
				// Switch with increasing probability the higher the bestCount is
				if ( altservers[i].bestCount > 12 && altservers[i].rtt < currentRtt && altservers[i].bestCount > rand() % 50 ) {
					doSwitch = true;
				}
			} else if ( altservers[i].bestCount > 0 ) {
				altservers[i].bestCount--;
			}
		}
		// Let the failure counters of the inactive servers decay
		for ( int i = MAX_ALTS_ACTIVE; i < MAX_ALTS; ++i ) {
			if ( altservers[i].consecutiveFails > 0 ) {
				altservers[i].consecutiveFails--;
			}
		}
		pthread_spin_unlock( &altLock );
		// This takes care of the situation where two servers alternate being the best server all the time
		if ( doSwitch && currentIndex != -1 && altservers[bestIndex].bestCount - altservers[currentIndex].bestCount < 8 ) {
			doSwitch = false;
		}
		// Regular logic: Apply threshold when considering switch
		if ( !doSwitch ) {
			doSwitch = currentRtt > altservers[bestIndex].rtt + RTT_ABSOLUTE_THRESHOLD
					|| RTT_THRESHOLD_FACTOR(currentRtt) > altservers[bestIndex].rtt + 1000;
		}
	}
	// Switch if a better server was found
	if ( doSwitch ) {
		logadd( LOG_INFO, "Current: %dµs, best: %dµs. Will switch!", currentRtt, altservers[bestIndex].rtt );
		for ( int i = 0; i < MAX_ALTS; ++i ) {
			if ( i != bestIndex ) {
				altservers[i].bestCount = 0;
			}
		}
		switchConnection( bestSock, &altservers[bestIndex] );
	} else if ( bestIndex != -1 ) {
		// No switch
		close( bestSock );
	}
}

/**
 * Make the given, already connected socket the current connection: shut down
 * the old socket, start a receive thread for the new one and send all pending
 * requests again on the new connection.
 *
 * @param sockFd connected socket on which the image has been selected already;
 *               this function takes over ownership of it
 * @param srv alt server entry the socket belongs to
 */
static void switchConnection(int sockFd, alt_server_t *srv)
{
	pthread_t thread;
	struct sockaddr_storage addr;
	socklen_t addrLen = sizeof(addr);
	char message[200] = "Connection switched to ";
	const size_t len = strlen( message );
	int ret;
	int peerErrno;
	dnbd3_async_t *queue = NULL, *it;

	pthread_mutex_lock( &connection.sendMutex );
	if ( connection.sockFd != -1 ) {
		// The receive thread of the old connection closes the descriptor
		shutdown( connection.sockFd, SHUT_RDWR );
	}
	ret = getpeername( sockFd, (struct sockaddr*)&addr, &addrLen );
	peerErrno = errno; // save now, the calls below may overwrite errno
	if ( ret == 0 ) {
		connection.currentServer = srv->host;
		connection.sockFd = sockFd;
		// Detach all pending requests, they are sent again further down
		pthread_spin_lock( &requests.lock );
		queue = requests.head;
		requests.head = requests.tail = NULL;
		pthread_spin_unlock( &requests.lock );
	} else {
		connection.sockFd = -1;
	}
	requestAltServers();
	pthread_mutex_unlock( &connection.sendMutex );
	if ( ret != 0 ) {
		close( sockFd );
		logadd( LOG_WARNING, "Could not getpeername after connection switch, assuming connection already dead again. (Errno=%d)", peerErrno );
		signal_call( connection.panicSignal );
		return;
	}
	connection.startupTime = nowMilli();
	if ( pthread_create( &thread, NULL, &connection_receiveThreadMain, (void*)(size_t)sockFd ) != 0 ) {
		// Without a receive thread nobody would read replies or ever close the
		// socket, so give the connection up and fall back to panic mode
		logadd( LOG_ERROR, "Could not create receive thread after connection switch" );
		pthread_mutex_lock( &connection.sendMutex );
		if ( connection.sockFd == sockFd ) {
			connection.sockFd = -1;
		}
		pthread_mutex_unlock( &connection.sendMutex );
		close( sockFd );
		signal_call( connection.panicSignal );
	} else {
		sock_printable( (struct sockaddr*)&addr, addrLen, message + len, sizeof(message) - len );
		logadd( LOG_INFO, "%s", message );
	}
	// resend queue (requests are only queued again if the connection is gone)
	if ( queue != NULL ) {
		pthread_mutex_lock( &connection.sendMutex );
		dnbd3_async_t *next = NULL;
		for ( it = queue; it != NULL; it = next ) {
			logadd( LOG_DEBUG1, "Requeue after server change" );
			next = it->next; // enqueueRequest resets it->next
			enqueueRequest( it );
			if ( connection.sockFd != -1 && !dnbd3_get_block( connection.sockFd, it->offset, it->length, (uint64_t)(uintptr_t)it, 0 ) ) {
				logadd( LOG_WARNING, "Resending pending request failed, re-entering panic mode" );
				shutdown( connection.sockFd, SHUT_RDWR );
				connection.sockFd = -1;
				signal_call( connection.panicSignal );
			}
		}
		pthread_mutex_unlock( &connection.sendMutex );
	}
}

/**
 * Ask the current server for its list of alt servers. On send failure the
 * connection is shut down and marked as lost.
 * Does not lock, so get the sendMutex first!
 */
static void requestAltServers(void)
{
	if ( connection.sockFd == -1 ) return;
	dnbd3_request_t request = { 0 };
	request.magic = dnbd3_packet_magic;
	request.cmd = CMD_GET_SERVERS;
	fixup_request( request );
	if ( sock_sendAll( connection.sockFd, &request, sizeof(request), 2 ) != (ssize_t)sizeof(request) ) {
		logadd( LOG_WARNING, "Connection failed while requesting alt server list" );
		shutdown( connection.sockFd, SHUT_RDWR );
		connection.sockFd = -1;
	}
}

/**
 * Read and discard the given number of bytes from the socket.
 *
 * @return true if all bytes were read, false if receiving failed before that
 */
static bool throwDataAway(int sockFd, uint32_t amount)
{
	size_t done = 0;
	char tempBuffer[SHORTBUF];
	while ( done < amount ) {
		const ssize_t ret = sock_recv( sockFd, tempBuffer, MIN( amount - done, SHORTBUF ) );
		if ( ret <= 0 )
			return false;
		done += (size_t)ret;
	}
	return true;
}

/**
 * Reset the state of the request and append it to the list of pending requests.
 */
static void enqueueRequest(dnbd3_async_t *request)
{
	request->next = NULL;
	request->finished = false;
	request->success = false;
	pthread_spin_lock( &requests.lock );
	if ( requests.head == NULL ) {
		requests.head = requests.tail = request;
	} else {
		requests.tail->next = request;
		requests.tail = request;
	}
	pthread_spin_unlock( &requests.lock );
}

/**
 * Remove the given request from the list of pending requests.
 * The pointer is only compared against the list entries, never dereferenced.
 *
 * @return the request if it was in the list, NULL otherwise
 */
static dnbd3_async_t* removeRequest(dnbd3_async_t *request)
{
	pthread_spin_lock( &requests.lock );
	dnbd3_async_t *iterator, *prev = NULL;
	for ( iterator = requests.head; iterator != NULL; iterator = iterator->next ) {
		if ( iterator == request ) {
			// Found it, break!
			if ( prev != NULL ) {
				prev->next = iterator->next;
			} else {
				requests.head = iterator->next;
			}
			if ( requests.tail == iterator ) {
				requests.tail = prev;
			}
			break;
		}
		prev = iterator;
	}
	pthread_spin_unlock( &requests.lock );
	// NULL when the loop ran to the end without a match
	return iterator;
}

/**
 * @return monotonic clock in milliseconds, 0 if the clock could not be read
 */
static uint64_t nowMilli(void)
{
	struct timespec ts;
	if ( clock_gettime( CLOCK_MONOTONIC_RAW, &ts ) != 0 ) {
		printf( "clock_gettime() failed. Errno: %d\n", errno );
		return 0;
	}
	return ( ts.tv_sec * 1000ull ) + ( ts.tv_nsec / 1000000ull );
}

/**
 * @return monotonic clock in microseconds, 0 if the clock could not be read
 */
static uint64_t nowMicro(void)
{
	struct timespec ts;
	if ( clock_gettime( CLOCK_MONOTONIC_RAW, &ts ) != 0 ) {
		printf( "clock_gettime() failed. Errno: %d\n", errno );
		return 0;
	}
	return ( ts.tv_sec * 1000000ull ) + ( ts.tv_nsec / 1000ull );
}