summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Simon Rettberg, 2016-08-30 17:58:31 +0200
committer: Simon Rettberg, 2016-08-30 17:58:31 +0200
commit: 8e81742c997fcf712092e291bfd127368c52a5b2 (patch)
tree: 4eeb7f2600d2f77069626a93ebd737e314b14af3
parent: [client] fix "Show published images" button not beeing properly initialized (diff)
download: tutor-module-8e81742c997fcf712092e291bfd127368c52a5b2.tar.gz
tutor-module-8e81742c997fcf712092e291bfd127368c52a5b2.tar.xz
tutor-module-8e81742c997fcf712092e291bfd127368c52a5b2.zip
[server] Improve handling of global image exchange transfers wrt. crashed peers or connection drops
-rw-r--r-- dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java | 2
-rw-r--r-- dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java | 31
-rw-r--r-- dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java | 45
3 files changed, 54 insertions, 24 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
index 6e4ae1e0..bde280b3 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
@@ -137,6 +137,8 @@ public class IncomingDataTransfer extends IncomingTransferBase {
public void heartBeat(ExecutorService pool) {
if (masterTransferInfo == null)
return;
+ if (connectFailCount() > 50)
+ return;
synchronized (this) {
if (getActiveConnectionCount() >= 1)
return;
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java
index d87cdb9e..6ef4d4a6 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java
@@ -4,25 +4,21 @@ import java.io.File;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
import org.openslx.bwlp.sat.util.Configuration;
+import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.filetransfer.Uploader;
import org.openslx.filetransfer.util.OutgoingTransferBase;
+import org.openslx.thrifthelper.ThriftManager;
public class OutgoingDataTransfer extends OutgoingTransferBase {
private static final Logger LOGGER = Logger.getLogger(OutgoingDataTransfer.class);
- /**
- * Remote peer is downloading, so we have Uploaders
- */
- private List<Uploader> uploads = new ArrayList<>();
-
private final TransferInformation masterTransferInfo;
/**
@@ -55,10 +51,21 @@ public class OutgoingDataTransfer extends OutgoingTransferBase {
public synchronized void heartBeat(ExecutorService pool) {
if (masterTransferInfo == null)
return;
- synchronized (uploads) {
- if (uploads.size() >= 1)
+ if (connectFailCount() > 50)
+ return;
+ if (connectFailCount() > 5) {
+ try {
+ ThriftManager.getMasterClient().queryUploadStatus(masterTransferInfo.token);
+ } catch (TInvalidTokenException e) {
+ LOGGER.info("Master server forgot about upload " + masterTransferInfo.token + ", aborting...");
+ connectFails.set(100);
return;
+ } catch (TException e) {
+ LOGGER.warn("Cannot query master server for upload status of " + masterTransferInfo.token, e);
+ }
}
+ if (getActiveConnectionCount() >= 1)
+ return;
Uploader uploader = null;
Exception connectException = null;
if (masterTransferInfo.plainPort != 0) {
@@ -83,17 +90,13 @@ public class OutgoingDataTransfer extends OutgoingTransferBase {
if (uploader == null) {
LOGGER.debug("Cannot connect to master server for uploading", connectException);
} else {
- synchronized (uploads) {
- uploads.add(uploader);
- }
runConnectionInternal(uploader, pool);
}
}
@Override
public String getRelativePath() {
- // TODO Auto-generated method stub
- return null;
+ throw new RuntimeException("Not implemented");
}
}
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
index f3d64784..23892f09 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
@@ -4,11 +4,9 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.nio.ByteBuffer;
import java.sql.SQLException;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.WeakHashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -63,9 +61,10 @@ public class SyncTransferHandler {
/**
* All currently running uploads to master, by image version id
*/
- private static final Map<String, OutgoingDataTransfer> uploadsByVersionId = Collections.synchronizedMap(new WeakHashMap<String, OutgoingDataTransfer>());
+ private static final Map<String, OutgoingDataTransfer> uploadsByVersionId = new ConcurrentHashMap<>();
private static Task heartBeatTask = new Task() {
+ private int skips = 0;
private final Runnable worker = new Runnable() {
@Override
public void run() {
@@ -75,7 +74,10 @@ public class SyncTransferHandler {
if (download.isActive())
download.heartBeat(transferPool);
if (download.isComplete(now)) {
- LOGGER.info("Download from master server complete");
+ LOGGER.info("Download <" + download.getId() + "> from master server complete");
+ it.remove();
+ } else if (download.hasReachedIdleTimeout(now) || download.connectFailCount() > 50) {
+ LOGGER.info("Download <" + download.getId() + "> errored out");
it.remove();
}
}
@@ -84,7 +86,18 @@ public class SyncTransferHandler {
if (upload.isActive())
upload.heartBeat(transferPool);
if (upload.isComplete(now)) {
- LOGGER.info("Upload to master server complete");
+ LOGGER.info("Upload <" + upload.getId() + "> to master server complete");
+ it.remove();
+ } else if (upload.hasReachedIdleTimeout(now) || upload.connectFailCount() > 50) {
+ LOGGER.info("Upload <" + upload.getId() + "> errored out");
+ it.remove();
+ }
+ }
+ for (Iterator<OutgoingDataTransfer> it = uploadsByVersionId.values().iterator(); it.hasNext();) {
+ OutgoingDataTransfer upload = it.next();
+ if (upload.isComplete(now)) {
+ it.remove();
+ } else if (upload.hasReachedIdleTimeout(now) || upload.connectFailCount() > 50) {
it.remove();
}
}
@@ -93,8 +106,11 @@ public class SyncTransferHandler {
@Override
public void fire() {
- if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 2)
- return;
+ if (uploadsByTransferId.isEmpty() && uploadsByVersionId.isEmpty() && downloads.isEmpty())
+ return; // Nothing to do anyways, don't wake up another thread
+ if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 2 && ++skips < 10)
+ return; // Quite busy, don't trigger heartbeat
+ skips = 0;
transferPool.execute(worker);
}
};
@@ -110,8 +126,11 @@ public class SyncTransferHandler {
TAuthorizationException, TTransferRejectedException {
TransferInformation transferInfo;
OutgoingDataTransfer existing = uploadsByVersionId.get(imgVersion.imageVersionId);
- if (existing != null)
+ if (existing != null) {
+ LOGGER.info("Client wants to upload image " + imgVersion.imageVersionId
+ + " which is already in progess via " + existing.getId());
return existing.getId();
+ }
File absFile = FileSystem.composeAbsoluteImagePath(imgVersion);
if (!absFile.isFile() || !absFile.canRead()) {
LOGGER.error("Cannot upload " + imgVersion.imageVersionId + ": file missing: "
@@ -167,6 +186,8 @@ public class SyncTransferHandler {
uploadsByVersionId.put(imgVersion.imageVersionId, transfer);
uploadsByTransferId.put(transfer.getId(), transfer);
transfer.heartBeat(transferPool);
+ LOGGER.info("Client wants to upload image " + imgVersion.imageVersionId
+ + ", created transfer " + transfer.getId());
return transfer.getId();
}
@@ -237,7 +258,9 @@ public class SyncTransferHandler {
it.remove();
continue;
}
- activeDownloads++;
+ if (upload.countsTowardsConnectionLimit(now)) {
+ activeDownloads++;
+ }
}
if (activeDownloads >= Constants.MAX_MASTER_DOWNLOADS) {
throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
@@ -257,7 +280,9 @@ public class SyncTransferHandler {
it.remove();
continue;
}
- activeUploads++;
+ if (download.countsTowardsConnectionLimit(now)) {
+ activeUploads++;
+ }
}
if (activeUploads >= Constants.MAX_MASTER_UPLOADS) {
throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,