diff options
author | Simon Rettberg | 2016-08-30 17:58:31 +0200 |
---|---|---|
committer | Simon Rettberg | 2016-08-30 17:58:31 +0200 |
commit | 8e81742c997fcf712092e291bfd127368c52a5b2 (patch) | |
tree | 4eeb7f2600d2f77069626a93ebd737e314b14af3 | |
parent | [client] fix "Show published images" button not being properly initialized (diff) | |
download | tutor-module-8e81742c997fcf712092e291bfd127368c52a5b2.tar.gz tutor-module-8e81742c997fcf712092e291bfd127368c52a5b2.tar.xz tutor-module-8e81742c997fcf712092e291bfd127368c52a5b2.zip |
[server] Improve handling of global image exchange transfers wrt. crashed peers or connection drops
3 files changed, 54 insertions, 24 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java index 6e4ae1e0..bde280b3 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java @@ -137,6 +137,8 @@ public class IncomingDataTransfer extends IncomingTransferBase { public void heartBeat(ExecutorService pool) { if (masterTransferInfo == null) return; + if (connectFailCount() > 50) + return; synchronized (this) { if (getActiveConnectionCount() >= 1) return; diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java index d87cdb9e..6ef4d4a6 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java @@ -4,25 +4,21 @@ import java.io.File; import java.io.IOException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.ExecutorService; import org.apache.log4j.Logger; +import org.apache.thrift.TException; import org.openslx.bwlp.sat.util.Configuration; +import org.openslx.bwlp.thrift.iface.TInvalidTokenException; import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.filetransfer.Uploader; import org.openslx.filetransfer.util.OutgoingTransferBase; +import org.openslx.thrifthelper.ThriftManager; public class OutgoingDataTransfer extends OutgoingTransferBase { private static final Logger LOGGER = Logger.getLogger(OutgoingDataTransfer.class); - /** - * Remote peer is downloading, so we have Uploaders - */ - private List<Uploader> 
uploads = new ArrayList<>(); - private final TransferInformation masterTransferInfo; /** @@ -55,10 +51,21 @@ public class OutgoingDataTransfer extends OutgoingTransferBase { public synchronized void heartBeat(ExecutorService pool) { if (masterTransferInfo == null) return; - synchronized (uploads) { - if (uploads.size() >= 1) + if (connectFailCount() > 50) + return; + if (connectFailCount() > 5) { + try { + ThriftManager.getMasterClient().queryUploadStatus(masterTransferInfo.token); + } catch (TInvalidTokenException e) { + LOGGER.info("Master server forgot about upload " + masterTransferInfo.token + ", aborting..."); + connectFails.set(100); return; + } catch (TException e) { + LOGGER.warn("Cannot query master server for upload status of " + masterTransferInfo.token, e); + } } + if (getActiveConnectionCount() >= 1) + return; Uploader uploader = null; Exception connectException = null; if (masterTransferInfo.plainPort != 0) { @@ -83,17 +90,13 @@ public class OutgoingDataTransfer extends OutgoingTransferBase { if (uploader == null) { LOGGER.debug("Cannot connect to master server for uploading", connectException); } else { - synchronized (uploads) { - uploads.add(uploader); - } runConnectionInternal(uploader, pool); } } @Override public String getRelativePath() { - // TODO Auto-generated method stub - return null; + throw new RuntimeException("Not implemented"); } } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java index f3d64784..23892f09 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java @@ -4,11 +4,9 @@ import java.io.File; import java.io.FileNotFoundException; import java.nio.ByteBuffer; import java.sql.SQLException; -import java.util.Collections; import java.util.Iterator; import 
java.util.List; import java.util.Map; -import java.util.WeakHashMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -63,9 +61,10 @@ public class SyncTransferHandler { /** * All currently running uploads to master, by image version id */ - private static final Map<String, OutgoingDataTransfer> uploadsByVersionId = Collections.synchronizedMap(new WeakHashMap<String, OutgoingDataTransfer>()); + private static final Map<String, OutgoingDataTransfer> uploadsByVersionId = new ConcurrentHashMap<>(); private static Task heartBeatTask = new Task() { + private int skips = 0; private final Runnable worker = new Runnable() { @Override public void run() { @@ -75,7 +74,10 @@ public class SyncTransferHandler { if (download.isActive()) download.heartBeat(transferPool); if (download.isComplete(now)) { - LOGGER.info("Download from master server complete"); + LOGGER.info("Download <" + download.getId() + "> from master server complete"); + it.remove(); + } else if (download.hasReachedIdleTimeout(now) || download.connectFailCount() > 50) { + LOGGER.info("Download <" + download.getId() + "> errored out"); it.remove(); } } @@ -84,7 +86,18 @@ public class SyncTransferHandler { if (upload.isActive()) upload.heartBeat(transferPool); if (upload.isComplete(now)) { - LOGGER.info("Upload to master server complete"); + LOGGER.info("Upload <" + upload.getId() + "> to master server complete"); + it.remove(); + } else if (upload.hasReachedIdleTimeout(now) || upload.connectFailCount() > 50) { + LOGGER.info("Upload <" + upload.getId() + "> errored out"); + it.remove(); + } + } + for (Iterator<OutgoingDataTransfer> it = uploadsByVersionId.values().iterator(); it.hasNext();) { + OutgoingDataTransfer upload = it.next(); + if (upload.isComplete(now)) { + it.remove(); + } else if (upload.hasReachedIdleTimeout(now) || upload.connectFailCount() > 50) { it.remove(); } } @@ -93,8 +106,11 @@ public class 
SyncTransferHandler { @Override public void fire() { - if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 2) - return; + if (uploadsByTransferId.isEmpty() && uploadsByVersionId.isEmpty() && downloads.isEmpty()) + return; // Nothing to do anyways, don't wake up another thread + if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 2 && ++skips < 10) + return; // Quite busy, don't trigger heartbeat + skips = 0; transferPool.execute(worker); } }; @@ -110,8 +126,11 @@ public class SyncTransferHandler { TAuthorizationException, TTransferRejectedException { TransferInformation transferInfo; OutgoingDataTransfer existing = uploadsByVersionId.get(imgVersion.imageVersionId); - if (existing != null) + if (existing != null) { + LOGGER.info("Client wants to upload image " + imgVersion.imageVersionId + + " which is already in progess via " + existing.getId()); return existing.getId(); + } File absFile = FileSystem.composeAbsoluteImagePath(imgVersion); if (!absFile.isFile() || !absFile.canRead()) { LOGGER.error("Cannot upload " + imgVersion.imageVersionId + ": file missing: " @@ -167,6 +186,8 @@ public class SyncTransferHandler { uploadsByVersionId.put(imgVersion.imageVersionId, transfer); uploadsByTransferId.put(transfer.getId(), transfer); transfer.heartBeat(transferPool); + LOGGER.info("Client wants to upload image " + imgVersion.imageVersionId + + ", created transfer " + transfer.getId()); return transfer.getId(); } @@ -237,7 +258,9 @@ public class SyncTransferHandler { it.remove(); continue; } - activeDownloads++; + if (upload.countsTowardsConnectionLimit(now)) { + activeDownloads++; + } } if (activeDownloads >= Constants.MAX_MASTER_DOWNLOADS) { throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, @@ -257,7 +280,9 @@ public class SyncTransferHandler { it.remove(); continue; } - activeUploads++; + if (download.countsTowardsConnectionLimit(now)) { + activeUploads++; + } } if (activeUploads >= 
Constants.MAX_MASTER_UPLOADS) { throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, |