summaryrefslogtreecommitdiffstats
path: root/src/server/uplink.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/uplink.c')
-rw-r--r--src/server/uplink.c47
1 files changed, 42 insertions, 5 deletions
diff --git a/src/server/uplink.c b/src/server/uplink.c
index 3541728..5eda88f 100644
--- a/src/server/uplink.c
+++ b/src/server/uplink.c
@@ -13,6 +13,7 @@
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
+#include <inttypes.h>
static void* uplink_mainloop(void *data);
static void uplink_send_requests(dnbd3_connection_t *link, int newOnly);
@@ -63,11 +64,14 @@ void uplink_shutdown(dnbd3_image_t *image)
assert( image != NULL );
if ( image->uplink == NULL || image->uplink->shutdown ) return;
dnbd3_connection_t * const uplink = image->uplink;
+ spin_lock( &uplink->queueLock );
image->uplink = NULL;
uplink->shutdown = TRUE;
if ( uplink->signal != -1 ) write( uplink->signal, "", 1 );
- pthread_join( uplink->thread, NULL );
- spin_lock( &uplink->queueLock );
+ if ( uplink->image != NULL ) {
+ pthread_join( uplink->thread, NULL );
+ }
+ free( uplink->recvBuffer );
spin_unlock( &uplink->queueLock );
spin_destroy( &uplink->queueLock );
free( uplink );
@@ -78,7 +82,12 @@ void uplink_shutdown(dnbd3_image_t *image)
*/
int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint32_t length)
{
- if ( client == NULL || client->image == NULL || client->image->uplink == NULL ) return FALSE;
+ if ( client == NULL || client->image == NULL ) return FALSE;
+ spin_lock( &client->image->lock );
+ if ( client->image->uplink == NULL ) {
+ spin_unlock( &client->image->lock );
+ return FALSE;
+ }
dnbd3_connection_t * const uplink = client->image->uplink;
int foundExisting = FALSE; // Is there a pending request that is a superset of our range?
int i;
@@ -86,6 +95,7 @@ int uplink_request(dnbd3_client_t *client, uint64_t handle, uint64_t start, uint
const uint64_t end = start + length;
spin_lock( &uplink->queueLock );
+ spin_unlock( &client->image->lock );
for (i = 0; i < uplink->queueLen; ++i) {
if ( freeSlot == -1 && uplink->queue[i].status == ULR_FREE ) freeSlot = i;
if ( uplink->queue[i].status != ULR_PENDING && uplink->queue[i].status != ULR_NEW ) continue;
@@ -127,6 +137,7 @@ static void* uplink_mainloop(void *data)
int fdEpoll = -1, fdPipe = -1;
int numSocks, i, waitTime;
int altCheckInterval = SERVER_RTT_DELAY_INIT;
+ int bFree = FALSE;
time_t nextAltCheck = 0;
char buffer[100];
//
@@ -226,6 +237,7 @@ static void* uplink_mainloop(void *data)
uplink_send_requests( link, FALSE );
link->betterFd = -1;
link->currentServer = link->betterServer;
+ link->image->working = TRUE;
memset( &ev, 0, sizeof(ev) );
ev.events = EPOLLIN;
ev.data.fd = link->fd;
@@ -241,11 +253,35 @@ static void* uplink_mainloop(void *data)
if ( link->rttTestResult == RTT_IDLE || link->rttTestResult == RTT_DONTCHANGE ) {
const time_t now = time( NULL );
if ( nextAltCheck - now > SERVER_RTT_DELAY_MAX ) {
+ // This probably means the system time was changed - handle this case properly by capping the timeout
nextAltCheck = now + SERVER_RTT_DELAY_MAX;
} else if ( now >= nextAltCheck ) {
+ // It seems it's time for a check
+ if ( image_is_complete( link->image ) ) {
+ // Quit work if image is complete
+ if ( spin_trylock( &link->image->lock ) == 0 ) {
+ if ( link->image->cache_map != NULL ) {
+ free( link->image->cache_map );
+ link->image->cache_map = NULL;
+ }
+ link->image->uplink = NULL;
+ link->shutdown = TRUE;
+ free( link->recvBuffer );
+ link->recvBuffer = NULL;
+ bFree = TRUE;
+ spin_lock( &link->queueLock );
+ spin_unlock( &link->queueLock );
+ spin_destroy( &link->queueLock );
+ spin_unlock( &link->image->lock );
+ pthread_detach( link->thread );
+ goto cleanup;
+ }
+ } else {
+ // Not complete- do measurement
+ altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous)
+ }
altCheckInterval = MIN(altCheckInterval + 1, SERVER_RTT_DELAY_MAX);
nextAltCheck = now + altCheckInterval;
- altserver_find_uplink( link ); // This will set RTT_INPROGRESS (synchronous)
}
}
}
@@ -262,6 +298,7 @@ static void* uplink_mainloop(void *data)
while ( link->rttTestResult == RTT_INPROGRESS )
usleep( 10000 );
if ( link->betterFd != -1 ) close( link->betterFd );
+ if ( bFree ) free( link );
return NULL ;
}
@@ -275,7 +312,7 @@ static void uplink_send_requests(dnbd3_connection_t *link, int newOnly)
for (j = 0; j < link->queueLen; ++j) {
if ( link->queue[j].status != ULR_NEW && (newOnly || link->queue[j].status != ULR_PENDING) ) continue;
link->queue[j].status = ULR_PENDING;
- request.handle = link->queue[j].handle;
+ request.handle = link->queue[j].from; // HACK: Store offset in handle too, as it won't be included in the reply
request.cmd = CMD_GET_BLOCK;
request.offset = link->queue[j].from;
request.size = link->queue[j].to - link->queue[j].from;