summaryrefslogtreecommitdiffstats
path: root/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
diff options
context:
space:
mode:
authorSimon Rettberg2016-04-13 18:39:26 +0200
committerSimon Rettberg2016-04-13 18:39:26 +0200
commit5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622 (patch)
tree5bdc5411cd9954577e5489d5e4271c800d826e9c /dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
parent[client] fix bad commit (diff)
downloadtutor-module-5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622.tar.gz
tutor-module-5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622.tar.xz
tutor-module-5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622.zip
(WiP) Global image sync
Diffstat (limited to 'dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java')
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java115
1 files changed, 107 insertions, 8 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
index f7a9de85..27e11185 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
@@ -2,7 +2,10 @@ package org.openslx.bwlp.sat.fileserv;
import java.io.File;
import java.io.FileNotFoundException;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -10,17 +13,23 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
+import org.openslx.bwlp.sat.database.mappers.DbImage;
+import org.openslx.bwlp.sat.database.mappers.DbUser;
+import org.openslx.bwlp.sat.database.models.LocalImageVersion;
import org.openslx.bwlp.sat.util.Constants;
+import org.openslx.bwlp.sat.util.FileSystem;
import org.openslx.bwlp.sat.util.Formatter;
-import org.openslx.bwlp.sat.util.GrowingThreadPoolExecutor;
import org.openslx.bwlp.sat.util.PrioThreadFactory;
+import org.openslx.bwlp.thrift.iface.ImageDetailsRead;
import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.InvocationError;
import org.openslx.bwlp.thrift.iface.TAuthorizationException;
import org.openslx.bwlp.thrift.iface.TInvocationException;
import org.openslx.bwlp.thrift.iface.TNotFoundException;
+import org.openslx.bwlp.thrift.iface.TTransferRejectedException;
import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.thrifthelper.ThriftManager;
+import org.openslx.util.GrowingThreadPoolExecutor;
import org.openslx.util.QuickTimer;
import org.openslx.util.QuickTimer.Task;
@@ -50,20 +59,31 @@ public class SyncTransferHandler {
private final Runnable worker = new Runnable() {
@Override
public void run() {
- for (IncomingDataTransfer download : downloads.values()) {
+ final long now = System.currentTimeMillis();
+ for (Iterator<IncomingDataTransfer> it = downloads.values().iterator(); it.hasNext();) {
+ IncomingDataTransfer download = it.next();
if (download.isActive())
download.heartBeat(transferPool);
+ if (download.isComplete(now)) {
+ LOGGER.info("Download from master server complete");
+ it.remove();
+ }
}
- for (OutgoingDataTransfer upload : uploads.values()) {
+ for (Iterator<OutgoingDataTransfer> it = uploads.values().iterator(); it.hasNext();) {
+ OutgoingDataTransfer upload = it.next();
if (upload.isActive())
upload.heartBeat(transferPool);
+ if (upload.isComplete(now)) {
+ LOGGER.info("Upload to master server complete");
+ it.remove();
+ }
}
}
};
@Override
public void fire() {
- if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 1)
+ if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 2)
return;
transferPool.execute(worker);
}
@@ -75,18 +95,77 @@ public class SyncTransferHandler {
QuickTimer.scheduleAtFixedDelay(heartBeatTask, 123, TimeUnit.SECONDS.toMillis(56));
}
- public synchronized static String requestImageDownload(ImagePublishData image)
+ public synchronized static String requestImageUpload(String userToken, LocalImageVersion img)
+ throws SQLException, TNotFoundException, TInvocationException, TAuthorizationException,
+ TTransferRejectedException {
+ TransferInformation transferInfo;
+ OutgoingDataTransfer existing = uploads.get(img.imageVersionId);
+ if (existing != null)
+ return existing.getId();
+ File absFile = FileSystem.composeAbsoluteImagePath(img);
+ if (!absFile.isFile() || !absFile.canRead()) {
+ LOGGER.error("Cannot upload " + img.imageVersionId + ": file missing: "
+ + absFile.getAbsolutePath());
+ throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, "Source file not readable");
+ }
+ if (absFile.length() != img.fileSize) {
+ LOGGER.error("Cannot upload" + img.imageVersionId + ": wrong file size - expected "
+ + img.fileSize + ", got " + absFile.length());
+ throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
+ "File corrupted on satellite server");
+ }
+ checkUploadCount();
+ ImageDetailsRead details = DbImage.getImageDetails(null, img.imageBaseId);
+ List<ByteBuffer> blockHashes = DbImage.getBlockHashes(img.imageVersionId);
+ ImagePublishData publishData = new ImagePublishData();
+ publishData.createTime = img.createTime;
+ publishData.description = details.description;
+ publishData.fileSize = img.fileSize;
+ publishData.imageBaseId = img.imageBaseId;
+ publishData.imageName = details.imageName;
+ publishData.imageVersionId = img.imageVersionId;
+ publishData.isTemplate = details.isTemplate;
+ publishData.osId = details.osId;
+ publishData.user = DbUser.getOrNull(img.uploaderId);
+ publishData.virtId = details.virtId;
+ try {
+ transferInfo = ThriftManager.getMasterClient().submitImage(userToken, publishData, blockHashes);
+ } catch (TAuthorizationException e) {
+ LOGGER.warn("Master server rejected our session on uploadImage", e);
+ throw e;
+ } catch (TInvocationException e) {
+ LOGGER.warn("Master server made a boo-boo on uploadImage", e);
+ throw e;
+ } catch (TTransferRejectedException e) {
+ LOGGER.warn("Master server rejected our upload request", e);
+ throw e;
+ } catch (TException e) {
+ LOGGER.warn("Unknown exception on uploadImage to master server", e);
+ throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
+ "Communication with master server failed");
+ }
+ OutgoingDataTransfer transfer = new OutgoingDataTransfer(transferInfo, absFile);
+ uploads.put(transfer.getId(), transfer);
+ transfer.heartBeat(transferPool);
+ return transfer.getId();
+ }
+
+ public synchronized static String requestImageDownload(String userToken, ImagePublishData image)
throws TInvocationException, TAuthorizationException, TNotFoundException {
TransferInformation transferInfo;
// Already replicating this one?
- if (downloads.containsKey(image.imageVersionId))
- return image.imageVersionId;
+ IncomingDataTransfer existing = downloads.get(image.imageVersionId);
+ if (existing != null)
+ return existing.getId();
checkDownloadCount();
try {
- transferInfo = ThriftManager.getMasterClient().downloadImage(null, image.imageVersionId);
+ transferInfo = ThriftManager.getMasterClient().downloadImage(userToken, image.imageVersionId);
} catch (TAuthorizationException e) {
LOGGER.warn("Master server rejected our session on downloadImage", e);
throw e;
+ } catch (TInvocationException e) {
+ LOGGER.warn("Master server made a boo-boo on downloadImage", e);
+ throw e;
} catch (TNotFoundException e) {
LOGGER.warn("Master server couldn't find image on downloadImage", e);
throw e;
@@ -131,6 +210,26 @@ public class SyncTransferHandler {
}
}
+ private static void checkUploadCount() throws TInvocationException {
+ Iterator<OutgoingDataTransfer> it = uploads.values().iterator();
+ final long now = System.currentTimeMillis();
+ int activeUploads = 0;
+ while (it.hasNext()) {
+ OutgoingDataTransfer download = it.next();
+ if (download.isComplete(now) || download.hasReachedIdleTimeout(now)) {
+ download.cancel();
+ it.remove();
+ continue;
+ }
+ activeUploads++;
+ }
+ if (activeUploads > Constants.MAX_MASTER_UPLOADS) {
+ throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
+ "Server busy. Too many running uploads (" + activeUploads + "/"
+ + Constants.MAX_MASTER_UPLOADS + ").");
+ }
+ }
+
/**
* Get an upload instance by given token.
*